1use std::collections::VecDeque;
2
3use arrow::legacy::time_zone::Tz;
4use arrow::temporal_conversions::{
5 timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
6};
7use arrow::trusted_len::TrustedLen;
8use chrono::NaiveDateTime;
9#[cfg(feature = "timezones")]
10use chrono::TimeZone as _;
11use now::DateTimeNow;
12use polars_core::POOL;
13use polars_core::prelude::*;
14use polars_core::utils::_split_offsets;
15use polars_core::utils::flatten::flatten_par;
16use rayon::prelude::*;
17#[cfg(feature = "serde")]
18use serde::{Deserialize, Serialize};
19use strum_macros::IntoStaticStr;
20
21use crate::prelude::*;
22
/// Which ends of a window interval are inclusive.
///
/// As evaluated by `is_above_lower_bound` / `is_below_upper_bound` in this file:
/// - `Left`: `[lower, upper)`
/// - `Right`: `(lower, upper]`
/// - `Both`: `[lower, upper]`
/// - `None`: `(lower, upper)`
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
    Left,
    Right,
    Both,
    None,
}
33
/// Which value is used to label a window.
///
/// NOTE(review): not referenced in this part of the file; presumably `Left`/`Right` select the
/// window's lower/upper bound and `DataPoint` a timestamp from the data — confirm at call sites.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
    Left,
    Right,
    DataPoint,
}
43
/// Strategy for choosing the lower bound of the first dynamic window.
///
/// - `WindowBound` (default): the earliest window bound derived from `every`/`period`/`offset`.
/// - `DataPoint`: the first timestamp itself.
/// - `Monday` … `Sunday`: the start of the week containing the first timestamp, shifted to the
///   given weekday (see `StartBy::weekday`) and then by `offset`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
#[derive(Default)]
pub enum StartBy {
    #[default]
    WindowBound,
    DataPoint,
    Monday,
    Tuesday,
    Wednesday,
    Thursday,
    Friday,
    Saturday,
    Sunday,
}
62
63impl StartBy {
64 pub fn weekday(&self) -> Option<u32> {
65 match self {
66 StartBy::Monday => Some(0),
67 StartBy::Tuesday => Some(1),
68 StartBy::Wednesday => Some(2),
69 StartBy::Thursday => Some(3),
70 StartBy::Friday => Some(4),
71 StartBy::Saturday => Some(5),
72 StartBy::Sunday => Some(6),
73 _ => None,
74 }
75 }
76}
77
/// Assigns rows of `time` to the windows yielded by `bounds_iter`.
///
/// For each window `bi`, the first member row is searched by scanning forward from `start`.
/// The run of rows that are still members is then collected and pushed into `groups` as
/// `[first_row, len]`. When `include_lower_bound` / `include_upper_bound` are set, `bi.start` /
/// `bi.stop` are pushed alongside every group.
///
/// `start` only ever moves forward, so rows are never revisited. Windows without members are
/// skipped by jumping ahead in the bounds iterator with `iter.nth(iter.get_stride(t))`.
///
/// NOTE(review): assumes `time` is sorted ascending and non-empty — `time.len() - 1` below
/// underflows on an empty slice. Confirm callers guarantee this.
#[allow(clippy::too_many_arguments)]
fn update_groups_and_bounds(
    bounds_iter: BoundsIter<'_>,
    mut start: usize,
    time: &[i64],
    closed_window: ClosedWindow,
    include_lower_bound: bool,
    include_upper_bound: bool,
    lower_bound: &mut Vec<i64>,
    upper_bound: &mut Vec<i64>,
    groups: &mut Vec<[IdxSize; 2]>,
) {
    let mut iter = bounds_iter.into_iter();
    // Number of windows to skip before the next one is inspected (`nth(0)` = next window).
    let mut stride = 0;

    'bounds: while let Some(bi) = iter.nth(stride) {
        let mut has_member = false;
        // Find the first member row of this window. The last row is excluded from this scan;
        // it is handled by the `end == time.len() - 1` branch below.
        for &t in &time[start..time.len().saturating_sub(1)] {
            // NOTE(review): presumably `is_future` means `t` lies past this window. The window is
            // then abandoned and the iterator jumps ahead towards the window around `t`, retrying
            // from the same `start`.
            if bi.is_future(t, closed_window) {
                stride = iter.get_stride(t);
                continue 'bounds;
            }
            if bi.is_member_entry(t, closed_window) {
                has_member = true;
                break;
            }
            // Not a member and not past the window: skip this row for good.
            start += 1;
        }

        // A member was found: inspect the very next window afterwards. Otherwise the scan ran
        // out before the last row, so jump towards the window around `time[start]`.
        stride = if has_member {
            0
        } else {
            debug_assert!(start < time.len());
            iter.get_stride(time[start])
        };

        let mut end = start;

        // Only the final row is left: it forms a group of length 1 if it belongs to `bi`.
        if end == time.len() - 1 {
            let t = time[end];
            if bi.is_member(t, closed_window) {
                if include_lower_bound {
                    lower_bound.push(bi.start);
                }
                if include_upper_bound {
                    upper_bound.push(bi.stop);
                }
                groups.push([end as IdxSize, 1])
            }
            continue;
        }
        // Extend `end` over every following row that is still inside the window.
        for &t in &time[end..] {
            if !bi.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }
        let len = end - start;

        if include_lower_bound {
            lower_bound.push(bi.start);
        }
        if include_upper_bound {
            upper_bound.push(bi.stop);
        }
        groups.push([start as IdxSize, len as IdxSize])
    }
}
152
153#[allow(clippy::too_many_arguments)]
165pub fn group_by_windows(
166 window: Window,
167 time: &[i64],
168 closed_window: ClosedWindow,
169 tu: TimeUnit,
170 tz: &Option<TimeZone>,
171 include_lower_bound: bool,
172 include_upper_bound: bool,
173 start_by: StartBy,
174) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
175 let start = time[0];
176 let boundary = if time.len() > 1 {
180 let stop = time[time.len() - 1] + 1;
182 Bounds::new_checked(start, stop)
183 } else {
184 let stop = start + 1;
185 Bounds::new_checked(start, stop)
186 };
187
188 let size = {
189 match tu {
190 TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
191 TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
192 TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
193 }
194 };
195 let size_lower = if include_lower_bound { size } else { 0 };
196 let size_upper = if include_upper_bound { size } else { 0 };
197 let mut lower_bound = Vec::with_capacity(size_lower);
198 let mut upper_bound = Vec::with_capacity(size_upper);
199
200 let mut groups = Vec::with_capacity(size);
201 let start_offset = 0;
202
203 match tz {
204 #[cfg(feature = "timezones")]
205 Some(tz) => {
206 update_groups_and_bounds(
207 window.get_overlapping_bounds_iter(
208 boundary,
209 closed_window,
210 tu,
211 tz.parse::<Tz>().ok().as_ref(),
212 start_by,
213 )?,
214 start_offset,
215 time,
216 closed_window,
217 include_lower_bound,
218 include_upper_bound,
219 &mut lower_bound,
220 &mut upper_bound,
221 &mut groups,
222 );
223 },
224 _ => {
225 update_groups_and_bounds(
226 window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
227 start_offset,
228 time,
229 closed_window,
230 include_lower_bound,
231 include_upper_bound,
232 &mut lower_bound,
233 &mut upper_bound,
234 &mut groups,
235 );
236 },
237 };
238
239 Ok((groups, lower_bound, upper_bound))
240}
241
242#[inline]
246#[allow(clippy::too_many_arguments)]
247pub(crate) fn group_by_values_iter_lookbehind(
248 period: Duration,
249 offset: Duration,
250 time: &[i64],
251 closed_window: ClosedWindow,
252 tu: TimeUnit,
253 tz: Option<Tz>,
254 start_offset: usize,
255 upper_bound: Option<usize>,
256) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
257 debug_assert!(offset.duration_ns() == period.duration_ns());
258 debug_assert!(offset.negative);
259 let add = match tu {
260 TimeUnit::Nanoseconds => Duration::add_ns,
261 TimeUnit::Microseconds => Duration::add_us,
262 TimeUnit::Milliseconds => Duration::add_ms,
263 };
264
265 let upper_bound = upper_bound.unwrap_or(time.len());
266 let mut start = if let Some(&t) = time.get(start_offset) {
268 let lower = add(&offset, t, tz.as_ref())?;
269 let upper = t;
274 let b = Bounds::new(lower, upper);
275 let slice = &time[..start_offset];
276 slice.partition_point(|v| !b.is_member(*v, closed_window))
277 } else {
278 0
279 };
280 let mut end = start;
281 let mut last = time[start_offset];
282 Ok(time[start_offset..upper_bound]
283 .iter()
284 .enumerate()
285 .map(move |(mut i, t)| {
286 if *t == last && i > 0 {
288 let len = end - start;
289 let offset = start as IdxSize;
290 return Ok((offset, len as IdxSize));
291 }
292 last = *t;
293 i += start_offset;
294
295 let lower = add(&offset, *t, tz.as_ref())?;
296 let upper = *t;
297
298 let b = Bounds::new(lower, upper);
299
300 for &t in unsafe { time.get_unchecked(start..i) } {
301 if b.is_member_entry(t, closed_window) {
302 break;
303 }
304 start += 1;
305 }
306
307 if b.is_member_exit(*t, closed_window) {
309 end = i;
310 } else {
311 end = std::cmp::max(end, start);
312 }
313 for &t in unsafe { time.get_unchecked(end..) } {
315 if !b.is_member_exit(t, closed_window) {
316 break;
317 }
318 end += 1;
319 }
320
321 let len = end - start;
322 let offset = start as IdxSize;
323
324 Ok((offset, len as IdxSize))
325 }))
326}
327
328pub(crate) fn group_by_values_iter_window_behind_t(
333 period: Duration,
334 offset: Duration,
335 time: &[i64],
336 closed_window: ClosedWindow,
337 tu: TimeUnit,
338 tz: Option<Tz>,
339) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
340 let add = match tu {
341 TimeUnit::Nanoseconds => Duration::add_ns,
342 TimeUnit::Microseconds => Duration::add_us,
343 TimeUnit::Milliseconds => Duration::add_ms,
344 };
345
346 let mut start = 0;
347 let mut end = start;
348 let mut last = time[0];
349 let mut started = false;
350 time.iter().map(move |lower| {
351 if *lower == last && started {
353 let len = end - start;
354 let offset = start as IdxSize;
355 return Ok((offset, len as IdxSize));
356 }
357 last = *lower;
358 started = true;
359 let lower = add(&offset, *lower, tz.as_ref())?;
360 let upper = add(&period, lower, tz.as_ref())?;
361
362 let b = Bounds::new(lower, upper);
363 if b.is_future(time[0], closed_window) {
364 Ok((0, 0))
365 } else {
366 for &t in &time[start..] {
367 if b.is_member_entry(t, closed_window) {
368 break;
369 }
370 start += 1;
371 }
372
373 end = std::cmp::max(start, end);
374 for &t in &time[end..] {
375 if !b.is_member_exit(t, closed_window) {
376 break;
377 }
378 end += 1;
379 }
380
381 let len = end - start;
382 let offset = start as IdxSize;
383
384 Ok((offset, len as IdxSize))
385 }
386 })
387}
388
389pub(crate) fn group_by_values_iter_partial_lookbehind(
393 period: Duration,
394 offset: Duration,
395 time: &[i64],
396 closed_window: ClosedWindow,
397 tu: TimeUnit,
398 tz: Option<Tz>,
399) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
400 let add = match tu {
401 TimeUnit::Nanoseconds => Duration::add_ns,
402 TimeUnit::Microseconds => Duration::add_us,
403 TimeUnit::Milliseconds => Duration::add_ms,
404 };
405
406 let mut start = 0;
407 let mut end = start;
408 let mut last = time[0];
409 time.iter().enumerate().map(move |(i, lower)| {
410 if *lower == last && i > 0 {
412 let len = end - start;
413 let offset = start as IdxSize;
414 return Ok((offset, len as IdxSize));
415 }
416 last = *lower;
417
418 let lower = add(&offset, *lower, tz.as_ref())?;
419 let upper = add(&period, lower, tz.as_ref())?;
420
421 let b = Bounds::new(lower, upper);
422
423 for &t in &time[start..] {
424 if b.is_member_entry(t, closed_window) || start == i {
425 break;
426 }
427 start += 1;
428 }
429
430 end = std::cmp::max(start, end);
431 for &t in &time[end..] {
432 if !b.is_member_exit(t, closed_window) {
433 break;
434 }
435 end += 1;
436 }
437
438 let len = end - start;
439 let offset = start as IdxSize;
440
441 Ok((offset, len as IdxSize))
442 })
443}
444
445#[allow(clippy::too_many_arguments)]
446pub(crate) fn group_by_values_iter_lookahead(
450 period: Duration,
451 offset: Duration,
452 time: &[i64],
453 closed_window: ClosedWindow,
454 tu: TimeUnit,
455 tz: Option<Tz>,
456 start_offset: usize,
457 upper_bound: Option<usize>,
458) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
459 let upper_bound = upper_bound.unwrap_or(time.len());
460
461 let add = match tu {
462 TimeUnit::Nanoseconds => Duration::add_ns,
463 TimeUnit::Microseconds => Duration::add_us,
464 TimeUnit::Milliseconds => Duration::add_ms,
465 };
466 let mut start = start_offset;
467 let mut end = start;
468
469 let mut last = time[start_offset];
470 let mut started = false;
471 time[start_offset..upper_bound].iter().map(move |lower| {
472 if *lower == last && started {
474 let len = end - start;
475 let offset = start as IdxSize;
476 return Ok((offset, len as IdxSize));
477 }
478 started = true;
479 last = *lower;
480
481 let lower = add(&offset, *lower, tz.as_ref())?;
482 let upper = add(&period, lower, tz.as_ref())?;
483
484 let b = Bounds::new(lower, upper);
485
486 for &t in &time[start..] {
487 if b.is_member_entry(t, closed_window) {
488 break;
489 }
490 start += 1;
491 }
492
493 end = std::cmp::max(start, end);
494 for &t in &time[end..] {
495 if !b.is_member_exit(t, closed_window) {
496 break;
497 }
498 end += 1;
499 }
500
501 let len = end - start;
502 let offset = start as IdxSize;
503
504 Ok((offset, len as IdxSize))
505 })
506}
507
/// Per-row rolling windows that look back exactly one `period` over the whole of `time`.
///
/// This is `group_by_values_iter_lookbehind` with the offset set to `period` flagged as
/// negative.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    let lookback = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    let (from_first_row, no_upper_limit) = (0, None);
    group_by_values_iter_lookbehind(
        period,
        lookback,
        time,
        closed_window,
        tu,
        tz,
        from_first_row,
        no_upper_limit,
    )
}
522
/// Merges neighbouring thread blocks so that no block boundary splits a run of equal timestamps.
///
/// `thread_offsets` holds `(start, len)` blocks that partition `time` contiguously and in
/// order, as produced by `_split_offsets`.
///
/// A split at row `s` is kept only when `time[s - 1] != time[s]`. Otherwise — and for empty
/// blocks — the block is folded into its predecessor. The result still covers
/// `0..time.len()` contiguously.
///
/// The chunked lookahead iterators start scanning at their block's first row, so a split
/// between equal timestamps would hide the earlier duplicates from that block's windows.
///
/// This is a single greedy pass. Every valid split survives, and an odd number of blocks no
/// longer loses its last split.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    // Starts of the blocks we keep; the first block always begins at row 0.
    let mut starts: Vec<usize> = Vec::with_capacity(thread_offsets.len());
    starts.push(0);
    for &(start, _) in &thread_offsets[1..] {
        let previous = *starts.last().expect("starts is never empty");
        // `start > previous` drops empty blocks and also guarantees `start >= 1`.
        if start > previous && start < time.len() && time[start - 1] != time[start] {
            starts.push(start);
        }
    }

    // Every split was valid: leave the caller's blocks untouched.
    if starts.len() == thread_offsets.len() {
        return;
    }

    // Rebuild contiguous `(start, len)` blocks ending at `time.len()`.
    let ends = starts
        .iter()
        .skip(1)
        .copied()
        .chain(std::iter::once(time.len()));
    let merged: Vec<(usize, usize)> = starts
        .iter()
        .zip(ends)
        .map(|(&start, end)| (start, end - start))
        .collect();
    debug_assert_eq!(merged.iter().map(|b| b.1).sum::<usize>(), time.len());
    *thread_offsets = merged;
}
569
570#[allow(clippy::too_many_arguments)]
571fn group_by_values_iter_lookbehind_collected(
572 period: Duration,
573 offset: Duration,
574 time: &[i64],
575 closed_window: ClosedWindow,
576 tu: TimeUnit,
577 tz: Option<Tz>,
578 start_offset: usize,
579 upper_bound: Option<usize>,
580) -> PolarsResult<Vec<[IdxSize; 2]>> {
581 let iter = group_by_values_iter_lookbehind(
582 period,
583 offset,
584 time,
585 closed_window,
586 tu,
587 tz,
588 start_offset,
589 upper_bound,
590 )?;
591 iter.map(|result| result.map(|(offset, len)| [offset, len]))
592 .collect::<PolarsResult<Vec<_>>>()
593}
594
595#[allow(clippy::too_many_arguments)]
596pub(crate) fn group_by_values_iter_lookahead_collected(
597 period: Duration,
598 offset: Duration,
599 time: &[i64],
600 closed_window: ClosedWindow,
601 tu: TimeUnit,
602 tz: Option<Tz>,
603 start_offset: usize,
604 upper_bound: Option<usize>,
605) -> PolarsResult<Vec<[IdxSize; 2]>> {
606 let iter = group_by_values_iter_lookahead(
607 period,
608 offset,
609 time,
610 closed_window,
611 tu,
612 tz,
613 start_offset,
614 upper_bound,
615 );
616 iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
617 .collect::<PolarsResult<Vec<_>>>()
618}
619
620pub fn group_by_values(
628 period: Duration,
629 offset: Duration,
630 time: &[i64],
631 closed_window: ClosedWindow,
632 tu: TimeUnit,
633 tz: Option<Tz>,
634) -> PolarsResult<GroupsSlice> {
635 if time.is_empty() {
636 return Ok(GroupsSlice::from(vec![]));
637 }
638
639 let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
640 prune_splits_on_duplicates(time, &mut thread_offsets);
642
643 let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
645
646 if offset.negative && !offset.is_zero() {
648 if offset.duration_ns() == period.duration_ns() {
650 if !run_parallel {
654 let vecs = group_by_values_iter_lookbehind_collected(
655 period,
656 offset,
657 time,
658 closed_window,
659 tu,
660 tz,
661 0,
662 None,
663 )?;
664 return Ok(GroupsSlice::from(vecs));
665 }
666
667 POOL.install(|| {
668 let vals = thread_offsets
669 .par_iter()
670 .copied()
671 .map(|(base_offset, len)| {
672 let upper_bound = base_offset + len;
673 group_by_values_iter_lookbehind_collected(
674 period,
675 offset,
676 time,
677 closed_window,
678 tu,
679 tz,
680 base_offset,
681 Some(upper_bound),
682 )
683 })
684 .collect::<PolarsResult<Vec<_>>>()?;
685 Ok(flatten_par(&vals))
686 })
687 } else if ((offset.duration_ns() >= period.duration_ns())
688 && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
689 || ((offset.duration_ns() > period.duration_ns())
690 && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
691 {
692 let iter =
696 group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
697 iter.map(|result| result.map(|(offset, len)| [offset, len]))
698 .collect::<PolarsResult<_>>()
699 }
700 else {
707 let iter = group_by_values_iter_partial_lookbehind(
708 period,
709 offset,
710 time,
711 closed_window,
712 tu,
713 tz,
714 );
715 iter.map(|result| result.map(|(offset, len)| [offset, len]))
716 .collect::<PolarsResult<_>>()
717 }
718 } else if !offset.is_zero()
719 || closed_window == ClosedWindow::Right
720 || closed_window == ClosedWindow::None
721 {
722 if !run_parallel {
727 let vecs = group_by_values_iter_lookahead_collected(
728 period,
729 offset,
730 time,
731 closed_window,
732 tu,
733 tz,
734 0,
735 None,
736 )?;
737 return Ok(GroupsSlice::from(vecs));
738 }
739
740 POOL.install(|| {
741 let vals = thread_offsets
742 .par_iter()
743 .copied()
744 .map(|(base_offset, len)| {
745 let lower_bound = base_offset;
746 let upper_bound = base_offset + len;
747 group_by_values_iter_lookahead_collected(
748 period,
749 offset,
750 time,
751 closed_window,
752 tu,
753 tz,
754 lower_bound,
755 Some(upper_bound),
756 )
757 })
758 .collect::<PolarsResult<Vec<_>>>()?;
759 Ok(flatten_par(&vals))
760 })
761 } else {
762 if !run_parallel {
763 let vecs = group_by_values_iter_lookahead_collected(
764 period,
765 offset,
766 time,
767 closed_window,
768 tu,
769 tz,
770 0,
771 None,
772 )?;
773 return Ok(GroupsSlice::from(vecs));
774 }
775
776 POOL.install(|| {
781 let vals = thread_offsets
782 .par_iter()
783 .copied()
784 .map(|(base_offset, len)| {
785 let lower_bound = base_offset;
786 let upper_bound = base_offset + len;
787 group_by_values_iter_lookahead_collected(
788 period,
789 offset,
790 time,
791 closed_window,
792 tu,
793 tz,
794 lower_bound,
795 Some(upper_bound),
796 )
797 })
798 .collect::<PolarsResult<Vec<_>>>()?;
799 Ok(flatten_par(&vals))
800 })
801 }
802}
803
/// Incremental builder of per-row rolling windows over timestamps that arrive in chunks.
///
/// `insert` opens one window per inserted row and emits it as `[first_row, len]` once a later
/// row passes its upper bound. `finalize` emits the windows that are still open.
pub struct RollingWindower {
    period: Duration,
    offset: Duration,
    closed: ClosedWindow,

    // Timestamp arithmetic for the configured time unit (`Duration::add_ns/us/ms`).
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    tz: Option<Tz>,

    // Global row index of the first row of the most recently emitted group. The `time` chunks
    // passed to `insert` / `finalize` begin at this row.
    start: IdxSize,
    // Global row index one past the last row of the most recently emitted group.
    end: IdxSize,
    // Total number of rows inserted so far (global index of the next row).
    length: IdxSize,

    // Windows opened but not yet emitted, oldest first.
    active: VecDeque<ActiveWindow>,
}
818
/// Timestamp bounds of a rolling window that has been opened but not yet emitted.
///
/// Inclusion of each endpoint is decided by the `ClosedWindow` passed to the methods.
struct ActiveWindow {
    // Lower bound timestamp.
    start: i64,
    // Upper bound timestamp.
    end: i64,
}
823
824impl ActiveWindow {
825 #[inline(always)]
826 fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
827 (t > self.start)
828 | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))
829 }
830
831 #[inline(always)]
832 fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
833 (t < self.end)
834 | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))
835 }
836}
837
/// Locates the `n`-th element (0-based) of the rows in `l` read back to back.
///
/// Returns `(column, row)`. Whole rows — including empty ones — are skipped while the
/// remaining count covers them. Skipping exactly to the end yields `(0, l.len())`.
///
/// # Panics
/// If `n` exceeds the total number of elements.
fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
    let mut row = 0;
    while let Some(current) = l.get(row) {
        if n < current.len() {
            break;
        }
        n -= current.len();
        row += 1;
    }
    assert!(n == 0 || row < l.len());
    (n, row)
}
/// Advances the cursor `(x, y)` to the next element of the rows in `l` read back to back.
///
/// Exhausted and empty rows are skipped. Past the last element the cursor becomes
/// `(0, l.len())`.
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    while l.get(*y).is_some_and(|row| row.len() == *x) {
        *y += 1;
        *x = 0;
    }
}
854
impl RollingWindower {
    /// Creates an empty windower. Timestamp arithmetic is chosen from `tu`.
    pub fn new(
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        tu: TimeUnit,
        tz: Option<Tz>,
    ) -> Self {
        Self {
            period,
            offset,
            closed,

            add: match tu {
                TimeUnit::Nanoseconds => Duration::add_ns,
                TimeUnit::Microseconds => Duration::add_us,
                TimeUnit::Milliseconds => Duration::add_ms,
            },
            tz,

            start: 0,
            end: 0,
            length: 0,

            active: Default::default(),
        }
    }

