polars_io/parquet/read/
read_impl.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::ops::Range;
4
5use arrow::array::BooleanArray;
6use arrow::bitmap::{Bitmap, BitmapBuilder};
7use arrow::datatypes::ArrowSchemaRef;
8use polars_core::chunked_array::builder::NullChunkedBuilder;
9use polars_core::prelude::*;
10use polars_core::series::IsSorted;
11use polars_core::utils::{accumulate_dataframes_vertical, split_df};
12use polars_core::{POOL, config};
13use polars_parquet::read::{
14    self, ColumnChunkMetadata, FileMetadata, Filter, PredicateFilter, RowGroupMetadata,
15};
16use rayon::prelude::*;
17
18#[cfg(feature = "cloud")]
19use super::async_impl::FetchRowGroupsFromObjectStore;
20use super::mmap::{ColumnStore, mmap_columns};
21use super::predicates::read_this_row_group;
22use super::utils::materialize_empty_df;
23use super::{ParallelStrategy, mmap};
24use crate::RowIndex;
25use crate::hive::{self, materialize_hive_partitions};
26use crate::mmap::{MmapBytesReader, ReaderBytes};
27use crate::parquet::metadata::FileMetadataRef;
28use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
29use crate::predicates::{ColumnPredicateExpr, ScanIOPredicate, apply_predicate};
30use crate::utils::get_reader_bytes;
31use crate::utils::slice::split_slice_at_file;
32
33#[cfg(debug_assertions)]
34// Ensure we get the proper polars types from schema inference
35// This saves unneeded casts.
36fn assert_dtypes(dtype: &ArrowDataType) {
37    use ArrowDataType as D;
38
39    match dtype {
40        // These should all be cast to the BinaryView / Utf8View variants
41        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
42
43        // These should be cast to Float32
44        D::Float16 => unreachable!(),
45
46        // This should have been converted to a LargeList
47        D::List(_) => unreachable!(),
48
49        // This should have been converted to a LargeList(Struct(_))
50        D::Map(_, _) => unreachable!(),
51
52        // Recursive checks
53        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
54        D::Extension(ext) => assert_dtypes(&ext.inner),
55        D::LargeList(inner) => assert_dtypes(&inner.dtype),
56        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
57        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
58
59        _ => {},
60    }
61}
62
63fn should_copy_sortedness(dtype: &DataType) -> bool {
64    // @NOTE: For now, we are a bit conservative with this.
65    use DataType as D;
66
67    matches!(
68        dtype,
69        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
70    )
71}
72
73pub fn try_set_sorted_flag(
74    series: &mut Series,
75    col_idx: usize,
76    sorting_map: &PlHashMap<usize, IsSorted>,
77) {
78    if let Some(is_sorted) = sorting_map.get(&col_idx) {
79        if should_copy_sortedness(series.dtype()) {
80            if config::verbose() {
81                eprintln!(
82                    "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
83                    series.name()
84                );
85            }
86
87            series.set_sorted_flag(*is_sorted);
88        }
89    }
90}
91
92pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
93    let capacity = md.sorting_columns().map_or(0, |s| s.len());
94    let mut sorting_map = PlHashMap::with_capacity(capacity);
95
96    if let Some(sorting_columns) = md.sorting_columns() {
97        for sorting in sorting_columns {
98            let prev_value = sorting_map.insert(
99                sorting.column_idx as usize,
100                if sorting.descending {
101                    IsSorted::Descending
102                } else {
103                    IsSorted::Ascending
104                },
105            );
106
107            debug_assert!(prev_value.is_none());
108        }
109    }
110
111    sorting_map
112}
113
114fn column_idx_to_series(
115    column_i: usize,
116    // The metadata belonging to this column
117    field_md: &[&ColumnChunkMetadata],
118    filter: Option<Filter>,
119    file_schema: &ArrowSchema,
120    store: &mmap::ColumnStore,
121) -> PolarsResult<(Series, Bitmap)> {
122    let field = file_schema.get_at_index(column_i).unwrap().1;
123
124    #[cfg(debug_assertions)]
125    {
126        assert_dtypes(field.dtype())
127    }
128    let columns = mmap_columns(store, field_md);
129    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
130    let series = Series::try_from((field, array))?;
131
132    Ok((series, pred_true_mask))
133}
134
/// Deserialize row groups `row_group_start..row_group_end` into `DataFrame`s,
/// dispatching to one of the specialized strategies based on `parallel`.
///
/// * `pre_slice` is an `(offset, len)` slice over the scanned rows.
/// * `previous_row_count` is a running row counter across calls; the chosen
///   strategy updates it.
/// * `projection` holds indices into `schema` of the columns to materialize.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&ScanIOPredicate>,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a little special path here.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            // A temporary all-null column only provides the frame height; the
            // final `select` keeps just the generated row-index column.
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    // The prefiltered path is only taken when no slicing is requested
    // (full `(0, usize::MAX)` range) and the predicate has live columns.
    if parallel == S::Prefiltered && pre_slice == (0, usize::MAX) {
        if let Some(predicate) = predicate {
            if !predicate.live_columns.is_empty() {
                return rg_to_dfs_prefiltered(
                    store,
                    previous_row_count,
                    row_group_start,
                    row_group_end,
                    file_metadata,
                    schema,
                    predicate,
                    row_index,
                    projection,
                    use_statistics,
                    hive_partition_columns,
                );
            }
        }
    }

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            parallel,
            projection,
            use_statistics,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            projection,
            use_statistics,
            hive_partition_columns,
        ),
    }
}
225
/// Load several Parquet row groups as DataFrames while filtering predicate items.
///
/// This strategy works as follows:
///
/// ```text
/// For each Row Group:
///     1. Skip this row group if statistics already filter it out
///     2. Load all the data for the columns needed for the predicate (i.e. the live columns)
///     3. Create a predicate mask.
///     4. Load the filtered data for the columns not in the predicate (i.e. the dead columns)
///     5. Merge the columns into the right DataFrame
/// ```
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: &ScanIOPredicate,
    row_index: Option<RowIndex>,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if row_group_end > u32::MAX as usize {
        polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
    }

    // Per-row-group starting row offsets (relative to `previous_row_count`).
    // Only materialized when a row-index column has to be generated.
    let mut row_offset = *previous_row_count;
    let rg_offsets: Vec<IdxSize> = match row_index {
        None => Vec::new(),
        Some(_) => (row_group_start..row_group_end)
            .map(|index| {
                let md = &file_metadata.row_groups[index];

                let current_offset = row_offset;
                let current_row_count = md.num_rows() as IdxSize;
                row_offset += current_row_count;

                current_offset
            })
            .collect(),
    };

    // Get the number of live columns
    let num_live_columns = predicate.live_columns.len();
    let num_dead_columns =
        projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns;

    if config::verbose() {
        eprintln!("parquet live columns = {num_live_columns}, dead columns = {num_dead_columns}");
    }

    // We create two look-up tables that map indexes offsets into the live- and dead-set onto
    // column indexes of the schema.
    // Note: This may contain less than `num_live_columns` if there are hive columns involved.
    let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
    let mut dead_idx_to_col_idx: Vec<usize> = Vec::with_capacity(num_dead_columns);
    for &i in projection.iter() {
        let name = schema.get_at_index(i).unwrap().0.as_str();

        if predicate.live_columns.contains(name) {
            live_idx_to_col_idx.push(i);
        } else {
            dead_idx_to_col_idx.push(i);
        }
    }

    // Env-gated experiment: push the predicate down into the parquet decoder
    // itself. Only enabled for a single live column that is neither a hive
    // partition column nor nested.
    let do_parquet_expr = std::env::var("POLARS_PARQUET_EXPR").as_deref() == Ok("1")
        && predicate.live_columns.len() == 1 // Only do it with one column for now
        && hive_partition_columns.is_none_or(|hc| {
            !hc.iter()
                .any(|c| c.name().as_str() == predicate.live_columns[0].as_str())
        }) // No hive columns
        && !schema
            .get(predicate.live_columns[0].as_str())
            .unwrap()
            .dtype()
            .is_nested(); // No nested columns
    // Per live column: the decoder-level predicate filter plus (when the
    // predicate is an equality against a scalar) that scalar, so the result
    // column can be materialized as a constant instead of decoded values.
    let column_exprs = do_parquet_expr.then(|| {
        predicate
            .live_columns
            .iter()
            .map(|name| {
                let (p, specialized) = predicate.column_predicates.predicates.get(name)?;

                let p = ColumnPredicateExpr::new(
                    name.clone(),
                    DataType::from_arrow_field(schema.get(name).unwrap()),
                    p.clone(),
                    specialized.clone(),
                );

                let eq_scalar = p.to_eq_scalar().cloned();
                let predicate = Arc::new(p) as _;

                Some((
                    PredicateFilter {
                        predicate,
                        // When an eq-scalar exists we only need the mask,
                        // not the decoded values.
                        include_values: eq_scalar.is_none(),
                    },
                    eq_scalar,
                ))
            })
            .collect::<Vec<_>>()
    });

    let mask_setting = PrefilterMaskSetting::init_from_env();
    let projected_schema = schema.try_project_indices(projection).unwrap();

    // One entry per row group; `None` means the row group was filtered out.
    let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.

        (row_group_start..row_group_end)
            .into_par_iter()
            .map(|rg_idx| {
                let md = &file_metadata.row_groups[rg_idx];

                // Step 1: skip the row group entirely if statistics allow it.
                if use_statistics {
                    match read_this_row_group(Some(predicate), md, schema) {
                        Ok(false) => return Ok(None),
                        Ok(true) => {},
                        Err(e) => return Err(e),
                    }
                }

                let sorting_map = create_sorting_map(md);

                // Collect the data for the live columns
                let (live_columns, filters) = (0..live_idx_to_col_idx.len())
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = live_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        // Column missing from this file: materialize full-null.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok((
                                Column::full_null(
                                    name.clone(),
                                    md.num_rows(),
                                    &DataType::from_arrow_field(field),
                                ),
                                None,
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        // Optionally push the column predicate into the decoder.
                        let (filter, equals_scalar) = match column_exprs.as_ref() {
                            None => (None, None),
                            Some(column_expr) => match column_expr.get(i) {
                                Some(Some((p, s))) => {
                                    (Some(Filter::Predicate(p.clone())), s.clone())
                                },
                                _ => (None, None),
                            },
                        };

                        let (mut series, pred_true_mask) =
                            column_idx_to_series(col_idx, part.as_slice(), filter, schema, store)?;

                        // An empty mask means no decoder-level predicate ran.
                        debug_assert!(
                            pred_true_mask.is_empty() || pred_true_mask.len() == md.num_rows()
                        );
                        match equals_scalar {
                            None => {
                                try_set_sorted_flag(&mut series, col_idx, &sorting_map);
                                Ok((
                                    series.into_column(),
                                    (!pred_true_mask.is_empty()).then_some(pred_true_mask),
                                ))
                            },
                            // Equality predicate: surviving rows all hold the
                            // scalar, so build a constant column from the mask.
                            Some(sc) => Ok((
                                Column::new_scalar(name.clone(), sc, pred_true_mask.set_bits()),
                                Some(pred_true_mask),
                            )),
                        }
                    })
                    .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()?;

                // Apply the predicate to the live columns and save the dataframe and the bitmask
                let md = &file_metadata.row_groups[rg_idx];
                let filter_mask: Bitmap;
                let mut df: DataFrame;

                if let Some(Some(f)) = filters.first() {
                    // The decoder already produced a row mask (single-column
                    // predicate pushdown); the live columns are pre-filtered.
                    if f.set_bits() == 0 {
                        if config::verbose() {
                            eprintln!("parquet filter mask found that row group can be skipped");
                        }

                        return Ok(None);
                    }

                    if let Some(rc) = &row_index {
                        // Generate the row-index column at full row-group
                        // height, filter it by the mask, then append the
                        // already-filtered live columns.
                        df = unsafe { DataFrame::new_no_checks(md.num_rows(), vec![]) };
                        unsafe {
                            df.with_row_index_mut(
                                rc.name.clone(),
                                Some(rg_offsets[rg_idx] + rc.offset),
                            )
                        };
                        df = df.filter(&BooleanChunked::from_chunk_iter(
                            PlSmallStr::EMPTY,
                            [BooleanArray::new(ArrowDataType::Boolean, f.clone(), None)],
                        ))?;
                        unsafe { df.column_extend_unchecked(live_columns) }
                    } else {
                        df = DataFrame::new(live_columns).unwrap();
                    }

                    filter_mask = f.clone();
                } else {
                    // No decoder-level mask: evaluate the predicate over the
                    // (unfiltered) live columns ourselves.
                    df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };

                    materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
                    let s = predicate.predicate.evaluate_io(&df)?;
                    let mask = s.bool().expect("filter predicates was not of type boolean");

                    // Create without hive columns - the first merge phase does not handle hive partitions. This also saves
                    // some unnecessary filtering.
                    df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };

                    if let Some(rc) = &row_index {
                        unsafe {
                            df.with_row_index_mut(
                                rc.name.clone(),
                                Some(rg_offsets[rg_idx] + rc.offset),
                            )
                        };
                    }
                    df = df.filter(mask)?;

                    let mut mut_filter_mask = BitmapBuilder::with_capacity(mask.len());

                    // We need to account for the validity of the items
                    for chunk in mask.downcast_iter() {
                        match chunk.validity() {
                            None => mut_filter_mask.extend_from_bitmap(chunk.values()),
                            Some(validity) => {
                                mut_filter_mask.extend_from_bitmap(&(validity & chunk.values()))
                            },
                        }
                    }

                    filter_mask = mut_filter_mask.freeze();
                }

                debug_assert_eq!(md.num_rows(), filter_mask.len());
                debug_assert_eq!(df.height(), filter_mask.set_bits());

                if filter_mask.set_bits() == 0 {
                    if config::verbose() {
                        eprintln!("parquet filter mask found that row group can be skipped");
                    }

                    return Ok(None);
                }

                // We don't need to do any further work if there are no dead columns
                if dead_idx_to_col_idx.is_empty() {
                    materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                    return Ok(Some(df));
                }

                // Only computed for the Auto setting; other settings ignore it.
                let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto)
                    .then(|| calc_prefilter_cost(&filter_mask))
                    .unwrap_or_default();

                // #[cfg(debug_assertions)]
                // {
                //     let md = &file_metadata.row_groups[rg_idx];
                //     debug_assert_eq!(md.num_rows(), mask.len());
                // }

                let n_rows_in_result = filter_mask.set_bits();

                // Step 4: load the dead columns, either filtered during decode
                // ("pre") or decoded fully and filtered afterwards ("post").
                let dead_columns = (0..dead_idx_to_col_idx.len())
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = dead_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        // Column missing from this file: materialize full-null
                        // at the already-filtered height.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                n_rows_in_result,
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let field_md = iter.collect::<Vec<_>>();

                        // Decode while applying the mask in the reader.
                        let pre = || {
                            let (array, _) = column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                Some(Filter::new_masked(filter_mask.clone())),
                                schema,
                                store,
                            )?;

                            PolarsResult::Ok(array)
                        };
                        // Decode everything, then filter by the mask.
                        let post = || {
                            let (array, _) = column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                None,
                                schema,
                                store,
                            )?;

                            debug_assert_eq!(array.len(), md.num_rows());

                            let mask_arr = BooleanArray::new(
                                ArrowDataType::Boolean,
                                filter_mask.clone(),
                                None,
                            );
                            let mask_arr = BooleanChunked::from(mask_arr);
                            array.filter(&mask_arr)
                        };

                        let mut series = if mask_setting.should_prefilter(
                            prefilter_cost,
                            &schema.get_at_index(col_idx).unwrap().1.dtype,
                        ) {
                            pre()?
                        } else {
                            post()?
                        };

                        debug_assert_eq!(series.len(), filter_mask.set_bits());

                        try_set_sorted_flag(&mut series, col_idx, &sorting_map);

                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<Column>>>()?;

                debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));

                let height = df.height();
                let live_columns = df.take_columns();

                assert_eq!(live_columns.len() + dead_columns.len(), projection.len());

                // Step 5: merge live and dead columns back into schema order.
                let mut merged = Vec::with_capacity(live_columns.len() + dead_columns.len());

                // * All hive columns are always in `live_columns` if there are any.
                // * `materialize_hive_partitions()` guarantees `live_columns` is sorted by their appearance in `reader_schema`.

                // We re-use `hive::merge_sorted_to_schema_order()` as it performs most of the merge operation we want.
                // But we take out the `row_index` column as it isn't on the right side.

                if row_index.is_some() {
                    merged.push(live_columns[0].clone());
                };

                hive::merge_sorted_to_schema_order(
                    &mut dead_columns.into_iter(), // df_columns
                    &mut live_columns.into_iter().skip(row_index.is_some() as usize), // hive_columns
                    &projected_schema,
                    &mut merged,
                );

                // SAFETY: This is completely based on the schema so all column names are unique
                // and the length is given by the parquet file which should always be the same.
                let mut df = unsafe { DataFrame::new_no_checks(height, merged) };

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                PolarsResult::Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<Option<DataFrame>>>>()
    })?;

    // Drop filtered-out row groups and accumulate the produced row count.
    let dfs: Vec<DataFrame> = dfs.into_iter().flatten().collect();

    let row_count: usize = dfs.iter().map(|df| df.height()).sum();
    let row_count = IdxSize::try_from(row_count).map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;
    *previous_row_count = previous_row_count
        .checked_add(row_count)
        .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

    Ok(dfs)
}
620
/// Deserialize row groups sequentially, optionally parallelizing the column
/// decoding within each row group (when `parallel` is `Columns`).
///
/// `slice` is an `(offset, len)` window over the scanned rows;
/// `previous_row_count` is advanced by each row group's full row count and
/// reading stops early once the slice end is passed.
#[allow(clippy::too_many_arguments)]
// might parallelize over columns
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&ScanIOPredicate>,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    // Rows in the row groups before `row_group_start`; needed so the slice is
    // resolved against absolute file positions.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        // The portion of `slice` that falls inside this row group.
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        // Statistics-based row group skip; still advance the row counter.
        if use_statistics
            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
        {
            *previous_row_count += rg_slice.1 as IdxSize;
            continue;
        }

        let sorting_map = create_sorting_map(md);

        // Decode a single projected column of this row group.
        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // Column missing from this file: materialize as full-null.
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        // SAFETY: all columns were decoded to the same `rg_slice.1` height.
        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
        apply_predicate(
            &mut df,
            predicate.as_ref().map(|p| p.predicate.as_ref()),
            true,
        )?;

        // Advance by the row group's FULL row count (not the sliced count),
        // guarding against IdxSize overflow.
        *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
            polars_err!(
                ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                or set 'streaming'"
            ),
        )?;
        dfs.push(df);

        // Stop once we have read past the end of the requested slice.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}
