1#[cfg(feature = "timezones")]
2use polars_core::datatypes::time_zone::parse_time_zone;
3use polars_core::prelude::*;
4use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5use polars_ops::prelude::*;
6use polars_ops::series::SeriesMethods;
7
8use crate::prelude::*;
9
/// Upsampling of a frame along a time-like column.
///
/// The notes below describe the `DataFrame` implementation found in this file.
pub trait PolarsUpsample {
    /// Upsample the frame so that `time_column` advances in steps of `every`.
    ///
    /// # Arguments
    ///
    /// * `by` - Names of the columns to group by. When empty, the whole frame is
    ///   upsampled as a single series; otherwise every group is upsampled on its
    ///   own and the per-group results are stacked vertically.
    /// * `time_column` - Name of the column to upsample on. `Datetime` columns are
    ///   handled directly; `Date`, `UInt32`, `UInt64`, `Int32` and `Int64` columns
    ///   are cast to `Datetime` for the computation and cast back afterwards.
    /// * `every` - Step between consecutive values of the upsampled column.
    ///
    /// Within each group the upsampled column runs from the first to the last
    /// non-null value (both included), and the original rows are left-joined onto
    /// that range.
    ///
    /// Groups are formed with `group_by` (as opposed to `group_by_stable`, which
    /// [`upsample_stable`](PolarsUpsample::upsample_stable) uses).
    ///
    /// # Errors
    ///
    /// Returns an error when `time_column` does not exist, when `every` is
    /// rejected by `ensure_duration_matches_dtype` for the column's dtype, when
    /// the sortedness check on the time column fails, when the column's dtype is
    /// not supported, or when a (group's) time column contains only nulls.
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;