    /// Inserts new rows and emits every window that a new row has moved past.
    ///
    /// `time` must hold, as consecutive chunks, all rows from global index `self.start` onward.
    /// That means the rows already inserted but still needed, followed by the new rows. Rows
    /// before offset `self.length - self.start` are treated as already inserted.
    ///
    /// For each new row `t`, a window `[t + offset, t + offset + period]` is opened; its upper
    /// bound is `t` itself when `offset == -period`. Front windows that `t` is no longer below
    /// are emitted into `windows` as `[first_row, len]` using global row indices.
    ///
    /// Returns how far `self.start` advanced during this call. NOTE(review): presumably the
    /// number of leading rows the caller may now drop from `time` — confirm at the call site.
    ///
    /// # Errors
    /// Propagates errors from the timestamp arithmetic.
    pub fn insert(
        &mut self,
        time: &[&[i64]],
        windows: &mut Vec<[IdxSize; 2]>,
    ) -> PolarsResult<IdxSize> {
        // Cursors into `time`: `i` = next new row, `s` = group start, `e` = group end.
        let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        let time_start = self.start;
        let mut i = self.length;
        while i_y < time.len() {
            let t = time[i_y][i_x];
            let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
            // For a pure lookbehind the upper bound is `t` exactly, avoiding a
            // `t + offset + period` round trip.
            let window_end = if self.offset == -self.period {
                t
            } else {
                (self.add)(&self.period, window_start, self.tz.as_ref())?
            };

            self.active.push_back(ActiveWindow {
                start: window_start,
                end: window_end,
            });

            // Emit every front window whose upper bound `t` has passed; rows `< i` are complete.
            while let Some(w) = self.active.front() {
                if w.below_upper_bound(t, self.closed) {
                    break;
                }

                let w = self.active.pop_front().unwrap();
                // Advance the group start past rows below the window's lower bound.
                while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                    increment_2d(&mut s_x, &mut s_y, time);
                    self.start += 1;
                }
                // Advance the group end over rows below the window's upper bound.
                while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
                    increment_2d(&mut e_x, &mut e_y, time);
                    self.end += 1;
                }
                windows.push([self.start, self.end - self.start]);
            }

