polars_time/windows/window.rs

1use arrow::legacy::time_zone::Tz;
2use arrow::temporal_conversions::*;
3use chrono::NaiveDateTime;
4#[cfg(feature = "timezones")]
5use chrono::TimeZone;
6use now::DateTimeNow;
7use polars_core::prelude::*;
8
9use crate::prelude::*;
10
11/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
12///
13/// For example, if we have:
14///
15/// - first datapoint is `2020-01-01 01:00`
16/// - `every` is `'1d'`
17/// - `period` is `'2d'`
18/// - `offset` is `'6h'`
19///
20/// then truncating the earliest datapoint by `every` and adding `offset` results
21/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint
22/// a chance of being included, we then shift the window back by `every` to
23/// `[2019-12-31 06:00, 2020-01-02 06:00)`.
24#[allow(clippy::too_many_arguments)]
25pub(crate) fn ensure_t_in_or_in_front_of_window(
26    mut every: Duration,
27    t: i64,
28    offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
29    nte_duration_fn: fn(&Duration) -> i64,
30    period: Duration,
31    mut start: i64,
32    closed_window: ClosedWindow,
33    tz: Option<&Tz>,
34) -> PolarsResult<Bounds> {
35    every.negative = !every.negative;
36    let mut stop = offset_fn(&period, start, tz)?;
37
38    while Bounds::new(start, stop).is_past(t, closed_window) {
39        let mut gap = start - t;
40        if matches!(closed_window, ClosedWindow::Right | ClosedWindow::None) {
41            gap += 1;
42        }
43        debug_assert!(gap >= 1);
44
45        // Ceil division
46        let stride = (gap + nte_duration_fn(&every) - 1) / nte_duration_fn(&every);
47        debug_assert!(stride >= 1);
48        let stride = std::cmp::max(stride, 1);
49
50        start = offset_fn(&(every * stride), start, tz)?;
51        stop = offset_fn(&period, start, tz)?;
52    }
53    Ok(Bounds::new_checked(start, stop))
54}
55
56/// Represents a window in time
57#[derive(Copy, Clone)]
58pub struct Window {
59    // The ith window start is expressed via this equation:
60    //   window_start_i = zero + every * i
61    //   window_stop_i = zero + every * i + period
62    pub(crate) every: Duration,
63    pub(crate) period: Duration,
64    pub offset: Duration,
65}
66
67impl Window {
68    pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
69        debug_assert!(!every.negative);
70        Self {
71            every,
72            period,
73            offset,
74        }
75    }
76
77    /// Truncate the given ns timestamp by the window boundary.
78    pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
79        self.every.truncate_ns(t, tz)
80    }
81
82    /// Truncate the given us timestamp by the window boundary.
83    pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
84        self.every.truncate_us(t, tz)
85    }
86
87    /// Truncate the given ms timestamp by the window boundary.
88    pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
89        self.every.truncate_ms(t, tz)
90    }
91
92    /// Round the given ns timestamp by the window boundary.
93    pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
94        let t = t + self.every.duration_ns() / 2_i64;
95        self.truncate_ns(t, tz)
96    }
97
98    /// Round the given us timestamp by the window boundary.
99    pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
100        let t = t + self.every.duration_ns()
101            / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
102        self.truncate_us(t, tz)
103    }
104
105    /// Round the given ms timestamp by the window boundary.
106    pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
107        let t = t + self.every.duration_ns()
108            / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
109        self.truncate_ms(t, tz)
110    }
111
112    /// returns the bounds for the earliest window bounds
113    /// that contains the given time t.  For underlapping windows that
114    /// do not contain time t, the window directly after time t will be returned.
115    pub fn get_earliest_bounds_ns(
116        &self,
117        t: i64,
118        closed_window: ClosedWindow,
119        tz: Option<&Tz>,
120    ) -> PolarsResult<Bounds> {
121        let start = self.truncate_ns(t, tz)?;
122        let start = self.offset.add_ns(start, tz)?;
123        ensure_t_in_or_in_front_of_window(
124            self.every,
125            t,
126            Duration::add_ns,
127            Duration::nte_duration_ns,
128            self.period,
129            start,
130            closed_window,
131            tz,
132        )
133    }
134
135    pub fn get_earliest_bounds_us(
136        &self,
137        t: i64,
138        closed_window: ClosedWindow,
139        tz: Option<&Tz>,
140    ) -> PolarsResult<Bounds> {
141        let start = self.truncate_us(t, tz)?;
142        let start = self.offset.add_us(start, tz)?;
143        ensure_t_in_or_in_front_of_window(
144            self.every,
145            t,
146            Duration::add_us,
147            Duration::nte_duration_us,
148            self.period,
149            start,
150            closed_window,
151            tz,
152        )
153    }
154
155    pub fn get_earliest_bounds_ms(
156        &self,
157        t: i64,
158        closed_window: ClosedWindow,
159        tz: Option<&Tz>,
160    ) -> PolarsResult<Bounds> {
161        let start = self.truncate_ms(t, tz)?;
162        let start = self.offset.add_ms(start, tz)?;
163        ensure_t_in_or_in_front_of_window(
164            self.every,
165            t,
166            Duration::add_ms,
167            Duration::nte_duration_ms,
168            self.period,
169            start,
170            closed_window,
171            tz,
172        )
173    }
174
175    pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
176        (boundary.duration() / self.every.duration_ns()
177            + self.period.duration_ns() / self.every.duration_ns()) as usize
178    }
179
180    pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
181        (boundary.duration() / self.every.duration_us()
182            + self.period.duration_us() / self.every.duration_us()) as usize
183    }
184
185    pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
186        (boundary.duration() / self.every.duration_ms()
187            + self.period.duration_ms() / self.every.duration_ms()) as usize
188    }
189
190    pub fn get_overlapping_bounds_iter<'a>(
191        &'a self,
192        boundary: Bounds,
193        closed_window: ClosedWindow,
194        tu: TimeUnit,
195        tz: Option<&'a Tz>,
196        start_by: StartBy,
197    ) -> PolarsResult<BoundsIter<'a>> {
198        BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
199    }
200}
201
202pub struct BoundsIter<'a> {
203    window: Window,
204    // wrapping boundary
205    boundary: Bounds,
206    // boundary per window iterator
207    bi: Bounds,
208    tu: TimeUnit,
209    tz: Option<&'a Tz>,
210}
211impl<'a> BoundsIter<'a> {
212    fn new(
213        window: Window,
214        closed_window: ClosedWindow,
215        boundary: Bounds,
216        tu: TimeUnit,
217        tz: Option<&'a Tz>,
218        start_by: StartBy,
219    ) -> PolarsResult<Self> {
220        let bi = match start_by {
221            StartBy::DataPoint => {
222                let mut boundary = boundary;
223                let offset_fn = match tu {
224                    TimeUnit::Nanoseconds => Duration::add_ns,
225                    TimeUnit::Microseconds => Duration::add_us,
226                    TimeUnit::Milliseconds => Duration::add_ms,
227                };
228                boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
229                boundary
230            },
231            StartBy::WindowBound => match tu {
232                TimeUnit::Nanoseconds => {
233                    window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
234                },
235                TimeUnit::Microseconds => {
236                    window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
237                },
238                TimeUnit::Milliseconds => {
239                    window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
240                },
241            },
242            _ => {
243                {
244                    #[allow(clippy::type_complexity)]
245                    let (from, to, offset_fn, nte_duration_fn): (
246                        fn(i64) -> NaiveDateTime,
247                        fn(NaiveDateTime) -> i64,
248                        fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
249                        fn(&Duration) -> i64,
250                    ) = match tu {
251                        TimeUnit::Nanoseconds => (
252                            timestamp_ns_to_datetime,
253                            datetime_to_timestamp_ns,
254                            Duration::add_ns,
255                            Duration::nte_duration_ns,
256                        ),
257                        TimeUnit::Microseconds => (
258                            timestamp_us_to_datetime,
259                            datetime_to_timestamp_us,
260                            Duration::add_us,
261                            Duration::nte_duration_us,
262                        ),
263                        TimeUnit::Milliseconds => (
264                            timestamp_ms_to_datetime,
265                            datetime_to_timestamp_ms,
266                            Duration::add_ms,
267                            Duration::nte_duration_ms,
268                        ),
269                    };
270                    // find beginning of the week.
271                    let dt = from(boundary.start);
272                    match tz {
273                        #[cfg(feature = "timezones")]
274                        Some(tz) => {
275                            let dt = tz.from_utc_datetime(&dt);
276                            let dt = dt.beginning_of_week();
277                            let dt = dt.naive_utc();
278                            let start = to(dt);
279                            // adjust start of the week based on given day of the week
280                            let start = offset_fn(
281                                &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
282                                start,
283                                Some(tz),
284                            )?;
285                            // apply the 'offset'
286                            let start = offset_fn(&window.offset, start, Some(tz))?;
287                            // make sure the first datapoint has a chance to be included
288                            // and compute the end of the window defined by the 'period'
289                            ensure_t_in_or_in_front_of_window(
290                                window.every,
291                                boundary.start,
292                                offset_fn,
293                                nte_duration_fn,
294                                window.period,
295                                start,
296                                closed_window,
297                                Some(tz),
298                            )?
299                        },
300                        _ => {
301                            let tz = chrono::Utc;
302                            let dt = dt.and_local_timezone(tz).unwrap();
303                            let dt = dt.beginning_of_week();
304                            let dt = dt.naive_utc();
305                            let start = to(dt);
306                            // adjust start of the week based on given day of the week
307                            let start = offset_fn(
308                                &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
309                                start,
310                                None,
311                            )
312                            .unwrap();
313                            // apply the 'offset'
314                            let start = offset_fn(&window.offset, start, None).unwrap();
315                            // make sure the first datapoint has a chance to be included
316                            // and compute the end of the window defined by the 'period'
317                            ensure_t_in_or_in_front_of_window(
318                                window.every,
319                                boundary.start,
320                                offset_fn,
321                                nte_duration_fn,
322                                window.period,
323                                start,
324                                closed_window,
325                                None,
326                            )?
327                        },
328                    }
329                }
330            },
331        };
332        Ok(Self {
333            window,
334            boundary,
335            bi,
336            tu,
337            tz,
338        })
339    }
340}
341
342impl Iterator for BoundsIter<'_> {
343    type Item = Bounds;
344
345    fn next(&mut self) -> Option<Self::Item> {
346        if self.bi.start < self.boundary.stop {
347            let out = self.bi;
348            match self.tu {
349                // TODO: find some way to propagate error instead of unwrapping?
350                // Issue is that `next` needs to return `Option`.
351                TimeUnit::Nanoseconds => {
352                    self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
353                    self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
354                },
355                TimeUnit::Microseconds => {
356                    self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
357                    self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
358                },
359                TimeUnit::Milliseconds => {
360                    self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
361                    self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
362                },
363            }
364            Some(out)
365        } else {
366            None
367        }
368    }
369
370    fn nth(&mut self, n: usize) -> Option<Self::Item> {
371        let n: i64 = n.try_into().unwrap();
372        if self.bi.start < self.boundary.stop {
373            match self.tu {
374                TimeUnit::Nanoseconds => {
375                    self.bi.start = (self.window.every * n)
376                        .add_ns(self.bi.start, self.tz)
377                        .unwrap();
378                    self.bi.stop = (self.window.period).add_ns(self.bi.start, self.tz).unwrap();
379                },
380                TimeUnit::Microseconds => {
381                    self.bi.start = (self.window.every * n)
382                        .add_us(self.bi.start, self.tz)
383                        .unwrap();
384                    self.bi.stop = (self.window.period).add_us(self.bi.start, self.tz).unwrap();
385                },
386                TimeUnit::Milliseconds => {
387                    self.bi.start = (self.window.every * n)
388                        .add_ms(self.bi.start, self.tz)
389                        .unwrap();
390                    self.bi.stop = (self.window.period).add_ms(self.bi.start, self.tz).unwrap();
391                },
392            }
393            self.next()
394        } else {
395            None
396        }
397    }
398}
399
400impl<'a> BoundsIter<'a> {
401    /// Number of iterations to advance, such that the bounds are on target; or, in
402    /// the case of non-constant duration, close to target.
403    /// Follows the `nth()` convention on Iterator indexing, i.e., a return value of 0
404    /// implies advancing 1 iteration.
405    pub fn get_stride(&self, target: i64) -> usize {
406        let mut stride = 0;
407        if self.bi.start < self.boundary.stop && target > self.bi.start {
408            let gap = target - self.bi.start;
409            match self.tu {
410                TimeUnit::Nanoseconds => {
411                    if gap
412                        > self.window.every.nte_duration_ns() + self.window.period.nte_duration_ns()
413                    {
414                        stride = ((gap - self.window.period.nte_duration_ns()) as usize)
415                            / (self.window.every.nte_duration_ns() as usize);
416                    }
417                },
418                TimeUnit::Microseconds => {
419                    if gap
420                        > self.window.every.nte_duration_us() + self.window.period.nte_duration_us()
421                    {
422                        stride = ((gap - self.window.period.nte_duration_us()) as usize)
423                            / (self.window.every.nte_duration_us() as usize);
424                    }
425                },
426                TimeUnit::Milliseconds => {
427                    if gap
428                        > self.window.every.nte_duration_ms() + self.window.period.nte_duration_ms()
429                    {
430                        stride = ((gap - self.window.period.nte_duration_ms()) as usize)
431                            / (self.window.every.nte_duration_ms() as usize);
432                    }
433                },
434            }
435        }
436        stride
437    }
438}