polars_core/chunked_array/from_iterator_par.rs

1//! Implementations of upstream traits for [`ChunkedArray<T>`]
2use std::collections::LinkedList;
3use std::sync::Mutex;
4
5use arrow::pushable::{NoOption, Pushable};
6use rayon::prelude::*;
7
8use super::from_iterator::PolarsAsRef;
9use crate::chunked_array::builder::get_list_builder;
10use crate::prelude::*;
11use crate::utils::NoNull;
12use crate::utils::flatten::flatten_par;
13
// Adapted from rayon:
// https://docs.rs/rayon/1.3.1/src/rayon/iter/extend.rs.html#356-366

/// Appends `elem` to `vec` and hands the vector back, so the function can be
/// used directly as a `fold` step.
fn vec_push<T>(vec: Vec<T>, elem: T) -> Vec<T> {
    let mut acc = vec;
    acc.push(elem);
    acc
}
20
/// Wraps a single `item` in a one-element [`LinkedList`].
fn as_list<T>(item: T) -> LinkedList<T> {
    std::iter::once(item).collect()
}
26
/// Concatenates two linked lists by moving every node of the second list onto
/// the tail of the first (`LinkedList::append`), and returns the joined list.
fn list_append<T>(list1: LinkedList<T>, list2: LinkedList<T>) -> LinkedList<T> {
    let (mut joined, mut rest) = (list1, list2);
    joined.append(&mut rest);
    joined
}
31
32fn collect_into_linked_list_vec<I>(par_iter: I) -> LinkedList<Vec<I::Item>>
33where
34    I: IntoParallelIterator,
35{
36    let it = par_iter.into_par_iter();
37    // be careful optimizing allocations. Its hard to figure out the size
38    // needed
39    // https://github.com/pola-rs/polars/issues/1562
40    it.fold(Vec::new, vec_push)
41        .map(as_list)
42        .reduce(LinkedList::new, list_append)
43}
44
45fn collect_into_linked_list<I, P, F>(par_iter: I, identity: F) -> LinkedList<P::Freeze>
46where
47    I: IntoParallelIterator,
48    P: Pushable<I::Item> + Send + Sync,
49    F: Fn() -> P + Sync + Send,
50    P::Freeze: Send,
51{
52    let it = par_iter.into_par_iter();
53    it.fold(identity, |mut v, item| {
54        v.push(item);
55        v
56    })
57    // The freeze on this line, ensures the null count is done in parallel
58    .map(|p| as_list(p.freeze()))
59    .reduce(LinkedList::new, list_append)
60}
61
/// Returns the total number of elements across all vectors in `ll`.
fn get_capacity_from_par_results<T>(ll: &LinkedList<Vec<T>>) -> usize {
    let mut total = 0;
    for chunk in ll {
        total += chunk.len();
    }
    total
}
65
66impl<T> FromParallelIterator<T::Native> for NoNull<ChunkedArray<T>>
67where
68    T: PolarsNumericType,
69{
70    fn from_par_iter<I: IntoParallelIterator<Item = T::Native>>(iter: I) -> Self {
71        // Get linkedlist filled with different vec result from different threads
72        let vectors = collect_into_linked_list_vec(iter);
73        let vectors = vectors.into_iter().collect::<Vec<_>>();
74        let values = flatten_par(&vectors);
75        NoNull::new(ChunkedArray::new_vec(PlSmallStr::EMPTY, values))
76    }
77}
78
79impl<T> FromParallelIterator<Option<T::Native>> for ChunkedArray<T>
80where
81    T: PolarsNumericType,
82{
83    fn from_par_iter<I: IntoParallelIterator<Item = Option<T::Native>>>(iter: I) -> Self {
84        let chunks = collect_into_linked_list(iter, MutablePrimitiveArray::new);
85        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
86    }
87}
88
89impl FromParallelIterator<bool> for BooleanChunked {
90    fn from_par_iter<I: IntoParallelIterator<Item = bool>>(iter: I) -> Self {
91        let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
92        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
93    }
94}
95
96impl FromParallelIterator<Option<bool>> for BooleanChunked {
97    fn from_par_iter<I: IntoParallelIterator<Item = Option<bool>>>(iter: I) -> Self {
98        let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
99        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
100    }
101}
102
103impl<Ptr> FromParallelIterator<Ptr> for StringChunked
104where
105    Ptr: PolarsAsRef<str> + Send + Sync + NoOption,
106{
107    fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
108        let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
109        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
110    }
111}
112
113impl<Ptr> FromParallelIterator<Ptr> for BinaryChunked
114where
115    Ptr: PolarsAsRef<[u8]> + Send + Sync + NoOption,
116{
117    fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
118        let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
119        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
120    }
121}
122
123impl<Ptr> FromParallelIterator<Option<Ptr>> for StringChunked
124where
125    Ptr: AsRef<str> + Send + Sync,
126{
127    fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
128        let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
129        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
130    }
131}
132
133impl<Ptr> FromParallelIterator<Option<Ptr>> for BinaryChunked
134where
135    Ptr: AsRef<[u8]> + Send + Sync,
136{
137    fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
138        let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
139        Self::from_chunk_iter(PlSmallStr::EMPTY, chunks).optional_rechunk()
140    }
141}
142
143pub trait FromParIterWithDtype<K> {
144    fn from_par_iter_with_dtype<I>(iter: I, name: PlSmallStr, dtype: DataType) -> Self
145    where
146        I: IntoParallelIterator<Item = K>,
147        Self: Sized;
148}
149
150fn get_value_cap(vectors: &LinkedList<Vec<Option<Series>>>) -> usize {
151    vectors
152        .iter()
153        .map(|list| {
154            list.iter()
155                .map(|opt_s| opt_s.as_ref().map(|s| s.len()).unwrap_or(0))
156                .sum::<usize>()
157        })
158        .sum::<usize>()
159}
160
161fn get_dtype(vectors: &LinkedList<Vec<Option<Series>>>) -> DataType {
162    for v in vectors {
163        for s in v.iter().flatten() {
164            let dtype = s.dtype();
165            if !matches!(dtype, DataType::Null) {
166                return dtype.clone();
167            }
168        }
169    }
170    DataType::Null
171}
172
173fn materialize_list(
174    name: PlSmallStr,
175    vectors: &LinkedList<Vec<Option<Series>>>,
176    dtype: DataType,
177    value_capacity: usize,
178    list_capacity: usize,
179) -> PolarsResult<ListChunked> {
180    let mut builder = get_list_builder(&dtype, value_capacity, list_capacity, name);
181    for v in vectors {
182        for val in v {
183            builder.append_opt_series(val.as_ref())?;
184        }
185    }
186    Ok(builder.finish())
187}
188
189impl FromParallelIterator<Option<Series>> for ListChunked {
190    fn from_par_iter<I>(par_iter: I) -> Self
191    where
192        I: IntoParallelIterator<Item = Option<Series>>,
193    {
194        list_from_par_iter(par_iter, PlSmallStr::EMPTY).unwrap()
195    }
196}
197
198pub fn list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
199where
200    I: IntoParallelIterator<Item = Option<Series>>,
201{
202    let vectors = collect_into_linked_list_vec(par_iter);
203
204    let list_capacity: usize = get_capacity_from_par_results(&vectors);
205    let value_capacity = get_value_cap(&vectors);
206    let dtype = get_dtype(&vectors);
207    if let DataType::Null = dtype {
208        Ok(ListChunked::full_null_with_dtype(
209            name,
210            list_capacity,
211            &DataType::Null,
212        ))
213    } else {
214        materialize_list(name, &vectors, dtype, value_capacity, list_capacity)
215    }
216}
217
218pub fn try_list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
219where
220    I: IntoParallelIterator<Item = PolarsResult<Option<Series>>>,
221{
222    fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ {
223        move |item| match item {
224            Ok(item) => Some(item),
225            Err(error) => {
226                // We don't need a blocking `lock()`, as anybody
227                // else holding the lock will also be writing
228                // `Some(error)`, and then ours is irrelevant.
229                if let Ok(mut guard) = saved.try_lock() {
230                    if guard.is_none() {
231                        *guard = Some(error);
232                    }
233                }
234                None
235            },
236        }
237    }
238
239    let saved_error = Mutex::new(None);
240    let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some();
241
242    let collection = list_from_par_iter(iter, name)?;
243
244    match saved_error.into_inner().unwrap() {
245        Some(error) => Err(error),
246        None => Ok(collection),
247    }
248}
249
250impl FromParIterWithDtype<Option<Series>> for ListChunked {
251    fn from_par_iter_with_dtype<I>(iter: I, name: PlSmallStr, dtype: DataType) -> Self
252    where
253        I: IntoParallelIterator<Item = Option<Series>>,
254        Self: Sized,
255    {
256        let vectors = collect_into_linked_list_vec(iter);
257
258        let list_capacity: usize = get_capacity_from_par_results(&vectors);
259        let value_capacity = get_value_cap(&vectors);
260        if let DataType::List(dtype) = dtype {
261            materialize_list(name, &vectors, *dtype, value_capacity, list_capacity).unwrap()
262        } else {
263            panic!("expected list dtype")
264        }
265    }
266}
267
268pub trait ChunkedCollectParIterExt: ParallelIterator {
269    fn collect_ca_with_dtype<B: FromParIterWithDtype<Self::Item>>(
270        self,
271        name: PlSmallStr,
272        dtype: DataType,
273    ) -> B
274    where
275        Self: Sized,
276    {
277        B::from_par_iter_with_dtype(self, name, dtype)
278    }
279}
280
281impl<I: ParallelIterator> ChunkedCollectParIterExt for I {}
282
283// Adapted from rayon
284impl<C, T, E> FromParIterWithDtype<Result<T, E>> for Result<C, E>
285where
286    C: FromParIterWithDtype<T>,
287    T: Send,
288    E: Send,
289{
290    fn from_par_iter_with_dtype<I>(par_iter: I, name: PlSmallStr, dtype: DataType) -> Self
291    where
292        I: IntoParallelIterator<Item = Result<T, E>>,
293    {
294        fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ {
295            move |item| match item {
296                Ok(item) => Some(item),
297                Err(error) => {
298                    // We don't need a blocking `lock()`, as anybody
299                    // else holding the lock will also be writing
300                    // `Some(error)`, and then ours is irrelevant.
301                    if let Ok(mut guard) = saved.try_lock() {
302                        if guard.is_none() {
303                            *guard = Some(error);
304                        }
305                    }
306                    None
307                },
308            }
309        }
310
311        let saved_error = Mutex::new(None);
312        let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some();
313
314        let collection = C::from_par_iter_with_dtype(iter, name, dtype);
315
316        match saved_error.into_inner().unwrap() {
317            Some(error) => Err(error),
318            None => Ok(collection),
319        }
320    }
321}