// polars_time/windows/group_by.rs

1use std::collections::VecDeque;
2
3use arrow::legacy::time_zone::Tz;
4use arrow::temporal_conversions::{
5    timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
6};
7use arrow::trusted_len::TrustedLen;
8use chrono::NaiveDateTime;
9#[cfg(feature = "timezones")]
10use chrono::TimeZone as _;
11use now::DateTimeNow;
12use polars_core::POOL;
13use polars_core::prelude::*;
14use polars_core::utils::_split_offsets;
15use polars_core::utils::flatten::flatten_par;
16use rayon::prelude::*;
17#[cfg(feature = "serde")]
18use serde::{Deserialize, Serialize};
19use strum_macros::IntoStaticStr;
20
21use crate::prelude::*;
22
/// Which endpoints of a window are inclusive.
///
/// In the bound checks defined in this file (`ActiveWindow`, `is_above_lower_bound`,
/// `is_below_upper_bound`), a value equal to the lower bound is a member only for `Left` and
/// `Both`, and a value equal to the upper bound only for `Right` and `Both`.
/// `IntoStaticStr` yields the snake_case variant name (e.g. `"both"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
    /// `[lower, upper)`: lower bound inclusive, upper bound exclusive.
    Left,
    /// `(lower, upper]`: lower bound exclusive, upper bound inclusive.
    Right,
    /// `[lower, upper]`: both bounds inclusive.
    Both,
    /// `(lower, upper)`: both bounds exclusive.
    None,
}
33
/// Which value is used to label a window.
///
/// NOTE(review): the consumer of this enum is not in the visible part of this file; presumably
/// `Left`/`Right` label a window by its lower/upper boundary and `DataPoint` by a data point
/// inside the window — confirm. `IntoStaticStr` yields the snake_case variant name
/// (e.g. `"data_point"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
    Left,
    Right,
    DataPoint,
}
43
/// Strategy for choosing where windows start; `WindowBound` is the `Default`.
///
/// NOTE(review): the meaning of `WindowBound`/`DataPoint` is decided by code outside this file
/// section — confirm there. The weekday variants map to day indices `0` (Monday) through
/// `6` (Sunday) via `StartBy::weekday`; they are only useful if periods are weekly.
/// `IntoStaticStr` yields the snake_case variant name (e.g. `"window_bound"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
#[derive(Default)]
pub enum StartBy {
    #[default]
    WindowBound,
    DataPoint,
    /// only useful if periods are weekly
    Monday,
    /// only useful if periods are weekly
    Tuesday,
    /// only useful if periods are weekly
    Wednesday,
    /// only useful if periods are weekly
    Thursday,
    /// only useful if periods are weekly
    Friday,
    /// only useful if periods are weekly
    Saturday,
    /// only useful if periods are weekly
    Sunday,
}
62
63impl StartBy {
64    pub fn weekday(&self) -> Option<u32> {
65        match self {
66            StartBy::Monday => Some(0),
67            StartBy::Tuesday => Some(1),
68            StartBy::Wednesday => Some(2),
69            StartBy::Thursday => Some(3),
70            StartBy::Friday => Some(4),
71            StartBy::Saturday => Some(5),
72            StartBy::Sunday => Some(6),
73            _ => None,
74        }
75    }
76}
77
/// Assigns the values of `time` to the windows yielded by `bounds_iter`.
///
/// For each window that reaches the member search, appends a `[first_row, len]` group to
/// `groups` and, when requested, the window's `start`/`stop` to `lower_bound`/`upper_bound`.
/// In the last-value branch the group is only pushed when that value is a member.
///
/// `start` is the row at which the search begins. `time` must be non-empty whenever
/// `bounds_iter` yields a window (`time.len() - 1` below would underflow). It is presumably
/// sorted ascending, since the search cursor only ever moves forward — TODO confirm at call
/// sites.
#[allow(clippy::too_many_arguments)]
fn update_groups_and_bounds(
    bounds_iter: BoundsIter<'_>,
    mut start: usize,
    time: &[i64],
    closed_window: ClosedWindow,
    include_lower_bound: bool,
    include_upper_bound: bool,
    lower_bound: &mut Vec<i64>,
    upper_bound: &mut Vec<i64>,
    groups: &mut Vec<[IdxSize; 2]>,
) {
    let mut iter = bounds_iter.into_iter();
    // Number of windows to skip (via `nth`) before taking the next one. It is non-zero only
    // when the data is sparse and intermediate windows cannot contain any value.
    let mut stride = 0;

    'bounds: while let Some(bi) = iter.nth(stride) {
        let mut has_member = false;
        // find starting point of window
        // (the last value is excluded from this scan and handled separately below)
        for &t in &time[start..time.len().saturating_sub(1)] {
            // the window is behind the time values.
            if bi.is_future(t, closed_window) {
                // Fast-forward the bounds iterator towards `t` without moving `start`.
                // NOTE(review): exact stride semantics live in `BoundsIter::get_stride`.
                stride = iter.get_stride(t);
                continue 'bounds;
            }
            if bi.is_member_entry(t, closed_window) {
                has_member = true;
                break;
            }
            // element drops out of the window
            start += 1;
        }

        // update stride so we can fast-forward in case of sparse data
        stride = if has_member {
            0
        } else {
            debug_assert!(start < time.len());
            iter.get_stride(time[start])
        };

        // find members of this window
        let mut end = start;

        // last value isn't always added
        // Only the final value is left to check: emit a single-row group if it is a member.
        if end == time.len() - 1 {
            let t = time[end];
            if bi.is_member(t, closed_window) {
                if include_lower_bound {
                    lower_bound.push(bi.start);
                }
                if include_upper_bound {
                    upper_bound.push(bi.stop);
                }
                groups.push([end as IdxSize, 1])
            }
            continue;
        }
        // Extend `end` while values are still inside the window.
        for &t in &time[end..] {
            if !bi.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }
        let len = end - start;

        if include_lower_bound {
            lower_bound.push(bi.start);
        }
        if include_upper_bound {
            upper_bound.push(bi.stop);
        }
        groups.push([start as IdxSize, len as IdxSize])
    }
}
152
153/// Window boundaries are created based on the given `Window`, which is defined by:
154/// - every
155/// - period
156/// - offset
157///
158/// And every window boundary we search for the values that fit that window by the given
159/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper
160/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of
161/// that group.
162///
163/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.
164#[allow(clippy::too_many_arguments)]
165pub fn group_by_windows(
166    window: Window,
167    time: &[i64],
168    closed_window: ClosedWindow,
169    tu: TimeUnit,
170    tz: &Option<TimeZone>,
171    include_lower_bound: bool,
172    include_upper_bound: bool,
173    start_by: StartBy,
174) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
175    let start = time[0];
176    // the boundary we define here is not yet correct. It doesn't take 'period' into account
177    // and it doesn't have the proper starting point. This boundary is used as a proxy to find
178    // the proper 'boundary' in  'window.get_overlapping_bounds_iter'.
179    let boundary = if time.len() > 1 {
180        // +1 because left or closed boundary could match the next window if it is on the boundary
181        let stop = time[time.len() - 1] + 1;
182        Bounds::new_checked(start, stop)
183    } else {
184        let stop = start + 1;
185        Bounds::new_checked(start, stop)
186    };
187
188    let size = {
189        match tu {
190            TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
191            TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
192            TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
193        }
194    };
195    let size_lower = if include_lower_bound { size } else { 0 };
196    let size_upper = if include_upper_bound { size } else { 0 };
197    let mut lower_bound = Vec::with_capacity(size_lower);
198    let mut upper_bound = Vec::with_capacity(size_upper);
199
200    let mut groups = Vec::with_capacity(size);
201    let start_offset = 0;
202
203    match tz {
204        #[cfg(feature = "timezones")]
205        Some(tz) => {
206            update_groups_and_bounds(
207                window.get_overlapping_bounds_iter(
208                    boundary,
209                    closed_window,
210                    tu,
211                    tz.parse::<Tz>().ok().as_ref(),
212                    start_by,
213                )?,
214                start_offset,
215                time,
216                closed_window,
217                include_lower_bound,
218                include_upper_bound,
219                &mut lower_bound,
220                &mut upper_bound,
221                &mut groups,
222            );
223        },
224        _ => {
225            update_groups_and_bounds(
226                window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
227                start_offset,
228                time,
229                closed_window,
230                include_lower_bound,
231                include_upper_bound,
232                &mut lower_bound,
233                &mut upper_bound,
234                &mut groups,
235            );
236        },
237    };
238
239    Ok((groups, lower_bound, upper_bound))
240}
241
242// t is right at the end of the window
243// ------t---
244// [------]
245#[inline]
246#[allow(clippy::too_many_arguments)]
247pub(crate) fn group_by_values_iter_lookbehind(
248    period: Duration,
249    offset: Duration,
250    time: &[i64],
251    closed_window: ClosedWindow,
252    tu: TimeUnit,
253    tz: Option<Tz>,
254    start_offset: usize,
255    upper_bound: Option<usize>,
256) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
257    debug_assert!(offset.duration_ns() == period.duration_ns());
258    debug_assert!(offset.negative);
259    let add = match tu {
260        TimeUnit::Nanoseconds => Duration::add_ns,
261        TimeUnit::Microseconds => Duration::add_us,
262        TimeUnit::Milliseconds => Duration::add_ms,
263    };
264
265    let upper_bound = upper_bound.unwrap_or(time.len());
266    // Use binary search to find the initial start as that is behind.
267    let mut start = if let Some(&t) = time.get(start_offset) {
268        let lower = add(&offset, t, tz.as_ref())?;
269        // We have `period == -offset`, so `t + offset + period` is equal to `t`,
270        // and `upper` is trivially equal to `t` itself. Using the trivial calculation,
271        // instead of `upper = lower + period`, avoids issues around
272        // `t - 1mo + 1mo` not round-tripping.
273        let upper = t;
274        let b = Bounds::new(lower, upper);
275        let slice = &time[..start_offset];
276        slice.partition_point(|v| !b.is_member(*v, closed_window))
277    } else {
278        0
279    };
280    let mut end = start;
281    let mut last = time[start_offset];
282    Ok(time[start_offset..upper_bound]
283        .iter()
284        .enumerate()
285        .map(move |(mut i, t)| {
286            // Fast path for duplicates.
287            if *t == last && i > 0 {
288                let len = end - start;
289                let offset = start as IdxSize;
290                return Ok((offset, len as IdxSize));
291            }
292            last = *t;
293            i += start_offset;
294
295            let lower = add(&offset, *t, tz.as_ref())?;
296            let upper = *t;
297
298            let b = Bounds::new(lower, upper);
299
300            for &t in unsafe { time.get_unchecked(start..i) } {
301                if b.is_member_entry(t, closed_window) {
302                    break;
303                }
304                start += 1;
305            }
306
307            // faster path, check if `i` is member.
308            if b.is_member_exit(*t, closed_window) {
309                end = i;
310            } else {
311                end = std::cmp::max(end, start);
312            }
313            // we still must loop to consume duplicates
314            for &t in unsafe { time.get_unchecked(end..) } {
315                if !b.is_member_exit(t, closed_window) {
316                    break;
317                }
318                end += 1;
319            }
320
321            let len = end - start;
322            let offset = start as IdxSize;
323
324            Ok((offset, len as IdxSize))
325        }))
326}
327
328// this one is correct for all lookbehind/lookaheads, but is slower
329// window is completely behind t and t itself is not a member
330// ---------------t---
331//  [---]
332pub(crate) fn group_by_values_iter_window_behind_t(
333    period: Duration,
334    offset: Duration,
335    time: &[i64],
336    closed_window: ClosedWindow,
337    tu: TimeUnit,
338    tz: Option<Tz>,
339) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
340    let add = match tu {
341        TimeUnit::Nanoseconds => Duration::add_ns,
342        TimeUnit::Microseconds => Duration::add_us,
343        TimeUnit::Milliseconds => Duration::add_ms,
344    };
345
346    let mut start = 0;
347    let mut end = start;
348    let mut last = time[0];
349    let mut started = false;
350    time.iter().map(move |lower| {
351        // Fast path for duplicates.
352        if *lower == last && started {
353            let len = end - start;
354            let offset = start as IdxSize;
355            return Ok((offset, len as IdxSize));
356        }
357        last = *lower;
358        started = true;
359        let lower = add(&offset, *lower, tz.as_ref())?;
360        let upper = add(&period, lower, tz.as_ref())?;
361
362        let b = Bounds::new(lower, upper);
363        if b.is_future(time[0], closed_window) {
364            Ok((0, 0))
365        } else {
366            for &t in &time[start..] {
367                if b.is_member_entry(t, closed_window) {
368                    break;
369                }
370                start += 1;
371            }
372
373            end = std::cmp::max(start, end);
374            for &t in &time[end..] {
375                if !b.is_member_exit(t, closed_window) {
376                    break;
377                }
378                end += 1;
379            }
380
381            let len = end - start;
382            let offset = start as IdxSize;
383
384            Ok((offset, len as IdxSize))
385        }
386    })
387}
388
389// window is with -1 periods of t
390// ----t---
391//  [---]
392pub(crate) fn group_by_values_iter_partial_lookbehind(
393    period: Duration,
394    offset: Duration,
395    time: &[i64],
396    closed_window: ClosedWindow,
397    tu: TimeUnit,
398    tz: Option<Tz>,
399) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
400    let add = match tu {
401        TimeUnit::Nanoseconds => Duration::add_ns,
402        TimeUnit::Microseconds => Duration::add_us,
403        TimeUnit::Milliseconds => Duration::add_ms,
404    };
405
406    let mut start = 0;
407    let mut end = start;
408    let mut last = time[0];
409    time.iter().enumerate().map(move |(i, lower)| {
410        // Fast path for duplicates.
411        if *lower == last && i > 0 {
412            let len = end - start;
413            let offset = start as IdxSize;
414            return Ok((offset, len as IdxSize));
415        }
416        last = *lower;
417
418        let lower = add(&offset, *lower, tz.as_ref())?;
419        let upper = add(&period, lower, tz.as_ref())?;
420
421        let b = Bounds::new(lower, upper);
422
423        for &t in &time[start..] {
424            if b.is_member_entry(t, closed_window) || start == i {
425                break;
426            }
427            start += 1;
428        }
429
430        end = std::cmp::max(start, end);
431        for &t in &time[end..] {
432            if !b.is_member_exit(t, closed_window) {
433                break;
434            }
435            end += 1;
436        }
437
438        let len = end - start;
439        let offset = start as IdxSize;
440
441        Ok((offset, len as IdxSize))
442    })
443}
444
445#[allow(clippy::too_many_arguments)]
446// window is completely ahead of t and t itself is not a member
447// --t-----------
448//        [---]
449pub(crate) fn group_by_values_iter_lookahead(
450    period: Duration,
451    offset: Duration,
452    time: &[i64],
453    closed_window: ClosedWindow,
454    tu: TimeUnit,
455    tz: Option<Tz>,
456    start_offset: usize,
457    upper_bound: Option<usize>,
458) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
459    let upper_bound = upper_bound.unwrap_or(time.len());
460
461    let add = match tu {
462        TimeUnit::Nanoseconds => Duration::add_ns,
463        TimeUnit::Microseconds => Duration::add_us,
464        TimeUnit::Milliseconds => Duration::add_ms,
465    };
466    let mut start = start_offset;
467    let mut end = start;
468
469    let mut last = time[start_offset];
470    let mut started = false;
471    time[start_offset..upper_bound].iter().map(move |lower| {
472        // Fast path for duplicates.
473        if *lower == last && started {
474            let len = end - start;
475            let offset = start as IdxSize;
476            return Ok((offset, len as IdxSize));
477        }
478        started = true;
479        last = *lower;
480
481        let lower = add(&offset, *lower, tz.as_ref())?;
482        let upper = add(&period, lower, tz.as_ref())?;
483
484        let b = Bounds::new(lower, upper);
485
486        for &t in &time[start..] {
487            if b.is_member_entry(t, closed_window) {
488                break;
489            }
490            start += 1;
491        }
492
493        end = std::cmp::max(start, end);
494        for &t in &time[end..] {
495            if !b.is_member_exit(t, closed_window) {
496                break;
497            }
498            end += 1;
499        }
500
501        let len = end - start;
502        let offset = start as IdxSize;
503
504        Ok((offset, len as IdxSize))
505    })
506}
507
/// Rolling windows that end at each value `t` of `time`.
///
/// Each window runs from `t - period` up to `t`, with endpoint inclusion per `closed_window`.
/// This is a thin wrapper over `group_by_values_iter_lookbehind` for the whole of `time`.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // Shift back by exactly one period so that `t` sits at the window's right endpoint.
    let lookbehind = {
        let mut shifted = period;
        shifted.negative = true;
        shifted
    };
    let first_row = 0;
    let until_end = None;
    group_by_values_iter_lookbehind(
        period,
        lookbehind,
        time,
        closed_window,
        tu,
        tz,
        first_row,
        until_end,
    )
}
522
/// Removes partition boundaries that would split a run of equal timestamps.
///
/// `thread_offsets` holds `(start, len)` partitions of `time`, as produced by `_split_offsets`:
/// contiguous, in order, starting at `0` and covering all of `time`.
///
/// A boundary at row `s` is kept only when `time[s - 1] != time[s]`. Otherwise the partition
/// starting at `s` is merged into its predecessor. Every valid boundary is retained, so the
/// result is the finest partition built from the original split points. Empty partitions are
/// merged away as well. `time` is assumed sorted, so equal timestamps are adjacent.
///
/// An empty `time` or a single partition leaves `thread_offsets` untouched.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    // Row 0 always starts the first partition; the other starts are the valid split points.
    let mut starts: Vec<usize> = Vec::with_capacity(thread_offsets.len());
    starts.push(0);
    for &(split, _) in thread_offsets.iter() {
        let previous = starts[starts.len() - 1];
        // Ignore the leading 0, repeated starts of empty partitions and out-of-range starts.
        let is_new_split = split > previous && split < time.len();
        if is_new_split && time[split - 1] != time[split] {
            starts.push(split);
        }
    }

    let ends = starts
        .iter()
        .skip(1)
        .copied()
        .chain(std::iter::once(time.len()));
    let pruned: Vec<(usize, usize)> = starts
        .iter()
        .zip(ends)
        .map(|(&start, end)| (start, end - start))
        .collect();
    debug_assert_eq!(pruned.iter().map(|&(_, len)| len).sum::<usize>(), time.len());
    *thread_offsets = pruned;
}
569
570#[allow(clippy::too_many_arguments)]
571fn group_by_values_iter_lookbehind_collected(
572    period: Duration,
573    offset: Duration,
574    time: &[i64],
575    closed_window: ClosedWindow,
576    tu: TimeUnit,
577    tz: Option<Tz>,
578    start_offset: usize,
579    upper_bound: Option<usize>,
580) -> PolarsResult<Vec<[IdxSize; 2]>> {
581    let iter = group_by_values_iter_lookbehind(
582        period,
583        offset,
584        time,
585        closed_window,
586        tu,
587        tz,
588        start_offset,
589        upper_bound,
590    )?;
591    iter.map(|result| result.map(|(offset, len)| [offset, len]))
592        .collect::<PolarsResult<Vec<_>>>()
593}
594
595#[allow(clippy::too_many_arguments)]
596pub(crate) fn group_by_values_iter_lookahead_collected(
597    period: Duration,
598    offset: Duration,
599    time: &[i64],
600    closed_window: ClosedWindow,
601    tu: TimeUnit,
602    tz: Option<Tz>,
603    start_offset: usize,
604    upper_bound: Option<usize>,
605) -> PolarsResult<Vec<[IdxSize; 2]>> {
606    let iter = group_by_values_iter_lookahead(
607        period,
608        offset,
609        time,
610        closed_window,
611        tu,
612        tz,
613        start_offset,
614        upper_bound,
615    );
616    iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
617        .collect::<PolarsResult<Vec<_>>>()
618}
619
620/// Different from `group_by_windows`, where define window buckets and search which values fit that
621/// pre-defined bucket.
622///
623/// This function defines every window based on the:
624///     - timestamp (lower bound)
625///     - timestamp + period (upper bound)
626/// where timestamps are the individual values in the array `time`
627pub fn group_by_values(
628    period: Duration,
629    offset: Duration,
630    time: &[i64],
631    closed_window: ClosedWindow,
632    tu: TimeUnit,
633    tz: Option<Tz>,
634) -> PolarsResult<GroupsSlice> {
635    if time.is_empty() {
636        return Ok(GroupsSlice::from(vec![]));
637    }
638
639    let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
640    // there are duplicates in the splits, so we opt for a single partition
641    prune_splits_on_duplicates(time, &mut thread_offsets);
642
643    // If we start from within parallel work we will do this single threaded.
644    let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
645
646    // we have a (partial) lookbehind window
647    if offset.negative && !offset.is_zero() {
648        // lookbehind
649        if offset.duration_ns() == period.duration_ns() {
650            // t is right at the end of the window
651            // ------t---
652            // [------]
653            if !run_parallel {
654                let vecs = group_by_values_iter_lookbehind_collected(
655                    period,
656                    offset,
657                    time,
658                    closed_window,
659                    tu,
660                    tz,
661                    0,
662                    None,
663                )?;
664                return Ok(GroupsSlice::from(vecs));
665            }
666
667            POOL.install(|| {
668                let vals = thread_offsets
669                    .par_iter()
670                    .copied()
671                    .map(|(base_offset, len)| {
672                        let upper_bound = base_offset + len;
673                        group_by_values_iter_lookbehind_collected(
674                            period,
675                            offset,
676                            time,
677                            closed_window,
678                            tu,
679                            tz,
680                            base_offset,
681                            Some(upper_bound),
682                        )
683                    })
684                    .collect::<PolarsResult<Vec<_>>>()?;
685                Ok(flatten_par(&vals))
686            })
687        } else if ((offset.duration_ns() >= period.duration_ns())
688            && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
689            || ((offset.duration_ns() > period.duration_ns())
690                && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
691        {
692            // window is completely behind t and t itself is not a member
693            // ---------------t---
694            //  [---]
695            let iter =
696                group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
697            iter.map(|result| result.map(|(offset, len)| [offset, len]))
698                .collect::<PolarsResult<_>>()
699        }
700        // partial lookbehind
701        // this one is still single threaded
702        // can make it parallel later, its a bit more complicated because the boundaries are unknown
703        // window is with -1 periods of t
704        // ----t---
705        //  [---]
706        else {
707            let iter = group_by_values_iter_partial_lookbehind(
708                period,
709                offset,
710                time,
711                closed_window,
712                tu,
713                tz,
714            );
715            iter.map(|result| result.map(|(offset, len)| [offset, len]))
716                .collect::<PolarsResult<_>>()
717        }
718    } else if !offset.is_zero()
719        || closed_window == ClosedWindow::Right
720        || closed_window == ClosedWindow::None
721    {
722        // window is completely ahead of t and t itself is not a member
723        // --t-----------
724        //        [---]
725
726        if !run_parallel {
727            let vecs = group_by_values_iter_lookahead_collected(
728                period,
729                offset,
730                time,
731                closed_window,
732                tu,
733                tz,
734                0,
735                None,
736            )?;
737            return Ok(GroupsSlice::from(vecs));
738        }
739
740        POOL.install(|| {
741            let vals = thread_offsets
742                .par_iter()
743                .copied()
744                .map(|(base_offset, len)| {
745                    let lower_bound = base_offset;
746                    let upper_bound = base_offset + len;
747                    group_by_values_iter_lookahead_collected(
748                        period,
749                        offset,
750                        time,
751                        closed_window,
752                        tu,
753                        tz,
754                        lower_bound,
755                        Some(upper_bound),
756                    )
757                })
758                .collect::<PolarsResult<Vec<_>>>()?;
759            Ok(flatten_par(&vals))
760        })
761    } else {
762        if !run_parallel {
763            let vecs = group_by_values_iter_lookahead_collected(
764                period,
765                offset,
766                time,
767                closed_window,
768                tu,
769                tz,
770                0,
771                None,
772            )?;
773            return Ok(GroupsSlice::from(vecs));
774        }
775
776        // Offset is 0 and window is closed on the left:
777        // it must be that the window starts at t and t is a member
778        // --t-----------
779        //  [---]
780        POOL.install(|| {
781            let vals = thread_offsets
782                .par_iter()
783                .copied()
784                .map(|(base_offset, len)| {
785                    let lower_bound = base_offset;
786                    let upper_bound = base_offset + len;
787                    group_by_values_iter_lookahead_collected(
788                        period,
789                        offset,
790                        time,
791                        closed_window,
792                        tu,
793                        tz,
794                        lower_bound,
795                        Some(upper_bound),
796                    )
797                })
798                .collect::<PolarsResult<Vec<_>>>()?;
799            Ok(flatten_par(&vals))
800        })
801    }
802}
803
/// Computes rolling-window groups incrementally.
///
/// Timestamps are fed in with `RollingWindower::insert` (possibly across several calls), and
/// the remaining windows are flushed with `RollingWindower::finalize`.
///
/// Each inserted timestamp `t` opens a window from `t + offset` to `t + offset + period`. Its
/// `[first_row, len]` group is emitted once a later timestamp is no longer below the window's
/// upper bound. Row indices are absolute, counted from the first inserted row; the counters are
/// reset to 0 by `finalize`/`reset`.
pub struct RollingWindower {
    /// Window length, added to the window start.
    period: Duration,
    /// Shift from a timestamp to the start of its window.
    offset: Duration,
    /// Which window endpoints are inclusive.
    closed: ClosedWindow,

    /// `Duration` addition matching the time unit given to `new`.
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    /// Time zone passed to `add`.
    tz: Option<Tz>,

    /// Absolute index of the first row of the most recently emitted window (0 before any
    /// emission). Rows before it are no longer needed.
    start: IdxSize,
    /// Absolute index one past the last row of the most recently emitted window.
    end: IdxSize,
    /// Number of rows processed so far.
    length: IdxSize,

    /// Windows opened but not yet emitted, oldest first.
    active: VecDeque<ActiveWindow>,
}
818
819struct ActiveWindow {
820    start: i64,
821    end: i64,
822}
823
824impl ActiveWindow {
825    #[inline(always)]
826    fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
827        (t > self.start)
828            | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))
829    }
830
831    #[inline(always)]
832    fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
833        (t < self.end)
834            | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))
835    }
836}
837
/// Locates flat position `n` inside the concatenation of the slices in `l`.
///
/// Returns `(x, y)` such that the element is `l[y][x]`. A position that falls exactly at the
/// end of a slice rolls over to the next non-empty slice, and empty slices are skipped. When
/// `n` equals the total length, `y == l.len()` and `x == 0`.
///
/// # Panics
/// If `n` exceeds the total number of elements.
fn skip_in_2d_list(l: &[&[i64]], n: usize) -> (usize, usize) {
    let mut remaining = n;
    let mut row = 0;
    // An empty row always satisfies `remaining >= 0`, so it is skipped as well.
    while let Some(slice) = l.get(row) {
        if remaining < slice.len() {
            break;
        }
        remaining -= slice.len();
        row += 1;
    }
    assert!(remaining == 0 || row < l.len());
    (remaining, row)
}
/// Advances the `(x, y)` cursor over `l` by one element.
///
/// The cursor rolls over to the start of the next non-empty slice when the current one is
/// exhausted. Past the last element it ends at `y == l.len()`, `x == 0`.
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    // Empty slices satisfy `len == 0 == x` right after a rollover, so they are skipped too.
    while l.get(*y).is_some_and(|slice| slice.len() == *x) {
        *x = 0;
        *y += 1;
    }
}
854
impl RollingWindower {
    /// Creates an empty windower.
    ///
    /// Each inserted timestamp `t` will get the window `t + offset` .. `t + offset + period`,
    /// with endpoint inclusion per `closed`. `tu` selects the matching `Duration` addition.
    pub fn new(
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        tu: TimeUnit,
        tz: Option<Tz>,
    ) -> Self {
        Self {
            period,
            offset,
            closed,

            add: match tu {
                TimeUnit::Nanoseconds => Duration::add_ns,
                TimeUnit::Microseconds => Duration::add_us,
                TimeUnit::Milliseconds => Duration::add_ms,
            },
            tz,

            start: 0,
            end: 0,
            length: 0,

            active: Default::default(),
        }
    }

