Skip to main content

polars_time/
upsample.rs

1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5use polars_ops::prelude::*;
6use polars_ops::series::SeriesMethods;
7
8use crate::prelude::*;
9
pub trait PolarsUpsample {
    /// Upsample a [`DataFrame`] at a regular frequency.
    ///
    /// See [`upsample_stable`][PolarsUpsample::upsample_stable] for the
    /// variant that maintains the order of the DataFrame when `by` is
    /// specified.
    ///
    /// # Arguments
    /// * `by` - First group by these columns and then upsample for every group
    /// * `time_column` - Will be used to determine a date_range.
    ///   Note that this column has to be sorted for the output to make sense.
    /// * `every` - interval will start 'every' duration
    ///
    /// The `every` argument is created with the following string language:
    /// - 1ns   (1 nanosecond)
    /// - 1us   (1 microsecond)
    /// - 1ms   (1 millisecond)
    /// - 1s    (1 second)
    /// - 1m    (1 minute)
    /// - 1h    (1 hour)
    /// - 1d    (1 calendar day)
    /// - 1w    (1 calendar week)
    /// - 1mo   (1 calendar month)
    /// - 1q    (1 calendar quarter)
    /// - 1y    (1 calendar year)
    /// - 1i    (1 index count)
    ///
    /// Or combine them:
    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
    ///
    /// By "calendar day", we mean the corresponding time on the next
    /// day (which may not be 24 hours, depending on daylight savings).
    /// Similarly for "calendar week", "calendar month", "calendar quarter",
    /// and "calendar year".
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;

