// polars_io/parquet/read/read_impl.rs

1use std::borrow::Cow;
2
3use arrow::bitmap::Bitmap;
4use arrow::datatypes::ArrowSchemaRef;
5use polars_core::chunked_array::builder::NullChunkedBuilder;
6use polars_core::prelude::*;
7use polars_core::series::IsSorted;
8use polars_core::utils::accumulate_dataframes_vertical;
9use polars_core::{POOL, config};
10use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
11use rayon::prelude::*;
12
13use super::mmap::mmap_columns;
14use super::utils::materialize_empty_df;
15use super::{ParallelStrategy, mmap};
16use crate::RowIndex;
17use crate::hive::materialize_hive_partitions;
18use crate::mmap::{MmapBytesReader, ReaderBytes};
19use crate::parquet::metadata::FileMetadataRef;
20use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
21use crate::utils::slice::split_slice_at_file;
22
23#[cfg(debug_assertions)]
24// Ensure we get the proper polars types from schema inference
25// This saves unneeded casts.
26fn assert_dtypes(dtype: &ArrowDataType) {
27    use ArrowDataType as D;
28
29    match dtype {
30        // These should all be cast to the BinaryView / Utf8View variants
31        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
32
33        // These should be cast to Float32
34        D::Float16 => unreachable!(),
35
36        // This should have been converted to a LargeList
37        D::List(_) => unreachable!(),
38
39        // This should have been converted to a LargeList(Struct(_))
40        D::Map(_, _) => unreachable!(),
41
42        // Recursive checks
43        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
44        D::Extension(ext) => assert_dtypes(&ext.inner),
45        D::LargeList(inner) => assert_dtypes(&inner.dtype),
46        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
47        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
48
49        _ => {},
50    }
51}
52
53fn should_copy_sortedness(dtype: &DataType) -> bool {
54    // @NOTE: For now, we are a bit conservative with this.
55    use DataType as D;
56
57    matches!(
58        dtype,
59        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
60    )
61}
62
63pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
64    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
65        return;
66    };
67    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
68        return;
69    }
70    if config::verbose() {
71        eprintln!(
72            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
73            series.name()
74        );
75    }
76
77    series.set_sorted_flag(*is_sorted);
78}
79
80pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
81    let capacity = md.sorting_columns().map_or(0, |s| s.len());
82    let mut sorting_map = Vec::with_capacity(capacity);
83
84    if let Some(sorting_columns) = md.sorting_columns() {
85        for sorting in sorting_columns {
86            sorting_map.push((
87                sorting.column_idx as usize,
88                if sorting.descending {
89                    IsSorted::Descending
90                } else {
91                    IsSorted::Ascending
92                },
93            ))
94        }
95    }
96
97    sorting_map
98}
99
/// Deserialize one projected column (index `column_i` into `file_schema`) from its
/// column chunks into a `Series`.
///
/// Returns the series together with the mask produced by the deserializer —
/// presumably the rows for which the optional `filter` predicate held; confirm
/// against `mmap::to_deserializer`.
fn column_idx_to_series(
    column_i: usize,
    // The metadata belonging to this column
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        // Schema inference should already have normalized dtypes (e.g. Utf8 ->
        // Utf8View); catch violations early in debug builds.
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (arrays, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, arrays))?;

    Ok((series, pred_true_mask))
}
120
#[allow(clippy::too_many_arguments)]
/// Materialize row groups `row_group_start..row_group_end` into `DataFrame`s,
/// dispatching to a column-parallel or row-group-parallel implementation based on
/// `parallel`.
///
/// `pre_slice` is an `(offset, len)` row slice over the whole file.
/// `previous_row_count` is advanced by the rows scanned so consecutive calls can
/// continue where the previous one stopped.
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a little special path here.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            // Build a throwaway null column of the sliced length, attach the row
            // index (offset by the slice start), then keep only the index column.
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        // `None` also uses the "optionally parallel over columns" path; it only
        // actually parallelizes when the strategy is `Columns`.
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}
185
#[allow(clippy::too_many_arguments)]
// might parallelize over columns
/// Read the given row groups sequentially, materializing the projected columns of
/// each row group either serially or in parallel (when `parallel == Columns`).
///
/// Stops early once `previous_row_count` passes the end of the requested `slice`.
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    // Rows in the row groups before `row_group_start`; used to translate the
    // file-level slice into per-row-group slices.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        // The part of this row group that overlaps the requested slice
        // (`n_rows_processed` is advanced as a side effect).
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        // Materialize a single projected column of this row group.
        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // Column not present in this file: backfill with nulls (schema evolution).
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        // Count the full row group (not just the sliced part); overflow of IdxSize is
        // a hard error.
        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (pip install polars[rt64]), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        // All requested rows have been read; later row groups cannot contribute.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}
