polars_core/chunked_array/ops/row_encode.rs

use std::borrow::Cow;

use arrow::compute::utils::combine_validities_and_many;
use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;

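/// Row-encode the given columns into a single binary column, splitting the work vertically
/// over the available threads. Uses the unsorted encoding ([`RowEncodingOptions::new_unsorted`]).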
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

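    // Encode each slice of the columns in parallel and collect the resulting arrays as
    // the chunks of the output.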
    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

/// Almost the same as [`encode_rows_vertical_par_unordered`], but also broadcasts nulls from
/// the encoding columns to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

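        // Combine the validities of all encoding columns: the encoded row becomes null
        // wherever any of the input columns is null.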
        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
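///
/// For plain primitive types no extra context is needed; for example, the following
/// (illustrative) check holds for [`DataType::Int64`]:
///
/// ```ignore
/// assert!(get_row_encoding_context(&DataType::Int64).is_none());
/// ```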
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
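            // Only allocate contexts when at least one field actually needs one; the
            // fields before the first such field are back-filled with `None`.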
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}

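/// Row-encode the given columns into a single binary column using the unsorted encoding.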
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::EMPTY,
        rows.into_array(),
    ))
}

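/// Row-encode the given columns with the unsorted encoding options, returning the raw
/// [`RowsEncoded`] buffers.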
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    // Since zero-field structs (ZFS) exist, we might not actually have any arrays and need to
    // get the length from the columns.
    let num_rows = by.first().map_or(0, |c| c.len());

    for by in by {
        debug_assert_eq!(by.len(), num_rows);

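        // Normalize nested data before encoding: trim list buffers to their offsets and
        // propagate outer validity into nested values. `map_or` keeps the borrowed column
        // when no normalization was needed.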
        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_unsorted();
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

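/// Row-encode the given columns with per-column sort options (`descending` and `nulls_last`),
/// returning the raw [`RowsEncoded`] buffers.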
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    // Since zero-field structs (ZFS) exist, we might not actually have any arrays and need to
    // get the length from the columns.
    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

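        // Apply the same list-offset and null normalization as the unordered path.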
        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

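/// Row-encode the given columns with sort options into a named [`BinaryOffsetChunked`].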
pub fn _get_rows_encoded_ca(
    name: PlSmallStr,
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded(by, descending, nulls_last)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

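/// Row-encode the given columns with sort options into a raw [`BinaryArray<i64>`].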
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

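/// Row-encode the given columns with the unsorted encoding into a named [`BinaryOffsetChunked`].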
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

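/// Decode a row-encoded binary column back into a [`StructChunked`] with the given `fields`.
///
/// `opts` must match the [`RowEncodingOptions`] that were used when encoding the rows.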
#[cfg(feature = "dtype-struct")]
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    let mut rows = Vec::new();
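    // Decode every chunk of row-encoded bytes back into one physical array per field;
    // `rows` serves as a reusable scratch buffer across chunks.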
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            assert_eq!(decoded_arrays.len(), fields.len());

            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}