Skip to main content

polars_core/chunked_array/ops/
arity.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2use std::error::Error;
3
4use arrow::array::{Array, MutablePlString, StaticArray};
5use arrow::compute::utils::combine_validities_and;
6use polars_error::PolarsResult;
7use polars_utils::pl_str::PlSmallStr;
8
9use crate::chunked_array::flags::StatisticsFlags;
10use crate::datatypes::{ArrayCollectIterExt, ArrayFromIter};
11use crate::prelude::{ChunkedArray, CompatLevel, PolarsDataType, Series, StringChunked};
12use crate::utils::{align_chunks_binary, align_chunks_binary_owned, align_chunks_ternary};
13
/// Computes the output length of a broadcasting binary operation.
///
/// Expands to a `match` on `($a.len(), $b.len())` that yields `Ok(len)` when
/// the two lengths are equal or when one of them is `1`, and otherwise an
/// `Err` holding a `ShapeMismatch` error built with `polars_err!`. The error
/// message names `$op` and both length expressions.
#[macro_export]
macro_rules! binary_output_height {
    ($a:expr, $b:expr, op = $op:expr) => {
        match ($a.len(), $b.len()) {
            // Equal lengths: no broadcasting required.
            (n, m) if n == m => Ok(n),
            // One side is a unit-length input that broadcasts to the other.
            (n, 1) | (1, n) => Ok(n),
            _ => Err(polars_err!(
                ShapeMismatch:
                "{} got differing lengths ({}: {}, {}: {})",
                $op,
                stringify!($a.len()), $a.len(),
                stringify!($b.len()), $b.len(),
            )),
        }
    }
}
31
/// Computes the output length of a broadcasting ternary operation.
///
/// Expands to a `match` on the three `.len()` values. The result is `Ok(len)`
/// when every input either has length `len` or length `1`; any other
/// combination yields an `Err` holding a `ShapeMismatch` error built with
/// `polars_err!` that names `$op` and all three length expressions.
#[macro_export]
macro_rules! ternary_output_height {
    ($a:expr, $b:expr, $c:expr, op = $op:expr) => {
        match ($a.len(), $b.len(), $c.len()) {
            // Two unit-length inputs broadcast to the third.
            (n, 1, 1) | (1, n, 1) | (1, 1, n) => Ok(n),
            // One unit-length input broadcasts to two equally long inputs.
            (n, m, 1) | (n, 1, m) | (1, n, m) if n == m => Ok(n),
            // All three inputs agree.
            (n, m, k) if n == m && m == k => Ok(n),
            _ => Err(polars_err!(
                ShapeMismatch:
                "{} got differing lengths ({}: {}, {}: {}, {}: {})",
                $op,
                stringify!($a.len()), $a.len(),
                stringify!($b.len()), $b.len(),
                stringify!($c.len()), $c.len(),
            )),
        }
    }
}
51
// Names the return type of a one-argument `FnMut` as an associated type.
// This helper exists because `for<'a>` notation can't yet be applied properly
// on the return type.
pub trait UnaryFnMut<Arg>: FnMut(Arg) -> Self::Ret {
    /// The value produced by calling the closure.
    type Ret;
}

// Blanket implementation: every `FnMut(Arg) -> Out` is a `UnaryFnMut<Arg>`
// whose `Ret` is `Out`.
impl<Arg, Out, Func> UnaryFnMut<Arg> for Func
where
    Func: FnMut(Arg) -> Out,
{
    type Ret = Out;
}
61
// Names the return type of a three-argument `FnMut` as an associated type.
// This helper exists because `for<'a>` notation can't yet be applied properly
// on the return type.
pub trait TernaryFnMut<Arg1, Arg2, Arg3>: FnMut(Arg1, Arg2, Arg3) -> Self::Ret {
    /// The value produced by calling the closure.
    type Ret;
}

// Blanket implementation: every `FnMut(Arg1, Arg2, Arg3) -> Out` is a
// `TernaryFnMut<Arg1, Arg2, Arg3>` whose `Ret` is `Out`.
impl<Arg1, Arg2, Arg3, Out, Func> TernaryFnMut<Arg1, Arg2, Arg3> for Func
where
    Func: FnMut(Arg1, Arg2, Arg3) -> Out,
{
    type Ret = Out;
}
71
// Names the return type of a two-argument `FnMut` as an associated type.
// This helper exists because `for<'a>` notation can't yet be applied properly
// on the return type.
pub trait BinaryFnMut<Arg1, Arg2>: FnMut(Arg1, Arg2) -> Self::Ret {
    /// The value produced by calling the closure.
    type Ret;
}

