polars_time/upsample.rs
1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_ops::prelude::*;
5use polars_ops::series::SeriesMethods;
6
7use crate::prelude::*;
8
9pub trait PolarsUpsample {
10 /// Upsample a [`DataFrame`] at a regular frequency.
11 ///
12 /// # Arguments
13 /// * `by` - First group by these columns and then upsample for every group
14 /// * `time_column` - Will be used to determine a date_range.
15 /// Note that this column has to be sorted for the output to make sense.
16 /// * `every` - interval will start 'every' duration
17 /// * `offset` - change the start of the date_range by this offset.
18 ///
19 /// The `every` and `offset` arguments are created with
20 /// the following string language:
21 /// - 1ns (1 nanosecond)
22 /// - 1us (1 microsecond)
23 /// - 1ms (1 millisecond)
24 /// - 1s (1 second)
25 /// - 1m (1 minute)
26 /// - 1h (1 hour)
27 /// - 1d (1 calendar day)
28 /// - 1w (1 calendar week)
29 /// - 1mo (1 calendar month)
30 /// - 1q (1 calendar quarter)
31 /// - 1y (1 calendar year)
32 /// - 1i (1 index count)
33 ///
34 /// Or combine them:
35 /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
36 ///
37 /// By "calendar day", we mean the corresponding time on the next
38 /// day (which may not be 24 hours, depending on daylight savings).
39 /// Similarly for "calendar week", "calendar month", "calendar quarter",
40 /// and "calendar year".
41 fn upsample<I: IntoVec<PlSmallStr>>(
42 &self,
43 by: I,
44 time_column: &str,
45 every: Duration,
46 ) -> PolarsResult<DataFrame>;
47
48 /// Upsample a [`DataFrame`] at a regular frequency.
49 ///
50 /// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
51 /// DataFrame is maintained when `by` is specified.
52 ///
53 /// # Arguments
54 /// * `by` - First group by these columns and then upsample for every group
55 /// * `time_column` - Will be used to determine a date_range.
56 /// Note that this column has to be sorted for the output to make sense.
57 /// * `every` - interval will start 'every' duration
58 /// * `offset` - change the start of the date_range by this offset.
59 ///
60 /// The `every` and `offset` arguments are created with
61 /// the following string language:
62 /// - 1ns (1 nanosecond)
63 /// - 1us (1 microsecond)
64 /// - 1ms (1 millisecond)
65 /// - 1s (1 second)
66 /// - 1m (1 minute)
67 /// - 1h (1 hour)
68 /// - 1d (1 calendar day)
69 /// - 1w (1 calendar week)
70 /// - 1mo (1 calendar month)
71 /// - 1q (1 calendar quarter)
72 /// - 1y (1 calendar year)
73 /// - 1i (1 index count)
74 ///
75 /// Or combine them:
76 /// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
77 ///
78 /// By "calendar day", we mean the corresponding time on the next
79 /// day (which may not be 24 hours, depending on daylight savings).
80 /// Similarly for "calendar week", "calendar month", "calendar quarter",
81 /// and "calendar year".
82 fn upsample_stable<I: IntoVec<PlSmallStr>>(
83 &self,
84 by: I,
85 time_column: &str,
86 every: Duration,
87 ) -> PolarsResult<DataFrame>;
88}
89
90impl PolarsUpsample for DataFrame {
91 fn upsample<I: IntoVec<PlSmallStr>>(
92 &self,
93 by: I,
94 time_column: &str,
95 every: Duration,
96 ) -> PolarsResult<DataFrame> {
97 let by = by.into_vec();
98 let time_type = self.column(time_column)?.dtype();
99 ensure_duration_matches_dtype(every, time_type, "every")?;
100 upsample_impl(self, by, time_column, every, false)
101 }
102
103 fn upsample_stable<I: IntoVec<PlSmallStr>>(
104 &self,
105 by: I,
106 time_column: &str,
107 every: Duration,
108 ) -> PolarsResult<DataFrame> {
109 let by = by.into_vec();
110 let time_type = self.column(time_column)?.dtype();
111 ensure_duration_matches_dtype(every, time_type, "every")?;
112 upsample_impl(self, by, time_column, every, true)
113 }
114}
115
116fn upsample_impl(
117 source: &DataFrame,
118 by: Vec<PlSmallStr>,
119 index_column: &str,
120 every: Duration,
121 stable: bool,
122) -> PolarsResult<DataFrame> {
123 let s = source.column(index_column)?;
124 let time_type = s.dtype();
125 match time_type {
126 #[cfg(feature = "dtype-date")]
127 DataType::Date => {
128 let mut df = source.clone();
129 df.apply(index_column, |s| {
130 s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
131 .unwrap()
132 })
133 .unwrap();
134 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
135 out.apply(index_column, |s| s.cast(time_type).unwrap())
136 .unwrap();
137 Ok(out)
138 },
139 DataType::UInt32 | DataType::UInt64 | DataType::Int32 => {
140 let mut df = source.clone();
141
142 df.apply(index_column, |s| {
143 s.cast(&DataType::Int64)
144 .unwrap()
145 .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
146 .unwrap()
147 })
148 .unwrap();
149 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
150 out.apply(index_column, |s| s.cast(time_type).unwrap())
151 .unwrap();
152 Ok(out)
153 },
154 DataType::Int64 => {
155 let mut df = source.clone();
156 df.apply(index_column, |s| {
157 s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
158 .unwrap()
159 })
160 .unwrap();
161 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
162 out.apply(index_column, |s| s.cast(time_type).unwrap())
163 .unwrap();
164 Ok(out)
165 },
166 _ => {
167 if by.is_empty() {
168 let index_column = source.column(index_column)?;
169 upsample_single_impl(source, index_column.as_materialized_series(), every)
170 } else {
171 let gb = if stable {
172 source.group_by_stable(by)
173 } else {
174 source.group_by(by)
175 };
176 // don't parallelize this, this may SO on large data.
177 gb?.apply(|df| {
178 let index_column = df.column(index_column)?;
179 upsample_single_impl(&df, index_column.as_materialized_series(), every)
180 })
181 }
182 },
183 }
184}
185
186fn upsample_single_impl(
187 source: &DataFrame,
188 index_column: &Series,
189 every: Duration,
190) -> PolarsResult<DataFrame> {
191 index_column.ensure_sorted_arg("upsample")?;
192 let index_col_name = index_column.name();
193
194 use DataType::*;
195 match index_column.dtype() {
196 #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
197 Datetime(tu, tz) => {
198 let s = index_column.cast(&Int64).unwrap();
199 let ca = s.i64().unwrap();
200 let first = ca.iter().flatten().next();
201 let last = ca.iter().flatten().next_back();
202 match (first, last) {
203 (Some(first), Some(last)) => {
204 let tz = match tz {
205 #[cfg(feature = "timezones")]
206 Some(tz) => Some(parse_time_zone(tz)?),
207 _ => None,
208 };
209 let range = datetime_range_impl(
210 index_col_name.clone(),
211 first,
212 last,
213 every,
214 ClosedWindow::Both,
215 *tu,
216 tz.as_ref(),
217 )?
218 .into_series()
219 .into_frame();
220 range.join(
221 source,
222 [index_col_name.clone()],
223 [index_col_name.clone()],
224 JoinArgs::new(JoinType::Left),
225 None,
226 )
227 },
228 _ => polars_bail!(
229 ComputeError: "cannot determine upsample boundaries: all elements are null"
230 ),
231 }
232 },
233 dt => polars_bail!(
234 ComputeError: "upsample not allowed for index column of dtype {}", dt,
235 ),
236 }
237}