polars_core/chunked_array/ops/row_encode.rs

use std::borrow::Cow;

use arrow::compute::utils::combine_validities_and_many;
use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
use polars_utils::itertools::Itertools;
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;

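/// Row-encode the columns in `by` into a single binary column, splitting the rows into one
/// slice per thread and encoding the slices in parallel on the Rayon pool. Uses the unordered
/// (unsorted) encoding, which preserves equality of rows but not their sort order.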
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

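/// Like [`encode_rows_vertical_par_unordered`], but additionally AND-combines the validity
/// masks of all `by` columns onto the encoded rows, so a row is null whenever any of its key
/// columns is null.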
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}

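/// Build the [`RowEncodingContext`] required to row-encode values of `dtype`.
///
/// Returns `None` for dtypes that need no extra context (primitives, strings, binary and
/// temporal types). Nested dtypes recurse into their inner/field dtypes; `Unknown` and
/// `Object` are not supported and panic.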
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            if ctxts.is_empty() {
                return None;
            }

            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
        #[cfg(feature = "dtype-extension")]
        DataType::Extension(_, storage) => get_row_encoding_context(storage),
    }
}

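/// Row-encode the columns in `by` into a single binary column using the unordered encoding,
/// without parallelism.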
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::EMPTY,
        rows.into_array(),
    ))
}

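/// Row-encode the columns in `by` with unsorted options. Each column is normalized first
/// (list offsets trimmed, nulls propagated), converted to its physical representation and
/// rechunked into a single array before encoding.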
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for by in by {
        debug_assert_eq!(by.len(), num_rows);

        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_unsorted();
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

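/// Row-encode the columns in `by` for sorting, honoring the per-column `descending` and
/// `nulls_last` flags, so that the encoded bytes compare in the requested order.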
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}

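/// Sorted row encoding wrapped in a [`BinaryOffsetChunked`] named `name`. With
/// `broadcast_nulls`, a row is set to null when any of its key columns is null.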
pub fn _get_rows_encoded_ca(
    name: PlSmallStr,
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
    broadcast_nulls: bool,
) -> PolarsResult<BinaryOffsetChunked> {
    let mut rows_arr = _get_rows_encoded(by, descending, nulls_last)?.into_array();
    if broadcast_nulls {
        let validities = by
            .iter()
            .map(|c| c.as_materialized_series().rechunk_validity())
            .collect_vec();
        let combined = combine_validities_and_many(&validities);
        rows_arr.set_validity(combined);
    }
    Ok(BinaryOffsetChunked::with_chunk(name, rows_arr))
}

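/// Same as [`_get_rows_encoded_ca`], but returns the underlying `BinaryArray<i64>` directly.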
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
    broadcast_nulls: bool,
) -> PolarsResult<BinaryArray<i64>> {
    let mut rows_arr = _get_rows_encoded(by, descending, nulls_last)?.into_array();
    if broadcast_nulls {
        let validities = by
            .iter()
            .map(|c| c.as_materialized_series().rechunk_validity())
            .collect_vec();
        let combined = combine_validities_and_many(&validities);
        rows_arr.set_validity(combined);
    }
    Ok(rows_arr)
}

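/// Unordered row encoding wrapped in a [`BinaryOffsetChunked`] named `name`.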
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

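/// Decode row-encoded binary data back into a [`StructChunked`] with the given `fields`.
/// The `opts` must correspond to the options the rows were encoded with.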
#[cfg(feature = "dtype-struct")]
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    let mut rows = Vec::new();
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            assert_eq!(decoded_arrays.len(), fields.len());

            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}