polars_time/
upsample.rs

1#[cfg(feature = "timezones")]
2use polars_core::chunked_array::temporal::parse_time_zone;
3use polars_core::prelude::*;
4use polars_ops::prelude::*;
5use polars_ops::series::SeriesMethods;
6
7use crate::prelude::*;
8
9pub trait PolarsUpsample {
10    /// Upsample a [`DataFrame`] at a regular frequency.
11    ///
12    /// # Arguments
13    /// * `by` - First group by these columns and then upsample for every group
14    /// * `time_column` - Will be used to determine a date_range.
15    ///   Note that this column has to be sorted for the output to make sense.
16    /// * `every` - interval will start 'every' duration
17    /// * `offset` - change the start of the date_range by this offset.
18    ///
19    /// The `every` and `offset` arguments are created with
20    /// the following string language:
21    /// - 1ns   (1 nanosecond)
22    /// - 1us   (1 microsecond)
23    /// - 1ms   (1 millisecond)
24    /// - 1s    (1 second)
25    /// - 1m    (1 minute)
26    /// - 1h    (1 hour)
27    /// - 1d    (1 calendar day)
28    /// - 1w    (1 calendar week)
29    /// - 1mo   (1 calendar month)
30    /// - 1q    (1 calendar quarter)
31    /// - 1y    (1 calendar year)
32    /// - 1i    (1 index count)
33    ///
34    /// Or combine them:
35    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
36    ///
37    /// By "calendar day", we mean the corresponding time on the next
38    /// day (which may not be 24 hours, depending on daylight savings).
39    /// Similarly for "calendar week", "calendar month", "calendar quarter",
40    /// and "calendar year".
41    fn upsample<I: IntoVec<PlSmallStr>>(
42        &self,
43        by: I,
44        time_column: &str,
45        every: Duration,
46    ) -> PolarsResult<DataFrame>;
47
48    /// Upsample a [`DataFrame`] at a regular frequency.
49    ///
50    /// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
51    /// DataFrame is maintained when `by` is specified.
52    ///
53    /// # Arguments
54    /// * `by` - First group by these columns and then upsample for every group
55    /// * `time_column` - Will be used to determine a date_range.
56    ///   Note that this column has to be sorted for the output to make sense.
57    /// * `every` - interval will start 'every' duration
58    /// * `offset` - change the start of the date_range by this offset.
59    ///
60    /// The `every` and `offset` arguments are created with
61    /// the following string language:
62    /// - 1ns   (1 nanosecond)
63    /// - 1us   (1 microsecond)
64    /// - 1ms   (1 millisecond)
65    /// - 1s    (1 second)
66    /// - 1m    (1 minute)
67    /// - 1h    (1 hour)
68    /// - 1d    (1 calendar day)
69    /// - 1w    (1 calendar week)
70    /// - 1mo   (1 calendar month)
71    /// - 1q    (1 calendar quarter)
72    /// - 1y    (1 calendar year)
73    /// - 1i    (1 index count)
74    ///
75    /// Or combine them:
76    /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
77    ///
78    /// By "calendar day", we mean the corresponding time on the next
79    /// day (which may not be 24 hours, depending on daylight savings).
80    /// Similarly for "calendar week", "calendar month", "calendar quarter",
81    /// and "calendar year".
82    fn upsample_stable<I: IntoVec<PlSmallStr>>(
83        &self,
84        by: I,
85        time_column: &str,
86        every: Duration,
87    ) -> PolarsResult<DataFrame>;
88}
89
90impl PolarsUpsample for DataFrame {
91    fn upsample<I: IntoVec<PlSmallStr>>(
92        &self,
93        by: I,
94        time_column: &str,
95        every: Duration,
96    ) -> PolarsResult<DataFrame> {
97        let by = by.into_vec();
98        let time_type = self.column(time_column)?.dtype();
99        ensure_duration_matches_dtype(every, time_type, "every")?;
100        upsample_impl(self, by, time_column, every, false)
101    }
102
103    fn upsample_stable<I: IntoVec<PlSmallStr>>(
104        &self,
105        by: I,
106        time_column: &str,
107        every: Duration,
108    ) -> PolarsResult<DataFrame> {
109        let by = by.into_vec();
110        let time_type = self.column(time_column)?.dtype();
111        ensure_duration_matches_dtype(every, time_type, "every")?;
112        upsample_impl(self, by, time_column, every, true)
113    }
114}
115
116fn upsample_impl(
117    source: &DataFrame,
118    by: Vec<PlSmallStr>,
119    index_column: &str,
120    every: Duration,
121    stable: bool,
122) -> PolarsResult<DataFrame> {
123    let s = source.column(index_column)?;
124    let time_type = s.dtype();
125    if matches!(time_type, DataType::Date) {
126        let mut df = source.clone();
127        df.apply(index_column, |s| {
128            s.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
129                .unwrap()
130        })
131        .unwrap();
132        let mut out = upsample_impl(&df, by, index_column, every, stable)?;
133        out.apply(index_column, |s| s.cast(time_type).unwrap())
134            .unwrap();
135        Ok(out)
136    } else if matches!(
137        time_type,
138        DataType::UInt32 | DataType::UInt64 | DataType::Int32
139    ) {
140        let mut df = source.clone();
141
142        df.apply(index_column, |s| {
143            s.cast(&DataType::Int64)
144                .unwrap()
145                .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
146                .unwrap()
147        })
148        .unwrap();
149        let mut out = upsample_impl(&df, by, index_column, every, stable)?;
150        out.apply(index_column, |s| s.cast(time_type).unwrap())
151            .unwrap();
152        Ok(out)
153    } else if matches!(time_type, DataType::Int64) {
154        let mut df = source.clone();
155        df.apply(index_column, |s| {
156            s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
157                .unwrap()
158        })
159        .unwrap();
160        let mut out = upsample_impl(&df, by, index_column, every, stable)?;
161        out.apply(index_column, |s| s.cast(time_type).unwrap())
162            .unwrap();
163        Ok(out)
164    } else if by.is_empty() {
165        let index_column = source.column(index_column)?;
166        upsample_single_impl(source, index_column.as_materialized_series(), every)
167    } else {
168        let gb = if stable {
169            source.group_by_stable(by)
170        } else {
171            source.group_by(by)
172        };
173        // don't parallelize this, this may SO on large data.
174        gb?.apply(|df| {
175            let index_column = df.column(index_column)?;
176            upsample_single_impl(&df, index_column.as_materialized_series(), every)
177        })
178    }
179}
180
181fn upsample_single_impl(
182    source: &DataFrame,
183    index_column: &Series,
184    every: Duration,
185) -> PolarsResult<DataFrame> {
186    index_column.ensure_sorted_arg("upsample")?;
187    let index_col_name = index_column.name();
188
189    use DataType::*;
190    match index_column.dtype() {
191        Datetime(tu, tz) => {
192            let s = index_column.cast(&Int64).unwrap();
193            let ca = s.i64().unwrap();
194            let first = ca.iter().flatten().next();
195            let last = ca.iter().flatten().next_back();
196            match (first, last) {
197                (Some(first), Some(last)) => {
198                    let tz = match tz {
199                        #[cfg(feature = "timezones")]
200                        Some(tz) => Some(parse_time_zone(tz)?),
201                        _ => None,
202                    };
203                    let range = datetime_range_impl(
204                        index_col_name.clone(),
205                        first,
206                        last,
207                        every,
208                        ClosedWindow::Both,
209                        *tu,
210                        tz.as_ref(),
211                    )?
212                    .into_series()
213                    .into_frame();
214                    range.join(
215                        source,
216                        [index_col_name.clone()],
217                        [index_col_name.clone()],
218                        JoinArgs::new(JoinType::Left),
219                        None,
220                    )
221                },
222                _ => polars_bail!(
223                    ComputeError: "cannot determine upsample boundaries: all elements are null"
224                ),
225            }
226        },
227        dt => polars_bail!(
228            ComputeError: "upsample not allowed for index column of dtype {}", dt,
229        ),
230    }
231}