polars_core/utils/flatten.rs

use arrow::bitmap::MutableBitmap;
use polars_utils::sync::SyncPtr;

use super::*;

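/// Returns an iterator that yields one `DataFrame` per non-empty chunk of `df`,
/// re-using the existing arrays and preserving each column's dtype and sorted flag.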
pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
    df.iter_chunks_physical()
        .filter(|chunk| !chunk.is_empty())
        .map(|chunk| {
            let height = chunk.len();
            let columns = df
                .columns()
                .iter()
                .zip(chunk.into_arrays())
                .map(|(s, arr)| {
                    // SAFETY:
                    // datatypes are correct
                    let mut out = unsafe {
                        Series::from_chunks_and_dtype_unchecked(
                            s.name().clone(),
                            vec![arr],
                            s.dtype(),
                        )
                    };
                    out.set_sorted_flag(s.is_sorted_flag());
                    Column::from(out)
                })
                .collect::<Vec<_>>();

            unsafe { DataFrame::new_unchecked(height, columns) }
        })
}

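/// Splits a `Series` into one single-chunk `Series` per underlying chunk.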
pub fn flatten_series(s: &Series) -> Vec<Series> {
    let name = s.name();
    let dtype = s.dtype();
    // SAFETY: every chunk comes from `s`, so it matches the series' dtype.
    unsafe {
        s.chunks()
            .iter()
            .map(|arr| {
                Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
            })
            .collect()
    }
}

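/// Returns the total length of the nested vectors together with the starting
/// offset of each inner vector in the flattened result.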
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let cap = v.iter().map(|v| v.len()).sum::<usize>();
    let offsets = v
        .iter()
        .scan(0_usize, |acc, v| {
            let out = *acc;
            *acc += v.len();
            Some(out)
        })
        .collect::<Vec<_>>();
    (cap, offsets)
}

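/// Concatenates the given buffers into a single `Vec`, copying the slices in
/// parallel on the thread pool, e.g. `[[1, 2], [3]]` flattens to `[1, 2, 3]`.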
pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
    let mut len = 0;
    let mut offsets = Vec::with_capacity(bufs.len());
    let bufs = bufs
        .iter()
        .map(|s| {
            offsets.push(len);
            let slice = s.as_ref();
            len += slice.len();
            slice
        })
        .collect::<Vec<_>>();
    flatten_par_impl(&bufs, len, offsets)
}

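/// Copies every buffer to its precomputed offset in a freshly allocated `Vec`.
/// The destination ranges are disjoint, so the copies can run in parallel.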
fn flatten_par_impl<T: Send + Sync + Copy>(
    bufs: &[&[T]],
    len: usize,
    offsets: Vec<usize>,
) -> Vec<T> {
    let mut out = Vec::with_capacity(len);
    // SAFETY: `SyncPtr` allows sharing the destination pointer across threads;
    // every task writes to a disjoint range, so the writes never overlap.
    let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };

    POOL.install(|| {
        offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
            let buf = bufs[i];
            let ptr: *mut T = out_ptr.get();
            unsafe {
                // Copy this buffer into its own range of the output allocation.
                let dst = ptr.add(offset);
                let src = buf.as_ptr();
                std::ptr::copy_nonoverlapping(src, dst, buf.len())
            }
        })
    });
    // SAFETY: all `len` elements were initialized by the copies above.
    unsafe {
        out.set_len(len);
    }
    out
}

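/// Flattens buffers of nullable indices into a `PrimitiveArray<IdxSize>`,
/// computing the values and the validity bitmap in parallel.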
pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
    bufs: &[S],
) -> PrimitiveArray<IdxSize> {
    // Flatten the indices and build the validity bitmap concurrently.
    let a = || flatten_par(bufs);
    let b = || {
        let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
        let mut validity = MutableBitmap::with_capacity(cap);
        validity.extend_constant(cap, true);

        let mut count = 0usize;
        for s in bufs {
            let s = s.as_ref();

            for id in s {
                if id.is_null_idx() {
                    unsafe { validity.set_unchecked(count, false) };
                }

                count += 1;
            }
        }
        validity.freeze()
    };

    let (a, b) = POOL.join(a, b);
    PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
}