mod agg_list;
mod boolean;
mod dispatch;
mod string;

use std::borrow::Cow;

pub use agg_list::*;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::legacy::kernels::take_agg::*;
use arrow::legacy::trusted_len::TrustedLenPush;
use arrow::types::NativeType;
use num_traits::pow::Pow;
use num_traits::{Bounded, Float, Num, NumCast, ToPrimitive, Zero};
use polars_compute::rolling::no_nulls::{
    MaxWindow, MinWindow, MomentWindow, QuantileWindow, RollingAggWindowNoNulls,
};
use polars_compute::rolling::nulls::{RollingAggWindowNulls, VarianceMoment};
use polars_compute::rolling::quantile_filter::SealedRolling;
use polars_compute::rolling::{
    self, MeanWindow, QuantileMethod, RollingFnParams, RollingQuantileParams, RollingVarParams,
    SumWindow, quantile_filter,
};
use polars_utils::float::IsFloat;
#[cfg(feature = "dtype-f16")]
use polars_utils::float16::pf16;
use polars_utils::idx_vec::IdxVec;
use polars_utils::kahan_sum::KahanSum;
use polars_utils::min_max::MinMax;
use rayon::prelude::*;

use crate::POOL;
use crate::chunked_array::cast::CastOptions;
#[cfg(feature = "object")]
use crate::chunked_array::object::extension::create_extension;
use crate::frame::group_by::GroupsIdx;
#[cfg(feature = "object")]
use crate::frame::group_by::GroupsIndicator;
use crate::prelude::*;
use crate::series::IsSorted;
use crate::series::implementations::SeriesWrap;
use crate::utils::NoNull;

fn idx2usize(idx: &[IdxSize]) -> impl ExactSizeIterator<Item = usize> + '_ {
    idx.iter().map(|i| *i as usize)
}

/// Whether a slice group-by is eligible for the rolling aggregation kernels:
/// this requires more than one group, a single contiguous chunk, and slice
/// groups that are flagged as overlapping and monotonic.
pub fn _use_rolling_kernels(
    groups: &GroupsSlice,
    overlapping: bool,
    monotonic: bool,
    chunks: &[ArrayRef],
) -> bool {
    match groups.len() {
        0 | 1 => false,
        _ => overlapping && monotonic && chunks.len() == 1,
    }
}

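/// Applies a rolling aggregation window over `values`, honouring the
/// `validity` mask. Windows whose aggregate is `None` are marked null in the
/// output validity.
///
/// A minimal sketch of the calling convention (illustrative only, not a
/// doctest; the values and the choice of the null-aware `MinWindow` are
/// hypothetical):
///
/// ```ignore
/// use arrow::bitmap::Bitmap;
///
/// let values = [1.0f64, 2.0, 3.0, 4.0];
/// let validity = Bitmap::from_iter([true, false, true, true]);
/// // One (start, len) pair per group: windows [0, 2) and [1, 4).
/// let offsets = [(0 as IdxSize, 2 as IdxSize), (1, 3)].into_iter();
/// let out = _rolling_apply_agg_window_nulls::<rolling::nulls::MinWindow<_>, _, _>(
///     &values, &validity, offsets, None,
/// );
/// // out: [Some(1.0), Some(3.0)]; the masked slot at index 1 is skipped.
/// ```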
pub fn _rolling_apply_agg_window_nulls<'a, Agg, T, O>(
    values: &'a [T],
    validity: &'a Bitmap,
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<T>
where
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    Agg: RollingAggWindowNulls<'a, T>,
    T: IsFloat + NativeType,
{
    if values.is_empty() {
        let out: Vec<T> = vec![];
        return PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), None);
    }

    let output_len = offsets.size_hint().0;
    // Start with a dummy window; it is overwritten on the first iteration.
    // SAFETY: we are in bounds.
    let mut agg_window = unsafe { Agg::new(values, validity, 0, 0, params, None) };

    let mut validity = MutableBitmap::with_capacity(output_len);
    validity.extend_constant(output_len, true);

    let out = offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;

            // SAFETY: we are in bounds.
            let agg = unsafe { agg_window.update(start as usize, end as usize) };

            match agg {
                Some(val) => val,
                None => {
                    // SAFETY: we are in bounds.
                    unsafe { validity.set_unchecked(idx, false) };
                    T::default()
                },
            }
        })
        .collect_trusted::<Vec<_>>();

    PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), Some(validity.into()))
}

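/// Applies a rolling aggregation window over dense `values` (no validity
/// mask), so every window produces a value.
///
/// A minimal sketch of the calling convention (illustrative only, not a
/// doctest; values are hypothetical, using the dense rolling sum):
///
/// ```ignore
/// let values = [1.0f64, 2.0, 3.0, 4.0];
/// let offsets = [(0 as IdxSize, 2 as IdxSize), (2, 2)].into_iter();
/// let out = _rolling_apply_agg_window_no_nulls::<SumWindow<f64, f64>, _, _>(
///     &values, offsets, None,
/// );
/// // out: [3.0, 7.0]
/// ```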
pub fn _rolling_apply_agg_window_no_nulls<'a, Agg, T, O>(
    values: &'a [T],
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<T>
where
    Agg: RollingAggWindowNoNulls<'a, T>,
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    T: IsFloat + NativeType,
{
    if values.is_empty() {
        let out: Vec<T> = vec![];
        return PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), None);
    }
    // Start with a dummy window; it is overwritten on the first iteration.
    let mut agg_window = Agg::new(values, 0, 0, params, None);

    offsets
        .map(|(start, len)| {
            let end = start + len;

            // SAFETY: we are in bounds.
            unsafe { agg_window.update(start as usize, end as usize) }
        })
        .collect::<PrimitiveArray<T>>()
}

pub fn _slice_from_offsets<T>(ca: &ChunkedArray<T>, first: IdxSize, len: IdxSize) -> ChunkedArray<T>
where
    T: PolarsDataType,
{
    ca.slice(first as i64, len as usize)
}

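/// Helper that maps `f` over the `(first, all_indices)` group tuples in
/// parallel on the thread pool and collects the optional results into a
/// `Series`.
///
/// Sketch of a typical closure (hypothetical aggregation over a
/// `Float64Chunked` named `ca`):
///
/// ```ignore
/// let out = _agg_helper_idx::<Float64Type, _>(groups, |(first, idx)| {
///     // `first` is the row that opened the group, `idx` holds all its rows.
///     if idx.is_empty() { None } else { ca.get(first as usize) }
/// });
/// ```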
pub fn _agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.into_par_iter().map(f).collect());
    ca.into_series()
}

/// Same as `_agg_helper_idx`, but for aggregations that cannot produce a null.
pub fn _agg_helper_idx_no_null<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.into_par_iter().map(f).collect());
    ca.into_inner().into_series()
}

/// Helper that iterates over all index lists of the groups, ignoring the
/// first (representative) index of each group tuple.
fn agg_helper_idx_on_all<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn(&IdxVec) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.all().into_par_iter().map(f).collect());
    ca.into_series()
}

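/// Like `_agg_helper_idx`, but for slice groups: `f` receives a
/// `[first, len]` window into the original array.
///
/// ```ignore
/// // Hypothetical: aggregate each group to its length.
/// let out = _agg_helper_slice::<IdxType, _>(groups, |[_first, len]| Some(len));
/// ```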
pub fn _agg_helper_slice<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.par_iter().copied().map(f).collect());
    ca.into_series()
}

pub fn _agg_helper_slice_no_null<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.par_iter().copied().map(f).collect());
    ca.into_inner().into_series()
}

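/// Intermediate dispatch trait so the generic quantile/median aggregations
/// can route to the fastest concrete implementation (integer, f16, f32, f64)
/// without complicating their trait bounds.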
trait QuantileDispatcher<K> {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<K>>;

    fn _median(self) -> Option<K>;
}

impl<T> QuantileDispatcher<f64> for ChunkedArray<T>
where
    T: PolarsIntegerType,
    T::Native: Ord,
{
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}

#[cfg(feature = "dtype-f16")]
impl QuantileDispatcher<pf16> for Float16Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<pf16>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<pf16> {
        self.median_faster()
    }
}

impl QuantileDispatcher<f32> for Float32Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f32>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f32> {
        self.median_faster()
    }
}

impl QuantileDispatcher<f64> for Float64Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}

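/// # Safety
///
/// No bounds checks: all group indices and slice offsets must be in bounds
/// for `ca`. Quantiles outside `[0.0, 1.0]` return an all-null Series.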
unsafe fn agg_quantile_generic<T, K>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    quantile: f64,
    method: QuantileMethod,
) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + quantile_filter::SealedRolling,
{
    let invalid_quantile = !(0.0..=1.0).contains(&quantile);
    if invalid_quantile {
        return Series::full_null(ca.name().clone(), groups.len(), ca.dtype());
    }
    match groups {
        GroupsType::Idx(groups) => {
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                // The unchecked unwrap is fine: the quantile was validated above.
                take._quantile(quantile, method).unwrap_unchecked()
            })
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            if _use_rolling_kernels(groups, *overlapping, *monotonic, ca.chunks()) {
                // Upcast to the float output type before running the kernel.
                let s = ca
                    .cast_with_options(&K::get_static_dtype(), CastOptions::Overflowing)
                    .unwrap();
                let ca: &ChunkedArray<K> = s.as_ref().as_ref();
                let arr = ca.downcast_iter().next().unwrap();
                let values = arr.values().as_slice();
                let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                let arr = match arr.validity() {
                    None => _rolling_apply_agg_window_no_nulls::<QuantileWindow<_>, _, _>(
                        values,
                        offset_iter,
                        Some(RollingFnParams::Quantile(RollingQuantileParams {
                            prob: quantile,
                            method,
                        })),
                    ),
                    Some(validity) => {
                        _rolling_apply_agg_window_nulls::<rolling::nulls::QuantileWindow<_>, _, _>(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Quantile(RollingQuantileParams {
                                prob: quantile,
                                method,
                            })),
                        )
                    },
                };
                ChunkedArray::<K>::with_chunk(PlSmallStr::EMPTY, arr).into_series()
            } else {
                _agg_helper_slice::<K, _>(groups, |[first, len]| {
                    debug_assert!(first + len <= ca.len() as IdxSize);
                    match len {
                        0 => None,
                        1 => ca.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                        _ => {
                            let arr_group = _slice_from_offsets(ca, first, len);
                            // The unchecked unwraps are fine: the quantile was
                            // validated above.
                            arr_group
                                ._quantile(quantile, method)
                                .unwrap_unchecked()
                                .map(|flt| NumCast::from(flt).unwrap_unchecked())
                        },
                    }
                })
            }
        },
    }
}

unsafe fn agg_median_generic<T, K>(ca: &ChunkedArray<T>, groups: &GroupsType) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + SealedRolling,
{
    match groups {
        GroupsType::Idx(groups) => {
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                take._median()
            })
        },
        GroupsType::Slice { .. } => {
            agg_quantile_generic::<T, K>(ca, groups, 0.5, QuantileMethod::Linear)
        },
    }
}

/// # Safety
///
/// No bounds checks; the group indices must be in bounds.
#[cfg(feature = "bitwise")]
unsafe fn bitwise_agg<T: PolarsNumericType>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    f: fn(&ChunkedArray<T>) -> Option<T::Native>,
) -> Series
where
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    // Only rechunk when random access is needed for multiple groups.
    let s = if groups.len() > 1 {
        ca.rechunk()
    } else {
        Cow::Borrowed(ca)
    };

    match groups {
        GroupsType::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
            debug_assert!(idx.len() <= s.len());
            if idx.is_empty() {
                None
            } else {
                let take = unsafe { s.take_unchecked(idx) };
                f(&take)
            }
        }),
        GroupsType::Slice { groups, .. } => _agg_helper_slice::<T, _>(groups, |[first, len]| {
            debug_assert!(len <= s.len() as IdxSize);
            if len == 0 {
                None
            } else {
                let take = _slice_from_offsets(&s, first, len);
                f(&take)
            }
        }),
    }
}

#[cfg(feature = "bitwise")]
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    /// # Safety
    ///
    /// No bounds checks; the group indices must be in bounds.
    pub(crate) unsafe fn agg_and(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::and_reduce) }
    }

    /// # Safety
    ///
    /// No bounds checks; the group indices must be in bounds.
    pub(crate) unsafe fn agg_or(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::or_reduce) }
    }

    /// # Safety
    ///
    /// No bounds checks; the group indices must be in bounds.
    pub(crate) unsafe fn agg_xor(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::xor_reduce) }
    }
}

impl<T> ChunkedArray<T>
where
    T: PolarsNumericType + Sync,
    T::Native: NativeType + PartialOrd + Num + NumCast + Zero + Bounded + std::iter::Sum<T::Native>,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    pub(crate) unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        // Fast path: on sorted data the min is the first/last non-null value.
        match self.is_sorted_flag() {
            IsSorted::Ascending => return self.clone().into_series().agg_first_non_null(groups),
            IsSorted::Descending => return self.clone().into_series().agg_last_non_null(groups),
            _ => {},
        }
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MinWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MinWindow<_>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::min(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        // Fast path: without nulls, the max of sorted data sits at an end.
        match (self.is_sorted_flag(), self.null_count()) {
            (IsSorted::Ascending, 0) => {
                return self.clone().into_series().agg_last(groups);
            },
            (IsSorted::Descending, 0) => {
                return self.clone().into_series().agg_first(groups);
            },
            _ => {},
        }

        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MaxWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MaxWindow<_>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::max(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

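    /// Sums every group. Float dtypes are accumulated with Kahan
    /// (compensated) summation, which carries a running error term so long
    /// groups do not drift the way a naive fold can.
    ///
    /// A standalone sketch of the effect (hypothetical data, not a doctest):
    ///
    /// ```ignore
    /// let xs = std::iter::repeat(0.1f64).take(1_000_000);
    /// let naive: f64 = xs.clone().sum();
    /// let kahan = xs.fold(KahanSum::default(), |k, x| k + x).sum();
    /// // `kahan` lands closer to the exact 100_000.0 than `naive`.
    /// ```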
    pub(crate) unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx_no_null::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        T::Native::zero()
                    } else if idx.len() == 1 {
                        arr.get(first as usize).unwrap_or(T::Native::zero())
                    } else if no_nulls {
                        if T::Native::is_float() {
                            // Use compensated summation to limit float error.
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |k, x| k + x)
                                .sum()
                        } else {
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(T::Native::zero(), |a, b| a + b)
                        }
                    } else if T::Native::is_float() {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(KahanSum::default(), |k, x| k + x)
                            .sum()
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(T::Native::zero(), |a, b| a + b)
                    }
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                        >(values, offset_iter, None),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice_no_null::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => T::Native::zero(),
                            1 => self.get(first as usize).unwrap_or(T::Native::zero()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.sum().unwrap_or(T::Native::zero())
                            },
                        }
                    })
                }
            },
        }
    }
}

impl<T> SeriesWrap<ChunkedArray<T>>
where
    T: PolarsFloatType,
    ChunkedArray<T>: ChunkVar
        + VarAggSeries
        + ChunkQuantile<T::Native>
        + QuantileAggSeries
        + ChunkAgg<T::Native>,
    T::Native: Pow<T::Native, Output = T::Native>,
{
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    // This assertion can fail when lazy filters inside
                    // aggregations create columns shorter than the original
                    // group tuples.
                    debug_assert!(idx.len() <= self.len());
                    let out = if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else if no_nulls {
                        Some(
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |a, b| {
                                    a + b.to_f64().unwrap_unchecked()
                                })
                                .sum()
                                / idx.len() as f64,
                        )
                    } else {
                        take_agg_primitive_iter_unchecked_count_nulls(
                            arr,
                            idx2usize(idx),
                            KahanSum::default(),
                            |a, b| a + b.to_f64().unwrap_unchecked(),
                            idx.len() as IdxSize,
                        )
                        .map(|(sum, null_count)| sum.sum() / (idx.len() as f64 - null_count as f64))
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MeanWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<MeanWindow<_>, _, _>(
                            values,
                            validity,
                            offset_iter,
                            None,
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean().map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }

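    /// Per-group variance, `sum((x - mean)^2) / (n - ddof)`: `ddof = 1` gives
    /// the sample variance, `ddof = 0` the population variance. Groups of one
    /// element yield `0` when `ddof == 0` and null otherwise.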
    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let ca = ca.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group standard deviation: the square root of `agg_var`.
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt.sqrt()).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = ca.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, rolling::nulls::VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };

                    // The rolling kernel computed the variance: take the square root.
                    let mut ca = ChunkedArray::<T>::from(arr);
                    ca.apply_mut(|v| v.powf(NumCast::from(0.5).unwrap()));
                    ca.into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
}

impl Float32Chunked {
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float32Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float32Type>(self, groups)
    }
}

impl Float64Chunked {
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}

impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType,
    ChunkedArray<T>: ChunkAgg<T::Native> + ChunkVar,
    T::Native: NumericNative + Ord,
{
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_get(0).unwrap();
                _agg_helper_idx::<Float64Type, _>(groups, |(first, idx)| {
                    // This assertion can fail when lazy filters inside
                    // aggregations create columns shorter than the original
                    // group tuples.
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        self.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else {
                        match (self.has_nulls(), self.chunks.len()) {
                            (false, 1) => Some(
                                take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                    .fold(KahanSum::default(), |a, b| a + b.to_f64().unwrap())
                                    .sum()
                                    / idx.len() as f64,
                            ),
                            (_, 1) => {
                                take_agg_primitive_iter_unchecked_count_nulls(
                                    arr,
                                    idx2usize(idx),
                                    KahanSum::default(),
                                    |a, b| a + b.to_f64().unwrap(),
                                    idx.len() as IdxSize,
                                )
                            }
                            .map(|(sum, null_count)| {
                                sum.sum() / (idx.len() as f64 - null_count as f64)
                            }),
                            _ => {
                                let take = { self.take_unchecked(idx) };
                                take.mean()
                            },
                        }
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Defer to the Float64 implementation for the rolling kernels.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_mean(groups)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean()
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        return None;
                    }
                    if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Defer to the Float64 implementation for the rolling kernels.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_var(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|v| v.sqrt())
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Defer to the Float64 implementation for the rolling kernels.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_std(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}