    /// Upsample a [`DataFrame`] at a regular frequency.
    ///
    /// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
    /// DataFrame is maintained when `by` is specified.
    ///
    /// # Arguments
    /// * `by` - First group by these columns and then upsample for every group
    /// * `time_column` - Will be used to determine a date_range.
    ///   Note that this column has to be sorted for the output to make sense.
    /// * `every` - interval will start 'every' duration
    ///
    /// The `every` argument is created with the following string language:
    /// - 1ns   (1 nanosecond)
    /// - 1us   (1 microsecond)
    /// - 1ms   (1 millisecond)
    /// - 1s    (1 second)
    /// - 1m    (1 minute)
    /// - 1h    (1 hour)
    /// - 1d    (1 calendar day)
    /// - 1w    (1 calendar week)
    /// - 1mo   (1 calendar month)
    /// - 1q    (1 calendar quarter)
    /// - 1y    (1 calendar year)
    /// - 1i    (1 index count)
    ///
    /// Or combine them:
    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
    ///
    /// By "calendar day", we mean the corresponding time on the next
    /// day (which may not be 24 hours, depending on daylight savings).
    /// Similarly for "calendar week", "calendar month", "calendar quarter",
    /// and "calendar year".
    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;
}
90
91impl PolarsUpsample for DataFrame {
92    fn upsample<I: IntoVec<PlSmallStr>>(
93        &self,
94        by: I,
95        time_column: &str,
96        every: Duration,
97    ) -> PolarsResult<DataFrame> {
98        let by = by.into_vec();
99        let time_type = self.column(time_column)?.dtype();
100        ensure_duration_matches_dtype(every, time_type, "every")?;
101        upsample_impl(self, by, time_column, every, false)
102    }
103
104    fn upsample_stable<I: IntoVec<PlSmallStr>>(
105        &self,
106        by: I,
107        time_column: &str,
108        every: Duration,
109    ) -> PolarsResult<DataFrame> {
110        let by = by.into_vec();
111        let time_type = self.column(time_column)?.dtype();
112        ensure_duration_matches_dtype(every, time_type, "every")?;
113        upsample_impl(self, by, time_column, every, true)
114    }
115}
116
117fn upsample_impl(
118    source: &DataFrame,
119    by: Vec<PlSmallStr>,
120    index_column: &str,
121    every: Duration,
122    stable: bool,
123) -> PolarsResult<DataFrame> {
124    let s = source.column(index_column)?;
125    let original_type = s.dtype();
126
127    let needs_cast = matches!(
128        original_type,
129        DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
130    );
131
132    let mut df = source.clone();
133
134    if needs_cast {
135        df.try_apply(index_column, |s| match s.dtype() {
136            #[cfg(feature = "dtype-date")]
137            DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
138            DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
139                .cast(&DataType::Int64)?
140                .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
141            DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
142            _ => Ok(s.clone()),
143        })?;
144    }
145
146    let mut out = upsample_core(&df, by, index_column, every, stable)?;
147
148    if needs_cast {
149        out.try_apply(index_column, |s| s.cast(original_type))?;
150    }
151
152    Ok(out)
153}
154
155fn upsample_core(
156    source: &DataFrame,
157    by: Vec<PlSmallStr>,
158    index_column: &str,
159    every: Duration,
160    stable: bool,
161) -> PolarsResult<DataFrame> {
162    if by.is_empty() {
163        let index_column = source.column(index_column)?;
164        return upsample_single_impl(source, index_column.as_materialized_series(), every);
165    }
166
167    if source.height() == 0 {
168        polars_bail!(
169            ComputeError: "cannot determine upsample boundaries: all elements are null"
170        );
171    }
172
173    let source_schema = source.schema();
174
175    let group_keys_df = source.select(by)?;
176    let group_keys_schema = group_keys_df.schema();
177
178    let groups = if stable {
179        group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
180    } else {
181        group_keys_df.group_by(group_keys_schema.iter_names_cloned())
182    }?
183    .into_groups();
184
185    let non_group_keys_df = unsafe {
186        source.select_unchecked(
187            source_schema
188                .iter_names()
189                .filter(|name| !group_keys_schema.contains(name.as_str())),
190        )?
191    };
192
193    let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);
194
195    // don't parallelize this, this may SO on large data.
196    let dfs: Vec<DataFrame> = groups
197        .iter()
198        .map(|g| {
199            let first_idx = g.first();
200
201            let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };
202
203            if let Some(i) = upsample_index_col_idx {
204                non_group_keys_df = upsample_single_impl(
205                    &non_group_keys_df,
206                    non_group_keys_df.columns()[i].as_materialized_series(),
207                    every,
208                )?
209            }
210
211            let mut out = non_group_keys_df;
212
213            let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());
214
215            let out_cols = unsafe { out.columns_mut() };
216
217            out_cols.reserve(group_keys_df.width());
218            out_cols.extend(group_keys_df.into_columns());
219
220            Ok(out)
221        })
222        .collect::<PolarsResult<_>>()?;
223
224    Ok(unsafe {
225        accumulate_dataframes_vertical_unchecked(dfs)
226            .select_unchecked(source_schema.iter_names())?
227            .with_schema(source_schema.clone())
228    })
229}
230
231fn upsample_single_impl(
232    source: &DataFrame,
233    index_column: &Series,
234    every: Duration,
235) -> PolarsResult<DataFrame> {
236    index_column.ensure_sorted_arg("upsample")?;
237    let index_col_name = index_column.name();
238
239    use DataType::*;
240    match index_column.dtype() {
241        #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
242        Datetime(tu, tz) => {
243            let s = index_column.cast(&Int64).unwrap();
244            let ca = s.i64().unwrap();
245            let first = ca.iter().flatten().next();
246            let last = ca.iter().flatten().next_back();
247            match (first, last) {
248                (Some(first), Some(last)) => {
249                    let tz = match tz {
250                        #[cfg(feature = "timezones")]
251                        Some(tz) => Some(parse_time_zone(tz)?),
252                        _ => None,
253                    };
254                    let range = datetime_range_impl(
255                        index_col_name.clone(),
256                        first,
257                        last,
258                        every,
259                        ClosedWindow::Both,
260                        *tu,
261                        tz.as_ref(),
262                    )?
263                    .into_series()
264                    .into_frame();
265                    range.join(
266                        source,
267                        [index_col_name.clone()],
268                        [index_col_name.clone()],
269                        JoinArgs::new(JoinType::Left),
270                        None,
271                    )
272                },
273                _ => polars_bail!(
274                    ComputeError: "cannot determine upsample boundaries: all elements are null"
275                ),
276            }
277        },
278        dt => polars_bail!(
279            ComputeError: "upsample not allowed for index column of dtype {}", dt,
280        ),
281    }
282}