// Blanket implementation: every `FnMut(Arg1, Arg2) -> Out` is a
// `BinaryFnMut<Arg1, Arg2>` whose `Ret` is `Out`.
impl<Arg1, Arg2, Out, Func> BinaryFnMut<Arg1, Arg2> for Func
where
    Func: FnMut(Arg1, Arg2) -> Out,
{
    type Ret = Out;
}
81
82/// Applies a kernel that produces `Array` types.
83#[inline]
84pub fn unary_kernel<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
85where
86    T: PolarsDataType,
87    V: PolarsDataType<Array = Arr>,
88    Arr: Array,
89    F: FnMut(&T::Array) -> Arr,
90{
91    let iter = ca.downcast_iter().map(op);
92    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
93}
94
95/// Applies a kernel that produces `Array` types.
96#[inline]
97pub fn unary_kernel_owned<T, V, F, Arr>(ca: ChunkedArray<T>, op: F) -> ChunkedArray<V>
98where
99    T: PolarsDataType,
100    V: PolarsDataType<Array = Arr>,
101    Arr: Array,
102    F: FnMut(T::Array) -> Arr,
103{
104    let name = ca.name().clone();
105    let iter = ca.downcast_into_iter().map(op);
106    ChunkedArray::from_chunk_iter(name, iter)
107}
108
109#[inline]
110pub fn unary_elementwise<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
111where
112    T: PolarsDataType,
113    V: PolarsDataType,
114    F: UnaryFnMut<Option<T::Physical<'a>>>,
115    V::Array: ArrayFromIter<<F as UnaryFnMut<Option<T::Physical<'a>>>>::Ret>,
116{
117    if ca.has_nulls() {
118        let iter = ca
119            .downcast_iter()
120            .map(|arr| arr.iter().map(&mut op).collect_arr());
121        ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
122    } else {
123        let iter = ca
124            .downcast_iter()
125            .map(|arr| arr.values_iter().map(|x| op(Some(x))).collect_arr());
126        ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
127    }
128}
129
130#[inline]
131pub fn try_unary_elementwise<'a, T, V, F, K, E>(
132    ca: &'a ChunkedArray<T>,
133    mut op: F,
134) -> Result<ChunkedArray<V>, E>
135where
136    T: PolarsDataType,
137    V: PolarsDataType,
138    F: FnMut(Option<T::Physical<'a>>) -> Result<Option<K>, E>,
139    V::Array: ArrayFromIter<Option<K>>,
140{
141    let iter = ca
142        .downcast_iter()
143        .map(|arr| arr.iter().map(&mut op).try_collect_arr());
144    ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
145}
146
147#[inline]
148pub fn unary_elementwise_values<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
149where
150    T: PolarsDataType,
151    V: PolarsDataType,
152    F: UnaryFnMut<T::Physical<'a>>,
153    V::Array: ArrayFromIter<<F as UnaryFnMut<T::Physical<'a>>>::Ret>,
154{
155    if ca.null_count() == ca.len() {
156        let arr = V::Array::full_null(
157            ca.len(),
158            V::get_static_dtype().to_arrow(CompatLevel::newest()),
159        );
160        return ChunkedArray::with_chunk(ca.name().clone(), arr);
161    }
162
163    let iter = ca.downcast_iter().map(|arr| {
164        let validity = arr.validity().cloned();
165        let arr: V::Array = arr.values_iter().map(&mut op).collect_arr();
166        arr.with_validity_typed(validity)
167    });
168    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
169}
170
171#[inline]
172pub fn try_unary_elementwise_values<'a, T, V, F, K, E>(
173    ca: &'a ChunkedArray<T>,
174    mut op: F,
175) -> Result<ChunkedArray<V>, E>
176where
177    T: PolarsDataType,
178    V: PolarsDataType,
179    F: FnMut(T::Physical<'a>) -> Result<K, E>,
180    V::Array: ArrayFromIter<K>,
181{
182    if ca.null_count() == ca.len() {
183        let arr = V::Array::full_null(
184            ca.len(),
185            V::get_static_dtype().to_arrow(CompatLevel::newest()),
186        );
187        return Ok(ChunkedArray::with_chunk(ca.name().clone(), arr));
188    }
189
190    let iter = ca.downcast_iter().map(|arr| {
191        let validity = arr.validity().cloned();
192        let arr: V::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
193        Ok(arr.with_validity_typed(validity))
194    });
195    ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
196}
197
198/// Applies a kernel that produces `Array` types.
199///
200/// Intended for kernels that apply on values, this function will apply the
201/// validity mask afterwards.
202#[inline]
203pub fn unary_mut_values<T, V, F, Arr>(ca: &ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
204where
205    T: PolarsDataType,
206    V: PolarsDataType<Array = Arr>,
207    Arr: Array + StaticArray,
208    F: FnMut(&T::Array) -> Arr,
209{
210    let iter = ca
211        .downcast_iter()
212        .map(|arr| op(arr).with_validity_typed(arr.validity().cloned()));
213    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
214}
215
216/// Applies a kernel that produces `Array` types.
217#[inline]
218pub fn unary_mut_with_options<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
219where
220    T: PolarsDataType,
221    V: PolarsDataType<Array = Arr>,
222    Arr: Array + StaticArray,
223    F: FnMut(&T::Array) -> Arr,
224{
225    ChunkedArray::from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
226}
227
228#[inline]
229pub fn try_unary_mut_with_options<T, V, F, Arr, E>(
230    ca: &ChunkedArray<T>,
231    op: F,
232) -> Result<ChunkedArray<V>, E>
233where
234    T: PolarsDataType,
235    V: PolarsDataType<Array = Arr>,
236    Arr: Array + StaticArray,
237    F: FnMut(&T::Array) -> Result<Arr, E>,
238    E: Error,
239{
240    ChunkedArray::try_from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
241}
242
243#[inline]
244pub fn binary_elementwise<T, U, V, F>(
245    lhs: &ChunkedArray<T>,
246    rhs: &ChunkedArray<U>,
247    mut op: F,
248) -> ChunkedArray<V>
249where
250    T: PolarsDataType,
251    U: PolarsDataType,
252    V: PolarsDataType,
253    F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
254    V::Array: for<'a> ArrayFromIter<
255        <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
256    >,
257{
258    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
259    let iter = lhs
260        .downcast_iter()
261        .zip(rhs.downcast_iter())
262        .map(|(lhs_arr, rhs_arr)| {
263            let element_iter = lhs_arr
264                .iter()
265                .zip(rhs_arr.iter())
266                .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
267            element_iter.collect_arr()
268        });
269    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
270}
271
272#[inline]
273pub fn binary_elementwise_for_each<'a, 'b, T, U, F>(
274    lhs: &'a ChunkedArray<T>,
275    rhs: &'b ChunkedArray<U>,
276    mut op: F,
277) where
278    T: PolarsDataType,
279    U: PolarsDataType,
280    F: FnMut(Option<T::Physical<'a>>, Option<U::Physical<'b>>),
281{
282    let mut lhs_arr_iter = lhs.downcast_iter();
283    let mut rhs_arr_iter = rhs.downcast_iter();
284
285    let lhs_arr = lhs_arr_iter.next().unwrap();
286    let rhs_arr = rhs_arr_iter.next().unwrap();
287
288    let mut lhs_remaining = lhs_arr.len();
289    let mut rhs_remaining = rhs_arr.len();
290    let mut lhs_iter = lhs_arr.iter();
291    let mut rhs_iter = rhs_arr.iter();
292
293    loop {
294        let range = std::cmp::min(lhs_remaining, rhs_remaining);
295
296        for _ in 0..range {
297            // SAFETY: we loop until the smaller iter is exhausted.
298            let lhs_opt_val = unsafe { lhs_iter.next().unwrap_unchecked() };
299            let rhs_opt_val = unsafe { rhs_iter.next().unwrap_unchecked() };
300            op(lhs_opt_val, rhs_opt_val)
301        }
302        lhs_remaining -= range;
303        rhs_remaining -= range;
304
305        if lhs_remaining == 0 {
306            let Some(new_arr) = lhs_arr_iter.next() else {
307                return;
308            };
309            lhs_remaining = new_arr.len();
310            lhs_iter = new_arr.iter();
311        }
312        if rhs_remaining == 0 {
313            let Some(new_arr) = rhs_arr_iter.next() else {
314                return;
315            };
316            rhs_remaining = new_arr.len();
317            rhs_iter = new_arr.iter();
318        }
319    }
320}
321
322#[inline]
323pub fn try_binary_elementwise<T, U, V, F, K, E>(
324    lhs: &ChunkedArray<T>,
325    rhs: &ChunkedArray<U>,
326    mut op: F,
327) -> Result<ChunkedArray<V>, E>
328where
329    T: PolarsDataType,
330    U: PolarsDataType,
331    V: PolarsDataType,
332    F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
333    V::Array: ArrayFromIter<Option<K>>,
334{
335    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
336    let iter = lhs
337        .downcast_iter()
338        .zip(rhs.downcast_iter())
339        .map(|(lhs_arr, rhs_arr)| {
340            let element_iter = lhs_arr
341                .iter()
342                .zip(rhs_arr.iter())
343                .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
344            element_iter.try_collect_arr()
345        });
346    ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
347}
348
349#[inline]
350pub fn binary_elementwise_values<T, U, V, F, K>(
351    lhs: &ChunkedArray<T>,
352    rhs: &ChunkedArray<U>,
353    mut op: F,
354) -> ChunkedArray<V>
355where
356    T: PolarsDataType,
357    U: PolarsDataType,
358    V: PolarsDataType,
359    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
360    V::Array: ArrayFromIter<K>,
361{
362    if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
363        let len = lhs.len().min(rhs.len());
364        let arr = V::Array::full_null(len, V::get_static_dtype().to_arrow(CompatLevel::newest()));
365
366        return ChunkedArray::with_chunk(lhs.name().clone(), arr);
367    }
368
369    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
370
371    let iter = lhs
372        .downcast_iter()
373        .zip(rhs.downcast_iter())
374        .map(|(lhs_arr, rhs_arr)| {
375            let validity = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
376
377            let element_iter = lhs_arr
378                .values_iter()
379                .zip(rhs_arr.values_iter())
380                .map(|(lhs_val, rhs_val)| op(lhs_val, rhs_val));
381
382            let array: V::Array = element_iter.collect_arr();
383            array.with_validity_typed(validity)
384        });
385    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
386}
387
388/// Apply elementwise binary function which produces string, amortising allocations.
389///
390/// Currently unused within Polars itself, but it's a useful utility for plugin authors.
391#[inline]
392pub fn binary_elementwise_into_string_amortized<T, U, F>(
393    lhs: &ChunkedArray<T>,
394    rhs: &ChunkedArray<U>,
395    mut op: F,
396) -> StringChunked
397where
398    T: PolarsDataType,
399    U: PolarsDataType,
400    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>, &mut String),
401{
402    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
403    let mut buf = String::new();
404    let iter = lhs
405        .downcast_iter()
406        .zip(rhs.downcast_iter())
407        .map(|(lhs_arr, rhs_arr)| {
408            let mut mutarr = MutablePlString::with_capacity(lhs_arr.len());
409            lhs_arr
410                .iter()
411                .zip(rhs_arr.iter())
412                .for_each(|(lhs_opt, rhs_opt)| match (lhs_opt, rhs_opt) {
413                    (None, _) | (_, None) => mutarr.push_null(),
414                    (Some(lhs_val), Some(rhs_val)) => {
415                        buf.clear();
416                        op(lhs_val, rhs_val, &mut buf);
417                        mutarr.push_value(&buf)
418                    },
419                });
420            mutarr.freeze()
421        });
422    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
423}
424
425/// Applies a kernel that produces `Array` types.
426///
427/// Intended for kernels that apply on values, this function will filter out any
428/// results which do not have two non-null inputs.
429#[inline]
430pub fn binary_mut_values<T, U, V, F, Arr>(
431    lhs: &ChunkedArray<T>,
432    rhs: &ChunkedArray<U>,
433    mut op: F,
434    name: PlSmallStr,
435) -> ChunkedArray<V>
436where
437    T: PolarsDataType,
438    U: PolarsDataType,
439    V: PolarsDataType<Array = Arr>,
440    Arr: Array + StaticArray,
441    F: FnMut(&T::Array, &U::Array) -> Arr,
442{
443    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
444    let iter = lhs
445        .downcast_iter()
446        .zip(rhs.downcast_iter())
447        .map(|(lhs_arr, rhs_arr)| {
448            let ret = op(lhs_arr, rhs_arr);
449            let inp_val = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
450            let val = combine_validities_and(inp_val.as_ref(), ret.validity());
451            ret.with_validity_typed(val)
452        });
453    ChunkedArray::from_chunk_iter(name, iter)
454}
455
456/// Applies a kernel that produces `Array` types.
457#[inline]
458pub fn binary_mut_with_options<T, U, V, F, Arr>(
459    lhs: &ChunkedArray<T>,
460    rhs: &ChunkedArray<U>,
461    mut op: F,
462    name: PlSmallStr,
463) -> ChunkedArray<V>
464where
465    T: PolarsDataType,
466    U: PolarsDataType,
467    V: PolarsDataType<Array = Arr>,
468    Arr: Array,
469    F: FnMut(&T::Array, &U::Array) -> Arr,
470{
471    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
472    let iter = lhs
473        .downcast_iter()
474        .zip(rhs.downcast_iter())
475        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
476    ChunkedArray::from_chunk_iter(name, iter)
477}
478
479#[inline]
480pub fn try_binary_mut_with_options<T, U, V, F, Arr, E>(
481    lhs: &ChunkedArray<T>,
482    rhs: &ChunkedArray<U>,
483    mut op: F,
484    name: PlSmallStr,
485) -> Result<ChunkedArray<V>, E>
486where
487    T: PolarsDataType,
488    U: PolarsDataType,
489    V: PolarsDataType<Array = Arr>,
490    Arr: Array,
491    F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
492    E: Error,
493{
494    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
495    let iter = lhs
496        .downcast_iter()
497        .zip(rhs.downcast_iter())
498        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
499    ChunkedArray::try_from_chunk_iter(name, iter)
500}
501
502/// Applies a kernel that produces `Array` types.
503pub fn binary<T, U, V, F, Arr>(
504    lhs: &ChunkedArray<T>,
505    rhs: &ChunkedArray<U>,
506    op: F,
507) -> ChunkedArray<V>
508where
509    T: PolarsDataType,
510    U: PolarsDataType,
511    V: PolarsDataType<Array = Arr>,
512    Arr: Array,
513    F: FnMut(&T::Array, &U::Array) -> Arr,
514{
515    binary_mut_with_options(lhs, rhs, op, lhs.name().clone())
516}
517
518/// Applies a kernel that produces `Array` types.
519pub fn binary_owned<L, R, V, F, Arr>(
520    lhs: ChunkedArray<L>,
521    rhs: ChunkedArray<R>,
522    mut op: F,
523) -> ChunkedArray<V>
524where
525    L: PolarsDataType,
526    R: PolarsDataType,
527    V: PolarsDataType<Array = Arr>,
528    Arr: Array,
529    F: FnMut(L::Array, R::Array) -> Arr,
530{
531    let name = lhs.name().clone();
532    let (lhs, rhs) = align_chunks_binary_owned(lhs, rhs);
533    let iter = lhs
534        .downcast_into_iter()
535        .zip(rhs.downcast_into_iter())
536        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
537    ChunkedArray::from_chunk_iter(name, iter)
538}
539
540/// Applies a kernel that produces `Array` types.
541pub fn try_binary<T, U, V, F, Arr, E>(
542    lhs: &ChunkedArray<T>,
543    rhs: &ChunkedArray<U>,
544    mut op: F,
545) -> Result<ChunkedArray<V>, E>
546where
547    T: PolarsDataType,
548    U: PolarsDataType,
549    V: PolarsDataType<Array = Arr>,
550    Arr: Array,
551    F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
552    E: Error,
553{
554    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
555    let iter = lhs
556        .downcast_iter()
557        .zip(rhs.downcast_iter())
558        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
559    ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
560}
561
562/// Applies a kernel that produces `ArrayRef` of the same type.
563///
564/// # Safety
565/// Caller must ensure that the returned `ArrayRef` belongs to `T: PolarsDataType`.
566#[inline]
567pub unsafe fn binary_unchecked_same_type<T, U, F>(
568    lhs: &ChunkedArray<T>,
569    rhs: &ChunkedArray<U>,
570    mut op: F,
571    keep_sorted: bool,
572    keep_fast_explode: bool,
573) -> ChunkedArray<T>
574where
575    T: PolarsDataType,
576    U: PolarsDataType,
577    F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
578{
579    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
580    let chunks = lhs
581        .downcast_iter()
582        .zip(rhs.downcast_iter())
583        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
584        .collect();
585
586    let mut ca = lhs.copy_with_chunks(chunks);
587
588    let mut retain_flags = StatisticsFlags::empty();
589    use StatisticsFlags as F;
590    retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
591    retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
592    ca.retain_flags_from(lhs.as_ref(), retain_flags);
593
594    ca
595}
596
597pub fn try_unary_to_series<T, F>(ca: &ChunkedArray<T>, op: F) -> PolarsResult<Series>
598where
599    T: PolarsDataType,
600    F: FnMut(&T::Array) -> PolarsResult<Box<dyn Array>>,
601{
602    let chunks = ca
603        .downcast_iter()
604        .map(op)
605        .collect::<PolarsResult<Vec<_>>>()?;
606    Series::try_from((ca.name().clone(), chunks))
607}
608
609pub fn binary_to_series<T, U, F>(
610    lhs: &ChunkedArray<T>,
611    rhs: &ChunkedArray<U>,
612    mut op: F,
613) -> PolarsResult<Series>
614where
615    T: PolarsDataType,
616    U: PolarsDataType,
617    F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
618{
619    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
620    let chunks = lhs
621        .downcast_iter()
622        .zip(rhs.downcast_iter())
623        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
624        .collect::<Vec<_>>();
625    Series::try_from((lhs.name().clone(), chunks))
626}
627
628pub fn try_binary_to_series<T, U, F>(
629    lhs: &ChunkedArray<T>,
630    rhs: &ChunkedArray<U>,
631    mut op: F,
632) -> PolarsResult<Series>
633where
634    T: PolarsDataType,
635    U: PolarsDataType,
636    F: FnMut(&T::Array, &U::Array) -> PolarsResult<Box<dyn Array>>,
637{
638    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
639    let chunks = lhs
640        .downcast_iter()
641        .zip(rhs.downcast_iter())
642        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
643        .collect::<PolarsResult<Vec<_>>>()?;
644    Series::try_from((lhs.name().clone(), chunks))
645}
646
647/// Applies a kernel that produces `ArrayRef` of the same type.
648///
649/// # Safety
650/// Caller must ensure that the returned `ArrayRef` belongs to `T: PolarsDataType`.
651#[inline]
652pub unsafe fn try_binary_unchecked_same_type<T, U, F, E>(
653    lhs: &ChunkedArray<T>,
654    rhs: &ChunkedArray<U>,
655    mut op: F,
656    keep_sorted: bool,
657    keep_fast_explode: bool,
658) -> Result<ChunkedArray<T>, E>
659where
660    T: PolarsDataType,
661    U: PolarsDataType,
662    F: FnMut(&T::Array, &U::Array) -> Result<Box<dyn Array>, E>,
663    E: Error,
664{
665    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
666    let chunks = lhs
667        .downcast_iter()
668        .zip(rhs.downcast_iter())
669        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
670        .collect::<Result<Vec<_>, E>>()?;
671    let mut ca = lhs.copy_with_chunks(chunks);
672
673    let mut retain_flags = StatisticsFlags::empty();
674    use StatisticsFlags as F;
675    retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
676    retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
677    ca.retain_flags_from(lhs.as_ref(), retain_flags);
678
679    Ok(ca)
680}
681
682#[inline]
683pub fn try_ternary_elementwise<T, U, V, G, F, K, E>(
684    ca1: &ChunkedArray<T>,
685    ca2: &ChunkedArray<U>,
686    ca3: &ChunkedArray<G>,
687    mut op: F,
688) -> Result<ChunkedArray<V>, E>
689where
690    T: PolarsDataType,
691    U: PolarsDataType,
692    V: PolarsDataType,
693    G: PolarsDataType,
694    F: for<'a> FnMut(
695        Option<T::Physical<'a>>,
696        Option<U::Physical<'a>>,
697        Option<G::Physical<'a>>,
698    ) -> Result<Option<K>, E>,
699    V::Array: ArrayFromIter<Option<K>>,
700{
701    let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
702    let iter = ca1
703        .downcast_iter()
704        .zip(ca2.downcast_iter())
705        .zip(ca3.downcast_iter())
706        .map(|((ca1_arr, ca2_arr), ca3_arr)| {
707            let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
708                |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
709                    op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
710                },
711            );
712            element_iter.try_collect_arr()
713        });
714    ChunkedArray::try_from_chunk_iter(ca1.name().clone(), iter)
715}
716
717#[inline]
718pub fn ternary_elementwise<T, U, V, G, F>(
719    ca1: &ChunkedArray<T>,
720    ca2: &ChunkedArray<U>,
721    ca3: &ChunkedArray<G>,
722    mut op: F,
723) -> ChunkedArray<V>
724where
725    T: PolarsDataType,
726    U: PolarsDataType,
727    G: PolarsDataType,
728    V: PolarsDataType,
729    F: for<'a> TernaryFnMut<
730            Option<T::Physical<'a>>,
731            Option<U::Physical<'a>>,
732            Option<G::Physical<'a>>,
733        >,
734    V::Array: for<'a> ArrayFromIter<
735        <F as TernaryFnMut<
736            Option<T::Physical<'a>>,
737            Option<U::Physical<'a>>,
738            Option<G::Physical<'a>>,
739        >>::Ret,
740    >,
741{
742    let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
743    let iter = ca1
744        .downcast_iter()
745        .zip(ca2.downcast_iter())
746        .zip(ca3.downcast_iter())
747        .map(|((ca1_arr, ca2_arr), ca3_arr)| {
748            let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
749                |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
750                    op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
751                },
752            );
753            element_iter.collect_arr()
754        });
755    ChunkedArray::from_chunk_iter(ca1.name().clone(), iter)
756}
757
758pub fn broadcast_binary_elementwise<T, U, V, F>(
759    lhs: &ChunkedArray<T>,
760    rhs: &ChunkedArray<U>,
761    mut op: F,
762) -> ChunkedArray<V>
763where
764    T: PolarsDataType,
765    U: PolarsDataType,
766    V: PolarsDataType,
767    F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
768    V::Array: for<'a> ArrayFromIter<
769        <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
770    >,
771{
772    match (lhs.len(), rhs.len()) {
773        (1, _) => {
774            let a = unsafe { lhs.get_unchecked(0) };
775            unary_elementwise(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
776        },
777        (_, 1) => {
778            let b = unsafe { rhs.get_unchecked(0) };
779            unary_elementwise(lhs, |a| op(a, b.clone()))
780        },
781        _ => binary_elementwise(lhs, rhs, op),
782    }
783}
784
785pub fn broadcast_try_binary_elementwise<T, U, V, F, K, E>(
786    lhs: &ChunkedArray<T>,
787    rhs: &ChunkedArray<U>,
788    mut op: F,
789) -> Result<ChunkedArray<V>, E>
790where
791    T: PolarsDataType,
792    U: PolarsDataType,
793    V: PolarsDataType,
794    F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
795    V::Array: ArrayFromIter<Option<K>>,
796{
797    match (lhs.len(), rhs.len()) {
798        (1, _) => {
799            let a = unsafe { lhs.get_unchecked(0) };
800            Ok(try_unary_elementwise(rhs, |b| op(a.clone(), b))?.with_name(lhs.name().clone()))
801        },
802        (_, 1) => {
803            let b = unsafe { rhs.get_unchecked(0) };
804            try_unary_elementwise(lhs, |a| op(a, b.clone()))
805        },
806        _ => try_binary_elementwise(lhs, rhs, op),
807    }
808}
809
810pub fn broadcast_binary_elementwise_values<T, U, V, F, K>(
811    lhs: &ChunkedArray<T>,
812    rhs: &ChunkedArray<U>,
813    mut op: F,
814) -> ChunkedArray<V>
815where
816    T: PolarsDataType,
817    U: PolarsDataType,
818    V: PolarsDataType,
819    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
820    V::Array: ArrayFromIter<K>,
821{
822    if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
823        let min = lhs.len().min(rhs.len());
824        let max = lhs.len().max(rhs.len());
825        let len = if min == 1 { max } else { min };
826        let arr = V::Array::full_null(len, V::get_static_dtype().to_arrow(CompatLevel::newest()));
827
828        return ChunkedArray::with_chunk(lhs.name().clone(), arr);
829    }
830
831    match (lhs.len(), rhs.len()) {
832        (1, _) => {
833            let a = unsafe { lhs.value_unchecked(0) };
834            unary_elementwise_values(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
835        },
836        (_, 1) => {
837            let b = unsafe { rhs.value_unchecked(0) };
838            unary_elementwise_values(lhs, |a| op(a, b.clone()))
839        },
840        _ => binary_elementwise_values(lhs, rhs, op),
841    }
842}
843
844pub fn apply_binary_kernel_broadcast<'l, 'r, L, R, O, K, LK, RK>(
845    lhs: &'l ChunkedArray<L>,
846    rhs: &'r ChunkedArray<R>,
847    kernel: K,
848    lhs_broadcast_kernel: LK,
849    rhs_broadcast_kernel: RK,
850) -> ChunkedArray<O>
851where
852    L: PolarsDataType,
853    R: PolarsDataType,
854    O: PolarsDataType,
855    K: Fn(&L::Array, &R::Array) -> O::Array,
856    LK: Fn(L::Physical<'l>, &R::Array) -> O::Array,
857    RK: Fn(&L::Array, R::Physical<'r>) -> O::Array,
858{
859    let name = lhs.name();
860    let out = match (lhs.len(), rhs.len()) {
861        (a, b) if a == b => binary(lhs, rhs, |lhs, rhs| kernel(lhs, rhs)),
862        // broadcast right path
863        (_, 1) => {
864            let opt_rhs = rhs.get(0);
865            match opt_rhs {
866                None => {
867                    let arr = O::Array::full_null(
868                        lhs.len(),
869                        O::get_static_dtype().to_arrow(CompatLevel::newest()),
870                    );
871                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
872                },
873                Some(rhs) => unary_kernel(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
874            }
875        },
876        (1, _) => {
877            let opt_lhs = lhs.get(0);
878            match opt_lhs {
879                None => {
880                    let arr = O::Array::full_null(
881                        rhs.len(),
882                        O::get_static_dtype().to_arrow(CompatLevel::newest()),
883                    );
884                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
885                },
886                Some(lhs) => unary_kernel(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
887            }
888        },
889        _ => panic!("Cannot apply operation on arrays of different lengths"),
890    };
891    out.with_name(name.clone())
892}
893
894pub fn apply_binary_kernel_broadcast_owned<L, R, O, K, LK, RK>(
895    lhs: ChunkedArray<L>,
896    rhs: ChunkedArray<R>,
897    kernel: K,
898    lhs_broadcast_kernel: LK,
899    rhs_broadcast_kernel: RK,
900) -> ChunkedArray<O>
901where
902    L: PolarsDataType,
903    R: PolarsDataType,
904    O: PolarsDataType,
905    K: Fn(L::Array, R::Array) -> O::Array,
906    for<'a> LK: Fn(L::Physical<'a>, R::Array) -> O::Array,
907    for<'a> RK: Fn(L::Array, R::Physical<'a>) -> O::Array,
908{
909    let name = lhs.name().to_owned();
910    let out = match (lhs.len(), rhs.len()) {
911        (a, b) if a == b => binary_owned(lhs, rhs, kernel),
912        // broadcast right path
913        (_, 1) => {
914            let opt_rhs = rhs.get(0);
915            match opt_rhs {
916                None => {
917                    let arr = O::Array::full_null(
918                        lhs.len(),
919                        O::get_static_dtype().to_arrow(CompatLevel::newest()),
920                    );
921                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
922                },
923                Some(rhs) => unary_kernel_owned(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
924            }
925        },
926        (1, _) => {
927            let opt_lhs = lhs.get(0);
928            match opt_lhs {
929                None => {
930                    let arr = O::Array::full_null(
931                        rhs.len(),
932                        O::get_static_dtype().to_arrow(CompatLevel::newest()),
933                    );
934                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
935                },
936                Some(lhs) => unary_kernel_owned(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
937            }
938        },
939        _ => panic!("Cannot apply operation on arrays of different lengths"),
940    };
941    out.with_name(name)
942}