1use arrow::legacy::time_zone::Tz;
2use arrow::temporal_conversions::*;
3use chrono::NaiveDateTime;
4#[cfg(feature = "timezones")]
5use chrono::TimeZone;
6use now::DateTimeNow;
7use polars_core::prelude::*;
8
9use crate::prelude::*;
10
11pub(crate) fn ensure_t_in_or_in_front_of_window(
25 mut every: Duration,
26 t: i64,
27 offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
28 period: Duration,
29 mut start: i64,
30 closed_window: ClosedWindow,
31 tz: Option<&Tz>,
32) -> PolarsResult<Bounds> {
33 every.negative = !every.negative;
34 let mut stop = offset_fn(&period, start, tz)?;
35 while Bounds::new(start, stop).is_past(t, closed_window) {
36 start = offset_fn(&every, start, tz)?;
37 stop = offset_fn(&period, start, tz)?;
38 }
39 Ok(Bounds::new_checked(start, stop))
40}
41
42#[derive(Copy, Clone)]
44pub struct Window {
45 every: Duration,
49 period: Duration,
50 pub offset: Duration,
51}
52
53impl Window {
54 pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
55 debug_assert!(!every.negative);
56 Self {
57 every,
58 period,
59 offset,
60 }
61 }
62
63 pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
65 self.every.truncate_ns(t, tz)
66 }
67
68 pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
70 self.every.truncate_us(t, tz)
71 }
72
73 pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
75 self.every.truncate_ms(t, tz)
76 }
77
78 pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
80 let t = t + self.every.duration_ns() / 2_i64;
81 self.truncate_ns(t, tz)
82 }
83
84 pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
86 let t = t + self.every.duration_ns()
87 / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
88 self.truncate_us(t, tz)
89 }
90
91 pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
93 let t = t + self.every.duration_ns()
94 / (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
95 self.truncate_ms(t, tz)
96 }
97
98 pub fn get_earliest_bounds_ns(
102 &self,
103 t: i64,
104 closed_window: ClosedWindow,
105 tz: Option<&Tz>,
106 ) -> PolarsResult<Bounds> {
107 let start = self.truncate_ns(t, tz)?;
108 let start = self.offset.add_ns(start, tz)?;
109 ensure_t_in_or_in_front_of_window(
110 self.every,
111 t,
112 Duration::add_ns,
113 self.period,
114 start,
115 closed_window,
116 tz,
117 )
118 }
119
120 pub fn get_earliest_bounds_us(
121 &self,
122 t: i64,
123 closed_window: ClosedWindow,
124 tz: Option<&Tz>,
125 ) -> PolarsResult<Bounds> {
126 let start = self.truncate_us(t, tz)?;
127 let start = self.offset.add_us(start, tz)?;
128 ensure_t_in_or_in_front_of_window(
129 self.every,
130 t,
131 Duration::add_us,
132 self.period,
133 start,
134 closed_window,
135 tz,
136 )
137 }
138
139 pub fn get_earliest_bounds_ms(
140 &self,
141 t: i64,
142 closed_window: ClosedWindow,
143 tz: Option<&Tz>,
144 ) -> PolarsResult<Bounds> {
145 let start = self.truncate_ms(t, tz)?;
146 let start = self.offset.add_ms(start, tz)?;
147 ensure_t_in_or_in_front_of_window(
148 self.every,
149 t,
150 Duration::add_ms,
151 self.period,
152 start,
153 closed_window,
154 tz,
155 )
156 }
157
158 pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
159 (boundary.duration() / self.every.duration_ns()
160 + self.period.duration_ns() / self.every.duration_ns()) as usize
161 }
162
163 pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
164 (boundary.duration() / self.every.duration_us()
165 + self.period.duration_us() / self.every.duration_us()) as usize
166 }
167
168 pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
169 (boundary.duration() / self.every.duration_ms()
170 + self.period.duration_ms() / self.every.duration_ms()) as usize
171 }
172
173 pub fn get_overlapping_bounds_iter<'a>(
174 &'a self,
175 boundary: Bounds,
176 closed_window: ClosedWindow,
177 tu: TimeUnit,
178 tz: Option<&'a Tz>,
179 start_by: StartBy,
180 ) -> PolarsResult<BoundsIter<'a>> {
181 BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
182 }
183}
184
185pub struct BoundsIter<'a> {
186 window: Window,
187 boundary: Bounds,
189 bi: Bounds,
191 tu: TimeUnit,
192 tz: Option<&'a Tz>,
193}
194impl<'a> BoundsIter<'a> {
195 fn new(
196 window: Window,
197 closed_window: ClosedWindow,
198 boundary: Bounds,
199 tu: TimeUnit,
200 tz: Option<&'a Tz>,
201 start_by: StartBy,
202 ) -> PolarsResult<Self> {
203 let bi = match start_by {
204 StartBy::DataPoint => {
205 let mut boundary = boundary;
206 let offset_fn = match tu {
207 TimeUnit::Nanoseconds => Duration::add_ns,
208 TimeUnit::Microseconds => Duration::add_us,
209 TimeUnit::Milliseconds => Duration::add_ms,
210 };
211 boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
212 boundary
213 },
214 StartBy::WindowBound => match tu {
215 TimeUnit::Nanoseconds => {
216 window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
217 },
218 TimeUnit::Microseconds => {
219 window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
220 },
221 TimeUnit::Milliseconds => {
222 window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
223 },
224 },
225 _ => {
226 {
227 #[allow(clippy::type_complexity)]
228 let (from, to, offset_fn): (
229 fn(i64) -> NaiveDateTime,
230 fn(NaiveDateTime) -> i64,
231 fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
232 ) = match tu {
233 TimeUnit::Nanoseconds => (
234 timestamp_ns_to_datetime,
235 datetime_to_timestamp_ns,
236 Duration::add_ns,
237 ),
238 TimeUnit::Microseconds => (
239 timestamp_us_to_datetime,
240 datetime_to_timestamp_us,
241 Duration::add_us,
242 ),
243 TimeUnit::Milliseconds => (
244 timestamp_ms_to_datetime,
245 datetime_to_timestamp_ms,
246 Duration::add_ms,
247 ),
248 };
249 let dt = from(boundary.start);
251 match tz {
252 #[cfg(feature = "timezones")]
253 Some(tz) => {
254 let dt = tz.from_utc_datetime(&dt);
255 let dt = dt.beginning_of_week();
256 let dt = dt.naive_utc();
257 let start = to(dt);
258 let start = offset_fn(
260 &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
261 start,
262 Some(tz),
263 )?;
264 let start = offset_fn(&window.offset, start, Some(tz))?;
266 ensure_t_in_or_in_front_of_window(
269 window.every,
270 boundary.start,
271 offset_fn,
272 window.period,
273 start,
274 closed_window,
275 Some(tz),
276 )?
277 },
278 _ => {
279 let tz = chrono::Utc;
280 let dt = dt.and_local_timezone(tz).unwrap();
281 let dt = dt.beginning_of_week();
282 let dt = dt.naive_utc();
283 let start = to(dt);
284 let start = offset_fn(
286 &Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
287 start,
288 None,
289 )
290 .unwrap();
291 let start = offset_fn(&window.offset, start, None).unwrap();
293 ensure_t_in_or_in_front_of_window(
296 window.every,
297 boundary.start,
298 offset_fn,
299 window.period,
300 start,
301 closed_window,
302 None,
303 )?
304 },
305 }
306 }
307 },
308 };
309 Ok(Self {
310 window,
311 boundary,
312 bi,
313 tu,
314 tz,
315 })
316 }
317}
318
319impl Iterator for BoundsIter<'_> {
320 type Item = Bounds;
321
322 fn next(&mut self) -> Option<Self::Item> {
323 if self.bi.start < self.boundary.stop {
324 let out = self.bi;
325 match self.tu {
326 TimeUnit::Nanoseconds => {
329 self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
330 self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
331 },
332 TimeUnit::Microseconds => {
333 self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
334 self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
335 },
336 TimeUnit::Milliseconds => {
337 self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
338 self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
339 },
340 }
341 Some(out)
342 } else {
343 None
344 }
345 }
346}