polars_core/chunked_array/array/
iterator.rs

1use std::ptr::NonNull;
2
3use super::*;
4use crate::chunked_array::list::iterator::AmortizedListIter;
5use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
6
7impl ArrayChunked {
    /// Returns an iterator over this [`ArrayChunked`] that saves allocations.
    ///
    /// This is a convenience wrapper that calls [`Self::amortized_iter_with_name`] with
    /// `PlSmallStr::EMPTY` as the name of the reused container.
    ///
    /// A Series is:
    ///     1. [`Arc<ChunkedArray>`]
    ///     ChunkedArray is:
    ///         2. Vec< 3. ArrayRef>
    ///
    /// The [`ArrayRef`] we indicated with 3. will be updated during iteration.
    /// The Series will be pinned in memory, saving an allocation for
    /// 1. Arc<..>
    /// 2. Vec<...>
    ///
    /// # Warning
    /// Though memory safe in the sense that it will not read unowned memory, cause UB, or leak
    /// memory, this function still needs precautions. The returned [`AmortSeries`] should never
    /// be cloned or kept for longer than a single iteration, as every call to `next` on the
    /// iterator will change the contents of that Series.
    ///
    /// # Safety
    /// The lifetime of [`AmortSeries`] is bound to the iterator. Keeping it alive
    /// longer than the iterator is UB.
    ///
    // NOTE(review): this method is not declared `unsafe`, yet the section above describes a
    // contract the signature does not enforce — confirm whether the wording or the signature
    // should change.
    pub fn amortized_iter(&self) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
        self.amortized_iter_with_name(PlSmallStr::EMPTY)
    }
31
    /// Returns an iterator over this [`ArrayChunked`] that saves allocations, using `name` as
    /// the name of the reused Series container.
    ///
    /// A Series is:
    ///     1. [`Arc<ChunkedArray>`]
    ///     ChunkedArray is:
    ///         2. Vec< 3. ArrayRef>
    ///
    /// The ArrayRef we indicated with 3. will be updated during iteration.
    /// The Series will be pinned in memory, saving an allocation for
    /// 1. Arc<..>
    /// 2. Vec<...>
    ///
    /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
    /// will be set.
    ///
    /// # Panics
    /// Panics if `self` has no chunk to seed the container from, or if the pointer returned
    /// together with the container is null.
    pub fn amortized_iter_with_name(
        &self,
        name: PlSmallStr,
    ) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
        // we create the series container from the inner array
        // so that the container has the proper dtype.
        // Only the first chunk is used to seed the container; the iterator built below walks
        // every chunk.
        let arr = self.downcast_iter().next().unwrap();
        let inner_values = arr.values();

        // Dtype handed to the container: struct inner types are converted with `to_physical`,
        // every other inner dtype is used as-is.
        let inner_dtype = self.inner_dtype();
        let iter_dtype = match inner_dtype {
            #[cfg(feature = "dtype-struct")]
            DataType::Struct(_) => inner_dtype.to_physical(),
            // TODO: figure out how to deal with physical/logical distinction
            // physical primitives like time, date etc. work
            // physical nested need more
            _ => inner_dtype.clone(),
        };

        // SAFETY:
        // inner type passed as physical type
        let (s, ptr) =
            unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };

        // SAFETY: `ptr` belongs to the `Series`.
        unsafe {
            AmortizedListIter::new(
                self.len(),
                s,
                // A null pointer here is treated as a broken invariant and panics.
                NonNull::new(ptr).unwrap(),
                // Flatten the per-chunk element iterators into one stream over all chunks.
                self.downcast_iter().flat_map(|arr| arr.iter()),
                // Note: the original (non-physical) inner dtype is passed here, not `iter_dtype`.
                inner_dtype.clone(),
            )
        }
    }
