polars_io/parquet/read/read_impl.rs

use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;
// Ensure we get the proper polars types from schema inference.
// This saves unneeded casts.
#[cfg(debug_assertions)]
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // These should all be cast to the BinaryView / Utf8View variants
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        // These should be cast to Float32
        D::Float16 => unreachable!(),

        // This should have been converted to a LargeList
        D::List(_) => unreachable!(),

        // This should have been converted to a LargeList(Struct(_))
        D::Map(_, _) => unreachable!(),

        // Recursive checks
        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

fn should_copy_sortedness(dtype: &DataType) -> bool {
    // @NOTE: For now, we are a bit conservative with this.
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

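/// Propagate the sortedness recorded in the Parquet `SortingColumn` metadata
/// to `series`, but only for dtypes where copying the flag is known to be
/// safe (see `should_copy_sortedness`).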
pub fn try_set_sorted_flag(
    series: &mut Series,
    col_idx: usize,
    sorting_map: &PlHashMap<usize, IsSorted>,
) {
    if let Some(is_sorted) = sorting_map.get(&col_idx) {
        if should_copy_sortedness(series.dtype()) {
            if config::verbose() {
                eprintln!(
                    "Parquet preserved SortingColumn for column chunk of '{}' as {is_sorted:?}",
                    series.name()
                );
            }

            series.set_sorted_flag(*is_sorted);
        }
    }
}

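/// Build a map from column index to sortedness from the row group's
/// `SortingColumn` metadata, if any is present.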
pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = PlHashMap::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            let prev_value = sorting_map.insert(
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            );

            debug_assert!(prev_value.is_none());
        }
    }

    sorting_map
}

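/// Deserialize a single projected column into a `Series`, returning it
/// together with the bitmap of rows that passed the optional `filter`.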
fn column_idx_to_series(
    column_i: usize,
    // The metadata belonging to this column
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

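/// Materialize the row groups `row_group_start..row_group_end` into
/// `DataFrame`s, dispatching on the requested `ParallelStrategy`.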
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a special path here:
    // a placeholder null column gives the frame the right height, the row index
    // is attached, and the placeholder is projected away again.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

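    // `Columns` and `None` share one code path (the per-column loop is either
    // parallel or sequential); every other strategy parallelizes over row groups.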
    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

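/// Per-column path: row groups are processed sequentially, and within each row
/// group the projected columns are decoded either in parallel
/// (`ParallelStrategy::Columns`) or sequentially (`ParallelStrategy::None`).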
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

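        // Translate the global slice into this row group's local coordinates.
        // Illustrative example: with a global slice of (100, 50) and row groups
        // of 80 rows each, the first row group contributes nothing and the
        // second contributes the local slice (20, 50); `split_slice_at_file`
        // performs that translation while advancing `n_rows_processed`.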
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

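/// Per-row-group path: the slice arithmetic is done up front on a single
/// thread, after which each overlapping row group is materialized into its own
/// `DataFrame` on the rayon pool.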
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // Compute the limits per row group and the row count offsets.
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // `rows_scanned` is the number of rows that have been scanned so far when
    // checking for overlap with the slice. `rows_read` is the number of rows
    // found to overlap with the slice, and thus the number of rows that will
    // be read into a dataframe.
    //
    // In the case of async reads, `row_group_start` may be greater than zero
    // due to earlier processing (the sum below is then non-zero).
    // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
    let mut rows_scanned: IdxSize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
        .sum();

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // Allows tests to assert that the parquet file is not parsed
                // when this env var is set.
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

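/// Read a Parquet file into a `DataFrame`, with an optional pre-slice, column
/// projection, row index, and hive partition columns.
///
/// A minimal usage sketch (hypothetical caller; `schema` and `n_rows` are
/// assumed to come from a prior metadata read, and error handling is elided):
///
/// ```rust,ignore
/// let file = std::fs::File::open("data.parquet")?;
/// let df = read_parquet(
///     file,
///     (0, n_rows),            // pre_slice: read all rows
///     None,                   // projection: all columns
///     &schema,                // ArrowSchemaRef from schema inference
///     None,                   // metadata: re-read from the reader
///     ParallelStrategy::Auto,
///     None,                   // row_index
///     None,                   // hive_partition_columns
/// )?;
/// ```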
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // If there are multiple row groups and categorical data, we need a string
    // cache. We keep it alive until the end of the function.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

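/// Estimate whether pre-filtering (applying the predicate mask while decoding)
/// is likely to pay off for this mask.
///
/// Worked example (illustrative): for the 12-bit mask `1111 0000 1111` there
/// are two transitions between runs of kept and skipped values, so
/// `num_edges = 2`, `rg_len = 12`, and the cost is `2.0 / 12.0 ≈ 0.17`.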
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtering and post-filtering can both be faster in certain scenarios.
    //
    // - Pre-filtering is faster when there is some amount of clustering or
    //   sorting involved, or if the number of values selected is small.
    // - Post-filtering is faster when the predicate selects somewhat random
    //   elements throughout the row group.
    //
    // The following is a heuristic value to try and estimate which one is
    // faster. Essentially, it sees how many times it needs to switch between
    // skipping items and collecting items and compares that against the number
    // of values that it will collect.
    //
    // Closer to 0: pre-filtering is probably better.
    // Closer to 1: post-filtering is probably better.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

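/// Controls whether the predicate mask is applied while decoding (`Pre`) or
/// after decoding (`Post`). `Auto` decides per column based on
/// `calc_prefilter_cost` and the dtype; it can be overridden with the
/// `POLARS_PQ_PREFILTERED_MASK` environment variable (`auto`, `pre`, `post`).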
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering is only expensive for nested types, so we make
                // the cut-off quite high.
                let is_nested = dtype.is_nested();

                // We empirically selected these numbers.
                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}