1#[cfg(feature = "timezones")]
2use polars_core::chunked_array::temporal::parse_time_zone;
3use polars_core::prelude::*;
4use polars_ops::prelude::*;
5use polars_ops::series::SeriesMethods;
6
7use crate::prelude::*;
8
/// Upsampling of a frame onto a regular grid along a time/index column.
pub trait PolarsUpsample {
    /// Upsample the frame along `time_column` at a fixed interval `every`,
    /// optionally within groups of the `by` columns.
    ///
    /// # Arguments
    /// * `by` - column names to group by. When empty, the whole frame is upsampled at once;
    ///   otherwise every group is upsampled independently.
    /// * `time_column` - the index column. The `DataFrame` implementation accepts `Date`,
    ///   `Datetime`, `UInt32`, `UInt64`, `Int32` and `Int64` dtypes. The column must be sorted.
    /// * `every` - the step of the generated grid. The `DataFrame` implementation checks it
    ///   against the dtype of `time_column` via `ensure_duration_matches_dtype`.
    ///
    /// In the `DataFrame` implementation, groups are formed with `DataFrame::group_by`.
    /// Use `upsample_stable` to group with `DataFrame::group_by_stable` instead.
    ///
    /// # Errors
    /// Returns an error in these cases:
    /// * `time_column` (or a `by` column) does not exist.
    /// * `every` is rejected for the column's dtype.
    /// * The index is not sorted or contains only nulls.
    /// * The index dtype is unsupported.
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;

    /// Same as `upsample`, except that, in the `DataFrame` implementation, groups are formed
    /// with `DataFrame::group_by_stable`. NOTE(review): this presumably keeps groups in order
    /// of first appearance; confirm against `group_by_stable`.
    ///
    /// # Errors
    /// Same conditions as `upsample`.
    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>;
}
89
90impl PolarsUpsample for DataFrame {
91 fn upsample<I: IntoVec<PlSmallStr>>(
92 &self,
93 by: I,
94 time_column: &str,
95 every: Duration,
96 ) -> PolarsResult<DataFrame> {
97 let by = by.into_vec();
98 let time_type = self.column(time_column)?.dtype();
99 ensure_duration_matches_dtype(every, time_type, "every")?;
100 upsample_impl(self, by, time_column, every, false)
101 }
102
103 fn upsample_stable<I: IntoVec<PlSmallStr>>(
104 &self,
105 by: I,
106 time_column: &str,
107 every: Duration,
108 ) -> PolarsResult<DataFrame> {
109 let by = by.into_vec();
110 let time_type = self.column(time_column)?.dtype();
111 ensure_duration_matches_dtype(every, time_type, "every")?;
112 upsample_impl(self, by, time_column, every, true)
113 }
114}
115
116fn upsample_impl(
117 source: &DataFrame,
118 by: Vec<PlSmallStr>,
119 index_column: &str,
120 every: Duration,
121 stable: bool,
122) -> PolarsResult<DataFrame> {
123 let s = source.column(index_column)?;
124 let time_type = s.dtype();
125 if matches!(time_type, DataType::Date) {
126 let mut df = source.clone();
127 df.apply(index_column, |s| {
128 s.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
129 .unwrap()
130 })
131 .unwrap();
132 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
133 out.apply(index_column, |s| s.cast(time_type).unwrap())
134 .unwrap();
135 Ok(out)
136 } else if matches!(
137 time_type,
138 DataType::UInt32 | DataType::UInt64 | DataType::Int32
139 ) {
140 let mut df = source.clone();
141
142 df.apply(index_column, |s| {
143 s.cast(&DataType::Int64)
144 .unwrap()
145 .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
146 .unwrap()
147 })
148 .unwrap();
149 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
150 out.apply(index_column, |s| s.cast(time_type).unwrap())
151 .unwrap();
152 Ok(out)
153 } else if matches!(time_type, DataType::Int64) {
154 let mut df = source.clone();
155 df.apply(index_column, |s| {
156 s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
157 .unwrap()
158 })
159 .unwrap();
160 let mut out = upsample_impl(&df, by, index_column, every, stable)?;
161 out.apply(index_column, |s| s.cast(time_type).unwrap())
162 .unwrap();
163 Ok(out)
164 } else if by.is_empty() {
165 let index_column = source.column(index_column)?;
166 upsample_single_impl(source, index_column.as_materialized_series(), every)
167 } else {
168 let gb = if stable {
169 source.group_by_stable(by)
170 } else {
171 source.group_by(by)
172 };
173 gb?.apply(|df| {
175 let index_column = df.column(index_column)?;
176 upsample_single_impl(&df, index_column.as_materialized_series(), every)
177 })
178 }
179}
180
181fn upsample_single_impl(
182 source: &DataFrame,
183 index_column: &Series,
184 every: Duration,
185) -> PolarsResult<DataFrame> {
186 index_column.ensure_sorted_arg("upsample")?;
187 let index_col_name = index_column.name();
188
189 use DataType::*;
190 match index_column.dtype() {
191 Datetime(tu, tz) => {
192 let s = index_column.cast(&Int64).unwrap();
193 let ca = s.i64().unwrap();
194 let first = ca.iter().flatten().next();
195 let last = ca.iter().flatten().next_back();
196 match (first, last) {
197 (Some(first), Some(last)) => {
198 let tz = match tz {
199 #[cfg(feature = "timezones")]
200 Some(tz) => Some(parse_time_zone(tz)?),
201 _ => None,
202 };
203 let range = datetime_range_impl(
204 index_col_name.clone(),
205 first,
206 last,
207 every,
208 ClosedWindow::Both,
209 *tu,
210 tz.as_ref(),
211 )?
212 .into_series()
213 .into_frame();
214 range.join(
215 source,
216 [index_col_name.clone()],
217 [index_col_name.clone()],
218 JoinArgs::new(JoinType::Left),
219 None,
220 )
221 },
222 _ => polars_bail!(
223 ComputeError: "cannot determine upsample boundaries: all elements are null"
224 ),
225 }
226 },
227 dt => polars_bail!(
228 ComputeError: "upsample not allowed for index column of dtype {}", dt,
229 ),
230 }
231}