polars_core/frame/column/partitioned.rs

use std::borrow::Cow;
use std::convert::identity;
use std::sync::{Arc, OnceLock};

use polars_error::{PolarsResult, polars_ensure};
use polars_utils::IdxSize;
use polars_utils::pl_str::PlSmallStr;

use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};
use crate::chunked_array::cast::CastOptions;
use crate::frame::Scalar;
use crate::series::IsSorted;

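/// A run-length encoded column: `values[i]` fills the half-open row range
/// `ends[i - 1]..ends[i]` (with an implicit leading `0`), so `ends` holds
/// cumulative exclusive end offsets and `ends.last()` is the expanded length.
/// A flat `Series` materialization is cached lazily in `materialized`.
///
/// # Example
///
/// A minimal sketch of how the encoding lines up (hypothetical values, not
/// from the original source):
///
/// ```ignore
/// // Runs: "a" × 2, "b" × 3, "c" × 1 → expanded ["a", "a", "b", "b", "b", "c"].
/// let values = Series::new("letters".into(), &["a", "b", "c"]);
/// let col = PartitionedColumn::new("letters".into(), values, vec![2, 5, 6].into());
/// assert_eq!(col.len(), 6);
/// ```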
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionedColumn {
    name: PlSmallStr,

    values: Series,
    ends: Arc<[IdxSize]>,

    #[cfg_attr(feature = "serde", serde(skip))]
    materialized: OnceLock<Series>,
}

impl IntoColumn for PartitionedColumn {
    fn into_column(self) -> Column {
        Column::Partitioned(self)
    }
}

impl From<PartitionedColumn> for Column {
    fn from(value: PartitionedColumn) -> Self {
        value.into_column()
    }
}

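/// Checks the `PartitionedColumn` invariants: one entry in `ends` per entry in
/// `values`, with `ends` monotonically non-decreasing.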
fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {
    polars_ensure!(
        values.len() == ends.len(),
        ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",
        values.len(),
        ends.len()
    );

    for vs in ends.windows(2) {
        polars_ensure!(
            vs[0] <= vs[1],
            ComputeError: "partitioned column `ends` is not monotonically non-decreasing",
        );
    }

    Ok(())
}

impl PartitionedColumn {
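    /// Creates a new [`PartitionedColumn`].
    ///
    /// # Panics
    ///
    /// Panics if `values` and `ends` differ in length or if `ends` is not
    /// monotonically non-decreasing.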
    pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
        Self::try_new(name, values, ends).unwrap()
    }

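    /// Creates a new [`PartitionedColumn`] without verifying the invariants.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `values` and `ends` have the same length
    /// and that `ends` is monotonically non-decreasing. Debug builds still
    /// check the invariants and panic on violation.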
    pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
        if cfg!(debug_assertions) {
            verify_invariants(&values, ends.as_ref()).unwrap();
        }

        let values = values.rechunk();
        Self {
            name,
            values,
            ends,
            materialized: OnceLock::new(),
        }
    }

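    /// Creates a new [`PartitionedColumn`], returning an error if the
    /// invariants do not hold.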
    pub fn try_new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {
        verify_invariants(&values, ends.as_ref())?;

        Ok(unsafe { Self::new_unchecked(name, values, ends) })
    }

    pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {
        Self {
            name,
            values: Series::new_empty(PlSmallStr::EMPTY, &dtype),
            ends: Arc::default(),

            materialized: OnceLock::new(),
        }
    }

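    /// Returns the expanded (logical) length of the column, i.e. the last
    /// offset in `ends`.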
    pub fn len(&self) -> usize {
        self.ends.last().map_or(0, |last| *last as usize)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn name(&self) -> &PlSmallStr {
        &self.name
    }

    pub fn dtype(&self) -> &DataType {
        self.values.dtype()
    }

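    /// Returns the [`Field`] of this column, reusing the cached materialized
    /// series if one exists.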
    #[inline]
    pub fn field(&self) -> Cow<Field> {
        match self.lazy_as_materialized_series() {
            None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),
            Some(s) => s.field(),
        }
    }

    pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {
        self.name = name;
        self
    }

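    /// Materializes the partitions into a flat [`Series`] by expanding each
    /// partition value into a constant run of `ends[i] - ends[i - 1]` rows.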
    fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {
        let dtype = values.dtype();
        let mut column = Column::Series(Series::new_empty(name, dtype).into());

        let mut prev_offset = 0;
        for (i, &offset) in ends.iter().enumerate() {
            let length = offset - prev_offset;
            column
                .extend(&Column::new_scalar(
                    PlSmallStr::EMPTY,
                    Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),
                    length as usize,
                ))
                .unwrap();
            prev_offset = offset;
        }

        debug_assert_eq!(column.len(), prev_offset as usize);

        column.take_materialized_series()
    }

    fn to_series(&self) -> Series {
        Self::_to_series(self.name.clone(), &self.values, &self.ends)
    }

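    /// Returns the materialized series if it has already been computed,
    /// without materializing it.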
    pub fn lazy_as_materialized_series(&self) -> Option<&Series> {
        self.materialized.get()
    }

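    /// Returns the materialized series, computing and caching it on first
    /// use.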
    pub fn as_materialized_series(&self) -> &Series {
        self.materialized.get_or_init(|| self.to_series())
    }

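    /// Consumes `self` and returns the materialized series, computing it now
    /// if it was never materialized.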
    pub fn take_materialized_series(self) -> Series {
        self.materialized
            .into_inner()
            .unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))
    }

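    /// Applies an elementwise, length-preserving function to the distinct
    /// partition values only, i.e. O(number of partitions) work instead of
    /// O(expanded length). A sketch of intended use (hypothetical call, not
    /// from the original source):
    ///
    /// ```ignore
    /// // Cast each distinct value once; runs keep their lengths.
    /// let as_strings = col.apply_unary_elementwise(|s| s.cast(&DataType::String).unwrap());
    /// ```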
    pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {
        let result = f(&self.values).rechunk();
        assert_eq!(self.values.len(), result.len());
        unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) }
    }

    pub fn try_apply_unary_elementwise(
        &self,
        f: impl Fn(&Series) -> PolarsResult<Series>,
    ) -> PolarsResult<Self> {
        let result = f(&self.values)?.rechunk();
        assert_eq!(self.values.len(), result.len());
        Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })
    }

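    /// Extends the column with `n` copies of `value`.
    ///
    /// If `value` equals the last partition's value, the last run is widened
    /// in place; otherwise a new partition is appended. For example, extending
    /// `ends = [2, 5]` by 3 copies of the final value yields `ends = [2, 8]`.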
    pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {
        let mut new_ends = self.ends.to_vec();
        let new_length = (self.len() + n) as IdxSize;

        let values = if !self.is_empty() && self.values.last().value() == &value {
            *new_ends.last_mut().unwrap() = new_length;
            self.values.clone()
        } else {
            new_ends.push(new_length);
            self.values.extend_constant(value, 1)?
        };

        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })
    }

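    /// Returns the value at `index` of the expanded column by binary-searching
    /// the cumulative `ends`.
    ///
    /// # Safety
    ///
    /// `index` must be in bounds, i.e. `index < self.len()`.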
    pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue {
        debug_assert!(index < self.len());

        // Fast path: the common `get_unchecked(0)` case lands in the first partition.
        if index < self.ends[0] as usize {
            return unsafe { self.values.get_unchecked(0) };
        }

        // `ends` are exclusive, so an exact match means `index` is the first
        // row of the *next* partition; a miss gives the insertion point, which
        // is already the right partition.
        let value_idx = self
            .ends
            .binary_search(&(index as IdxSize))
            .map_or_else(identity, |i| i + 1);

        unsafe { self.values.get_unchecked(value_idx) }
    }

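    // The reductions below reduce over the distinct partition values; this
    // matches the expanded column as long as no partition is empty.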
    pub fn min_reduce(&self) -> PolarsResult<Scalar> {
        self.values.min_reduce()
    }
    pub fn max_reduce(&self) -> PolarsResult<Scalar> {
        self.values.max_reduce()
    }

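    /// Reverses the column by reversing `values` and rebuilding `ends` from
    /// the reversed run lengths, e.g. `ends = [3, 5]` (runs of 3 and 2 rows)
    /// becomes `ends = [2, 5]`.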
    pub fn reverse(&self) -> Self {
        let values = self.values.reverse();
        let mut ends = Vec::with_capacity(self.ends.len());

        let mut offset = 0;
        ends.extend(self.ends.windows(2).rev().map(|vs| {
            offset += vs[1] - vs[0];
            offset
        }));
        // Close with the total length, which covers the (now last) first
        // partition; a column with no partitions keeps an empty `ends`.
        if !self.ends.is_empty() {
            ends.push(self.len() as IdxSize);
        }

        unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }
    }

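    /// Sets the sortedness flag on the underlying partition values. Since
    /// every partition is a constant run, sorted partition values imply a
    /// sorted expanded column.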
    pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
        self.values.set_sorted_flag(sorted);
    }

    pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {
        let values = self.values.cast_with_options(dtype, options)?;
        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
    }

    pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {
        let values = self.values.strict_cast(dtype)?;
        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
    }

    pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
        let values = self.values.cast(dtype)?;
        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
    }

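    /// Casts to `dtype` without checking that the cast is valid.
    ///
    /// # Safety
    ///
    /// This can lead to invalid memory access in downstream code.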
    pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {
        let values = unsafe { self.values.cast_unchecked(dtype) }?;
        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
    }

    pub fn null_count(&self) -> usize {
        // Materialize (or reuse the cached series) and count nulls on the
        // flat data.
        self.as_materialized_series().null_count()
    }

    pub fn clear(&self) -> Self {
        Self::new_empty(self.name.clone(), self.values.dtype().clone())
    }

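    /// Returns the distinct partition values.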
    pub fn partitions(&self) -> &Series {
        &self.values
    }

    /// Returns the cumulative exclusive end offset of each partition.
    pub fn partition_ends(&self) -> &[IdxSize] {
        &self.ends
    }

    pub fn or_reduce(&self) -> PolarsResult<Scalar> {
        self.values.or_reduce()
    }

    pub fn and_reduce(&self) -> PolarsResult<Scalar> {
        self.values.and_reduce()
    }
}