    /// Same as [`upsample`](PolarsUpsample::upsample), but the groups are formed
    /// with `group_by_stable` instead of `group_by`.
    ///
    /// Arguments and error conditions are identical to those of `upsample`.
    // NOTE(review): presumably the stable variant keeps groups in order of first
    // appearance; confirm against `group_by_stable`.
    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;
}
90
91impl PolarsUpsample for DataFrame {
92 fn upsample<I: IntoVec<PlSmallStr>>(
93 &self,
94 by: I,
95 time_column: &str,
96 every: Duration,
97 ) -> PolarsResult<DataFrame> {
98 let by = by.into_vec();
99 let time_type = self.column(time_column)?.dtype();
100 ensure_duration_matches_dtype(every, time_type, "every")?;
101 upsample_impl(self, by, time_column, every, false)
102 }
103
104 fn upsample_stable<I: IntoVec<PlSmallStr>>(
105 &self,
106 by: I,
107 time_column: &str,
108 every: Duration,
109 ) -> PolarsResult<DataFrame> {
110 let by = by.into_vec();
111 let time_type = self.column(time_column)?.dtype();
112 ensure_duration_matches_dtype(every, time_type, "every")?;
113 upsample_impl(self, by, time_column, every, true)
114 }
115}
116
117fn upsample_impl(
118 source: &DataFrame,
119 by: Vec<PlSmallStr>,
120 index_column: &str,
121 every: Duration,
122 stable: bool,
123) -> PolarsResult<DataFrame> {
124 let s = source.column(index_column)?;
125 let original_type = s.dtype();
126
127 let needs_cast = matches!(
128 original_type,
129 DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
130 );
131
132 let mut df = source.clone();
133
134 if needs_cast {
135 df.try_apply(index_column, |s| match s.dtype() {
136 #[cfg(feature = "dtype-date")]
137 DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
138 DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
139 .cast(&DataType::Int64)?
140 .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
141 DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
142 _ => Ok(s.clone()),
143 })?;
144 }
145
146 let mut out = upsample_core(&df, by, index_column, every, stable)?;
147
148 if needs_cast {
149 out.try_apply(index_column, |s| s.cast(original_type))?;
150 }
151
152 Ok(out)
153}
154
/// Upsample `source` on `index_column`, optionally per group.
///
/// With an empty `by`, the whole frame is passed to [`upsample_single_impl`].
/// Otherwise the frame is split into groups on the `by` columns (using
/// `group_by_stable` when `stable` is true, `group_by` otherwise), every group is
/// upsampled independently, the group key values are re-attached to each result,
/// and the per-group frames are stacked and put back into the column order of
/// `source`.
///
/// If `index_column` is itself one of the `by` columns, it is not part of the
/// non-key frame, so the groups are passed through without upsampling.
///
/// # Errors
///
/// Fails when `index_column` (in the ungrouped case) or one of the `by` columns
/// is missing, when grouping fails, or when `upsample_single_impl` fails for any
/// group.
// NOTE(review): when `source` has no rows, `groups` is presumably empty and `dfs`
// ends up as an empty `Vec` - confirm that
// `accumulate_dataframes_vertical_unchecked` tolerates empty input.
fn upsample_core(
    source: &DataFrame,
    by: Vec<PlSmallStr>,
    index_column: &str,
    every: Duration,
    stable: bool,
) -> PolarsResult<DataFrame> {
    // No grouping requested: treat the entire frame as one series.
    if by.is_empty() {
        let index_column = source.column(index_column)?;
        return upsample_single_impl(source, index_column.as_materialized_series(), every);
    }

    let source_schema = source.schema();

    // Frame holding only the group key columns; grouping is done on this frame.
    let group_keys_df = source.select(by)?;
    let group_keys_schema = group_keys_df.schema();

    let groups = if stable {
        group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
    } else {
        group_keys_df.group_by(group_keys_schema.iter_names_cloned())
    }?
    .into_groups();

    // Every column of `source` that is not a group key. The names are taken from
    // `source`'s own schema, so each of them exists in `source`.
    let non_group_keys_df = unsafe {
        source.select_unchecked(
            source_schema
                .iter_names()
                .filter(|name| !group_keys_schema.contains(name.as_str())),
        )?
    };

    // Position of the index column among the non-key columns; `None` when the
    // index column is one of the group keys.
    let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);

    let dfs: Vec<DataFrame> = groups
        .iter()
        .map(|g| {
            let first_idx = g.first();

            // Rows of the non-key columns that belong to this group. The group
            // indices come from `group_keys_df`, which was selected from the same
            // `source` as `non_group_keys_df`.
            let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };

            if let Some(i) = upsample_index_col_idx {
                non_group_keys_df = upsample_single_impl(
                    &non_group_keys_df,
                    non_group_keys_df.columns()[i].as_materialized_series(),
                    every,
                )?
            }

            let mut out = non_group_keys_df;

            // Key columns for this group, built from the group's first row and
            // sized to the (possibly longer) upsampled height.
            // NOTE(review): presumably `new_from_index` repeats that row
            // `out.height()` times - confirm.
            let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());

            // The appended key columns were created with `out.height()` rows above.
            let out_cols = unsafe { out.columns_mut() };

            out_cols.reserve(group_keys_df.width());
            out_cols.extend(group_keys_df.into_columns());

            Ok(out)
        })
        .collect::<PolarsResult<_>>()?;

    // Stack the per-group frames, then restore the original column order (key
    // columns were appended at the end of each group frame) and schema.
    Ok(unsafe {
        accumulate_dataframes_vertical_unchecked(dfs)
            .select_unchecked(source_schema.iter_names())?
            .with_schema(source_schema.clone())
    })
}
224
225fn upsample_single_impl(
226 source: &DataFrame,
227 index_column: &Series,
228 every: Duration,
229) -> PolarsResult<DataFrame> {
230 index_column.ensure_sorted_arg("upsample")?;
231 let index_col_name = index_column.name();
232
233 use DataType::*;
234 match index_column.dtype() {
235 #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
236 Datetime(tu, tz) => {
237 let s = index_column.cast(&Int64).unwrap();
238 let ca = s.i64().unwrap();
239 let first = ca.iter().flatten().next();
240 let last = ca.iter().flatten().next_back();
241 match (first, last) {
242 (Some(first), Some(last)) => {
243 let tz = match tz {
244 #[cfg(feature = "timezones")]
245 Some(tz) => Some(parse_time_zone(tz)?),
246 _ => None,
247 };
248 let range = datetime_range_impl(
249 index_col_name.clone(),
250 first,
251 last,
252 every,
253 ClosedWindow::Both,
254 *tu,
255 tz.as_ref(),
256 )?
257 .into_series()
258 .into_frame();
259 range.join(
260 source,
261 [index_col_name.clone()],
262 [index_col_name.clone()],
263 JoinArgs::new(JoinType::Left),
264 None,
265 )
266 },
267 _ => polars_bail!(
268 ComputeError: "cannot determine upsample boundaries: all elements are null"
269 ),
270 }
271 },
272 dt => polars_bail!(
273 ComputeError: "upsample not allowed for index column of dtype {}", dt,
274 ),
275 }
276}