1use std::sync::Arc;
2
3use arrow::array::builder::ShareStrategy;
4use polars_utils::IdxSize;
5
6use crate::frame::DataFrame;
7use crate::prelude::*;
8use crate::schema::Schema;
9use crate::series::builder::SeriesBuilder;
10
11pub struct DataFrameBuilder {
12 schema: Arc<Schema>,
13 builders: Vec<SeriesBuilder>,
14 height: usize,
15}
16
17impl DataFrameBuilder {
18 pub fn new(schema: Arc<Schema>) -> Self {
19 let builders = schema
20 .iter_values()
21 .map(|dt| SeriesBuilder::new(dt.clone()))
22 .collect();
23 Self {
24 schema,
25 builders,
26 height: 0,
27 }
28 }
29
30 pub fn reserve(&mut self, additional: usize) {
31 for builder in &mut self.builders {
32 builder.reserve(additional);
33 }
34 }
35
36 pub fn freeze(self) -> DataFrame {
37 let columns = self
38 .schema
39 .iter_names()
40 .zip(self.builders)
41 .map(|(n, b)| {
42 let s = b.freeze(n.clone());
43 assert!(s.len() == self.height);
44 Column::from(s)
45 })
46 .collect();
47
48 unsafe { DataFrame::new_no_checks(self.height, columns) }
51 }
52
53 pub fn freeze_reset(&mut self) -> DataFrame {
54 let columns = self
55 .schema
56 .iter_names()
57 .zip(&mut self.builders)
58 .map(|(n, b)| {
59 let s = b.freeze_reset(n.clone());
60 assert!(s.len() == self.height);
61 Column::from(s)
62 })
63 .collect();
64
65 let out = unsafe { DataFrame::new_no_checks(self.height, columns) };
68 self.height = 0;
69 out
70 }
71
72 pub fn len(&self) -> usize {
73 self.height
74 }
75
76 pub fn is_empty(&self) -> bool {
77 self.height == 0
78 }
79
80 pub fn extend(&mut self, other: &DataFrame, share: ShareStrategy) {
83 self.subslice_extend(other, 0, other.height(), share);
84 self.height += other.height();
85 }
86
87 pub fn subslice_extend(
90 &mut self,
91 other: &DataFrame,
92 start: usize,
93 length: usize,
94 share: ShareStrategy,
95 ) {
96 let columns = other.get_columns();
97 assert!(self.builders.len() == columns.len());
98 for (builder, column) in self.builders.iter_mut().zip(columns) {
99 match column {
100 Column::Series(s) => {
101 builder.subslice_extend(s, start, length, share);
102 },
103 Column::Scalar(sc) => {
104 let len = sc.len().saturating_sub(start).min(length);
105 let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
106 builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len, share);
107 },
108 }
109 }
110
111 self.height += length.min(other.height().saturating_sub(start));
112 }
113
114 pub fn subslice_extend_repeated(
117 &mut self,
118 other: &DataFrame,
119 start: usize,
120 length: usize,
121 repeats: usize,
122 share: ShareStrategy,
123 ) {
124 let columns = other.get_columns();
125 assert!(self.builders.len() == columns.len());
126 for (builder, column) in self.builders.iter_mut().zip(columns) {
127 match column {
128 Column::Series(s) => {
129 builder.subslice_extend_repeated(s, start, length, repeats, share);
130 },
131 Column::Scalar(sc) => {
132 let len = sc.len().saturating_sub(start).min(length);
133 let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
134 builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
135 },
136 }
137 }
138
139 self.height += length.min(other.height().saturating_sub(start)) * repeats;
140 }
141
142 pub fn subslice_extend_each_repeated(
146 &mut self,
147 other: &DataFrame,
148 start: usize,
149 length: usize,
150 repeats: usize,
151 share: ShareStrategy,
152 ) {
153 let columns = other.get_columns();
154 assert!(self.builders.len() == columns.len());
155 for (builder, column) in self.builders.iter_mut().zip(columns) {
156 match column {
157 Column::Series(s) => {
158 builder.subslice_extend_each_repeated(s, start, length, repeats, share);
159 },
160 Column::Scalar(sc) => {
161 let len = sc.len().saturating_sub(start).min(length);
162 let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
163 builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
164 },
165 }
166 }
167
168 self.height += length.min(other.height().saturating_sub(start)) * repeats;
169 }
170
171 pub unsafe fn gather_extend(
179 &mut self,
180 other: &DataFrame,
181 idxs: &[IdxSize],
182 share: ShareStrategy,
183 ) {
184 let columns = other.get_columns();
185 assert!(self.builders.len() == columns.len());
186 for (builder, column) in self.builders.iter_mut().zip(columns) {
187 match column {
188 Column::Series(s) => {
189 builder.gather_extend(s, idxs, share);
190 },
191 Column::Scalar(sc) => {
192 let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
193 builder.subslice_extend_repeated(&scalar_as_series, 0, 1, idxs.len(), share);
194 },
195 }
196 }
197
198 self.height += idxs.len();
199 }
200
201 pub fn opt_gather_extend(&mut self, other: &DataFrame, idxs: &[IdxSize], share: ShareStrategy) {
207 let mut trans_idxs = Vec::new();
208 let columns = other.get_columns();
209 assert!(self.builders.len() == columns.len());
210 for (builder, column) in self.builders.iter_mut().zip(columns) {
211 match column {
212 Column::Series(s) => {
213 builder.opt_gather_extend(s, idxs, share);
214 },
215 Column::Scalar(sc) => {
216 let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
217 for idx_chunk in idxs.chunks(4096) {
220 trans_idxs.clear();
221 trans_idxs.extend(
222 idx_chunk
223 .iter()
224 .map(|idx| ((*idx as usize) >= sc.len()) as IdxSize),
225 );
226 builder.opt_gather_extend(&scalar_as_series, &trans_idxs, share);
227 }
228 },
229 }
230 }
231
232 self.height += idxs.len();
233 }
234}