polars_core/frame/column/partitioned.rs

1use std::borrow::Cow;
2use std::convert::identity;
3use std::sync::{Arc, OnceLock};
4
5use polars_error::{PolarsResult, polars_ensure};
6use polars_utils::IdxSize;
7use polars_utils::pl_str::PlSmallStr;
8
9use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};
10use crate::chunked_array::cast::CastOptions;
11use crate::frame::Scalar;
12use crate::series::IsSorted;
13
/// A column stored as runs ("partitions"): `values[i]` is the single value repeated
/// over partition `i`, whose rows span `ends[i-1]..ends[i]` (with an implicit
/// `ends[-1] == 0`). Materialization into a full-length [`Series`] is lazy and cached.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct PartitionedColumn {
    name: PlSmallStr,

    /// One value per partition. Invariant: `values.len() == ends.len()`.
    values: Series,
    /// Exclusive cumulative end offsets per partition; monotonically non-decreasing.
    ends: Arc<[IdxSize]>,

    /// Cached full-length materialization; not serialized, rebuilt on demand.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    materialized: OnceLock<Series>,
}
26
impl IntoColumn for PartitionedColumn {
    // Wrap into the corresponding `Column` variant; no data is copied or materialized.
    fn into_column(self) -> Column {
        Column::Partitioned(self)
    }
}
32
33impl From<PartitionedColumn> for Column {
34    fn from(value: PartitionedColumn) -> Self {
35        value.into_column()
36    }
37}
38
39fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {
40    polars_ensure!(
41        values.len() == ends.len(),
42        ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",
43        values.len(),
44        ends.len()
45    );
46
47    for vs in ends.windows(2) {
48        polars_ensure!(
49            vs[0] <= vs[1],
50            ComputeError: "partitioned column `ends` are not monotonely non-decreasing",
51        );
52    }
53
54    Ok(())
55}
56
57impl PartitionedColumn {
58    pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
59        Self::try_new(name, values, ends).unwrap()
60    }
61
62    /// # Safety
63    ///
64    /// Safe if:
65    /// - `values.len() == ends.len()`
66    /// - all values can have `dtype`
67    /// - `ends` is monotonely non-decreasing
68    pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
69        if cfg!(debug_assertions) {
70            verify_invariants(&values, ends.as_ref()).unwrap();
71        }
72
73        let values = values.rechunk();
74        Self {
75            name,
76            values,
77            ends,
78            materialized: OnceLock::new(),
79        }
80    }
81
82    pub fn try_new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {
83        verify_invariants(&values, ends.as_ref())?;
84
85        // SAFETY: Invariants checked before
86        Ok(unsafe { Self::new_unchecked(name, values, ends) })
87    }
88
89    pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {
90        Self {
91            name,
92            values: Series::new_empty(PlSmallStr::EMPTY, &dtype),
93            ends: Arc::default(),
94
95            materialized: OnceLock::new(),
96        }
97    }
98
99    pub fn len(&self) -> usize {
100        self.ends.last().map_or(0, |last| *last as usize)
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.len() == 0
105    }
106
107    pub fn name(&self) -> &PlSmallStr {
108        &self.name
109    }
110
111    pub fn dtype(&self) -> &DataType {
112        self.values.dtype()
113    }
114
115    #[inline]
116    pub fn field(&self) -> Cow<'_, Field> {
117        match self.lazy_as_materialized_series() {
118            None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),
119            Some(s) => s.field(),
120        }
121    }
122
123    pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {
124        self.name = name;
125        self
126    }
127
128    fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {
129        let dtype = values.dtype();
130        let mut column = Column::Series(Series::new_empty(name, dtype).into());
131
132        let mut prev_offset = 0;
133        for (i, &offset) in ends.iter().enumerate() {
134            // @TODO: Optimize
135            let length = offset - prev_offset;
136            column
137                .extend(&Column::new_scalar(
138                    PlSmallStr::EMPTY,
139                    Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),
140                    length as usize,
141                ))
142                .unwrap();
143            prev_offset = offset;
144        }
145
146        debug_assert_eq!(column.len(), prev_offset as usize);
147
148        column.take_materialized_series()
149    }
150
151    /// Materialize the [`PartitionedColumn`] into a [`Series`].
152    fn to_series(&self) -> Series {
153        Self::_to_series(self.name.clone(), &self.values, &self.ends)
154    }
155
156    /// Get the [`PartitionedColumn`] as [`Series`] if it was already materialized.
157    pub fn lazy_as_materialized_series(&self) -> Option<&Series> {
158        self.materialized.get()
159    }
160
161    /// Get the [`PartitionedColumn`] as [`Series`]
162    ///
163    /// This needs to materialize upon the first call. Afterwards, this is cached.
164    pub fn as_materialized_series(&self) -> &Series {
165        self.materialized.get_or_init(|| self.to_series())
166    }
167
168    /// Take the [`PartitionedColumn`] and materialize as a [`Series`] if not already done.
169    pub fn take_materialized_series(self) -> Series {
170        self.materialized
171            .into_inner()
172            .unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))
173    }
174
175    pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {
176        let result = f(&self.values).rechunk();
177        assert_eq!(self.values.len(), result.len());
178        unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) }
179    }
180
181    pub fn try_apply_unary_elementwise(
182        &self,
183        f: impl Fn(&Series) -> PolarsResult<Series>,
184    ) -> PolarsResult<Self> {
185        let result = f(&self.values)?.rechunk();
186        assert_eq!(self.values.len(), result.len());
187        Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })
188    }
189
190    pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {
191        let mut new_ends = self.ends.to_vec();
192        // @TODO: IdxSize checks
193        let new_length = (self.len() + n) as IdxSize;
194
195        let values = if !self.is_empty() && self.values.last().value() == &value {
196            *new_ends.last_mut().unwrap() = new_length;
197            self.values.clone()
198        } else {
199            new_ends.push(new_length);
200            self.values.extend_constant(value, 1)?
201        };
202
203        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })
204    }
205
206    pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {
207        debug_assert!(index < self.len());
208
209        // Common situation get_unchecked(0)
210        if index < self.ends[0] as usize {
211            return unsafe { self.get_unchecked(0) };
212        }
213
214        let value_idx = self
215            .ends
216            .binary_search(&(index as IdxSize))
217            .map_or_else(identity, identity);
218
219        self.get_unchecked(value_idx)
220    }
221
222    pub fn min_reduce(&self) -> PolarsResult<Scalar> {
223        self.values.min_reduce()
224    }
225    pub fn max_reduce(&self) -> Result<Scalar, polars_error::PolarsError> {
226        self.values.max_reduce()
227    }
228
229    pub fn reverse(&self) -> Self {
230        let values = self.values.reverse();
231        let mut ends = Vec::with_capacity(self.ends.len());
232
233        let mut offset = 0;
234        ends.extend(self.ends.windows(2).rev().map(|vs| {
235            offset += vs[1] - vs[0];
236            offset
237        }));
238        ends.push(self.len() as IdxSize);
239
240        unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }
241    }
242
243    pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
244        self.values.set_sorted_flag(sorted);
245    }
246
247    pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {
248        let values = self.values.cast_with_options(dtype, options)?;
249        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
250    }
251
252    pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {
253        let values = self.values.strict_cast(dtype)?;
254        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
255    }
256
257    pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
258        let values = self.values.cast(dtype)?;
259        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
260    }
261
262    pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {
263        let values = unsafe { self.values.cast_unchecked(dtype) }?;
264        Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
265    }
266
267    pub fn null_count(&self) -> usize {
268        match self.lazy_as_materialized_series() {
269            Some(s) => s.null_count(),
270            None => {
271                // @partition-opt
272                self.as_materialized_series().null_count()
273            },
274        }
275    }
276
277    pub fn clear(&self) -> Self {
278        Self::new_empty(self.name.clone(), self.values.dtype().clone())
279    }
280
281    pub fn partitions(&self) -> &Series {
282        &self.values
283    }
284    pub fn partition_ends(&self) -> &[IdxSize] {
285        &self.ends
286    }
287
288    pub fn partition_ends_ref(&self) -> &Arc<[IdxSize]> {
289        &self.ends
290    }
291
292    pub fn or_reduce(&self) -> PolarsResult<Scalar> {
293        self.values.or_reduce()
294    }
295
296    pub fn and_reduce(&self) -> PolarsResult<Scalar> {
297        self.values.and_reduce()
298    }
299}