polars_core/frame/column/
partitioned.rs1use std::borrow::Cow;
2use std::convert::identity;
3use std::sync::{Arc, OnceLock};
4
5use polars_error::{PolarsResult, polars_ensure};
6use polars_utils::IdxSize;
7use polars_utils::pl_str::PlSmallStr;
8
9use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};
10use crate::chunked_array::cast::CastOptions;
11use crate::frame::Scalar;
12use crate::series::IsSorted;
13
/// A column stored in run-length form: partition value `values[i]` stands for all
/// rows in the half-open range `ends[i-1]..ends[i]` (with an implicit leading end
/// of 0), as expanded by `_to_series`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct PartitionedColumn {
    name: PlSmallStr,

    /// One value per partition.
    values: Series,
    /// Exclusive cumulative end offset of each partition; monotonically
    /// non-decreasing (see `verify_invariants`). The last entry is the total length.
    ends: Arc<[IdxSize]>,

    /// Lazily computed full-length expansion; skipped by serde and rebuilt on demand.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    materialized: OnceLock<Series>,
}
26
impl IntoColumn for PartitionedColumn {
    /// Wrap this partitioned column in [`Column::Partitioned`] without materializing.
    fn into_column(self) -> Column {
        Column::Partitioned(self)
    }
}
32
33impl From<PartitionedColumn> for Column {
34 fn from(value: PartitionedColumn) -> Self {
35 value.into_column()
36 }
37}
38
39fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {
40 polars_ensure!(
41 values.len() == ends.len(),
42 ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",
43 values.len(),
44 ends.len()
45 );
46
47 for vs in ends.windows(2) {
48 polars_ensure!(
49 vs[0] <= vs[1],
50 ComputeError: "partitioned column `ends` are not monotonely non-decreasing",
51 );
52 }
53
54 Ok(())
55}
56
57impl PartitionedColumn {
58 pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
59 Self::try_new(name, values, ends).unwrap()
60 }
61
62 pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
69 if cfg!(debug_assertions) {
70 verify_invariants(&values, ends.as_ref()).unwrap();
71 }
72
73 let values = values.rechunk();
74 Self {
75 name,
76 values,
77 ends,
78 materialized: OnceLock::new(),
79 }
80 }
81
82 pub fn try_new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {
83 verify_invariants(&values, ends.as_ref())?;
84
85 Ok(unsafe { Self::new_unchecked(name, values, ends) })
87 }
88
89 pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {
90 Self {
91 name,
92 values: Series::new_empty(PlSmallStr::EMPTY, &dtype),
93 ends: Arc::default(),
94
95 materialized: OnceLock::new(),
96 }
97 }
98
99 pub fn len(&self) -> usize {
100 self.ends.last().map_or(0, |last| *last as usize)
101 }
102
103 pub fn is_empty(&self) -> bool {
104 self.len() == 0
105 }
106
107 pub fn name(&self) -> &PlSmallStr {
108 &self.name
109 }
110
111 pub fn dtype(&self) -> &DataType {
112 self.values.dtype()
113 }
114
115 #[inline]
116 pub fn field(&self) -> Cow<'_, Field> {
117 match self.lazy_as_materialized_series() {
118 None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),
119 Some(s) => s.field(),
120 }
121 }
122
123 pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {
124 self.name = name;
125 self
126 }
127
128 fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {
129 let dtype = values.dtype();
130 let mut column = Column::Series(Series::new_empty(name, dtype).into());
131
132 let mut prev_offset = 0;
133 for (i, &offset) in ends.iter().enumerate() {
134 let length = offset - prev_offset;
136 column
137 .extend(&Column::new_scalar(
138 PlSmallStr::EMPTY,
139 Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),
140 length as usize,
141 ))
142 .unwrap();
143 prev_offset = offset;
144 }
145
146 debug_assert_eq!(column.len(), prev_offset as usize);
147
148 column.take_materialized_series()
149 }
150
151 fn to_series(&self) -> Series {
153 Self::_to_series(self.name.clone(), &self.values, &self.ends)
154 }
155
156 pub fn lazy_as_materialized_series(&self) -> Option<&Series> {
158 self.materialized.get()
159 }
160
161 pub fn as_materialized_series(&self) -> &Series {
165 self.materialized.get_or_init(|| self.to_series())
166 }
167
168 pub fn take_materialized_series(self) -> Series {
170 self.materialized
171 .into_inner()
172 .unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))
173 }
174
175 pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {
176 let result = f(&self.values).rechunk();
177 assert_eq!(self.values.len(), result.len());
178 unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) }
179 }
180
181 pub fn try_apply_unary_elementwise(
182 &self,
183 f: impl Fn(&Series) -> PolarsResult<Series>,
184 ) -> PolarsResult<Self> {
185 let result = f(&self.values)?.rechunk();
186 assert_eq!(self.values.len(), result.len());
187 Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })
188 }
189
190 pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {
191 let mut new_ends = self.ends.to_vec();
192 let new_length = (self.len() + n) as IdxSize;
194
195 let values = if !self.is_empty() && self.values.last().value() == &value {
196 *new_ends.last_mut().unwrap() = new_length;
197 self.values.clone()
198 } else {
199 new_ends.push(new_length);
200 self.values.extend_constant(value, 1)?
201 };
202
203 Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })
204 }
205
206 pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {
207 debug_assert!(index < self.len());
208
209 if index < self.ends[0] as usize {
211 return unsafe { self.get_unchecked(0) };
212 }
213
214 let value_idx = self
215 .ends
216 .binary_search(&(index as IdxSize))
217 .map_or_else(identity, identity);
218
219 self.get_unchecked(value_idx)
220 }
221
222 pub fn min_reduce(&self) -> PolarsResult<Scalar> {
223 self.values.min_reduce()
224 }
225 pub fn max_reduce(&self) -> Result<Scalar, polars_error::PolarsError> {
226 self.values.max_reduce()
227 }
228
229 pub fn reverse(&self) -> Self {
230 let values = self.values.reverse();
231 let mut ends = Vec::with_capacity(self.ends.len());
232
233 let mut offset = 0;
234 ends.extend(self.ends.windows(2).rev().map(|vs| {
235 offset += vs[1] - vs[0];
236 offset
237 }));
238 ends.push(self.len() as IdxSize);
239
240 unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }
241 }
242
243 pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
244 self.values.set_sorted_flag(sorted);
245 }
246
247 pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {
248 let values = self.values.cast_with_options(dtype, options)?;
249 Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
250 }
251
252 pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {
253 let values = self.values.strict_cast(dtype)?;
254 Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
255 }
256
257 pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
258 let values = self.values.cast(dtype)?;
259 Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
260 }
261
262 pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {
263 let values = unsafe { self.values.cast_unchecked(dtype) }?;
264 Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
265 }
266
267 pub fn null_count(&self) -> usize {
268 match self.lazy_as_materialized_series() {
269 Some(s) => s.null_count(),
270 None => {
271 self.as_materialized_series().null_count()
273 },
274 }
275 }
276
277 pub fn clear(&self) -> Self {
278 Self::new_empty(self.name.clone(), self.values.dtype().clone())
279 }
280
281 pub fn partitions(&self) -> &Series {
282 &self.values
283 }
284 pub fn partition_ends(&self) -> &[IdxSize] {
285 &self.ends
286 }
287
288 pub fn partition_ends_ref(&self) -> &Arc<[IdxSize]> {
289 &self.ends
290 }
291
292 pub fn or_reduce(&self) -> PolarsResult<Scalar> {
293 self.values.or_reduce()
294 }
295
296 pub fn and_reduce(&self) -> PolarsResult<Scalar> {
297 self.values.and_reduce()
298 }
299}