1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::ops::Range;
4
5use arrow::array::BooleanArray;
6use arrow::bitmap::{Bitmap, BitmapBuilder};
7use arrow::datatypes::ArrowSchemaRef;
8use polars_core::chunked_array::builder::NullChunkedBuilder;
9use polars_core::prelude::*;
10use polars_core::series::IsSorted;
11use polars_core::utils::{accumulate_dataframes_vertical, split_df};
12use polars_core::{POOL, config};
13use polars_parquet::read::{
14 self, ColumnChunkMetadata, FileMetadata, Filter, PredicateFilter, RowGroupMetadata,
15};
16use rayon::prelude::*;
17
18#[cfg(feature = "cloud")]
19use super::async_impl::FetchRowGroupsFromObjectStore;
20use super::mmap::{ColumnStore, mmap_columns};
21use super::predicates::read_this_row_group;
22use super::utils::materialize_empty_df;
23use super::{ParallelStrategy, mmap};
24use crate::RowIndex;
25use crate::hive::{self, materialize_hive_partitions};
26use crate::mmap::{MmapBytesReader, ReaderBytes};
27use crate::parquet::metadata::FileMetadataRef;
28use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
29use crate::predicates::{ColumnPredicateExpr, ScanIOPredicate, apply_predicate};
30use crate::utils::get_reader_bytes;
31use crate::utils::slice::split_slice_at_file;
32
33#[cfg(debug_assertions)]
34fn assert_dtypes(dtype: &ArrowDataType) {
37 use ArrowDataType as D;
38
39 match dtype {
40 D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
42
43 D::Float16 => unreachable!(),
45
46 D::List(_) => unreachable!(),
48
49 D::Map(_, _) => unreachable!(),
51
52 D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
54 D::Extension(ext) => assert_dtypes(&ext.inner),
55 D::LargeList(inner) => assert_dtypes(&inner.dtype),
56 D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
57 D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
58
59 _ => {},
60 }
61}
62
63fn should_copy_sortedness(dtype: &DataType) -> bool {
64 use DataType as D;
66
67 matches!(
68 dtype,
69 D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
70 )
71}
72
73pub fn try_set_sorted_flag(
74 series: &mut Series,
75 col_idx: usize,
76 sorting_map: &PlHashMap<usize, IsSorted>,
77) {
78 if let Some(is_sorted) = sorting_map.get(&col_idx) {
79 if should_copy_sortedness(series.dtype()) {
80 if config::verbose() {
81 eprintln!(
82 "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
83 series.name()
84 );
85 }
86
87 series.set_sorted_flag(*is_sorted);
88 }
89 }
90}
91
92pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
93 let capacity = md.sorting_columns().map_or(0, |s| s.len());
94 let mut sorting_map = PlHashMap::with_capacity(capacity);
95
96 if let Some(sorting_columns) = md.sorting_columns() {
97 for sorting in sorting_columns {
98 let prev_value = sorting_map.insert(
99 sorting.column_idx as usize,
100 if sorting.descending {
101 IsSorted::Descending
102 } else {
103 IsSorted::Ascending
104 },
105 );
106
107 debug_assert!(prev_value.is_none());
108 }
109 }
110
111 sorting_map
112}
113
/// Deserialize the column at index `column_i` of `file_schema` from the column
/// chunks in `field_md`, optionally applying a pushed-down `filter`.
///
/// Returns the decoded `Series` together with the predicate-true mask produced
/// by the deserializer (may be empty when no predicate filter was applied —
/// callers treat an empty mask as "no mask").
fn column_idx_to_series(
    column_i: usize,
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    // Debug builds: reject dtypes that should never reach the deserializer.
    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}
