polars_core/frame/
builder.rs

1use std::sync::Arc;
2
3use arrow::array::builder::ShareStrategy;
4use polars_utils::IdxSize;
5
6use crate::frame::DataFrame;
7use crate::prelude::*;
8use crate::schema::Schema;
9use crate::series::builder::SeriesBuilder;
10
11pub struct DataFrameBuilder {
12    schema: Arc<Schema>,
13    builders: Vec<SeriesBuilder>,
14    height: usize,
15}
16
17impl DataFrameBuilder {
18    pub fn new(schema: Arc<Schema>) -> Self {
19        let builders = schema
20            .iter_values()
21            .map(|dt| SeriesBuilder::new(dt.clone()))
22            .collect();
23        Self {
24            schema,
25            builders,
26            height: 0,
27        }
28    }
29
30    pub fn reserve(&mut self, additional: usize) {
31        for builder in &mut self.builders {
32            builder.reserve(additional);
33        }
34    }
35
36    pub fn freeze(self) -> DataFrame {
37        let columns = self
38            .schema
39            .iter_names()
40            .zip(self.builders)
41            .map(|(n, b)| {
42                let s = b.freeze(n.clone());
43                assert!(s.len() == self.height);
44                Column::from(s)
45            })
46            .collect();
47
48        // SAFETY: we checked the lengths and the names are unique because they
49        // come from Schema.
50        unsafe { DataFrame::new_no_checks(self.height, columns) }
51    }
52
53    pub fn freeze_reset(&mut self) -> DataFrame {
54        let columns = self
55            .schema
56            .iter_names()
57            .zip(&mut self.builders)
58            .map(|(n, b)| {
59                let s = b.freeze_reset(n.clone());
60                assert!(s.len() == self.height);
61                Column::from(s)
62            })
63            .collect();
64
65        // SAFETY: we checked the lengths and the names are unique because they
66        // come from Schema.
67        let out = unsafe { DataFrame::new_no_checks(self.height, columns) };
68        self.height = 0;
69        out
70    }
71
72    pub fn len(&self) -> usize {
73        self.height
74    }
75
76    pub fn is_empty(&self) -> bool {
77        self.height == 0
78    }
79
80    /// Extends this builder with the contents of the given dataframe. May panic
81    /// if other does not match the schema of this builder.
82    pub fn extend(&mut self, other: &DataFrame, share: ShareStrategy) {
83        self.subslice_extend(other, 0, other.height(), share);
84        self.height += other.height();
85    }
86
87    /// Extends this builder with the contents of the given dataframe subslice.
88    /// May panic if other does not match the schema of this builder.
89    pub fn subslice_extend(
90        &mut self,
91        other: &DataFrame,
92        start: usize,
93        length: usize,
94        share: ShareStrategy,
95    ) {
96        let columns = other.get_columns();
97        assert!(self.builders.len() == columns.len());
98        for (builder, column) in self.builders.iter_mut().zip(columns) {
99            match column {
100                Column::Series(s) => {
101                    builder.subslice_extend(s, start, length, share);
102                },
103                Column::Partitioned(p) => {
104                    // @scalar-opt
105                    builder.subslice_extend(p.as_materialized_series(), start, length, share);
106                },
107                Column::Scalar(sc) => {
108                    let len = sc.len().saturating_sub(start).min(length);
109                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
110                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, len, share);
111                },
112            }
113        }
114
115        self.height += length.min(other.height().saturating_sub(start));
116    }
117
118    /// Extends this builder with the contents of the given dataframe at the given
119    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
120    /// for each i=0..idxs.len(). May panic if other does not match the schema
121    /// of this builder, or if the other dataframe is not rechunked.
122    ///
123    /// # Safety
124    /// The indices must be in-bounds.
125    pub unsafe fn gather_extend(
126        &mut self,
127        other: &DataFrame,
128        idxs: &[IdxSize],
129        share: ShareStrategy,
130    ) {
131        let columns = other.get_columns();
132        assert!(self.builders.len() == columns.len());
133        for (builder, column) in self.builders.iter_mut().zip(columns) {
134            match column {
135                Column::Series(s) => {
136                    builder.gather_extend(s, idxs, share);
137                },
138                Column::Partitioned(p) => {
139                    // @scalar-opt
140                    builder.gather_extend(p.as_materialized_series(), idxs, share);
141                },
142                Column::Scalar(sc) => {
143                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
144                    builder.subslice_extend_repeated(&scalar_as_series, 0, 1, idxs.len(), share);
145                },
146            }
147        }
148
149        self.height += idxs.len();
150    }
151
152    /// Extends this builder with the contents of the given dataframe at the given
153    /// indices. That is, `other[idxs[i]]` is appended to this builder in order,
154    /// for each i=0..idxs.len(). Out-of-bounds indices extend with nulls.
155    /// May panic if other does not match the schema of this builder, or if the
156    /// other dataframe is not rechunked.
157    pub fn opt_gather_extend(&mut self, other: &DataFrame, idxs: &[IdxSize], share: ShareStrategy) {
158        let mut trans_idxs = Vec::new();
159        let columns = other.get_columns();
160        assert!(self.builders.len() == columns.len());
161        for (builder, column) in self.builders.iter_mut().zip(columns) {
162            match column {
163                Column::Series(s) => {
164                    builder.opt_gather_extend(s, idxs, share);
165                },
166                Column::Partitioned(p) => {
167                    // @scalar-opt
168                    builder.opt_gather_extend(p.as_materialized_series(), idxs, share);
169                },
170                Column::Scalar(sc) => {
171                    let scalar_as_series = sc.scalar().clone().into_series(PlSmallStr::default());
172                    // Reduce call overhead by transforming indices to 0/1 and dispatching to
173                    // opt_gather_extend on the scalar as series.
174                    for idx_chunk in idxs.chunks(4096) {
175                        trans_idxs.clear();
176                        trans_idxs.extend(
177                            idx_chunk
178                                .iter()
179                                .map(|idx| ((*idx as usize) >= sc.len()) as IdxSize),
180                        );
181                        builder.opt_gather_extend(&scalar_as_series, &trans_idxs, share);
182                    }
183                },
184            }
185        }
186
187        self.height += idxs.len();
188    }
189}