use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_buffer::Buffer;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

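/// Debug-only sanity check that the Arrow dtypes reaching this reader have
/// already been normalized upstream: variable-width binary/UTF-8 should be the
/// view variants, `Float16` should have been cast away, and `List`/`Map`
/// should have been converted to `LargeList`.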
#[cfg(debug_assertions)]
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // These should all have been cast to the BinaryView / Utf8View variants.
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        // This should have been cast to Float32.
        D::Float16 => unreachable!(),

        // This should have been converted to a LargeList.
        D::List(_) => unreachable!(),

        // This should have been converted to a LargeList(Struct(_)).
        D::Map(_, _) => unreachable!(),

        // Recursively check nested dtypes.
        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

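/// Whether the parquet `SortingColumn` information may be copied onto a Polars
/// sorted flag for this dtype; restricted to plain integer types.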
fn should_copy_sortedness(dtype: &DataType) -> bool {
    use DataType as D;

    // Restrict to integers: for floats, NaN values make the parquet sorting
    // metadata unreliable as a Polars sorted flag.
    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

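/// Copies the sortedness reported by the parquet `SortingColumn` metadata onto
/// `series` when this column is the leading sorting column of the row group
/// and its dtype is one we trust (see [`should_copy_sortedness`]).
///
/// A sketch of the intended use (the series construction here is illustrative
/// only, not taken from this module):
///
/// ```ignore
/// let mut s = Series::new("a".into(), &[1i64, 2, 3]);
/// // Column 0 was declared ascending in the row group metadata.
/// try_set_sorted_flag(&mut s, 0, &[(0, IsSorted::Ascending)]);
/// assert_eq!(s.is_sorted_flag(), IsSorted::Ascending);
/// ```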
pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
    // Only the first sorting column gives a total order within the row group.
    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
        return;
    };
    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
        return;
    }
    if config::verbose() {
        eprintln!(
            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }

    series.set_sorted_flag(*is_sorted);
}

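/// Builds a `(column index, sortedness)` lookup from the row group's
/// `SortingColumn` metadata, preserving the declared order of the sorting
/// columns.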
pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = Vec::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            sorting_map.push((
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            ))
        }
    }

    sorting_map
}

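/// Deserializes a single projected column from the memory-mapped column store
/// into a `Series`, together with the mask of rows that passed the filter.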
fn column_idx_to_series(
    column_i: usize,
    // All column chunks belonging to this (possibly nested) leaf column.
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (arrays, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, arrays))?;

    Ok((series, pred_true_mask))
}

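/// Reads the row groups `row_group_start..row_group_end` into one `DataFrame`
/// per row group. An empty projection with a row index short-circuits to a
/// frame containing only the row index; otherwise work is dispatched to the
/// column-parallel or row-group-parallel implementation below.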
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // With an empty projection and a row index, no columns need to be read:
    // materialize a placeholder of the sliced height and keep only the row
    // index computed from it.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new_infer_height(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

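/// Reads row groups one after another, optionally parallelizing the per-column
/// deserialization inside each row group (for `ParallelStrategy::Columns`).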
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
    // Might parallelize over columns.
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // Columns missing from this row group materialize as full-null.
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_unchecked(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (pip install polars[rt64]), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        // Stop early once the requested slice has been fully covered.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

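/// First sequentially determines, per row group, which slice of rows overlaps
/// the requested slice, then deserializes the overlapping row groups in
/// parallel on the thread pool.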
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    // Parallelizes over row groups.
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // `rows_scanned` counts every row inspected while matching row groups
    // against the slice; `rows_read` only counts the rows that overlap the
    // slice and will actually be deserialized.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        // A non-zero start means earlier row groups were already handled
        // elsewhere; account for their rows so the row index stays correct.
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }
    let dfs = POOL.install(|| {
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // Test support: fail loudly if a row group is parsed while
                // this environment variable is set.
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        // Columns missing from this row group materialize as full-null.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_unchecked(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

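/// Reads a parquet file into a `DataFrame`, honoring an offset/length
/// `pre_slice`, an optional column `projection`, and optional row index and
/// hive partition columns.
///
/// An illustrative sketch of a call site; the `std::fs::File` source and the
/// schema/metadata setup are assumptions for the example, not part of this
/// module:
///
/// ```ignore
/// let mut file = std::fs::File::open("data.parquet")?;
/// let metadata = polars_parquet::read::read_metadata(&mut file).map(Arc::new)?;
/// let schema = Arc::new(polars_parquet::read::infer_schema(&metadata)?);
/// let df = read_parquet(
///     file,
///     (0, usize::MAX),        // read the whole file
///     None,                   // no projection: all columns
///     &schema,
///     Some(metadata),
///     ParallelStrategy::Auto,
///     None,                   // no row index
///     None,                   // no hive partition columns
/// )?;
/// ```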
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // An empty slice yields an empty DataFrame without touching the file body.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    // Resolve `Auto`: parallelize over row groups when there are more row
    // groups than projected columns or available threads, else over columns.
    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    // A single projected column leaves nothing to parallelize over.
    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    Buffer::with_slice(&reader, |buf| {
        let store = mmap::ColumnStore::Local(buf);
        let dfs = rg_to_dfs(
            &store,
            &mut 0,
            0,
            n_row_groups,
            pre_slice,
            &file_metadata,
            reader_schema,
            row_index.clone(),
            parallel,
            &materialized_projection,
            hive_partition_columns,
        )?;

        if dfs.is_empty() {
            Ok(materialize_empty_df(
                projection,
                reader_schema,
                hive_partition_columns,
                row_index.as_ref(),
            ))
        } else {
            accumulate_dataframes_vertical(dfs)
        }
    })
}

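/// Estimates how expensive applying a prefilter mask would be, as the density
/// of edges (false/true transitions) in the mask, clamped to `[0, 1]`. A mask
/// with few long runs is cheap to gather; a fragmented mask approaches 1.
///
/// For example, the mask `[t, t, f, f, t]` has 2 edges over 5 rows, giving a
/// cost of 0.4, whereas `[t, f, t, f, t]` has 4 edges, giving 0.8.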
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // A higher edge density means more, smaller ranges to gather.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

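/// Controls whether the predicate mask is applied while decoding (`Pre`) or
/// after the column has been fully decoded (`Post`); `Auto` chooses based on
/// the prefilter cost and the column dtype.
///
/// A sketch of the `Auto` behavior (values follow directly from
/// `should_prefilter` below):
///
/// ```ignore
/// let setting = PrefilterMaskSetting::Auto;
/// // Cheap mask on a flat type: prefilter while decoding.
/// assert!(setting.should_prefilter(0.005, &ArrowDataType::Int32));
/// // Fragmented mask: decode everything, then filter.
/// assert!(!setting.should_prefilter(0.5, &ArrowDataType::Int32));
/// ```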
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering nested types is expensive, so never prefilter
                // them automatically; for flat types, only prefilter when the
                // mask consists of few, large ranges.
                let is_nested = dtype.is_nested();

                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}