            increment_2d(&mut i_x, &mut i_y, time);
            i += 1;
        }

        self.length = i;
        Ok(self.start - time_start)
    }

    /// Emits all still-open windows and resets the row counters.
    ///
    /// `time` must hold exactly the rows from global index `self.start` up to `self.length`;
    /// this is asserted.
    ///
    /// # Panics
    /// If the total length of `time` does not match `self.length - self.start`.
    pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
        assert_eq!(
            time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
            self.length - self.start
        );

        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        // Same start/end advancement as in `insert`, bounded by all inserted rows.
        windows.extend(self.active.drain(..).map(|w| {
            while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                increment_2d(&mut s_x, &mut s_y, time);
                self.start += 1;
            }
            while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
                increment_2d(&mut e_x, &mut e_y, time);
                self.end += 1;
            }
            [self.start, self.end - self.start]
        }));

        self.start = 0;
        self.end = 0;
        self.length = 0;
    }

    /// Drops all open windows without emitting them and resets the row counters.
    pub fn reset(&mut self) {
        self.active.clear();
        self.start = 0;
        self.end = 0;
        self.length = 0;
    }
}
972
/// A dynamic (`every`/`period`) window that has been opened but not yet emitted.
#[derive(Debug)]
struct ActiveDynWindow {
    // Row index (counted since the last finalize) of the first row in this window.
    start: IdxSize,
    // Window's lower bound timestamp.
    lower_bound: i64,
    // Window's upper bound timestamp.
    upper_bound: i64,
}
979
980#[inline(always)]
981fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {
982 (t > lb) | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == lb))
983}
984#[inline(always)]
985fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {
986 (t < ub) | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == ub))
987}
988
/// Incremental builder of dynamic (`every`/`period`/`offset`) windows over sorted timestamps
/// that arrive in chunks.
///
/// A new window is opened every `every`, each spanning `period`. A window is emitted as
/// `[first_row, len]` once a row passes its upper bound, or on `finalize`.
pub struct GroupByDynamicWindower {
    period: Duration,
    offset: Duration,
    every: Duration,
    closed: ClosedWindow,

