1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5use polars_ops::prelude::*;
6use polars_ops::series::SeriesMethods;
7
8use crate::prelude::*;
9
10pub trait PolarsUpsample {
11 fn upsample<I: IntoVec<PlSmallStr>>(
43 &self,
44 by: I,
45 time_column: &str,
46 every: Duration,
47 ) -> PolarsResult<DataFrame>;
48
49 fn upsample_stable<I: IntoVec<PlSmallStr>>(
84 &self,
85 by: I,
86 time_column: &str,
87 every: Duration,
88 ) -> PolarsResult<DataFrame>;
89}
90
91impl PolarsUpsample for DataFrame {
92 fn upsample<I: IntoVec<PlSmallStr>>(
93 &self,
94 by: I,
95 time_column: &str,
96 every: Duration,
97 ) -> PolarsResult<DataFrame> {
98 let by = by.into_vec();
99 let time_type = self.column(time_column)?.dtype();
100 ensure_duration_matches_dtype(every, time_type, "every")?;
101 upsample_impl(self, by, time_column, every, false)
102 }
103
104 fn upsample_stable<I: IntoVec<PlSmallStr>>(
105 &self,
106 by: I,
107 time_column: &str,
108 every: Duration,
109 ) -> PolarsResult<DataFrame> {
110 let by = by.into_vec();
111 let time_type = self.column(time_column)?.dtype();
112 ensure_duration_matches_dtype(every, time_type, "every")?;
113 upsample_impl(self, by, time_column, every, true)
114 }
115}
116
117fn upsample_impl(
118 source: &DataFrame,
119 by: Vec<PlSmallStr>,
120 index_column: &str,
121 every: Duration,
122 stable: bool,
123) -> PolarsResult<DataFrame> {
124 let s = source.column(index_column)?;
125 let original_type = s.dtype();
126
127 let needs_cast = matches!(
128 original_type,
129 DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
130 );
131
132 let mut df = source.clone();
133
134 if needs_cast {
135 df.try_apply(index_column, |s| match s.dtype() {
136 #[cfg(feature = "dtype-date")]
137 DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
138 DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
139 .cast(&DataType::Int64)?
140 .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
141 DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
142 _ => Ok(s.clone()),
143 })?;
144 }
145
146 let mut out = upsample_core(&df, by, index_column, every, stable)?;
147
148 if needs_cast {
149 out.try_apply(index_column, |s| s.cast(original_type))?;
150 }
151
152 Ok(out)
153}
154
155fn upsample_core(
156 source: &DataFrame,
157 by: Vec<PlSmallStr>,
158 index_column: &str,
159 every: Duration,
160 stable: bool,
161) -> PolarsResult<DataFrame> {
162 if by.is_empty() {
163 let index_column = source.column(index_column)?;
164 return upsample_single_impl(source, index_column.as_materialized_series(), every);
165 }
166
167 if source.height() == 0 {
168 polars_bail!(
169 ComputeError: "cannot determine upsample boundaries: all elements are null"
170 );
171 }
172
173 let source_schema = source.schema();
174
175 let group_keys_df = source.select(by)?;
176 let group_keys_schema = group_keys_df.schema();
177
178 let groups = if stable {
179 group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
180 } else {
181 group_keys_df.group_by(group_keys_schema.iter_names_cloned())
182 }?
183 .into_groups();
184
185 let non_group_keys_df = unsafe {
186 source.select_unchecked(
187 source_schema
188 .iter_names()
189 .filter(|name| !group_keys_schema.contains(name.as_str())),
190 )?
191 };
192
193 let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);
194
195 let dfs: Vec<DataFrame> = groups
197 .iter()
198 .map(|g| {
199 let first_idx = g.first();
200
201 let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };
202
203 if let Some(i) = upsample_index_col_idx {
204 non_group_keys_df = upsample_single_impl(
205 &non_group_keys_df,
206 non_group_keys_df.columns()[i].as_materialized_series(),
207 every,
208 )?
209 }
210
211 let mut out = non_group_keys_df;
212
213 let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());
214
215 let out_cols = unsafe { out.columns_mut() };
216
217 out_cols.reserve(group_keys_df.width());
218 out_cols.extend(group_keys_df.into_columns());
219
220 Ok(out)
221 })
222 .collect::<PolarsResult<_>>()?;
223
224 Ok(unsafe {
225 accumulate_dataframes_vertical_unchecked(dfs)
226 .select_unchecked(source_schema.iter_names())?
227 .with_schema(source_schema.clone())
228 })
229}
230
231fn upsample_single_impl(
232 source: &DataFrame,
233 index_column: &Series,
234 every: Duration,
235) -> PolarsResult<DataFrame> {
236 index_column.ensure_sorted_arg("upsample")?;
237 let index_col_name = index_column.name();
238
239 use DataType::*;
240 match index_column.dtype() {
241 #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
242 Datetime(tu, tz) => {
243 let s = index_column.cast(&Int64).unwrap();
244 let ca = s.i64().unwrap();
245 let first = ca.iter().flatten().next();
246 let last = ca.iter().flatten().next_back();
247 match (first, last) {
248 (Some(first), Some(last)) => {
249 let tz = match tz {
250 #[cfg(feature = "timezones")]
251 Some(tz) => Some(parse_time_zone(tz)?),
252 _ => None,
253 };
254 let range = datetime_range_impl(
255 index_col_name.clone(),
256 first,
257 last,
258 every,
259 ClosedWindow::Both,
260 *tu,
261 tz.as_ref(),
262 )?
263 .into_series()
264 .into_frame();
265 range.join(
266 source,
267 [index_col_name.clone()],
268 [index_col_name.clone()],
269 JoinArgs::new(JoinType::Left),
270 None,
271 )
272 },
273 _ => polars_bail!(
274 ComputeError: "cannot determine upsample boundaries: all elements are null"
275 ),
276 }
277 },
278 dt => polars_bail!(
279 ComputeError: "upsample not allowed for index column of dtype {}", dt,
280 ),
281 }
282}