polars_core/utils/flatten.rs

use arrow::bitmap::MutableBitmap;
use polars_utils::sync::SyncPtr;

use super::*;

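/// Yield one `DataFrame` per physical chunk of `df`, skipping chunks with no rows.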
pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
    df.iter_chunks_physical().flat_map(|chunk| {
        let columns = df
            .iter()
            .zip(chunk.into_arrays())
            .map(|(s, arr)| {
                // SAFETY:
                // the datatypes of the arrays match the columns' dtypes
                let mut out = unsafe {
                    Series::from_chunks_and_dtype_unchecked(s.name().clone(), vec![arr], s.dtype())
                };
                // Propagate the sortedness flag to the new single-chunk Series.
                out.set_sorted_flag(s.is_sorted_flag());
                Column::from(out)
            })
            .collect::<Vec<_>>();

        let height = DataFrame::infer_height(&columns);
        let df = unsafe { DataFrame::new_no_checks(height, columns) };
        if df.is_empty() { None } else { Some(df) }
    })
}

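/// Split a `Series` into one single-chunk `Series` per chunk, preserving name and dtype.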
pub fn flatten_series(s: &Series) -> Vec<Series> {
    let name = s.name();
    let dtype = s.dtype();
    // SAFETY: the chunks come from `s` itself, so they match `dtype`.
    unsafe {
        s.chunks()
            .iter()
            .map(|arr| {
                Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
            })
            .collect()
    }
}

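/// Return the total number of elements in `v` together with the starting
/// offset of each inner `Vec` in the flattened result.
///
/// For example, `[[a, b], [c, d, e]]` yields `(5, vec![0, 2])`.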
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let cap = v.iter().map(|v| v.len()).sum::<usize>();
    let offsets = v
        .iter()
        .scan(0_usize, |acc, v| {
            let out = *acc;
            *acc += v.len();
            Some(out)
        })
        .collect::<Vec<_>>();
    (cap, offsets)
}

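/// Flatten a slice of buffers into a single contiguous `Vec`, copying each
/// buffer into its destination range in parallel on the global thread pool.
///
/// A minimal sketch of the expected behaviour:
/// ```ignore
/// let bufs = vec![vec![1u32, 2], vec![3, 4, 5]];
/// assert_eq!(flatten_par(&bufs[..]), vec![1, 2, 3, 4, 5]);
/// ```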
pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
    let mut len = 0;
    let mut offsets = Vec::with_capacity(bufs.len());
    // Record the destination offset of every buffer and the total length.
    let bufs = bufs
        .iter()
        .map(|s| {
            offsets.push(len);
            let slice = s.as_ref();
            len += slice.len();
            slice
        })
        .collect::<Vec<_>>();
    flatten_par_impl(&bufs, len, offsets)
}

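/// Copy every buffer into `out[offsets[i]..]` in parallel. The unsynchronized
/// writes are sound because the destination regions are disjoint: buffer `i`
/// writes exactly `bufs[i].len()` elements starting at its own offset.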
fn flatten_par_impl<T: Send + Sync + Copy>(
    bufs: &[&[T]],
    len: usize,
    offsets: Vec<usize>,
) -> Vec<T> {
    let mut out = Vec::with_capacity(len);
    // SyncPtr makes the destination pointer shareable across threads.
    let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };

    POOL.install(|| {
        offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
            let buf = bufs[i];
            let ptr: *mut T = out_ptr.get();
            // SAFETY: every task writes to a disjoint region of `out`.
            unsafe {
                let dst = ptr.add(offset);
                let src = buf.as_ptr();
                std::ptr::copy_nonoverlapping(src, dst, buf.len())
            }
        })
    });
    // SAFETY: all `len` elements were initialized by the copies above.
    unsafe {
        out.set_len(len);
    }
    out
}

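/// Flatten buffers of nullable indices into a `PrimitiveArray<IdxSize>`,
/// building the values and the validity bitmap in parallel.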
pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
    bufs: &[S],
) -> PrimitiveArray<IdxSize> {
    // `a` flattens the index values; `b` builds the validity bitmap.
    let a = || flatten_par(bufs);
    let b = || {
        let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
        let mut validity = MutableBitmap::with_capacity(cap);
        validity.extend_constant(cap, true);

        let mut count = 0usize;
        for s in bufs {
            let s = s.as_ref();

            for id in s {
                if id.is_null_idx() {
                    // SAFETY: `count` is always below `cap`.
                    unsafe { validity.set_unchecked(count, false) };
                }

                count += 1;
            }
        }
        validity.freeze()
    };

    // Run both closures in parallel on the thread pool.
    let (a, b) = POOL.join(a, b);
    PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
}