polars_core/chunked_array/ops/
row_encode.rs1use std::borrow::Cow;
2
3use arrow::compute::utils::combine_validities_and_many;
4use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
5use rayon::prelude::*;
6
7use crate::POOL;
8use crate::prelude::*;
9use crate::utils::_split_offsets;
10
11pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
12 let n_threads = POOL.current_num_threads();
13 let len = by[0].len();
14 let splits = _split_offsets(len, n_threads);
15
16 let chunks = splits.into_par_iter().map(|(offset, len)| {
17 let sliced = by
18 .iter()
19 .map(|s| s.slice(offset as i64, len))
20 .collect::<Vec<_>>();
21 let rows = _get_rows_encoded_unordered(&sliced)?;
22 Ok(rows.into_array())
23 });
24 let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
25
26 Ok(BinaryOffsetChunked::from_chunk_iter(
27 PlSmallStr::EMPTY,
28 chunks?,
29 ))
30}
31
/// Row-encode `by` in parallel (vertical split) and broadcast nulls: any row
/// in which *any* input column is null receives a null in the output's
/// validity mask.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    let len = by[0].len();
    let splits = _split_offsets(len, n_threads);

    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        // Gather every chunk's validity bitmap from every sliced column.
        let validities = sliced
            .iter()
            .flat_map(|s| {
                let s = s.rechunk();
                // `to_vec` is required despite the clippy lint: the chunks
                // are borrowed from the local rechunked column `s`, so they
                // must be owned to outlive this closure body.
                #[allow(clippy::unnecessary_to_owned)]
                s.as_materialized_series()
                    .chunks()
                    .to_vec()
                    .into_iter()
                    .map(|arr| arr.validity().cloned())
            })
            .collect::<Vec<_>>();

        // AND all masks together: a null in any column yields a null row.
        let validity = combine_validities_and_many(&validities);
        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}
70
/// Return the auxiliary context, if any, that the row encoder needs for
/// `dtype`.
///
/// Most physical types are encoded without extra state and yield `None`.
/// Categorical/enum, decimal, and nested types carry a `RowEncodingContext`;
/// nested types delegate to their element/field types.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        // Plain physical types: encoded directly, no context needed.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        // Categoricals need their mapping to encode order-preservingly.
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            Some(RowEncodingContext::Categorical(
                RowEncodingCategoricalContext {
                    is_enum: matches!(dtype, DataType::Enum(_, _)),
                    mapping: mapping.clone(),
                },
            ))
        },

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),

        // Decimals need the precision to size the encoded representation.
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            // Lazy allocation: only build the per-field vector once the
            // first field that actually needs a context is found; fields
            // before it are backfilled with `None`.
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            // No field needed a context: the struct as a whole needs none.
            if ctxts.is_empty() {
                return None;
            }

            // Fill in the contexts for the remaining fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
        // Extension types encode as their storage type.
        #[cfg(feature = "dtype-extension")]
        DataType::Extension(_, storage) => get_row_encoding_context(storage),
    }
}
152
153pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
154 let rows = _get_rows_encoded_unordered(by)?;
155 Ok(BinaryOffsetChunked::with_chunk(
156 PlSmallStr::EMPTY,
157 rows.into_array(),
158 ))
159}
160
161pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
162 let mut cols = Vec::with_capacity(by.len());
163 let mut opts = Vec::with_capacity(by.len());
164 let mut ctxts = Vec::with_capacity(by.len());
165
166 let num_rows = by.first().map_or(0, |c| c.len());
169
170 for by in by {
171 debug_assert_eq!(by.len(), num_rows);
172
173 let by = by
174 .trim_lists_to_normalized_offsets()
175 .map_or(Cow::Borrowed(by), Cow::Owned);
176 let by = by.propagate_nulls().map_or(by, Cow::Owned);
177 let by = by.as_materialized_series();
178 let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
179 let opt = RowEncodingOptions::new_unsorted();
180 let ctxt = get_row_encoding_context(by.dtype());
181
182 cols.push(arr);
183 opts.push(opt);
184 ctxts.push(ctxt);
185 }
186 Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
187}
188
/// Row-encode `by` with per-column sort options (`descending`/`nulls_last`),
/// producing an order-preserving binary encoding suitable for sorting.
pub fn _get_rows_encoded(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
    // One sort-flag pair is expected per key column.
    debug_assert_eq!(by.len(), descending.len());
    debug_assert_eq!(by.len(), nulls_last.len());

    let mut cols = Vec::with_capacity(by.len());
    let mut opts = Vec::with_capacity(by.len());
    let mut ctxts = Vec::with_capacity(by.len());

    let num_rows = by.first().map_or(0, |c| c.len());

    for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
        debug_assert_eq!(by.len(), num_rows);

        // Normalize the column (trim list offsets, propagate nulls into
        // nested values); `Cow` avoids cloning when no change was needed.
        let by = by
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(by), Cow::Owned);
        let by = by.propagate_nulls().map_or(by, Cow::Owned);
        let by = by.as_materialized_series();
        // The row encoder consumes exactly one physical chunk per column.
        let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
        let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
        let ctxt = get_row_encoding_context(by.dtype());

        cols.push(arr);
        opts.push(opt);
        ctxts.push(ctxt);
    }
    Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}
223
224pub fn _get_rows_encoded_ca(
225 name: PlSmallStr,
226 by: &[Column],
227 descending: &[bool],
228 nulls_last: &[bool],
229) -> PolarsResult<BinaryOffsetChunked> {
230 _get_rows_encoded(by, descending, nulls_last)
231 .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
232}
233
234pub fn _get_rows_encoded_arr(
235 by: &[Column],
236 descending: &[bool],
237 nulls_last: &[bool],
238) -> PolarsResult<BinaryArray<i64>> {
239 _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
240}
241
242pub fn _get_rows_encoded_ca_unordered(
243 name: PlSmallStr,
244 by: &[Column],
245) -> PolarsResult<BinaryOffsetChunked> {
246 _get_rows_encoded_unordered(by)
247 .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
248}
249
#[cfg(feature = "dtype-struct")]
/// Decode row-encoded binary data `ca` back into a `StructChunked` with the
/// given `fields`, using the same per-column `opts` that produced the rows.
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    // Per-field decoding context and physical Arrow dtype.
    let (ctxts, dtypes) = fields
        .iter()
        .map(|f| {
            (
                get_row_encoding_context(f.dtype()),
                f.dtype().to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|v| v.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    // Scratch buffer reused by the decoder across chunks.
    let mut rows = Vec::new();
    let chunks = ca
        .downcast_iter()
        .map(|array| {
            // SAFETY: assumes `opts` (and the contexts/dtypes derived from
            // `fields`) match what was used to encode these rows —
            // TODO(review): confirm this contract at the call sites.
            let decoded_arrays = unsafe {
                polars_row::decode::decode_rows_from_binary(array, opts, &ctxts, &dtypes, &mut rows)
            };
            assert_eq!(decoded_arrays.len(), fields.len());

            // No top-level validity is attached: nullness lives in the
            // decoded field arrays, not on the struct itself.
            StructArray::new(
                struct_arrow_dtype.clone(),
                array.len(),
                decoded_arrays,
                None,
            )
            .to_boxed()
        })
        .collect::<Vec<_>>();

    // SAFETY: the chunks were built above from the physical Arrow dtypes of
    // `fields`, so they match `DataType::Struct(fields)` by construction.
    Ok(unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    })
}