polars_core/utils/
flatten.rs1use arrow::bitmap::MutableBitmap;
2use polars_utils::sync::SyncPtr;
3
4use super::*;
5
6pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
7 df.iter_chunks_physical().flat_map(|chunk| {
8 let columns = df
9 .iter()
10 .zip(chunk.into_arrays())
11 .map(|(s, arr)| {
12 let mut out = unsafe {
15 Series::from_chunks_and_dtype_unchecked(s.name().clone(), vec![arr], s.dtype())
16 };
17 out.set_sorted_flag(s.is_sorted_flag());
18 Column::from(out)
19 })
20 .collect::<Vec<_>>();
21
22 let height = DataFrame::infer_height(&columns);
23 let df = unsafe { DataFrame::new_no_checks(height, columns) };
24 if df.is_empty() { None } else { Some(df) }
25 })
26}
27
28pub fn flatten_series(s: &Series) -> Vec<Series> {
29 let name = s.name();
30 let dtype = s.dtype();
31 unsafe {
32 s.chunks()
33 .iter()
34 .map(|arr| {
35 Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
36 })
37 .collect()
38 }
39}
40
/// Compute the total element count of `v` and where each inner vector would start
/// if all of them were concatenated.
///
/// Returns `(total_len, offsets)`. `offsets[i]` is the sum of the lengths of
/// `v[..i]` (an exclusive prefix sum), so `offsets.len() == v.len()`.
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let mut offsets = Vec::with_capacity(v.len());
    let mut cap = 0usize;
    for inner in v {
        offsets.push(cap);
        cap += inner.len();
    }
    (cap, offsets)
}

54pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
55 let mut len = 0;
56 let mut offsets = Vec::with_capacity(bufs.len());
57 let bufs = bufs
58 .iter()
59 .map(|s| {
60 offsets.push(len);
61 let slice = s.as_ref();
62 len += slice.len();
63 slice
64 })
65 .collect::<Vec<_>>();
66 flatten_par_impl(&bufs, len, offsets)
67}
68
69fn flatten_par_impl<T: Send + Sync + Copy>(
70 bufs: &[&[T]],
71 len: usize,
72 offsets: Vec<usize>,
73) -> Vec<T> {
74 let mut out = Vec::with_capacity(len);
75 let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };
76
77 POOL.install(|| {
78 offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
79 let buf = bufs[i];
80 let ptr: *mut T = out_ptr.get();
81 unsafe {
82 let dst = ptr.add(offset);
83 let src = buf.as_ptr();
84 std::ptr::copy_nonoverlapping(src, dst, buf.len())
85 }
86 })
87 });
88 unsafe {
89 out.set_len(len);
90 }
91 out
92}
93
94pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
95 bufs: &[S],
96) -> PrimitiveArray<IdxSize> {
97 let a = || flatten_par(bufs);
98 let b = || {
99 let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
100 let mut validity = MutableBitmap::with_capacity(cap);
101 validity.extend_constant(cap, true);
102
103 let mut count = 0usize;
104 for s in bufs {
105 let s = s.as_ref();
106
107 for id in s {
108 if id.is_null_idx() {
109 unsafe { validity.set_unchecked(count, false) };
110 }
111
112 count += 1;
113 }
114 }
115 validity.freeze()
116 };
117
118 let (a, b) = POOL.join(a, b);
119 PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
120}