283
#[allow(clippy::too_many_arguments)]
// parallelizes over row groups
/// Read the given row groups in parallel (one rayon task per row group).
///
/// A sequential pre-pass computes, per row group, which sub-slice of its rows
/// overlaps the requested `slice` and the row-count offset needed for the row index;
/// the parallel phase then materializes only the overlapping row groups.
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // compute the limits per row group and the row count offsets
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    // Rows in the row groups before `row_group_start`; used to translate the
    // file-level slice into per-row-group slices.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
    // read into a dataframe.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
        // zero due to earlier processing.
        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        // Row group does not overlap the requested slice; skip it entirely.
        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // test we don't read the parquet file if this env var is set
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        // Column not present in this file: backfill with nulls
                        // (schema evolution).
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}
405
#[allow(clippy::too_many_arguments)]
/// Read a parquet file from `reader` into a single `DataFrame`.
///
/// `pre_slice` is an `(offset, len)` row slice over the whole file; `projection`
/// selects column indices into `reader_schema` (`None` = all columns). When
/// `metadata` is `None`, the file footer is parsed from the reader.
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    // Reuse caller-provided metadata when available; otherwise parse the footer.
    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // Default projection: all columns of the reader schema.
    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    // Resolve `Auto`: prefer row-group parallelism when there are more row groups
    // than projected columns or than threads in the pool.
    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    // Parallelizing over a single column cannot help.
    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    // NOTE(review): the 'static transmute erases the borrow of `reader`. This assumes
    // `to_memslice()` yields memory that stays valid for as long as `store` is used
    // (e.g. an mmap or owned buffer) — confirm against `ReaderBytes::to_memslice`.
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    // All row groups may have been sliced away; fall back to an empty frame with the
    // right schema.
    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}
479
480pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
481    let num_edges = mask.num_edges() as f64;
482    let rg_len = mask.len() as f64;
483
484    // @GB: I did quite some analysis on this.
485    //
486    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
487    //
488    // - Pre-filtered is faster when there is some amount of clustering or
489    // sorting involved or if the number of values selected is small.
490    // - Post-filtering is faster when the predicate selects a somewhat random
491    // elements throughout the row group.
492    //
493    // The following is a heuristic value to try and estimate which one is
494    // faster. Essentially, it sees how many times it needs to switch between
495    // skipping items and collecting items and compares it against the number
496    // of values that it will collect.
497    //
498    // Closer to 0: pre-filtering is probably better.
499    // Closer to 1: post-filtering is probably better.
500    (num_edges / rg_len).clamp(0.0, 1.0)
501}
502
/// Strategy for applying a predicate mask when reading parquet: filter while
/// materializing ("pre") or after materializing ("post").
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    // Decide per column from the cost heuristic (see `calc_prefilter_cost`) and the
    // column's dtype.
    Auto,
    // Always pre-filter.
    Pre,
    // Always post-filter.
    Post,
}
509
510impl PrefilterMaskSetting {
511    pub fn init_from_env() -> Self {
512        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
513            "auto" => Self::Auto,
514            "pre" => Self::Pre,
515            "post" => Self::Post,
516            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
517        })
518    }
519
520    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
521        match self {
522            Self::Auto => {
523                // Prefiltering is only expensive for nested types so we make the cut-off quite
524                // high.
525                let is_nested = dtype.is_nested();
526
527                // We empirically selected these numbers.
528                !is_nested && prefilter_cost <= 0.01
529            },
530            Self::Pre => true,
531            Self::Post => false,
532        }
533    }
534}