polars_time/upsample.rs

#[cfg(feature = "timezones")]
use polars_core::datatypes::time_zone::parse_time_zone;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_ops::prelude::*;
use polars_ops::series::SeriesMethods;

use crate::prelude::*;

pub trait PolarsUpsample {
    /// Upsample a [`DataFrame`] at a regular frequency.
    ///
    /// # Arguments
    /// * `by` - First group by these columns and then upsample for every group
    /// * `time_column` - Will be used to determine a date_range.
    ///   Note that this column has to be sorted for the output to make sense.
    /// * `every` - interval will start 'every' duration
    ///
    /// The `every` argument is created with
    /// the following string language:
    /// - 1ns   (1 nanosecond)
    /// - 1us   (1 microsecond)
    /// - 1ms   (1 millisecond)
    /// - 1s    (1 second)
    /// - 1m    (1 minute)
    /// - 1h    (1 hour)
    /// - 1d    (1 calendar day)
    /// - 1w    (1 calendar week)
    /// - 1mo   (1 calendar month)
    /// - 1q    (1 calendar quarter)
    /// - 1y    (1 calendar year)
    /// - 1i    (1 index count)
    ///
    /// Or combine them:
    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
    ///
    /// By "calendar day", we mean the corresponding time on the next
    /// day (which may not be 24 hours, depending on daylight saving time).
    /// Similarly for "calendar week", "calendar month", "calendar quarter",
    /// and "calendar year".
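    ///
    /// # Example
    ///
    /// A minimal, hedged sketch (marked `ignore`, so it is not compiled as a
    /// doctest); `df` and `to_quarter_hours` are illustrative names, and the
    /// frame is assumed to have a sorted `Datetime` column named "time":
    ///
    /// ```ignore
    /// use polars_core::prelude::*;
    ///
    /// fn to_quarter_hours(df: &DataFrame) -> PolarsResult<DataFrame> {
    ///     // Fill "time" out to a regular 15-minute grid; timestamps without an
    ///     // observation become null-filled rows via the internal left join.
    ///     // `upsample` and `Duration` come from this crate's prelude.
    ///     df.upsample(Vec::<PlSmallStr>::new(), "time", Duration::parse("15m"))
    /// }
    /// ```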
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;

    /// Upsample a [`DataFrame`] at a regular frequency.
    ///
    /// Similar to [`upsample`][PolarsUpsample::upsample], but the order of the
    /// DataFrame is maintained when `by` is specified.
    ///
    /// # Arguments
    /// * `by` - First group by these columns and then upsample for every group
    /// * `time_column` - Will be used to determine a date_range.
    ///   Note that this column has to be sorted for the output to make sense.
    /// * `every` - interval will start 'every' duration
    ///
    /// The `every` argument is created with
    /// the following string language:
    /// - 1ns   (1 nanosecond)
    /// - 1us   (1 microsecond)
    /// - 1ms   (1 millisecond)
    /// - 1s    (1 second)
    /// - 1m    (1 minute)
    /// - 1h    (1 hour)
    /// - 1d    (1 calendar day)
    /// - 1w    (1 calendar week)
    /// - 1mo   (1 calendar month)
    /// - 1q    (1 calendar quarter)
    /// - 1y    (1 calendar year)
    /// - 1i    (1 index count)
    ///
    /// Or combine them:
    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
    ///
    /// By "calendar day", we mean the corresponding time on the next
    /// day (which may not be 24 hours, depending on daylight saving time).
    /// Similarly for "calendar week", "calendar month", "calendar quarter",
    /// and "calendar year".
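    ///
    /// # Example
    ///
    /// A minimal, hedged sketch (marked `ignore`, so it is not compiled as a
    /// doctest); `df` and `per_sensor` are illustrative names, and the frame is
    /// assumed to have a "sensor" key column plus a sorted `Datetime` column
    /// named "time":
    ///
    /// ```ignore
    /// use polars_core::prelude::*;
    ///
    /// fn per_sensor(df: &DataFrame) -> PolarsResult<DataFrame> {
    ///     // Upsample each "sensor" group to an hourly grid while keeping the
    ///     // groups in the order in which they appear in `df`.
    ///     df.upsample_stable(vec![PlSmallStr::from("sensor")], "time", Duration::parse("1h"))
    /// }
    /// ```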
    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;
}

impl PolarsUpsample for DataFrame {
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame> {
        let by = by.into_vec();
        let time_type = self.column(time_column)?.dtype();
        ensure_duration_matches_dtype(every, time_type, "every")?;
        upsample_impl(self, by, time_column, every, false)
    }

    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame> {
        let by = by.into_vec();
        let time_type = self.column(time_column)?.dtype();
        ensure_duration_matches_dtype(every, time_type, "every")?;
        upsample_impl(self, by, time_column, every, true)
    }
}

fn upsample_impl(
    source: &DataFrame,
    by: Vec<PlSmallStr>,
    index_column: &str,
    every: Duration,
    stable: bool,
) -> PolarsResult<DataFrame> {
    let s = source.column(index_column)?;
    let original_type = s.dtype();

    // Date and integer index columns are handled by temporarily viewing them as
    // Datetime; the result is cast back to the original dtype afterwards.
    let needs_cast = matches!(
        original_type,
        DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
    );

    let mut df = source.clone();

    if needs_cast {
        df.try_apply(index_column, |s| match s.dtype() {
            #[cfg(feature = "dtype-date")]
            DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
            DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
                .cast(&DataType::Int64)?
                .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
            DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
            _ => Ok(s.clone()),
        })?;
    }

    let mut out = upsample_core(&df, by, index_column, every, stable)?;

    if needs_cast {
        out.try_apply(index_column, |s| s.cast(original_type))?;
    }

    Ok(out)
}

fn upsample_core(
    source: &DataFrame,
    by: Vec<PlSmallStr>,
    index_column: &str,
    every: Duration,
    stable: bool,
) -> PolarsResult<DataFrame> {
    if by.is_empty() {
        let index_column = source.column(index_column)?;
        return upsample_single_impl(source, index_column.as_materialized_series(), every);
    }

    let source_schema = source.schema();

    let group_keys_df = source.select(by)?;
    let group_keys_schema = group_keys_df.schema();

    let groups = if stable {
        group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
    } else {
        group_keys_df.group_by(group_keys_schema.iter_names_cloned())
    }?
    .into_groups();

    let non_group_keys_df = unsafe {
        source.select_unchecked(
            source_schema
                .iter_names()
                .filter(|name| !group_keys_schema.contains(name.as_str())),
        )?
    };

    let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);

    // Don't parallelize this; it may stack-overflow on large data.
    let dfs: Vec<DataFrame> = groups
        .iter()
        .map(|g| {
            let first_idx = g.first();

            // Take this group's rows of the non-key columns, then upsample them.
            let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };

            if let Some(i) = upsample_index_col_idx {
                non_group_keys_df = upsample_single_impl(
                    &non_group_keys_df,
                    non_group_keys_df.columns()[i].as_materialized_series(),
                    every,
                )?
            }

            let mut out = non_group_keys_df;

            // Broadcast this group's key values to the upsampled height.
            let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());

            let out_cols = unsafe { out.columns_mut() };

            out_cols.reserve(group_keys_df.width());
            out_cols.extend(group_keys_df.into_columns());

            Ok(out)
        })
        .collect::<PolarsResult<_>>()?;

    // Stack the per-group frames and restore the original column order and schema.
    Ok(unsafe {
        accumulate_dataframes_vertical_unchecked(dfs)
            .select_unchecked(source_schema.iter_names())?
            .with_schema(source_schema.clone())
    })
}

fn upsample_single_impl(
    source: &DataFrame,
    index_column: &Series,
    every: Duration,
) -> PolarsResult<DataFrame> {
    index_column.ensure_sorted_arg("upsample")?;
    let index_col_name = index_column.name();

    use DataType::*;
    match index_column.dtype() {
        #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
        Datetime(tu, tz) => {
            let s = index_column.cast(&Int64).unwrap();
            let ca = s.i64().unwrap();
            // Bound the range by the first and last non-null timestamps.
            let first = ca.iter().flatten().next();
            let last = ca.iter().flatten().next_back();
            match (first, last) {
                (Some(first), Some(last)) => {
                    let tz = match tz {
                        #[cfg(feature = "timezones")]
                        Some(tz) => Some(parse_time_zone(tz)?),
                        _ => None,
                    };
                    // Build the full datetime range, then left-join the source onto it
                    // so timestamps without observations become null-filled rows.
                    let range = datetime_range_impl(
                        index_col_name.clone(),
                        first,
                        last,
                        every,
                        ClosedWindow::Both,
                        *tu,
                        tz.as_ref(),
                    )?
                    .into_series()
                    .into_frame();
                    range.join(
                        source,
                        [index_col_name.clone()],
                        [index_col_name.clone()],
                        JoinArgs::new(JoinType::Left),
                        None,
                    )
                },
                _ => polars_bail!(
                    ComputeError: "cannot determine upsample boundaries: all elements are null"
                ),
            }
        },
        dt => polars_bail!(
            ComputeError: "upsample not allowed for index column of dtype {}", dt,
        ),
    }
}