polars_core/chunked_array/ops/row_encode.rs

use std::borrow::Cow;

use arrow::compute::utils::combine_validities_and_many;
use polars_row::{
    RowEncodingCategoricalContext, RowEncodingContext, RowEncodingOptions, RowsEncoded,
    convert_columns,
};
use polars_utils::itertools::Itertools;
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;

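/// Row-encode the given columns with unordered encoding, parallelizing vertically over the
/// thread pool: each thread encodes a contiguous slice of the input columns, and the resulting
/// arrays become the chunks of the returned `BinaryOffsetChunked`.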
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

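/// Same as [`encode_rows_vertical_par_unordered`], but additionally combines the validities of
/// all key columns and applies them to the encoded output, so a row is null whenever any of its
/// key values is null.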
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

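/// Build the [`RowEncodingContext`] a dtype needs for row encoding, if any.
///
/// Plain physical types need no extra context. Decimals carry their precision, categoricals and
/// enums carry the number of known categories plus optional lexical sort indices (only computed
/// when `ordered` is set and the ordering is lexical), and nested types (list, array, struct)
/// derive contexts from their child dtypes. `Unknown` and `Object` are unsupported and panic.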
pub fn get_row_encoding_context(dtype: &DataType, ordered: bool) -> Option<RowEncodingContext> {
    match dtype {
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype, ordered),
        DataType::List(dtype) => get_row_encoding_context(dtype, ordered),
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
            let is_enum = dtype.is_enum();
            let ctx = match revmap {
                Some(revmap) => {
                    let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
                        RevMapping::Global(map, _, _) => {
                            let num_known_categories =
                                map.keys().max().copied().map_or(0, |m| m + 1);

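                            // Lexical ordering: rank every known category by its string payload
                            // in the global string cache.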
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                let read_map = crate::STRING_CACHE.read_map();
                                let payloads = read_map.get_current_payloads();
                                assert!(payloads.len() >= num_known_categories as usize);

                                let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| payloads[k as usize].as_str());
                                let mut sort_idxs = vec![0; num_known_categories as usize];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (num_known_categories, lexical_sort_idxs)
                        },
                        RevMapping::Local(values, _) => {
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                assert_eq!(values.null_count(), 0);
                                let values: Vec<&str> = values.values_iter().collect();

                                let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| values[k as usize]);
                                let mut sort_idxs = vec![0; values.len()];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (values.len() as u32, lexical_sort_idxs)
                        },
                    };

                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs,
                    }
                },
                None => {
                    let num_known_categories = u32::MAX;

                    if matches!(ordering, CategoricalOrdering::Lexical) && ordered {
                        panic!("lexical ordering not yet supported if rev-map not given");
                    }
                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs: None,
                    }
                },
            };

            Some(RowEncodingContext::Categorical(ctx))
        },
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

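            // Only start collecting per-field contexts once a field that actually needs one is
            // found; if no field needs a context, the struct needs none either.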
            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype(), ordered) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype(), ordered)),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}

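/// Row-encode the given columns with unordered encoding on the current thread, returning the
/// result as a single-chunk `BinaryOffsetChunked`.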
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::EMPTY,
        rows.into_array(),
    ))
}

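/// Row-encode the given columns without any ordering options.
///
/// Each column is normalized (list offsets trimmed, nulls propagated), converted to its physical
/// representation, and rechunked to a single array before encoding.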
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for by in by {
        debug_assert_eq!(by.len(), num_rows);

        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_unsorted();
        let ctxt = get_row_encoding_context(by.dtype(), false);

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

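/// Row-encode the given columns with per-column sort options: `descending` and `nulls_last`
/// must each have one entry per column.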
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype(), true);

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

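/// Sorted row encoding returned as a named `BinaryOffsetChunked`.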
pub fn _get_rows_encoded_ca(
    name: PlSmallStr,
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded(by, descending, nulls_last)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

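/// Sorted row encoding returned as a raw `BinaryArray<i64>`.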
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

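/// Unordered row encoding returned as a named `BinaryOffsetChunked`.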
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}