    // How the lower bound of the first window is chosen.
    start_by: StartBy,

    // Timestamp arithmetic for the configured time unit (`Duration::add_ns/us/ms`).
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    // `Duration::nte_duration_ns/us/ms` for the configured time unit.
    // NOTE(review): presumably an estimate of a duration's length in that unit — confirm.
    nte: fn(&Duration) -> i64,
    tu: TimeUnit,
    tz: Option<Tz>,

    // Whether to also emit each window's lower / upper bound timestamp.
    include_lower_bound: bool,
    include_upper_bound: bool,

    // Number of rows processed since the last `finalize`.
    num_seen: IdxSize,
    // Lower bound of the next window to open.
    next_lower_bound: i64,
    // Opened windows not yet emitted, oldest first.
    active: VecDeque<ActiveDynWindow>,
}
1010
1011impl GroupByDynamicWindower {
1012 #[expect(clippy::too_many_arguments)]
1013 pub fn new(
1014 period: Duration,
1015 offset: Duration,
1016 every: Duration,
1017 start_by: StartBy,
1018 closed: ClosedWindow,
1019 tu: TimeUnit,
1020 tz: Option<Tz>,
1021 include_lower_bound: bool,
1022 include_upper_bound: bool,
1023 ) -> Self {
1024 Self {
1025 period,
1026 offset,
1027 every,
1028 closed,
1029
1030 start_by,
1031
1032 add: match tu {
1033 TimeUnit::Nanoseconds => Duration::add_ns,
1034 TimeUnit::Microseconds => Duration::add_us,
1035 TimeUnit::Milliseconds => Duration::add_ms,
1036 },
1037 nte: match tu {
1038 TimeUnit::Nanoseconds => Duration::nte_duration_ns,
1039 TimeUnit::Microseconds => Duration::nte_duration_us,
1040 TimeUnit::Milliseconds => Duration::nte_duration_ms,
1041 },
1042 tu,
1043 tz,
1044
1045 include_lower_bound,
1046 include_upper_bound,
1047
1048 num_seen: 0,
1049 next_lower_bound: 0,
1050 active: Default::default(),
1051 }
1052 }
1053
1054 pub fn find_first_window_around(
1055 &self,
1056 mut lower_bound: i64,
1057 target: i64,
1058 ) -> PolarsResult<Result<(i64, i64), i64>> {
1059 let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1060 while !is_below_upper_bound(target, upper_bound, self.closed) {
1061 let gap = target - lower_bound;
1062 let nth = match self.tu {
1063 TimeUnit::Nanoseconds
1064 if gap > self.every.nte_duration_ns() + self.period.nte_duration_ns() =>
1065 {
1066 ((gap - self.period.nte_duration_ns()) as usize)
1067 / (self.every.nte_duration_ns() as usize)
1068 },
1069 TimeUnit::Microseconds
1070 if gap > self.every.nte_duration_us() + self.period.nte_duration_us() =>
1071 {
1072 ((gap - self.period.nte_duration_us()) as usize)
1073 / (self.every.nte_duration_us() as usize)
1074 },
1075 TimeUnit::Milliseconds
1076 if gap > self.every.nte_duration_ms() + self.period.nte_duration_ms() =>
1077 {
1078 ((gap - self.period.nte_duration_ms()) as usize)
1079 / (self.every.nte_duration_ms() as usize)
1080 },
1081 _ => 1,
1082 };
1083
1084 let nth: i64 = nth.try_into().unwrap();
1085 lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;
1086 upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1087 }
1088
1089 if is_above_lower_bound(target, lower_bound, self.closed) {
1090 Ok(Ok((lower_bound, upper_bound)))
1091 } else {
1092 Ok(Err(lower_bound))
1093 }
1094 }
1095
1096 fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {
1097 match self.start_by {
1098 StartBy::DataPoint => Ok(first),
1099 StartBy::WindowBound => {
1100 let get_earliest_bounds = match self.tu {
1101 TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,
1102 TimeUnit::Microseconds => Window::get_earliest_bounds_us,
1103 TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,
1104 };
1105 Ok((get_earliest_bounds)(
1106 &Window::new(self.every, self.period, self.offset),
1107 first,
1108 self.closed,
1109 self.tz.as_ref(),
1110 )?
1111 .start)
1112 },
1113 _ => {
1114 {
1115 #[allow(clippy::type_complexity)]
1116 let (from, to): (
1117 fn(i64) -> NaiveDateTime,
1118 fn(NaiveDateTime) -> i64,
1119 ) = match self.tu {
1120 TimeUnit::Nanoseconds => {
1121 (timestamp_ns_to_datetime, datetime_to_timestamp_ns)
1122 },
1123 TimeUnit::Microseconds => {
1124 (timestamp_us_to_datetime, datetime_to_timestamp_us)
1125 },
1126 TimeUnit::Milliseconds => {
1127 (timestamp_ms_to_datetime, datetime_to_timestamp_ms)
1128 },
1129 };
1130 let dt = from(first);
1132 match self.tz.as_ref() {
1133 #[cfg(feature = "timezones")]
1134 Some(tz) => {
1135 let dt = tz.from_utc_datetime(&dt);
1136 let dt = dt.beginning_of_week();
1137 let dt = dt.naive_utc();
1138 let start = to(dt);
1139 let start = (self.add)(
1141 &Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1142 start,
1143 self.tz.as_ref(),
1144 )?;
1145 let start = (self.add)(&self.offset, start, self.tz.as_ref())?;
1147 Ok(ensure_t_in_or_in_front_of_window(
1150 self.every,
1151 first,
1152 self.add,
1153 self.nte,
1154 self.period,
1155 start,
1156 self.closed,
1157 self.tz.as_ref(),
1158 )?
1159 .start)
1160 },
1161 _ => {
1162 let tz = chrono::Utc;
1163 let dt = dt.and_local_timezone(tz).unwrap();
1164 let dt = dt.beginning_of_week();
1165 let dt = dt.naive_utc();
1166 let start = to(dt);
1167 let start = (self.add)(
1169 &Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1170 start,
1171 None,
1172 )
1173 .unwrap();
1174 let start = (self.add)(&self.offset, start, None).unwrap();
1176 Ok(ensure_t_in_or_in_front_of_window(
1179 self.every,
1180 first,
1181 self.add,
1182 self.nte,
1183 self.period,
1184 start,
1185 self.closed,
1186 None,
1187 )?
1188 .start)
1189 },
1190 }
1191 }
1192 },
1193 }
1194 }
1195
1196 pub fn insert(
1197 &mut self,
1198 time: &[i64],
1199 windows: &mut Vec<[IdxSize; 2]>,
1200 lower_bound: &mut Vec<i64>,
1201 upper_bound: &mut Vec<i64>,
1202 ) -> PolarsResult<()> {
1203 if time.is_empty() {
1204 return Ok(());
1205 }
1206
1207 if self.num_seen == 0 {
1208 debug_assert!(self.active.is_empty());
1209 self.next_lower_bound = self.start_lower_bound(time[0])?;
1210 }
1211
1212 for &t in time {
1213 while let Some(w) = self.active.front()
1214 && !is_below_upper_bound(t, w.upper_bound, self.closed)
1215 {
1216 let w = self.active.pop_front().unwrap();
1217 windows.push([w.start, self.num_seen - w.start]);
1218 if self.include_lower_bound {
1219 lower_bound.push(w.lower_bound);
1220 }
1221 if self.include_upper_bound {
1222 upper_bound.push(w.upper_bound);
1223 }
1224 }
1225
1226 while is_above_lower_bound(t, self.next_lower_bound, self.closed) {
1227 match self.find_first_window_around(self.next_lower_bound, t)? {
1228 Ok((lower_bound, upper_bound)) => {
1229 self.next_lower_bound =
1230 (self.add)(&self.every, lower_bound, self.tz.as_ref())?;
1231 self.active.push_back(ActiveDynWindow {
1232 start: self.num_seen,
1233 lower_bound,
1234 upper_bound,
1235 });
1236 },
1237 Err(lower_bound) => {
1238 self.next_lower_bound = lower_bound;
1239 break;
1240 },
1241 }
1242 }
1243
1244 self.num_seen += 1
1245 }
1246
1247 Ok(())
1248 }
1249
1250 pub fn lowest_needed_index(&self) -> IdxSize {
1251 self.active.front().map_or(self.num_seen, |w| w.start)
1252 }
1253
1254 pub fn finalize(
1255 &mut self,
1256 windows: &mut Vec<[IdxSize; 2]>,
1257 lower_bound: &mut Vec<i64>,
1258 upper_bound: &mut Vec<i64>,
1259 ) {
1260 for w in self.active.drain(..) {
1261 windows.push([w.start, self.num_seen - w.start]);
1262 if self.include_lower_bound {
1263 lower_bound.push(w.lower_bound);
1264 }
1265 if self.include_upper_bound {
1266 upper_bound.push(w.upper_bound);
1267 }
1268 }
1269
1270 self.next_lower_bound = 0;
1271 self.num_seen = 0;
1272 }
1273
1274 pub fn num_seen(&self) -> IdxSize {
1275 self.num_seen
1276 }
1277
1278 pub fn time_unit(&self) -> TimeUnit {
1279 self.tu
1280 }
1281}
1282
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_prune_duplicates() {
        // The boundary at row 2 splits the run `1, 1`, so blocks 0 and 1 must be merged.
        let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];
        prune_splits_on_duplicates(time, &mut splits);
        assert_eq!(splits, &[(0, 6), (6, 2), (8, 3)]);
    }

    #[test]
    fn test_prune_duplicates_keeps_valid_splits() {
        let time = &[0, 1, 2, 3, 4, 5];
        let mut splits = vec![(0, 2), (2, 2), (4, 2)];
        prune_splits_on_duplicates(time, &mut splits);
        assert_eq!(splits, &[(0, 2), (2, 2), (4, 2)]);
    }

    #[test]
    fn test_prune_duplicates_all_equal() {
        let time = &[7, 7, 7, 7];
        let mut splits = vec![(0, 1), (1, 1), (2, 1), (3, 1)];
        prune_splits_on_duplicates(time, &mut splits);
        assert_eq!(splits, &[(0, 4)]);
    }

    #[test]
    fn test_prune_duplicates_empty_blocks() {
        // `_split_offsets` yields empty leading blocks when there are fewer rows than threads.
        let time = &[1, 2];
        let mut splits = vec![(0, 0), (0, 0), (0, 2)];
        prune_splits_on_duplicates(time, &mut splits);
        assert_eq!(splits, &[(0, 2)]);
    }
}