polars_core/chunked_array/ops/
row_encode.rs

1use std::borrow::Cow;
2
3use arrow::compute::utils::combine_validities_and_many;
4use polars_row::{
5    RowEncodingCategoricalContext, RowEncodingContext, RowEncodingOptions, RowsEncoded,
6    convert_columns,
7};
8use polars_utils::itertools::Itertools;
9use rayon::prelude::*;
10
11use crate::POOL;
12use crate::prelude::*;
13use crate::utils::_split_offsets;
14
15pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
16    let n_threads = POOL.current_num_threads();
17    let len = by[0].len();
18    let splits = _split_offsets(len, n_threads);
19
20    let chunks = splits.into_par_iter().map(|(offset, len)| {
21        let sliced = by
22            .iter()
23            .map(|s| s.slice(offset as i64, len))
24            .collect::<Vec<_>>();
25        let rows = _get_rows_encoded_unordered(&sliced)?;
26        Ok(rows.into_array())
27    });
28    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
29
30    Ok(BinaryOffsetChunked::from_chunk_iter(
31        PlSmallStr::EMPTY,
32        chunks?,
33    ))
34}
35
36// Almost the same but broadcast nulls to the row-encoded array.
37pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
38    by: &[Column],
39) -> PolarsResult<BinaryOffsetChunked> {
40    let n_threads = POOL.current_num_threads();
41    let len = by[0].len();
42    let splits = _split_offsets(len, n_threads);
43
44    let chunks = splits.into_par_iter().map(|(offset, len)| {
45        let sliced = by
46            .iter()
47            .map(|s| s.slice(offset as i64, len))
48            .collect::<Vec<_>>();
49        let rows = _get_rows_encoded_unordered(&sliced)?;
50
51        let validities = sliced
52            .iter()
53            .flat_map(|s| {
54                let s = s.rechunk();
55                #[allow(clippy::unnecessary_to_owned)]
56                s.as_materialized_series()
57                    .chunks()
58                    .to_vec()
59                    .into_iter()
60                    .map(|arr| arr.validity().cloned())
61            })
62            .collect::<Vec<_>>();
63
64        let validity = combine_validities_and_many(&validities);
65        Ok(rows.into_array().with_validity_typed(validity))
66    });
67    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
68
69    Ok(BinaryOffsetChunked::from_chunk_iter(
70        PlSmallStr::EMPTY,
71        chunks?,
72    ))
73}
74
/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
///
/// Returns `None` when the dtype (and, for nested types, all of its children) needs no extra
/// context for row encoding. `ordered` indicates whether the encoding must respect a sort order,
/// which matters for lexically-ordered categoricals.
pub fn get_row_encoding_context(dtype: &DataType, ordered: bool) -> Option<RowEncodingContext> {
    match dtype {
        // Primitive / flat types: the row encoding handles these without extra context.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            // 38 is the maximum precision of a 128-bit decimal; used when none is specified.
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        // Nested types defer to the context of their inner dtype.
        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype, ordered),
        DataType::List(dtype) => get_row_encoding_context(dtype, ordered),
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
            let is_enum = dtype.is_enum();
            let ctx = match revmap {
                Some(revmap) => {
                    // For lexical ordering we need a permutation that maps each physical
                    // category id to its rank in lexically-sorted order.
                    let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
                        RevMapping::Global(map, _, _) => {
                            // Global categories are keyed into the string cache; the number of
                            // known categories is one past the highest key (0 when empty).
                            let num_known_categories =
                                map.keys().max().copied().map_or(0, |m| m + 1);

                            // @TODO: This should probably be cached.
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                let read_map = crate::STRING_CACHE.read_map();
                                let payloads = read_map.get_current_payloads();
                                assert!(payloads.len() >= num_known_categories as usize);

                                // Sort category ids by their string payload, then invert the
                                // permutation: sort_idxs[category id] = lexical rank.
                                let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| payloads[k as usize].as_str());
                                let mut sort_idxs = vec![0; num_known_categories as usize];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (num_known_categories, lexical_sort_idxs)
                        },
                        RevMapping::Local(values, _) => {
                            // @TODO: This should probably be cached.
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                // Local rev-map values are expected to be dense (no nulls).
                                assert_eq!(values.null_count(), 0);
                                let values: Vec<&str> = values.values_iter().collect();

                                // Same sort-then-invert scheme as the Global branch, but over
                                // the local values array.
                                let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| values[k as usize]);
                                let mut sort_idxs = vec![0; values.len()];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (values.len() as u32, lexical_sort_idxs)
                        },
                    };

                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs,
                    }
                },
                None => {
                    // Without a rev-map the number of categories is unknown; be maximally
                    // permissive about physical ids.
                    let num_known_categories = u32::MAX;

                    if matches!(ordering, CategoricalOrdering::Lexical) && ordered {
                        panic!("lexical ordering not yet supported if rev-map not given");
                    }
                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs: None,
                    }
                },
            };

            Some(RowEncodingContext::Categorical(ctx))
        },
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

            // Find the first field that actually needs a context; until then we avoid
            // allocating. Fields before it get `None` placeholders.
            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype(), ordered) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            // No field needed a context, so neither does the struct.
            if ctxts.is_empty() {
                return None;
            }

            // Fill in the contexts for the remaining fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype(), ordered)),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}
