polars_core/chunked_array/ops/row_encode.rs

1use std::borrow::Cow;
2
3use arrow::compute::utils::combine_validities_and_many;
4use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
5use polars_utils::itertools::Itertools;
6use rayon::prelude::*;
7
8use crate::POOL;
9use crate::prelude::*;
10use crate::utils::_split_offsets;
11
12pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
13    let n_threads = POOL.current_num_threads();
14    let len = by[0].len();
15    let splits = _split_offsets(len, n_threads);
16
17    let chunks = splits.into_par_iter().map(|(offset, len)| {
18        let sliced = by
19            .iter()
20            .map(|s| s.slice(offset as i64, len))
21            .collect::<Vec<_>>();
22        let rows = _get_rows_encoded_unordered(&sliced)?;
23        Ok(rows.into_array())
24    });
25    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
26
27    Ok(BinaryOffsetChunked::from_chunk_iter(
28        PlSmallStr::EMPTY,
29        chunks?,
30    ))
31}
32
/// Almost the same as [`encode_rows_vertical_par_unordered`], but additionally broadcasts the
/// nulls of the input columns to the row-encoded array: the combined validity marks a row as
/// null when any of the `by` columns is null at that position.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        // Gather the validity bitmap of every chunk of every sliced column; these are
        // AND-combined below. Each column is rechunked first, so each contributes its
        // validity over the full slice.
        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}
71
/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        // Flat / primitive types need no extra context for row encoding.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            // Pass the category mapping down, plus whether this is a fixed enum.
            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        // Decimals pass their precision down to the encoder.
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        // Nested types delegate to the context of their inner type.
        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

            // Find the first field that actually needs a context; pad all earlier fields
            // with `None`. If no field needs one, `ctxts` stays empty and we return `None`
            // below without allocating a vector of all-`None` entries.
            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            // Fill in the (possibly `None`) contexts for the remaining fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
        // Extension types encode as their storage type.
        #[cfg(feature = "dtype-extension")]
        DataType::Extension(_, storage) => get_row_encoding_context(storage),
    }
}
153
154pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
155    let rows = _get_rows_encoded_unordered(by)?;
156    Ok(BinaryOffsetChunked::with_chunk(
157        PlSmallStr::EMPTY,
158        rows.into_array(),
159    ))
160}
161
162pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
163    let mut cols = Vec::with_capacity(by.len());
164    let mut opts = Vec::with_capacity(by.len());
165    let mut ctxts = Vec::with_capacity(by.len());
166
167    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
168    // columns.
169    let num_rows = by.first().map_or(0, |c| c.len());
170
171    for by in by {
172        debug_assert_eq!(by.len(), num_rows);
173
174        let by = by
175            .trim_lists_to_normalized_offsets()
176            .map_or(Cow::Borrowed(by), Cow::Owned);
177        let by = by.propagate_nulls().map_or(by, Cow::Owned);
178        let by = by.as_materialized_series();
179        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
180        let opt = RowEncodingOptions::new_unsorted();
181        let ctxt = get_row_encoding_context(by.dtype());
182
183        cols.push(arr);
184        opts.push(opt);
185        ctxts.push(ctxt);
186    }
187    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
188}
189
/// Row-encode `by` with the sorted encoding, using the per-column `descending` and
/// `nulls_last` options.
///
/// `by`, `descending` and `nulls_last` must all have the same length (checked with debug
/// assertions only).
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
    // columns.
    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        // Normalize before encoding; both helpers return `None` when nothing needs to
        // change, so we only take ownership when a new column was produced.
        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        // The encoder consumes a single chunk of the physical representation.
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}
224
225pub fn _get_rows_encoded_ca(
226    name: PlSmallStr,
227    by: &[Column],
228    descending: &[bool],
229    nulls_last: &[bool],
230    broadcast_nulls: bool,
231) -> PolarsResult<BinaryOffsetChunked> {
232    let mut rows_arr = _get_rows_encoded(by, descending, nulls_last)?.into_array();
233    if broadcast_nulls {
234        let validities = by
235            .iter()
236            .map(|c| c.as_materialized_series().rechunk_validity())
237            .collect_vec();
238        let combined = combine_validities_and_many(&validities);
239        rows_arr.set_validity(combined);
240    }
241    Ok(BinaryOffsetChunked::with_chunk(name, rows_arr))
242}
243
244pub fn _get_rows_encoded_arr(
245    by: &[Column],
246    descending: &[bool],
247    nulls_last: &[bool],
248    broadcast_nulls: bool,
249) -> PolarsResult<BinaryArray<i64>> {
250    let mut rows_arr = _get_rows_encoded(by, descending, nulls_last)?.into_array();
251    if broadcast_nulls {
252        let validities = by
253            .iter()
254            .map(|c| c.as_materialized_series().rechunk_validity())
255            .collect_vec();
256        let combined = combine_validities_and_many(&validities);
257        rows_arr.set_validity(combined);
258    }
259    Ok(rows_arr)
260}
261
262pub fn _get_rows_encoded_ca_unordered(
263    name: PlSmallStr,
264    by: &[Column],
265) -> PolarsResult<BinaryOffsetChunked> {
266    _get_rows_encoded_unordered(by)
267        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
268}
269
#[cfg(feature = "dtype-struct")]
/// Decode row-encoded binary data back into a [`StructChunked`] with one field per entry in
/// `fields`.
///
/// NOTE(review): `opts` presumably must be the same options (in the same order) the rows were
/// encoded with, and `fields` the encoded dtypes — verify against callers; a mismatch would
/// make the unsafe decode below unsound.
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    // Per-field encoding context and the physical Arrow dtype to decode into.
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    // Scratch buffer passed to the decoder; declared outside the loop so it can be reused
    // across chunks.
    let mut rows = Vec::new();
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            // SAFETY: relies on `array` holding valid row-encoded bytes matching
            // `opts`/`ctxts`/`dtypes` (see NOTE above).
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            assert_eq!(decoded_arrays.len(), fields.len());

            // Reassemble the decoded field arrays into one struct array per input chunk.
            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    // SAFETY: the chunks were constructed above with the physical arrow dtype derived from
    // `fields`, matching the logical struct dtype passed here.
    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}