1mod agg_list;
2mod boolean;
3#[cfg(feature = "dtype-categorical")]
4mod categorical;
5mod dispatch;
6mod string;
7
8use std::borrow::Cow;
9
10pub use agg_list::*;
11use arrow::bitmap::{Bitmap, MutableBitmap};
12use arrow::legacy::kernels::take_agg::*;
13use arrow::legacy::trusted_len::TrustedLenPush;
14use arrow::types::NativeType;
15use num_traits::pow::Pow;
16use num_traits::{Bounded, Float, Num, NumCast, ToPrimitive, Zero};
17use polars_compute::rolling::no_nulls::{
18 MaxWindow, MinWindow, MomentWindow, QuantileWindow, RollingAggWindowNoNulls,
19};
20use polars_compute::rolling::nulls::{RollingAggWindowNulls, VarianceMoment};
21use polars_compute::rolling::quantile_filter::SealedRolling;
22use polars_compute::rolling::{
23 self, ArgMaxWindow, ArgMinWindow, MeanWindow, QuantileMethod, RollingFnParams,
24 RollingQuantileParams, RollingVarParams, SumWindow, quantile_filter,
25};
26use polars_utils::arg_min_max::ArgMinMax;
27use polars_utils::float::IsFloat;
28#[cfg(feature = "dtype-f16")]
29use polars_utils::float16::pf16;
30use polars_utils::idx_vec::IdxVec;
31use polars_utils::kahan_sum::KahanSum;
32use polars_utils::min_max::MinMax;
33use rayon::prelude::*;
34
35use crate::POOL;
36use crate::chunked_array::cast::CastOptions;
37#[cfg(feature = "object")]
38use crate::chunked_array::object::extension::create_extension;
39use crate::chunked_array::{arg_max_numeric, arg_min_numeric};
40#[cfg(feature = "object")]
41use crate::frame::group_by::GroupsIndicator;
42use crate::prelude::*;
43use crate::series::IsSorted;
44use crate::series::implementations::SeriesWrap;
45use crate::utils::NoNull;
46
47fn idx2usize(idx: &[IdxSize]) -> impl ExactSizeIterator<Item = usize> + '_ {
48 idx.iter().map(|i| *i as usize)
49}
50
51pub fn _use_rolling_kernels(
58 groups: &GroupsSlice,
59 overlapping: bool,
60 monotonic: bool,
61 chunks: &[ArrayRef],
62) -> bool {
63 match groups.len() {
64 0 | 1 => false,
65 _ => overlapping && monotonic && chunks.len() == 1,
66 }
67}
68
/// Applies the rolling aggregation window `Agg` over `values`/`validity`
/// for every `(start, len)` window yielded by `offsets`, producing one
/// output element per window.
///
/// Windows whose aggregate is `None` are written as `Out::default()` and
/// masked out in the output validity bitmap.
///
/// Note: the caller must ensure every `(start, start + len)` window lies
/// within `values` — the window updates and validity writes below are
/// unchecked.
pub fn _rolling_apply_agg_window_nulls<Agg, T, O, Out>(
    values: &[T],
    validity: &Bitmap,
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<Out>
where
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    Agg: RollingAggWindowNulls<T, Out>,
    T: IsFloat + NativeType,
    Out: NativeType,
{
    // TrustedLen: the lower size hint is the exact number of windows.
    let output_len = offsets.size_hint().0;
    let mut agg_window = Agg::new(values, validity, 0, 0, params, None);

    // Start all-valid; bits are cleared lazily when a window aggregates to None.
    // (Shadows the input `validity` parameter from here on.)
    let mut validity = MutableBitmap::with_capacity(output_len);
    validity.extend_constant(output_len, true);

    let out = offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;

            unsafe { agg_window.update(start as usize, end as usize) };
            match agg_window.get_agg(idx) {
                Some(val) => val,
                None => {
                    // idx < output_len by construction, so this is in bounds.
                    unsafe { validity.set_unchecked(idx, false) };
                    Out::default()
                },
            }
        })
        .collect_trusted::<Vec<_>>();

    PrimitiveArray::new(Out::PRIMITIVE.into(), out.into(), Some(validity.into()))
}
112
/// Non-null counterpart of [`_rolling_apply_agg_window_nulls`]: applies the
/// rolling aggregation window `Agg` over `values` for every `(start, len)`
/// window yielded by `offsets` and collects the per-window aggregates into
/// a `PrimitiveArray` (nullability decided by `get_agg`'s return).
///
/// Note: the caller must ensure every `(start, start + len)` window lies
/// within `values` — the window updates below are unchecked.
pub fn _rolling_apply_agg_window_no_nulls<Agg, T, O, Out>(
    values: &[T],
    offsets: O,
    params: Option<RollingFnParams>,
) -> PrimitiveArray<Out>
where
    Agg: RollingAggWindowNoNulls<T, Out>,
    O: Iterator<Item = (IdxSize, IdxSize)> + TrustedLen,
    T: IsFloat + NativeType,
    Out: NativeType,
{
    let mut agg_window = Agg::new(values, 0, 0, params, None);

    offsets
        .enumerate()
        .map(|(idx, (start, len))| {
            let end = start + len;

            unsafe { agg_window.update(start as usize, end as usize) };
            agg_window.get_agg(idx)
        })
        .collect::<PrimitiveArray<Out>>()
}
140
141pub fn _slice_from_offsets<T>(ca: &ChunkedArray<T>, first: IdxSize, len: IdxSize) -> ChunkedArray<T>
142where
143 T: PolarsDataType,
144{
145 ca.slice(first as i64, len as usize)
146}
147
148pub fn _agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Series
150where
151 F: Fn((IdxSize, &IdxVec)) -> Option<T::Native> + Send + Sync,
152 T: PolarsNumericType,
153{
154 let ca: ChunkedArray<T> = POOL.install(|| groups.into_par_iter().map(f).collect());
155 ca.into_series()
156}
157
158pub fn _agg_helper_idx_no_null<T, F>(groups: &GroupsIdx, f: F) -> Series
160where
161 F: Fn((IdxSize, &IdxVec)) -> T::Native + Send + Sync,
162 T: PolarsNumericType,
163{
164 let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.into_par_iter().map(f).collect());
165 ca.into_inner().into_series()
166}
167
168fn agg_helper_idx_on_all<T, F>(groups: &GroupsIdx, f: F) -> Series
171where
172 F: Fn(&IdxVec) -> Option<T::Native> + Send + Sync,
173 T: PolarsNumericType,
174{
175 let ca: ChunkedArray<T> = POOL.install(|| groups.all().into_par_iter().map(f).collect());
176 ca.into_series()
177}
178
179pub fn _agg_helper_slice<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
180where
181 F: Fn([IdxSize; 2]) -> Option<T::Native> + Send + Sync,
182 T: PolarsNumericType,
183{
184 let ca: ChunkedArray<T> = POOL.install(|| groups.par_iter().copied().map(f).collect());
185 ca.into_series()
186}
187
188pub fn _agg_helper_idx_idx<'a, F>(groups: &'a GroupsIdx, f: F) -> Series
189where
190 F: Fn((IdxSize, &'a IdxVec)) -> Option<IdxSize> + Send + Sync,
191{
192 let ca: IdxCa = POOL.install(|| groups.into_par_iter().map(f).collect());
193 ca.into_series()
194}
195
196pub fn _agg_helper_slice_idx<F>(groups: &[[IdxSize; 2]], f: F) -> Series
197where
198 F: Fn([IdxSize; 2]) -> Option<IdxSize> + Send + Sync,
199{
200 let ca: IdxCa = POOL.install(|| groups.par_iter().copied().map(f).collect());
201 ca.into_series()
202}
203
204pub fn _agg_helper_slice_no_null<T, F>(groups: &[[IdxSize; 2]], f: F) -> Series
205where
206 F: Fn([IdxSize; 2]) -> T::Native + Send + Sync,
207 T: PolarsNumericType,
208{
209 let ca: NoNull<ChunkedArray<T>> = POOL.install(|| groups.par_iter().copied().map(f).collect());
210 ca.into_inner().into_series()
211}
212
/// Internal dispatch so the generic group-by quantile/median code can call
/// the fastest concrete implementation for each chunked-array type.
///
/// `K` is the output element type of the aggregation (e.g. `f64` for
/// integer inputs, the native float type for float inputs).
trait QuantileDispatcher<K> {
    /// Consumes the array and computes the quantile at `quantile` using the
    /// given interpolation `method`.
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<K>>;

    /// Consumes the array and computes the median.
    fn _median(self) -> Option<K>;
}
221
/// Integer arrays: delegate to the `*_faster` consuming kernels and produce
/// `f64` results.
impl<T> QuantileDispatcher<f64> for ChunkedArray<T>
where
    T: PolarsIntegerType,
    T::Native: Ord,
{
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}
234
/// f16 arrays: delegate to the `*_faster` consuming kernels, keeping the
/// native `pf16` output type.
#[cfg(feature = "dtype-f16")]
impl QuantileDispatcher<pf16> for Float16Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<pf16>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<pf16> {
        self.median_faster()
    }
}
244
/// f32 arrays: delegate to the `*_faster` consuming kernels, keeping the
/// native `f32` output type.
impl QuantileDispatcher<f32> for Float32Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f32>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f32> {
        self.median_faster()
    }
}
/// f64 arrays: delegate to the `*_faster` consuming kernels, keeping the
/// native `f64` output type.
impl QuantileDispatcher<f64> for Float64Chunked {
    fn _quantile(self, quantile: f64, method: QuantileMethod) -> PolarsResult<Option<f64>> {
        self.quantile_faster(quantile, method)
    }
    fn _median(self) -> Option<f64> {
        self.median_faster()
    }
}
261
/// Generic group-by quantile over `ca`, producing values of type `K`.
///
/// A `quantile` outside `0.0..=1.0` yields an all-null series. Index groups
/// gather each group and dispatch via [`QuantileDispatcher`]; slice groups
/// either use the rolling quantile kernels (single chunk, overlapping and
/// monotonic windows) or fall back to per-slice quantiles.
///
/// # Safety
/// Group indices/offsets must be in bounds of `ca`: the takes and the
/// `unwrap_unchecked` calls below are not checked.
unsafe fn agg_quantile_generic<T, K>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    quantile: f64,
    method: QuantileMethod,
) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + quantile_filter::SealedRolling,
{
    let invalid_quantile = !(0.0..=1.0).contains(&quantile);
    if invalid_quantile {
        return Series::full_null(ca.name().clone(), groups.len(), ca.dtype());
    }
    match groups {
        GroupsType::Idx(groups) => {
            // Single chunk required so `take_unchecked` gathers from one array.
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                // Quantile was validated above, so the Result can't be Err here.
                take._quantile(quantile, method).unwrap_unchecked()
            })
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            if _use_rolling_kernels(groups, *overlapping, *monotonic, ca.chunks()) {
                // Cast to the output float type so the rolling kernel operates
                // directly on K::Native values.
                let s = ca
                    .cast_with_options(&K::get_static_dtype(), CastOptions::Overflowing)
                    .unwrap();
                let ca: &ChunkedArray<K> = s.as_ref().as_ref();
                let arr = ca.downcast_iter().next().unwrap();
                let values = arr.values().as_slice();
                let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                let arr = match arr.validity() {
                    None => _rolling_apply_agg_window_no_nulls::<QuantileWindow<_>, _, _, _>(
                        values,
                        offset_iter,
                        Some(RollingFnParams::Quantile(RollingQuantileParams {
                            prob: quantile,
                            method,
                        })),
                    ),
                    Some(validity) => {
                        _rolling_apply_agg_window_nulls::<rolling::nulls::QuantileWindow<_>, _, _, _>(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Quantile(RollingQuantileParams {
                                prob: quantile,
                                method,
                            })),
                        )
                    },
                };
                ChunkedArray::<K>::with_chunk(PlSmallStr::EMPTY, arr).into_series()
            } else {
                _agg_helper_slice::<K, _>(groups, |[first, len]| {
                    debug_assert!(first + len <= ca.len() as IdxSize);
                    match len {
                        0 => None,
                        // Single-element group: the value is its own quantile.
                        1 => ca.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                        _ => {
                            let arr_group = _slice_from_offsets(ca, first, len);
                            arr_group
                                ._quantile(quantile, method)
                                .unwrap_unchecked()
                                .map(|flt| NumCast::from(flt).unwrap_unchecked())
                        },
                    }
                })
            }
        },
    }
}
349
/// Generic group-by median over `ca`, producing values of type `K`.
///
/// Index groups gather each group and call the dispatcher's `_median`;
/// slice groups are forwarded to [`agg_quantile_generic`] with
/// `quantile = 0.5` and linear interpolation.
///
/// # Safety
/// Group indices must be in bounds of `ca` (unchecked takes).
unsafe fn agg_median_generic<T, K>(ca: &ChunkedArray<T>, groups: &GroupsType) -> Series
where
    T: PolarsNumericType,
    ChunkedArray<T>: QuantileDispatcher<K::Native>,
    K: PolarsNumericType,
    <K as datatypes::PolarsNumericType>::Native: num_traits::Float + SealedRolling,
{
    match groups {
        GroupsType::Idx(groups) => {
            // Single chunk required so `take_unchecked` gathers from one array.
            let ca = ca.rechunk();
            agg_helper_idx_on_all::<K, _>(groups, |idx| {
                debug_assert!(idx.len() <= ca.len());
                if idx.is_empty() {
                    return None;
                }
                let take = { ca.take_unchecked(idx) };
                take._median()
            })
        },
        GroupsType::Slice { .. } => {
            agg_quantile_generic::<T, K>(ca, groups, 0.5, QuantileMethod::Linear)
        },
    }
}
374
/// Shared driver for the group-by bitwise reductions (AND/OR/XOR): applies
/// reducer `f` to each group of `ca`.
///
/// # Safety
/// Group indices/offsets must be in bounds of `ca` — index groups are
/// gathered with `take_unchecked`.
#[cfg(feature = "bitwise")]
unsafe fn bitwise_agg<T: PolarsNumericType>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    f: fn(&ChunkedArray<T>) -> Option<T::Native>,
) -> Series
where
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    // Only rechunk when there are multiple groups: a single group pays the
    // rechunk cost without amortizing it over repeated gathers.
    let s = if groups.len() > 1 {
        ca.rechunk()
    } else {
        Cow::Borrowed(ca)
    };

    match groups {
        GroupsType::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
            debug_assert!(idx.len() <= s.len());
            if idx.is_empty() {
                None
            } else {
                let take = unsafe { s.take_unchecked(idx) };
                f(&take)
            }
        }),
        GroupsType::Slice { groups, .. } => _agg_helper_slice::<T, _>(groups, |[first, len]| {
            debug_assert!(len <= s.len() as IdxSize);
            if len == 0 {
                None
            } else {
                let take = _slice_from_offsets(&s, first, len);
                f(&take)
            }
        }),
    }
}
416
/// Group-by bitwise reductions, thin wrappers around [`bitwise_agg`].
///
/// # Safety
/// For all three methods the group indices/offsets must be in bounds of
/// `self` (unchecked gathers inside `bitwise_agg`).
#[cfg(feature = "bitwise")]
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkTakeUnchecked<[IdxSize]> + ChunkBitwiseReduce<Physical = T::Native>,
{
    /// Bitwise AND per group.
    pub(crate) unsafe fn agg_and(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::and_reduce) }
    }

    /// Bitwise OR per group.
    pub(crate) unsafe fn agg_or(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::or_reduce) }
    }

    /// Bitwise XOR per group.
    pub(crate) unsafe fn agg_xor(&self, groups: &GroupsType) -> Series {
        unsafe { bitwise_agg(self, groups, ChunkBitwiseReduce::xor_reduce) }
    }
}
444
/// Group-by min/max/arg-min/arg-max/sum kernels for numeric chunked arrays.
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType + Sync,
    T::Native: NativeType + PartialOrd + Num + NumCast + Zero + Bounded + std::iter::Sum<T::Native>,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    /// Per-group minimum.
    ///
    /// Fast paths: sorted groups over sorted values reduce to first/last
    /// non-null per group; eligible slice groups use the rolling min kernels.
    /// Otherwise a parallel gather-and-reduce per group is used.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    // Ascending values: each group's min is its first non-null.
                    return self.clone().into_series().agg_first_non_null(groups);
                },
                IsSorted::Descending => {
                    // Descending values: each group's min is its last non-null.
                    return self.clone().into_series().agg_last_non_null(groups);
                },
                _ => {},
            }
        }

        match groups {
            GroupsType::Idx(groups) => {
                // Single chunk so gathers read from one contiguous array.
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        // NaNs are ignored in the reduction (min_ignore_nan).
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.min_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MinWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<rolling::nulls::MinWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::min(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group position of the minimum value, relative to the group
    /// (not the array). Nulls are skipped; an all-null group yields `None`.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked reads).
    pub(crate) unsafe fn agg_arg_min(&self, groups: &GroupsType) -> Series
    where
        for<'b> &'b [T::Native]: ArgMinMax,
    {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    return self.clone().into_series().agg_arg_first_non_null(groups);
                },
                IsSorted::Descending => {
                    return self.clone().into_series().agg_arg_last_non_null(groups);
                },
                _ => {},
            }
        }

        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = !arr.has_nulls();

                agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
                    if idx.is_empty() {
                        return None;
                    }

                    if no_nulls {
                        // Seed with the first element, then strict `<` keeps the
                        // earliest occurrence on ties.
                        let first_i = idx[0] as usize;
                        let mut best_pos: IdxSize = 0;
                        let mut best_val: T::Native = unsafe { arr.value_unchecked(first_i) };

                        for (pos, &i) in idx.iter().enumerate().skip(1) {
                            let v = unsafe { arr.value_unchecked(i as usize) };
                            if v.nan_max_lt(&best_val) {
                                best_val = v;
                                best_pos = pos as IdxSize;
                            }
                        }
                        Some(best_pos)
                    } else {
                        // Seed with the first non-null; `?` returns None for
                        // all-null groups.
                        let (start_pos, mut best_val) = idx
                            .iter()
                            .enumerate()
                            .find_map(|(pos, &i)| arr.get(i as usize).map(|v| (pos, v)))?;

                        let mut best_pos: IdxSize = start_pos as IdxSize;

                        for (pos, &i) in idx.iter().enumerate().skip(start_pos + 1) {
                            if let Some(v) = arr.get(i as usize) {
                                if v.nan_max_lt(&best_val) {
                                    best_val = v;
                                    best_pos = pos as IdxSize;
                                }
                            }
                        }

                        Some(best_pos)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_as_array();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let idx_arr = match arr.validity() {
                        None => {
                            _rolling_apply_agg_window_no_nulls::<ArgMinWindow<_>, _, _, IdxSize>(
                                values,
                                offset_iter,
                                None,
                            )
                        },
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<ArgMinWindow<_>, _, _, IdxSize>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };

                    IdxCa::from(idx_arr).into_series()
                } else {
                    _agg_helper_slice::<IdxType, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            // NOTE(review): a length-1 group yields Some(0) even
                            // when that single value is null — confirm intended.
                            1 => Some(0 as IdxSize),
                            _ => {
                                let group_ca = _slice_from_offsets(self, first, len);
                                let pos_in_group: Option<usize> = arg_min_numeric(&group_ca);
                                pos_in_group.map(|p| p as IdxSize)
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group maximum. Mirrors [`Self::agg_min`] with the comparisons and
    /// sorted-value fast paths reversed.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => return self.clone().into_series().agg_last_non_null(groups),
                IsSorted::Descending => {
                    return self.clone().into_series().agg_first_non_null(groups);
                },
                _ => {},
            }
        }

        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize)
                    } else if no_nulls {
                        // NaNs are ignored in the reduction (max_ignore_nan).
                        take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .reduce(|a, b| a.max_ignore_nan(b))
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MaxWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<rolling::nulls::MaxWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                ChunkAgg::max(&arr_group)
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group position of the maximum value, relative to the group.
    /// Mirrors [`Self::agg_arg_min`] with the comparison reversed.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked reads).
    pub(crate) unsafe fn agg_arg_max(&self, groups: &GroupsType) -> Series
    where
        for<'b> &'b [T::Native]: ArgMinMax,
    {
        if groups.is_sorted_flag() {
            match self.is_sorted_flag() {
                IsSorted::Ascending => {
                    return self.clone().into_series().agg_arg_last_non_null(groups);
                },
                IsSorted::Descending => {
                    return self.clone().into_series().agg_arg_first_non_null(groups);
                },
                _ => {},
            }
        }
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_as_array();
                let no_nulls = arr.null_count() == 0;

                agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
                    if idx.is_empty() {
                        return None;
                    }

                    if no_nulls {
                        // Seed with the first element; strict `>` keeps the
                        // earliest occurrence on ties.
                        let first_i = idx[0] as usize;
                        let mut best_pos: IdxSize = 0;
                        let mut best_val: T::Native = unsafe { arr.value_unchecked(first_i) };

                        for (pos, &i) in idx.iter().enumerate().skip(1) {
                            let v = unsafe { arr.value_unchecked(i as usize) };

                            if v.nan_min_gt(&best_val) {
                                best_val = v;
                                best_pos = pos as IdxSize;
                            }
                        }

                        Some(best_pos)
                    } else {
                        // Seed with the first non-null; `?` returns None for
                        // all-null groups.
                        let (start_pos, mut best_val) = idx
                            .iter()
                            .enumerate()
                            .find_map(|(pos, &i)| arr.get(i as usize).map(|v| (pos, v)))?;

                        let mut best_pos: IdxSize = start_pos as IdxSize;

                        for (pos, &i) in idx.iter().enumerate().skip(start_pos + 1) {
                            if let Some(v) = arr.get(i as usize) {
                                if v.nan_min_gt(&best_val) {
                                    best_val = v;
                                    best_pos = pos as IdxSize;
                                }
                            }
                        }

                        Some(best_pos)
                    }
                })
            },

            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups_slice.iter().map(|[first, len]| (*first, *len));
                    let idx_arr = match arr.validity() {
                        None => {
                            _rolling_apply_agg_window_no_nulls::<ArgMaxWindow<_>, _, _, IdxSize>(
                                values,
                                offset_iter,
                                None,
                            )
                        },
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<ArgMaxWindow<_>, _, _, IdxSize>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    IdxCa::from(idx_arr).into_series()
                } else {
                    _agg_helper_slice::<IdxType, _>(groups_slice, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            // NOTE(review): a length-1 group yields Some(0) even
                            // when that single value is null — confirm intended.
                            1 => Some(0 as IdxSize),
                            _ => {
                                let group_ca = _slice_from_offsets(self, first, len);
                                let pos_in_group: Option<usize> = arg_max_numeric(&group_ca);
                                pos_in_group.map(|p| p as IdxSize)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Per-group sum. Empty and all-null groups yield zero (not null).
    /// Floats are summed with Kahan compensation; integers with a plain fold.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx_no_null::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        T::Native::zero()
                    } else if idx.len() == 1 {
                        arr.get(first as usize).unwrap_or(T::Native::zero())
                    } else if no_nulls {
                        if T::Native::is_float() {
                            // Kahan summation keeps float sums numerically stable.
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |k, x| k + x)
                                .sum()
                        } else {
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(T::Native::zero(), |a, b| a + b)
                        }
                    } else if T::Native::is_float() {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(KahanSum::default(), |k, x| k + x)
                            .sum()
                    } else {
                        take_agg_primitive_iter_unchecked(arr, idx2usize(idx))
                            .fold(T::Native::zero(), |a, b| a + b)
                    }
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            SumWindow<T::Native, T::Native>,
                            _,
                            _,
                            _,
                        >(values, offset_iter, None),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<
                                SumWindow<T::Native, T::Native>,
                                _,
                                _,
                                _,
                            >(values, validity, offset_iter, None)
                        },
                    };
                    Self::from(arr).into_series()
                } else {
                    _agg_helper_slice_no_null::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => T::Native::zero(),
                            1 => self.get(first as usize).unwrap_or(T::Native::zero()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.sum().unwrap_or(T::Native::zero())
                            },
                        }
                    })
                }
            },
        }
    }
}
889
/// Group-by mean/var/std kernels for float chunked arrays (via `SeriesWrap`).
impl<T> SeriesWrap<ChunkedArray<T>>
where
    T: PolarsFloatType,
    ChunkedArray<T>: ChunkVar
        + VarAggSeries
        + ChunkQuantile<T::Native>
        + QuantileAggSeries
        + ChunkAgg<T::Native>,
    T::Native: Pow<T::Native, Output = T::Native>,
{
    /// Per-group mean, accumulated in `f64` (Kahan-compensated) and cast back
    /// to the native float type. Nulls are excluded from the denominator.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                let ca = self.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                _agg_helper_idx::<T, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    let out = if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        arr.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else if no_nulls {
                        Some(
                            take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                .fold(KahanSum::default(), |a, b| {
                                    a + b.to_f64().unwrap_unchecked()
                                })
                                .sum()
                                / idx.len() as f64,
                        )
                    } else {
                        // Count nulls alongside the sum so the divisor only
                        // reflects valid values.
                        take_agg_primitive_iter_unchecked_count_nulls(
                            arr,
                            idx2usize(idx),
                            KahanSum::default(),
                            |a, b| a + b.to_f64().unwrap_unchecked(),
                            idx.len() as IdxSize,
                        )
                        .map(|(sum, null_count)| sum.sum() / (idx.len() as f64 - null_count as f64))
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<MeanWindow<_>, _, _, _>(
                            values,
                            offset_iter,
                            None,
                        ),
                        Some(validity) => {
                            _rolling_apply_agg_window_nulls::<MeanWindow<_>, _, _, _>(
                                values,
                                validity,
                                offset_iter,
                                None,
                            )
                        },
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => self.get(first as usize),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean().map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group variance with `ddof` delta degrees of freedom. A length-1
    /// group yields 0 when `ddof == 0`, otherwise null.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                // NOTE(review): `ca` was already rechunked above; this second
                // rechunk looks redundant — confirm.
                let ca = ca.rechunk();
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    out.map(|flt| NumCast::from(flt).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = self.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };
                    ChunkedArray::<T>::from(arr).into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population variance of one value is 0; sample
                                // variance is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
    /// Per-group standard deviation: the same variance kernels as
    /// [`Self::agg_var`], followed by a square root.
    ///
    /// # Safety
    /// Group indices/offsets must be in bounds of `self` (unchecked takes).
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series
    where
        <T as datatypes::PolarsNumericType>::Native: num_traits::Float,
    {
        let ca = &self.0.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let arr = ca.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<T, _>(groups, |idx| {
                    debug_assert!(idx.len() <= ca.len());
                    if idx.is_empty() {
                        return None;
                    }
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    // std = sqrt(var).
                    out.map(|flt| NumCast::from(flt.sqrt()).unwrap())
                })
            },
            GroupsType::Slice {
                groups,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups, *overlapping, *monotonic, self.chunks()) {
                    let arr = ca.downcast_iter().next().unwrap();
                    let values = arr.values().as_slice();
                    let offset_iter = groups.iter().map(|[first, len]| (*first, *len));
                    let arr = match arr.validity() {
                        None => _rolling_apply_agg_window_no_nulls::<
                            MomentWindow<_, VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                        Some(validity) => _rolling_apply_agg_window_nulls::<
                            rolling::nulls::MomentWindow<_, rolling::nulls::VarianceMoment>,
                            _,
                            _,
                            _,
                        >(
                            values,
                            validity,
                            offset_iter,
                            Some(RollingFnParams::Var(RollingVarParams { ddof })),
                        ),
                    };

                    // Rolling kernel computed variances; take sqrt in place.
                    let mut ca = ChunkedArray::<T>::from(arr);
                    ca.apply_mut(|v| v.powf(NumCast::from(0.5).unwrap()));
                    ca.into_series()
                } else {
                    _agg_helper_slice::<T, _>(groups, |[first, len]| {
                        debug_assert!(len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // Population std of one value is 0; sample std
                                // is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof).map(|flt| NumCast::from(flt).unwrap())
                            },
                        }
                    })
                }
            },
        }
    }
}
1140
1141impl Float32Chunked {
1142 pub(crate) unsafe fn agg_quantile(
1143 &self,
1144 groups: &GroupsType,
1145 quantile: f64,
1146 method: QuantileMethod,
1147 ) -> Series {
1148 agg_quantile_generic::<_, Float32Type>(self, groups, quantile, method)
1149 }
1150 pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
1151 agg_median_generic::<_, Float32Type>(self, groups)
1152 }
1153}
1154impl Float64Chunked {
1155 pub(crate) unsafe fn agg_quantile(
1156 &self,
1157 groups: &GroupsType,
1158 quantile: f64,
1159 method: QuantileMethod,
1160 ) -> Series {
1161 agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
1162 }
1163 pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
1164 agg_median_generic::<_, Float64Type>(self, groups)
1165 }
1166}
1167
/// Group-by aggregations for integer chunked arrays.
///
/// Mean/var/std produce a `Float64` series regardless of the integer input
/// width; quantile/median delegate to the generic f64 kernels.
impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType,
    ChunkedArray<T>: ChunkAgg<T::Native> + ChunkVar,
    T::Native: NumericNative + Ord,
{
    /// Per-group arithmetic mean, returned as a `Float64` series
    /// (one element per group; `None` for empty groups).
    ///
    /// # Safety
    /// All group indices/offsets must be in bounds for `self`; this is only
    /// verified via `debug_assert!`.
    pub(crate) unsafe fn agg_mean(&self, groups: &GroupsType) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Rechunk so the take kernels can index a single contiguous array.
                let ca = self.rechunk();
                let arr = ca.downcast_get(0).unwrap();
                _agg_helper_idx::<Float64Type, _>(groups, |(first, idx)| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        None
                    } else if idx.len() == 1 {
                        // Single-element group: the mean is the element itself
                        // (or None if that element is null).
                        self.get(first as usize).map(|sum| sum.to_f64().unwrap())
                    } else {
                        // Dispatch on (has nulls, chunk count) of the original array.
                        match (self.has_nulls(), self.chunks.len()) {
                            // Fast path: single chunk, no nulls. Kahan (compensated)
                            // summation keeps the f64 accumulation numerically stable.
                            (false, 1) => Some(
                                take_agg_no_null_primitive_iter_unchecked(arr, idx2usize(idx))
                                    .fold(KahanSum::default(), |a, b| a + b.to_f64().unwrap())
                                    .sum()
                                    / idx.len() as f64,
                            ),
                            // Single chunk with nulls: the kernel also counts nulls so
                            // we can divide by the number of valid values only.
                            (_, 1) => {
                                take_agg_primitive_iter_unchecked_count_nulls(
                                    arr,
                                    idx2usize(idx),
                                    KahanSum::default(),
                                    |a, b| a + b.to_f64().unwrap(),
                                    idx.len() as IdxSize,
                                )
                            }
                            .map(|(sum, null_count)| {
                                sum.sum() / (idx.len() as f64 - null_count as f64)
                            }),
                            // Fallback (multiple chunks): gather the group and use
                            // the generic mean implementation.
                            _ => {
                                let take = { self.take_unchecked(idx) };
                                take.mean()
                            },
                        }
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Overlapping monotonic windows over a single chunk: cast once
                    // to f64 and reuse the rolling-kernel path on Float64Chunked.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_mean(groups)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            // Mean of one element is the element itself (cast to f64).
                            1 => self.get(first as usize).map(|v| NumCast::from(v).unwrap()),
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.mean()
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group variance with `ddof` delta degrees of freedom, as `Float64`.
    ///
    /// # Safety
    /// All group indices/offsets must be in bounds for `self`; this is only
    /// verified via `debug_assert!`.
    pub(crate) unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Rechunk so the take kernels can index a single contiguous array.
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= arr.len());
                    if idx.is_empty() {
                        return None;
                    }
                    // Specialized kernels for the null / no-null cases.
                    if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    }
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Overlapping monotonic windows over a single chunk: cast once
                    // to f64 and reuse the rolling-kernel path on Float64Chunked.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_var(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // A single observation has zero population variance
                                // (ddof == 0); with ddof > 0 it is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.var(ddof)
                            },
                        }
                    })
                }
            },
        }
    }
    /// Per-group standard deviation with `ddof` delta degrees of freedom,
    /// as `Float64`. Computed as the square root of the group variance.
    ///
    /// # Safety
    /// All group indices/offsets must be in bounds for `self`; this is only
    /// verified via `debug_assert!`.
    pub(crate) unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series {
        match groups {
            GroupsType::Idx(groups) => {
                // Rechunk so the take kernels can index a single contiguous array.
                let ca_self = self.rechunk();
                let arr = ca_self.downcast_iter().next().unwrap();
                let no_nulls = arr.null_count() == 0;
                agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
                    debug_assert!(idx.len() <= self.len());
                    if idx.is_empty() {
                        return None;
                    }
                    // Same variance kernels as `agg_var` ...
                    let out = if no_nulls {
                        take_var_no_null_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    } else {
                        take_var_nulls_primitive_iter_unchecked(arr, idx2usize(idx), ddof)
                    };
                    // ... then std = sqrt(var).
                    out.map(|v| v.sqrt())
                })
            },
            GroupsType::Slice {
                groups: groups_slice,
                overlapping,
                monotonic,
            } => {
                if _use_rolling_kernels(groups_slice, *overlapping, *monotonic, self.chunks()) {
                    // Overlapping monotonic windows over a single chunk: cast once
                    // to f64 and reuse the rolling-kernel path on Float64Chunked.
                    let ca = self
                        .cast_with_options(&DataType::Float64, CastOptions::Overflowing)
                        .unwrap();
                    ca.agg_std(groups, ddof)
                } else {
                    _agg_helper_slice::<Float64Type, _>(groups_slice, |[first, len]| {
                        debug_assert!(first + len <= self.len() as IdxSize);
                        match len {
                            0 => None,
                            1 => {
                                // A single observation has zero population std
                                // (ddof == 0); with ddof > 0 it is undefined.
                                if ddof == 0 {
                                    NumCast::from(0)
                                } else {
                                    None
                                }
                            },
                            _ => {
                                let arr_group = _slice_from_offsets(self, first, len);
                                arr_group.std(ddof)
                            },
                        }
                    })
                }
            },
        }
    }

    /// Per-group quantile for integer data; delegates to the generic kernel
    /// with a `Float64Type` output.
    ///
    /// # Safety
    /// Caller must uphold the contract of `agg_quantile_generic`
    /// (presumably: all group indices/offsets in bounds for `self` — confirm
    /// against the kernel's documentation).
    pub(crate) unsafe fn agg_quantile(
        &self,
        groups: &GroupsType,
        quantile: f64,
        method: QuantileMethod,
    ) -> Series {
        agg_quantile_generic::<_, Float64Type>(self, groups, quantile, method)
    }
    /// Per-group median for integer data; delegates to the generic kernel
    /// with a `Float64Type` output.
    ///
    /// # Safety
    /// Same contract as [`Self::agg_quantile`].
    pub(crate) unsafe fn agg_median(&self, groups: &GroupsType) -> Series {
        agg_median_generic::<_, Float64Type>(self, groups)
    }
}