134
/// Decode row groups `row_group_start..row_group_end` into one `DataFrame` per
/// (non-skipped) row group, dispatching to the strategy-specific readers.
///
/// `previous_row_count` is advanced by the rows scanned so a `row_index`
/// column can be materialized with correct offsets across calls.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&ScanIOPredicate>,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // Empty projection: only the row-index column (if requested) needs to be
    // produced. Build it via a temporary null column of the sliced height,
    // then select just the index column.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    // Prefiltering is only taken for unsliced reads with at least one live
    // (predicate) column; otherwise fall through to the plain strategies.
    if parallel == S::Prefiltered && pre_slice == (0, usize::MAX) {
        if let Some(predicate) = predicate {
            if !predicate.live_columns.is_empty() {
                return rg_to_dfs_prefiltered(
                    store,
                    previous_row_count,
                    row_group_start,
                    row_group_end,
                    file_metadata,
                    schema,
                    predicate,
                    row_index,
                    projection,
                    use_statistics,
                    hive_partition_columns,
                );
            }
        }
    }

    match parallel {
        // Parallelize over columns (or not at all) within each row group.
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            parallel,
            projection,
            use_statistics,
            hive_partition_columns,
        ),
        // Any remaining strategy: parallelize over row groups.
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            projection,
            use_statistics,
            hive_partition_columns,
        ),
    }
}
225
/// Prefiltered read: first decode only the predicate's "live" columns, compute
/// the filter mask from them, and then decode the remaining "dead" columns
/// with that mask pushed down (or applied post-hoc, per `PrefilterMaskSetting`).
///
/// Row groups whose mask comes out all-false are skipped entirely.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: &ScanIOPredicate,
    row_index: Option<RowIndex>,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if row_group_end > u32::MAX as usize {
        polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
    }

    // Per-row-group starting row offsets; only needed to materialize a
    // row-index column.
    let mut row_offset = *previous_row_count;
    let rg_offsets: Vec<IdxSize> = match row_index {
        None => Vec::new(),
        Some(_) => (row_group_start..row_group_end)
            .map(|index| {
                let md = &file_metadata.row_groups[index];

                let current_offset = row_offset;
                let current_row_count = md.num_rows() as IdxSize;
                row_offset += current_row_count;

                current_offset
            })
            .collect(),
    };

    let num_live_columns = predicate.live_columns.len();
    let num_dead_columns =
        projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns;

    if config::verbose() {
        eprintln!("parquet live columns = {num_live_columns}, dead columns = {num_dead_columns}");
    }

    // Partition the projection into live (appearing in the predicate) and dead
    // column indices, keeping projection order within each partition.
    let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
    let mut dead_idx_to_col_idx: Vec<usize> = Vec::with_capacity(num_dead_columns);
    for &i in projection.iter() {
        let name = schema.get_at_index(i).unwrap().0.as_str();

        if predicate.live_columns.contains(name) {
            live_idx_to_col_idx.push(i);
        } else {
            dead_idx_to_col_idx.push(i);
        }
    }

    // Opt-in (POLARS_PARQUET_EXPR=1) fast path: push the predicate itself into
    // the decoder. Only taken for exactly one live, non-nested, non-hive column.
    let do_parquet_expr = std::env::var("POLARS_PARQUET_EXPR").as_deref() == Ok("1")
        && predicate.live_columns.len() == 1 && hive_partition_columns.is_none_or(|hc| {
            !hc.iter()
                .any(|c| c.name().as_str() == predicate.live_columns[0].as_str())
        }) && !schema
        .get(predicate.live_columns[0].as_str())
        .unwrap()
        .dtype()
        .is_nested(); let column_exprs = do_parquet_expr.then(|| {
        predicate
            .live_columns
            .iter()
            .map(|name| {
                let (p, specialized) = predicate.column_predicates.predicates.get(name)?;

                let p = ColumnPredicateExpr::new(
                    name.clone(),
                    DataType::from_arrow_field(schema.get(name).unwrap()),
                    p.clone(),
                    specialized.clone(),
                );

                // An equality-with-scalar predicate lets us skip materializing
                // the values entirely (only the mask is needed).
                let eq_scalar = p.to_eq_scalar().cloned();
                let predicate = Arc::new(p) as _;

                Some((
                    PredicateFilter {
                        predicate,
                        include_values: eq_scalar.is_none(),
                    },
                    eq_scalar,
                ))
            })
            .collect::<Vec<_>>()
    });

    let mask_setting = PrefilterMaskSetting::init_from_env();
    let projected_schema = schema.try_project_indices(projection).unwrap();

    let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
        // One rayon task per row group; live columns are decoded in a nested
        // parallel iterator.
        (row_group_start..row_group_end)
            .into_par_iter()
            .map(|rg_idx| {
                let md = &file_metadata.row_groups[rg_idx];

                // Statistics-based row-group pruning.
                if use_statistics {
                    match read_this_row_group(Some(predicate), md, schema) {
                        Ok(false) => return Ok(None),
                        Ok(true) => {},
                        Err(e) => return Err(e),
                    }
                }

                let sorting_map = create_sorting_map(md);

                // Decode all live columns, collecting each column together with
                // its (optional) predicate-true mask.
                let (live_columns, filters) = (0..live_idx_to_col_idx.len())
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = live_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        // Column missing from this row group: fill with nulls.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok((
                                Column::full_null(
                                    name.clone(),
                                    md.num_rows(),
                                    &DataType::from_arrow_field(field),
                                ),
                                None,
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (filter, equals_scalar) = match column_exprs.as_ref() {
                            None => (None, None),
                            Some(column_expr) => match column_expr.get(i) {
                                Some(Some((p, s))) => {
                                    (Some(Filter::Predicate(p.clone())), s.clone())
                                },
                                _ => (None, None),
                            },
                        };

                        let (mut series, pred_true_mask) =
                            column_idx_to_series(col_idx, part.as_slice(), filter, schema, store)?;

                        debug_assert!(
                            pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows()
                        );
                        match equals_scalar {
                            None => {
                                try_set_sorted_flag(&mut series, col_idx, &sorting_map);
                                Ok((
                                    series.into_column(),
                                    (!pred_true_mask.is_empty()).then_some(pred_true_mask),
                                ))
                            },
                            // Equality predicate: the column is a constant
                            // scalar repeated once per surviving row.
                            Some(sc) => Ok((
                                Column::new_scalar(name.clone(), sc, pred_true_mask.set_bits()),
                                Some(pred_true_mask),
                            )),
                        }
                    })
                    .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()?;

                let md = &file_metadata.row_groups[rg_idx];
                let filter_mask: Bitmap;
                let mut df: DataFrame;

                // Case 1: the decoder already produced a mask (pushed-down
                // predicate) — live columns are pre-filtered.
                if let Some(Some(f)) = filters.first() {
                    if f.set_bits() == 0 {
                        if config::verbose() {
                            eprintln!("parquet filter mask found that row group can be skipped");
                        }

                        return Ok(None);
                    }

                    if let Some(rc) = &row_index {
                        // Build the row-index column at full height, filter it
                        // by the mask, then append the (already filtered)
                        // live columns.
                        df = unsafe { DataFrame::new_no_checks(md.num_rows(), vec![]) };
                        unsafe {
                            df.with_row_index_mut(
                                rc.name.clone(),
                                Some(rg_offsets[rg_idx] + rc.offset),
                            )
                        };
                        df = df.filter(&BooleanChunked::from_chunk_iter(
                            PlSmallStr::EMPTY,
                            [BooleanArray::new(ArrowDataType::Boolean, f.clone(), None)],
                        ))?;
                        unsafe { df.column_extend_unchecked(live_columns) }
                    } else {
                        df = DataFrame::new(live_columns).unwrap();
                    }

                    filter_mask = f.clone();
                } else {
                    // Case 2: evaluate the predicate over the fully-decoded
                    // live columns (plus hive columns) to derive the mask.
                    df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };

                    materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
                    let s = predicate.predicate.evaluate_io(&df)?;
                    let mask = s.bool().expect("filter predicates was not of type boolean");

                    // Rebuild without the hive columns before filtering.
                    df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };

                    if let Some(rc) = &row_index {
                        unsafe {
                            df.with_row_index_mut(
                                rc.name.clone(),
                                Some(rg_offsets[rg_idx] + rc.offset),
                            )
                        };
                    }
                    df = df.filter(mask)?;

                    // Flatten the (possibly chunked, possibly null-masked)
                    // boolean mask into a single Bitmap; nulls count as false.
                    let mut mut_filter_mask = BitmapBuilder::with_capacity(mask.len());

                    for chunk in mask.downcast_iter() {
                        match chunk.validity() {
                            None => mut_filter_mask.extend_from_bitmap(chunk.values()),
                            Some(validity) => {
                                mut_filter_mask.extend_from_bitmap(&(validity & chunk.values()))
                            },
                        }
                    }

                    filter_mask = mut_filter_mask.freeze();
                }

                debug_assert_eq!(md.num_rows(), filter_mask.len());
                debug_assert_eq!(df.height(), filter_mask.set_bits());

                if filter_mask.set_bits() == 0 {
                    if config::verbose() {
                        eprintln!("parquet filter mask found that row group can be skipped");
                    }

                    return Ok(None);
                }

                // No dead columns: nothing left to decode for this row group.
                if dead_idx_to_col_idx.is_empty() {
                    materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                    return Ok(Some(df));
                }

                // Cost heuristic for Auto: only used by `should_prefilter`.
                let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto)
                    .then(|| calc_prefilter_cost(&filter_mask))
                    .unwrap_or_default();

                let n_rows_in_result = filter_mask.set_bits();

                // Decode dead columns, either with the mask pushed down into
                // the decoder (`pre`) or decoded fully and filtered (`post`).
                let dead_columns = (0..dead_idx_to_col_idx.len())
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = dead_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                n_rows_in_result,
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let field_md = iter.collect::<Vec<_>>();

                        // Mask pushed into the decoder: only surviving rows
                        // are materialized.
                        let pre = || {
                            let (array, _) = column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                Some(Filter::new_masked(filter_mask.clone())),
                                schema,
                                store,
                            )?;

                            PolarsResult::Ok(array)
                        };
                        // Full decode, then filter afterwards.
                        let post = || {
                            let (array, _) = column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                None,
                                schema,
                                store,
                            )?;

                            debug_assert_eq!(array.len(), md.num_rows());

                            let mask_arr = BooleanArray::new(
                                ArrowDataType::Boolean,
                                filter_mask.clone(),
                                None,
                            );
                            let mask_arr = BooleanChunked::from(mask_arr);
                            array.filter(&mask_arr)
                        };

                        let mut series = if mask_setting.should_prefilter(
                            prefilter_cost,
                            &schema.get_at_index(col_idx).unwrap().1.dtype,
                        ) {
                            pre()?
                        } else {
                            post()?
                        };

                        debug_assert_eq!(series.len(), filter_mask.set_bits());

                        try_set_sorted_flag(&mut series, col_idx, &sorting_map);

                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<Column>>>()?;

                debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));

                let height = df.height();
                let live_columns = df.take_columns();

                assert_eq!(live_columns.len() + dead_columns.len(), projection.len());

                // Merge live and dead columns back into projected-schema order;
                // a row-index column (always first among live) stays in front.
                let mut merged = Vec::with_capacity(live_columns.len() + dead_columns.len());

                if row_index.is_some() {
                    merged.push(live_columns[0].clone());
                };

                hive::merge_sorted_to_schema_order(
                    &mut dead_columns.into_iter(), &mut live_columns.into_iter().skip(row_index.is_some() as usize), &projected_schema,
                    &mut merged,
                );

                let mut df = unsafe { DataFrame::new_no_checks(height, merged) };

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                PolarsResult::Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<Option<DataFrame>>>>()
    })?;

    // Drop skipped row groups and update the running row count.
    let dfs: Vec<DataFrame> = dfs.into_iter().flatten().collect();

    let row_count: usize = dfs.iter().map(|df| df.height()).sum();
    let row_count = IdxSize::try_from(row_count).map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;
    *previous_row_count = previous_row_count
        .checked_add(row_count)
        .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

    Ok(dfs)
}
620
/// Read row groups sequentially, decoding the projected columns of each group
/// either in parallel (`ParallelStrategy::Columns`) or serially (`None`).
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&ScanIOPredicate>,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    // Rows contained in the row groups before `row_group_start`; used to
    // position the global slice within each group.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        // Portion of the global slice falling inside this row group.
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        // Statistics-based pruning; still account for the skipped rows.
        if use_statistics
            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
        {
            *previous_row_count += rg_slice.1 as IdxSize;
            continue;
        }

        let sorting_map = create_sorting_map(md);

        // Decode one projected column of this row group (sliced to rg_slice).
        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // Column missing from this row group: fill with nulls.
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
        apply_predicate(
            &mut df,
            predicate.as_ref().map(|p| p.predicate.as_ref()),
            true,
        )?;

        // Advance by the full group size (not the sliced size) so row-index
        // offsets stay aligned with the file.
        *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
            polars_err!(
                ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                or set 'streaming'"
            ),
        )?;
        dfs.push(df);

        // Stop early once the requested slice is fully covered.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}
