polars_ops/chunked_array/list/
sum_mean.rs

1use std::ops::Div;
2
3use arrow::array::{Array, PrimitiveArray};
4use arrow::bitmap::Bitmap;
5use arrow::compute::utils::combine_validities_and;
6use arrow::types::NativeType;
7use num_traits::{NumCast, ToPrimitive};
8
9use super::*;
10use crate::chunked_array::sum::sum_slice;
11
12fn sum_between_offsets<T, S>(values: &[T], offset: &[i64]) -> Vec<S>
13where
14    T: NativeType + ToPrimitive,
15    S: NumCast + std::iter::Sum,
16{
17    offset
18        .windows(2)
19        .map(|w| {
20            values
21                .get(w[0] as usize..w[1] as usize)
22                .map(sum_slice)
23                .unwrap_or(S::from(0).unwrap())
24        })
25        .collect()
26}
27
28fn dispatch_sum<T, S>(arr: &dyn Array, offsets: &[i64], validity: Option<&Bitmap>) -> ArrayRef
29where
30    T: NativeType + ToPrimitive,
31    S: NativeType + NumCast + std::iter::Sum,
32{
33    let values = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
34    let values = values.values().as_slice();
35    Box::new(PrimitiveArray::from_data_default(
36        sum_between_offsets::<_, S>(values, offsets).into(),
37        validity.cloned(),
38    )) as ArrayRef
39}
40
41pub(super) fn sum_list_numerical(ca: &ListChunked, inner_type: &DataType) -> Series {
42    use DataType::*;
43    let chunks = ca
44        .downcast_iter()
45        .map(|arr| {
46            let offsets = arr.offsets().as_slice();
47            let values = arr.values().as_ref();
48
49            match inner_type {
50                Int8 => dispatch_sum::<i8, i64>(values, offsets, arr.validity()),
51                Int16 => dispatch_sum::<i16, i64>(values, offsets, arr.validity()),
52                Int32 => dispatch_sum::<i32, i32>(values, offsets, arr.validity()),
53                Int64 => dispatch_sum::<i64, i64>(values, offsets, arr.validity()),
54                Int128 => dispatch_sum::<i128, i128>(values, offsets, arr.validity()),
55                UInt8 => dispatch_sum::<u8, i64>(values, offsets, arr.validity()),
56                UInt16 => dispatch_sum::<u16, i64>(values, offsets, arr.validity()),
57                UInt32 => dispatch_sum::<u32, u32>(values, offsets, arr.validity()),
58                UInt64 => dispatch_sum::<u64, u64>(values, offsets, arr.validity()),
59                Float32 => dispatch_sum::<f32, f32>(values, offsets, arr.validity()),
60                Float64 => dispatch_sum::<f64, f64>(values, offsets, arr.validity()),
61                _ => unimplemented!(),
62            }
63        })
64        .collect::<Vec<_>>();
65
66    Series::try_from((ca.name().clone(), chunks)).unwrap()
67}
68
69pub(super) fn sum_with_nulls(ca: &ListChunked, inner_dtype: &DataType) -> PolarsResult<Series> {
70    use DataType::*;
71    // TODO: add fast path for smaller ints?
72    let mut out = match inner_dtype {
73        Boolean => {
74            let out: IdxCa =
75                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<IdxSize>().unwrap()));
76            out.into_series()
77        },
78        UInt32 => {
79            let out: UInt32Chunked =
80                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<u32>().unwrap()));
81            out.into_series()
82        },
83        UInt64 => {
84            let out: UInt64Chunked =
85                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<u64>().unwrap()));
86            out.into_series()
87        },
88        Int32 => {
89            let out: Int32Chunked =
90                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<i32>().unwrap()));
91            out.into_series()
92        },
93        Int64 => {
94            let out: Int64Chunked =
95                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<i64>().unwrap()));
96            out.into_series()
97        },
98        Float32 => {
99            let out: Float32Chunked =
100                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<f32>().unwrap()));
101            out.into_series()
102        },
103        Float64 => {
104            let out: Float64Chunked =
105                ca.apply_amortized_generic(|s| s.map(|s| s.as_ref().sum::<f64>().unwrap()));
106            out.into_series()
107        },
108        // slowest sum_as_series path
109        dt => ca
110            .try_apply_amortized(|s| {
111                s.as_ref()
112                    .sum_reduce()
113                    .map(|sc| sc.into_series(PlSmallStr::EMPTY))
114            })?
115            .explode()
116            .unwrap()
117            .into_series()
118            .cast(dt)?,
119    };
120    out.rename(ca.name().clone());
121    Ok(out)
122}
123
124fn mean_between_offsets<T, S>(values: &[T], offset: &[i64]) -> PrimitiveArray<S>
125where
126    T: NativeType + ToPrimitive,
127    S: NativeType + NumCast + std::iter::Sum + Div<Output = S>,
128{
129    offset
130        .windows(2)
131        .map(|w| {
132            values
133                .get(w[0] as usize..w[1] as usize)
134                .filter(|sl| !sl.is_empty())
135                .map(|sl| sum_slice::<_, S>(sl) / NumCast::from(sl.len()).unwrap())
136        })
137        .collect()
138}
139
140fn dispatch_mean<T, S>(arr: &dyn Array, offsets: &[i64], validity: Option<&Bitmap>) -> ArrayRef
141where
142    T: NativeType + ToPrimitive,
143    S: NativeType + NumCast + std::iter::Sum + Div<Output = S>,
144{
145    let values = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
146    let values = values.values().as_slice();
147    let out = mean_between_offsets::<_, S>(values, offsets);
148    let new_validity = combine_validities_and(out.validity(), validity);
149    out.with_validity(new_validity).to_boxed()
150}
151
152pub(super) fn mean_list_numerical(ca: &ListChunked, inner_type: &DataType) -> Series {
153    use DataType::*;
154    let chunks = ca
155        .downcast_iter()
156        .map(|arr| {
157            let offsets = arr.offsets().as_slice();
158            let values = arr.values().as_ref();
159
160            match inner_type {
161                Int8 => dispatch_mean::<i8, f64>(values, offsets, arr.validity()),
162                Int16 => dispatch_mean::<i16, f64>(values, offsets, arr.validity()),
163                Int32 => dispatch_mean::<i32, f64>(values, offsets, arr.validity()),
164                Int64 => dispatch_mean::<i64, f64>(values, offsets, arr.validity()),
165                Int128 => dispatch_mean::<i128, f64>(values, offsets, arr.validity()),
166                UInt8 => dispatch_mean::<u8, f64>(values, offsets, arr.validity()),
167                UInt16 => dispatch_mean::<u16, f64>(values, offsets, arr.validity()),
168                UInt32 => dispatch_mean::<u32, f64>(values, offsets, arr.validity()),
169                UInt64 => dispatch_mean::<u64, f64>(values, offsets, arr.validity()),
170                Float32 => dispatch_mean::<f32, f32>(values, offsets, arr.validity()),
171                Float64 => dispatch_mean::<f64, f64>(values, offsets, arr.validity()),
172                _ => unimplemented!(),
173            }
174        })
175        .collect::<Vec<_>>();
176
177    Series::try_from((ca.name().clone(), chunks)).unwrap()
178}
179
180pub(super) fn mean_with_nulls(ca: &ListChunked) -> Series {
181    match ca.inner_dtype() {
182        DataType::Float32 => {
183            let out: Float32Chunked = ca
184                .apply_amortized_generic(|s| s.and_then(|s| s.as_ref().mean().map(|v| v as f32)))
185                .with_name(ca.name().clone());
186            out.into_series()
187        },
188        #[cfg(feature = "dtype-datetime")]
189        DataType::Date => {
190            const MS_IN_DAY: i64 = 86_400_000;
191            let out: Int64Chunked = ca
192                .apply_amortized_generic(|s| {
193                    s.and_then(|s| s.as_ref().mean().map(|v| (v * (MS_IN_DAY as f64)) as i64))
194                })
195                .with_name(ca.name().clone());
196            out.into_datetime(TimeUnit::Milliseconds, None)
197                .into_series()
198        },
199        dt if dt.is_temporal() => {
200            let out: Int64Chunked = ca
201                .apply_amortized_generic(|s| s.and_then(|s| s.as_ref().mean().map(|v| v as i64)))
202                .with_name(ca.name().clone());
203            out.cast(dt).unwrap()
204        },
205        _ => {
206            let out: Float64Chunked = ca
207                .apply_amortized_generic(|s| s.and_then(|s| s.as_ref().mean()))
208                .with_name(ca.name().clone());
209            out.into_series()
210        },
211    }
212}