1use arrow::legacy::time_zone::Tz;
2use arrow::trusted_len::TrustedLen;
3use polars_core::POOL;
4use polars_core::prelude::*;
5use polars_core::utils::_split_offsets;
6use polars_core::utils::flatten::flatten_par;
7use rayon::prelude::*;
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10use strum_macros::IntoStaticStr;
11
12use crate::prelude::*;
13
14#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
17#[strum(serialize_all = "snake_case")]
18pub enum ClosedWindow {
19 Left,
20 Right,
21 Both,
22 None,
23}
24
25#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
26#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
28#[strum(serialize_all = "snake_case")]
29pub enum Label {
30 Left,
31 Right,
32 DataPoint,
33}
34
35#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
36#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
37#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
38#[strum(serialize_all = "snake_case")]
39pub enum StartBy {
40 WindowBound,
41 DataPoint,
42 Monday,
44 Tuesday,
45 Wednesday,
46 Thursday,
47 Friday,
48 Saturday,
49 Sunday,
50}
51
52impl Default for StartBy {
53 fn default() -> Self {
54 Self::WindowBound
55 }
56}
57
58impl StartBy {
59 pub fn weekday(&self) -> Option<u32> {
60 match self {
61 StartBy::Monday => Some(0),
62 StartBy::Tuesday => Some(1),
63 StartBy::Wednesday => Some(2),
64 StartBy::Thursday => Some(3),
65 StartBy::Friday => Some(4),
66 StartBy::Saturday => Some(5),
67 StartBy::Sunday => Some(6),
68 _ => None,
69 }
70 }
71}
72
73#[allow(clippy::too_many_arguments)]
74fn update_groups_and_bounds(
75 bounds_iter: BoundsIter<'_>,
76 mut start: usize,
77 time: &[i64],
78 closed_window: ClosedWindow,
79 include_lower_bound: bool,
80 include_upper_bound: bool,
81 lower_bound: &mut Vec<i64>,
82 upper_bound: &mut Vec<i64>,
83 groups: &mut Vec<[IdxSize; 2]>,
84) {
85 'bounds: for bi in bounds_iter {
86 for &t in &time[start..time.len().saturating_sub(1)] {
88 if bi.is_future(t, closed_window) {
90 continue 'bounds;
91 }
92 if bi.is_member_entry(t, closed_window) {
93 break;
94 }
95 start += 1;
96 }
97
98 let mut end = start;
100
101 if end == time.len() - 1 {
103 let t = time[end];
104 if bi.is_member(t, closed_window) {
105 if include_lower_bound {
106 lower_bound.push(bi.start);
107 }
108 if include_upper_bound {
109 upper_bound.push(bi.stop);
110 }
111 groups.push([end as IdxSize, 1])
112 }
113 continue;
114 }
115 for &t in &time[end..] {
116 if !bi.is_member_exit(t, closed_window) {
117 break;
118 }
119 end += 1;
120 }
121 let len = end - start;
122
123 if include_lower_bound {
124 lower_bound.push(bi.start);
125 }
126 if include_upper_bound {
127 upper_bound.push(bi.stop);
128 }
129 groups.push([start as IdxSize, len as IdxSize])
130 }
131}
132
133#[allow(clippy::too_many_arguments)]
145pub fn group_by_windows(
146 window: Window,
147 time: &[i64],
148 closed_window: ClosedWindow,
149 tu: TimeUnit,
150 tz: &Option<TimeZone>,
151 include_lower_bound: bool,
152 include_upper_bound: bool,
153 start_by: StartBy,
154) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
155 let start = time[0];
156 let boundary = if time.len() > 1 {
160 let stop = time[time.len() - 1] + 1;
162 Bounds::new_checked(start, stop)
163 } else {
164 let stop = start + 1;
165 Bounds::new_checked(start, stop)
166 };
167
168 let size = {
169 match tu {
170 TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
171 TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
172 TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
173 }
174 };
175 let size_lower = if include_lower_bound { size } else { 0 };
176 let size_upper = if include_upper_bound { size } else { 0 };
177 let mut lower_bound = Vec::with_capacity(size_lower);
178 let mut upper_bound = Vec::with_capacity(size_upper);
179
180 let mut groups = Vec::with_capacity(size);
181 let start_offset = 0;
182
183 match tz {
184 #[cfg(feature = "timezones")]
185 Some(tz) => {
186 update_groups_and_bounds(
187 window.get_overlapping_bounds_iter(
188 boundary,
189 closed_window,
190 tu,
191 tz.parse::<Tz>().ok().as_ref(),
192 start_by,
193 )?,
194 start_offset,
195 time,
196 closed_window,
197 include_lower_bound,
198 include_upper_bound,
199 &mut lower_bound,
200 &mut upper_bound,
201 &mut groups,
202 );
203 },
204 _ => {
205 update_groups_and_bounds(
206 window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
207 start_offset,
208 time,
209 closed_window,
210 include_lower_bound,
211 include_upper_bound,
212 &mut lower_bound,
213 &mut upper_bound,
214 &mut groups,
215 );
216 },
217 };
218
219 Ok((groups, lower_bound, upper_bound))
220}
221
222#[inline]
226#[allow(clippy::too_many_arguments)]
227pub(crate) fn group_by_values_iter_lookbehind(
228 period: Duration,
229 offset: Duration,
230 time: &[i64],
231 closed_window: ClosedWindow,
232 tu: TimeUnit,
233 tz: Option<Tz>,
234 start_offset: usize,
235 upper_bound: Option<usize>,
236) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
237 debug_assert!(offset.duration_ns() == period.duration_ns());
238 debug_assert!(offset.negative);
239 let add = match tu {
240 TimeUnit::Nanoseconds => Duration::add_ns,
241 TimeUnit::Microseconds => Duration::add_us,
242 TimeUnit::Milliseconds => Duration::add_ms,
243 };
244
245 let upper_bound = upper_bound.unwrap_or(time.len());
246 let mut start = if let Some(&t) = time.get(start_offset) {
248 let lower = add(&offset, t, tz.as_ref())?;
249 let upper = t;
254 let b = Bounds::new(lower, upper);
255 let slice = &time[..start_offset];
256 slice.partition_point(|v| !b.is_member(*v, closed_window))
257 } else {
258 0
259 };
260 let mut end = start;
261 let mut last = time[start_offset];
262 Ok(time[start_offset..upper_bound]
263 .iter()
264 .enumerate()
265 .map(move |(mut i, t)| {
266 if *t == last && i > 0 {
268 let len = end - start;
269 let offset = start as IdxSize;
270 return Ok((offset, len as IdxSize));
271 }
272 last = *t;
273 i += start_offset;
274
275 let lower = add(&offset, *t, tz.as_ref())?;
276 let upper = *t;
277
278 let b = Bounds::new(lower, upper);
279
280 for &t in unsafe { time.get_unchecked(start..i) } {
281 if b.is_member_entry(t, closed_window) {
282 break;
283 }
284 start += 1;
285 }
286
287 if b.is_member_exit(*t, closed_window) {
289 end = i;
290 } else {
291 end = std::cmp::max(end, start);
292 }
293 for &t in unsafe { time.get_unchecked(end..) } {
295 if !b.is_member_exit(t, closed_window) {
296 break;
297 }
298 end += 1;
299 }
300
301 let len = end - start;
302 let offset = start as IdxSize;
303
304 Ok((offset, len as IdxSize))
305 }))
306}
307
308pub(crate) fn group_by_values_iter_window_behind_t(
313 period: Duration,
314 offset: Duration,
315 time: &[i64],
316 closed_window: ClosedWindow,
317 tu: TimeUnit,
318 tz: Option<Tz>,
319) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
320 let add = match tu {
321 TimeUnit::Nanoseconds => Duration::add_ns,
322 TimeUnit::Microseconds => Duration::add_us,
323 TimeUnit::Milliseconds => Duration::add_ms,
324 };
325
326 let mut start = 0;
327 let mut end = start;
328 let mut last = time[0];
329 let mut started = false;
330 time.iter().map(move |lower| {
331 if *lower == last && started {
333 let len = end - start;
334 let offset = start as IdxSize;
335 return Ok((offset, len as IdxSize));
336 }
337 last = *lower;
338 started = true;
339 let lower = add(&offset, *lower, tz.as_ref())?;
340 let upper = add(&period, lower, tz.as_ref())?;
341
342 let b = Bounds::new(lower, upper);
343 if b.is_future(time[0], closed_window) {
344 Ok((0, 0))
345 } else {
346 for &t in &time[start..] {
347 if b.is_member_entry(t, closed_window) {
348 break;
349 }
350 start += 1;
351 }
352
353 end = std::cmp::max(start, end);
354 for &t in &time[end..] {
355 if !b.is_member_exit(t, closed_window) {
356 break;
357 }
358 end += 1;
359 }
360
361 let len = end - start;
362 let offset = start as IdxSize;
363
364 Ok((offset, len as IdxSize))
365 }
366 })
367}
368
369pub(crate) fn group_by_values_iter_partial_lookbehind(
373 period: Duration,
374 offset: Duration,
375 time: &[i64],
376 closed_window: ClosedWindow,
377 tu: TimeUnit,
378 tz: Option<Tz>,
379) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
380 let add = match tu {
381 TimeUnit::Nanoseconds => Duration::add_ns,
382 TimeUnit::Microseconds => Duration::add_us,
383 TimeUnit::Milliseconds => Duration::add_ms,
384 };
385
386 let mut start = 0;
387 let mut end = start;
388 let mut last = time[0];
389 time.iter().enumerate().map(move |(i, lower)| {
390 if *lower == last && i > 0 {
392 let len = end - start;
393 let offset = start as IdxSize;
394 return Ok((offset, len as IdxSize));
395 }
396 last = *lower;
397
398 let lower = add(&offset, *lower, tz.as_ref())?;
399 let upper = add(&period, lower, tz.as_ref())?;
400
401 let b = Bounds::new(lower, upper);
402
403 for &t in &time[start..] {
404 if b.is_member_entry(t, closed_window) || start == i {
405 break;
406 }
407 start += 1;
408 }
409
410 end = std::cmp::max(start, end);
411 for &t in &time[end..] {
412 if !b.is_member_exit(t, closed_window) {
413 break;
414 }
415 end += 1;
416 }
417
418 let len = end - start;
419 let offset = start as IdxSize;
420
421 Ok((offset, len as IdxSize))
422 })
423}
424
425#[allow(clippy::too_many_arguments)]
426pub(crate) fn group_by_values_iter_lookahead(
430 period: Duration,
431 offset: Duration,
432 time: &[i64],
433 closed_window: ClosedWindow,
434 tu: TimeUnit,
435 tz: Option<Tz>,
436 start_offset: usize,
437 upper_bound: Option<usize>,
438) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
439 let upper_bound = upper_bound.unwrap_or(time.len());
440
441 let add = match tu {
442 TimeUnit::Nanoseconds => Duration::add_ns,
443 TimeUnit::Microseconds => Duration::add_us,
444 TimeUnit::Milliseconds => Duration::add_ms,
445 };
446 let mut start = start_offset;
447 let mut end = start;
448
449 let mut last = time[start_offset];
450 let mut started = false;
451 time[start_offset..upper_bound].iter().map(move |lower| {
452 if *lower == last && started {
454 let len = end - start;
455 let offset = start as IdxSize;
456 return Ok((offset, len as IdxSize));
457 }
458 started = true;
459 last = *lower;
460
461 let lower = add(&offset, *lower, tz.as_ref())?;
462 let upper = add(&period, lower, tz.as_ref())?;
463
464 let b = Bounds::new(lower, upper);
465
466 for &t in &time[start..] {
467 if b.is_member_entry(t, closed_window) {
468 break;
469 }
470 start += 1;
471 }
472
473 end = std::cmp::max(start, end);
474 for &t in &time[end..] {
475 if !b.is_member_exit(t, closed_window) {
476 break;
477 }
478 end += 1;
479 }
480
481 let len = end - start;
482 let offset = start as IdxSize;
483
484 Ok((offset, len as IdxSize))
485 })
486}
487
/// Iterates over the look-behind windows of length `period` that end at each
/// value of `time`.
///
/// This is `group_by_values_iter_lookbehind` over the whole of `time` with the
/// offset set to `period` flagged as negative.
///
/// # Errors
///
/// See `group_by_values_iter_lookbehind`.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // Same magnitude as `period`, pointing backwards.
    let lookbehind_offset = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    group_by_values_iter_lookbehind(
        period,
        lookbehind_offset,
        time,
        closed_window,
        tu,
        tz,
        0,
        None,
    )
}
502
/// Merges neighbouring thread chunks whose shared boundary would separate two
/// equal timestamps.
///
/// `thread_offsets` holds `(start, len)` chunks of `time`. A boundary between
/// two neighbouring chunks is *valid* when the last value of the left chunk
/// differs from the first value of the right chunk. On return the chunks form
/// a contiguous partition of `time` that begins at index `0`, ends at
/// `time.len()` and is cut only at valid boundaries of the input.
///
/// The input is left untouched when `time` is empty, when there is at most
/// one chunk, or when every boundary is already valid.
///
/// Every valid boundary of the input is kept: a chunk is only ever merged
/// into its left neighbour when the boundary between the two is invalid. The
/// result therefore needs no second pass and does not depend on whether the
/// number of chunks is odd or even.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    // `pair` is a window of two neighbouring chunks.
    let boundary_is_valid = |pair: &[(usize, usize)]| -> bool {
        debug_assert_eq!(pair.len(), 2);
        let (left_start, left_len) = pair[0];
        // `saturating_sub` keeps an empty left chunk from underflowing.
        let left_block_end = left_start + left_len.saturating_sub(1);
        let right_block_start = pair[1].0;
        time[left_block_end] != time[right_block_start]
    };

    if thread_offsets.windows(2).all(|pair| boundary_is_valid(pair)) {
        return;
    }

    // A new chunk begins at index 0 and at the start of every right-hand
    // chunk whose boundary with its left neighbour is valid.
    let mut starts = vec![0usize];
    starts.extend(
        thread_offsets
            .windows(2)
            .filter_map(|pair| boundary_is_valid(pair).then_some(pair[1].0)),
    );

    // Each chunk reaches up to the start of the next one; the last chunk
    // reaches the end of `time`.
    let total_len = time.len();
    let merged: Vec<(usize, usize)> = starts
        .iter()
        .zip(starts.iter().skip(1).chain(std::iter::once(&total_len)))
        .map(|(&start, &next_start)| (start, next_start - start))
        .collect();
    debug_assert_eq!(
        merged.iter().map(|block| block.1).sum::<usize>(),
        total_len
    );

    *thread_offsets = merged;
}
549
550#[allow(clippy::too_many_arguments)]
551fn group_by_values_iter_lookbehind_collected(
552 period: Duration,
553 offset: Duration,
554 time: &[i64],
555 closed_window: ClosedWindow,
556 tu: TimeUnit,
557 tz: Option<Tz>,
558 start_offset: usize,
559 upper_bound: Option<usize>,
560) -> PolarsResult<Vec<[IdxSize; 2]>> {
561 let iter = group_by_values_iter_lookbehind(
562 period,
563 offset,
564 time,
565 closed_window,
566 tu,
567 tz,
568 start_offset,
569 upper_bound,
570 )?;
571 iter.map(|result| result.map(|(offset, len)| [offset, len]))
572 .collect::<PolarsResult<Vec<_>>>()
573}
574
575#[allow(clippy::too_many_arguments)]
576pub(crate) fn group_by_values_iter_lookahead_collected(
577 period: Duration,
578 offset: Duration,
579 time: &[i64],
580 closed_window: ClosedWindow,
581 tu: TimeUnit,
582 tz: Option<Tz>,
583 start_offset: usize,
584 upper_bound: Option<usize>,
585) -> PolarsResult<Vec<[IdxSize; 2]>> {
586 let iter = group_by_values_iter_lookahead(
587 period,
588 offset,
589 time,
590 closed_window,
591 tu,
592 tz,
593 start_offset,
594 upper_bound,
595 );
596 iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
597 .collect::<PolarsResult<Vec<_>>>()
598}
599
600pub fn group_by_values(
608 period: Duration,
609 offset: Duration,
610 time: &[i64],
611 closed_window: ClosedWindow,
612 tu: TimeUnit,
613 tz: Option<Tz>,
614) -> PolarsResult<GroupsSlice> {
615 if time.is_empty() {
616 return Ok(GroupsSlice::from(vec![]));
617 }
618
619 let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
620 prune_splits_on_duplicates(time, &mut thread_offsets);
622
623 let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
625
626 if offset.negative && !offset.is_zero() {
628 if offset.duration_ns() == period.duration_ns() {
630 if !run_parallel {
634 let vecs = group_by_values_iter_lookbehind_collected(
635 period,
636 offset,
637 time,
638 closed_window,
639 tu,
640 tz,
641 0,
642 None,
643 )?;
644 return Ok(GroupsSlice::from(vecs));
645 }
646
647 POOL.install(|| {
648 let vals = thread_offsets
649 .par_iter()
650 .copied()
651 .map(|(base_offset, len)| {
652 let upper_bound = base_offset + len;
653 group_by_values_iter_lookbehind_collected(
654 period,
655 offset,
656 time,
657 closed_window,
658 tu,
659 tz,
660 base_offset,
661 Some(upper_bound),
662 )
663 })
664 .collect::<PolarsResult<Vec<_>>>()?;
665 Ok(flatten_par(&vals))
666 })
667 } else if ((offset.duration_ns() >= period.duration_ns())
668 && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
669 || ((offset.duration_ns() > period.duration_ns())
670 && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
671 {
672 let iter =
676 group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
677 iter.map(|result| result.map(|(offset, len)| [offset, len]))
678 .collect::<PolarsResult<_>>()
679 }
680 else {
687 let iter = group_by_values_iter_partial_lookbehind(
688 period,
689 offset,
690 time,
691 closed_window,
692 tu,
693 tz,
694 );
695 iter.map(|result| result.map(|(offset, len)| [offset, len]))
696 .collect::<PolarsResult<_>>()
697 }
698 } else if !offset.is_zero()
699 || closed_window == ClosedWindow::Right
700 || closed_window == ClosedWindow::None
701 {
702 if !run_parallel {
707 let vecs = group_by_values_iter_lookahead_collected(
708 period,
709 offset,
710 time,
711 closed_window,
712 tu,
713 tz,
714 0,
715 None,
716 )?;
717 return Ok(GroupsSlice::from(vecs));
718 }
719
720 POOL.install(|| {
721 let vals = thread_offsets
722 .par_iter()
723 .copied()
724 .map(|(base_offset, len)| {
725 let lower_bound = base_offset;
726 let upper_bound = base_offset + len;
727 group_by_values_iter_lookahead_collected(
728 period,
729 offset,
730 time,
731 closed_window,
732 tu,
733 tz,
734 lower_bound,
735 Some(upper_bound),
736 )
737 })
738 .collect::<PolarsResult<Vec<_>>>()?;
739 Ok(flatten_par(&vals))
740 })
741 } else {
742 if !run_parallel {
743 let vecs = group_by_values_iter_lookahead_collected(
744 period,
745 offset,
746 time,
747 closed_window,
748 tu,
749 tz,
750 0,
751 None,
752 )?;
753 return Ok(GroupsSlice::from(vecs));
754 }
755
756 POOL.install(|| {
761 let vals = thread_offsets
762 .par_iter()
763 .copied()
764 .map(|(base_offset, len)| {
765 let lower_bound = base_offset;
766 let upper_bound = base_offset + len;
767 group_by_values_iter_lookahead_collected(
768 period,
769 offset,
770 time,
771 closed_window,
772 tu,
773 tz,
774 lower_bound,
775 Some(upper_bound),
776 )
777 })
778 .collect::<PolarsResult<Vec<_>>>()?;
779 Ok(flatten_par(&vals))
780 })
781 }
782}
783
#[cfg(test)]
mod test {
    use super::*;

    /// The boundary between the first two chunks separates the two `1`s, so
    /// those chunks must be merged; the remaining boundaries are kept.
    #[test]
    fn test_prune_duplicates() {
        let time = [0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];

        prune_splits_on_duplicates(&time, &mut splits);

        let expected: Vec<(usize, usize)> = vec![(0, 6), (6, 2), (8, 3)];
        assert_eq!(splits, expected);
    }
}