polars_core/frame/
builder.rs

1use std::sync::Arc;
2
3use arrow::array::builder::ShareStrategy;
4use polars_utils::IdxSize;
5
6use crate::frame::DataFrame;
7use crate::prelude::*;
8use crate::schema::Schema;
9use crate::series::builder::SeriesBuilder;
10
11pub struct DataFrameBuilder {
12    schema: Arc<Schema>,
13    builders: Vec<SeriesBuilder>,
14    height: usize,
15}
16
17impl DataFrameBuilder {
18    pub fn new(schema: Arc<Schema>) -> Self {
19        let builders = schema
20            .iter_values()
21            .map(|dt| SeriesBuilder::new(dt.clone()))
22            .collect();
23        Self {
24            schema,
25            builders,
26            height: 0,
27        }
28    }
29
30    pub fn reserve(&mut self, additional: usize) {
31        for builder in &mut self.builders {
32            builder.reserve(additional);
33        }
34    }
35
36    pub fn freeze(self) -> DataFrame {
37        let columns = self
38            .schema
39            .iter_names()
40            .zip(self.builders)
41            .map(|(n, b)| {
42                let s = b.freeze(n.clone());
43                assert!(s.len() == self.height);
44                Column::from(s)
45            })
46            .collect();
47
48        // SAFETY: we checked the lengths and the names are unique because they
49        // come from Schema.
50        unsafe { DataFrame::new_no_checks(self.height, columns) }
51    }
52
53    pub fn freeze_reset(&mut self) -> DataFrame {
54        let columns = self
55            .schema
56            .iter_names()
57            .zip(&mut self.builders)
58            .map(|(n, b)| {
59                let s = b.freeze_reset(n.clone());
60                assert!(s.len() == self.height);
61                Column::from(s)
62            })
63            .collect();
64
65        // SAFETY: we checked the lengths and the names are unique because they
66        // come from Schema.
67        let out = unsafe { DataFrame::new_no_checks(self.height, columns) };
68        self.height = 0;
69        out
70    }
71
72    pub fn len(&self) -> usize {
73        self.height
74    }
75
76    pub fn is_empty(&self) -> bool {
77        self.height == 0
78    }
79
80    /// Extends this builder with the contents of the given dataframe. May panic
81    /// if other does not match the schema of this builder.
82    pub fn extend(&mut self, other: &DataFrame, share: ShareStrategy) {
83        self.subslice_extend(other, 0, other.height(), share);
84        self.height += other.height();
85    }
86
87    /// Extends this builder with the contents of the given dataframe subslice.
88    /// May panic if other does not match the schema of this builder.
89    pub fn subslice_extend(
90        &mut self,
91        other: &DataFrame,
92        start: usize,
93        length: usize,
94        share: ShareStrategy,
95    ) {
96        let columns = other.get_columns();
97        assert!(self.builders.len() == columns.len());
98        for (builder, column) in self.builders.iter_mut().zip(columns) {
99            match column {
100                Column::Series(s) => {
101                    builder.subslice_extend(s, start, length, share);
102                },
103                Column::Scalar(sc) => {
104                    let len = sc.len().saturating_sub(start).min(length);
105                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
106                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len, share);
107                },
108            }
109        }
110
111        self.height += length.min(other.height().saturating_sub(start));
112    }
113
114    /// Extends this builder with the contents of the given dataframe subslice, repeating it `repeats` times.
115    /// May panic if other does not match the schema of this builder.
116    pub fn subslice_extend_repeated(
117        &mut self,
118        other: &DataFrame,
119        start: usize,
120        length: usize,
121        repeats: usize,
122        share: ShareStrategy,
123    ) {
124        let columns = other.get_columns();
125        assert!(self.builders.len() == columns.len());
126        for (builder, column) in self.builders.iter_mut().zip(columns) {
127            match column {
128                Column::Series(s) => {
129                    builder.subslice_extend_repeated(s, start, length, repeats, share);
130                },
131                Column::Scalar(sc) => {
132                    let len = sc.len().saturating_sub(start).min(length);
133                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
134                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
135                },
136            }
137        }
138
139        self.height += length.min(other.height().saturating_sub(start)) * repeats;
140    }
141
142    /// Extends this builder with the contents of the given dataframe subslice.
143    /// Each element is repeated repeats times. May panic if other does not
144    /// match the schema of this builder.
145    pub fn subslice_extend_each_repeated(
146        &mut self,
147        other: &DataFrame,
148        start: usize,
149        length: usize,
150        repeats: usize,
151        share: ShareStrategy,
152    ) {
153        let columns = other.get_columns();
154        assert!(self.builders.len() == columns.len());
155        for (builder, column) in self.builders.iter_mut().zip(columns) {
156            match column {
157                Column::Series(s) => {
158                    builder.subslice_extend_each_repeated(s, start, length, repeats, share);
159                },
160                Column::Scalar(sc) => {
161                    let len = sc.len().saturating_sub(start).min(length);
162                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
163                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len * repeats, share);
164                },
165            }
166        }
167
168        self.height += length.min(other.height().saturating_sub(start)) * repeats;
169    }
170
171    /// Extends this builder with the contents of the given dataframe at the given
172    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
173    /// for each i=0..idxs.len(). May panic if other does not match the schema
174    /// of this builder, or if the other dataframe is not rechunked.
175    ///
176    /// # Safety
177    /// The indices must be in-bounds.
178    pub unsafe fn gather_extend(
179        &mut self,
180        other: &DataFrame,
181        idxs: &[IdxSize],
182        share: ShareStrategy,
183    ) {
184        let columns = other.get_columns();
185        assert!(self.builders.len() == columns.len());
186        for (builder, column) in self.builders.iter_mut().zip(columns) {
187            match column {
188                Column::Series(s) => {
189                    builder.gather_extend(s, idxs, share);
190                },
191                Column::Scalar(sc) => {
192                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
193                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, idxs.len(), share);
194                },
195            }
196        }
197
198        self.height += idxs.len();
199    }
200
201    /// Extends this builder with the contents of the given dataframe at the given
202    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
203    /// for each i=0..idxs.len(). Out-of-bounds indices extend with nulls.
204    /// May panic if other does not match the schema of this builder, or if the
205    /// other dataframe is not rechunked.
206    pub fn opt_gather_extend(&mut self, other: &DataFrame, idxs: &[IdxSize], share: ShareStrategy) {
207        let mut trans_idxs = Vec::new();
208        let columns = other.get_columns();
209        assert!(self.builders.len() == columns.len());
210        for (builder, column) in self.builders.iter_mut().zip(columns) {
211            match column {
212                Column::Series(s) => {
213                    builder.opt_gather_extend(s, idxs, share);
214                },
215                Column::Scalar(sc) => {
216                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
217                    // Reduce call overhead by transforming indices to 0/1 and dispatching to
218                    // opt_gather_extend on the scalar as series.
219                    for idx_chunk in idxs.chunks(4096) {
220                        trans_idxs.clear();
221                        trans_idxs.extend(
222                            idx_chunk
223                                .iter()
224                                .map(|idx| ((*idx as usize) >= sc.len()) as IdxSize),
225                        );
226                        builder.opt_gather_extend(&scalar_as_series, &trans_idxs, share);
227                    }
228                },
229            }
230        }
231
232        self.height += idxs.len();
233    }
234}