1use std::ptr::NonNull;
23use super::*;
4use crate::chunked_array::list::iterator::AmortizedListIter;
5use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
67impl ArrayChunked {
8/// This is an iterator over a [`ArrayChunked`] that save allocations.
9 /// A Series is:
10 /// 1. [`Arc<ChunkedArray>`]
11 /// ChunkedArray is:
12 /// 2. Vec< 3. ArrayRef>
13 ///
14 /// The [`ArrayRef`] we indicated with 3. will be updated during iteration.
15 /// The Series will be pinned in memory, saving an allocation for
16 /// 1. Arc<..>
17 /// 2. Vec<...>
18 ///
19 /// # Warning
20 /// Though memory safe in the sense that it will not read unowned memory, UB, or memory leaks
21 /// this function still needs precautions. The returned should never be cloned or taken longer
22 /// than a single iteration, as every call on `next` of the iterator will change the contents of
23 /// that Series.
24 ///
25 /// # Safety
26 /// The lifetime of [AmortSeries] is bound to the iterator. Keeping it alive
27 /// longer than the iterator is UB.
28pub fn amortized_iter(&self) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
29self.amortized_iter_with_name(PlSmallStr::EMPTY)
30 }
3132/// This is an iterator over a [`ArrayChunked`] that save allocations.
33 /// A Series is:
34 /// 1. [`Arc<ChunkedArray>`]
35 /// ChunkedArray is:
36 /// 2. Vec< 3. ArrayRef>
37 ///
38 /// The ArrayRef we indicated with 3. will be updated during iteration.
39 /// The Series will be pinned in memory, saving an allocation for
40 /// 1. Arc<..>
41 /// 2. Vec<...>
42 ///
43 /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
44 /// will be set.
45pub fn amortized_iter_with_name(
46&self,
47 name: PlSmallStr,
48 ) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
49// we create the series container from the inner array
50 // so that the container has the proper dtype.
51let arr = self.downcast_iter().next().unwrap();
52let inner_values = arr.values();
5354let inner_dtype = self.inner_dtype();
55let iter_dtype = match inner_dtype {
56#[cfg(feature = "dtype-struct")]
57DataType::Struct(_) => inner_dtype.to_physical(),
58// TODO: figure out how to deal with physical/logical distinction
59 // physical primitives like time, date etc. work
60 // physical nested need more
61_ => inner_dtype.clone(),
62 };
6364// SAFETY:
65 // inner type passed as physical type
66let (s, ptr) =
67unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
6869// SAFETY: `ptr` belongs to the `Series`.
70unsafe {
71 AmortizedListIter::new(
72self.len(),
73 s,
74 NonNull::new(ptr).unwrap(),
75self.downcast_iter().flat_map(|arr| arr.iter()),
76 inner_dtype.clone(),
77 )
78 }
79 }
8081pub fn try_apply_amortized_to_list<F>(&self, mut f: F) -> PolarsResult<ListChunked>
82where
83F: FnMut(AmortSeries) -> PolarsResult<Series>,
84 {
85if self.is_empty() {
86return Ok(Series::new_empty(
87self.name().clone(),
88&DataType::List(Box::new(self.inner_dtype().clone())),
89 )
90 .list()
91 .unwrap()
92 .clone());
93 }
94let mut fast_explode = self.null_count() == 0;
95let mut ca: ListChunked = {
96self.amortized_iter()
97 .map(|opt_v| {
98 opt_v
99 .map(|v| {
100let out = f(v);
101if let Ok(out) = &out {
102if out.is_empty() {
103 fast_explode = false
104}
105 };
106 out
107 })
108 .transpose()
109 })
110 .collect::<PolarsResult<_>>()?
111};
112 ca.rename(self.name().clone());
113if fast_explode {
114 ca.set_fast_explode();
115 }
116Ok(ca)
117 }
118119/// Apply a closure `F` to each array.
120 ///
121 /// # Safety
122 /// Return series of `F` must has the same dtype and number of elements as input.
123#[must_use]
124pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
125where
126F: FnMut(AmortSeries) -> Series,
127 {
128if self.is_empty() {
129return self.clone();
130 }
131self.amortized_iter()
132 .map(|opt_v| {
133 opt_v.map(|v| {
134let out = f(v);
135 to_arr(&out)
136 })
137 })
138 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
139 }
140141/// Try apply a closure `F` to each array.
142 ///
143 /// # Safety
144 /// Return series of `F` must has the same dtype and number of elements as input if it is Ok.
145pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
146where
147F: FnMut(AmortSeries) -> PolarsResult<Series>,
148 {
149if self.is_empty() {
150return Ok(self.clone());
151 }
152self.amortized_iter()
153 .map(|opt_v| {
154 opt_v
155 .map(|v| {
156let out = f(v)?;
157Ok(to_arr(&out))
158 })
159 .transpose()
160 })
161 .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
162 }
163164/// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
165 ///
166 /// # Safety
167// Return series of `F` must has the same dtype and number of elements as input series.
168#[must_use]
169pub unsafe fn zip_and_apply_amortized_same_type<'a, T, F>(
170&'a self,
171 ca: &'a ChunkedArray<T>,
172mut f: F,
173 ) -> Self
174where
175T: PolarsDataType,
176 F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
177 {
178if self.is_empty() {
179return self.clone();
180 }
181self.amortized_iter()
182 .zip(ca.iter())
183 .map(|(opt_s, opt_v)| {
184let out = f(opt_s, opt_v);
185 out.map(|s| to_arr(&s))
186 })
187 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
188 }
189190/// Apply a closure `F` elementwise.
191#[must_use]
192pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
193where
194V: PolarsDataType,
195 F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
196 V::Array: ArrayFromIter<Option<K>>,
197 {
198self.amortized_iter().map(f).collect_ca(self.name().clone())
199 }
200201/// Try apply a closure `F` elementwise.
202pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
203where
204V: PolarsDataType,
205 F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
206 V::Array: ArrayFromIter<Option<K>>,
207 {
208 {
209self.amortized_iter()
210 .map(f)
211 .try_collect_ca(self.name().clone())
212 }
213 }
214215pub fn for_each_amortized<F>(&self, f: F)
216where
217F: FnMut(Option<AmortSeries>),
218 {
219self.amortized_iter().for_each(f)
220 }
221}
222223fn to_arr(s: &Series) -> ArrayRef {
224if s.chunks().len() > 1 {
225let s = s.rechunk();
226 s.chunks()[0].clone()
227 } else {
228 s.chunks()[0].clone()
229 }
230}