// polars_core/chunked_array/ops/apply.rs

//! Implementations of the `ChunkApply` and `ChunkApplyKernel` traits, plus related `apply_*` helpers on `ChunkedArray`.
2#![allow(unsafe_op_in_unsafe_fn)]
3use std::borrow::Cow;
4
5use crate::chunked_array::arity::{unary_elementwise, unary_elementwise_values};
6use crate::chunked_array::cast::CastOptions;
7use crate::prelude::*;
8use crate::series::IsSorted;
9
10impl<T> ChunkedArray<T>
11where
12    T: PolarsDataType,
13{
14    /// Applies a function only to the non-null elements, propagating nulls.
15    pub fn apply_nonnull_values_generic<'a, U, K, F>(
16        &'a self,
17        dtype: DataType,
18        mut op: F,
19    ) -> ChunkedArray<U>
20    where
21        U: PolarsDataType,
22        F: FnMut(T::Physical<'a>) -> K,
23        U::Array: ArrayFromIterDtype<K> + ArrayFromIterDtype<Option<K>>,
24    {
25        let iter = self.downcast_iter().map(|arr| {
26            if arr.null_count() == 0 {
27                let out: U::Array = arr
28                    .values_iter()
29                    .map(&mut op)
30                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
31                out.with_validity_typed(arr.validity().cloned())
32            } else {
33                let out: U::Array = arr
34                    .iter()
35                    .map(|opt| opt.map(&mut op))
36                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
37                out.with_validity_typed(arr.validity().cloned())
38            }
39        });
40
41        ChunkedArray::from_chunk_iter(self.name().clone(), iter)
42    }
43
44    /// Applies a function only to the non-null elements, propagating nulls.
45    pub fn try_apply_nonnull_values_generic<'a, U, K, F, E>(
46        &'a self,
47        mut op: F,
48    ) -> Result<ChunkedArray<U>, E>
49    where
50        U: PolarsDataType,
51        F: FnMut(T::Physical<'a>) -> Result<K, E>,
52        U::Array: ArrayFromIter<K> + ArrayFromIter<Option<K>>,
53    {
54        let iter = self.downcast_iter().map(|arr| {
55            let arr = if arr.null_count() == 0 {
56                let out: U::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
57                out.with_validity_typed(arr.validity().cloned())
58            } else {
59                let out: U::Array = arr
60                    .iter()
61                    .map(|opt| opt.map(&mut op).transpose())
62                    .try_collect_arr()?;
63                out.with_validity_typed(arr.validity().cloned())
64            };
65            Ok(arr)
66        });
67
68        ChunkedArray::try_from_chunk_iter(self.name().clone(), iter)
69    }
70
71    pub fn apply_into_string_amortized<'a, F>(&'a self, mut f: F) -> StringChunked
72    where
73        F: FnMut(T::Physical<'a>, &mut String),
74    {
75        let mut buf = String::new();
76        let chunks = self
77            .downcast_iter()
78            .map(|arr| {
79                let mut mutarr = MutablePlString::with_capacity(arr.len());
80                arr.iter().for_each(|opt| match opt {
81                    None => mutarr.push_null(),
82                    Some(v) => {
83                        buf.clear();
84                        f(v, &mut buf);
85                        mutarr.push_value(&buf)
86                    },
87                });
88                mutarr.freeze()
89            })
90            .collect::<Vec<_>>();
91        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
92    }
93
94    pub fn try_apply_into_string_amortized<'a, F, E>(&'a self, mut f: F) -> Result<StringChunked, E>
95    where
96        F: FnMut(T::Physical<'a>, &mut String) -> Result<(), E>,
97    {
98        let mut buf = String::new();
99        let chunks = self
100            .downcast_iter()
101            .map(|arr| {
102                let mut mutarr = MutablePlString::with_capacity(arr.len());
103                for opt in arr.iter() {
104                    match opt {
105                        None => mutarr.push_null(),
106                        Some(v) => {
107                            buf.clear();
108                            f(v, &mut buf)?;
109                            mutarr.push_value(&buf)
110                        },
111                    };
112                }
113                Ok(mutarr.freeze())
114            })
115            .collect::<Vec<_>>();
116        ChunkedArray::try_from_chunk_iter(self.name().clone(), chunks)
117    }
118}
119
120fn apply_in_place_impl<S, F>(name: PlSmallStr, chunks: Vec<ArrayRef>, f: F) -> ChunkedArray<S>
121where
122    F: Fn(S::Native) -> S::Native + Copy,
123    S: PolarsNumericType,
124{
125    use arrow::Either::*;
126    let chunks = chunks.into_iter().map(|arr| {
127        let owned_arr = arr
128            .as_any()
129            .downcast_ref::<PrimitiveArray<S::Native>>()
130            .unwrap()
131            .clone();
132        // Make sure we have a single ref count coming in.
133        drop(arr);
134
135        let compute_immutable = |arr: &PrimitiveArray<S::Native>| {
136            arrow::compute::arity::unary(
137                arr,
138                f,
139                S::get_static_dtype().to_arrow(CompatLevel::newest()),
140            )
141        };
142
143        if owned_arr.values().is_sliced() {
144            compute_immutable(&owned_arr)
145        } else {
146            match owned_arr.into_mut() {
147                Left(immutable) => compute_immutable(&immutable),
148                Right(mut mutable) => {
149                    let vals = mutable.values_mut_slice();
150                    vals.iter_mut().for_each(|v| *v = f(*v));
151                    mutable.into()
152                },
153            }
154        }
155    });
156
157    ChunkedArray::from_chunk_iter(name, chunks)
158}
159
160impl<T: PolarsNumericType> ChunkedArray<T> {
161    /// Cast a numeric array to another numeric data type and apply a function in place.
162    /// This saves an allocation.
163    pub fn cast_and_apply_in_place<F, S>(&self, f: F) -> ChunkedArray<S>
164    where
165        F: Fn(S::Native) -> S::Native + Copy,
166        S: PolarsNumericType,
167    {
168        // if we cast, we create a new arrow buffer
169        // then we clone the arrays and drop the cast arrays
170        // this will ensure we have a single ref count
171        // and we can mutate in place
172        let chunks = {
173            let s = self
174                .cast_with_options(&S::get_static_dtype(), CastOptions::Overflowing)
175                .unwrap();
176            s.chunks().clone()
177        };
178        apply_in_place_impl(self.name().clone(), chunks, f)
179    }
180
181    /// Cast a numeric array to another numeric data type and apply a function in place.
182    /// This saves an allocation.
183    pub fn apply_in_place<F>(mut self, f: F) -> Self
184    where
185        F: Fn(T::Native) -> T::Native + Copy,
186    {
187        let chunks = std::mem::take(&mut self.chunks);
188        apply_in_place_impl(self.name().clone(), chunks, f)
189    }
190}
191
192impl<T: PolarsNumericType> ChunkedArray<T> {
193    pub fn apply_mut<F>(&mut self, f: F)
194    where
195        F: Fn(T::Native) -> T::Native + Copy,
196    {
197        // SAFETY, we do no t change the lengths
198        unsafe {
199            self.downcast_iter_mut()
200                .for_each(|arr| arrow::compute::arity_assign::unary(arr, f))
201        };
202        // can be in any order now
203        self.compute_len();
204        self.set_sorted_flag(IsSorted::Not);
205    }
206}
207
208impl<'a, T> ChunkApply<'a, T::Native> for ChunkedArray<T>
209where
210    T: PolarsNumericType,
211{
212    type FuncRet = T::Native;
213
214    fn apply_values<F>(&'a self, f: F) -> Self
215    where
216        F: Fn(T::Native) -> T::Native + Copy,
217    {
218        let chunks = self
219            .data_views()
220            .zip(self.iter_validities())
221            .map(|(slice, validity)| {
222                let arr: T::Array = slice.iter().copied().map(f).collect_arr();
223                arr.with_validity(validity.cloned())
224            });
225        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
226    }
227
228    fn apply<F>(&'a self, f: F) -> Self
229    where
230        F: Fn(Option<T::Native>) -> Option<T::Native> + Copy,
231    {
232        let chunks = self.downcast_iter().map(|arr| {
233            let iter = arr.into_iter().map(|opt_v| f(opt_v.copied()));
234            PrimitiveArray::<T::Native>::from_trusted_len_iter(iter)
235        });
236        Self::from_chunk_iter(self.name().clone(), chunks)
237    }
238
239    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
240    where
241        F: Fn(Option<T::Native>, &V) -> V,
242    {
243        assert!(slice.len() >= self.len());
244
245        let mut idx = 0;
246        self.downcast_iter().for_each(|arr| {
247            arr.into_iter().for_each(|opt_val| {
248                // SAFETY:
249                // length asserted above
250                let item = unsafe { slice.get_unchecked_mut(idx) };
251                *item = f(opt_val.copied(), item);
252                idx += 1;
253            })
254        });
255    }
256}
257
258impl<'a> ChunkApply<'a, bool> for BooleanChunked {
259    type FuncRet = bool;
260
261    fn apply_values<F>(&self, f: F) -> Self
262    where
263        F: Fn(bool) -> bool + Copy,
264    {
265        // Can just fully deduce behavior from two invocations.
266        match (f(false), f(true)) {
267            (false, false) => self.apply_kernel(&|arr| {
268                Box::new(
269                    BooleanArray::full(arr.len(), false, ArrowDataType::Boolean)
270                        .with_validity(arr.validity().cloned()),
271                )
272            }),
273            (false, true) => self.clone(),
274            (true, false) => !self,
275            (true, true) => self.apply_kernel(&|arr| {
276                Box::new(
277                    BooleanArray::full(arr.len(), true, ArrowDataType::Boolean)
278                        .with_validity(arr.validity().cloned()),
279                )
280            }),
281        }
282    }
283
284    fn apply<F>(&'a self, f: F) -> Self
285    where
286        F: Fn(Option<bool>) -> Option<bool> + Copy,
287    {
288        unary_elementwise(self, f)
289    }
290
291    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
292    where
293        F: Fn(Option<bool>, &T) -> T,
294    {
295        assert!(slice.len() >= self.len());
296
297        let mut idx = 0;
298        self.downcast_iter().for_each(|arr| {
299            arr.into_iter().for_each(|opt_val| {
300                // SAFETY:
301                // length asserted above
302                let item = unsafe { slice.get_unchecked_mut(idx) };
303                *item = f(opt_val, item);
304                idx += 1;
305            })
306        });
307    }
308}
309
310impl StringChunked {
311    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
312    where
313        F: FnMut(&'a str) -> &'a str,
314    {
315        let chunks = self.downcast_iter().map(|arr| {
316            let iter = arr.values_iter().map(&mut f);
317            let new = Utf8ViewArray::arr_from_iter(iter);
318            new.with_validity(arr.validity().cloned())
319        });
320        StringChunked::from_chunk_iter(self.name().clone(), chunks)
321    }
322}
323
324impl BinaryChunked {
325    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
326    where
327        F: FnMut(&'a [u8]) -> &'a [u8],
328    {
329        let chunks = self.downcast_iter().map(|arr| {
330            let iter = arr.values_iter().map(&mut f);
331            let new = BinaryViewArray::arr_from_iter(iter);
332            new.with_validity(arr.validity().cloned())
333        });
334        BinaryChunked::from_chunk_iter(self.name().clone(), chunks)
335    }
336}
337
338impl<'a> ChunkApply<'a, &'a str> for StringChunked {
339    type FuncRet = Cow<'a, str>;
340
341    fn apply_values<F>(&'a self, f: F) -> Self
342    where
343        F: Fn(&'a str) -> Cow<'a, str> + Copy,
344    {
345        unary_elementwise_values(self, f)
346    }
347
348    fn apply<F>(&'a self, f: F) -> Self
349    where
350        F: Fn(Option<&'a str>) -> Option<Cow<'a, str>> + Copy,
351    {
352        unary_elementwise(self, f)
353    }
354
355    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
356    where
357        F: Fn(Option<&'a str>, &T) -> T,
358    {
359        assert!(slice.len() >= self.len());
360
361        let mut idx = 0;
362        self.downcast_iter().for_each(|arr| {
363            arr.into_iter().for_each(|opt_val| {
364                // SAFETY:
365                // length asserted above
366                let item = unsafe { slice.get_unchecked_mut(idx) };
367                *item = f(opt_val, item);
368                idx += 1;
369            })
370        });
371    }
372}
373
374impl<'a> ChunkApply<'a, &'a [u8]> for BinaryChunked {
375    type FuncRet = Cow<'a, [u8]>;
376
377    fn apply_values<F>(&'a self, f: F) -> Self
378    where
379        F: Fn(&'a [u8]) -> Cow<'a, [u8]> + Copy,
380    {
381        unary_elementwise_values(self, f)
382    }
383
384    fn apply<F>(&'a self, f: F) -> Self
385    where
386        F: Fn(Option<&'a [u8]>) -> Option<Cow<'a, [u8]>> + Copy,
387    {
388        unary_elementwise(self, f)
389    }
390
391    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
392    where
393        F: Fn(Option<&'a [u8]>, &T) -> T,
394    {
395        assert!(slice.len() >= self.len());
396
397        let mut idx = 0;
398        self.downcast_iter().for_each(|arr| {
399            arr.into_iter().for_each(|opt_val| {
400                // SAFETY:
401                // length asserted above
402                let item = unsafe { slice.get_unchecked_mut(idx) };
403                *item = f(opt_val, item);
404                idx += 1;
405            })
406        });
407    }
408}
409
410impl ChunkApplyKernel<BooleanArray> for BooleanChunked {
411    fn apply_kernel(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> Self {
412        let chunks = self.downcast_iter().map(f).collect();
413        unsafe { Self::from_chunks(self.name().clone(), chunks) }
414    }
415
416    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> ChunkedArray<S>
417    where
418        S: PolarsDataType,
419    {
420        let chunks = self.downcast_iter().map(f).collect();
421        unsafe { ChunkedArray::<S>::from_chunks(self.name().clone(), chunks) }
422    }
423}
424
425impl<T> ChunkApplyKernel<PrimitiveArray<T::Native>> for ChunkedArray<T>
426where
427    T: PolarsNumericType,
428{
429    fn apply_kernel(&self, f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef) -> Self {
430        self.apply_kernel_cast(&f)
431    }
432    fn apply_kernel_cast<S>(
433        &self,
434        f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef,
435    ) -> ChunkedArray<S>
436    where
437        S: PolarsDataType,
438    {
439        let chunks = self.downcast_iter().map(f).collect();
440        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
441    }
442}
443
444impl ChunkApplyKernel<Utf8ViewArray> for StringChunked {
445    fn apply_kernel(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> Self {
446        self.apply_kernel_cast(&f)
447    }
448
449    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> ChunkedArray<S>
450    where
451        S: PolarsDataType,
452    {
453        let chunks = self.downcast_iter().map(f).collect();
454        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
455    }
456}
457
458impl ChunkApplyKernel<BinaryViewArray> for BinaryChunked {
459    fn apply_kernel(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> Self {
460        self.apply_kernel_cast(&f)
461    }
462
463    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> ChunkedArray<S>
464    where
465        S: PolarsDataType,
466    {
467        let chunks = self.downcast_iter().map(f).collect();
468        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
469    }
470}
471
472impl<'a> ChunkApply<'a, Series> for ListChunked {
473    type FuncRet = Series;
474
475    /// Apply a closure `F` elementwise.
476    fn apply_values<F>(&'a self, f: F) -> Self
477    where
478        F: Fn(Series) -> Series + Copy,
479    {
480        if self.is_empty() {
481            return self.clone();
482        }
483        let mut fast_explode = true;
484        let mut function = |s: Series| {
485            let out = f(s);
486            if out.is_empty() {
487                fast_explode = false;
488            }
489            out
490        };
491        let mut ca: ListChunked = {
492            if !self.has_nulls() {
493                self.into_no_null_iter()
494                    .map(&mut function)
495                    .collect_trusted()
496            } else {
497                self.into_iter()
498                    .map(|opt_v| opt_v.map(&mut function))
499                    .collect_trusted()
500            }
501        };
502        if fast_explode {
503            ca.set_fast_explode()
504        }
505        ca
506    }
507
508    fn apply<F>(&'a self, f: F) -> Self
509    where
510        F: Fn(Option<Series>) -> Option<Series> + Copy,
511    {
512        if self.is_empty() {
513            return self.clone();
514        }
515        self.into_iter().map(f).collect_trusted()
516    }
517
518    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
519    where
520        F: Fn(Option<Series>, &T) -> T,
521    {
522        assert!(slice.len() >= self.len());
523
524        let mut idx = 0;
525        self.downcast_iter().for_each(|arr| {
526            arr.iter().for_each(|opt_val| {
527                let opt_val = opt_val
528                    .map(|arrayref| Series::try_from((PlSmallStr::EMPTY, arrayref)).unwrap());
529
530                // SAFETY:
531                // length asserted above
532                let item = unsafe { slice.get_unchecked_mut(idx) };
533                *item = f(opt_val, item);
534                idx += 1;
535            })
536        });
537    }
538}
539
#[cfg(feature = "object")]
impl<'a, T> ChunkApply<'a, &'a T> for ObjectChunked<T>
where
    T: PolarsObject,
{
    type FuncRet = T;

    /// Applies `f` to every non-null object; nulls stay null. The result is
    /// renamed to the name of `self`.
    fn apply_values<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a T) -> T + Copy,
    {
        let mapped = self.into_iter().map(|opt_v| opt_v.map(f));
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name().clone());
        out
    }

    /// Applies `f` to every entry as an `Option`, so `f` decides both the value
    /// and the nullness of each output slot. The result is renamed to the name
    /// of `self`.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a T>) -> Option<T> + Copy,
    {
        let mapped = self.into_iter().map(f);
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name().clone());
        out
    }

    /// For the entry at position `i`, replaces `slice[i]` with
    /// `f(entry, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<&'a T>, &V) -> V,
    {
        assert!(slice.len() >= self.len());
        let mut idx = 0;
        for arr in self.downcast_iter() {
            for opt_val in arr.into_iter() {
                // SAFETY:
                // length asserted above; `idx` advances once per element.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            }
        }
    }
}
582
583impl StringChunked {
584    /// # Safety
585    /// Update the views. All invariants of the views apply.
586    pub unsafe fn apply_views<F: FnMut(View, &str) -> View + Copy>(&self, update_view: F) -> Self {
587        let mut out = self.clone();
588        for arr in out.downcast_iter_mut() {
589            *arr = arr.apply_views(update_view);
590        }
591        out
592    }
593}