polars_io/parquet/read/predicates.rs

use polars_core::config;
use polars_core::prelude::*;
use polars_parquet::read::RowGroupMetadata;
use polars_parquet::read::statistics::{
    ArrowColumnStatisticsArrays, Statistics, deserialize, deserialize_all,
};

use crate::predicates::{BatchStats, ColumnStats, ScanIOPredicate};

/// Collect the statistics of the live columns across all row-groups
pub fn collect_statistics_with_live_columns(
    row_groups: &[RowGroupMetadata],
    schema: &ArrowSchema,
    live_columns: &PlIndexSet<PlSmallStr>,
) -> PolarsResult<Vec<Option<ArrowColumnStatisticsArrays>>> {
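    // Without any row-groups there are no statistics: return one `None` per live column.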
    if row_groups.is_empty() {
        return Ok((0..live_columns.len()).map(|_| None).collect());
    }

    let md = &row_groups[0];
    live_columns
        .iter()
        .map(|c| {
            let field = schema.get(c).unwrap();

            // This can be None in the allow_missing_columns case.
            let Some(idxs) = md.columns_idxs_under_root_iter(&field.name) else {
                return Ok(None);
            };

            // 0 is possible for empty structs.
            //
            // 2+ is for structs. We don't support reading nested statistics for now. It does not
            // really make any sense at the moment with how we structure statistics.
            if idxs.is_empty() || idxs.len() > 1 {
                return Ok(None);
            }

            let idx = idxs[0];
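            // Deserialize this column's statistics from all row-groups in one go.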
            Ok(deserialize_all(field, row_groups, idx)?)
        })
        .collect::<PolarsResult<Vec<_>>>()
}

/// Collect the statistics in a row-group
pub fn collect_statistics(
    md: &RowGroupMetadata,
    schema: &ArrowSchema,
) -> PolarsResult<Option<BatchStats>> {
    // TODO! Fix the performance here. This is a full sequential scan.
    let stats = schema
        .iter_values()
        .map(|field| {
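            // Fallback for columns without usable statistics: no null count, min, or max.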
            let default_fn = || ColumnStats::new(field.into(), None, None, None);

            // This can be None in the allow_missing_columns case.
            let Some(mut iter) = md.columns_under_root_iter(&field.name) else {
                return Ok(default_fn());
            };

            let statistics = deserialize(field, &mut iter)?;
            assert!(iter.next().is_none());

            // We don't support reading nested statistics for now. It does not really make any
            // sense at the moment with how we structure statistics.
            let Some(Statistics::Column(stats)) = statistics else {
                return Ok(default_fn());
            };

            let stats = stats.into_arrow()?;

            let null_count = stats
                .null_count
                .map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
            let min_value = stats
                .min_value
                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
            let max_value = stats
                .max_value
                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());

            Ok(ColumnStats::new(
                field.into(),
                null_count,
                min_value,
                max_value,
            ))
        })
        .collect::<PolarsResult<Vec<_>>>()?;

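    // `stats` is empty only when the schema has no columns, in which case there are no batch
    // statistics to return.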
    if stats.is_empty() {
        return Ok(None);
    }

    Ok(Some(BatchStats::new(
        Arc::new(Schema::from_arrow_schema(schema)),
        stats,
        Some(md.num_rows()),
    )))
}

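/// Decide whether a row-group has to be read by checking the scan predicate against the
/// row-group statistics. Returns `Ok(false)` only when the statistics prove that the predicate
/// cannot match any row in the row-group.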
pub fn read_this_row_group(
    predicate: Option<&ScanIOPredicate>,
    md: &RowGroupMetadata,
    schema: &ArrowSchema,
) -> PolarsResult<bool> {
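    // POLARS_NO_PARQUET_STATISTICS disables statistics-based row-group skipping entirely.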
    if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
        return Ok(true);
    }

    let mut should_read = true;

    if let Some(predicate) = predicate {
        if let Some(pred) = &predicate.skip_batch_predicate {
            if let Some(stats) = collect_statistics(md, schema)? {
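                // Summarize the row-group statistics as one (dtype, min, max, null_count) entry
                // per column for the skip-batch predicate.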
                let stats = PlIndexMap::from_iter(stats.column_stats().iter().map(|col| {
                    (
                        col.field_name().clone(),
                        crate::predicates::ColumnStatistics {
                            dtype: stats.schema().get(col.field_name()).unwrap().clone(),
                            min: col
                                .to_min()
                                .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
                            max: col
                                .to_max()
                                .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
                            null_count: col.null_count().map(|nc| nc as IdxSize),
                        },
                    )
                }));
                let pred_result = pred.can_skip_batch(
                    md.num_rows() as IdxSize,
                    predicate.live_columns.as_ref(),
                    stats,
                );

                // A parquet file may not have statistics for all columns.
                match pred_result {
                    Err(PolarsError::ColumnNotFound(errstr)) => {
                        return Err(PolarsError::ColumnNotFound(errstr));
                    },
                    Ok(true) => should_read = false,
                    _ => {},
                }
            }
        } else if let Some(pred) = predicate.predicate.as_stats_evaluator() {
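            // Without a skip-batch predicate, fall back to the predicate's generic statistics
            // evaluator.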
            if let Some(stats) = collect_statistics(md, schema)? {
                let pred_result = pred.should_read(&stats);

                // A parquet file may not have statistics for all columns.
                match pred_result {
                    Err(PolarsError::ColumnNotFound(errstr)) => {
                        return Err(PolarsError::ColumnNotFound(errstr));
                    },
                    Ok(false) => should_read = false,
                    _ => {},
                }
            }
        }

        if config::verbose() {
            if should_read {
                eprintln!(
                    "parquet row group must be read, statistics not sufficient for predicate."
                );
            } else {
                eprintln!(
                    "parquet row group can be skipped, the statistics were sufficient to apply the predicate."
                );
            }
        }
    }

    Ok(should_read)
}