// polars_io/parquet/read/read_impl.rs

1use std::borrow::Cow;
2
3use arrow::bitmap::Bitmap;
4use arrow::datatypes::ArrowSchemaRef;
5use polars_buffer::Buffer;
6use polars_core::chunked_array::builder::NullChunkedBuilder;
7use polars_core::prelude::*;
8use polars_core::series::IsSorted;
9use polars_core::utils::accumulate_dataframes_vertical;
10use polars_core::{POOL, config};
11use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
12use rayon::prelude::*;
13
14use super::mmap::mmap_columns;
15use super::utils::materialize_empty_df;
16use super::{ParallelStrategy, mmap};
17use crate::RowIndex;
18use crate::hive::materialize_hive_partitions;
19use crate::mmap::{MmapBytesReader, ReaderBytes};
20use crate::parquet::metadata::FileMetadataRef;
21use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
22use crate::utils::slice::split_slice_at_file;
23
24#[cfg(debug_assertions)]
25// Ensure we get the proper polars types from schema inference
26// This saves unneeded casts.
27fn assert_dtypes(dtype: &ArrowDataType) {
28    use ArrowDataType as D;
29
30    match dtype {
31        // These should all be cast to the BinaryView / Utf8View variants
32        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
33
34        // These should be cast to Float32
35        D::Float16 => unreachable!(),
36
37        // This should have been converted to a LargeList
38        D::List(_) => unreachable!(),
39
40        // This should have been converted to a LargeList(Struct(_))
41        D::Map(_, _) => unreachable!(),
42
43        // Recursive checks
44        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
45        D::Extension(ext) => assert_dtypes(&ext.inner),
46        D::LargeList(inner) => assert_dtypes(&inner.dtype),
47        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
48        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
49
50        _ => {},
51    }
52}
53
54fn should_copy_sortedness(dtype: &DataType) -> bool {
55    // @NOTE: For now, we are a bit conservative with this.
56    use DataType as D;
57
58    matches!(
59        dtype,
60        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
61    )
62}
63
64pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
65    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
66        return;
67    };
68    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
69        return;
70    }
71    if config::verbose() {
72        eprintln!(
73            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
74            series.name()
75        );
76    }
77
78    series.set_sorted_flag(*is_sorted);
79}
80
81pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
82    let capacity = md.sorting_columns().map_or(0, |s| s.len());
83    let mut sorting_map = Vec::with_capacity(capacity);
84
85    if let Some(sorting_columns) = md.sorting_columns() {
86        for sorting in sorting_columns {
87            sorting_map.push((
88                sorting.column_idx as usize,
89                if sorting.descending {
90                    IsSorted::Descending
91                } else {
92                    IsSorted::Ascending
93                },
94            ))
95        }
96    }
97
98    sorting_map
99}
100
101fn column_idx_to_series(
102    column_i: usize,
103    // The metadata belonging to this column
104    field_md: &[&ColumnChunkMetadata],
105    filter: Option<Filter>,
106    file_schema: &ArrowSchema,
107    store: &mmap::ColumnStore,
108) -> PolarsResult<(Series, Bitmap)> {
109    let field = file_schema.get_at_index(column_i).unwrap().1;
110
111    #[cfg(debug_assertions)]
112    {
113        assert_dtypes(field.dtype())
114    }
115    let columns = mmap_columns(store, field_md);
116    let (arrays, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
117    let series = Series::try_from((field, arrays))?;
118
119    Ok((series, pred_true_mask))
120}
121
122#[allow(clippy::too_many_arguments)]
123fn rg_to_dfs(
124    store: &mmap::ColumnStore,
125    previous_row_count: &mut IdxSize,
126    row_group_start: usize,
127    row_group_end: usize,
128    pre_slice: (usize, usize),
129    file_metadata: &FileMetadata,
130    schema: &ArrowSchemaRef,
131    row_index: Option<RowIndex>,
132    parallel: ParallelStrategy,
133    projection: &[usize],
134    hive_partition_columns: Option<&[Series]>,
135) -> PolarsResult<Vec<DataFrame>> {
136    if config::verbose() {
137        eprintln!("parquet scan with parallel = {parallel:?}");
138    }
139
140    // If we are only interested in the row_index, we take a little special path here.
141    if projection.is_empty() {
142        if let Some(row_index) = row_index {
143            let placeholder =
144                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
145            return Ok(vec![
146                DataFrame::new_infer_height(vec![placeholder.into_series().into_column()])?
147                    .with_row_index(
148                        row_index.name.clone(),
149                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
150                    )?
151                    .select(std::iter::once(row_index.name))?,
152            ]);
153        }
154    }
155
156    use ParallelStrategy as S;
157
158    match parallel {
159        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
160            store,
161            previous_row_count,
162            row_group_start,
163            row_group_end,
164            pre_slice,
165            file_metadata,
166            schema,
167            row_index,
168            parallel,
169            projection,
170            hive_partition_columns,
171        ),
172        _ => rg_to_dfs_par_over_rg(
173            store,
174            row_group_start,
175            row_group_end,
176            previous_row_count,
177            pre_slice,
178            file_metadata,
179            schema,
180            row_index,
181            projection,
182            hive_partition_columns,
183        ),
184    }
185}
186
#[allow(clippy::too_many_arguments)]
// might parallelize over columns
//
// Reads row groups `row_group_start..row_group_end` sequentially into one
// DataFrame per row group. Within a row group, the projected columns are
// decoded on the rayon pool when `parallel == ParallelStrategy::Columns`,
// otherwise on the current thread.
//
// `slice` is an (offset, len) pair over the rows of the whole file.
// `previous_row_count` is advanced by the *full* size of every visited row
// group and also serves as the base offset for the optional row index.
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    // Rows in all row groups before `row_group_start`; lets
    // `split_slice_at_file` locate the global slice within each row group.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        // (offset, len) of the requested slice within this row group.
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        // Decode one projected column of this row group into a Column.
        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // A column missing from this row group is materialized as a
            // full-null column with the height of the row-group slice.
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            // The ranged filter restricts decoding to the row-group slice;
            // the returned predicate-true mask is not needed here.
            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        // SAFETY: every column built above has height `rg_slice.1` (both the
        // null fill and the ranged filter use it).
        let mut df = unsafe { DataFrame::new_unchecked(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            // NOTE(review): `with_row_index_mut` is unsafe; presumably it
            // requires the caller to uphold name/height invariants — confirm
            // against its documentation.
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        // Advance by the *entire* row group (not only the sliced part) so the
        // row index stays aligned with file row numbers.
        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (pip install polars[rt64]), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        // Everything past the end of the requested slice can be skipped.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}
284
285#[allow(clippy::too_many_arguments)]
286// parallelizes over row groups
287fn rg_to_dfs_par_over_rg(
288    store: &mmap::ColumnStore,
289    row_group_start: usize,
290    row_group_end: usize,
291    rows_read: &mut IdxSize,
292    slice: (usize, usize),
293    file_metadata: &FileMetadata,
294    schema: &ArrowSchemaRef,
295    row_index: Option<RowIndex>,
296    projection: &[usize],
297    hive_partition_columns: Option<&[Series]>,
298) -> PolarsResult<Vec<DataFrame>> {
299    // compute the limits per row group and the row count offsets
300    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
301
302    let mut n_rows_processed: usize = (0..row_group_start)
303        .map(|i| file_metadata.row_groups[i].num_rows())
304        .sum();
305    let slice_end = slice.0 + slice.1;
306
307    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
308    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
309    // read into a dataframe.
310    let mut rows_scanned: IdxSize;
311
312    if row_group_start > 0 {
313        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
314        // zero due to earlier processing.
315        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
316        rows_scanned = (0..row_group_start)
317            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
318            .sum();
319    } else {
320        rows_scanned = 0;
321    }
322
323    for i in row_group_start..row_group_end {
324        let row_count_start = rows_scanned;
325        let rg_md = &file_metadata.row_groups[i];
326        let n_rows_this_file = rg_md.num_rows();
327        let rg_slice =
328            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
329        rows_scanned = rows_scanned
330            .checked_add(n_rows_this_file as IdxSize)
331            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
332
333        *rows_read += rg_slice.1 as IdxSize;
334
335        if rg_slice.1 == 0 {
336            continue;
337        }
338
339        row_groups.push((rg_md, rg_slice, row_count_start));
340    }
341
342    let dfs = POOL.install(|| {
343        // Set partitioned fields to prevent quadratic behavior.
344        // Ensure all row groups are partitioned.
345        row_groups
346            .into_par_iter()
347            .map(|(md, slice, row_count_start)| {
348                if slice.1 == 0 {
349                    return Ok(None);
350                }
351                // test we don't read the parquet file if this env var is set
352                #[cfg(debug_assertions)]
353                {
354                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
355                }
356
357                let sorting_map = create_sorting_map(md);
358
359                let columns = projection
360                    .iter()
361                    .map(|column_i| {
362                        let (name, field) = schema.get_at_index(*column_i).unwrap();
363
364                        let Some(iter) = md.columns_under_root_iter(name) else {
365                            return Ok(Column::full_null(
366                                name.clone(),
367                                md.num_rows(),
368                                &DataType::from_arrow_field(field),
369                            ));
370                        };
371
372                        let part = iter.collect::<Vec<_>>();
373
374                        let (mut series, _) = column_idx_to_series(
375                            *column_i,
376                            part.as_slice(),
377                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
378                            schema,
379                            store,
380                        )?;
381
382                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
383                        Ok(series.into_column())
384                    })
385                    .collect::<PolarsResult<Vec<_>>>()?;
386
387                let mut df = unsafe { DataFrame::new_unchecked(slice.1, columns) };
388
389                if let Some(rc) = &row_index {
390                    unsafe {
391                        df.with_row_index_mut(
392                            rc.name.clone(),
393                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
394                        )
395                    };
396                }
397
398                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
399
400                Ok(Some(df))
401            })
402            .collect::<PolarsResult<Vec<_>>>()
403    })?;
404    Ok(dfs.into_iter().flatten().collect())
405}
406
407#[allow(clippy::too_many_arguments)]
408pub fn read_parquet<R: MmapBytesReader>(
409    mut reader: R,
410    pre_slice: (usize, usize),
411    projection: Option<&[usize]>,
412    reader_schema: &ArrowSchemaRef,
413    metadata: Option<FileMetadataRef>,
414    mut parallel: ParallelStrategy,
415    row_index: Option<RowIndex>,
416    hive_partition_columns: Option<&[Series]>,
417) -> PolarsResult<DataFrame> {
418    // Fast path.
419    if pre_slice.1 == 0 {
420        return Ok(materialize_empty_df(
421            projection,
422            reader_schema,
423            hive_partition_columns,
424            row_index.as_ref(),
425        ));
426    }
427
428    let file_metadata = metadata
429        .map(Ok)
430        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
431    let n_row_groups = file_metadata.row_groups.len();
432
433    let materialized_projection = projection
434        .map(Cow::Borrowed)
435        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));
436
437    if ParallelStrategy::Auto == parallel {
438        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
439        {
440            parallel = ParallelStrategy::RowGroups;
441        } else {
442            parallel = ParallelStrategy::Columns;
443        }
444    }
445
446    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
447        parallel = ParallelStrategy::None;
448    }
449
450    let reader = ReaderBytes::from(&mut reader);
451    Buffer::with_slice(&reader, |buf| {
452        let store = mmap::ColumnStore::Local(buf);
453        let dfs = rg_to_dfs(
454            &store,
455            &mut 0,
456            0,
457            n_row_groups,
458            pre_slice,
459            &file_metadata,
460            reader_schema,
461            row_index.clone(),
462            parallel,
463            &materialized_projection,
464            hive_partition_columns,
465        )?;
466
467        if dfs.is_empty() {
468            Ok(materialize_empty_df(
469                projection,
470                reader_schema,
471                hive_partition_columns,
472                row_index.as_ref(),
473            ))
474        } else {
475            accumulate_dataframes_vertical(dfs)
476        }
477    })
478}
479
480pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
481    let num_edges = mask.num_edges() as f64;
482    let rg_len = mask.len() as f64;
483
484    // @GB: I did quite some analysis on this.
485    //
486    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
487    //
488    // - Pre-filtered is faster when there is some amount of clustering or
489    // sorting involved or if the number of values selected is small.
490    // - Post-filtering is faster when the predicate selects a somewhat random
491    // elements throughout the row group.
492    //
493    // The following is a heuristic value to try and estimate which one is
494    // faster. Essentially, it sees how many times it needs to switch between
495    // skipping items and collecting items and compares it against the number
496    // of values that it will collect.
497    //
498    // Closer to 0: pre-filtering is probably better.
499    // Closer to 1: post-filtering is probably better.
500    (num_edges / rg_len).clamp(0.0, 1.0)
501}
502
/// Whether the predicate mask is applied while deserializing ("pre") or after
/// materializing the column values ("post"); see `should_prefilter`.
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    /// Decide per column from the estimated cost and whether the dtype is nested.
    Auto,
    /// Always pre-filter.
    Pre,
    /// Never pre-filter.
    Post,
}
509
510impl PrefilterMaskSetting {
511    pub fn init_from_env() -> Self {
512        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
513            "auto" => Self::Auto,
514            "pre" => Self::Pre,
515            "post" => Self::Post,
516            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
517        })
518    }
519
520    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
521        match self {
522            Self::Auto => {
523                // Prefiltering is only expensive for nested types so we make the cut-off quite
524                // high.
525                let is_nested = dtype.is_nested();
526
527                // We empirically selected these numbers.
528                !is_nested && prefilter_cost <= 0.01
529            },
530            Self::Pre => true,
531            Self::Post => false,
532        }
533    }
534}