// polars_core/frame/column/partitioned.rs

1use std::borrow::Cow;
2use std::convert::identity;
3use std::sync::{Arc, OnceLock};
4
5use polars_error::{PolarsResult, polars_ensure};
6use polars_utils::IdxSize;
7use polars_utils::pl_str::PlSmallStr;
8
9use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};
10use crate::chunked_array::cast::CastOptions;
11use crate::frame::Scalar;
12use crate::series::IsSorted;
13
/// A column stored as run-length partitions: `values[i]` is the single value
/// for partition `i`, and `ends[i]` is the exclusive row offset where that
/// partition stops. The full-length column is only built on demand.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionedColumn {
    name: PlSmallStr,

    // One value per partition. Invariant: `values.len() == ends.len()`.
    values: Series,
    // Cumulative exclusive end offsets, monotonically non-decreasing; the last
    // entry is the column's total length (see `len`).
    ends: Arc<[IdxSize]>,

    // Lazily materialized full-length series; cache only, never serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    materialized: OnceLock<Series>,
}
25
impl IntoColumn for PartitionedColumn {
    fn into_column(self) -> Column {
        // Wrap as-is; the partitioned representation is kept, not materialized.
        Column::Partitioned(self)
    }
}
31
32impl From<PartitionedColumn> for Column {
33    fn from(value: PartitionedColumn) -> Self {
34        value.into_column()
35    }
36}
37
38fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {
39    polars_ensure!(
40        values.len() == ends.len(),
41        ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",
42        values.len(),
43        ends.len()
44    );
45
46    for vs in ends.windows(2) {
47        polars_ensure!(
48            vs[0] <= vs[1],
49            ComputeError: "partitioned column `ends` are not monotonely non-decreasing",
50        );
51    }
52
53    Ok(())
54}
55
56impl PartitionedColumn {
57    pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
58        Self::try_new(name, values, ends).unwrap()
59    }
60
61    /// # Safety
62    ///
63    /// Safe if:
64    /// - `values.len() == ends.len()`
65    /// - all values can have `dtype`
66    /// - `ends` is monotonely non-decreasing
67    pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
68        if cfg!(debug_assertions) {
69            verify_invariants(&values, ends.as_ref()).unwrap();
70        }
71
72        let values = values.rechunk();
73        Self {
74            name,
75            values,
76            ends,
77            materialized: OnceLock::new(),
78        }
79    }
80
81    pub fn try_new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {
82        verify_invariants(&values, ends.as_ref())?;
83
84        // SAFETY: Invariants checked before
85        Ok(unsafe { Self::new_unchecked(name, values, ends) })
86    }
87
88    pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {
89        Self {
90            name,
91            values: Series::new_empty(PlSmallStr::EMPTY, &dtype),
92            ends: Arc::default(),
93
94            materialized: OnceLock::new(),
95        }
96    }
97
98    pub fn len(&self) -> usize {
99        self.ends.last().map_or(0, |last| *last as usize)
100    }
101
102    pub fn is_empty(&self) -> bool {
103        self.len() == 0
104    }
105
106    pub fn name(&self) -> &PlSmallStr {
107        &self.name
108    }
109
110    pub fn dtype(&self) -> &DataType {
111        self.values.dtype()
112    }
113
114    #[inline]
115    pub fn field(&self) -> Cow<Field> {
116        match self.lazy_as_materialized_series() {
117            None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),
118            Some(s) => s.field(),
119        }
120    }
121
122    pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {
123        self.name = name;
124        self
125    }
126
127    fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {
128        let dtype = values.dtype();
129        let mut column = Column::Series(Series::new_empty(name, dtype).into());
130
131        let mut prev_offset = 0;
132        for (i, &offset) in ends.iter().enumerate() {
133            // @TODO: Optimize
134            let length = offset - prev_offset;
135            column
136                .extend(&Column::new_scalar(
137                    PlSmallStr::EMPTY,
138                    Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),
139                    length as usize,
140                ))
141                .unwrap();
142            prev_offset = offset;
143        }
144
145        debug_assert_eq!(column.len(), prev_offset as usize);
146
147        column.take_materialized_series()
148    }
149
150    /// Materialize the [`PartitionedColumn`] into a [`Series`].
151    fn to_series(&self) -> Series {
152        Self::_to_series(self.name.clone(), &self.values, &self.ends)
153    }
154
155    /// Get the [`PartitionedColumn`] as [`Series`] if it was already materialized.
156    pub fn lazy_as_materialized_series(&self) -> Option<&Series> {
157        self.materialized.get()
158    }
159
160    /// Get the [`PartitionedColumn`] as [`Series`]
161    ///
162    /// This needs to materialize upon the first call. Afterwards, this is cached.
163    pub fn as_materialized_series(&self) -> &Series {
164        self.materialized.get_or_init(|| self.to_series())
165    }
166
167    /// Take the [`PartitionedColumn`] and materialize as a [`Series`] if not already done.
168    pub fn take_materialized_series(self) -> Series {
169        self.materialized
170            .into_inner()
171            .unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))
172    }
173
174    pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {
175        let result = f(&self.values).rechunk();
176        assert_eq!(self.values.len(), result.len());
177        unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) }
178    }
179
180    pub fn try_apply_unary_elementwise(
181        &self,
182        f: impl Fn(&Series) -> PolarsResult<Series>,
183    ) -> PolarsResult<Self> {
184        let result = f(&self.values)?.rechunk();
185        assert_eq!(self.values.len(), result.len());
186        Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })
187    }
188
189    pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {
190        let mut new_ends = self.ends.to_vec();
191        // @TODO: IdxSize checks
192        let new_length = (self.len() + n) as IdxSize;
193
194        let values = if !self.is_empty() && self.values.last().value() == &value {
195            *new_ends.last_mut().unwrap() = new_length;
196            self.values.clone()
197        } else {
198            new_ends.push(new_length);
199            self.values.extend_constant(value, 1)?
200        };
201
202        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })
203    }
204
205    pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue {
206        debug_assert!(index < self.len());
207
208        // Common situation get_unchecked(0)
209        if index < self.ends[0] as usize {
210            return unsafe { self.get_unchecked(0) };
211        }
212
213        let value_idx = self
214            .ends
215            .binary_search(&(index as IdxSize))
216            .map_or_else(identity, identity);
217
218        self.get_unchecked(value_idx)
219    }
220
221    pub fn min_reduce(&self) -> PolarsResult<Scalar> {
222        self.values.min_reduce()
223    }
224    pub fn max_reduce(&self) -> Result<Scalar, polars_error::PolarsError> {
225        self.values.max_reduce()
226    }
227
228    pub fn reverse(&self) -> Self {
229        let values = self.values.reverse();
230        let mut ends = Vec::with_capacity(self.ends.len());
231
232        let mut offset = 0;
233        ends.extend(self.ends.windows(2).rev().map(|vs| {
234            offset += vs[1] - vs[0];
235            offset
236        }));
237        ends.push(self.len() as IdxSize);
238
239        unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }
240    }
241
242    pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
243        self.values.set_sorted_flag(sorted);
244    }
245
246    pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {
247        let values = self.values.cast_with_options(dtype, options)?;
248        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
249    }
250
251    pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {
252        let values = self.values.strict_cast(dtype)?;
253        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
254    }
255
256    pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
257        let values = self.values.cast(dtype)?;
258        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
259    }
260
261    pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {
262        let values = unsafe { self.values.cast_unchecked(dtype) }?;
263        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
264    }
265
266    pub fn null_count(&self) -> usize {
267        match self.lazy_as_materialized_series() {
268            Some(s) => s.null_count(),
269            None => {
270                // @partition-opt
271                self.as_materialized_series().null_count()
272            },
273        }
274    }
275
276    pub fn clear(&self) -> Self {
277        Self::new_empty(self.name.clone(), self.values.dtype().clone())
278    }
279
280    pub fn partitions(&self) -> &Series {
281        &self.values
282    }
283    pub fn partition_ends(&self) -> &[IdxSize] {
284        &self.ends
285    }
286
287    pub fn or_reduce(&self) -> PolarsResult<Scalar> {
288        self.values.or_reduce()
289    }
290
291    pub fn and_reduce(&self) -> PolarsResult<Scalar> {
292        self.values.and_reduce()
293    }
294}