polars_core/chunked_array/list/iterator.rs

1use std::marker::PhantomData;
2use std::ptr::NonNull;
3use std::rc::Rc;
4
5use crate::chunked_array::flags::StatisticsFlags;
6use crate::prelude::*;
7use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
8
9pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
10    len: usize,
11    series_container: Rc<Series>,
12    inner: NonNull<ArrayRef>,
13    lifetime: PhantomData<&'a ArrayRef>,
14    iter: I,
15    // used only if feature="dtype-struct"
16    #[allow(dead_code)]
17    inner_dtype: DataType,
18}
19
20impl<I: Iterator<Item = Option<ArrayBox>>> AmortizedListIter<'_, I> {
21    pub(crate) unsafe fn new(
22        len: usize,
23        series_container: Series,
24        inner: NonNull<ArrayRef>,
25        iter: I,
26        inner_dtype: DataType,
27    ) -> Self {
28        Self {
29            len,
30            series_container: Rc::new(series_container),
31            inner,
32            lifetime: PhantomData,
33            iter,
34            inner_dtype,
35        }
36    }
37}
38
39impl<I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'_, I> {
40    type Item = Option<AmortSeries>;
41
42    fn next(&mut self) -> Option<Self::Item> {
43        self.iter.next().map(|opt_val| {
44            opt_val.map(|array_ref| {
45                #[cfg(feature = "dtype-struct")]
46                // structs arrays are bound to the series not to the arrayref
47                // so we must get a hold to the new array
48                if matches!(self.inner_dtype, DataType::Struct(_)) {
49                    // SAFETY:
50                    // dtype is known
51                    unsafe {
52                        let s = Series::from_chunks_and_dtype_unchecked(
53                            self.series_container.name().clone(),
54                            vec![array_ref],
55                            &self.inner_dtype.to_physical(),
56                        )
57                        .from_physical_unchecked(&self.inner_dtype)
58                        .unwrap();
59                        let inner = Rc::make_mut(&mut self.series_container);
60                        *inner = s;
61
62                        return AmortSeries::new(self.series_container.clone());
63                    }
64                }
65                // The series is cloned, we make a new container.
66                if Arc::strong_count(&self.series_container.0) > 1
67                    || Rc::strong_count(&self.series_container) > 1
68                {
69                    let (s, ptr) = unsafe {
70                        unstable_series_container_and_ptr(
71                            self.series_container.name().clone(),
72                            array_ref,
73                            self.series_container.dtype(),
74                        )
75                    };
76                    self.series_container = Rc::new(s);
77                    self.inner = NonNull::new(ptr).unwrap();
78                } else {
79                    // SAFETY: we checked the RC above;
80                    let series_mut =
81                        unsafe { Rc::get_mut(&mut self.series_container).unwrap_unchecked() };
82                    // update the inner state
83                    unsafe { *self.inner.as_mut() = array_ref };
84
85                    // As an optimization, we try to minimize how many calls to
86                    // _get_inner_mut() we do.
87                    let series_mut_inner = series_mut._get_inner_mut();
88                    // last iteration could have set the sorted flag (e.g. in compute_len)
89                    series_mut_inner._set_flags(StatisticsFlags::empty());
90                    // make sure that the length is correct
91                    series_mut_inner.compute_len();
92                }
93
94                AmortSeries::new(self.series_container.clone())
95            })
96        })
97    }
98
99    fn size_hint(&self) -> (usize, Option<usize>) {
100        (self.len, Some(self.len))
101    }
102}
103
104// # Safety
105// we correctly implemented size_hint
106unsafe impl<I: Iterator<Item = Option<ArrayBox>>> TrustedLen for AmortizedListIter<'_, I> {}
107impl<I: Iterator<Item = Option<ArrayBox>>> ExactSizeIterator for AmortizedListIter<'_, I> {}
108
109impl ListChunked {
110    /// This is an iterator over a [`ListChunked`] that saves allocations.
111    /// A Series is:
112    ///     1. [`Arc<ChunkedArray>`]
113    ///     ChunkedArray is:
114    ///         2. Vec< 3. ArrayRef>
115    ///
116    /// The ArrayRef we indicated with 3. will be updated during iteration.
117    /// The Series will be pinned in memory, saving an allocation for
118    /// 1. Arc<..>
119    /// 2. Vec<...>
120    ///
121    /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
122    /// will be set.
123    pub fn amortized_iter(
124        &self,
125    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
126        self.amortized_iter_with_name(PlSmallStr::EMPTY)
127    }
128
129    /// See `amortized_iter`.
130    pub fn amortized_iter_with_name(
131        &self,
132        name: PlSmallStr,
133    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
134        // we create the series container from the inner array
135        // so that the container has the proper dtype.
136        let arr = self.downcast_iter().next().unwrap();
137        let inner_values = arr.values();
138
139        let inner_dtype = self.inner_dtype();
140        let iter_dtype = match inner_dtype {
141            #[cfg(feature = "dtype-struct")]
142            DataType::Struct(_) => inner_dtype.to_physical(),
143            // TODO: figure out how to deal with physical/logical distinction
144            // physical primitives like time, date etc. work
145            // physical nested need more
146            _ => inner_dtype.clone(),
147        };
148
149        // SAFETY:
150        // inner type passed as physical type
151        let (s, ptr) =
152            unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
153
154        // SAFETY: ptr belongs the Series..
155        unsafe {
156            AmortizedListIter::new(
157                self.len(),
158                s,
159                NonNull::new(ptr).unwrap(),
160                self.downcast_iter().flat_map(|arr| arr.iter()),
161                inner_dtype.clone(),
162            )
163        }
164    }
165
166    /// Apply a closure `F` elementwise.
167    #[must_use]
168    pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
169    where
170        V: PolarsDataType,
171        F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
172        V::Array: ArrayFromIter<Option<K>>,
173    {
174        // TODO! make an amortized iter that does not flatten
175        self.amortized_iter().map(f).collect_ca(self.name().clone())
176    }
177
178    pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
179    where
180        V: PolarsDataType,
181        F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
182        V::Array: ArrayFromIter<Option<K>>,
183    {
184        // TODO! make an amortized iter that does not flatten
185        self.amortized_iter()
186            .map(f)
187            .try_collect_ca(self.name().clone())
188    }
189
190    pub fn for_each_amortized<F>(&self, f: F)
191    where
192        F: FnMut(Option<AmortSeries>),
193    {
194        self.amortized_iter().for_each(f)
195    }
196
197    /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
198    #[must_use]
199    pub fn zip_and_apply_amortized<'a, T, I, F>(&'a self, ca: &'a ChunkedArray<T>, mut f: F) -> Self
200    where
201        T: PolarsDataType,
202        &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
203        I: TrustedLen<Item = Option<T::Physical<'a>>>,
204        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
205    {
206        if self.is_empty() {
207            return self.clone();
208        }
209        let mut fast_explode = self.null_count() == 0;
210        let mut out: ListChunked = {
211            self.amortized_iter()
212                .zip(ca)
213                .map(|(opt_s, opt_v)| {
214                    let out = f(opt_s, opt_v);
215                    match out {
216                        Some(out) => {
217                            fast_explode &= !out.is_empty();
218                            Some(out)
219                        },
220                        None => {
221                            fast_explode = false;
222                            out
223                        },
224                    }
225                })
226                .collect_trusted()
227        };
228
229        out.rename(self.name().clone());
230        if fast_explode {
231            out.set_fast_explode();
232        }
233        out
234    }
235
236    #[must_use]
237    pub fn binary_zip_and_apply_amortized<'a, T, U, F>(
238        &'a self,
239        ca1: &'a ChunkedArray<T>,
240        ca2: &'a ChunkedArray<U>,
241        mut f: F,
242    ) -> Self
243    where
244        T: PolarsDataType,
245        U: PolarsDataType,
246        F: FnMut(
247            Option<AmortSeries>,
248            Option<T::Physical<'a>>,
249            Option<U::Physical<'a>>,
250        ) -> Option<Series>,
251    {
252        if self.is_empty() {
253            return self.clone();
254        }
255        let mut fast_explode = self.null_count() == 0;
256        let mut out: ListChunked = {
257            self.amortized_iter()
258                .zip(ca1.iter())
259                .zip(ca2.iter())
260                .map(|((opt_s, opt_u), opt_v)| {
261                    let out = f(opt_s, opt_u, opt_v);
262                    match out {
263                        Some(out) => {
264                            fast_explode &= !out.is_empty();
265                            Some(out)
266                        },
267                        None => {
268                            fast_explode = false;
269                            out
270                        },
271                    }
272                })
273                .collect_trusted()
274        };
275
276        out.rename(self.name().clone());
277        if fast_explode {
278            out.set_fast_explode();
279        }
280        out
281    }
282
283    pub fn try_binary_zip_and_apply_amortized<'a, T, U, F>(
284        &'a self,
285        ca1: &'a ChunkedArray<T>,
286        ca2: &'a ChunkedArray<U>,
287        mut f: F,
288    ) -> PolarsResult<Self>
289    where
290        T: PolarsDataType,
291        U: PolarsDataType,
292        F: FnMut(
293            Option<AmortSeries>,
294            Option<T::Physical<'a>>,
295            Option<U::Physical<'a>>,
296        ) -> PolarsResult<Option<Series>>,
297    {
298        if self.is_empty() {
299            return Ok(self.clone());
300        }
301        let mut fast_explode = self.null_count() == 0;
302        let mut out: ListChunked = {
303            self.amortized_iter()
304                .zip(ca1.iter())
305                .zip(ca2.iter())
306                .map(|((opt_s, opt_u), opt_v)| {
307                    let out = f(opt_s, opt_u, opt_v)?;
308                    match out {
309                        Some(out) => {
310                            fast_explode &= !out.is_empty();
311                            Ok(Some(out))
312                        },
313                        None => {
314                            fast_explode = false;
315                            Ok(out)
316                        },
317                    }
318                })
319                .collect::<PolarsResult<_>>()?
320        };
321
322        out.rename(self.name().clone());
323        if fast_explode {
324            out.set_fast_explode();
325        }
326        Ok(out)
327    }
328
329    pub fn try_zip_and_apply_amortized<'a, T, I, F>(
330        &'a self,
331        ca: &'a ChunkedArray<T>,
332        mut f: F,
333    ) -> PolarsResult<Self>
334    where
335        T: PolarsDataType,
336        &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
337        I: TrustedLen<Item = Option<T::Physical<'a>>>,
338        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> PolarsResult<Option<Series>>,
339    {
340        if self.is_empty() {
341            return Ok(self.clone());
342        }
343        let mut fast_explode = self.null_count() == 0;
344        let mut out: ListChunked = {
345            self.amortized_iter()
346                .zip(ca)
347                .map(|(opt_s, opt_v)| {
348                    let out = f(opt_s, opt_v)?;
349                    match out {
350                        Some(out) => {
351                            fast_explode &= !out.is_empty();
352                            Ok(Some(out))
353                        },
354                        None => {
355                            fast_explode = false;
356                            Ok(out)
357                        },
358                    }
359                })
360                .collect::<PolarsResult<_>>()?
361        };
362
363        out.rename(self.name().clone());
364        if fast_explode {
365            out.set_fast_explode();
366        }
367        Ok(out)
368    }
369
370    /// Apply a closure `F` elementwise.
371    #[must_use]
372    pub fn apply_amortized<F>(&self, mut f: F) -> Self
373    where
374        F: FnMut(AmortSeries) -> Series,
375    {
376        if self.is_empty() {
377            return self.clone();
378        }
379        let mut fast_explode = self.null_count() == 0;
380        let mut ca: ListChunked = {
381            self.amortized_iter()
382                .map(|opt_v| {
383                    opt_v.map(|v| {
384                        let out = f(v);
385                        if out.is_empty() {
386                            fast_explode = false;
387                        }
388                        out
389                    })
390                })
391                .collect_trusted()
392        };
393
394        ca.rename(self.name().clone());
395        if fast_explode {
396            ca.set_fast_explode();
397        }
398        ca
399    }
400
401    pub fn try_apply_amortized<F>(&self, mut f: F) -> PolarsResult<Self>
402    where
403        F: FnMut(AmortSeries) -> PolarsResult<Series>,
404    {
405        if self.is_empty() {
406            return Ok(self.clone());
407        }
408        let mut fast_explode = self.null_count() == 0;
409        let mut ca: ListChunked = {
410            self.amortized_iter()
411                .map(|opt_v| {
412                    opt_v
413                        .map(|v| {
414                            let out = f(v);
415                            if let Ok(out) = &out {
416                                if out.is_empty() {
417                                    fast_explode = false
418                                }
419                            };
420                            out
421                        })
422                        .transpose()
423                })
424                .collect::<PolarsResult<_>>()?
425        };
426        ca.rename(self.name().clone());
427        if fast_explode {
428            ca.set_fast_explode();
429        }
430        Ok(ca)
431    }
432}
433
#[cfg(test)]
mod test {
    use super::*;
    use crate::chunked_array::builder::get_list_builder;

    /// Every row from the amortized iterator must equal the row from the regular iterator.
    #[test]
    fn test_iter_list() {
        let rows: [&[i32]; 3] = [&[1, 2, 3], &[3, 2, 1], &[1, 1]];

        let mut builder = get_list_builder(&DataType::Int32, 10, 10, PlSmallStr::EMPTY);
        for row in rows {
            builder
                .append_series(&Series::new(PlSmallStr::EMPTY, row))
                .unwrap();
        }
        let ca = builder.finish();

        for (amortized, materialized) in ca.amortized_iter().zip(&ca) {
            assert!(amortized.unwrap().as_ref().equals(&materialized.unwrap()));
        }
    }
}
457}