polars_core/chunked_array/ops/row_encode.rs
use arrow::compute::utils::combine_validities_and_many;
use polars_row::{
    RowEncodingCategoricalContext, RowEncodingContext, RowEncodingOptions, RowsEncoded,
    convert_columns,
};
use polars_utils::itertools::Itertools;
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;

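/// Row-encode `by` using the unordered (non order-preserving) encoding.
///
/// The rows are split into one vertical slice per thread and the slices are encoded in parallel
/// on the rayon thread pool, yielding a multi-chunk `BinaryOffsetChunked`.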
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

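/// Like `encode_rows_vertical_par_unordered`, but additionally AND-combines the validities of
/// all input columns, so a row that is null in any of `by` becomes a null row in the output.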
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

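/// Build the extra per-dtype context needed by the row encoder.
///
/// Plain physical types need no context and return `None`. Decimals carry their precision,
/// categoricals/enums carry the number of known categories plus an optional lexical-sort
/// permutation, and nested types (list/array/struct) recurse into their inner dtypes.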
pub fn get_row_encoding_context(dtype: &DataType, ordered: bool) -> Option<RowEncodingContext> {
    match dtype {
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype, ordered),
        DataType::List(dtype) => get_row_encoding_context(dtype, ordered),
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
            let is_enum = dtype.is_enum();
            let ctx = match revmap {
                Some(revmap) => {
                    let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
                        RevMapping::Global(map, _, _) => {
                            let num_known_categories =
                                map.keys().max().copied().map_or(0, |m| m + 1);

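                            // For lexically ordered categoricals, build the permutation that maps
                            // each physical category id to its rank among the lexically sorted
                            // category strings; the encoder writes that rank instead of the id.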
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                let read_map = crate::STRING_CACHE.read_map();
                                let payloads = read_map.get_current_payloads();
                                assert!(payloads.len() >= num_known_categories as usize);

                                let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| payloads[k as usize].as_str());
                                let mut sort_idxs = vec![0; num_known_categories as usize];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (num_known_categories, lexical_sort_idxs)
                        },
                        RevMapping::Local(values, _) => {
                            let lexical_sort_idxs = (ordered
                                && matches!(ordering, CategoricalOrdering::Lexical))
                            .then(|| {
                                assert_eq!(values.null_count(), 0);
                                let values: Vec<&str> = values.values_iter().collect();

                                let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
                                idxs.sort_by_key(|&k| values[k as usize]);
                                let mut sort_idxs = vec![0; values.len()];
                                for (i, idx) in idxs.into_iter().enumerate_u32() {
                                    sort_idxs[idx as usize] = i;
                                }
                                sort_idxs
                            });

                            (values.len() as u32, lexical_sort_idxs)
                        },
                    };

                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs,
                    }
                },
                None => {
                    let num_known_categories = u32::MAX;

                    if matches!(ordering, CategoricalOrdering::Lexical) && ordered {
                        panic!("lexical ordering not yet supported if rev-map not given");
                    }
                    RowEncodingCategoricalContext {
                        num_known_categories,
                        is_enum,
                        lexical_sort_idxs: None,
                    }
                },
            };

            Some(RowEncodingContext::Categorical(ctx))
        },
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype(), ordered) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype(), ordered)),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}

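/// Row-encode `by` on the current thread using the unordered encoding, returning a single-chunk
/// `BinaryOffsetChunked`.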
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::EMPTY,
        rows.into_array(),
    ))
}

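/// Core unordered row encoding: each column is converted to its physical representation,
/// rechunked to a single array and encoded with `RowEncodingOptions::new_unsorted` via
/// `convert_columns`.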
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for by in by {
        debug_assert_eq!(by.len(), num_rows);

        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_unsorted();
        let ctxt = get_row_encoding_context(by.dtype(), false);

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

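/// Order-preserving row encoding with per-column `descending` and `nulls_last` flags.
///
/// Comparing the resulting rows as raw bytes is equivalent to a lexicographic comparison of the
/// input columns under the requested sort options.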
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype(), true);

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

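/// Convenience wrapper around `_get_rows_encoded` that returns the rows as a named
/// `BinaryOffsetChunked`.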
pub fn _get_rows_encoded_ca(
    name: PlSmallStr,
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded(by, descending, nulls_last)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

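/// Convenience wrapper around `_get_rows_encoded` that returns the rows as a raw
/// `BinaryArray<i64>`.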
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
    _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

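/// Convenience wrapper around `_get_rows_encoded_unordered` that returns the rows as a named
/// `BinaryOffsetChunked`.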
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}
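
#[cfg(test)]
mod row_encode_sketch_tests {
    use super::*;

    // Minimal sketch, not part of the original file: it assumes `Column::new` can build a column
    // from a slice, as in polars-core's public constructors. It illustrates the contract of the
    // order-preserving encoding: with `descending = false`, encoded rows compare byte-wise in the
    // same order as the input values.
    #[test]
    fn sorted_row_encoding_preserves_order_sketch() {
        let c = Column::new("a".into(), &[1i32, 2, 3]);
        let rows = _get_rows_encoded_ca("rows".into(), &[c], &[false], &[false]).unwrap();

        let r0 = rows.get(0).unwrap();
        let r1 = rows.get(1).unwrap();
        let r2 = rows.get(2).unwrap();
        assert!(r0 < r1);
        assert!(r1 < r2);
    }
}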