1use arrow::legacy::time_zone::Tz;
2use arrow::trusted_len::TrustedLen;
3use polars_core::POOL;
4use polars_core::prelude::*;
5use polars_core::utils::_split_offsets;
6use polars_core::utils::flatten::flatten_par;
7use rayon::prelude::*;
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10use strum_macros::IntoStaticStr;
11
12use crate::prelude::*;
13
/// Which side(s) of a window interval are closed (inclusive) when deciding whether a
/// timestamp belongs to that window.
///
/// The actual membership tests (`is_member`, `is_member_entry`, `is_member_exit`,
/// `is_future`) live on `Bounds` and take this value as a parameter. `IntoStaticStr`
/// yields the snake_case variant name (e.g. `"left"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
    Left,
    Right,
    Both,
    None,
}
23
/// Which timestamp is used to label a window.
///
/// NOTE(review): not referenced in this file; presumably selects the window's left bound,
/// right bound, or a data point as the label — confirm at the call sites.
/// `IntoStaticStr` yields the snake_case variant name (e.g. `"data_point"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
    Left,
    Right,
    DataPoint,
}
32
/// Strategy for choosing where the first window of a window-based group-by starts.
///
/// Passed through to `Window::get_overlapping_bounds_iter`. The `Default` is `WindowBound`.
/// The weekday variants map to indices `0` (Monday) through `6` (Sunday) via
/// [`StartBy::weekday`]. NOTE(review): presumably they are only meaningful for week-based
/// windows — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum StartBy {
    WindowBound,
    DataPoint,
    // Weekday variants; see `StartBy::weekday` for their numeric index.
    Monday,
    Tuesday,
    Wednesday,
    Thursday,
    Friday,
    Saturday,
    Sunday,
}
48
49impl Default for StartBy {
50 fn default() -> Self {
51 Self::WindowBound
52 }
53}
54
55impl StartBy {
56 pub fn weekday(&self) -> Option<u32> {
57 match self {
58 StartBy::Monday => Some(0),
59 StartBy::Tuesday => Some(1),
60 StartBy::Wednesday => Some(2),
61 StartBy::Thursday => Some(3),
62 StartBy::Friday => Some(4),
63 StartBy::Saturday => Some(5),
64 StartBy::Sunday => Some(6),
65 _ => None,
66 }
67 }
68}
69
70#[allow(clippy::too_many_arguments)]
71fn update_groups_and_bounds(
72 bounds_iter: BoundsIter<'_>,
73 mut start: usize,
74 time: &[i64],
75 closed_window: ClosedWindow,
76 include_lower_bound: bool,
77 include_upper_bound: bool,
78 lower_bound: &mut Vec<i64>,
79 upper_bound: &mut Vec<i64>,
80 groups: &mut Vec<[IdxSize; 2]>,
81) {
82 'bounds: for bi in bounds_iter {
83 for &t in &time[start..time.len().saturating_sub(1)] {
85 if bi.is_future(t, closed_window) {
87 continue 'bounds;
88 }
89 if bi.is_member_entry(t, closed_window) {
90 break;
91 }
92 start += 1;
93 }
94
95 let mut end = start;
97
98 if end == time.len() - 1 {
100 let t = time[end];
101 if bi.is_member(t, closed_window) {
102 if include_lower_bound {
103 lower_bound.push(bi.start);
104 }
105 if include_upper_bound {
106 upper_bound.push(bi.stop);
107 }
108 groups.push([end as IdxSize, 1])
109 }
110 continue;
111 }
112 for &t in &time[end..] {
113 if !bi.is_member_exit(t, closed_window) {
114 break;
115 }
116 end += 1;
117 }
118 let len = end - start;
119
120 if include_lower_bound {
121 lower_bound.push(bi.start);
122 }
123 if include_upper_bound {
124 upper_bound.push(bi.stop);
125 }
126 groups.push([start as IdxSize, len as IdxSize])
127 }
128}
129
130#[allow(clippy::too_many_arguments)]
142pub fn group_by_windows(
143 window: Window,
144 time: &[i64],
145 closed_window: ClosedWindow,
146 tu: TimeUnit,
147 tz: &Option<TimeZone>,
148 include_lower_bound: bool,
149 include_upper_bound: bool,
150 start_by: StartBy,
151) -> (GroupsSlice, Vec<i64>, Vec<i64>) {
152 let start = time[0];
153 let boundary = if time.len() > 1 {
157 let stop = time[time.len() - 1] + 1;
159 Bounds::new_checked(start, stop)
160 } else {
161 let stop = start + 1;
162 Bounds::new_checked(start, stop)
163 };
164
165 let size = {
166 match tu {
167 TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
168 TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
169 TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
170 }
171 };
172 let size_lower = if include_lower_bound { size } else { 0 };
173 let size_upper = if include_upper_bound { size } else { 0 };
174 let mut lower_bound = Vec::with_capacity(size_lower);
175 let mut upper_bound = Vec::with_capacity(size_upper);
176
177 let mut groups = Vec::with_capacity(size);
178 let start_offset = 0;
179
180 match tz {
181 #[cfg(feature = "timezones")]
182 Some(tz) => {
183 update_groups_and_bounds(
184 window
185 .get_overlapping_bounds_iter(
186 boundary,
187 closed_window,
188 tu,
189 tz.parse::<Tz>().ok().as_ref(),
190 start_by,
191 )
192 .unwrap(),
193 start_offset,
194 time,
195 closed_window,
196 include_lower_bound,
197 include_upper_bound,
198 &mut lower_bound,
199 &mut upper_bound,
200 &mut groups,
201 );
202 },
203 _ => {
204 update_groups_and_bounds(
205 window
206 .get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)
207 .unwrap(),
208 start_offset,
209 time,
210 closed_window,
211 include_lower_bound,
212 include_upper_bound,
213 &mut lower_bound,
214 &mut upper_bound,
215 &mut groups,
216 );
217 },
218 };
219
220 (groups, lower_bound, upper_bound)
221}
222
223#[inline]
227#[allow(clippy::too_many_arguments)]
228pub(crate) fn group_by_values_iter_lookbehind(
229 period: Duration,
230 offset: Duration,
231 time: &[i64],
232 closed_window: ClosedWindow,
233 tu: TimeUnit,
234 tz: Option<Tz>,
235 start_offset: usize,
236 upper_bound: Option<usize>,
237) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
238 debug_assert!(offset.duration_ns() == period.duration_ns());
239 debug_assert!(offset.negative);
240 let add = match tu {
241 TimeUnit::Nanoseconds => Duration::add_ns,
242 TimeUnit::Microseconds => Duration::add_us,
243 TimeUnit::Milliseconds => Duration::add_ms,
244 };
245
246 let upper_bound = upper_bound.unwrap_or(time.len());
247 let mut start = if let Some(&t) = time.get(start_offset) {
249 let lower = add(&offset, t, tz.as_ref())?;
250 let upper = t;
255 let b = Bounds::new(lower, upper);
256 let slice = &time[..start_offset];
257 slice.partition_point(|v| !b.is_member(*v, closed_window))
258 } else {
259 0
260 };
261 let mut end = start;
262 let mut last = time[start_offset];
263 Ok(time[start_offset..upper_bound]
264 .iter()
265 .enumerate()
266 .map(move |(mut i, t)| {
267 if *t == last && i > 0 {
269 let len = end - start;
270 let offset = start as IdxSize;
271 return Ok((offset, len as IdxSize));
272 }
273 last = *t;
274 i += start_offset;
275
276 let lower = add(&offset, *t, tz.as_ref())?;
277 let upper = *t;
278
279 let b = Bounds::new(lower, upper);
280
281 for &t in unsafe { time.get_unchecked(start..i) } {
282 if b.is_member_entry(t, closed_window) {
283 break;
284 }
285 start += 1;
286 }
287
288 if b.is_member_exit(*t, closed_window) {
290 end = i;
291 } else {
292 end = std::cmp::max(end, start);
293 }
294 for &t in unsafe { time.get_unchecked(end..) } {
296 if !b.is_member_exit(t, closed_window) {
297 break;
298 }
299 end += 1;
300 }
301
302 let len = end - start;
303 let offset = start as IdxSize;
304
305 Ok((offset, len as IdxSize))
306 }))
307}
308
309pub(crate) fn group_by_values_iter_window_behind_t(
314 period: Duration,
315 offset: Duration,
316 time: &[i64],
317 closed_window: ClosedWindow,
318 tu: TimeUnit,
319 tz: Option<Tz>,
320) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
321 let add = match tu {
322 TimeUnit::Nanoseconds => Duration::add_ns,
323 TimeUnit::Microseconds => Duration::add_us,
324 TimeUnit::Milliseconds => Duration::add_ms,
325 };
326
327 let mut start = 0;
328 let mut end = start;
329 let mut last = time[0];
330 let mut started = false;
331 time.iter().map(move |lower| {
332 if *lower == last && started {
334 let len = end - start;
335 let offset = start as IdxSize;
336 return Ok((offset, len as IdxSize));
337 }
338 last = *lower;
339 started = true;
340 let lower = add(&offset, *lower, tz.as_ref())?;
341 let upper = add(&period, lower, tz.as_ref())?;
342
343 let b = Bounds::new(lower, upper);
344 if b.is_future(time[0], closed_window) {
345 Ok((0, 0))
346 } else {
347 for &t in &time[start..] {
348 if b.is_member_entry(t, closed_window) {
349 break;
350 }
351 start += 1;
352 }
353
354 end = std::cmp::max(start, end);
355 for &t in &time[end..] {
356 if !b.is_member_exit(t, closed_window) {
357 break;
358 }
359 end += 1;
360 }
361
362 let len = end - start;
363 let offset = start as IdxSize;
364
365 Ok((offset, len as IdxSize))
366 }
367 })
368}
369
370pub(crate) fn group_by_values_iter_partial_lookbehind(
374 period: Duration,
375 offset: Duration,
376 time: &[i64],
377 closed_window: ClosedWindow,
378 tu: TimeUnit,
379 tz: Option<Tz>,
380) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
381 let add = match tu {
382 TimeUnit::Nanoseconds => Duration::add_ns,
383 TimeUnit::Microseconds => Duration::add_us,
384 TimeUnit::Milliseconds => Duration::add_ms,
385 };
386
387 let mut start = 0;
388 let mut end = start;
389 let mut last = time[0];
390 time.iter().enumerate().map(move |(i, lower)| {
391 if *lower == last && i > 0 {
393 let len = end - start;
394 let offset = start as IdxSize;
395 return Ok((offset, len as IdxSize));
396 }
397 last = *lower;
398
399 let lower = add(&offset, *lower, tz.as_ref())?;
400 let upper = add(&period, lower, tz.as_ref())?;
401
402 let b = Bounds::new(lower, upper);
403
404 for &t in &time[start..] {
405 if b.is_member_entry(t, closed_window) || start == i {
406 break;
407 }
408 start += 1;
409 }
410
411 end = std::cmp::max(start, end);
412 for &t in &time[end..] {
413 if !b.is_member_exit(t, closed_window) {
414 break;
415 }
416 end += 1;
417 }
418
419 let len = end - start;
420 let offset = start as IdxSize;
421
422 Ok((offset, len as IdxSize))
423 })
424}
425
426#[allow(clippy::too_many_arguments)]
427pub(crate) fn group_by_values_iter_lookahead(
431 period: Duration,
432 offset: Duration,
433 time: &[i64],
434 closed_window: ClosedWindow,
435 tu: TimeUnit,
436 tz: Option<Tz>,
437 start_offset: usize,
438 upper_bound: Option<usize>,
439) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
440 let upper_bound = upper_bound.unwrap_or(time.len());
441
442 let add = match tu {
443 TimeUnit::Nanoseconds => Duration::add_ns,
444 TimeUnit::Microseconds => Duration::add_us,
445 TimeUnit::Milliseconds => Duration::add_ms,
446 };
447 let mut start = start_offset;
448 let mut end = start;
449
450 let mut last = time[start_offset];
451 let mut started = false;
452 time[start_offset..upper_bound].iter().map(move |lower| {
453 if *lower == last && started {
455 let len = end - start;
456 let offset = start as IdxSize;
457 return Ok((offset, len as IdxSize));
458 }
459 started = true;
460 last = *lower;
461
462 let lower = add(&offset, *lower, tz.as_ref())?;
463 let upper = add(&period, lower, tz.as_ref())?;
464
465 let b = Bounds::new(lower, upper);
466
467 for &t in &time[start..] {
468 if b.is_member_entry(t, closed_window) {
469 break;
470 }
471 start += 1;
472 }
473
474 end = std::cmp::max(start, end);
475 for &t in &time[end..] {
476 if !b.is_member_exit(t, closed_window) {
477 break;
478 }
479 end += 1;
480 }
481
482 let len = end - start;
483 let offset = start as IdxSize;
484
485 Ok((offset, len as IdxSize))
486 })
487}
488
/// Rolling groups that look back exactly one `period` from every timestamp in `time`.
///
/// Delegates to [`group_by_values_iter_lookbehind`] over the whole of `time`. The offset
/// has the same duration as `period` with `negative` set, so each window ends at its own
/// timestamp.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    let lookback = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    group_by_values_iter_lookbehind(period, lookback, time, closed_window, tu, tz, 0, None)
}
503
/// Merges adjacent chunks of `thread_offsets` so that no chunk boundary falls between two
/// equal timestamps.
///
/// `thread_offsets` holds `(start, len)` pairs describing contiguous, ascending chunks of
/// `time` that together cover `time`. A boundary at index `s` is kept only if it is a real
/// split point:
/// - strictly after the previously kept boundary,
/// - inside `time`, and
/// - `time[s - 1] != time[s]`.
///
/// Otherwise the chunk starting at `s` is merged into its predecessor. The first chunk
/// always starts at `0`. Does nothing for empty `time` or fewer than two chunks.
///
/// Previously the last chunk was only considered when the chunk count was even. Also, a
/// chunk's start was kept based on the boundary to its *right*. Both needlessly merged
/// valid chunks, often down to a single one.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    let mut starts: Vec<usize> = Vec::with_capacity(thread_offsets.len());
    starts.push(0);
    let mut prev = 0;
    for &(s, _) in &thread_offsets[1..] {
        // `s > prev` also guarantees `s >= 1`, so `time[s - 1]` is in bounds.
        if s > prev && s < time.len() && time[s - 1] != time[s] {
            starts.push(s);
            prev = s;
        }
    }

    // Rebuild `(start, len)` pairs: every chunk runs up to the next kept start.
    let ends = starts
        .iter()
        .skip(1)
        .copied()
        .chain(std::iter::once(time.len()));
    let pruned: Vec<(usize, usize)> = starts
        .iter()
        .zip(ends)
        .map(|(&start, end)| (start, end - start))
        .collect();
    debug_assert_eq!(pruned.iter().map(|w| w.1).sum::<usize>(), time.len());
    *thread_offsets = pruned;
}
550
551#[allow(clippy::too_many_arguments)]
552fn group_by_values_iter_lookbehind_collected(
553 period: Duration,
554 offset: Duration,
555 time: &[i64],
556 closed_window: ClosedWindow,
557 tu: TimeUnit,
558 tz: Option<Tz>,
559 start_offset: usize,
560 upper_bound: Option<usize>,
561) -> PolarsResult<Vec<[IdxSize; 2]>> {
562 let iter = group_by_values_iter_lookbehind(
563 period,
564 offset,
565 time,
566 closed_window,
567 tu,
568 tz,
569 start_offset,
570 upper_bound,
571 )?;
572 iter.map(|result| result.map(|(offset, len)| [offset, len]))
573 .collect::<PolarsResult<Vec<_>>>()
574}
575
576#[allow(clippy::too_many_arguments)]
577pub(crate) fn group_by_values_iter_lookahead_collected(
578 period: Duration,
579 offset: Duration,
580 time: &[i64],
581 closed_window: ClosedWindow,
582 tu: TimeUnit,
583 tz: Option<Tz>,
584 start_offset: usize,
585 upper_bound: Option<usize>,
586) -> PolarsResult<Vec<[IdxSize; 2]>> {
587 let iter = group_by_values_iter_lookahead(
588 period,
589 offset,
590 time,
591 closed_window,
592 tu,
593 tz,
594 start_offset,
595 upper_bound,
596 );
597 iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
598 .collect::<PolarsResult<Vec<_>>>()
599}
600
601pub fn group_by_values(
609 period: Duration,
610 offset: Duration,
611 time: &[i64],
612 closed_window: ClosedWindow,
613 tu: TimeUnit,
614 tz: Option<Tz>,
615) -> PolarsResult<GroupsSlice> {
616 if time.is_empty() {
617 return Ok(GroupsSlice::from(vec![]));
618 }
619
620 let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
621 prune_splits_on_duplicates(time, &mut thread_offsets);
623
624 let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
626
627 if offset.negative && !offset.is_zero() {
629 if offset.duration_ns() == period.duration_ns() {
631 if !run_parallel {
635 let vecs = group_by_values_iter_lookbehind_collected(
636 period,
637 offset,
638 time,
639 closed_window,
640 tu,
641 tz,
642 0,
643 None,
644 )?;
645 return Ok(GroupsSlice::from(vecs));
646 }
647
648 POOL.install(|| {
649 let vals = thread_offsets
650 .par_iter()
651 .copied()
652 .map(|(base_offset, len)| {
653 let upper_bound = base_offset + len;
654 group_by_values_iter_lookbehind_collected(
655 period,
656 offset,
657 time,
658 closed_window,
659 tu,
660 tz,
661 base_offset,
662 Some(upper_bound),
663 )
664 })
665 .collect::<PolarsResult<Vec<_>>>()?;
666 Ok(flatten_par(&vals))
667 })
668 } else if ((offset.duration_ns() >= period.duration_ns())
669 && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
670 || ((offset.duration_ns() > period.duration_ns())
671 && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
672 {
673 let iter =
677 group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
678 iter.map(|result| result.map(|(offset, len)| [offset, len]))
679 .collect::<PolarsResult<_>>()
680 }
681 else {
688 let iter = group_by_values_iter_partial_lookbehind(
689 period,
690 offset,
691 time,
692 closed_window,
693 tu,
694 tz,
695 );
696 iter.map(|result| result.map(|(offset, len)| [offset, len]))
697 .collect::<PolarsResult<_>>()
698 }
699 } else if !offset.is_zero()
700 || closed_window == ClosedWindow::Right
701 || closed_window == ClosedWindow::None
702 {
703 if !run_parallel {
708 let vecs = group_by_values_iter_lookahead_collected(
709 period,
710 offset,
711 time,
712 closed_window,
713 tu,
714 tz,
715 0,
716 None,
717 )?;
718 return Ok(GroupsSlice::from(vecs));
719 }
720
721 POOL.install(|| {
722 let vals = thread_offsets
723 .par_iter()
724 .copied()
725 .map(|(base_offset, len)| {
726 let lower_bound = base_offset;
727 let upper_bound = base_offset + len;
728 group_by_values_iter_lookahead_collected(
729 period,
730 offset,
731 time,
732 closed_window,
733 tu,
734 tz,
735 lower_bound,
736 Some(upper_bound),
737 )
738 })
739 .collect::<PolarsResult<Vec<_>>>()?;
740 Ok(flatten_par(&vals))
741 })
742 } else {
743 if !run_parallel {
744 let vecs = group_by_values_iter_lookahead_collected(
745 period,
746 offset,
747 time,
748 closed_window,
749 tu,
750 tz,
751 0,
752 None,
753 )?;
754 return Ok(GroupsSlice::from(vecs));
755 }
756
757 POOL.install(|| {
762 let vals = thread_offsets
763 .par_iter()
764 .copied()
765 .map(|(base_offset, len)| {
766 let lower_bound = base_offset;
767 let upper_bound = base_offset + len;
768 group_by_values_iter_lookahead_collected(
769 period,
770 offset,
771 time,
772 closed_window,
773 tu,
774 tz,
775 lower_bound,
776 Some(upper_bound),
777 )
778 })
779 .collect::<PolarsResult<Vec<_>>>()?;
780 Ok(flatten_par(&vals))
781 })
782 }
783}
784
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_prune_duplicates() {
        // The boundary at index 2 splits the run of `1`s, so the first two chunks merge.
        // The boundaries at 6 and 8 sit between distinct values and survive.
        let time: &[i64] = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];

        prune_splits_on_duplicates(time, &mut splits);

        let expected = [(0, 6), (6, 2), (8, 3)];
        assert_eq!(splits, expected);
    }
}
798}