polars_core/frame/
builder.rs

1use std::sync::Arc;
2
3use arrow::array::builder::ShareStrategy;
4use polars_utils::IdxSize;
5
6use crate::frame::DataFrame;
7use crate::prelude::*;
8use crate::schema::Schema;
9use crate::series::builder::SeriesBuilder;
10
/// Incrementally assembles a [`DataFrame`] by keeping one [`SeriesBuilder`]
/// per schema column together with a running row count.
pub struct DataFrameBuilder {
    /// Schema the builders were created from; supplies the column names when
    /// the builder is frozen.
    schema: Arc<Schema>,
    /// One builder per schema entry, in schema iteration order.
    builders: Vec<SeriesBuilder>,
    /// Number of rows appended so far; every frozen column is asserted to
    /// have exactly this length.
    height: usize,
}
16
17impl DataFrameBuilder {
18    pub fn new(schema: Arc<Schema>) -> Self {
19        let builders = schema
20            .iter_values()
21            .map(|dt| SeriesBuilder::new(dt.clone()))
22            .collect();
23        Self {
24            schema,
25            builders,
26            height: 0,
27        }
28    }
29
30    pub fn reserve(&mut self, additional: usize) {
31        for builder in &mut self.builders {
32            builder.reserve(additional);
33        }
34    }
35
36    pub fn freeze(self) -> DataFrame {
37        let columns = self
38            .schema
39            .iter_names()
40            .zip(self.builders)
41            .map(|(n, b)| {
42                let s = b.freeze(n.clone());
43                assert!(s.len() == self.height);
44                Column::from(s)
45            })
46            .collect();
47
48        // SAFETY: we checked the lengths and the names are unique because they
49        // come from Schema.
50        unsafe { DataFrame::new_no_checks(self.height, columns) }
51    }
52
53    pub fn freeze_reset(&mut self) -> DataFrame {
54        let columns = self
55            .schema
56            .iter_names()
57            .zip(&mut self.builders)
58            .map(|(n, b)| {
59                let s = b.freeze_reset(n.clone());
60                assert!(s.len() == self.height);
61                Column::from(s)
62            })
63            .collect();
64
65        // SAFETY: we checked the lengths and the names are unique because they
66        // come from Schema.
67        let out = unsafe { DataFrame::new_no_checks(self.height, columns) };
68        self.height = 0;
69        out
70    }
71
    /// Returns the number of rows appended to this builder so far (the
    /// tracked height).
    pub fn len(&self) -> usize {
        self.height
    }
75
76    pub fn is_empty(&self) -> bool {
77        self.height == 0
78    }
79
80    /// Extends this builder with the contents of the given dataframe. May panic
81    /// if other does not match the schema of this builder.
82    pub fn extend(&mut self, other: &DataFrame, share: ShareStrategy) {
83        self.subslice_extend(other, 0, other.height(), share);
84        self.height += other.height();
85    }
86
87    /// Extends this builder with the contents of the given dataframe subslice.
88    /// May panic if other does not match the schema of this builder.
89    pub fn subslice_extend(
90        &mut self,
91        other: &DataFrame,
92        start: usize,
93        length: usize,
94        share: ShareStrategy,
95    ) {
96        let columns = other.get_columns();
97        assert!(self.builders.len() == columns.len());
98        for (builder, column) in self.builders.iter_mut().zip(columns) {
99            match column {
100                Column::Series(s) => {
101                    builder.subslice_extend(s, start, length, share);
102                },
103                Column::Partitioned(p) => {
104                    // @scalar-opt
105                    builder.subslice_extend(p.as_materialized_series(), start, length, share);
106                },
107                Column::Scalar(sc) => {
108                    let len = sc.len().saturating_sub(start).min(length);
109                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
110                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len, share);
111                },
112            }
113        }
114
115        self.height += length.min(other.height().saturating_sub(start));
116    }
117
118    /// Extends this builder with the contents of the given dataframe subslice, repeating it `repeats` times.
119    /// May panic if other does not match the schema of this builder.
120    pub fn subslice_extend_repeated(
121        &mut self,
122        other: &DataFrame,
123        start: usize,
124        length: usize,
125        repeats: usize,
126        share: ShareStrategy,
127    ) {
128        let columns = other.get_columns();
129        assert!(self.builders.len() == columns.len());
130        for (builder, column) in self.builders.iter_mut().zip(columns) {
131            match column {
132                Column::Series(s) => {
133                    builder.subslice_extend_repeated(s, start, length, repeats, share);
134                },
135                Column::Partitioned(p) => {
136                    // @scalar-opt
137                    builder.subslice_extend_repeated(
138                        p.as_materialized_series(),
139                        start,
140                        length,
141                        repeats,
142                        share,
143                    );
144                },
145                Column::Scalar(sc) => {
146                    let len = sc.len().saturating_sub(start).min(length);
147                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
148                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
149                },
150            }
151        }
152
153        self.height += length.min(other.height().saturating_sub(start)) * repeats;
154    }
155
156    /// Extends this builder with the contents of the given dataframe subslice.
157    /// Each element is repeated repeats times. May panic if other does not
158    /// match the schema of this builder.
159    pub fn subslice_extend_each_repeated(
160        &mut self,
161        other: &DataFrame,
162        start: usize,
163        length: usize,
164        repeats: usize,
165        share: ShareStrategy,
166    ) {
167        let columns = other.get_columns();
168        assert!(self.builders.len() == columns.len());
169        for (builder, column) in self.builders.iter_mut().zip(columns) {
170            match column {
171                Column::Series(s) => {
172                    builder.subslice_extend_each_repeated(s, start, length, repeats, share);
173                },
174                Column::Partitioned(p) => {
175                    // @scalar-opt
176                    builder.subslice_extend_each_repeated(
177                        p.as_materialized_series(),
178                        start,
179                        length,
180                        repeats,
181                        share,
182                    );
183                },
184                Column::Scalar(sc) => {
185                    let len = sc.len().saturating_sub(start).min(length);
186                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
187                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
188                },
189            }
190        }
191
192        self.height += length.min(other.height().saturating_sub(start)) * repeats;
193    }
194
195    /// Extends this builder with the contents of the given dataframe at the given
196    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
197    /// for each i=0..idxs.len(). May panic if other does not match the schema
198    /// of this builder, or if the other dataframe is not rechunked.
199    ///
200    /// # Safety
201    /// The indices must be in-bounds.
202    pub unsafe fn gather_extend(
203        &mut self,
204        other: &DataFrame,
205        idxs: &[IdxSize],
206        share: ShareStrategy,
207    ) {
208        let columns = other.get_columns();
209        assert!(self.builders.len() == columns.len());
210        for (builder, column) in self.builders.iter_mut().zip(columns) {
211            match column {
212                Column::Series(s) => {
213                    builder.gather_extend(s, idxs, share);
214                },
215                Column::Partitioned(p) => {
216                    // @scalar-opt
217                    builder.gather_extend(p.as_materialized_series(), idxs, share);
218                },
219                Column::Scalar(sc) => {
220                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
221                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, idxs.len(), share);
222                },
223            }
224        }
225
226        self.height += idxs.len();
227    }
228
229    /// Extends this builder with the contents of the given dataframe at the given
230    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
231    /// for each i=0..idxs.len(). Out-of-bounds indices extend with nulls.
232    /// May panic if other does not match the schema of this builder, or if the
233    /// other dataframe is not rechunked.
234    pub fn opt_gather_extend(&mut self, other: &DataFrame, idxs: &[IdxSize], share: ShareStrategy) {
235        let mut trans_idxs = Vec::new();
236        let columns = other.get_columns();
237        assert!(self.builders.len() == columns.len());
238        for (builder, column) in self.builders.iter_mut().zip(columns) {
239            match column {
240                Column::Series(s) => {
241                    builder.opt_gather_extend(s, idxs, share);
242                },
243                Column::Partitioned(p) => {
244                    // @scalar-opt
245                    builder.opt_gather_extend(p.as_materialized_series(), idxs, share);
246                },
247                Column::Scalar(sc) => {
248                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
249                    // Reduce call overhead by transforming indices to 0/1 and dispatching to
250                    // opt_gather_extend on the scalar as series.
251                    for idx_chunk in idxs.chunks(4096) {
252                        trans_idxs.clear();
253                        trans_idxs.extend(
254                            idx_chunk
255                                .iter()
256                                .map(|idx| ((*idx as usize) >= sc.len()) as IdxSize),
257                        );
258                        builder.opt_gather_extend(&scalar_as_series, &trans_idxs, share);
259                    }
260                },
261            }
262        }
263
264        self.height += idxs.len();
265    }
266}