polars_core/frame/group_by/aggregations/mod.rs

mod agg_list;
mod boolean;
mod dispatch;
mod string;

use std::borrow::Cow;

pub use agg_list::*;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::legacy::kernels::take_agg::*;
use arrow::legacy::trusted_len::TrustedLenPush;
use arrow::types::NativeType;
use num_traits::pow::Pow;
use num_traits::{Bounded, Float, Num, NumCast, ToPrimitive, Zero};
use polars_compute::rolling::no_nulls::{
    MaxWindow, MinWindow, MomentWindow, QuantileWindow, RollingAggWindowNoNulls,
};
use polars_compute::rolling::nulls::{RollingAggWindowNulls, VarianceMoment};
use polars_compute::rolling::quantile_filter::SealedRolling;
use polars_compute::rolling::{
    self, MeanWindow, QuantileMethod, RollingFnParams, RollingQuantileParams, RollingVarParams,
    SumWindow, quantile_filter,
};
use polars_utils::float::IsFloat;
#[cfg(feature = "dtype-f16")]
use polars_utils::float16::pf16;
use polars_utils::idx_vec::IdxVec;
use polars_utils::kahan_sum::KahanSum;
use polars_utils::min_max::MinMax;
use rayon::prelude::*;

use crate::POOL;
use crate::chunked_array::cast::CastOptions;
#[cfg(feature = "object")]
use crate::chunked_array::object::extension::create_extension;
use crate::frame::group_by::GroupsIdx;
#[cfg(feature = "object")]
use crate::frame::group_by::GroupsIndicator;
use crate::prelude::*;
use crate::series::IsSorted;
use crate::series::implementations::SeriesWrap;
use crate::utils::NoNull;

fn idx2usize(idx: &[IdxSize]) -> impl ExactSizeIterator<Item = usize> + '_ {
    idx.iter().map(|i| *i as usize)
}

// If the windows overlap, we can use the rolling_<agg> kernels: they maintain
// state, which saves a lot of compute by not naively traversing all elements
// for every window.
//
// If the windows don't overlap, we should not use these kernels, as they are
// single-threaded and we would miss out on easy parallelization.
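//
// A sketch of the decision (hypothetical group slices):
//
//     // groups [[0, 4], [2, 4], [3, 5]] over one chunk, with the
//     // `overlapping` and `monotonic` flags set -> rolling kernels.
//     // groups [[0, 2], [2, 2], [4, 2]] (disjoint windows) -> parallel path.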
pub fn _use_rolling_kernels(
    groups: &GroupsSlice,
    overlapping: bool,
    monotonic: bool,
    chunks: &[ArrayRef],
) -> bool {
    match groups.len() {
        0 | 1 => false,
        _ => overlapping && monotonic && chunks.len() == 1,
    }
}

// Use an aggregation window that maintains the state.
pub fn _rolling_apply_agg_window_nulls<'a, Agg, T, O>(
    values: &'a [T],
    validity: &'a Bitmap,
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<T>
where
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    Agg: RollingAggWindowNulls<'a, T>,
    T: IsFloat + NativeType,
{
    if values.is_empty() {
        let out: Vec<T> = vec![];
        return PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), None);
    }

    // This iterator's length can be trusted;
    // it represents the number of groups in the group_by operation.
    let output_len = offsets.size_hint().0;
    // Start with a dummy window; it is overwritten on the first iteration.
    // SAFETY:
    // we are in bounds
    let mut agg_window = unsafe { Agg::new(values, validity, 0, 0, params, None) };

    let mut validity = MutableBitmap::with_capacity(output_len);
    validity.extend_constant(output_len, true);

    let out = offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;

            // SAFETY:
            // we are in bounds
            let agg = unsafe { agg_window.update(start as usize, end as usize) };

            match agg {
                Some(val) => val,
                None => {
                    // SAFETY: we are in bounds
                    unsafe { validity.set_unchecked(idx, false) };
                    T::default()
                },
            }
        })
        .collect_trusted::<Vec<_>>();

    PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), Some(validity.into()))
}

// Use an aggregation window that maintains the state.
pub fn _rolling_apply_agg_window_no_nulls<'a, Agg, T, O>(
    values: &'a [T],
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<T>
where
    // Items are `(offset, len)` pairs, so a window spans `offset..offset + len`.
    Agg: RollingAggWindowNoNulls<'a, T>,
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    T: IsFloat + NativeType,
{
    if values.is_empty() {
        let out: Vec<T> = vec![];
        return PrimitiveArray::new(T::PRIMITIVE.into(), out.into(), None);
    }
    // Start with a dummy window; it is overwritten on the first iteration.
    let mut agg_window = Agg::new(values, 0, 0, params, None);

    offsets
        .map(|(start, len)| {
            let end = start + len;

            // SAFETY: we are in bounds.
            unsafe { agg_window.update(start as usize, end as usize) }
        })
        .collect::<PrimitiveArray<T>>()
}
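
// Sketch of how the offset windows drive the stateful kernel (hypothetical
// data; a sum window is assumed):
//
//     // values:  [1.0, 2.0, 3.0, 4.0]
//     // offsets: (0, 2), (1, 2), (2, 2)  -> windows 0..2, 1..3, 2..4
//     // result:  [3.0, 5.0, 7.0]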

pub fn _slice_from_offsets<T>(ca: &ChunkedArray<T>, first: IdxSize, len: IdxSize) -> ChunkedArray<T>
where
    T: PolarsDataType,
{
    ca.slice(first as i64, len as usize)
}

/// Helper that combines the groups into a parallel iterator over `(first, all): (IdxSize, &IdxVec)`.
pub fn _agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.into_par_iter().map(f).collect());
    ca.into_series()
}

/// Same helper as `_agg_helper_idx` but for aggregations that don't return an Option.
pub fn _agg_helper_idx_no_null<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn((IdxSize, &IdxVec)) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.into_par_iter().map(f).collect());
    ca.into_inner().into_series()
}

/// Helper that iterates over the `all: Vec<IdxVec>` collection;
/// this doesn't have to traverse the `first: Vec<IdxSize>` memory and is therefore faster.
fn agg_helper_idx_on_all<T, F>(groups: &GroupsIdx, f: F) -> Series
where
    F: Fn(&IdxVec) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.all().into_par_iter().map(f).collect());
    ca.into_series()
}

pub fn _agg_helper_slice<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
{
    let ca: ChunkedArray<T> = POOL.install(|| groups.par_iter().copied().map(f).collect());
    ca.into_series()
}

pub fn _agg_helper_slice_no_null<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
where
    F: Fn([IdxSize; 2]) -> T::Native + Send + Sync,
    T: PolarsNumericType,
{
    let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.par_iter().copied().map(f).collect());
    ca.into_inner().into_series()
}

/// Intermediate helper trait so we can have a single generic implementation.
/// This trait ensures the specific dispatch works without complicating
/// the trait bounds.
trait QuantileDispatcher<K> {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<K>>;

    fn _median(self) -> Option<K>;
}
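
// Note: the impls below dispatch integer inputs to `f64` quantiles, while
// float inputs keep their own precision.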

impl<T> QuantileDispatcher<f64> for ChunkedArray<T>
where
    T: PolarsIntegerType,
    T::Native: Ord,
{
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}

#[cfg(feature = "dtype-f16")]
impl QuantileDispatcher<pf16> for Float16Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<pf16>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<pf16> {
        self.median_faster()
    }
}

impl QuantileDispatcher<f32> for Float32Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f32>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f32> {
        self.median_faster()
    }
}
impl QuantileDispatcher<f64> for Float64Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}

unsafe fn agg_quantile_generic<T, K>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    quantile: f64,
    method: QuantileMethod,
) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + quantile_filter::SealedRolling,
{
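    // Dispatch:
    // - Idx groups: gather each group's rows and compute the quantile directly.
    // - Slice groups with overlapping, monotonic windows over a single chunk:
    //   use the stateful rolling quantile kernels.
    // - Remaining slice groups: slice each group out and aggregate in parallel.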
    let invalid_quantile = !(0.0..=1.0).contains(&quantile);
    if invalid_quantile {
        return Series::full_null(ca.name().clone(), groups.len(), ca.dtype());
    }
    match groups {
        GroupsType::Idx(groups) => {
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                // The unwrap is sound: the quantile was validated above.
                take._quantile(quantile, method).unwrap_unchecked()
            })
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            if _use_rolling_kernels(groups, *overlapping, *monotonic, ca.chunks()) {
                // this cast is a no-op for floats
                let s = ca
                    .cast_with_options(&K::get_static_dtype(), CastOptions::Overflowing)
                    .unwrap();
                let ca: &ChunkedArray<K> = s.as_ref().as_ref();
                let arr = ca.downcast_iter().next().unwrap();
                let values = arr.values().as_slice();
                let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                let arr = match arr.validity() {
                    None => _rolling_apply_agg_window_no_nulls::<QuantileWindow<_>, _, _>(
                        values,
                        offset_iter,
                        Some(RollingFnParams::Quantile(RollingQuantileParams {
                            prob: quantile,
                            method,
                        })),
                    ),
                    Some(validity) => {
                        _rolling_apply_agg_window_nulls::<rolling::nulls::QuantileWindow<_>, _, _>(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Quantile(RollingQuantileParams {
                                prob: quantile,
                                method,
                            })),
                        )
                    },
                };
                // The rolling kernels work on the physical dtype; this is not
                // yet the float output type we need.
                ChunkedArray::<K>::with_chunk(PlSmallStr::EMPTY, arr).into_series()
            } else {
                _agg_helper_slice::<K, _>(groups, |[first, len]| {
                    debug_assert!(first + len <= ca.len() as IdxSize);
                    match len {
                        0 => None,
                        1 => ca.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                        _ => {
                            let arr_group = _slice_from_offsets(ca, first, len);
                            // The unwrap is sound: the quantile was validated above.
                            arr_group
                                ._quantile(quantile, method)
                                .unwrap_unchecked()
                                .map(|flt| NumCast::from(flt).unwrap_unchecked())
                        },
                    }
                })
            }
        },
    }
}

unsafe fn agg_median_generic<T, K>(ca: &ChunkedArray<T>, groups: &GroupsType) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + SealedRolling,
{
    match groups {
        GroupsType::Idx(groups) => {
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                take._median()
            })
        },
        GroupsType::Slice { .. } => {
            agg_quantile_generic::<T, K>(ca, groups, 0.5, QuantileMethod::Linear)
        },
    }
}

/// # Safety
///
/// No bounds checks on `groups`.
#[cfg(feature = "bitwise")]
unsafe fn bitwise_agg<T: PolarsNumericType>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    f: fn(&ChunkedArray<T>) -> Option<T::Native>,
) -> Series
where
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    // Prevent a rechunk for every individual group.

    let s = if groups.len() > 1 {
        ca.rechunk()
    } else {
        Cow::Borrowed(ca)
    };

    match groups {
        GroupsType::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
            debug_assert!(idx.len() <= s.len());
            if idx.is_empty() {
                None
            } else {
                let take = unsafe { s.take_unchecked(idx) };
                f(&take)
            }
        }),
        GroupsType::Slice { groups, .. } => _agg_helper_slice::<T, _>(groups, |[first, len]| {
            debug_assert!(len <= s.len() as IdxSize);
            if len == 0 {
                None
            } else {
                let take = _slice_from_offsets(&s, first, len);
                f(&take)
            }
        }),
    }
}

#[cfg(feature = "bitwise")]
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    /// # Safety
    ///
    /// No bounds checks on `groups`.
    pub(crate) unsafe fn agg_and(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::and_reduce) }
    }

    /// # Safety
    ///
    /// No bounds checks on `groups`.
    pub(crate) unsafe fn agg_or(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::or_reduce) }
    }

    /// # Safety
    ///
    /// No bounds checks on `groups`.
    pub(crate) unsafe fn agg_xor(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::xor_reduce) }
    }
}

impl<T> ChunkedArray<T>
where
    T: PolarsNumericType + Sync,
    T::Native: NativeType + PartialOrd + Num + NumCast + Zero + Bounded + std::iter::Sum<T::Native>,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    pub(crate) unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        // faster paths
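        // For sorted data the group minimum is simply the first non-null
        // (ascending) or last non-null (descending) entry of each group.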
        match self.is_sorted_flag() {
            IsSorted::Ascending => return self.clone().into_series().agg_first_non_null(groups),
            IsSorted::Descending => return self.clone().into_series().agg_last_non_null(groups),
            _ => {},
        }
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MinWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MinWindow<_>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::min(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        // faster paths
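        // With no nulls present, the maximum of sorted data is simply the last
        // (ascending) or first (descending) element of each group.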
        match (self.is_sorted_flag(), self.null_count()) {
            (IsSorted::Ascending, 0) => {
                return self.clone().into_series().agg_last(groups);
            },
            (IsSorted::Descending, 0) => {
                return self.clone().into_series().agg_first(groups);
            },
            _ => {},
        }

        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MaxWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MaxWindow<_>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::max(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
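        // Floats are summed with Kahan compensation to reduce round-off error;
        // integer sums use a plain fold.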
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx_no_null::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        T::Native::zero()
                    } else if idx.len() == 1 {
                        arr.get(first as usize).unwrap_or(T::Native::zero())
                    } else if no_nulls {
                        if T::Native::is_float() {
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |k, x| k + x)
                                .sum()
                        } else {
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(T::Native::zero(), |a, b| a + b)
                        }
                    } else if T::Native::is_float() {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(KahanSum::default(), |k, x| k + x)
                            .sum()
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(T::Native::zero(), |a, b| a + b)
                    }
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                        >(values, offset_iter, None),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                        >(
                            values, validity, offset_iter, None
                        ),
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice_no_null::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => T::Native::zero(),
                            1 => self.get(first as usize).unwrap_or(T::Native::zero()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.sum().unwrap_or(T::Native::zero())
                            },
                        }
                    })
                }
            },
        }
    }
}

impl<T> SeriesWrap<ChunkedArray<T>>
where
    T: PolarsFloatType,
    ChunkedArray<T>: ChunkVar
        + VarAggSeries
        + ChunkQuantile<T::Native>
        + QuantileAggSeries
        + ChunkAgg<T::Native>,
    T::Native: Pow<T::Native, Output = T::Native>,
{
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
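        // Group mean = compensated (Kahan) sum / non-null count; the nulls path
        // subtracts the observed null count from the group length.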
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    // This can fail due to a bug in lazy code: users can create
                    // filters in aggregations and thereby create shorter columns
                    // than the original group tuples. The group tuples are then
                    // modified, and if that is done incorrectly there can be
                    // out-of-bounds access.
                    debug_assert!(idx.len() <= self.len());
                    let out = if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else if no_nulls {
                        Some(
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |a, b| {
                                    a + b.to_f64().unwrap_unchecked()
                                })
                                .sum()
                                / idx.len() as f64,
                        )
                    } else {
                        take_agg_primitive_iter_unchecked_count_nulls(
                            arr,
                            idx2usize(idx),
                            KahanSum::default(),
                            |a, b| a + b.to_f64().unwrap_unchecked(),
                            idx.len() as IdxSize,
                        )
                        .map(|(sum, null_count)| sum.sum() / (idx.len() as f64 - null_count as f64))
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MeanWindow<_>, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<MeanWindow<_>, _, _>(
                            values,
                            validity,
                            offset_iter,
                            None,
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean().map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let ca = ca.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
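                            // A single observation has population variance 0
                            // (ddof == 0) but undefined sample variance.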
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt.sqrt()).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = ca.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, rolling::nulls::VarianceMoment>,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };

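                    // The rolling kernel produced variances; take the element-wise
                    // square root to obtain the standard deviation.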
                    let mut ca = ChunkedArray::<T>::from(arr);
                    ca.apply_mut(|v| v.powf(NumCast::from(0.5).unwrap()));
                    ca.into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
}

impl Float32Chunked {
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float32Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float32Type>(self, groups)
    }
}
impl Float64Chunked {
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}

impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType,
    ChunkedArray<T>: ChunkAgg<T::Native> + ChunkVar,
    T::Native: NumericNative + Ord,
{
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_get(0).unwrap();
                _agg_helper_idx::<Float64Type, _>(groups, |(first, idx)| {
                    // This can fail due to a bug in lazy code: users can create
                    // filters in aggregations and thereby create shorter columns
                    // than the original group tuples. The group tuples are then
                    // modified, and if that is done incorrectly there can be
                    // out-of-bounds access.
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        self.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else {
                        match (self.has_nulls(), self.chunks.len()) {
                            (false, 1) => Some(
                                take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                    .fold(KahanSum::default(), |a, b| a + b.to_f64().unwrap())
                                    .sum()
                                    / idx.len() as f64,
                            ),
                            (_, 1) => {
                                take_agg_primitive_iter_unchecked_count_nulls(
                                    arr,
                                    idx2usize(idx),
                                    KahanSum::default(),
                                    |a, b| a + b.to_f64().unwrap(),
                                    idx.len() as IdxSize,
                                )
                            }
                            .map(|(sum, null_count)| {
                                sum.sum() / (idx.len() as f64 - null_count as f64)
                            }),
                            _ => {
                                let take = { self.take_unchecked(idx) };
                                take.mean()
                            },
                        }
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
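                // When rolling kernels apply, cast once to f64 and reuse the
                // float path; integer means are always computed as f64 (the
                // var/std implementations below follow the same pattern).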
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_mean(groups)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean()
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        return None;
                    }
                    if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_var(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof)
                            },
                        }
                    })
                }
            },
        }
    }
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|v| v.sqrt())
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_std(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof)
                            },
                        }
                    })
                }
            },
        }
    }

    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}