polars_time/
upsample.rs

1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_ops::prelude::*;
5use polars_ops::series::SeriesMethods;
6
7use crate::prelude::*;
8
9pub trait PolarsUpsample {
10    /// Upsample a [`DataFrame`] at a regular frequency.
11    ///
12    /// # Arguments
13    /// * `by` - First group by these columns and then upsample for every group
14    /// * `time_column` - Will be used to determine a date_range.
15    ///   Note that this column has to be sorted for the output to make sense.
16    /// * `every` - interval will start 'every' duration
17    /// * `offset` - change the start of the date_range by this offset.
18    ///
19    /// The `every` and `offset` arguments are created with
20    /// the following string language:
21    /// - 1ns   (1 nanosecond)
22    /// - 1us   (1 microsecond)
23    /// - 1ms   (1 millisecond)
24    /// - 1s    (1 second)
25    /// - 1m    (1 minute)
26    /// - 1h    (1 hour)
27    /// - 1d    (1 calendar day)
28    /// - 1w    (1 calendar week)
29    /// - 1mo   (1 calendar month)
30    /// - 1q    (1 calendar quarter)
31    /// - 1y    (1 calendar year)
32    /// - 1i    (1 index count)
33    ///
34    /// Or combine them:
35    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
36    ///
37    /// By "calendar day", we mean the corresponding time on the next
38    /// day (which may not be 24 hours, depending on daylight savings).
39    /// Similarly for "calendar week", "calendar month", "calendar quarter",
40    /// and "calendar year".
41    fn upsample<I: IntoVec<PlSmallStr>>(
42        &self,
43        by: I,
44        time_column: &str,
45        every: Duration,
46    ) -> PolarsResult<DataFrame>;
47
48    /// Upsample a [`DataFrame`] at a regular frequency.
49    ///
50    /// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
51    /// DataFrame is maintained when `by` is specified.
52    ///
53    /// # Arguments
54    /// * `by` - First group by these columns and then upsample for every group
55    /// * `time_column` - Will be used to determine a date_range.
56    ///   Note that this column has to be sorted for the output to make sense.
57    /// * `every` - interval will start 'every' duration
58    /// * `offset` - change the start of the date_range by this offset.
59    ///
60    /// The `every` and `offset` arguments are created with
61    /// the following string language:
62    /// - 1ns   (1 nanosecond)
63    /// - 1us   (1 microsecond)
64    /// - 1ms   (1 millisecond)
65    /// - 1s    (1 second)
66    /// - 1m    (1 minute)
67    /// - 1h    (1 hour)
68    /// - 1d    (1 calendar day)
69    /// - 1w    (1 calendar week)
70    /// - 1mo   (1 calendar month)
71    /// - 1q    (1 calendar quarter)
72    /// - 1y    (1 calendar year)
73    /// - 1i    (1 index count)
74    ///
75    /// Or combine them:
76    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
77    ///
78    /// By "calendar day", we mean the corresponding time on the next
79    /// day (which may not be 24 hours, depending on daylight savings).
80    /// Similarly for "calendar week", "calendar month", "calendar quarter",
81    /// and "calendar year".
82    fn upsample_stable<I: IntoVec<PlSmallStr>>(
83        &self,
84        by: I,
85        time_column: &str,
86        every: Duration,
87    ) -> PolarsResult<DataFrame>;
88}
89
90impl PolarsUpsample for DataFrame {
91    fn upsample<I: IntoVec<PlSmallStr>>(
92        &self,
93        by: I,
94        time_column: &str,
95        every: Duration,
96    ) -> PolarsResult<DataFrame> {
97        let by = by.into_vec();
98        let time_type = self.column(time_column)?.dtype();
99        ensure_duration_matches_dtype(every, time_type, "every")?;
100        upsample_impl(self, by, time_column, every, false)
101    }
102
103    fn upsample_stable<I: IntoVec<PlSmallStr>>(
104        &self,
105        by: I,
106        time_column: &str,
107        every: Duration,
108    ) -> PolarsResult<DataFrame> {
109        let by = by.into_vec();
110        let time_type = self.column(time_column)?.dtype();
111        ensure_duration_matches_dtype(every, time_type, "every")?;
112        upsample_impl(self, by, time_column, every, true)
113    }
114}
115
116fn upsample_impl(
117    source: &DataFrame,
118    by: Vec<PlSmallStr>,
119    index_column: &str,
120    every: Duration,
121    stable: bool,
122) -> PolarsResult<DataFrame> {
123    let s = source.column(index_column)?;
124    let time_type = s.dtype();
125    match time_type {
126        #[cfg(feature = "dtype-date")]
127        DataType::Date => {
128            let mut df = source.clone();
129            df.apply(index_column, |s| {
130                s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
131                    .unwrap()
132            })
133            .unwrap();
134            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
135            out.apply(index_column, |s| s.cast(time_type).unwrap())
136                .unwrap();
137            Ok(out)
138        },
139        DataType::UInt32 | DataType::UInt64 | DataType::Int32 => {
140            let mut df = source.clone();
141
142            df.apply(index_column, |s| {
143                s.cast(&DataType::Int64)
144                    .unwrap()
145                    .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
146                    .unwrap()
147            })
148            .unwrap();
149            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
150            out.apply(index_column, |s| s.cast(time_type).unwrap())
151                .unwrap();
152            Ok(out)
153        },
154        DataType::Int64 => {
155            let mut df = source.clone();
156            df.apply(index_column, |s| {
157                s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
158                    .unwrap()
159            })
160            .unwrap();
161            let mut out = upsample_impl(&df, by, index_column, every, stable)?;
162            out.apply(index_column, |s| s.cast(time_type).unwrap())
163                .unwrap();
164            Ok(out)
165        },
166        _ => {
167            if by.is_empty() {
168                let index_column = source.column(index_column)?;
169                upsample_single_impl(source, index_column.as_materialized_series(), every)
170            } else {
171                let gb = if stable {
172                    source.group_by_stable(by)
173                } else {
174                    source.group_by(by)
175                };
176                // don't parallelize this, this may SO on large data.
177                gb?.apply(|df| {
178                    let index_column = df.column(index_column)?;
179                    upsample_single_impl(&df, index_column.as_materialized_series(), every)
180                })
181            }
182        },
183    }
184}
185
186fn upsample_single_impl(
187    source: &DataFrame,
188    index_column: &Series,
189    every: Duration,
190) -> PolarsResult<DataFrame> {
191    index_column.ensure_sorted_arg("upsample")?;
192    let index_col_name = index_column.name();
193
194    use DataType::*;
195    match index_column.dtype() {
196        #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
197        Datetime(tu, tz) => {
198            let s = index_column.cast(&Int64).unwrap();
199            let ca = s.i64().unwrap();
200            let first = ca.iter().flatten().next();
201            let last = ca.iter().flatten().next_back();
202            match (first, last) {
203                (Some(first), Some(last)) => {
204                    let tz = match tz {
205                        #[cfg(feature = "timezones")]
206                        Some(tz) => Some(parse_time_zone(tz)?),
207                        _ => None,
208                    };
209                    let range = datetime_range_impl(
210                        index_col_name.clone(),
211                        first,
212                        last,
213                        every,
214                        ClosedWindow::Both,
215                        *tu,
216                        tz.as_ref(),
217                    )?
218                    .into_series()
219                    .into_frame();
220                    range.join(
221                        source,
222                        [index_col_name.clone()],
223                        [index_col_name.clone()],
224                        JoinArgs::new(JoinType::Left),
225                        None,
226                    )
227                },
228                _ => polars_bail!(
229                    ComputeError: "cannot determine upsample boundaries: all elements are null"
230                ),
231            }
232        },
233        dt => polars_bail!(
234            ComputeError: "upsample not allowed for index column of dtype {}", dt,
235        ),
236    }
237}