polars_core/chunked_array/
from_iterator_par.rs1use std::collections::LinkedList;
3use std::sync::Mutex;
4
5use arrow::pushable::{NoOption, Pushable};
6use rayon::prelude::*;
7
8use super::from_iterator::PolarsAsRef;
9use crate::chunked_array::builder::get_list_builder;
10use crate::prelude::*;
11use crate::utils::NoNull;
12use crate::utils::flatten::flatten_par;
13
/// Appends `elem` to `vec` and hands the vector back by value.
///
/// Shaped as an `(accumulator, item) -> accumulator` function so it can be
/// passed directly as a fold operation.
fn vec_push<Item>(mut vec: Vec<Item>, elem: Item) -> Vec<Item> {
    vec.push(elem);
    vec
}
20
/// Wraps a single value in a one-element `LinkedList`.
fn as_list<T>(item: T) -> LinkedList<T> {
    LinkedList::from([item])
}
26
/// Concatenates two linked lists: every node of `list2` is moved to the back
/// of `list1`, and the combined list is returned.
fn list_append<T>(mut list1: LinkedList<T>, mut list2: LinkedList<T>) -> LinkedList<T> {
    LinkedList::append(&mut list1, &mut list2);
    list1
}
31
32fn collect_into_linked_list_vec<I>(par_iter: I) -> LinkedList<Vec<I::Item>>
33where
34 I: IntoParallelIterator,
35{
36 let it = par_iter.into_par_iter();
37 it.fold(Vec::new, vec_push)
41 .map(as_list)
42 .reduce(LinkedList::new, list_append)
43}
44
45fn collect_into_linked_list<I, P, F>(par_iter: I, identity: F) -> LinkedList<P::Freeze>
46where
47 I: IntoParallelIterator,
48 P: Pushable<I::Item> + Send + Sync,
49 F: Fn() -> P + Sync + Send,
50 P::Freeze: Send,
51{
52 let it = par_iter.into_par_iter();
53 it.fold(identity, |mut v, item| {
54 v.push(item);
55 v
56 })
57 .map(|p| as_list(p.freeze()))
59 .reduce(LinkedList::new, list_append)
60}
61
/// Total number of elements held across all vectors in `ll`.
fn get_capacity_from_par_results<T>(ll: &LinkedList<Vec<T>>) -> usize {
    ll.iter().map(Vec::len).sum()
}
65
66impl<T> FromParallelIterator<T::Native> for NoNull<ChunkedArray<T>>
67where
68 T: PolarsNumericType,
69{
70 fn from_par_iter<I: IntoParallelIterator<Item = T::Native>>(iter: I) -> Self {
71 let vectors = collect_into_linked_list_vec(iter);
73 let vectors = vectors.into_iter().collect::<Vec<_>>();
74 let values = flatten_par(&vectors);
75 NoNull::new(ChunkedArray::new_vec(PlSmallStr::EMPTY, values))
76 }
77}
78
79impl<T> FromParallelIterator<Option<T::Native>> for ChunkedArray<T>
80where
81 T: PolarsNumericType,
82{
83 fn from_par_iter<I: IntoParallelIterator<Item = Option<T::Native>>>(iter: I) -> Self {
84 let chunks = collect_into_linked_list(iter, MutablePrimitiveArray::new);
85 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
86 }
87}
88
89impl FromParallelIterator<bool> for BooleanChunked {
90 fn from_par_iter<I: IntoParallelIterator<Item = bool>>(iter: I) -> Self {
91 let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
92 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
93 }
94}
95
96impl FromParallelIterator<Option<bool>> for BooleanChunked {
97 fn from_par_iter<I: IntoParallelIterator<Item = Option<bool>>>(iter: I) -> Self {
98 let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
99 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
100 }
101}
102
103impl<Ptr> FromParallelIterator<Ptr> for StringChunked
104where
105 Ptr: PolarsAsRef<str> + Send + Sync + NoOption,
106{
107 fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
108 let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
109 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
110 }
111}
112
113impl<Ptr> FromParallelIterator<Ptr> for BinaryChunked
114where
115 Ptr: PolarsAsRef<[u8]> + Send + Sync + NoOption,
116{
117 fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
118 let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
119 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
120 }
121}
122
123impl<Ptr> FromParallelIterator<Option<Ptr>> for StringChunked
124where
125 Ptr: AsRef<str> + Send + Sync,
126{
127 fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
128 let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
129 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
130 }
131}
132
133impl<Ptr> FromParallelIterator<Option<Ptr>> for BinaryChunked
134where
135 Ptr: AsRef<[u8]> + Send + Sync,
136{
137 fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
138 let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
139 Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
140 }
141}
142
143pub trait FromParIterWithDtype<K> {
144 fn from_par_iter_with_dtype<I>(iter: I, name: PlSmallStr, dtype: DataType) -> Self
145 where
146 I: IntoParallelIterator<Item = K>,
147 Self: Sized;
148}
149
150fn get_value_cap(vectors: &LinkedList<Vec<Option<Series>>>) -> usize {
151 vectors
152 .iter()
153 .map(|list| {
154 list.iter()
155 .map(|opt_s| opt_s.as_ref().map(|s| s.len()).unwrap_or(0))
156 .sum::<usize>()
157 })
158 .sum::<usize>()
159}
160
161fn get_dtype(vectors: &LinkedList<Vec<Option<Series>>>) -> DataType {
162 for v in vectors {
163 for s in v.iter().flatten() {
164 let dtype = s.dtype();
165 if !matches!(dtype, DataType::Null) {
166 return dtype.clone();
167 }
168 }
169 }
170 DataType::Null
171}
172
173fn materialize_list(
174 name: PlSmallStr,
175 vectors: &LinkedList<Vec<Option<Series>>>,
176 dtype: DataType,
177 value_capacity: usize,
178 list_capacity: usize,
179) -> PolarsResult<ListChunked> {
180 let mut builder = get_list_builder(&dtype, value_capacity, list_capacity, name);
181 for v in vectors {
182 for val in v {
183 builder.append_opt_series(val.as_ref())?;
184 }
185 }
186 Ok(builder.finish())
187}
188
189impl FromParallelIterator<Option<Series>> for ListChunked {
190 fn from_par_iter<I>(par_iter: I) -> Self
191 where
192 I: IntoParallelIterator<Item = Option<Series>>,
193 {
194 list_from_par_iter(par_iter, PlSmallStr::EMPTY).unwrap()
195 }
196}
197
198pub fn list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
199where
200 I: IntoParallelIterator<Item = Option<Series>>,
201{
202 let vectors = collect_into_linked_list_vec(par_iter);
203
204 let list_capacity: usize = get_capacity_from_par_results(&vectors);
205 let value_capacity = get_value_cap(&vectors);
206 let dtype = get_dtype(&vectors);
207 if let DataType::Null = dtype {
208 Ok(ListChunked::full_null_with_dtype(
209 name,
210 list_capacity,
211 &DataType::Null,
212 ))
213 } else {
214 materialize_list(name, &vectors, dtype, value_capacity, list_capacity)
215 }
216}
217
218pub fn try_list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
219where
220 I: IntoParallelIterator<Item = PolarsResult<Option<Series>>>,
221{
222 fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ {
223 move |item| match item {
224 Ok(item) => Some(item),
225 Err(error) => {
226 if let Ok(mut guard) = saved.try_lock() {
230 if guard.is_none() {
231 *guard = Some(error);
232 }
233 }
234 None
235 },
236 }
237 }
238
239 let saved_error = Mutex::new(None);
240 let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some();
241
242 let collection = list_from_par_iter(iter, name)?;
243
244 match saved_error.into_inner().unwrap() {
245 Some(error) => Err(error),
246 None => Ok(collection),
247 }
248}
249
250impl FromParIterWithDtype<Option<Series>> for ListChunked {
251 fn from_par_iter_with_dtype<I>(iter: I, name: PlSmallStr, dtype: DataType) -> Self
252 where
253 I: IntoParallelIterator<Item = Option<Series>>,
254 Self: Sized,
255 {
256 let vectors = collect_into_linked_list_vec(iter);
257
258 let list_capacity: usize = get_capacity_from_par_results(&vectors);
259 let value_capacity = get_value_cap(&vectors);
260 if let DataType::List(dtype) = dtype {
261 materialize_list(name, &vectors, *dtype, value_capacity, list_capacity).unwrap()
262 } else {
263 panic!("expected list dtype")
264 }
265 }
266}
267
268pub trait ChunkedCollectParIterExt: ParallelIterator {
269 fn collect_ca_with_dtype<B: FromParIterWithDtype<Self::Item>>(
270 self,
271 name: PlSmallStr,
272 dtype: DataType,
273 ) -> B
274 where
275 Self: Sized,
276 {
277 B::from_par_iter_with_dtype(self, name, dtype)
278 }
279}
280
281impl<I: ParallelIterator> ChunkedCollectParIterExt for I {}
282
283impl<C, T, E> FromParIterWithDtype<Result<T, E>> for Result<C, E>
285where
286 C: FromParIterWithDtype<T>,
287 T: Send,
288 E: Send,
289{
290 fn from_par_iter_with_dtype<I>(par_iter: I, name: PlSmallStr, dtype: DataType) -> Self
291 where
292 I: IntoParallelIterator<Item = Result<T, E>>,
293 {
294 fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ {
295 move |item| match item {
296 Ok(item) => Some(item),
297 Err(error) => {
298 if let Ok(mut guard) = saved.try_lock() {
302 if guard.is_none() {
303 *guard = Some(error);
304 }
305 }
306 None
307 },
308 }
309 }
310
311 let saved_error = Mutex::new(None);
312 let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some();
313
314 let collection = C::from_par_iter_with_dtype(iter, name, dtype);
315
316 match saved_error.into_inner().unwrap() {
317 Some(error) => Err(error),
318 None => Ok(collection),
319 }
320 }
321}