polars_core/chunked_array/list/iterator.rs

1use std::marker::PhantomData;
2use std::ptr::NonNull;
3use std::rc::Rc;
4
5use crate::chunked_array::flags::StatisticsFlags;
6use crate::prelude::*;
7use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
8
9pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
10    len: usize,
11    series_container: Rc<Series>,
12    inner: NonNull<ArrayRef>,
13    lifetime: PhantomData<&'a ArrayRef>,
14    iter: I,
15    // used only if feature="dtype-struct"
16    #[allow(dead_code)]
17    inner_dtype: DataType,
18}
19
20impl<I: Iterator<Item = Option<ArrayBox>>> AmortizedListIter<'_, I> {
21    pub(crate) unsafe fn new(
22        len: usize,
23        series_container: Series,
24        inner: NonNull<ArrayRef>,
25        iter: I,
26        inner_dtype: DataType,
27    ) -> Self {
28        Self {
29            len,
30            series_container: Rc::new(series_container),
31            inner,
32            lifetime: PhantomData,
33            iter,
34            inner_dtype,
35        }
36    }
37}
38
39impl<I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'_, I> {
40    type Item = Option<AmortSeries>;
41
42    fn next(&mut self) -> Option<Self::Item> {
43        self.iter.next().map(|opt_val| {
44            opt_val.map(|array_ref| {
45                #[cfg(feature = "dtype-struct")]
46                // structs arrays are bound to the series not to the arrayref
47                // so we must get a hold to the new array
48                if matches!(self.inner_dtype, DataType::Struct(_)) {
49                    // SAFETY:
50                    // dtype is known
51                    unsafe {
52                        let s = Series::from_chunks_and_dtype_unchecked(
53                            PlSmallStr::EMPTY,
54                            vec![array_ref],
55                            &self.inner_dtype.to_physical(),
56                        )
57                        .from_physical_unchecked(&self.inner_dtype)
58                        .unwrap();
59                        let inner = Rc::make_mut(&mut self.series_container);
60                        *inner = s;
61
62                        return AmortSeries::new(self.series_container.clone());
63                    }
64                }
65                // The series is cloned, we make a new container.
66                if Arc::strong_count(&self.series_container.0) > 1
67                    || Rc::strong_count(&self.series_container) > 1
68                {
69                    let (s, ptr) = unsafe {
70                        unstable_series_container_and_ptr(
71                            self.series_container.name().clone(),
72                            array_ref,
73                            self.series_container.dtype(),
74                        )
75                    };
76                    self.series_container = Rc::new(s);
77                    self.inner = NonNull::new(ptr).unwrap();
78                } else {
79                    // SAFETY: we checked the RC above;
80                    let series_mut =
81                        unsafe { Rc::get_mut(&mut self.series_container).unwrap_unchecked() };
82                    // update the inner state
83                    unsafe { *self.inner.as_mut() = array_ref };
84
85                    // As an optimization, we try to minimize how many calls to
86                    // _get_inner_mut() we do.
87                    let series_mut_inner = series_mut._get_inner_mut();
88                    // last iteration could have set the sorted flag (e.g. in compute_len)
89                    series_mut_inner._set_flags(StatisticsFlags::empty());
90                    // make sure that the length is correct
91                    series_mut_inner.compute_len();
92                }
93
94                // SAFETY:
95                // inner belongs to Series.
96                unsafe {
97                    AmortSeries::new_with_chunk(self.series_container.clone(), self.inner.as_ref())
98                }
99            })
100        })
101    }
102
103    fn size_hint(&self) -> (usize, Option<usize>) {
104        (self.len, Some(self.len))
105    }
106}
107
108// # Safety
109// we correctly implemented size_hint
110unsafe impl<I: Iterator<Item = Option<ArrayBox>>> TrustedLen for AmortizedListIter<'_, I> {}
111impl<I: Iterator<Item = Option<ArrayBox>>> ExactSizeIterator for AmortizedListIter<'_, I> {}
112
113impl ListChunked {
114    /// This is an iterator over a [`ListChunked`] that saves allocations.
115    /// A Series is:
116    ///     1. [`Arc<ChunkedArray>`]
117    ///     ChunkedArray is:
118    ///         2. Vec< 3. ArrayRef>
119    ///
120    /// The ArrayRef we indicated with 3. will be updated during iteration.
121    /// The Series will be pinned in memory, saving an allocation for
122    /// 1. Arc<..>
123    /// 2. Vec<...>
124    ///
125    /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
126    /// will be set.
127    pub fn amortized_iter(
128        &self,
129    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
130        self.amortized_iter_with_name(PlSmallStr::EMPTY)
131    }
132
133    /// See `amortized_iter`.
134    pub fn amortized_iter_with_name(
135        &self,
136        name: PlSmallStr,
137    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
138        // we create the series container from the inner array
139        // so that the container has the proper dtype.
140        let arr = self.downcast_iter().next().unwrap();
141        let inner_values = arr.values();
142
143        let inner_dtype = self.inner_dtype();
144        let iter_dtype = match inner_dtype {
145            #[cfg(feature = "dtype-struct")]
146            DataType::Struct(_) => inner_dtype.to_physical(),
147            // TODO: figure out how to deal with physical/logical distinction
148            // physical primitives like time, date etc. work
149            // physical nested need more
150            _ => inner_dtype.clone(),
151        };
152
153        // SAFETY:
154        // inner type passed as physical type
155        let (s, ptr) =
156            unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
157
158        // SAFETY: ptr belongs the Series..
159        unsafe {
160            AmortizedListIter::new(
161                self.len(),
162                s,
163                NonNull::new(ptr).unwrap(),
164                self.downcast_iter().flat_map(|arr| arr.iter()),
165                inner_dtype.clone(),
166            )
167        }
168    }
169
170    /// Apply a closure `F` elementwise.
171    #[must_use]
172    pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
173    where
174        V: PolarsDataType,
175        F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
176        V::Array: ArrayFromIter<Option<K>>,
177    {
178        // TODO! make an amortized iter that does not flatten
179        self.amortized_iter().map(f).collect_ca(self.name().clone())
180    }
181
182    pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
183    where
184        V: PolarsDataType,
185        F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
186        V::Array: ArrayFromIter<Option<K>>,
187    {
188        // TODO! make an amortized iter that does not flatten
189        self.amortized_iter()
190            .map(f)
191            .try_collect_ca(self.name().clone())
192    }
193
194    pub fn for_each_amortized<F>(&self, f: F)
195    where
196        F: FnMut(Option<AmortSeries>),
197    {
198        self.amortized_iter().for_each(f)
199    }
200
201    /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
202    #[must_use]
203    pub fn zip_and_apply_amortized<'a, T, I, F>(&'a self, ca: &'a ChunkedArray<T>, mut f: F) -> Self
204    where
205        T: PolarsDataType,
206        &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
207        I: TrustedLen<Item = Option<T::Physical<'a>>>,
208        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
209    {
210        if self.is_empty() {
211            return self.clone();
212        }
213        let mut fast_explode = self.null_count() == 0;
214        let mut out: ListChunked = {
215            self.amortized_iter()
216                .zip(ca)
217                .map(|(opt_s, opt_v)| {
218                    let out = f(opt_s, opt_v);
219                    match out {
220                        Some(out) => {
221                            fast_explode &= !out.is_empty();
222                            Some(out)
223                        },
224                        None => {
225                            fast_explode = false;
226                            out
227                        },
228                    }
229                })
230                .collect_trusted()
231        };
232
233        out.rename(self.name().clone());
234        if fast_explode {
235            out.set_fast_explode();
236        }
237        out
238    }
239
240    #[must_use]
241    pub fn binary_zip_and_apply_amortized<'a, T, U, F>(
242        &'a self,
243        ca1: &'a ChunkedArray<T>,
244        ca2: &'a ChunkedArray<U>,
245        mut f: F,
246    ) -> Self
247    where
248        T: PolarsDataType,
249        U: PolarsDataType,
250        F: FnMut(
251            Option<AmortSeries>,
252            Option<T::Physical<'a>>,
253            Option<U::Physical<'a>>,
254        ) -> Option<Series>,
255    {
256        if self.is_empty() {
257            return self.clone();
258        }
259        let mut fast_explode = self.null_count() == 0;
260        let mut out: ListChunked = {
261            self.amortized_iter()
262                .zip(ca1.iter())
263                .zip(ca2.iter())
264                .map(|((opt_s, opt_u), opt_v)| {
265                    let out = f(opt_s, opt_u, opt_v);
266                    match out {
267                        Some(out) => {
268                            fast_explode &= !out.is_empty();
269                            Some(out)
270                        },
271                        None => {
272                            fast_explode = false;
273                            out
274                        },
275                    }
276                })
277                .collect_trusted()
278        };
279
280        out.rename(self.name().clone());
281        if fast_explode {
282            out.set_fast_explode();
283        }
284        out
285    }
286
287    pub fn try_binary_zip_and_apply_amortized<'a, T, U, F>(
288        &'a self,
289        ca1: &'a ChunkedArray<T>,
290        ca2: &'a ChunkedArray<U>,
291        mut f: F,
292    ) -> PolarsResult<Self>
293    where
294        T: PolarsDataType,
295        U: PolarsDataType,
296        F: FnMut(
297            Option<AmortSeries>,
298            Option<T::Physical<'a>>,
299            Option<U::Physical<'a>>,
300        ) -> PolarsResult<Option<Series>>,
301    {
302        if self.is_empty() {
303            return Ok(self.clone());
304        }
305        let mut fast_explode = self.null_count() == 0;
306        let mut out: ListChunked = {
307            self.amortized_iter()
308                .zip(ca1.iter())
309                .zip(ca2.iter())
310                .map(|((opt_s, opt_u), opt_v)| {
311                    let out = f(opt_s, opt_u, opt_v)?;
312                    match out {
313                        Some(out) => {
314                            fast_explode &= !out.is_empty();
315                            Ok(Some(out))
316                        },
317                        None => {
318                            fast_explode = false;
319                            Ok(out)
320                        },
321                    }
322                })
323                .collect::<PolarsResult<_>>()?
324        };
325
326        out.rename(self.name().clone());
327        if fast_explode {
328            out.set_fast_explode();
329        }
330        Ok(out)
331    }
332
333    pub fn try_zip_and_apply_amortized<'a, T, I, F>(
334        &'a self,
335        ca: &'a ChunkedArray<T>,
336        mut f: F,
337    ) -> PolarsResult<Self>
338    where
339        T: PolarsDataType,
340        &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
341        I: TrustedLen<Item = Option<T::Physical<'a>>>,
342        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> PolarsResult<Option<Series>>,
343    {
344        if self.is_empty() {
345            return Ok(self.clone());
346        }
347        let mut fast_explode = self.null_count() == 0;
348        let mut out: ListChunked = {
349            self.amortized_iter()
350                .zip(ca)
351                .map(|(opt_s, opt_v)| {
352                    let out = f(opt_s, opt_v)?;
353                    match out {
354                        Some(out) => {
355                            fast_explode &= !out.is_empty();
356                            Ok(Some(out))
357                        },
358                        None => {
359                            fast_explode = false;
360                            Ok(out)
361                        },
362                    }
363                })
364                .collect::<PolarsResult<_>>()?
365        };
366
367        out.rename(self.name().clone());
368        if fast_explode {
369            out.set_fast_explode();
370        }
371        Ok(out)
372    }
373
374    /// Apply a closure `F` elementwise.
375    #[must_use]
376    pub fn apply_amortized<F>(&self, mut f: F) -> Self
377    where
378        F: FnMut(AmortSeries) -> Series,
379    {
380        if self.is_empty() {
381            return self.clone();
382        }
383        let mut fast_explode = self.null_count() == 0;
384        let mut ca: ListChunked = {
385            self.amortized_iter()
386                .map(|opt_v| {
387                    opt_v.map(|v| {
388                        let out = f(v);
389                        if out.is_empty() {
390                            fast_explode = false;
391                        }
392                        out
393                    })
394                })
395                .collect_trusted()
396        };
397
398        ca.rename(self.name().clone());
399        if fast_explode {
400            ca.set_fast_explode();
401        }
402        ca
403    }
404
405    pub fn try_apply_amortized<F>(&self, mut f: F) -> PolarsResult<Self>
406    where
407        F: FnMut(AmortSeries) -> PolarsResult<Series>,
408    {
409        if self.is_empty() {
410            return Ok(self.clone());
411        }
412        let mut fast_explode = self.null_count() == 0;
413        let mut ca: ListChunked = {
414            self.amortized_iter()
415                .map(|opt_v| {
416                    opt_v
417                        .map(|v| {
418                            let out = f(v);
419                            if let Ok(out) = &out {
420                                if out.is_empty() {
421                                    fast_explode = false
422                                }
423                            };
424                            out
425                        })
426                        .transpose()
427                })
428                .collect::<PolarsResult<_>>()?
429        };
430        ca.rename(self.name().clone());
431        if fast_explode {
432            ca.set_fast_explode();
433        }
434        Ok(ca)
435    }
436}
437
#[cfg(test)]
mod test {
    use super::*;
    use crate::chunked_array::builder::get_list_builder;