    /// Insert new values into the windower.
    ///
    /// This should be given all the old values that were not processed yet. `time` is read as
    /// the concatenation of its slices, and its first element is taken to be absolute row
    /// `self.start`. Only rows from absolute index `self.length` onwards are new.
    ///
    /// Every window whose upper bound is passed by a newly inserted timestamp is pushed to
    /// `windows` as `[first_row, len]` (absolute row indices), in the order windows were opened.
    ///
    /// Returns how far `self.start` advanced during this call. Because `time` is interpreted as
    /// starting at `self.start`, the next call must omit that many leading rows.
    ///
    /// # Errors
    /// Propagates errors from the `Duration` addition.
    pub fn insert(
        &mut self,
        time: &[&[i64]],
        windows: &mut Vec<[IdxSize; 2]>,
    ) -> PolarsResult<IdxSize> {
        // Translate the absolute cursors into (x, y) positions inside `time`:
        // `i_*` is the next row to insert (`self.length`), `s_*` is `self.start` and
        // `e_*` is `self.end`.
        let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); // skip over empty lists
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        let time_start = self.start;
        let mut i = self.length;
        while i_y < time.len() {
            let t = time[i_y][i_x];
            let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
            // For datetime arithmetic, it does *NOT* hold 0 + a - a == 0. Therefore, we make sure
            // that if `offset` and `period` are inverses we keep the `t`.
            let window_end = if self.offset == -self.period {
                t
            } else {
                (self.add)(&self.period, window_start, self.tz.as_ref())?
            };

            // Every inserted timestamp opens a new window.
            self.active.push_back(ActiveWindow {
                start: window_start,
                end: window_end,
            });

            // Emit pending windows, oldest first, once `t` is no longer below their upper bound.
            // Assuming ascending timestamps, no later row can fall in them either.
            while let Some(w) = self.active.front() {
                if w.below_upper_bound(t, self.closed) {
                    break;
                }

                let w = self.active.pop_front().unwrap();
                // Move the start cursor past rows below the window's lower bound, but never
                // beyond the current row `i`.
                while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                    increment_2d(&mut s_x, &mut s_y, time);
                    self.start += 1;
                }
                // Move the end cursor over rows still below the window's upper bound.
                while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
                    increment_2d(&mut e_x, &mut e_y, time);
                    self.end += 1;
                }
                windows.push([self.start, self.end - self.start]);
            }

            increment_2d(&mut i_x, &mut i_y, time);
            i += 1;
        }

        self.length = i;
        Ok(self.start - time_start)
    }

    /// Process all remaining items and signal that no more items are coming.
    ///
    /// `time` must hold exactly the rows not yet dropped (`self.length - self.start` of them).
    /// Every still-pending window is emitted to `windows` as `[first_row, len]`, with the
    /// cursors bounded by `self.length`. Afterwards the row cursors are reset to 0.
    ///
    /// # Panics
    /// If the total length of `time` differs from `self.length - self.start`.
    pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
        assert_eq!(
            time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
            self.length - self.start
        );

        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        windows.extend(self.active.drain(..).map(|w| {
            while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                increment_2d(&mut s_x, &mut s_y, time);
                self.start += 1;
            }
            while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
                increment_2d(&mut e_x, &mut e_y, time);
                self.end += 1;
            }
            [self.start, self.end - self.start]
        }));

        self.start = 0;
        self.end = 0;
        self.length = 0;
    }

    /// Discards all pending windows and resets the row cursors without emitting anything.
    pub fn reset(&mut self) {
        self.active.clear();
        self.start = 0;
        self.end = 0;
        self.length = 0;
    }
}
972
/// A window opened by `GroupByDynamicWindower::insert` that has not been emitted yet.
#[derive(Debug)]
struct ActiveDynWindow {
    /// Index (counted over all values seen by the windower) of the first value in the window.
    start: IdxSize,
    /// Lower bound of the window's time range.
    lower_bound: i64,
    /// Upper bound of the window's time range.
    upper_bound: i64,
}
979
980#[inline(always)]
981fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {
982    (t > lb) | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == lb))
983}
984#[inline(always)]
985fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {
986    (t < ub) | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == ub))
987}
988
/// Incremental windower for dynamic (time-based) group-by windows.
///
/// Time values are fed chunk by chunk through `insert`; every window that is closed is
/// emitted as `[start, len]`, and `finalize` emits the windows still open at the end.
pub struct GroupByDynamicWindower {
    /// Added to a window's lower bound to obtain its upper bound.
    period: Duration,
    /// Offset applied when computing the lower bound of the first window.
    offset: Duration,
    /// Step between the lower bounds of consecutive windows.
    every: Duration,
    /// Which window bounds are inclusive.
    closed: ClosedWindow,

