Skip to main content

polars_core/chunked_array/list/
iterator.rs

1use std::marker::PhantomData;
2use std::ptr::NonNull;
3use std::rc::Rc;
4
5use crate::chunked_array::flags::StatisticsFlags;
6use crate::prelude::*;
7use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
8
9pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
10    len: usize,
11    series_container: Rc<Series>,
12    inner: NonNull<ArrayRef>,
13    lifetime: PhantomData<&'a ArrayRef>,
14    iter: I,
15    // used only if feature="dtype-struct"
16    #[allow(dead_code)]
17    inner_dtype: DataType,
18}
19
20impl<I: Iterator<Item = Option<ArrayBox>>> AmortizedListIter<'_, I> {
21    pub(crate) unsafe fn new(
22        len: usize,
23        series_container: Series,
24        inner: NonNull<ArrayRef>,
25        iter: I,
26        inner_dtype: DataType,
27    ) -> Self {
28        Self {
29            len,
30            series_container: Rc::new(series_container),
31            inner,
32            lifetime: PhantomData,
33            iter,
34            inner_dtype,
35        }
36    }
37}
38
39impl<I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'_, I> {
40    type Item = Option<AmortSeries>;
41
42    fn next(&mut self) -> Option<Self::Item> {
43        self.iter.next().map(|opt_val| {
44            opt_val.map(|array_ref| {
45                #[cfg(feature = "dtype-struct")]
46                // structs arrays are bound to the series not to the arrayref
47                // so we must get a hold to the new array
48                if matches!(self.inner_dtype, DataType::Struct(_)) {
49                    // SAFETY:
50                    // dtype is known
51                    unsafe {
52                        let s = Series::from_chunks_and_dtype_unchecked(
53                            self.series_container.name().clone(),
54                            vec![array_ref],
55                            &self.inner_dtype.to_physical(),
56                        )
57                        .from_physical_unchecked(&self.inner_dtype)
58                        .unwrap();
59                        let inner = Rc::make_mut(&mut self.series_container);
60                        *inner = s;
61
62                        return AmortSeries::new(self.series_container.clone());
63                    }
64                }
65                // The series is cloned, we make a new container.
66                if Arc::strong_count(&self.series_container.0) > 1
67                    || Rc::strong_count(&self.series_container) > 1
68                {
69                    let (s, ptr) = unsafe {
70                        unstable_series_container_and_ptr(
71                            self.series_container.name().clone(),
72                            array_ref,
73                            self.series_container.dtype(),
74                        )
75                    };
76                    self.series_container = Rc::new(s);
77                    self.inner = NonNull::new(ptr).unwrap();
78                } else {
79                    // SAFETY: we checked the RC above;
80                    let series_mut =
81                        unsafe { Rc::get_mut(&mut self.series_container).unwrap_unchecked() };
82                    // update the inner state
83                    unsafe { *self.inner.as_mut() = array_ref };
84
85                    // As an optimization, we try to minimize how many calls to
86                    // _get_inner_mut() we do.
87                    let series_mut_inner = series_mut._get_inner_mut();
88                    // last iteration could have set the sorted flag (e.g. in compute_len)
89                    series_mut_inner._set_flags(StatisticsFlags::empty());
90                    // make sure that the length is correct
91                    series_mut_inner.compute_len();
92                }
93
94                AmortSeries::new(self.series_container.clone())
95            })
96        })
97    }
98
99    fn size_hint(&self) -> (usize, Option<usize>) {
100        (self.len, Some(self.len))
101    }
102}
103
104// # Safety
105// we correctly implemented size_hint
106unsafe impl<I: Iterator<Item = Option<ArrayBox>>> TrustedLen for AmortizedListIter<'_, I> {}
107impl<I: Iterator<Item = Option<ArrayBox>>> ExactSizeIterator for AmortizedListIter<'_, I> {}
108
impl ListChunked {
    /// This is an iterator over a [`ListChunked`] that saves allocations.
    /// A Series is:
    ///     1. [`Arc<ChunkedArray>`]
    ///     ChunkedArray is:
    ///         2. Vec< 3. ArrayRef>
    ///
    /// The ArrayRef we indicated with 3. will be updated during iteration.
    /// The Series will be pinned in memory, saving an allocation for
    /// 1. Arc<..>
    /// 2. Vec<...>
    ///
    /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
    /// will be set.
    ///
    /// Null list elements are yielded as `None`. The yielded series have an empty
    /// name; use [`ListChunked::amortized_iter_with_name`] to name them.
    pub fn amortized_iter(
        &self,
    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
        self.amortized_iter_with_name(PlSmallStr::EMPTY)
    }

