Skip to main content

polars_io/parquet/read/
read_impl.rs

1use std::borrow::Cow;
2
3use arrow::bitmap::Bitmap;
4use arrow::datatypes::ArrowSchemaRef;
5use polars_buffer::Buffer;
6use polars_core::chunked_array::builder::NullChunkedBuilder;
7use polars_core::config;
8use polars_core::prelude::*;
9use polars_core::runtime::RAYON;
10use polars_core::series::IsSorted;
11use polars_core::utils::accumulate_dataframes_vertical;
12use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
13use rayon::prelude::*;
14
15use super::mmap::mmap_columns;
16use super::utils::materialize_empty_df;
17use super::{ParallelStrategy, mmap};
18use crate::RowIndex;
19use crate::hive::materialize_hive_partitions;
20use crate::mmap::{MmapBytesReader, ReaderBytes};
21use crate::parquet::metadata::FileMetadataRef;
22use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
23use crate::utils::slice::split_slice_at_file;
24
25#[cfg(debug_assertions)]
26// Ensure we get the proper polars types from schema inference
27// This saves unneeded casts.
28fn assert_dtypes(dtype: &ArrowDataType) {
29    use ArrowDataType as D;
30
31    match dtype {
32        // These should all be cast to the BinaryView / Utf8View variants
33        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
34
35        // These should be cast to Float32
36        D::Float16 => unreachable!(),
37
38        // This should have been converted to a LargeList
39        D::List(_) => unreachable!(),
40
41        // This should have been converted to a LargeList(Struct(_))
42        D::Map(_, _) => unreachable!(),
43
44        // Recursive checks
45        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
46        D::Extension(ext) => assert_dtypes(&ext.inner),
47        D::LargeList(inner) => assert_dtypes(&inner.dtype),
48        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
49        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
50
51        _ => {},
52    }
53}
54
55fn should_copy_sortedness(dtype: &DataType) -> bool {
56    // @NOTE: For now, we are a bit conservative with this.
57    use DataType as D;
58
59    matches!(
60        dtype,
61        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
62    )
63}
64
65pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
66    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
67        return;
68    };
69    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
70        return;
71    }
72    if config::verbose() {
73        eprintln!(
74            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
75            series.name()
76        );
77    }
78
79    series.set_sorted_flag(*is_sorted);
80}
81
82pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
83    let capacity = md.sorting_columns().map_or(0, |s| s.len());
84    let mut sorting_map = Vec::with_capacity(capacity);
85
86    if let Some(sorting_columns) = md.sorting_columns() {
87        for sorting in sorting_columns {
88            sorting_map.push((
89                sorting.column_idx as usize,
90                if sorting.descending {
91                    IsSorted::Descending
92                } else {
93                    IsSorted::Ascending
94                },
95            ))
96        }
97    }
98
99    sorting_map
100}
101
102fn column_idx_to_series(
103    column_i: usize,
104    // The metadata belonging to this column
105    field_md: &[&ColumnChunkMetadata],
106    filter: Option<Filter>,
107    file_schema: &ArrowSchema,
108    store: &mmap::ColumnStore,
109) -> PolarsResult<(Series, Bitmap)> {
110    let field = file_schema.get_at_index(column_i).unwrap().1;
111
112    #[cfg(debug_assertions)]
113    {
114        assert_dtypes(field.dtype())
115    }
116    let columns = mmap_columns(store, field_md);
117    let (arrays, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
118    let series = Series::try_from((field, arrays))?;
119
120    Ok((series, pred_true_mask))
121}
122
123#[allow(clippy::too_many_arguments)]
124fn rg_to_dfs(
125    store: &mmap::ColumnStore,
126    previous_row_count: &mut IdxSize,
127    row_group_start: usize,
128    row_group_end: usize,
129    pre_slice: (usize, usize),
130    file_metadata: &FileMetadata,
131    schema: &ArrowSchemaRef,
132    row_index: Option<RowIndex>,
133    parallel: ParallelStrategy,
134    projection: &[usize],
135    hive_partition_columns: Option<&[Series]>,
136) -> PolarsResult<Vec<DataFrame>> {
137    if config::verbose() {
138        eprintln!("parquet scan with parallel = {parallel:?}");
139    }
140
141    // If we are only interested in the row_index, we take a little special path here.
142    if projection.is_empty() {
143        if let Some(row_index) = row_index {
144            let placeholder =
145                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
146            return Ok(vec![
147                DataFrame::new_infer_height(vec![placeholder.into_series().into_column()])?
148                    .with_row_index(
149                        row_index.name.clone(),
150                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
151                    )?
152                    .select(std::iter::once(row_index.name))?,
153            ]);
154        }
155    }
156
157    use ParallelStrategy as S;
158
159    match parallel {
160        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
161            store,
162            previous_row_count,
163            row_group_start,
164            row_group_end,
165            pre_slice,
166            file_metadata,
167            schema,
168            row_index,
169            parallel,
170            projection,
171            hive_partition_columns,
172        ),
173        _ => rg_to_dfs_par_over_rg(
174            store,
175            row_group_start,
176            row_group_end,
177            previous_row_count,
178            pre_slice,
179            file_metadata,
180            schema,
181            row_index,
182            projection,
183            hive_partition_columns,
184        ),
185    }
186}
187
188#[allow(clippy::too_many_arguments)]
189// might parallelize over columns
190fn rg_to_dfs_optionally_par_over_columns(
191    store: &mmap::ColumnStore,
192    previous_row_count: &mut IdxSize,
193    row_group_start: usize,
194    row_group_end: usize,
195    slice: (usize, usize),
196    file_metadata: &FileMetadata,
197    schema: &ArrowSchemaRef,
198    row_index: Option<RowIndex>,
199    parallel: ParallelStrategy,
200    projection: &[usize],
201    hive_partition_columns: Option<&[Series]>,
202) -> PolarsResult<Vec<DataFrame>> {
203    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
204
205    let mut n_rows_processed: usize = (0..row_group_start)
206        .map(|i| file_metadata.row_groups[i].num_rows())
207        .sum();
208    let slice_end = slice.0 + slice.1;
209
210    for rg_idx in row_group_start..row_group_end {
211        let md = &file_metadata.row_groups[rg_idx];
212
213        let rg_slice =
214            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
215        let current_row_count = md.num_rows() as IdxSize;
216
217        let sorting_map = create_sorting_map(md);
218
219        let f = |column_i: &usize| {
220            let (name, field) = schema.get_at_index(*column_i).unwrap();
221
222            let Some(iter) = md.columns_under_root_iter(name) else {
223                return Ok(Column::full_null(
224                    name.clone(),
225                    rg_slice.1,
226                    &DataType::from_arrow_field(field),
227                ));
228            };
229
230            let part = iter.collect::<Vec<_>>();
231
232            let (mut series, _) = column_idx_to_series(
233                *column_i,
234                part.as_slice(),
235                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
236                schema,
237                store,
238            )?;
239
240            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
241            Ok(series.into_column())
242        };
243
244        let columns = if let ParallelStrategy::Columns = parallel {
245            RAYON.install(|| {
246                projection
247                    .par_iter()
248                    .map(f)
249                    .collect::<PolarsResult<Vec<_>>>()
250            })?
251        } else {
252            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
253        };
254
255        let mut df = unsafe { DataFrame::new_unchecked(rg_slice.1, columns) };
256        if let Some(rc) = &row_index {
257            unsafe {
258                df.with_row_index_mut(
259                    rc.name.clone(),
260                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
261                )
262            };
263        }
264
265        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
266
267        *previous_row_count = previous_row_count
268            .checked_add(current_row_count)
269            .ok_or_else(|| {
270                polars_err!(
271                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
272                    consider compiling with polars-bigidx feature (pip install polars[rt64]), \
273                    or set 'streaming'"
274                )
275            })?;
276        dfs.push(df);
277
278        if *previous_row_count as usize >= slice_end {
279            break;
280        }
281    }
282
283    Ok(dfs)
284}
285
286#[allow(clippy::too_many_arguments)]
287// parallelizes over row groups
288fn rg_to_dfs_par_over_rg(
289    store: &mmap::ColumnStore,
290    row_group_start: usize,
291    row_group_end: usize,
292    rows_read: &mut IdxSize,
293    slice: (usize, usize),
294    file_metadata: &FileMetadata,
295    schema: &ArrowSchemaRef,
296    row_index: Option<RowIndex>,
297    projection: &[usize],
298    hive_partition_columns: Option<&[Series]>,
299) -> PolarsResult<Vec<DataFrame>> {
300    // compute the limits per row group and the row count offsets
301    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
302
303    let mut n_rows_processed: usize = (0..row_group_start)
304        .map(|i| file_metadata.row_groups[i].num_rows())
305        .sum();
306    let slice_end = slice.0 + slice.1;
307
308    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
309    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
310    // read into a dataframe.
311    let mut rows_scanned: IdxSize;
312
313    if row_group_start > 0 {
314        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
315        // zero due to earlier processing.
316        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
317        rows_scanned = (0..row_group_start)
318            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
319            .sum();
320    } else {
321        rows_scanned = 0;
322    }
323
324    for i in row_group_start..row_group_end {
325        let row_count_start = rows_scanned;
326        let rg_md = &file_metadata.row_groups[i];
327        let n_rows_this_file = rg_md.num_rows();
328        let rg_slice =
329            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
330        rows_scanned = rows_scanned
331            .checked_add(n_rows_this_file as IdxSize)
332            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
333
334        *rows_read += rg_slice.1 as IdxSize;
335
336        if rg_slice.1 == 0 {
337            continue;
338        }
339
340        row_groups.push((rg_md, rg_slice, row_count_start));
341    }
342
343    let dfs = RAYON.install(|| {
344        // Set partitioned fields to prevent quadratic behavior.
345        // Ensure all row groups are partitioned.
346        row_groups
347            .into_par_iter()
348            .map(|(md, slice, row_count_start)| {
349                if slice.1 == 0 {
350                    return Ok(None);
351                }
352                // test we don't read the parquet file if this env var is set
353                #[cfg(debug_assertions)]
354                {
355                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
356                }
357
358                let sorting_map = create_sorting_map(md);
359
360                let columns = projection
361                    .iter()
362                    .map(|column_i| {
363                        let (name, field) = schema.get_at_index(*column_i).unwrap();
364
365                        let Some(iter) = md.columns_under_root_iter(name) else {
366                            return Ok(Column::full_null(
367                                name.clone(),
368                                md.num_rows(),
369                                &DataType::from_arrow_field(field),
370                            ));
371                        };
372
373                        let part = iter.collect::<Vec<_>>();
374
375                        let (mut series, _) = column_idx_to_series(
376                            *column_i,
377                            part.as_slice(),
378                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
379                            schema,
380                            store,
381                        )?;
382
383                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
384                        Ok(series.into_column())
385                    })
386                    .collect::<PolarsResult<Vec<_>>>()?;
387
388                let mut df = unsafe { DataFrame::new_unchecked(slice.1, columns) };
389
390                if let Some(rc) = &row_index {
391                    unsafe {
392                        df.with_row_index_mut(
393                            rc.name.clone(),
394                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
395                        )
396                    };
397                }
398
399                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
400
401                Ok(Some(df))
402            })
403            .collect::<PolarsResult<Vec<_>>>()
404    })?;
405    Ok(dfs.into_iter().flatten().collect())
406}
407
408#[allow(clippy::too_many_arguments)]
409pub fn read_parquet<R: MmapBytesReader>(
410    mut reader: R,
411    pre_slice: (usize, usize),
412    projection: Option<&[usize]>,
413    reader_schema: &ArrowSchemaRef,
414    metadata: Option<FileMetadataRef>,
415    mut parallel: ParallelStrategy,
416    row_index: Option<RowIndex>,
417    hive_partition_columns: Option<&[Series]>,
418) -> PolarsResult<DataFrame> {
419    // Fast path.
420    if pre_slice.1 == 0 {
421        return Ok(materialize_empty_df(
422            projection,
423            reader_schema,
424            hive_partition_columns,
425            row_index.as_ref(),
426        ));
427    }
428
429    let file_metadata = metadata
430        .map(Ok)
431        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
432    let n_row_groups = file_metadata.row_groups.len();
433
434    let materialized_projection = projection
435        .map(Cow::Borrowed)
436        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));
437
438    if ParallelStrategy::Auto == parallel {
439        if n_row_groups > materialized_projection.len()
440            || n_row_groups > RAYON.current_num_threads()
441        {
442            parallel = ParallelStrategy::RowGroups;
443        } else {
444            parallel = ParallelStrategy::Columns;
445        }
446    }
447
448    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
449        parallel = ParallelStrategy::None;
450    }
451
452    let reader = ReaderBytes::from(&mut reader);
453    Buffer::with_slice(&reader, |buf| {
454        let store = mmap::ColumnStore::Local(buf);
455        let dfs = rg_to_dfs(
456            &store,
457            &mut 0,
458            0,
459            n_row_groups,
460            pre_slice,
461            &file_metadata,
462            reader_schema,
463            row_index.clone(),
464            parallel,
465            &materialized_projection,
466            hive_partition_columns,
467        )?;
468
469        if dfs.is_empty() {
470            Ok(materialize_empty_df(
471                projection,
472                reader_schema,
473                hive_partition_columns,
474                row_index.as_ref(),
475            ))
476        } else {
477            accumulate_dataframes_vertical(dfs)
478        }
479    })
480}
481
482pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
483    let num_edges = mask.num_edges() as f64;
484    let rg_len = mask.len() as f64;
485
486    // @GB: I did quite some analysis on this.
487    //
488    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
489    //
490    // - Pre-filtered is faster when there is some amount of clustering or
491    // sorting involved or if the number of values selected is small.
492    // - Post-filtering is faster when the predicate selects a somewhat random
493    // elements throughout the row group.
494    //
495    // The following is a heuristic value to try and estimate which one is
496    // faster. Essentially, it sees how many times it needs to switch between
497    // skipping items and collecting items and compares it against the number
498    // of values that it will collect.
499    //
500    // Closer to 0: pre-filtering is probably better.
501    // Closer to 1: post-filtering is probably better.
502    (num_edges / rg_len).clamp(0.0, 1.0)
503}
504
/// Strategy for applying a predicate mask while reading Parquet columns. It is read from
/// the `POLARS_PQ_PREFILTERED_MASK` environment variable by
/// `PrefilterMaskSetting::init_from_env`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PrefilterMaskSetting {
    /// Decide per column with the cost heuristic (`"auto"`).
    Auto,
    /// Always pre-filter (`"pre"`).
    Pre,
    /// Always post-filter (`"post"`).
    Post,
}
511
512impl PrefilterMaskSetting {
513    pub fn init_from_env() -> Self {
514        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
515            "auto" => Self::Auto,
516            "pre" => Self::Pre,
517            "post" => Self::Post,
518            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
519        })
520    }
521
522    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
523        match self {
524            Self::Auto => {
525                // Prefiltering is only expensive for nested types so we make the cut-off quite
526                // high.
527                let is_nested = dtype.is_nested();
528
529                // We empirically selected these numbers.
530                !is_nested && prefilter_cost <= 0.01
531            },
532            Self::Pre => true,
533            Self::Post => false,
534        }
535    }
536}