polars_time/
upsample.rs

1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5use polars_ops::prelude::*;
6use polars_ops::series::SeriesMethods;
7
8use crate::prelude::*;
9
10pub trait PolarsUpsample {
11    /// Upsample a [`DataFrame`] at a regular frequency.
12    ///
13    /// # Arguments
14    /// * `by` - First group by these columns and then upsample for every group
15    /// * `time_column` - Will be used to determine a date_range.
16    ///   Note that this column has to be sorted for the output to make sense.
17    /// * `every` - interval will start 'every' duration
18    /// * `offset` - change the start of the date_range by this offset.
19    ///
20    /// The `every` and `offset` arguments are created with
21    /// the following string language:
22    /// - 1ns   (1 nanosecond)
23    /// - 1us   (1 microsecond)
24    /// - 1ms   (1 millisecond)
25    /// - 1s    (1 second)
26    /// - 1m    (1 minute)
27    /// - 1h    (1 hour)
28    /// - 1d    (1 calendar day)
29    /// - 1w    (1 calendar week)
30    /// - 1mo   (1 calendar month)
31    /// - 1q    (1 calendar quarter)
32    /// - 1y    (1 calendar year)
33    /// - 1i    (1 index count)
34    ///
35    /// Or combine them:
36    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
37    ///
38    /// By "calendar day", we mean the corresponding time on the next
39    /// day (which may not be 24 hours, depending on daylight savings).
40    /// Similarly for "calendar week", "calendar month", "calendar quarter",
41    /// and "calendar year".
42    fn upsample<I: IntoVec<PlSmallStr>>(
43        &self,
44        by: I,
45        time_column: &str,
46        every: Duration,
47    ) -> PolarsResult<DataFrame>;
48
49    /// Upsample a [`DataFrame`] at a regular frequency.
50    ///
51    /// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
52    /// DataFrame is maintained when `by` is specified.
53    ///
54    /// # Arguments
55    /// * `by` - First group by these columns and then upsample for every group
56    /// * `time_column` - Will be used to determine a date_range.
57    ///   Note that this column has to be sorted for the output to make sense.
58    /// * `every` - interval will start 'every' duration
59    /// * `offset` - change the start of the date_range by this offset.
60    ///
61    /// The `every` and `offset` arguments are created with
62    /// the following string language:
63    /// - 1ns   (1 nanosecond)
64    /// - 1us   (1 microsecond)
65    /// - 1ms   (1 millisecond)
66    /// - 1s    (1 second)
67    /// - 1m    (1 minute)
68    /// - 1h    (1 hour)
69    /// - 1d    (1 calendar day)
70    /// - 1w    (1 calendar week)
71    /// - 1mo   (1 calendar month)
72    /// - 1q    (1 calendar quarter)
73    /// - 1y    (1 calendar year)
74    /// - 1i    (1 index count)
75    ///
76    /// Or combine them:
77    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
78    ///
79    /// By "calendar day", we mean the corresponding time on the next
80    /// day (which may not be 24 hours, depending on daylight savings).
81    /// Similarly for "calendar week", "calendar month", "calendar quarter",
82    /// and "calendar year".
83    fn upsample_stable<I: IntoVec<PlSmallStr>>(
84        &self,
85        by: I,
86        time_column: &str,
87        every: Duration,
88    ) -> PolarsResult<DataFrame>;
89}
90
91impl PolarsUpsample for DataFrame {
92    fn upsample<I: IntoVec<PlSmallStr>>(
93        &self,
94        by: I,
95        time_column: &str,
96        every: Duration,
97    ) -> PolarsResult<DataFrame> {
98        let by = by.into_vec();
99        let time_type = self.column(time_column)?.dtype();
100        ensure_duration_matches_dtype(every, time_type, "every")?;
101        upsample_impl(self, by, time_column, every, false)
102    }
103
104    fn upsample_stable<I: IntoVec<PlSmallStr>>(
105        &self,
106        by: I,
107        time_column: &str,
108        every: Duration,
109    ) -> PolarsResult<DataFrame> {
110        let by = by.into_vec();
111        let time_type = self.column(time_column)?.dtype();
112        ensure_duration_matches_dtype(every, time_type, "every")?;
113        upsample_impl(self, by, time_column, every, true)
114    }
115}
116
117fn upsample_impl(
118    source: &DataFrame,
119    by: Vec<PlSmallStr>,
120    index_column: &str,
121    every: Duration,
122    stable: bool,
123) -> PolarsResult<DataFrame> {
124    let s = source.column(index_column)?;
125    let time_type = s.dtype();
126    match time_type {
127        #[cfg(feature = "dtype-date")]
128        DataType::Date => {
129            let mut df = source.clone();
130            df.apply(index_column, |s| {
131                s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
132                    .unwrap()
133            })
134            .unwrap();
135            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
136            out.apply(index_column, |s| s.cast(time_type).unwrap())
137                .unwrap();
138            Ok(out)
139        },
140        DataType::UInt32 | DataType::UInt64 | DataType::Int32 => {
141            let mut df = source.clone();
142
143            df.apply(index_column, |s| {
144                s.cast(&DataType::Int64)
145                    .unwrap()
146                    .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
147                    .unwrap()
148            })
149            .unwrap();
150            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
151            out.apply(index_column, |s| s.cast(time_type).unwrap())
152                .unwrap();
153            Ok(out)
154        },
155        DataType::Int64 => {
156            let mut df = source.clone();
157            df.apply(index_column, |s| {
158                s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
159                    .unwrap()
160            })
161            .unwrap();
162            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
163            out.apply(index_column, |s| s.cast(time_type).unwrap())
164                .unwrap();
165            Ok(out)
166        },
167        _ => {
168            if by.is_empty() {
169                let index_column = source.column(index_column)?;
170                upsample_single_impl(source, index_column.as_materialized_series(), every)
171            } else {
172                let source_schema = source.schema();
173
174                let group_keys_df = source.select(by)?;
175                let group_keys_schema = group_keys_df.schema();
176
177                let groups = if stable {
178                    group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
179                } else {
180                    group_keys_df.group_by(group_keys_schema.iter_names_cloned())
181                }?
182                .into_groups();
183
184                let non_group_keys_df = unsafe {
185                    source.project_names(
186                        source_schema
187                            .iter_names()
188                            .filter(|name| !group_keys_schema.contains(name.as_str())),
189                    )?
190                };
191
192                let upsample_index_col_idx: Option<usize> =
193                    non_group_keys_df.schema().index_of(index_column);
194
195                // don't parallelize this, this may SO on large data.
196                let dfs: Vec<DataFrame> = groups
197                    .iter()
198                    .map(|g| {
199                        let first_idx = g.first();
200
201                        let mut non_group_keys_df =
202                            unsafe { non_group_keys_df.gather_group_unchecked(&g) };
203
204                        if let Some(i) = upsample_index_col_idx {
205                            non_group_keys_df = upsample_single_impl(
206                                &non_group_keys_df,
207                                non_group_keys_df.get_columns()[i].as_materialized_series(),
208                                every,
209                            )?
210                        }
211
212                        let mut out = non_group_keys_df;
213
214                        let group_keys_df =
215                            group_keys_df.new_from_index(first_idx as usize, out.height());
216
217                        out.clear_schema();
218                        let out_cols = unsafe { out.get_columns_mut() };
219
220                        out_cols.reserve(group_keys_df.width());
221                        out_cols.extend(group_keys_df.into_columns());
222
223                        Ok(out)
224                    })
225                    .collect::<PolarsResult<_>>()?;
226
227                unsafe {
228                    accumulate_dataframes_vertical_unchecked(dfs).project(source_schema.clone())
229                }
230            }
231        },
232    }
233}
234
235fn upsample_single_impl(
236    source: &DataFrame,
237    index_column: &Series,
238    every: Duration,
239) -> PolarsResult<DataFrame> {
240    index_column.ensure_sorted_arg("upsample")?;
241    let index_col_name = index_column.name();
242
243    use DataType::*;
244    match index_column.dtype() {
245        #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
246        Datetime(tu, tz) => {
247            let s = index_column.cast(&Int64).unwrap();
248            let ca = s.i64().unwrap();
249            let first = ca.iter().flatten().next();
250            let last = ca.iter().flatten().next_back();
251            match (first, last) {
252                (Some(first), Some(last)) => {
253                    let tz = match tz {
254                        #[cfg(feature = "timezones")]
255                        Some(tz) => Some(parse_time_zone(tz)?),
256                        _ => None,
257                    };
258                    let range = datetime_range_impl(
259                        index_col_name.clone(),
260                        first,
261                        last,
262                        every,
263                        ClosedWindow::Both,
264                        *tu,
265                        tz.as_ref(),
266                    )?
267                    .into_series()
268                    .into_frame();
269                    range.join(
270                        source,
271                        [index_col_name.clone()],
272                        [index_col_name.clone()],
273                        JoinArgs::new(JoinType::Left),
274                        None,
275                    )
276                },
277                _ => polars_bail!(
278                    ComputeError: "cannot determine upsample boundaries: all elements are null"
279                ),
280            }
281        },
282        dt => polars_bail!(
283            ComputeError: "upsample not allowed for index column of dtype {}", dt,
284        ),
285    }
286}