730
731#[allow(clippy::too_many_arguments)]
732// parallelizes over row groups
733fn rg_to_dfs_par_over_rg(
734    store: &mmap::ColumnStore,
735    row_group_start: usize,
736    row_group_end: usize,
737    rows_read: &mut IdxSize,
738    slice: (usize, usize),
739    file_metadata: &FileMetadata,
740    schema: &ArrowSchemaRef,
741    predicate: Option<&ScanIOPredicate>,
742    row_index: Option<RowIndex>,
743    projection: &[usize],
744    use_statistics: bool,
745    hive_partition_columns: Option<&[Series]>,
746) -> PolarsResult<Vec<DataFrame>> {
747    // compute the limits per row group and the row count offsets
748    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
749
750    let mut n_rows_processed: usize = (0..row_group_start)
751        .map(|i| file_metadata.row_groups[i].num_rows())
752        .sum();
753    let slice_end = slice.0 + slice.1;
754
755    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
756    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
757    // read into a dataframe.
758    let mut rows_scanned: IdxSize;
759
760    if row_group_start > 0 {
761        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
762        // zero due to earlier processing.
763        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
764        rows_scanned = (0..row_group_start)
765            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
766            .sum();
767    } else {
768        rows_scanned = 0;
769    }
770
771    for i in row_group_start..row_group_end {
772        let row_count_start = rows_scanned;
773        let rg_md = &file_metadata.row_groups[i];
774        let n_rows_this_file = rg_md.num_rows();
775        let rg_slice =
776            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
777        rows_scanned = rows_scanned
778            .checked_add(n_rows_this_file as IdxSize)
779            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
780
781        *rows_read += rg_slice.1 as IdxSize;
782
783        if rg_slice.1 == 0 {
784            continue;
785        }
786
787        row_groups.push((rg_md, rg_slice, row_count_start));
788    }
789
790    let dfs = POOL.install(|| {
791        // Set partitioned fields to prevent quadratic behavior.
792        // Ensure all row groups are partitioned.
793        row_groups
794            .into_par_iter()
795            .map(|(md, slice, row_count_start)| {
796                if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
797                    return Ok(None);
798                }
799                // test we don't read the parquet file if this env var is set
800                #[cfg(debug_assertions)]
801                {
802                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
803                }
804
805                let sorting_map = create_sorting_map(md);
806
807                let columns = projection
808                    .iter()
809                    .map(|column_i| {
810                        let (name, field) = schema.get_at_index(*column_i).unwrap();
811
812                        let Some(iter) = md.columns_under_root_iter(name) else {
813                            return Ok(Column::full_null(
814                                name.clone(),
815                                md.num_rows(),
816                                &DataType::from_arrow_field(field),
817                            ));
818                        };
819
820                        let part = iter.collect::<Vec<_>>();
821
822                        let (mut series, _) = column_idx_to_series(
823                            *column_i,
824                            part.as_slice(),
825                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
826                            schema,
827                            store,
828                        )?;
829
830                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
831                        Ok(series.into_column())
832                    })
833                    .collect::<PolarsResult<Vec<_>>>()?;
834
835                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };
836
837                if let Some(rc) = &row_index {
838                    unsafe {
839                        df.with_row_index_mut(
840                            rc.name.clone(),
841                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
842                        )
843                    };
844                }
845
846                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
847                apply_predicate(
848                    &mut df,
849                    predicate.as_ref().map(|p| p.predicate.as_ref()),
850                    false,
851                )?;
852
853                Ok(Some(df))
854            })
855            .collect::<PolarsResult<Vec<_>>>()
856    })?;
857    Ok(dfs.into_iter().flatten().collect())
858}
859
/// Read a Parquet file from `reader` into a single `DataFrame`.
///
/// * `pre_slice` — `(offset, len)` row slice applied across the whole file.
/// * `projection` — column indices into `reader_schema`; all columns when `None`.
/// * `metadata` — pre-parsed file metadata; read from `reader` when `None`.
/// * `parallel` — requested strategy; may be overridden below (prefilter heuristic,
///   `Auto` resolution, single-column downgrade to `None`).
/// * `use_statistics` — allow skipping row groups via their statistics when a
///   predicate is present.
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    predicate: Option<&ScanIOPredicate>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path: an empty slice never touches the file contents.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    // Only parse the footer if the caller did not already supply metadata.
    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // if there are multiple row groups and categorical data
    // we need a string cache
    // we keep it alive until the end of the function
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            // Placeholder so `_sc` has a value in both cfg branches.
            Some(0u8)
        }
    } else {
        None
    };

    // Materialize `None` into the full 0..len projection so downstream code has a slice.
    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    // Decide whether to switch to the prefiltered strategy:
    // POLARS_PARQUET_PREFILTER=1 forces it on, =0 forces it off; otherwise it is
    // enabled under `Auto` when the predicate provides enough parallel work
    // (live columns x row groups >= threads) and is not wider than the projection.
    if let Some(predicate) = predicate {
        let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER");
        let prefilter_env = prefilter_env.as_deref();

        let num_live_variables = predicate.live_columns.len();
        let mut do_prefilter = false;

        do_prefilter |= prefilter_env == Ok("1"); // Force enable
        do_prefilter |= matches!(parallel, ParallelStrategy::Auto)
            && num_live_variables * n_row_groups >= POOL.current_num_threads()
            && materialized_projection.len() >= num_live_variables;

        do_prefilter &= prefilter_env != Ok("0"); // Force disable

        if do_prefilter {
            parallel = ParallelStrategy::Prefiltered;
        }
    }
    // Resolve `Auto`: parallelize over whichever dimension (row groups vs columns)
    // offers more work units.
    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    // A single projected column gains nothing from column parallelism.
    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    // SAFETY: the transmute erases the borrow of `reader` so `to_memslice` can be
    // called; `reader` stays alive (and unmoved) for this whole function and
    // `store` is only used within this scope.
    // NOTE(review): soundness also relies on `to_memslice` not outliving the
    // backing bytes for the borrowed variant — confirm against `ReaderBytes`.
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        predicate,
        row_index.clone(),
        parallel,
        &materialized_projection,
        use_statistics,
        hive_partition_columns,
    )?;

    // All row groups may have been skipped (statistics/predicate); still return a
    // correctly-shaped empty frame.
    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}
