polars_core/chunked_array/ops/row_encode.rs

1use arrow::compute::utils::combine_validities_and_many;
2use polars_row::{
3    RowEncodingCategoricalContext, RowEncodingContext, RowEncodingOptions, RowsEncoded,
4    convert_columns,
5};
6use polars_utils::itertools::Itertools;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::_split_offsets;
12
13pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
14    let n_threads = POOL.current_num_threads();
15    let len = by[0].len();
16    let splits = _split_offsets(len, n_threads);
17
18    let chunks = splits.into_par_iter().map(|(offset, len)| {
19        let sliced = by
20            .iter()
21            .map(|s| s.slice(offset as i64, len))
22            .collect::<Vec<_>>();
23        let rows = _get_rows_encoded_unordered(&sliced)?;
24        Ok(rows.into_array())
25    });
26    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
27
28    Ok(BinaryOffsetChunked::from_chunk_iter(
29        PlSmallStr::EMPTY,
30        chunks?,
31    ))
32}
33
34// Almost the same but broadcast nulls to the row-encoded array.
35pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
36    by: &[Column],
37) -> PolarsResult<BinaryOffsetChunked> {
38    let n_threads = POOL.current_num_threads();
39    let len = by[0].len();
40    let splits = _split_offsets(len, n_threads);
41
42    let chunks = splits.into_par_iter().map(|(offset, len)| {
43        let sliced = by
44            .iter()
45            .map(|s| s.slice(offset as i64, len))
46            .collect::<Vec<_>>();
47        let rows = _get_rows_encoded_unordered(&sliced)?;
48
49        let validities = sliced
50            .iter()
51            .flat_map(|s| {
52                let s = s.rechunk();
53                #[allow(clippy::unnecessary_to_owned)]
54                s.as_materialized_series()
55                    .chunks()
56                    .to_vec()
57                    .into_iter()
58                    .map(|arr| arr.validity().cloned())
59            })
60            .collect::<Vec<_>>();
61
62        let validity = combine_validities_and_many(&validities);
63        Ok(rows.into_array().with_validity_typed(validity))
64    });
65    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
66
67    Ok(BinaryOffsetChunked::from_chunk_iter(
68        PlSmallStr::EMPTY,
69        chunks?,
70    ))
71}
72
73/// Get the [`RowEncodingContext`] for a certain [`DataType`].
74///
75/// This should be given the logical type in order to communicate Polars datatype information down
76/// into the row encoding / decoding.
/// Get the [`RowEncodingContext`] for a certain [`DataType`].
///
/// This should be given the logical type in order to communicate Polars datatype information down
/// into the row encoding / decoding.
///
/// Returns `None` for types whose physical representation needs no extra
/// context to be row-encoded. `ordered` signals that the encoding must respect
/// sort semantics; here it only affects categoricals, where it triggers
/// building lexical sort indices for lexically ordered data.
pub fn get_row_encoding_context(dtype: &DataType, ordered: bool) -> Option<RowEncodingContext> {
    match dtype {
        // Encoded straight from their physical representation; no context needed.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        // Decimals carry their precision (default 38 when unspecified).
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        // Nested list-like types delegate to their element type.
        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype, ordered),
        DataType::List(dtype) => get_row_encoding_context(dtype, ordered),
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
            let is_enum = dtype.is_enum();
            let ctx = match revmap {
                Some(revmap) => {
                    let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
                        RevMapping::Global(map, _, _) => {
                            // Highest global category id + 1 (0 when the map is empty).
                            let num_known_categories =
                                map.keys().max().copied().map_or(0, |m| m + 1);

                            // Only built when a sorted encoding of a lexically
                            // ordered categorical is requested: maps each
                            // category id to its rank in lexical order.
                            // @TODO: This should probably be cached.
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                // Global categories resolve through the string cache.
                                let read_map = crate::STRING_CACHE.read_map();
                                let payloads = read_map.get_current_payloads();
                                assert!(payloads.len() >= num_known_categories as usize);

                                let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| payloads[k as usize].as_str());
                                // Invert the permutation: sort_idxs[id] = lexical rank of `id`.
                                let mut sort_idxs = vec![0; num_known_categories as usize];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (num_known_categories, lexical_sort_idxs)
                        },
                        RevMapping::Local(values, _) => {
                            // Same as the Global arm, but category ids index
                            // directly into the local values array.
                            // @TODO: This should probably be cached.
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                assert_eq!(values.null_count(), 0);
                                let values: Vec<&str> = values.values_iter().collect();

                                let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| values[k as usize]);
                                // Invert the permutation: sort_idxs[id] = lexical rank of `id`.
                                let mut sort_idxs = vec![0; values.len()];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (values.len() as u32, lexical_sort_idxs)
                        },
                    };

                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs,
                    }
                },
                None => {
                    // No rev-map: the category universe is unknown, so assume
                    // the full u32 range.
                    let num_known_categories = u32::MAX;

                    if matches!(ordering, CategoricalOrdering::Lexical) && ordered {
                        panic!("lexical ordering not yet supported if rev-map not given");
                    }
                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs: None,
                    }
                },
            };

            Some(RowEncodingContext::Categorical(ctx))
        },
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            // Allocate lazily: scan until the first field that needs a context.
            // If no field needs one, the whole struct needs no context either.
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype(), ordered) {
                    // Found one: backfill `None` for the fields already passed.
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            // Compute contexts for the remaining fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype(), ordered)),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}
212
213pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
214    let rows = _get_rows_encoded_unordered(by)?;
215    Ok(BinaryOffsetChunked::with_chunk(
216        PlSmallStr::EMPTY,
217        rows.into_array(),
218    ))
219}
220
221pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
222    let mut cols = Vec::with_capacity(by.len());
223    let mut opts = Vec::with_capacity(by.len());
224    let mut ctxts = Vec::with_capacity(by.len());
225
226    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
227    // columns.
228    let num_rows = by.first().map_or(0, |c| c.len());
229
230    for by in by {
231        debug_assert_eq!(by.len(), num_rows);
232
233        let by = by.as_materialized_series();
234        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
235        let opt = RowEncodingOptions::new_unsorted();
236        let ctxt = get_row_encoding_context(by.dtype(), false);
237
238        cols.push(arr);
239        opts.push(opt);
240        ctxts.push(ctxt);
241    }
242    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
243}
244
245pub fn _get_rows_encoded(
246    by: &[Column],
247    descending: &[bool],
248    nulls_last: &[bool],
249) -> PolarsResult<RowsEncoded> {
250    debug_assert_eq!(by.len(), descending.len());
251    debug_assert_eq!(by.len(), nulls_last.len());
252
253    let mut cols = Vec::with_capacity(by.len());
254    let mut opts = Vec::with_capacity(by.len());
255    let mut ctxts = Vec::with_capacity(by.len());
256
257    // Since ZFS exists, we might not actually have any arrays and need to get the length from the
258    // columns.
259    let num_rows = by.first().map_or(0, |c| c.len());
260
261    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
262        debug_assert_eq!(by.len(), num_rows);
263
264        let by = by.as_materialized_series();
265        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
266        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
267        let ctxt = get_row_encoding_context(by.dtype(), true);
268
269        cols.push(arr);
270        opts.push(opt);
271        ctxts.push(ctxt);
272    }
273    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
274}
275
276pub fn _get_rows_encoded_ca(
277    name: PlSmallStr,
278    by: &[Column],
279    descending: &[bool],
280    nulls_last: &[bool],
281) -> PolarsResult<BinaryOffsetChunked> {
282    _get_rows_encoded(by, descending, nulls_last)
283        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
284}
285
286pub fn _get_rows_encoded_arr(
287    by: &[Column],
288    descending: &[bool],
289    nulls_last: &[bool],
290) -> PolarsResult<BinaryArray<i64>> {
291    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
292}
293
294pub fn _get_rows_encoded_ca_unordered(
295    name: PlSmallStr,
296    by: &[Column],
297) -> PolarsResult<BinaryOffsetChunked> {
298    _get_rows_encoded_unordered(by)
299        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
300}