    /// Strategy used to choose the first window's lower bound.
    start_by: StartBy,

    /// Duration arithmetic for `tu` (adds a duration to a timestamp, optionally in a time zone).
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    /// Not-to-exceed duration (upper limit) of a `Duration` expressed in `tu`.
    nte: fn(&Duration) -> i64,
    /// Time unit of the incoming timestamps.
    tu: TimeUnit,
    /// Optional time zone used for duration arithmetic.
    tz: Option<Tz>,

    /// When set, each emitted window's lower bound is pushed to the caller's `lower_bound` vector.
    include_lower_bound: bool,
    /// When set, each emitted window's upper bound is pushed to the caller's `upper_bound` vector.
    include_upper_bound: bool,

    /// Number of time values processed since the last `finalize`; also the start index
    /// assigned to newly opened windows.
    num_seen: IdxSize,
    /// Lower bound from which the search for the next window to open begins.
    next_lower_bound: i64,
    /// Windows opened but not yet emitted, in the order they were opened.
    active: VecDeque<ActiveDynWindow>,
}
1010
1011impl GroupByDynamicWindower {
1012    #[expect(clippy::too_many_arguments)]
1013    pub fn new(
1014        period: Duration,
1015        offset: Duration,
1016        every: Duration,
1017        start_by: StartBy,
1018        closed: ClosedWindow,
1019        tu: TimeUnit,
1020        tz: Option<Tz>,
1021        include_lower_bound: bool,
1022        include_upper_bound: bool,
1023    ) -> Self {
1024        Self {
1025            period,
1026            offset,
1027            every,
1028            closed,
1029
1030            start_by,
1031
1032            add: match tu {
1033                TimeUnit::Nanoseconds => Duration::add_ns,
1034                TimeUnit::Microseconds => Duration::add_us,
1035                TimeUnit::Milliseconds => Duration::add_ms,
1036            },
1037            nte: match tu {
1038                TimeUnit::Nanoseconds => Duration::nte_duration_ns,
1039                TimeUnit::Microseconds => Duration::nte_duration_us,
1040                TimeUnit::Milliseconds => Duration::nte_duration_ms,
1041            },
1042            tu,
1043            tz,
1044
1045            include_lower_bound,
1046            include_upper_bound,
1047
1048            num_seen: 0,
1049            next_lower_bound: 0,
1050            active: Default::default(),
1051        }
1052    }
1053
1054    pub fn find_first_window_around(
1055        &self,
1056        mut lower_bound: i64,
1057        target: i64,
1058    ) -> PolarsResult<Result<(i64, i64), i64>> {
1059        let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1060        while !is_below_upper_bound(target, upper_bound, self.closed) {
1061            let gap = target - lower_bound;
1062            let nth = match self.tu {
1063                TimeUnit::Nanoseconds
1064                    if gap > self.every.nte_duration_ns() + self.period.nte_duration_ns() =>
1065                {
1066                    ((gap - self.period.nte_duration_ns()) as usize)
1067                        / (self.every.nte_duration_ns() as usize)
1068                },
1069                TimeUnit::Microseconds
1070                    if gap > self.every.nte_duration_us() + self.period.nte_duration_us() =>
1071                {
1072                    ((gap - self.period.nte_duration_us()) as usize)
1073                        / (self.every.nte_duration_us() as usize)
1074                },
1075                TimeUnit::Milliseconds
1076                    if gap > self.every.nte_duration_ms() + self.period.nte_duration_ms() =>
1077                {
1078                    ((gap - self.period.nte_duration_ms()) as usize)
1079                        / (self.every.nte_duration_ms() as usize)
1080                },
1081                _ => 1,
1082            };
1083
1084            let nth: i64 = nth.try_into().unwrap();
1085            lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;
1086            upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1087        }
1088
1089        if is_above_lower_bound(target, lower_bound, self.closed) {
1090            Ok(Ok((lower_bound, upper_bound)))
1091        } else {
1092            Ok(Err(lower_bound))
1093        }
1094    }
1095
1096    fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {
1097        match self.start_by {
1098            StartBy::DataPoint => Ok(first),
1099            StartBy::WindowBound => {
1100                let get_earliest_bounds = match self.tu {
1101                    TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,
1102                    TimeUnit::Microseconds => Window::get_earliest_bounds_us,
1103                    TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,
1104                };
1105                Ok((get_earliest_bounds)(
1106                    &Window::new(self.every, self.period, self.offset),
1107                    first,
1108                    self.closed,
1109                    self.tz.as_ref(),
1110                )?
1111                .start)
1112            },
1113            _ => {
1114                {
1115                    #[allow(clippy::type_complexity)]
1116                    let (from, to): (
1117                        fn(i64) -> NaiveDateTime,
1118                        fn(NaiveDateTime) -> i64,
1119                    ) = match self.tu {
1120                        TimeUnit::Nanoseconds => {
1121                            (timestamp_ns_to_datetime, datetime_to_timestamp_ns)
1122                        },
1123                        TimeUnit::Microseconds => {
1124                            (timestamp_us_to_datetime, datetime_to_timestamp_us)
1125                        },
1126                        TimeUnit::Milliseconds => {
1127                            (timestamp_ms_to_datetime, datetime_to_timestamp_ms)
1128                        },
1129                    };
1130                    // find beginning of the week.
1131                    let dt = from(first);
1132                    match self.tz.as_ref() {
1133                        #[cfg(feature = "timezones")]
1134                        Some(tz) => {
1135                            let dt = tz.from_utc_datetime(&dt);
1136                            let dt = dt.beginning_of_week();
1137                            let dt = dt.naive_utc();
1138                            let start = to(dt);
1139                            // adjust start of the week based on given day of the week
1140                            let start = (self.add)(
1141                                &Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1142                                start,
1143                                self.tz.as_ref(),
1144                            )?;
1145                            // apply the 'offset'
1146                            let start = (self.add)(&self.offset, start, self.tz.as_ref())?;
1147                            // make sure the first datapoint has a chance to be included
1148                            // and compute the end of the window defined by the 'period'
1149                            Ok(ensure_t_in_or_in_front_of_window(
1150                                self.every,
1151                                first,
1152                                self.add,
1153                                self.nte,
1154                                self.period,
1155                                start,
1156                                self.closed,
1157                                self.tz.as_ref(),
1158                            )?
1159                            .start)
1160                        },
1161                        _ => {
1162                            let tz = chrono::Utc;
1163                            let dt = dt.and_local_timezone(tz).unwrap();
1164                            let dt = dt.beginning_of_week();
1165                            let dt = dt.naive_utc();
1166                            let start = to(dt);
1167                            // adjust start of the week based on given day of the week
1168                            let start = (self.add)(
1169                                &Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1170                                start,
1171                                None,
1172                            )
1173                            .unwrap();
1174                            // apply the 'offset'
1175                            let start = (self.add)(&self.offset, start, None).unwrap();
1176                            // make sure the first datapoint has a chance to be included
1177                            // and compute the end of the window defined by the 'period'
1178                            Ok(ensure_t_in_or_in_front_of_window(
1179                                self.every,
1180                                first,
1181                                self.add,
1182                                self.nte,
1183                                self.period,
1184                                start,
1185                                self.closed,
1186                                None,
1187                            )?
1188                            .start)
1189                        },
1190                    }
1191                }
1192            },
1193        }
1194    }
1195
1196    pub fn insert(
1197        &mut self,
1198        time: &[i64],
1199        windows: &mut Vec<[IdxSize; 2]>,
1200        lower_bound: &mut Vec<i64>,
1201        upper_bound: &mut Vec<i64>,
1202    ) -> PolarsResult<()> {
1203        if time.is_empty() {
1204            return Ok(());
1205        }
1206
1207        if self.num_seen == 0 {
1208            debug_assert!(self.active.is_empty());
1209            self.next_lower_bound = self.start_lower_bound(time[0])?;
1210        }
1211
1212        for &t in time {
1213            while let Some(w) = self.active.front()
1214                && !is_below_upper_bound(t, w.upper_bound, self.closed)
1215            {
1216                let w = self.active.pop_front().unwrap();
1217                windows.push([w.start, self.num_seen - w.start]);
1218                if self.include_lower_bound {
1219                    lower_bound.push(w.lower_bound);
1220                }
1221                if self.include_upper_bound {
1222                    upper_bound.push(w.upper_bound);
1223                }
1224            }
1225
1226            while is_above_lower_bound(t, self.next_lower_bound, self.closed) {
1227                match self.find_first_window_around(self.next_lower_bound, t)? {
1228                    Ok((lower_bound, upper_bound)) => {
1229                        self.next_lower_bound =
1230                            (self.add)(&self.every, lower_bound, self.tz.as_ref())?;
1231                        self.active.push_back(ActiveDynWindow {
1232                            start: self.num_seen,
1233                            lower_bound,
1234                            upper_bound,
1235                        });
1236                    },
1237                    Err(lower_bound) => {
1238                        self.next_lower_bound = lower_bound;
1239                        break;
1240                    },
1241                }
1242            }
1243
1244            self.num_seen += 1
1245        }
1246
1247        Ok(())
1248    }
1249
1250    pub fn lowest_needed_index(&self) -> IdxSize {
1251        self.active.front().map_or(self.num_seen, |w| w.start)
1252    }
1253
1254    pub fn finalize(
1255        &mut self,
1256        windows: &mut Vec<[IdxSize; 2]>,
1257        lower_bound: &mut Vec<i64>,
1258        upper_bound: &mut Vec<i64>,
1259    ) {
1260        for w in self.active.drain(..) {
1261            windows.push([w.start, self.num_seen - w.start]);
1262            if self.include_lower_bound {
1263                lower_bound.push(w.lower_bound);
1264            }
1265            if self.include_upper_bound {
1266                upper_bound.push(w.upper_bound);
1267            }
1268        }
1269
1270        self.next_lower_bound = 0;
1271        self.num_seen = 0;
1272    }
1273
1274    pub fn num_seen(&self) -> IdxSize {
1275        self.num_seen
1276    }
1277
1278    pub fn time_unit(&self) -> TimeUnit {
1279        self.tu
1280    }
1281}
1282
#[cfg(test)]
mod test {
    use super::*;

    /// Splits whose boundary falls between equal time values get merged: here the
    /// boundary between (0, 2) and (2, 4) separates the two `1`s, so they become (0, 6).
    #[test]
    fn test_prune_duplicates() {
        // Split boundaries drawn over the indices of `ts`:
        //                     |--|------------|----|---------|
        //                     0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
        let ts = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut parts = vec![(0, 2), (2, 4), (6, 2), (8, 3)];

        prune_splits_on_duplicates(ts, &mut parts);

        assert_eq!(parts, &[(0, 6), (6, 2), (8, 3)]);
    }
}
1296}