971
972pub struct FetchRowGroupsFromMmapReader(ReaderBytes<'static>);
973
impl FetchRowGroupsFromMmapReader {
    /// Build a fetcher from a boxed reader.
    ///
    /// # Panics
    /// Panics if `reader` is not backed by a file (`to_file()` returns `None`),
    /// since the lifetime extension below is only checked for file readers.
    pub fn new(mut reader: Box<dyn MmapBytesReader>) -> PolarsResult<Self> {
        // SAFETY: we will keep ownership on the struct and reference the bytes on the heap.
        // this should not work with passed bytes so we check if it is a file
        assert!(reader.to_file().is_some());
        // NOTE(review): the boxed `reader` is dropped when `new` returns, so the
        // `'static` claim rests on `get_reader_bytes` returning bytes that do not
        // borrow the reader for file-backed inputs (the assert above) — confirm.
        let reader_ptr = unsafe {
            std::mem::transmute::<&mut dyn MmapBytesReader, &'static mut dyn MmapBytesReader>(
                reader.as_mut(),
            )
        };
        let reader_bytes = get_reader_bytes(reader_ptr)?;
        Ok(FetchRowGroupsFromMmapReader(reader_bytes))
    }

    /// Return a [`ColumnStore`] over the whole file; `_row_groups` is currently
    /// ignored because the local store covers all bytes.
    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
        // @TODO: we can something smarter here with mmap
        Ok(mmap::ColumnStore::Local(self.0.to_memslice()))
    }
}
993
// We couldn't use a trait as async trait gave very hard HRT lifetime errors.
// Maybe a puzzle for another day.
/// Source of row-group bytes: a cloud object store (behind the `cloud` feature)
/// or a local mmap-backed reader.
pub enum RowGroupFetcher {
    #[cfg(feature = "cloud")]
    ObjectStore(FetchRowGroupsFromObjectStore),
    Local(FetchRowGroupsFromMmapReader),
}
1001
#[cfg(feature = "cloud")]
impl From<FetchRowGroupsFromObjectStore> for RowGroupFetcher {
    /// Wrap an object-store fetcher in the enum.
    fn from(fetcher: FetchRowGroupsFromObjectStore) -> Self {
        Self::ObjectStore(fetcher)
    }
}
1008
1009impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
1010    fn from(value: FetchRowGroupsFromMmapReader) -> Self {
1011        RowGroupFetcher::Local(value)
1012    }
1013}
1014
impl RowGroupFetcher {
    /// Fetch the bytes for `_row_groups` into a [`ColumnStore`], dispatching to the
    /// underlying fetcher. Only the object-store variant actually awaits; the local
    /// variant completes synchronously.
    async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
        match self {
            RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
            #[cfg(feature = "cloud")]
            RowGroupFetcher::ObjectStore(f) => f.fetch_row_groups(_row_groups).await,
        }
    }
}
1024
1025pub(super) fn compute_row_group_range(
1026    row_group_start: usize,
1027    row_group_end: usize,
1028    slice: (usize, usize),
1029    row_groups: &[RowGroupMetadata],
1030) -> std::ops::Range<usize> {
1031    let mut start = row_group_start;
1032    let mut cum_rows: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum();
1033    let row_group_end = row_groups.len().min(row_group_end);
1034
1035    loop {
1036        if start == row_group_end {
1037            break;
1038        }
1039
1040        cum_rows += row_groups[start].num_rows();
1041
1042        if cum_rows >= slice.0 {
1043            break;
1044        }
1045
1046        start += 1;
1047    }
1048
1049    let slice_end = slice.0 + slice.1;
1050    let mut end = (1 + start).min(row_group_end);
1051
1052    loop {
1053        if end == row_group_end {
1054            break;
1055        }
1056
1057        if cum_rows >= slice_end {
1058            break;
1059        }
1060
1061        cum_rows += row_groups[end].num_rows();
1062        end += 1;
1063    }
1064
1065    start..end
1066}
1067
/// Reads a Parquet file in batches of row groups, buffering decoded chunks in a
/// FIFO so callers can pull a bounded number of `DataFrame`s at a time.
pub struct BatchedParquetReader {
    // use to keep ownership
    #[allow(dead_code)]
    row_group_fetcher: RowGroupFetcher,
    // (offset, len) row slice applied across the whole file.
    slice: (usize, usize),
    projection: Arc<[usize]>,
    schema: ArrowSchemaRef,
    metadata: FileMetadataRef,
    predicate: Option<ScanIOPredicate>,
    row_index: Option<RowIndex>,
    // Total rows read so far; compared against the slice end to detect completion.
    rows_read: IdxSize,
    // Index of the next row group to fetch.
    row_group_offset: usize,
    n_row_groups: usize,
    // Decoded chunks waiting to be handed out by `next_batches`.
    chunks_fifo: VecDeque<DataFrame>,
    parallel: ParallelStrategy,
    // Target maximum height of each emitted chunk (large frames are split).
    chunk_size: usize,
    use_statistics: bool,
    hive_partition_columns: Option<Arc<[Series]>>,
    // Pre-built single-row scalar column with the source path, broadcast per chunk.
    include_file_path: Option<Column>,
    /// Has returned at least one materialized frame.
    has_returned: bool,
}
1090
impl BatchedParquetReader {
    /// Create a batched reader over `metadata.row_groups`.
    ///
    /// `projection` defaults to all columns when `None`. An `Auto` parallel
    /// strategy is resolved here (row groups vs columns), and column parallelism
    /// is downgraded to `None` for a single-column projection.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        row_group_fetcher: RowGroupFetcher,
        metadata: FileMetadataRef,
        schema: ArrowSchemaRef,
        slice: (usize, usize),
        projection: Option<Vec<usize>>,
        predicate: Option<ScanIOPredicate>,
        row_index: Option<RowIndex>,
        chunk_size: usize,
        use_statistics: bool,
        hive_partition_columns: Option<Vec<Series>>,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
        mut parallel: ParallelStrategy,
    ) -> PolarsResult<Self> {
        let n_row_groups = metadata.row_groups.len();
        let projection = projection
            .map(Arc::from)
            .unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());

        // Same `Auto` resolution as `read_parquet`: pick the dimension with more work units.
        parallel = match parallel {
            ParallelStrategy::Auto => {
                if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() {
                    ParallelStrategy::RowGroups
                } else {
                    ParallelStrategy::Columns
                }
            },
            _ => parallel,
        };

        if let (ParallelStrategy::Columns, true) = (parallel, projection.len() == 1) {
            parallel = ParallelStrategy::None;
        }

        Ok(BatchedParquetReader {
            row_group_fetcher,
            slice,
            projection,
            schema,
            metadata,
            row_index,
            rows_read: 0,
            predicate,
            row_group_offset: 0,
            n_row_groups,
            chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
            parallel,
            chunk_size,
            use_statistics,
            hive_partition_columns: hive_partition_columns.map(Arc::from),
            // Materialize the file path as a 1-row scalar column; broadcast later per chunk.
            include_file_path: include_file_path.map(|(col, path)| {
                Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(path.as_ref().into()),
                    ),
                    1,
                )
            }),
            has_returned: false,
        })
    }

    /// The Arrow schema this reader decodes against.
    pub fn schema(&self) -> &ArrowSchemaRef {
        &self.schema
    }

    /// True once every row group has been (scheduled to be) consumed.
    pub fn is_finished(&self) -> bool {
        self.row_group_offset >= self.n_row_groups
    }

    /// Whether fetching `n` more row groups would exhaust the file.
    /// NOTE(review): uses `>`, so a batch that lands exactly on `n_row_groups`
    /// reports `false` here even though `is_finished` becomes true right after —
    /// confirm this off-by-one is intended by callers.
    pub fn finishes_this_batch(&self, n: usize) -> bool {
        self.row_group_offset + n > self.n_row_groups
    }

    /// Fetch and decode up to `n` row groups, returning up to `n` chunks from the
    /// FIFO. Returns `Ok(None)` when fully drained; when everything was skipped it
    /// still returns one empty, correctly-shaped frame (the streaming engine
    /// requires at least one chunk).
    #[cfg(feature = "async")]
    pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        // Slice fully read: only drain what is left in the FIFO.
        if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned {
            return if self.chunks_fifo.is_empty() {
                Ok(None)
            } else {
                // the range end point must not be greater than the length of the deque
                let n_drainable = std::cmp::min(n, self.chunks_fifo.len());
                Ok(Some(self.chunks_fifo.drain(..n_drainable).collect()))
            };
        }

        let mut skipped_all_rgs = false;
        // fill up fifo stack
        if (self.rows_read as usize) < self.slice.0 + self.slice.1
            && self.row_group_offset < self.n_row_groups
            && self.chunks_fifo.len() < n
        {
            // Ensure we apply the limit on the metadata, before we download the row-groups.
            let row_group_range = compute_row_group_range(
                self.row_group_offset,
                self.row_group_offset + n,
                self.slice,
                &self.metadata.row_groups,
            );

            let store = self
                .row_group_fetcher
                .fetch_row_groups(row_group_range.clone())
                .await?;

            let prev_rows_read = self.rows_read;

            let mut dfs = {
                // Spawn the decoding and decompression of the bytes on a rayon task.
                // This will ensure we don't block the async thread.

                // Make everything 'static.
                let mut rows_read = self.rows_read;
                let row_index = self.row_index.clone();
                let predicate = self.predicate.clone();
                let schema = self.schema.clone();
                let metadata = self.metadata.clone();
                let parallel = self.parallel;
                let projection = self.projection.clone();
                let use_statistics = self.use_statistics;
                let hive_partition_columns = self.hive_partition_columns.clone();
                let slice = self.slice;

                let func = move || {
                    let dfs = rg_to_dfs(
                        &store,
                        &mut rows_read,
                        row_group_range.start,
                        row_group_range.end,
                        slice,
                        &metadata,
                        &schema,
                        predicate.as_ref(),
                        row_index,
                        parallel,
                        &projection,
                        use_statistics,
                        hive_partition_columns.as_deref(),
                    );

                    dfs.map(|x| (x, rows_read))
                };

                let (dfs, rows_read) = crate::pl_async::get_runtime().spawn_rayon(func).await?;

                self.rows_read = rows_read;
                dfs
            };

            // Attach the source-path column when requested.
            if let Some(column) = self.include_file_path.as_ref() {
                if dfs.first().is_some_and(|x| x.width() > 0) {
                    // Normal case: broadcast the scalar path to each chunk's height.
                    for df in &mut dfs {
                        unsafe { df.with_column_unchecked(column.new_from_index(0, df.height())) };
                    }
                } else {
                    // Zero-width frame(s): size the path column from rows read this call.
                    let (offset, len) = self.slice;
                    let end = offset + len;

                    debug_assert_eq!(dfs.len(), 1);
                    dfs.get_mut(0).unwrap().insert_column(
                        0,
                        column.new_from_index(
                            0,
                            (self.rows_read.min(end.try_into().unwrap_or(IdxSize::MAX))
                                - prev_rows_read)
                                .try_into()
                                .unwrap(),
                        ),
                    )?;
                }
            }

            self.row_group_offset += n;

            // case where there is no data in the file
            // the streaming engine needs at least a single chunk
            if self.rows_read == 0 && dfs.is_empty() {
                let mut df = materialize_empty_df(
                    Some(self.projection.as_ref()),
                    &self.schema,
                    self.hive_partition_columns.as_deref(),
                    self.row_index.as_ref(),
                );

                if let Some(ca) = &self.include_file_path {
                    unsafe {
                        df.with_column_unchecked(ca.clear().into_column());
                    }
                };

                return Ok(Some(vec![df]));
            }

            // TODO! this is slower than it needs to be
            // we also need to parallelize over row groups here.

            skipped_all_rgs |= dfs.is_empty();
            for mut df in dfs {
                // make sure that the chunks are not too large
                let n = df.height() / self.chunk_size;
                if n > 1 {
                    for df in split_df(&mut df, n, false) {
                        self.chunks_fifo.push_back(df)
                    }
                } else {
                    self.chunks_fifo.push_back(df)
                }
            }
        } else {
            skipped_all_rgs = !self.has_returned;
        };

        if self.chunks_fifo.is_empty() {
            if skipped_all_rgs {
                // Everything was skipped: emit one empty, correctly-shaped frame.
                self.has_returned = true;
                let mut df = materialize_empty_df(
                    Some(self.projection.as_ref()),
                    &self.schema,
                    self.hive_partition_columns.as_deref(),
                    self.row_index.as_ref(),
                );

                if let Some(ca) = &self.include_file_path {
                    unsafe {
                        df.with_column_unchecked(ca.clear().into_column());
                    }
                };

                Ok(Some(vec![df]))
            } else {
                Ok(None)
            }
        } else {
            // Pop at most `n` buffered chunks.
            let mut chunks = Vec::with_capacity(n);
            let mut i = 0;
            while let Some(df) = self.chunks_fifo.pop_front() {
                chunks.push(df);
                i += 1;
                if i == n {
                    break;
                }
            }

            self.has_returned = true;
            Ok(Some(chunks))
        }
    }

    /// Turn the batched reader into an iterator.
    #[cfg(feature = "async")]
    pub fn iter(self, batches_per_iter: usize) -> BatchedParquetIter {
        BatchedParquetIter {
            batches_per_iter,
            inner: self,
            current_batch: vec![].into_iter(),
        }
    }
}
1353
#[cfg(feature = "async")]
/// Iterator-style adapter over [`BatchedParquetReader`]: yields one `DataFrame`
/// at a time, pulling `batches_per_iter` fresh batches whenever the buffered
/// batch runs out.
pub struct BatchedParquetIter {
    batches_per_iter: usize,
    inner: BatchedParquetReader,
    current_batch: std::vec::IntoIter<DataFrame>,
}
1360
#[cfg(feature = "async")]
impl BatchedParquetIter {
    // todo! implement stream
    /// Async `next`: return the next buffered `DataFrame`, or fetch a new batch
    /// from the inner reader when the buffer is exhausted. Yields `None` once the
    /// reader reports no more batches.
    pub(crate) async fn next_(&mut self) -> Option<PolarsResult<DataFrame>> {
        match self.current_batch.next() {
            Some(df) => Some(Ok(df)),
            None => match self.inner.next_batches(self.batches_per_iter).await {
                Err(e) => Some(Err(e)),
                Ok(opt_batch) => {
                    // `?` on the Option propagates `None` (reader exhausted).
                    let batch = opt_batch?;
                    self.current_batch = batch.into_iter();
                    self.current_batch.next().map(Ok)
                },
            },
        }
    }
}
1378
1379pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
1380    let num_edges = mask.num_edges() as f64;
1381    let rg_len = mask.len() as f64;
1382
1383    // @GB: I did quite some analysis on this.
1384    //
1385    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
1386    //
1387    // - Pre-filtered is faster when there is some amount of clustering or
1388    // sorting involved or if the number of values selected is small.
1389    // - Post-filtering is faster when the predicate selects a somewhat random
1390    // elements throughout the row group.
1391    //
1392    // The following is a heuristic value to try and estimate which one is
1393    // faster. Essentially, it sees how many times it needs to switch between
1394    // skipping items and collecting items and compares it against the number
1395    // of values that it will collect.
1396    //
1397    // Closer to 0: pre-filtering is probably better.
1398    // Closer to 1: post-filtering is probably better.
1399    (num_edges / rg_len).clamp(0.0, 1.0)
1400}
1401
/// Whether the predicate mask is applied while decoding (pre) or after (post).
pub enum PrefilterMaskSetting {
    /// Decide from the prefilter-cost heuristic and whether the dtype is nested.
    Auto,
    /// Always apply the mask while decoding.
    Pre,
    /// Always decode fully, then filter.
    Post,
}
1407
1408impl PrefilterMaskSetting {
1409    pub fn init_from_env() -> Self {
1410        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
1411            "auto" => Self::Auto,
1412            "pre" => Self::Pre,
1413            "post" => Self::Post,
1414            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
1415        })
1416    }
1417
1418    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
1419        match self {
1420            Self::Auto => {
1421                // Prefiltering is only expensive for nested types so we make the cut-off quite
1422                // high.
1423                let is_nested = dtype.is_nested();
1424
1425                // We empirically selected these numbers.
1426                !is_nested && prefilter_cost <= 0.01
1427            },
1428            Self::Pre => true,
1429            Self::Post => false,
1430        }
1431    }
1432}