80
81    pub fn try_apply_amortized_to_list<F>(&self, mut f: F) -> PolarsResult<ListChunked>
82    where
83        F: FnMut(AmortSeries) -> PolarsResult<Series>,
84    {
85        if self.is_empty() {
86            return Ok(Series::new_empty(
87                self.name().clone(),
88                &DataType::List(Box::new(self.inner_dtype().clone())),
89            )
90            .list()
91            .unwrap()
92            .clone());
93        }
94        let mut fast_explode = self.null_count() == 0;
95        let mut ca: ListChunked = {
96            self.amortized_iter()
97                .map(|opt_v| {
98                    opt_v
99                        .map(|v| {
100                            let out = f(v);
101                            if let Ok(out) = &out {
102                                if out.is_empty() {
103                                    fast_explode = false
104                                }
105                            };
106                            out
107                        })
108                        .transpose()
109                })
110                .collect::<PolarsResult<_>>()?
111        };
112        ca.rename(self.name().clone());
113        if fast_explode {
114            ca.set_fast_explode();
115        }
116        Ok(ca)
117    }
118
119    /// Apply a closure `F` to each array.
120    ///
121    /// # Safety
122    /// Return series of `F` must has the same dtype and number of elements as input.
123    #[must_use]
124    pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
125    where
126        F: FnMut(AmortSeries) -> Series,
127    {
128        if self.is_empty() {
129            return self.clone();
130        }
131        self.amortized_iter()
132            .map(|opt_v| {
133                opt_v.map(|v| {
134                    let out = f(v);
135                    to_arr(&out)
136                })
137            })
138            .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
139    }
140
141    /// Try apply a closure `F` to each array.
142    ///
143    /// # Safety
144    /// Return series of `F` must has the same dtype and number of elements as input if it is Ok.
145    pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
146    where
147        F: FnMut(AmortSeries) -> PolarsResult<Series>,
148    {
149        if self.is_empty() {
150            return Ok(self.clone());
151        }
152        self.amortized_iter()
153            .map(|opt_v| {
154                opt_v
155                    .map(|v| {
156                        let out = f(v)?;
157                        Ok(to_arr(&out))
158                    })
159                    .transpose()
160            })
161            .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
162    }
163
164    /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
165    ///
166    /// # Safety
167    //  Return series of `F` must has the same dtype and number of elements as input series.
168    #[must_use]
169    pub unsafe fn zip_and_apply_amortized_same_type<'a, T, F>(
170        &'a self,
171        ca: &'a ChunkedArray<T>,
172        mut f: F,
173    ) -> Self
174    where
175        T: PolarsDataType,
176        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
177    {
178        if self.is_empty() {
179            return self.clone();
180        }
181        self.amortized_iter()
182            .zip(ca.iter())
183            .map(|(opt_s, opt_v)| {
184                let out = f(opt_s, opt_v);
185                out.map(|s| to_arr(&s))
186            })
187            .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
188    }
189
190    /// Apply a closure `F` elementwise.
191    #[must_use]
192    pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
193    where
194        V: PolarsDataType,
195        F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
196        V::Array: ArrayFromIter<Option<K>>,
197    {
198        self.amortized_iter().map(f).collect_ca(self.name().clone())
199    }
200
201    /// Try apply a closure `F` elementwise.
202    pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
203    where
204        V: PolarsDataType,
205        F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
206        V::Array: ArrayFromIter<Option<K>>,
207    {
208        {
209            self.amortized_iter()
210                .map(f)
211                .try_collect_ca(self.name().clone())
212        }
213    }
214
215    pub fn for_each_amortized<F>(&self, f: F)
216    where
217        F: FnMut(Option<AmortSeries>),
218    {
219        self.amortized_iter().for_each(f)
220    }
221}
222
223fn to_arr(s: &Series) -> ArrayRef {
224    if s.chunks().len() > 1 {
225        let s = s.rechunk();
226        s.chunks()[0].clone()
227    } else {
228        s.chunks()[0].clone()
229    }
230}