polars_core/chunked_array/ops/row_encode.rs

use std::borrow::Cow;

use arrow::compute::utils::combine_validities_and_many;
use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;

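/// Row-encode the given key columns in parallel by splitting the rows over the thread pool.
///
/// Each thread encodes a vertical slice with [`_get_rows_encoded_unordered`]; the resulting chunks
/// are collected into a single [`BinaryOffsetChunked`]. The encoding carries no ordering
/// information, so the rows are intended for equality-based use (e.g. grouping), not sorting.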
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

// Almost the same as `encode_rows_vertical_par_unordered`, but broadcasts nulls to the
// row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

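        // The "broadcast nulls" part: AND-combine the validities of every key column so that a
        // row's encoding is marked null whenever any of the keys is null at that row.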
        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

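            // Build the context vector lazily: only allocate once a field that actually needs a
            // context is found, so structs with only context-free fields return `None` below.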
            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}

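/// Row-encode the given key columns into a single-chunk [`BinaryOffsetChunked`], without ordering
/// information (single-threaded counterpart of [`encode_rows_vertical_par_unordered`]).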
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::EMPTY,
        rows.into_array(),
    ))
}

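/// Row-encode the given key columns with unsorted (unordered) options.
///
/// Every column is normalized, converted to its physical representation and rechunked into a
/// single array before being passed to the row encoder.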
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    // Since zero-field structs (ZFS) exist, we might not actually have any arrays and need to get
    // the length from the columns.
    let num_rows = by.first().map_or(0, |c| c.len());

    for by in by {
        debug_assert_eq!(by.len(), num_rows);

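        // Normalize the column before encoding: trim list buffers to their normalized offsets and
        // propagate outer nulls into nested values, so logically equal values encode identically.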
        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_unsorted();
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

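/// Row-encode the given key columns with per-column `descending` / `nulls_last` sort options, so
/// that the byte-wise order of the encoded rows matches the requested sort order.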
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    // Since zero-field structs (ZFS) exist, we might not actually have any arrays and need to get
    // the length from the columns.
    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

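/// Like [`_get_rows_encoded`], but wraps the result in a [`BinaryOffsetChunked`] with the given
/// name.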
pub fn _get_rows_encoded_ca(
    name: PlSmallStr,
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded(by, descending, nulls_last)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

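/// Like [`_get_rows_encoded`], but returns the underlying `BinaryArray<i64>` directly.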
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

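/// Like [`_get_rows_encoded_unordered`], but wraps the result in a [`BinaryOffsetChunked`] with
/// the given name.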
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

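/// Decode row-encoded binary data back into a [`StructChunked`] whose fields are `fields`.
///
/// `opts` (and the dtypes of `fields`) must match what was used to encode the rows; the decoding
/// itself is an `unsafe` operation that relies on this.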
#[cfg(feature = "dtype-struct")]
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    let mut rows = Vec::new();
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            assert_eq!(decoded_arrays.len(), fields.len());

            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}