1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5use polars_ops::prelude::*;
6use polars_ops::series::SeriesMethods;
7
8use crate::prelude::*;
9
10pub trait PolarsUpsample {
11 fn upsample<I: IntoVec<PlSmallStr>>(
43 &self,
44 by: I,
45 time_column: &str,
46 every: Duration,
47 ) -> PolarsResult<DataFrame>;
48
49 fn upsample_stable<I: IntoVec<PlSmallStr>>(
84 &self,
85 by: I,
86 time_column: &str,
87 every: Duration,
88 ) -> PolarsResult<DataFrame>;
89}
90
91impl PolarsUpsample for DataFrame {
92 fn upsample<I: IntoVec<PlSmallStr>>(
93 &self,
94 by: I,
95 time_column: &str,
96 every: Duration,
97 ) -> PolarsResult<DataFrame> {
98 let by = by.into_vec();
99 let time_type = self.column(time_column)?.dtype();
100 ensure_duration_matches_dtype(every, time_type, "every")?;
101 upsample_impl(self, by, time_column, every, false)
102 }
103
104 fn upsample_stable<I: IntoVec<PlSmallStr>>(
105 &self,
106 by: I,
107 time_column: &str,
108 every: Duration,
109 ) -> PolarsResult<DataFrame> {
110 let by = by.into_vec();
111 let time_type = self.column(time_column)?.dtype();
112 ensure_duration_matches_dtype(every, time_type, "every")?;
113 upsample_impl(self, by, time_column, every, true)
114 }
115}
116
117fn upsample_impl(
118 source: &DataFrame,
119 by: Vec<PlSmallStr>,
120 index_column: &str,
121 every: Duration,
122 stable: bool,
123) -> PolarsResult<DataFrame> {
124 let s = source.column(index_column)?;
125 let time_type = s.dtype();
126 match time_type {
127 #[cfg(feature = "dtype-date")]
128 DataType::Date => {
129 let mut df = source.clone();
130 df.apply(index_column, |s| {
131 s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
132 .unwrap()
133 })
134 .unwrap();
135 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
136 out.apply(index_column, |s| s.cast(time_type).unwrap())
137 .unwrap();
138 Ok(out)
139 },
140 DataType::UInt32 | DataType::UInt64 | DataType::Int32 => {
141 let mut df = source.clone();
142
143 df.apply(index_column, |s| {
144 s.cast(&DataType::Int64)
145 .unwrap()
146 .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
147 .unwrap()
148 })
149 .unwrap();
150 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
151 out.apply(index_column, |s| s.cast(time_type).unwrap())
152 .unwrap();
153 Ok(out)
154 },
155 DataType::Int64 => {
156 let mut df = source.clone();
157 df.apply(index_column, |s| {
158 s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
159 .unwrap()
160 })
161 .unwrap();
162 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
163 out.apply(index_column, |s| s.cast(time_type).unwrap())
164 .unwrap();
165 Ok(out)
166 },
167 _ => {
168 if by.is_empty() {
169 let index_column = source.column(index_column)?;
170 upsample_single_impl(source, index_column.as_materialized_series(), every)
171 } else {
172 let source_schema = source.schema();
173
174 let group_keys_df = source.select(by)?;
175 let group_keys_schema = group_keys_df.schema();
176
177 let groups = if stable {
178 group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
179 } else {
180 group_keys_df.group_by(group_keys_schema.iter_names_cloned())
181 }?
182 .into_groups();
183
184 let non_group_keys_df = unsafe {
185 source.project_names(
186 source_schema
187 .iter_names()
188 .filter(|name| !group_keys_schema.contains(name.as_str())),
189 )?
190 };
191
192 let upsample_index_col_idx: Option<usize> =
193 non_group_keys_df.schema().index_of(index_column);
194
195 let dfs: Vec<DataFrame> = groups
197 .iter()
198 .map(|g| {
199 let first_idx = g.first();
200
201 let mut non_group_keys_df =
202 unsafe { non_group_keys_df.gather_group_unchecked(&g) };
203
204 if let Some(i) = upsample_index_col_idx {
205 non_group_keys_df = upsample_single_impl(
206 &non_group_keys_df,
207 non_group_keys_df.get_columns()[i].as_materialized_series(),
208 every,
209 )?
210 }
211
212 let mut out = non_group_keys_df;
213
214 let group_keys_df =
215 group_keys_df.new_from_index(first_idx as usize, out.height());
216
217 out.clear_schema();
218 let out_cols = unsafe { out.get_columns_mut() };
219
220 out_cols.reserve(group_keys_df.width());
221 out_cols.extend(group_keys_df.into_columns());
222
223 Ok(out)
224 })
225 .collect::<PolarsResult<_>>()?;
226
227 unsafe {
228 accumulate_dataframes_vertical_unchecked(dfs).project(source_schema.clone())
229 }
230 }
231 },
232 }
233}
234
235fn upsample_single_impl(
236 source: &DataFrame,
237 index_column: &Series,
238 every: Duration,
239) -> PolarsResult<DataFrame> {
240 index_column.ensure_sorted_arg("upsample")?;
241 let index_col_name = index_column.name();
242
243 use DataType::*;
244 match index_column.dtype() {
245 #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
246 Datetime(tu, tz) => {
247 let s = index_column.cast(&Int64).unwrap();
248 let ca = s.i64().unwrap();
249 let first = ca.iter().flatten().next();
250 let last = ca.iter().flatten().next_back();
251 match (first, last) {
252 (Some(first), Some(last)) => {
253 let tz = match tz {
254 #[cfg(feature = "timezones")]
255 Some(tz) => Some(parse_time_zone(tz)?),
256 _ => None,
257 };
258 let range = datetime_range_impl(
259 index_col_name.clone(),
260 first,
261 last,
262 every,
263 ClosedWindow::Both,
264 *tu,
265 tz.as_ref(),
266 )?
267 .into_series()
268 .into_frame();
269 range.join(
270 source,
271 [index_col_name.clone()],
272 [index_col_name.clone()],
273 JoinArgs::new(JoinType::Left),
274 None,
275 )
276 },
277 _ => polars_bail!(
278 ComputeError: "cannot determine upsample boundaries: all elements are null"
279 ),
280 }
281 },
282 dt => polars_bail!(
283 ComputeError: "upsample not allowed for index column of dtype {}", dt,
284 ),
285 }
286}