730
/// Read row groups in parallel — one rayon task per row group — with the
/// projected columns of each group decoded serially inside the task.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&ScanIOPredicate>,
    row_index: Option<RowIndex>,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // (row-group metadata, slice within the group, starting global row) tuples
    // collected up-front so the parallel phase has no shared mutable state.
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // Running count of rows in all groups before the current one; base for
    // row-index offsets.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        // Row group falls entirely outside the requested slice.
        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                // Statistics-based pruning per group.
                if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
                    return Ok(None);
                }
                // Test hook: panic if any parquet data is actually decoded
                // while POLARS_PANIC_IF_PARQUET_PARSED is set.
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        // Column missing from this row group: fill with nulls.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
                apply_predicate(
                    &mut df,
                    predicate.as_ref().map(|p| p.predicate.as_ref()),
                    false,
                )?;

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}
859
/// Read a parquet file into a single `DataFrame`.
///
/// `pre_slice` is an (offset, length) row slice applied across row groups.
/// The `parallel` strategy may be overridden based on the predicate, the
/// projection width and the `POLARS_PARQUET_PREFILTER` env var.
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    predicate: Option<&ScanIOPredicate>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path: a zero-length slice yields an empty frame with the schema.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    // Use the caller-supplied metadata if present, otherwise parse the footer.
    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // Hold the global string cache while multiple row groups are decoded
    // (categorical feature only); the dummy `0u8` keeps the binding typed
    // when the feature is off.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    // No projection means "all columns".
    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    // Decide whether to use the prefiltered strategy; POLARS_PARQUET_PREFILTER
    // can force it on ("1") or off ("0").
    if let Some(predicate) = predicate {
        let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER");
        let prefilter_env = prefilter_env.as_deref();

        let num_live_variables = predicate.live_columns.len();
        let mut do_prefilter = false;

        do_prefilter |= prefilter_env == Ok("1"); do_prefilter |= matches!(parallel, ParallelStrategy::Auto)
            && num_live_variables * n_row_groups >= POOL.current_num_threads()
            && materialized_projection.len() >= num_live_variables;

        do_prefilter &= prefilter_env != Ok("0"); if do_prefilter {
            parallel = ParallelStrategy::Prefiltered;
        }
    }
    // Auto: parallelize over row groups when there are more groups than
    // columns or threads, otherwise over columns.
    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    // Parallelizing over a single column has no benefit.
    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    // NOTE(review): the lifetime is erased to 'static before taking a memslice.
    // This appears to rely on `to_memslice` producing bytes that outlive
    // `reader` (copy or mmap) — confirm against `ReaderBytes::to_memslice`.
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        predicate,
        row_index.clone(),
        parallel,
        &materialized_projection,
        use_statistics,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}