    /// See `amortized_iter`.
    ///
    /// Like [`ListChunked::amortized_iter`], but the reusable series container
    /// (and therefore every yielded series) is named `name`.
    ///
    /// # Panics
    /// Panics if `self` has no chunks: the first chunk's values seed the container.
    pub fn amortized_iter_with_name(
        &self,
        name: PlSmallStr,
    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
        // we create the series container from the inner array
        // so that the container has the proper dtype.
        let arr = self.downcast_iter().next().unwrap();
        let inner_values = arr.values();

        let inner_dtype = self.inner_dtype();
        // Struct elements are held in their physical representation here; the
        // iterator rebuilds the logical struct series for each element.
        let iter_dtype = match inner_dtype {
            #[cfg(feature = "dtype-struct")]
            DataType::Struct(_) => inner_dtype.to_physical(),
            // TODO: figure out how to deal with physical/logical distinction
            // physical primitives like time, date etc. work
            // physical nested need more
            _ => inner_dtype.clone(),
        };

        // SAFETY:
        // inner type passed as physical type
        let (s, ptr) =
            unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };

        // SAFETY: ptr belongs to the Series `s`: it is the array slot inside `s`
        // that the iterator overwrites for each element.
        unsafe {
            AmortizedListIter::new(
                self.len(),
                s,
                NonNull::new(ptr).unwrap(),
                // One `Option<ArrayBox>` per list element, across all chunks.
                self.downcast_iter().flat_map(|arr| arr.iter()),
                inner_dtype.clone(),
            )
        }
    }

    /// Apply a closure `F` elementwise.
    ///
    /// `f` receives `None` for null list elements; the result is named after `self`.
    #[must_use]
    pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
    where
        V: PolarsDataType,
        F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
        V::Array: ArrayFromIter<Option<K>>,
    {
        // TODO! make an amortized iter that does not flatten
        self.amortized_iter().map(f).collect_ca(self.name().clone())
    }

    /// Fallible version of [`ListChunked::apply_amortized_generic`].
    ///
    /// `f` receives `None` for null list elements; the result is named after `self`.
    ///
    /// # Errors
    /// Propagates an error returned by `f`.
    pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
    where
        V: PolarsDataType,
        F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
        V::Array: ArrayFromIter<Option<K>>,
    {
        // TODO! make an amortized iter that does not flatten
        self.amortized_iter()
            .map(f)
            .try_collect_ca(self.name().clone())
    }

    /// Call `f` on every list element in order (`None` for null elements).
    pub fn for_each_amortized<F>(&self, f: F)
    where
        F: FnMut(Option<AmortSeries>),
    {
        self.amortized_iter().for_each(f)
    }

    /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
    ///
    /// If `self` is empty it is returned unchanged (cloned). Iteration stops at the
    /// shorter of `self` and `ca` (plain `Iterator::zip`). The output is named after
    /// `self`. It is flagged fast-explode only when `self` has no nulls and `f`
    /// returned a non-empty series for every element.
    #[must_use]
    pub fn zip_and_apply_amortized<'a, T, F>(&'a self, ca: &'a ChunkedArray<T>, mut f: F) -> Self
    where
        T: PolarsDataType,
        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
    {
        if self.is_empty() {
            return self.clone();
        }
        // Fast-explode is only possible without null lists; it is cleared below
        // as soon as `f` yields `None` or an empty series.
        let mut fast_explode = self.null_count() == 0;
        let mut out: ListChunked = {
            self.amortized_iter()
                .zip(ca.iter())
                .map(|(opt_s, opt_v)| {
                    let out = f(opt_s, opt_v);
                    match out {
                        Some(out) => {
                            fast_explode &= !out.is_empty();
                            Some(out)
                        },
                        None => {
                            fast_explode = false;
                            out
                        },
                    }
                })
                .collect_trusted()
        };

        out.rename(self.name().clone());
        if fast_explode {
            out.set_fast_explode();
        }
        out
    }

    /// Zip with two `ChunkedArray`s then apply a ternary function `F` elementwise.
    ///
    /// `f` receives the list element, then the value from `ca1`, then the value from
    /// `ca2`. If `self` is empty it is returned unchanged (cloned). The output is
    /// named after `self`. It is flagged fast-explode only when `self` has no nulls
    /// and `f` returned a non-empty series for every element.
    #[must_use]
    pub fn binary_zip_and_apply_amortized<'a, T, U, F>(
        &'a self,
        ca1: &'a ChunkedArray<T>,
        ca2: &'a ChunkedArray<U>,
        mut f: F,
    ) -> Self
    where
        T: PolarsDataType,
        U: PolarsDataType,
        F: FnMut(
            Option<AmortSeries>,
            Option<T::Physical<'a>>,
            Option<U::Physical<'a>>,
        ) -> Option<Series>,
    {
        if self.is_empty() {
            return self.clone();
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut out: ListChunked = {
            self.amortized_iter()
                .zip(ca1.iter())
                .zip(ca2.iter())
                .map(|((opt_s, opt_u), opt_v)| {
                    let out = f(opt_s, opt_u, opt_v);
                    match out {
                        Some(out) => {
                            fast_explode &= !out.is_empty();
                            Some(out)
                        },
                        None => {
                            fast_explode = false;
                            out
                        },
                    }
                })
                .collect_trusted()
        };

        out.rename(self.name().clone());
        if fast_explode {
            out.set_fast_explode();
        }
        out
    }

    /// Fallible version of [`ListChunked::binary_zip_and_apply_amortized`].
    ///
    /// # Errors
    /// Returns the first error produced by `f`; collection stops at that element.
    pub fn try_binary_zip_and_apply_amortized<'a, T, U, F>(
        &'a self,
        ca1: &'a ChunkedArray<T>,
        ca2: &'a ChunkedArray<U>,
        mut f: F,
    ) -> PolarsResult<Self>
    where
        T: PolarsDataType,
        U: PolarsDataType,
        F: FnMut(
            Option<AmortSeries>,
            Option<T::Physical<'a>>,
            Option<U::Physical<'a>>,
        ) -> PolarsResult<Option<Series>>,
    {
        if self.is_empty() {
            return Ok(self.clone());
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut out: ListChunked = {
            self.amortized_iter()
                .zip(ca1.iter())
                .zip(ca2.iter())
                .map(|((opt_s, opt_u), opt_v)| {
                    let out = f(opt_s, opt_u, opt_v)?;
                    match out {
                        Some(out) => {
                            fast_explode &= !out.is_empty();
                            Ok(Some(out))
                        },
                        None => {
                            fast_explode = false;
                            Ok(out)
                        },
                    }
                })
                .collect::<PolarsResult<_>>()?
        };

        out.rename(self.name().clone());
        if fast_explode {
            out.set_fast_explode();
        }
        Ok(out)
    }

    /// Fallible version of [`ListChunked::zip_and_apply_amortized`].
    ///
    /// # Errors
    /// Returns the first error produced by `f`; collection stops at that element.
    pub fn try_zip_and_apply_amortized<'a, T, F>(
        &'a self,
        ca: &'a ChunkedArray<T>,
        mut f: F,
    ) -> PolarsResult<Self>
    where
        T: PolarsDataType,
        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> PolarsResult<Option<Series>>,
    {
        if self.is_empty() {
            return Ok(self.clone());
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut out: ListChunked = {
            self.amortized_iter()
                .zip(ca.iter())
                .map(|(opt_s, opt_v)| {
                    let out = f(opt_s, opt_v)?;
                    match out {
                        Some(out) => {
                            fast_explode &= !out.is_empty();
                            Ok(Some(out))
                        },
                        None => {
                            fast_explode = false;
                            Ok(out)
                        },
                    }
                })
                .collect::<PolarsResult<_>>()?
        };

        out.rename(self.name().clone());
        if fast_explode {
            out.set_fast_explode();
        }
        Ok(out)
    }

    /// Apply a closure `F` to each list elementwise.
    ///
    /// Null list elements stay null and are not passed to `f`. The output is built
    /// with `self.dtype()`. Each series returned by `f` is rechunked into a single
    /// array if it has several chunks.
    ///
    /// # Safety
    /// The closure `F` must return the same dtype as the input.
    #[must_use]
    pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
    where
        F: FnMut(AmortSeries) -> Series,
    {
        if self.is_empty() {
            return self.clone();
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut ca: ListChunked = self
            .amortized_iter()
            .map(|opt_v| {
                opt_v.map(|v| {
                    let out = f(v);
                    if out.is_empty() {
                        fast_explode = false;
                    }
                    to_arr(&out)
                })
            })
            .collect_ca_with_dtype(self.name().clone(), self.dtype().clone());

        if fast_explode {
            ca.set_fast_explode();
        }
        ca
    }

    /// Try apply a closure `F` elementwise (may change dtype).
    ///
    /// Null list elements stay null and are not passed to `f`; the output is named
    /// after `self`.
    ///
    /// # Errors
    /// Returns the first error produced by `f`; collection stops at that element.
    pub fn try_apply_amortized<F>(&self, mut f: F) -> PolarsResult<Self>
    where
        F: FnMut(AmortSeries) -> PolarsResult<Series>,
    {
        if self.is_empty() {
            return Ok(self.clone());
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut ca: ListChunked = {
            self.amortized_iter()
                .map(|opt_v| {
                    opt_v
                        .map(|v| {
                            let out = f(v);
                            if let Ok(out) = &out {
                                if out.is_empty() {
                                    fast_explode = false
                                }
                            };
                            out
                        })
                        // Option<PolarsResult<_>> -> PolarsResult<Option<_>> so errors short-circuit.
                        .transpose()
                })
                .collect::<PolarsResult<_>>()?
        };
        ca.rename(self.name().clone());
        if fast_explode {
            ca.set_fast_explode();
        }
        Ok(ca)
    }

    /// Try apply a closure `F` to each list element.
    ///
    /// Null list elements stay null and are not passed to `f`. The output is built
    /// with `self.dtype()`.
    ///
    /// # Errors
    /// Propagates an error returned by `f`.
    ///
    /// # Safety
    /// The closure `F` must return the same dtype as the input.
    pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
    where
        F: FnMut(AmortSeries) -> PolarsResult<Series>,
    {
        if self.is_empty() {
            return Ok(self.clone());
        }
        // Fast-explode is only possible without null lists.
        let mut fast_explode = self.null_count() == 0;
        let mut ca: ListChunked = self
            .amortized_iter()
            .map(|opt_v| {
                opt_v
                    .map(|v| {
                        let out = f(v)?;
                        if out.is_empty() {
                            fast_explode = false;
                        }
                        PolarsResult::Ok(to_arr(&out))
                    })
                    .transpose()
            })
            .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())?;

        if fast_explode {
            ca.set_fast_explode();
        }
        Ok(ca)
    }
}
464
465fn to_arr(s: &Series) -> ArrayRef {
466    if s.chunks().len() > 1 {
467        let s = s.rechunk();
468        s.chunks()[0].clone()
469    } else {
470        s.chunks()[0].clone()
471    }
472}
473
#[cfg(test)]
mod test {
    use super::*;
    use crate::chunked_array::builder::get_list_builder;

    /// Each series yielded by `amortized_iter` must equal the series yielded
    /// by the non-amortized `series_iter` at the same position.
    #[test]
    fn test_iter_list() {
        let mut builder = get_list_builder(&DataType::Int32, 10, 10, PlSmallStr::EMPTY);
        let rows: [&[i32]; 3] = [&[1, 2, 3], &[3, 2, 1], &[1, 1]];
        for row in rows {
            builder
                .append_series(&Series::new(PlSmallStr::EMPTY, row))
                .unwrap();
        }
        let ca = builder.finish();

        for (amortized, plain) in ca.amortized_iter().zip(ca.series_iter()) {
            assert!(amortized.unwrap().as_ref().equals(&plain.unwrap()));
        }
    }
}