polars_core/chunked_array/ops/
row_encode.rs

1use std::borrow::Cow;
2
3use arrow::compute::utils::combine_validities_and_many;
4use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
5use rayon::prelude::*;
6
7use crate::POOL;
8use crate::prelude::*;
9use crate::utils::_split_offsets;
10
11pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
12    let n_threads = POOL.current_num_threads();
13    let len = by[0].len();
14    let splits = _split_offsets(len, n_threads);
15
16    let chunks = splits.into_par_iter().map(|(offset, len)| {
17        let sliced = by
18            .iter()
19            .map(|s| s.slice(offset as i64, len))
20            .collect::<Vec<_>>();
21        let rows = _get_rows_encoded_unordered(&sliced)?;
22        Ok(rows.into_array())
23    });
24    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
25
26    Ok(BinaryOffsetChunked::from_chunk_iter(
27        PlSmallStr::EMPTY,
28        chunks?,
29    ))
30}
31
32// Almost the same but broadcast nulls to the row-encoded array.
33pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
34    by: &[Column],
35) -> PolarsResult<BinaryOffsetChunked> {
36    let n_threads = POOL.current_num_threads();
37    let len = by[0].len();
38    let splits = _split_offsets(len, n_threads);
39
40    let chunks = splits.into_par_iter().map(|(offset, len)| {
41        let sliced = by
42            .iter()
43            .map(|s| s.slice(offset as i64, len))
44            .collect::<Vec<_>>();
45        let rows = _get_rows_encoded_unordered(&sliced)?;
46
47        let validities = sliced
48            .iter()
49            .flat_map(|s| {
50                let s = s.rechunk();
51                #[allow(clippy::unnecessary_to_owned)]
52                s.as_materialized_series()
53                    .chunks()
54                    .to_vec()
55                    .into_iter()
56                    .map(|arr| arr.validity().cloned())
57            })
58            .collect::<Vec<_>>();
59
60        let validity = combine_validities_and_many(&validities);
61        Ok(rows.into_array().with_validity_typed(validity))
62    });
63    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
64
65    Ok(BinaryOffsetChunked::from_chunk_iter(
66        PlSmallStr::EMPTY,
67        chunks?,
68    ))
69}
70
/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
///
/// Returns `None` when the physical representation alone is enough to encode
/// the type; nested types return a context iff any of their children need one.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        // Flat / primitive types: no extra context required.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        // Categoricals need their category mapping (and whether they are a
        // fixed Enum) to round-trip through the row encoding.
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        // Decimals carry their precision so values compare correctly.
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        // Nested types defer to their element type.
        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            // Lazily build the per-field context vector: only allocate once we
            // find the first field that actually needs a context.
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    // Backfill `None` for the i fields already scanned, then
                    // record this field's context.
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            // No field needed a context => the struct as a whole needs none.
            if ctxts.is_empty() {
                return None;
            }

            // Fill in the contexts for the remaining, not-yet-visited fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
        // Extension types encode as their storage type.
        #[cfg(feature = "dtype-extension")]
        DataType::Extension(_, storage) => get_row_encoding_context(storage),
    }
}
152
153pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
154    let rows = _get_rows_encoded_unordered(by)?;
155    Ok(BinaryOffsetChunked::with_chunk(
156        PlSmallStr::EMPTY,
157        rows.into_array(),
158    ))
159}
160
161pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
162    let mut cols = Vec::with_capacity(by.len());
163    let mut opts = Vec::with_capacity(by.len());
164    let mut ctxts = Vec::with_capacity(by.len());
165
166    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
167    // columns.
168    let num_rows = by.first().map_or(0, |c| c.len());
169
170    for by in by {
171        debug_assert_eq!(by.len(), num_rows);
172
173        let by = by
174            .trim_lists_to_normalized_offsets()
175            .map_or(Cow::Borrowed(by), Cow::Owned);
176        let by = by.propagate_nulls().map_or(by, Cow::Owned);
177        let by = by.as_materialized_series();
178        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
179        let opt = RowEncodingOptions::new_unsorted();
180        let ctxt = get_row_encoding_context(by.dtype());
181
182        cols.push(arr);
183        opts.push(opt);
184        ctxts.push(ctxt);
185    }
186    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
187}
188
189pub fn _get_rows_encoded(
190    by: &[Column],
191    descending: &[bool],
192    nulls_last: &[bool],
193) -> PolarsResult<RowsEncoded> {
194    debug_assert_eq!(by.len(), descending.len());
195    debug_assert_eq!(by.len(), nulls_last.len());
196
197    let mut cols = Vec::with_capacity(by.len());
198    let mut opts = Vec::with_capacity(by.len());
199    let mut ctxts = Vec::with_capacity(by.len());
200
201    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
202    // columns.
203    let num_rows = by.first().map_or(0, |c| c.len());
204
205    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
206        debug_assert_eq!(by.len(), num_rows);
207
208        let by = by
209            .trim_lists_to_normalized_offsets()
210            .map_or(Cow::Borrowed(by), Cow::Owned);
211        let by = by.propagate_nulls().map_or(by, Cow::Owned);
212        let by = by.as_materialized_series();
213        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
214        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
215        let ctxt = get_row_encoding_context(by.dtype());
216
217        cols.push(arr);
218        opts.push(opt);
219        ctxts.push(ctxt);
220    }
221    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
222}
223
224pub fn _get_rows_encoded_ca(
225    name: PlSmallStr,
226    by: &[Column],
227    descending: &[bool],
228    nulls_last: &[bool],
229) -> PolarsResult<BinaryOffsetChunked> {
230    _get_rows_encoded(by, descending, nulls_last)
231        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
232}
233
234pub fn _get_rows_encoded_arr(
235    by: &[Column],
236    descending: &[bool],
237    nulls_last: &[bool],
238) -> PolarsResult<BinaryArray<i64>> {
239    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
240}
241
242pub fn _get_rows_encoded_ca_unordered(
243    name: PlSmallStr,
244    by: &[Column],
245) -> PolarsResult<BinaryOffsetChunked> {
246    _get_rows_encoded_unordered(by)
247        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
248}
249
#[cfg(feature = "dtype-struct")]
/// Decode row-encoded binary data back into a [`StructChunked`] with one field
/// per entry of `fields`.
///
/// NOTE(review): `fields` and `opts` are trusted to describe exactly the dtypes
/// and encoding options that produced `ca` — the decode below is `unsafe` and
/// this contract must be upheld by the caller; confirm at call sites.
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    // Per-field decoding context and physical Arrow dtype, split into two vecs.
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    // The physical arrow dtype of the output struct arrays.
    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    // Scratch buffer handed to the decoder; reused across chunks.
    let mut rows = Vec::new();
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            // SAFETY: relies on the caller contract above — `opts`, `ctxts`
            // and `dtypes` must match the encoding that produced `array`.
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            // The decoder must yield exactly one array per field.
            assert_eq!(decoded_arrays.len(), fields.len());

            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    // SAFETY: the chunks were built with the physical arrow dtypes derived
    // from `fields`, matching the logical struct dtype supplied here.
    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}