polars_core/chunked_array/ops/
apply.rs

1//! Implementations of the ChunkApply Trait.
2#![allow(unsafe_op_in_unsafe_fn)]
3use std::borrow::Cow;
4
5use crate::chunked_array::arity::{unary_elementwise, unary_elementwise_values};
6use crate::chunked_array::cast::CastOptions;
7use crate::prelude::*;
8use crate::series::IsSorted;
9
10impl<T> ChunkedArray<T>
11where
12    T: PolarsDataType,
13{
14    /// Applies a function only to the non-null elements, propagating nulls.
15    pub fn apply_nonnull_values_generic<'a, U, K, F>(
16        &'a self,
17        dtype: DataType,
18        mut op: F,
19    ) -> ChunkedArray<U>
20    where
21        U: PolarsDataType,
22        F: FnMut(T::Physical<'a>) -> K,
23        U::Array: ArrayFromIterDtype<K> + ArrayFromIterDtype<Option<K>>,
24    {
25        let iter = self.downcast_iter().map(|arr| {
26            if arr.null_count() == 0 {
27                let out: U::Array = arr
28                    .values_iter()
29                    .map(&mut op)
30                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
31                out.with_validity_typed(arr.validity().cloned())
32            } else {
33                let out: U::Array = arr
34                    .iter()
35                    .map(|opt| opt.map(&mut op))
36                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
37                out.with_validity_typed(arr.validity().cloned())
38            }
39        });
40
41        ChunkedArray::from_chunk_iter(self.name().clone(), iter)
42    }
43
44    /// Applies a function only to the non-null elements, propagating nulls.
45    pub fn try_apply_nonnull_values_generic<'a, U, K, F, E>(
46        &'a self,
47        mut op: F,
48    ) -> Result<ChunkedArray<U>, E>
49    where
50        U: PolarsDataType,
51        F: FnMut(T::Physical<'a>) -> Result<K, E>,
52        U::Array: ArrayFromIter<K> + ArrayFromIter<Option<K>>,
53    {
54        let iter = self.downcast_iter().map(|arr| {
55            let arr = if arr.null_count() == 0 {
56                let out: U::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
57                out.with_validity_typed(arr.validity().cloned())
58            } else {
59                let out: U::Array = arr
60                    .iter()
61                    .map(|opt| opt.map(&mut op).transpose())
62                    .try_collect_arr()?;
63                out.with_validity_typed(arr.validity().cloned())
64            };
65            Ok(arr)
66        });
67
68        ChunkedArray::try_from_chunk_iter(self.name().clone(), iter)
69    }
70
71    pub fn apply_into_string_amortized<'a, F>(&'a self, mut f: F) -> StringChunked
72    where
73        F: FnMut(T::Physical<'a>, &mut String),
74    {
75        let mut buf = String::new();
76        let chunks = self
77            .downcast_iter()
78            .map(|arr| {
79                let mut mutarr = MutablePlString::with_capacity(arr.len());
80                arr.iter().for_each(|opt| match opt {
81                    None => mutarr.push_null(),
82                    Some(v) => {
83                        buf.clear();
84                        f(v, &mut buf);
85                        mutarr.push_value(&buf)
86                    },
87                });
88                mutarr.freeze()
89            })
90            .collect::<Vec<_>>();
91        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
92    }
93
94    pub fn try_apply_into_string_amortized<'a, F, E>(&'a self, mut f: F) -> Result<StringChunked, E>
95    where
96        F: FnMut(T::Physical<'a>, &mut String) -> Result<(), E>,
97    {
98        let mut buf = String::new();
99        let chunks = self
100            .downcast_iter()
101            .map(|arr| {
102                let mut mutarr = MutablePlString::with_capacity(arr.len());
103                for opt in arr.iter() {
104                    match opt {
105                        None => mutarr.push_null(),
106                        Some(v) => {
107                            buf.clear();
108                            f(v, &mut buf)?;
109                            mutarr.push_value(&buf)
110                        },
111                    };
112                }
113                Ok(mutarr.freeze())
114            })
115            .collect::<Vec<_>>();
116        ChunkedArray::try_from_chunk_iter(self.name().clone(), chunks)
117    }
118}
119
120fn apply_in_place_impl<S, F>(name: PlSmallStr, chunks: Vec<ArrayRef>, f: F) -> ChunkedArray<S>
121where
122    F: Fn(S::Native) -> S::Native + Copy,
123    S: PolarsNumericType,
124{
125    use arrow::Either::*;
126    let chunks = chunks.into_iter().map(|arr| {
127        let owned_arr = arr
128            .as_any()
129            .downcast_ref::<PrimitiveArray<S::Native>>()
130            .unwrap()
131            .clone();
132        // Make sure we have a single ref count coming in.
133        drop(arr);
134
135        let compute_immutable = |arr: &PrimitiveArray<S::Native>| {
136            arrow::compute::arity::unary(arr, f, S::get_dtype().to_arrow(CompatLevel::newest()))
137        };
138
139        if owned_arr.values().is_sliced() {
140            compute_immutable(&owned_arr)
141        } else {
142            match owned_arr.into_mut() {
143                Left(immutable) => compute_immutable(&immutable),
144                Right(mut mutable) => {
145                    let vals = mutable.values_mut_slice();
146                    vals.iter_mut().for_each(|v| *v = f(*v));
147                    mutable.into()
148                },
149            }
150        }
151    });
152
153    ChunkedArray::from_chunk_iter(name, chunks)
154}
155
156impl<T: PolarsNumericType> ChunkedArray<T> {
157    /// Cast a numeric array to another numeric data type and apply a function in place.
158    /// This saves an allocation.
159    pub fn cast_and_apply_in_place<F, S>(&self, f: F) -> ChunkedArray<S>
160    where
161        F: Fn(S::Native) -> S::Native + Copy,
162        S: PolarsNumericType,
163    {
164        // if we cast, we create a new arrow buffer
165        // then we clone the arrays and drop the cast arrays
166        // this will ensure we have a single ref count
167        // and we can mutate in place
168        let chunks = {
169            let s = self
170                .cast_with_options(&S::get_dtype(), CastOptions::Overflowing)
171                .unwrap();
172            s.chunks().clone()
173        };
174        apply_in_place_impl(self.name().clone(), chunks, f)
175    }
176
177    /// Cast a numeric array to another numeric data type and apply a function in place.
178    /// This saves an allocation.
179    pub fn apply_in_place<F>(mut self, f: F) -> Self
180    where
181        F: Fn(T::Native) -> T::Native + Copy,
182    {
183        let chunks = std::mem::take(&mut self.chunks);
184        apply_in_place_impl(self.name().clone(), chunks, f)
185    }
186}
187
188impl<T: PolarsNumericType> ChunkedArray<T> {
189    pub fn apply_mut<F>(&mut self, f: F)
190    where
191        F: Fn(T::Native) -> T::Native + Copy,
192    {
193        // SAFETY, we do no t change the lengths
194        unsafe {
195            self.downcast_iter_mut()
196                .for_each(|arr| arrow::compute::arity_assign::unary(arr, f))
197        };
198        // can be in any order now
199        self.compute_len();
200        self.set_sorted_flag(IsSorted::Not);
201    }
202}
203
204impl<'a, T> ChunkApply<'a, T::Native> for ChunkedArray<T>
205where
206    T: PolarsNumericType,
207{
208    type FuncRet = T::Native;
209
210    fn apply_values<F>(&'a self, f: F) -> Self
211    where
212        F: Fn(T::Native) -> T::Native + Copy,
213    {
214        let chunks = self
215            .data_views()
216            .zip(self.iter_validities())
217            .map(|(slice, validity)| {
218                let arr: T::Array = slice.iter().copied().map(f).collect_arr();
219                arr.with_validity(validity.cloned())
220            });
221        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
222    }
223
224    fn apply<F>(&'a self, f: F) -> Self
225    where
226        F: Fn(Option<T::Native>) -> Option<T::Native> + Copy,
227    {
228        let chunks = self.downcast_iter().map(|arr| {
229            let iter = arr.into_iter().map(|opt_v| f(opt_v.copied()));
230            PrimitiveArray::<T::Native>::from_trusted_len_iter(iter)
231        });
232        Self::from_chunk_iter(self.name().clone(), chunks)
233    }
234
235    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
236    where
237        F: Fn(Option<T::Native>, &V) -> V,
238    {
239        assert!(slice.len() >= self.len());
240
241        let mut idx = 0;
242        self.downcast_iter().for_each(|arr| {
243            arr.into_iter().for_each(|opt_val| {
244                // SAFETY:
245                // length asserted above
246                let item = unsafe { slice.get_unchecked_mut(idx) };
247                *item = f(opt_val.copied(), item);
248                idx += 1;
249            })
250        });
251    }
252}
253
254impl<'a> ChunkApply<'a, bool> for BooleanChunked {
255    type FuncRet = bool;
256
257    fn apply_values<F>(&self, f: F) -> Self
258    where
259        F: Fn(bool) -> bool + Copy,
260    {
261        // Can just fully deduce behavior from two invocations.
262        match (f(false), f(true)) {
263            (false, false) => self.apply_kernel(&|arr| {
264                Box::new(
265                    BooleanArray::full(arr.len(), false, ArrowDataType::Boolean)
266                        .with_validity(arr.validity().cloned()),
267                )
268            }),
269            (false, true) => self.clone(),
270            (true, false) => !self,
271            (true, true) => self.apply_kernel(&|arr| {
272                Box::new(
273                    BooleanArray::full(arr.len(), true, ArrowDataType::Boolean)
274                        .with_validity(arr.validity().cloned()),
275                )
276            }),
277        }
278    }
279
280    fn apply<F>(&'a self, f: F) -> Self
281    where
282        F: Fn(Option<bool>) -> Option<bool> + Copy,
283    {
284        unary_elementwise(self, f)
285    }
286
287    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
288    where
289        F: Fn(Option<bool>, &T) -> T,
290    {
291        assert!(slice.len() >= self.len());
292
293        let mut idx = 0;
294        self.downcast_iter().for_each(|arr| {
295            arr.into_iter().for_each(|opt_val| {
296                // SAFETY:
297                // length asserted above
298                let item = unsafe { slice.get_unchecked_mut(idx) };
299                *item = f(opt_val, item);
300                idx += 1;
301            })
302        });
303    }
304}
305
306impl StringChunked {
307    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
308    where
309        F: FnMut(&'a str) -> &'a str,
310    {
311        let chunks = self.downcast_iter().map(|arr| {
312            let iter = arr.values_iter().map(&mut f);
313            let new = Utf8ViewArray::arr_from_iter(iter);
314            new.with_validity(arr.validity().cloned())
315        });
316        StringChunked::from_chunk_iter(self.name().clone(), chunks)
317    }
318}
319
320impl BinaryChunked {
321    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
322    where
323        F: FnMut(&'a [u8]) -> &'a [u8],
324    {
325        let chunks = self.downcast_iter().map(|arr| {
326            let iter = arr.values_iter().map(&mut f);
327            let new = BinaryViewArray::arr_from_iter(iter);
328            new.with_validity(arr.validity().cloned())
329        });
330        BinaryChunked::from_chunk_iter(self.name().clone(), chunks)
331    }
332}
333
334impl<'a> ChunkApply<'a, &'a str> for StringChunked {
335    type FuncRet = Cow<'a, str>;
336
337    fn apply_values<F>(&'a self, f: F) -> Self
338    where
339        F: Fn(&'a str) -> Cow<'a, str> + Copy,
340    {
341        unary_elementwise_values(self, f)
342    }
343
344    fn apply<F>(&'a self, f: F) -> Self
345    where
346        F: Fn(Option<&'a str>) -> Option<Cow<'a, str>> + Copy,
347    {
348        unary_elementwise(self, f)
349    }
350
351    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
352    where
353        F: Fn(Option<&'a str>, &T) -> T,
354    {
355        assert!(slice.len() >= self.len());
356
357        let mut idx = 0;
358        self.downcast_iter().for_each(|arr| {
359            arr.into_iter().for_each(|opt_val| {
360                // SAFETY:
361                // length asserted above
362                let item = unsafe { slice.get_unchecked_mut(idx) };
363                *item = f(opt_val, item);
364                idx += 1;
365            })
366        });
367    }
368}
369
370impl<'a> ChunkApply<'a, &'a [u8]> for BinaryChunked {
371    type FuncRet = Cow<'a, [u8]>;
372
373    fn apply_values<F>(&'a self, f: F) -> Self
374    where
375        F: Fn(&'a [u8]) -> Cow<'a, [u8]> + Copy,
376    {
377        unary_elementwise_values(self, f)
378    }
379
380    fn apply<F>(&'a self, f: F) -> Self
381    where
382        F: Fn(Option<&'a [u8]>) -> Option<Cow<'a, [u8]>> + Copy,
383    {
384        unary_elementwise(self, f)
385    }
386
387    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
388    where
389        F: Fn(Option<&'a [u8]>, &T) -> T,
390    {
391        assert!(slice.len() >= self.len());
392
393        let mut idx = 0;
394        self.downcast_iter().for_each(|arr| {
395            arr.into_iter().for_each(|opt_val| {
396                // SAFETY:
397                // length asserted above
398                let item = unsafe { slice.get_unchecked_mut(idx) };
399                *item = f(opt_val, item);
400                idx += 1;
401            })
402        });
403    }
404}
405
406impl ChunkApplyKernel<BooleanArray> for BooleanChunked {
407    fn apply_kernel(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> Self {
408        let chunks = self.downcast_iter().map(f).collect();
409        unsafe { Self::from_chunks(self.name().clone(), chunks) }
410    }
411
412    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> ChunkedArray<S>
413    where
414        S: PolarsDataType,
415    {
416        let chunks = self.downcast_iter().map(f).collect();
417        unsafe { ChunkedArray::<S>::from_chunks(self.name().clone(), chunks) }
418    }
419}
420
421impl<T> ChunkApplyKernel<PrimitiveArray<T::Native>> for ChunkedArray<T>
422where
423    T: PolarsNumericType,
424{
425    fn apply_kernel(&self, f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef) -> Self {
426        self.apply_kernel_cast(&f)
427    }
428    fn apply_kernel_cast<S>(
429        &self,
430        f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef,
431    ) -> ChunkedArray<S>
432    where
433        S: PolarsDataType,
434    {
435        let chunks = self.downcast_iter().map(f).collect();
436        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
437    }
438}
439
440impl ChunkApplyKernel<Utf8ViewArray> for StringChunked {
441    fn apply_kernel(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> Self {
442        self.apply_kernel_cast(&f)
443    }
444
445    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> ChunkedArray<S>
446    where
447        S: PolarsDataType,
448    {
449        let chunks = self.downcast_iter().map(f).collect();
450        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
451    }
452}
453
454impl ChunkApplyKernel<BinaryViewArray> for BinaryChunked {
455    fn apply_kernel(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> Self {
456        self.apply_kernel_cast(&f)
457    }
458
459    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> ChunkedArray<S>
460    where
461        S: PolarsDataType,
462    {
463        let chunks = self.downcast_iter().map(f).collect();
464        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
465    }
466}
467
468impl<'a> ChunkApply<'a, Series> for ListChunked {
469    type FuncRet = Series;
470
471    /// Apply a closure `F` elementwise.
472    fn apply_values<F>(&'a self, f: F) -> Self
473    where
474        F: Fn(Series) -> Series + Copy,
475    {
476        if self.is_empty() {
477            return self.clone();
478        }
479        let mut fast_explode = true;
480        let mut function = |s: Series| {
481            let out = f(s);
482            if out.is_empty() {
483                fast_explode = false;
484            }
485            out
486        };
487        let mut ca: ListChunked = {
488            if !self.has_nulls() {
489                self.into_no_null_iter()
490                    .map(&mut function)
491                    .collect_trusted()
492            } else {
493                self.into_iter()
494                    .map(|opt_v| opt_v.map(&mut function))
495                    .collect_trusted()
496            }
497        };
498        if fast_explode {
499            ca.set_fast_explode()
500        }
501        ca
502    }
503
504    fn apply<F>(&'a self, f: F) -> Self
505    where
506        F: Fn(Option<Series>) -> Option<Series> + Copy,
507    {
508        if self.is_empty() {
509            return self.clone();
510        }
511        self.into_iter().map(f).collect_trusted()
512    }
513
514    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
515    where
516        F: Fn(Option<Series>, &T) -> T,
517    {
518        assert!(slice.len() >= self.len());
519
520        let mut idx = 0;
521        self.downcast_iter().for_each(|arr| {
522            arr.iter().for_each(|opt_val| {
523                let opt_val = opt_val
524                    .map(|arrayref| Series::try_from((PlSmallStr::EMPTY, arrayref)).unwrap());
525
526                // SAFETY:
527                // length asserted above
528                let item = unsafe { slice.get_unchecked_mut(idx) };
529                *item = f(opt_val, item);
530                idx += 1;
531            })
532        });
533    }
534}
535
#[cfg(feature = "object")]
impl<'a, T> ChunkApply<'a, &'a T> for ObjectChunked<T>
where
    T: PolarsObject,
{
    type FuncRet = T;

    /// Maps `f` over the non-null objects; nulls stay null. The result takes the
    /// name of `self`.
    fn apply_values<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a T) -> T + Copy,
    {
        let mapped = self.into_iter().map(|value| value.map(f));
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name().clone());
        out
    }

    /// Maps `f` over every element as an `Option`. The result takes the name of
    /// `self`.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a T>) -> Option<T> + Copy,
    {
        let mapped = self.into_iter().map(f);
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name().clone());
        out
    }

    /// Overwrites the first `self.len()` entries of `slice` with
    /// `f(element, &current_entry)`.
    ///
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<&'a T>, &V) -> V,
    {
        assert!(slice.len() >= self.len());

        let mut cursor = 0;
        for arr in self.downcast_iter() {
            for opt_val in arr.into_iter() {
                // SAFETY:
                // length asserted above
                let slot = unsafe { slice.get_unchecked_mut(cursor) };
                *slot = f(opt_val, slot);
                cursor += 1;
            }
        }
    }
}
578
579impl StringChunked {
580    /// # Safety
581    /// Update the views. All invariants of the views apply.
582    pub unsafe fn apply_views<F: FnMut(View, &str) -> View + Copy>(&self, update_view: F) -> Self {
583        let mut out = self.clone();
584        for arr in out.downcast_iter_mut() {
585            *arr = arr.apply_views(update_view);
586        }
587        out
588    }
589}