214
215pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
216    let rows = _get_rows_encoded_unordered(by)?;
217    Ok(BinaryOffsetChunked::with_chunk(
218        PlSmallStr::EMPTY,
219        rows.into_array(),
220    ))
221}
222
223pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
224    let mut cols = Vec::with_capacity(by.len());
225    let mut opts = Vec::with_capacity(by.len());
226    let mut ctxts = Vec::with_capacity(by.len());
227
228    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
229    // columns.
230    let num_rows = by.first().map_or(0, |c| c.len());
231
232    for by in by {
233        debug_assert_eq!(by.len(), num_rows);
234
235        let by = by
236            .trim_lists_to_normalized_offsets()
237            .map_or(Cow::Borrowed(by), Cow::Owned);
238        let by = by.propagate_nulls().map_or(by, Cow::Owned);
239        let by = by.as_materialized_series();
240        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
241        let opt = RowEncodingOptions::new_unsorted();
242        let ctxt = get_row_encoding_context(by.dtype(), false);
243
244        cols.push(arr);
245        opts.push(opt);
246        ctxts.push(ctxt);
247    }
248    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
249}
250
251pub fn _get_rows_encoded(
252    by: &[Column],
253    descending: &[bool],
254    nulls_last: &[bool],
255) -> PolarsResult<RowsEncoded> {
256    debug_assert_eq!(by.len(), descending.len());
257    debug_assert_eq!(by.len(), nulls_last.len());
258
259    let mut cols = Vec::with_capacity(by.len());
260    let mut opts = Vec::with_capacity(by.len());
261    let mut ctxts = Vec::with_capacity(by.len());
262
263    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
264    // columns.
265    let num_rows = by.first().map_or(0, |c| c.len());
266
267    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
268        debug_assert_eq!(by.len(), num_rows);
269
270        let by = by
271            .trim_lists_to_normalized_offsets()
272            .map_or(Cow::Borrowed(by), Cow::Owned);
273        let by = by.propagate_nulls().map_or(by, Cow::Owned);
274        let by = by.as_materialized_series();
275        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
276        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
277        let ctxt = get_row_encoding_context(by.dtype(), true);
278
279        cols.push(arr);
280        opts.push(opt);
281        ctxts.push(ctxt);
282    }
283    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
284}
285
286pub fn _get_rows_encoded_ca(
287    name: PlSmallStr,
288    by: &[Column],
289    descending: &[bool],
290    nulls_last: &[bool],
291) -> PolarsResult<BinaryOffsetChunked> {
292    _get_rows_encoded(by, descending, nulls_last)
293        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
294}
295
296pub fn _get_rows_encoded_arr(
297    by: &[Column],
298    descending: &[bool],
299    nulls_last: &[bool],
300) -> PolarsResult<BinaryArray<i64>> {
301    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
302}
303
304pub fn _get_rows_encoded_ca_unordered(
305    name: PlSmallStr,
306    by: &[Column],
307) -> PolarsResult<BinaryOffsetChunked> {
308    _get_rows_encoded_unordered(by)
309        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
310}