polars_time/windows/
window.rs

1use arrow::legacy::time_zone::Tz;
2use arrow::temporal_conversions::*;
3use chrono::NaiveDateTime;
4#[cfg(feature = "timezones")]
5use chrono::TimeZone;
6use now::DateTimeNow;
7use polars_core::prelude::*;
8
9use crate::prelude::*;
10
11/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
12///
13/// For example, if we have:
14///
15/// - first datapoint is `2020-01-01 01:00`
16/// - `every` is `'1d'`
17/// - `period` is `'2d'`
18/// - `offset` is `'6h'`
19///
20/// then truncating the earliest datapoint by `every` and adding `offset` results
21/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint
22/// a chance of being included, we then shift the window back by `every` to
23/// `[2019-12-31 06:00, 2020-01-02 06:00)`.
24pub(crate) fn ensure_t_in_or_in_front_of_window(
25    mut every: Duration,
26    t: i64,
27    offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
28    period: Duration,
29    mut start: i64,
30    closed_window: ClosedWindow,
31    tz: Option<&Tz>,
32) -> PolarsResult<Bounds> {
33    every.negative = !every.negative;
34    let mut stop = offset_fn(&period, start, tz)?;
35    while Bounds::new(start, stop).is_past(t, closed_window) {
36        start = offset_fn(&every, start, tz)?;
37        stop = offset_fn(&period, start, tz)?;
38    }
39    Ok(Bounds::new_checked(start, stop))
40}
41
42/// Represents a window in time
43#[derive(Copy, Clone)]
44pub struct Window {
45    // The ith window start is expressed via this equation:
46    //   window_start_i = zero + every * i
47    //   window_stop_i = zero + every * i + period
48    every: Duration,
49    period: Duration,
50    pub offset: Duration,
51}
52
53impl Window {
54    pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
55        debug_assert!(!every.negative);
56        Self {
57            every,
58            period,
59            offset,
60        }
61    }
62
63    /// Truncate the given ns timestamp by the window boundary.
64    pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
65        self.every.truncate_ns(t, tz)
66    }
67
68    /// Truncate the given us timestamp by the window boundary.
69    pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
70        self.every.truncate_us(t, tz)
71    }
72
73    /// Truncate the given ms timestamp by the window boundary.
74    pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
75        self.every.truncate_ms(t, tz)
76    }
77
78    /// Round the given ns timestamp by the window boundary.
79    pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
80        let t = t + self.every.duration_ns() / 2_i64;
81        self.truncate_ns(t, tz)
82    }
83
84    /// Round the given us timestamp by the window boundary.
85    pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
86        let t = t + self.every.duration_ns()
87            / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
88        self.truncate_us(t, tz)
89    }
90
91    /// Round the given ms timestamp by the window boundary.
92    pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
93        let t = t + self.every.duration_ns()
94            / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
95        self.truncate_ms(t, tz)
96    }
97
98    /// returns the bounds for the earliest window bounds
99    /// that contains the given time t.  For underlapping windows that
100    /// do not contain time t, the window directly after time t will be returned.
101    pub fn get_earliest_bounds_ns(
102        &self,
103        t: i64,
104        closed_window: ClosedWindow,
105        tz: Option<&Tz>,
106    ) -> PolarsResult<Bounds> {
107        let start = self.truncate_ns(t, tz)?;
108        let start = self.offset.add_ns(start, tz)?;
109        ensure_t_in_or_in_front_of_window(
110            self.every,
111            t,
112            Duration::add_ns,
113            self.period,
114            start,
115            closed_window,
116            tz,
117        )
118    }
119
120    pub fn get_earliest_bounds_us(
121        &self,
122        t: i64,
123        closed_window: ClosedWindow,
124        tz: Option<&Tz>,
125    ) -> PolarsResult<Bounds> {
126        let start = self.truncate_us(t, tz)?;
127        let start = self.offset.add_us(start, tz)?;
128        ensure_t_in_or_in_front_of_window(
129            self.every,
130            t,
131            Duration::add_us,
132            self.period,
133            start,
134            closed_window,
135            tz,
136        )
137    }
138
139    pub fn get_earliest_bounds_ms(
140        &self,
141        t: i64,
142        closed_window: ClosedWindow,
143        tz: Option<&Tz>,
144    ) -> PolarsResult<Bounds> {
145        let start = self.truncate_ms(t, tz)?;
146        let start = self.offset.add_ms(start, tz)?;
147        ensure_t_in_or_in_front_of_window(
148            self.every,
149            t,
150            Duration::add_ms,
151            self.period,
152            start,
153            closed_window,
154            tz,
155        )
156    }
157
158    pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
159        (boundary.duration() / self.every.duration_ns()
160            + self.period.duration_ns() / self.every.duration_ns()) as usize
161    }
162
163    pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
164        (boundary.duration() / self.every.duration_us()
165            + self.period.duration_us() / self.every.duration_us()) as usize
166    }
167
168    pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
169        (boundary.duration() / self.every.duration_ms()
170            + self.period.duration_ms() / self.every.duration_ms()) as usize
171    }
172
173    pub fn get_overlapping_bounds_iter<'a>(
174        &'a self,
175        boundary: Bounds,
176        closed_window: ClosedWindow,
177        tu: TimeUnit,
178        tz: Option<&'a Tz>,
179        start_by: StartBy,
180    ) -> PolarsResult<BoundsIter<'a>> {
181        BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
182    }
183}
184
185pub struct BoundsIter<'a> {
186    window: Window,
187    // wrapping boundary
188    boundary: Bounds,
189    // boundary per window iterator
190    bi: Bounds,
191    tu: TimeUnit,
192    tz: Option<&'a Tz>,
193}
194impl<'a> BoundsIter<'a> {
195    fn new(
196        window: Window,
197        closed_window: ClosedWindow,
198        boundary: Bounds,
199        tu: TimeUnit,
200        tz: Option<&'a Tz>,
201        start_by: StartBy,
202    ) -> PolarsResult<Self> {
203        let bi = match start_by {
204            StartBy::DataPoint => {
205                let mut boundary = boundary;
206                let offset_fn = match tu {
207                    TimeUnit::Nanoseconds => Duration::add_ns,
208                    TimeUnit::Microseconds => Duration::add_us,
209                    TimeUnit::Milliseconds => Duration::add_ms,
210                };
211                boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
212                boundary
213            },
214            StartBy::WindowBound => match tu {
215                TimeUnit::Nanoseconds => {
216                    window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
217                },
218                TimeUnit::Microseconds => {
219                    window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
220                },
221                TimeUnit::Milliseconds => {
222                    window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
223                },
224            },
225            _ => {
226                {
227                    #[allow(clippy::type_complexity)]
228                    let (from, to, offset_fn): (
229                        fn(i64) -> NaiveDateTime,
230                        fn(NaiveDateTime) -> i64,
231                        fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
232                    ) = match tu {
233                        TimeUnit::Nanoseconds => (
234                            timestamp_ns_to_datetime,
235                            datetime_to_timestamp_ns,
236                            Duration::add_ns,
237                        ),
238                        TimeUnit::Microseconds => (
239                            timestamp_us_to_datetime,
240                            datetime_to_timestamp_us,
241                            Duration::add_us,
242                        ),
243                        TimeUnit::Milliseconds => (
244                            timestamp_ms_to_datetime,
245                            datetime_to_timestamp_ms,
246                            Duration::add_ms,
247                        ),
248                    };
249                    // find beginning of the week.
250                    let dt = from(boundary.start);
251                    match tz {
252                        #[cfg(feature = "timezones")]
253                        Some(tz) => {
254                            let dt = tz.from_utc_datetime(&dt);
255                            let dt = dt.beginning_of_week();
256                            let dt = dt.naive_utc();
257                            let start = to(dt);
258                            // adjust start of the week based on given day of the week
259                            let start = offset_fn(
260                                &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
261                                start,
262                                Some(tz),
263                            )?;
264                            // apply the 'offset'
265                            let start = offset_fn(&window.offset, start, Some(tz))?;
266                            // make sure the first datapoint has a chance to be included
267                            // and compute the end of the window defined by the 'period'
268                            ensure_t_in_or_in_front_of_window(
269                                window.every,
270                                boundary.start,
271                                offset_fn,
272                                window.period,
273                                start,
274                                closed_window,
275                                Some(tz),
276                            )?
277                        },
278                        _ => {
279                            let tz = chrono::Utc;
280                            let dt = dt.and_local_timezone(tz).unwrap();
281                            let dt = dt.beginning_of_week();
282                            let dt = dt.naive_utc();
283                            let start = to(dt);
284                            // adjust start of the week based on given day of the week
285                            let start = offset_fn(
286                                &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
287                                start,
288                                None,
289                            )
290                            .unwrap();
291                            // apply the 'offset'
292                            let start = offset_fn(&window.offset, start, None).unwrap();
293                            // make sure the first datapoint has a chance to be included
294                            // and compute the end of the window defined by the 'period'
295                            ensure_t_in_or_in_front_of_window(
296                                window.every,
297                                boundary.start,
298                                offset_fn,
299                                window.period,
300                                start,
301                                closed_window,
302                                None,
303                            )?
304                        },
305                    }
306                }
307            },
308        };
309        Ok(Self {
310            window,
311            boundary,
312            bi,
313            tu,
314            tz,
315        })
316    }
317}
318
319impl Iterator for BoundsIter<'_> {
320    type Item = Bounds;
321
322    fn next(&mut self) -> Option<Self::Item> {
323        if self.bi.start < self.boundary.stop {
324            let out = self.bi;
325            match self.tu {
326                // TODO: find some way to propagate error instead of unwrapping?
327                // Issue is that `next` needs to return `Option`.
328                TimeUnit::Nanoseconds => {
329                    self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
330                    self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
331                },
332                TimeUnit::Microseconds => {
333                    self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
334                    self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
335                },
336                TimeUnit::Milliseconds => {
337                    self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
338                    self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
339                },
340            }
341            Some(out)
342        } else {
343            None
344        }
345    }
346}