    /// Builds the list column `[[1, 2, 3], [3, 2, 1], [1, 1]]` used by the tests below.
    fn three_row_list() -> ListChunked {
        let mut builder = get_list_builder(&DataType::Int32, 10, 10, PlSmallStr::EMPTY);
        builder
            .append_series(&Series::new(PlSmallStr::EMPTY, &[1, 2, 3]))
            .unwrap();
        builder
            .append_series(&Series::new(PlSmallStr::EMPTY, &[3, 2, 1]))
            .unwrap();
        builder
            .append_series(&Series::new(PlSmallStr::EMPTY, &[1, 1]))
            .unwrap();
        builder.finish()
    }

    #[test]
    fn test_iter_list() {
        let ca = three_row_list();

        ca.amortized_iter().zip(&ca).for_each(|(s1, s2)| {
            assert!(s1.unwrap().as_ref().equals(&s2.unwrap()));
        })
    }

    #[test]
    fn test_iter_list_cloned_series_are_not_overwritten() {
        let ca = three_row_list();

        // Keeping a clone of every yielded series bumps its Arc count, which forces the
        // iterator to switch to a fresh container instead of overwriting the previously
        // yielded array in place.
        let kept: Vec<Series> = ca
            .amortized_iter()
            .map(|opt_s| opt_s.unwrap().as_ref().clone())
            .collect();

        assert_eq!(kept.len(), ca.len());
        for (s1, s2) in kept.iter().zip(&ca) {
            assert!(s1.equals(&s2.unwrap()));
        }
    }
}
461}