1use arrow::legacy::time_zone::Tz;
2use arrow::temporal_conversions::*;
3use chrono::NaiveDateTime;
4#[cfg(feature = "timezones")]
5use chrono::TimeZone;
6use now::DateTimeNow;
7use polars_core::prelude::*;
8
9use crate::prelude::*;
10
11#[allow(clippy::too_many_arguments)]
25pub(crate) fn ensure_t_in_or_in_front_of_window(
26 mut every: Duration,
27 t: i64,
28 offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
29 nte_duration_fn: fn(&Duration) -> i64,
30 period: Duration,
31 mut start: i64,
32 closed_window: ClosedWindow,
33 tz: Option<&Tz>,
34) -> PolarsResult<Bounds> {
35 every.negative = !every.negative;
36 let mut stop = offset_fn(&period, start, tz)?;
37
38 while Bounds::new(start, stop).is_past(t, closed_window) {
39 let mut gap = start - t;
40 if matches!(closed_window, ClosedWindow::Right | ClosedWindow::None) {
41 gap += 1;
42 }
43 debug_assert!(gap >= 1);
44
45 let stride = (gap + nte_duration_fn(&every) - 1) / nte_duration_fn(&every);
47 debug_assert!(stride >= 1);
48 let stride = std::cmp::max(stride, 1);
49
50 start = offset_fn(&(every * stride), start, tz)?;
51 stop = offset_fn(&period, start, tz)?;
52 }
53 Ok(Bounds::new_checked(start, stop))
54}
55
56#[derive(Copy, Clone)]
58pub struct Window {
59 pub(crate) every: Duration,
63 pub(crate) period: Duration,
64 pub offset: Duration,
65}
66
67impl Window {
68 pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
69 debug_assert!(!every.negative);
70 Self {
71 every,
72 period,
73 offset,
74 }
75 }
76
77 pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
79 self.every.truncate_ns(t, tz)
80 }
81
82 pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
84 self.every.truncate_us(t, tz)
85 }
86
87 pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
89 self.every.truncate_ms(t, tz)
90 }
91
92 pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
94 let t = t + self.every.duration_ns() / 2_i64;
95 self.truncate_ns(t, tz)
96 }
97
98 pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
100 let t = t + self.every.duration_ns()
101 / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
102 self.truncate_us(t, tz)
103 }
104
105 pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
107 let t = t + self.every.duration_ns()
108 / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
109 self.truncate_ms(t, tz)
110 }
111
112 pub fn get_earliest_bounds_ns(
116 &self,
117 t: i64,
118 closed_window: ClosedWindow,
119 tz: Option<&Tz>,
120 ) -> PolarsResult<Bounds> {
121 let start = self.truncate_ns(t, tz)?;
122 let start = self.offset.add_ns(start, tz)?;
123 ensure_t_in_or_in_front_of_window(
124 self.every,
125 t,
126 Duration::add_ns,
127 Duration::nte_duration_ns,
128 self.period,
129 start,
130 closed_window,
131 tz,
132 )
133 }
134
135 pub fn get_earliest_bounds_us(
136 &self,
137 t: i64,
138 closed_window: ClosedWindow,
139 tz: Option<&Tz>,
140 ) -> PolarsResult<Bounds> {
141 let start = self.truncate_us(t, tz)?;
142 let start = self.offset.add_us(start, tz)?;
143 ensure_t_in_or_in_front_of_window(
144 self.every,
145 t,
146 Duration::add_us,
147 Duration::nte_duration_us,
148 self.period,
149 start,
150 closed_window,
151 tz,
152 )
153 }
154
155 pub fn get_earliest_bounds_ms(
156 &self,
157 t: i64,
158 closed_window: ClosedWindow,
159 tz: Option<&Tz>,
160 ) -> PolarsResult<Bounds> {
161 let start = self.truncate_ms(t, tz)?;
162 let start = self.offset.add_ms(start, tz)?;
163 ensure_t_in_or_in_front_of_window(
164 self.every,
165 t,
166 Duration::add_ms,
167 Duration::nte_duration_ms,
168 self.period,
169 start,
170 closed_window,
171 tz,
172 )
173 }
174
175 pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
176 (boundary.duration() / self.every.duration_ns()
177 + self.period.duration_ns() / self.every.duration_ns()) as usize
178 }
179
180 pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
181 (boundary.duration() / self.every.duration_us()
182 + self.period.duration_us() / self.every.duration_us()) as usize
183 }
184
185 pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
186 (boundary.duration() / self.every.duration_ms()
187 + self.period.duration_ms() / self.every.duration_ms()) as usize
188 }
189
190 pub fn get_overlapping_bounds_iter<'a>(
191 &'a self,
192 boundary: Bounds,
193 closed_window: ClosedWindow,
194 tu: TimeUnit,
195 tz: Option<&'a Tz>,
196 start_by: StartBy,
197 ) -> PolarsResult<BoundsIter<'a>> {
198 BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
199 }
200}
201
202pub struct BoundsIter<'a> {
203 window: Window,
204 boundary: Bounds,
206 bi: Bounds,
208 tu: TimeUnit,
209 tz: Option<&'a Tz>,
210}
211impl<'a> BoundsIter<'a> {
212 fn new(
213 window: Window,
214 closed_window: ClosedWindow,
215 boundary: Bounds,
216 tu: TimeUnit,
217 tz: Option<&'a Tz>,
218 start_by: StartBy,
219 ) -> PolarsResult<Self> {
220 let bi = match start_by {
221 StartBy::DataPoint => {
222 let mut boundary = boundary;
223 let offset_fn = match tu {
224 TimeUnit::Nanoseconds => Duration::add_ns,
225 TimeUnit::Microseconds => Duration::add_us,
226 TimeUnit::Milliseconds => Duration::add_ms,
227 };
228 boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
229 boundary
230 },
231 StartBy::WindowBound => match tu {
232 TimeUnit::Nanoseconds => {
233 window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
234 },
235 TimeUnit::Microseconds => {
236 window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
237 },
238 TimeUnit::Milliseconds => {
239 window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
240 },
241 },
242 _ => {
243 {
244 #[allow(clippy::type_complexity)]
245 let (from, to, offset_fn, nte_duration_fn): (
246 fn(i64) -> NaiveDateTime,
247 fn(NaiveDateTime) -> i64,
248 fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
249 fn(&Duration) -> i64,
250 ) = match tu {
251 TimeUnit::Nanoseconds => (
252 timestamp_ns_to_datetime,
253 datetime_to_timestamp_ns,
254 Duration::add_ns,
255 Duration::nte_duration_ns,
256 ),
257 TimeUnit::Microseconds => (
258 timestamp_us_to_datetime,
259 datetime_to_timestamp_us,
260 Duration::add_us,
261 Duration::nte_duration_us,
262 ),
263 TimeUnit::Milliseconds => (
264 timestamp_ms_to_datetime,
265 datetime_to_timestamp_ms,
266 Duration::add_ms,
267 Duration::nte_duration_ms,
268 ),
269 };
270 let dt = from(boundary.start);
272 match tz {
273 #[cfg(feature = "timezones")]
274 Some(tz) => {
275 let dt = tz.from_utc_datetime(&dt);
276 let dt = dt.beginning_of_week();
277 let dt = dt.naive_utc();
278 let start = to(dt);
279 let start = offset_fn(
281 &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
282 start,
283 Some(tz),
284 )?;
285 let start = offset_fn(&window.offset, start, Some(tz))?;
287 ensure_t_in_or_in_front_of_window(
290 window.every,
291 boundary.start,
292 offset_fn,
293 nte_duration_fn,
294 window.period,
295 start,
296 closed_window,
297 Some(tz),
298 )?
299 },
300 _ => {
301 let tz = chrono::Utc;
302 let dt = dt.and_local_timezone(tz).unwrap();
303 let dt = dt.beginning_of_week();
304 let dt = dt.naive_utc();
305 let start = to(dt);
306 let start = offset_fn(
308 &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
309 start,
310 None,
311 )
312 .unwrap();
313 let start = offset_fn(&window.offset, start, None).unwrap();
315 ensure_t_in_or_in_front_of_window(
318 window.every,
319 boundary.start,
320 offset_fn,
321 nte_duration_fn,
322 window.period,
323 start,
324 closed_window,
325 None,
326 )?
327 },
328 }
329 }
330 },
331 };
332 Ok(Self {
333 window,
334 boundary,
335 bi,
336 tu,
337 tz,
338 })
339 }
340}
341
342impl Iterator for BoundsIter<'_> {
343 type Item = Bounds;
344
345 fn next(&mut self) -> Option<Self::Item> {
346 if self.bi.start < self.boundary.stop {
347 let out = self.bi;
348 match self.tu {
349 TimeUnit::Nanoseconds => {
352 self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
353 self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
354 },
355 TimeUnit::Microseconds => {
356 self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
357 self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
358 },
359 TimeUnit::Milliseconds => {
360 self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
361 self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
362 },
363 }
364 Some(out)
365 } else {
366 None
367 }
368 }
369
370 fn nth(&mut self, n: usize) -> Option<Self::Item> {
371 let n: i64 = n.try_into().unwrap();
372 if self.bi.start < self.boundary.stop {
373 match self.tu {
374 TimeUnit::Nanoseconds => {
375 self.bi.start = (self.window.every * n)
376 .add_ns(self.bi.start, self.tz)
377 .unwrap();
378 self.bi.stop = (self.window.period).add_ns(self.bi.start, self.tz).unwrap();
379 },
380 TimeUnit::Microseconds => {
381 self.bi.start = (self.window.every * n)
382 .add_us(self.bi.start, self.tz)
383 .unwrap();
384 self.bi.stop = (self.window.period).add_us(self.bi.start, self.tz).unwrap();
385 },
386 TimeUnit::Milliseconds => {
387 self.bi.start = (self.window.every * n)
388 .add_ms(self.bi.start, self.tz)
389 .unwrap();
390 self.bi.stop = (self.window.period).add_ms(self.bi.start, self.tz).unwrap();
391 },
392 }
393 self.next()
394 } else {
395 None
396 }
397 }
398}
399
400impl<'a> BoundsIter<'a> {
401 pub fn get_stride(&self, target: i64) -> usize {
406 let mut stride = 0;
407 if self.bi.start < self.boundary.stop && target > self.bi.start {
408 let gap = target - self.bi.start;
409 match self.tu {
410 TimeUnit::Nanoseconds => {
411 if gap
412 > self.window.every.nte_duration_ns() + self.window.period.nte_duration_ns()
413 {
414 stride = ((gap - self.window.period.nte_duration_ns()) as usize)
415 / (self.window.every.nte_duration_ns() as usize);
416 }
417 },
418 TimeUnit::Microseconds => {
419 if gap
420 > self.window.every.nte_duration_us() + self.window.period.nte_duration_us()
421 {
422 stride = ((gap - self.window.period.nte_duration_us()) as usize)
423 / (self.window.every.nte_duration_us() as usize);
424 }
425 },
426 TimeUnit::Milliseconds => {
427 if gap
428 > self.window.every.nte_duration_ms() + self.window.period.nte_duration_ms()
429 {
430 stride = ((gap - self.window.period.nte_duration_ms()) as usize)
431 / (self.window.every.nte_duration_ms() as usize);
432 }
433 },
434 }
435 }
436 stride
437 }
438}