971
/// Row-group fetcher backed by a local, file-backed reader; holds the reader's
/// bytes (`ReaderBytes<'static>`) for the lifetime of the struct.
pub struct FetchRowGroupsFromMmapReader(ReaderBytes<'static>);
973
impl FetchRowGroupsFromMmapReader {
    /// Create a fetcher from a file-backed reader.
    ///
    /// # Panics
    /// Panics if the reader is not file-backed (`to_file()` returns `None`).
    pub fn new(mut reader: Box<dyn MmapBytesReader>) -> PolarsResult<Self> {
        // Safety: we will keep ownership of the struct and reference the bytes on the heap.
        // this refcount will be roughly 1 + n_clones
        assert!(reader.to_file().is_some());
        // NOTE(review): the reader's lifetime is extended to 'static so the
        // derived `ReaderBytes` can be stored in the struct, yet the boxed
        // reader itself is dropped when this function returns. This is sound
        // only if `get_reader_bytes` maps/copies the underlying file rather
        // than borrowing from the reader object — the `to_file()` assertion
        // above appears to guarantee that path. TODO confirm.
        let reader_ptr = unsafe {
            std::mem::transmute::<&mut dyn MmapBytesReader, &'static mut dyn MmapBytesReader>(
                reader.as_mut(),
            )
        };
        let reader_bytes = get_reader_bytes(reader_ptr)?;
        Ok(FetchRowGroupsFromMmapReader(reader_bytes))
    }

    // The whole file is already available locally, so any requested row-group
    // range resolves to the same local column store.
    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
        Ok(mmap::ColumnStore::Local(self.0.to_memslice()))
    }
}
993
/// Source from which row-group bytes are fetched: an object store (cloud
/// feature) or a local memory-mapped reader.
pub enum RowGroupFetcher {
    #[cfg(feature = "cloud")]
    ObjectStore(FetchRowGroupsFromObjectStore),
    Local(FetchRowGroupsFromMmapReader),
}
1001
#[cfg(feature = "cloud")]
impl From<FetchRowGroupsFromObjectStore> for RowGroupFetcher {
    /// Wrap an object-store fetcher.
    fn from(fetcher: FetchRowGroupsFromObjectStore) -> Self {
        Self::ObjectStore(fetcher)
    }
}
1008
1009impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
1010 fn from(value: FetchRowGroupsFromMmapReader) -> Self {
1011 RowGroupFetcher::Local(value)
1012 }
1013}
1014
impl RowGroupFetcher {
    /// Fetch the column store covering `_row_groups`. Only the object-store
    /// variant actually awaits; the local variant resolves immediately.
    async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
        match self {
            RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
            #[cfg(feature = "cloud")]
            RowGroupFetcher::ObjectStore(f) => f.fetch_row_groups(_row_groups).await,
        }
    }
}
1024
1025pub(super) fn compute_row_group_range(
1026 row_group_start: usize,
1027 row_group_end: usize,
1028 slice: (usize, usize),
1029 row_groups: &[RowGroupMetadata],
1030) -> std::ops::Range<usize> {
1031 let mut start = row_group_start;
1032 let mut cum_rows: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum();
1033 let row_group_end = row_groups.len().min(row_group_end);
1034
1035 loop {
1036 if start == row_group_end {
1037 break;
1038 }
1039
1040 cum_rows += row_groups[start].num_rows();
1041
1042 if cum_rows >= slice.0 {
1043 break;
1044 }
1045
1046 start += 1;
1047 }
1048
1049 let slice_end = slice.0 + slice.1;
1050 let mut end = (1 + start).min(row_group_end);
1051
1052 loop {
1053 if end == row_group_end {
1054 break;
1055 }
1056
1057 if cum_rows >= slice_end {
1058 break;
1059 }
1060
1061 cum_rows += row_groups[end].num_rows();
1062 end += 1;
1063 }
1064
1065 start..end
1066}
1067
/// Reads a parquet file in batches of row groups, buffering decoded frames in
/// a FIFO so callers can pull a bounded number of `DataFrame`s at a time.
pub struct BatchedParquetReader {
    #[allow(dead_code)]
    row_group_fetcher: RowGroupFetcher,
    /// Remaining (offset, length) row slice to read.
    slice: (usize, usize),
    /// Column indices to materialize.
    projection: Arc<[usize]>,
    schema: ArrowSchemaRef,
    metadata: FileMetadataRef,
    predicate: Option<ScanIOPredicate>,
    row_index: Option<RowIndex>,
    /// Total rows scanned so far (drives row-index offsets and termination).
    rows_read: IdxSize,
    /// Index of the next row group to fetch.
    row_group_offset: usize,
    n_row_groups: usize,
    /// Decoded frames waiting to be handed out by `next_batches`.
    chunks_fifo: VecDeque<DataFrame>,
    parallel: ParallelStrategy,
    /// Target row count per buffered chunk; larger frames are split.
    chunk_size: usize,
    use_statistics: bool,
    hive_partition_columns: Option<Arc<[Series]>>,
    /// Scalar column holding the source file path, added to every output frame
    /// when requested.
    include_file_path: Option<Column>,
    /// Whether at least one batch has been returned to the caller.
    has_returned: bool,
}
1090
impl BatchedParquetReader {
    /// Construct a batched reader; resolves `Auto` parallelism the same way as
    /// the eager `read_parquet` path and defaults the projection to all columns.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        row_group_fetcher: RowGroupFetcher,
        metadata: FileMetadataRef,
        schema: ArrowSchemaRef,
        slice: (usize, usize),
        projection: Option<Vec<usize>>,
        predicate: Option<ScanIOPredicate>,
        row_index: Option<RowIndex>,
        chunk_size: usize,
        use_statistics: bool,
        hive_partition_columns: Option<Vec<Series>>,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
        mut parallel: ParallelStrategy,
    ) -> PolarsResult<Self> {
        let n_row_groups = metadata.row_groups.len();
        // No projection means "all columns".
        let projection = projection
            .map(Arc::from)
            .unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());

        // Auto: prefer row-group parallelism when groups outnumber columns or
        // threads, otherwise column parallelism.
        parallel = match parallel {
            ParallelStrategy::Auto => {
                if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() {
                    ParallelStrategy::RowGroups
                } else {
                    ParallelStrategy::Columns
                }
            },
            _ => parallel,
        };

        // Parallelizing over a single column has no benefit.
        if let (ParallelStrategy::Columns, true) = (parallel, projection.len() == 1) {
            parallel = ParallelStrategy::None;
        }

        Ok(BatchedParquetReader {
            row_group_fetcher,
            slice,
            projection,
            schema,
            metadata,
            row_index,
            rows_read: 0,
            predicate,
            row_group_offset: 0,
            n_row_groups,
            chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
            parallel,
            chunk_size,
            use_statistics,
            hive_partition_columns: hive_partition_columns.map(Arc::from),
            include_file_path: include_file_path.map(|(col, path)| {
                Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(path.as_ref().into()),
                    ),
                    1,
                )
            }),
            has_returned: false,
        })
    }

    pub fn schema(&self) -> &ArrowSchemaRef {
        &self.schema
    }

    /// All row groups have been fetched.
    pub fn is_finished(&self) -> bool {
        self.row_group_offset >= self.n_row_groups
    }

    /// Fetching `n` more batches would exhaust the remaining row groups.
    pub fn finishes_this_batch(&self, n: usize) -> bool {
        self.row_group_offset + n > self.n_row_groups
    }

    /// Fetch and decode up to `n` more row-group batches, returning buffered
    /// chunks. Returns `Ok(None)` once everything has been handed out.
    #[cfg(feature = "async")]
    pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        // The slice has been fully read: only drain what is still buffered.
        if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned {
            return if self.chunks_fifo.is_empty() {
                Ok(None)
            } else {
                let n_drainable = std::cmp::min(n, self.chunks_fifo.len());
                Ok(Some(self.chunks_fifo.drain(..n_drainable).collect()))
            };
        }

        let mut skipped_all_rgs = false;
        if (self.rows_read as usize) < self.slice.0 + self.slice.1
            && self.row_group_offset < self.n_row_groups
            && self.chunks_fifo.len() < n
        {
            // Which of the next `n` row groups overlap the remaining slice.
            let row_group_range = compute_row_group_range(
                self.row_group_offset,
                self.row_group_offset + n,
                self.slice,
                &self.metadata.row_groups,
            );

            let store = self
                .row_group_fetcher
                .fetch_row_groups(row_group_range.clone())
                .await?;

            let prev_rows_read = self.rows_read;

            let mut dfs = {
                // Clone everything the rayon closure needs so it can be moved
                // off the async task.
                let mut rows_read = self.rows_read;
                let row_index = self.row_index.clone();
                let predicate = self.predicate.clone();
                let schema = self.schema.clone();
                let metadata = self.metadata.clone();
                let parallel = self.parallel;
                let projection = self.projection.clone();
                let use_statistics = self.use_statistics;
                let hive_partition_columns = self.hive_partition_columns.clone();
                let slice = self.slice;

                let func = move || {
                    let dfs = rg_to_dfs(
                        &store,
                        &mut rows_read,
                        row_group_range.start,
                        row_group_range.end,
                        slice,
                        &metadata,
                        &schema,
                        predicate.as_ref(),
                        row_index,
                        parallel,
                        &projection,
                        use_statistics,
                        hive_partition_columns.as_deref(),
                    );

                    dfs.map(|x| (x, rows_read))
                };

                // Run the CPU-heavy decode on the rayon pool, off the async
                // runtime.
                let (dfs, rows_read) = crate::pl_async::get_runtime().spawn_rayon(func).await?;

                self.rows_read = rows_read;
                dfs
            };

            // Attach the file-path column when requested.
            if let Some(column) = self.include_file_path.as_ref() {
                if dfs.first().is_some_and(|x| x.width() > 0) {
                    for df in &mut dfs {
                        unsafe { df.with_column_unchecked(column.new_from_index(0, df.height())) };
                    }
                } else {
                    // Width-0 frames (e.g. row-index-only reads): size the path
                    // column by the rows read this call, capped at the slice end.
                    let (offset, len) = self.slice;
                    let end = offset + len;

                    debug_assert_eq!(dfs.len(), 1);
                    dfs.get_mut(0).unwrap().insert_column(
                        0,
                        column.new_from_index(
                            0,
                            (self.rows_read.min(end.try_into().unwrap_or(IdxSize::MAX))
                                - prev_rows_read)
                                .try_into()
                                .unwrap(),
                        ),
                    )?;
                }
            }

            self.row_group_offset += n;

            // Nothing read at all: return one empty frame with the right
            // schema so callers still observe it.
            if self.rows_read == 0 && dfs.is_empty() {
                let mut df = materialize_empty_df(
                    Some(self.projection.as_ref()),
                    &self.schema,
                    self.hive_partition_columns.as_deref(),
                    self.row_index.as_ref(),
                );

                if let Some(ca) = &self.include_file_path {
                    unsafe {
                        df.with_column_unchecked(ca.clear().into_column());
                    }
                };

                return Ok(Some(vec![df]));
            }

            skipped_all_rgs |= dfs.is_empty();
            // Split large frames so buffered chunks stay near `chunk_size` rows.
            for mut df in dfs {
                let n = df.height() / self.chunk_size;
                if n > 1 {
                    for df in split_df(&mut df, n, false) {
                        self.chunks_fifo.push_back(df)
                    }
                } else {
                    self.chunks_fifo.push_back(df)
                }
            }
        } else {
            skipped_all_rgs = !self.has_returned;
        };

        if self.chunks_fifo.is_empty() {
            if skipped_all_rgs {
                // All row groups pruned: emit a single empty frame once.
                self.has_returned = true;
                let mut df = materialize_empty_df(
                    Some(self.projection.as_ref()),
                    &self.schema,
                    self.hive_partition_columns.as_deref(),
                    self.row_index.as_ref(),
                );

                if let Some(ca) = &self.include_file_path {
                    unsafe {
                        df.with_column_unchecked(ca.clear().into_column());
                    }
                };

                Ok(Some(vec![df]))
            } else {
                Ok(None)
            }
        } else {
            // Hand out up to `n` buffered chunks.
            let mut chunks = Vec::with_capacity(n);
            let mut i = 0;
            while let Some(df) = self.chunks_fifo.pop_front() {
                chunks.push(df);
                i += 1;
                if i == n {
                    break;
                }
            }

            self.has_returned = true;
            Ok(Some(chunks))
        }
    }

    /// Convert into an iterator-like adapter yielding one frame at a time,
    /// fetching `batches_per_iter` batches per refill.
    #[cfg(feature = "async")]
    pub fn iter(self, batches_per_iter: usize) -> BatchedParquetIter {
        BatchedParquetIter {
            batches_per_iter,
            inner: self,
            current_batch: vec![].into_iter(),
        }
    }
}
1353
#[cfg(feature = "async")]
/// Adapter over [`BatchedParquetReader`] that yields one `DataFrame` at a
/// time, refilling its buffer `batches_per_iter` batches at a time.
pub struct BatchedParquetIter {
    batches_per_iter: usize,
    inner: BatchedParquetReader,
    /// Frames from the most recent `next_batches` call, drained one by one.
    current_batch: std::vec::IntoIter<DataFrame>,
}
1360
#[cfg(feature = "async")]
impl BatchedParquetIter {
    /// Yield the next frame, refilling from the underlying reader when the
    /// current batch is exhausted. `opt_batch?` propagates the reader's `None`
    /// (end of stream) as this function's `None`.
    pub(crate) async fn next_(&mut self) -> Option<PolarsResult<DataFrame>> {
        match self.current_batch.next() {
            Some(df) => Some(Ok(df)),
            None => match self.inner.next_batches(self.batches_per_iter).await {
                Err(e) => Some(Err(e)),
                Ok(opt_batch) => {
                    let batch = opt_batch?;
                    self.current_batch = batch.into_iter();
                    self.current_batch.next().map(Ok)
                },
            },
        }
    }
}
1378
1379pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
1380 let num_edges = mask.num_edges() as f64;
1381 let rg_len = mask.len() as f64;
1382
1383 (num_edges / rg_len).clamp(0.0, 1.0)
1400}
1401
/// Controls whether the filter mask is applied while decoding dead columns
/// (`Pre`), after decoding (`Post`), or decided per column (`Auto`).
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}
1407
1408impl PrefilterMaskSetting {
1409 pub fn init_from_env() -> Self {
1410 std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
1411 "auto" => Self::Auto,
1412 "pre" => Self::Pre,
1413 "post" => Self::Post,
1414 _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
1415 })
1416 }
1417
1418 pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
1419 match self {
1420 Self::Auto => {
1421 let is_nested = dtype.is_nested();
1424
1425 !is_nested && prefilter_cost <= 0.01
1427 },
1428 Self::Pre => true,
1429 Self::Post => false,
1430 }
1431 }
1432}