polars_io/parquet/read/predicates.rs
1use polars_core::config;
2use polars_core::prelude::*;
3use polars_parquet::read::RowGroupMetadata;
4use polars_parquet::read::statistics::{
5 ArrowColumnStatisticsArrays, Statistics, deserialize, deserialize_all,
6};
7
8use crate::predicates::{BatchStats, ColumnStats, ScanIOPredicate};
9
10pub fn collect_statistics_with_live_columns(
12 row_groups: &[RowGroupMetadata],
13 schema: &ArrowSchema,
14 live_columns: &PlIndexSet<PlSmallStr>,
15) -> PolarsResult<Vec<Option<ArrowColumnStatisticsArrays>>> {
16 if row_groups.is_empty() {
17 return Ok((0..live_columns.len()).map(|_| None).collect());
18 }
19
20 let md = &row_groups[0];
21 live_columns
22 .iter()
23 .map(|c| {
24 let field = schema.get(c).unwrap();
25
26 let Some(idxs) = md.columns_idxs_under_root_iter(&field.name) else {
28 return Ok(None);
29 };
30
31 if idxs.is_empty() || idxs.len() > 1 {
36 return Ok(None);
37 }
38
39 let idx = idxs[0];
40 Ok(deserialize_all(field, row_groups, idx)?)
41 })
42 .collect::<PolarsResult<Vec<_>>>()
43}
44
45pub fn collect_statistics(
47 md: &RowGroupMetadata,
48 schema: &ArrowSchema,
49) -> PolarsResult<Option<BatchStats>> {
50 let stats = schema
52 .iter_values()
53 .map(|field| {
54 let default_fn = || ColumnStats::new(field.into(), None, None, None);
55
56 let Some(mut iter) = md.columns_under_root_iter(&field.name) else {
58 return Ok(default_fn());
59 };
60
61 let statistics = deserialize(field, &mut iter)?;
62 assert!(iter.next().is_none());
63
64 let Some(Statistics::Column(stats)) = statistics else {
67 return Ok(default_fn());
68 };
69
70 let stats = stats.into_arrow()?;
71
72 let null_count = stats
73 .null_count
74 .map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
75 let min_value = stats
76 .min_value
77 .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
78 let max_value = stats
79 .max_value
80 .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
81
82 Ok(ColumnStats::new(
83 field.into(),
84 null_count,
85 min_value,
86 max_value,
87 ))
88 })
89 .collect::<PolarsResult<Vec<_>>>()?;
90
91 if stats.is_empty() {
92 return Ok(None);
93 }
94
95 Ok(Some(BatchStats::new(
96 Arc::new(Schema::from_arrow_schema(schema)),
97 stats,
98 Some(md.num_rows()),
99 )))
100}
101
102pub fn read_this_row_group(
103 predicate: Option<&ScanIOPredicate>,
104 md: &RowGroupMetadata,
105 schema: &ArrowSchema,
106) -> PolarsResult<bool> {
107 if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
108 return Ok(true);
109 }
110
111 let mut should_read = true;
112
113 if let Some(predicate) = predicate {
114 if let Some(pred) = &predicate.skip_batch_predicate {
115 if let Some(stats) = collect_statistics(md, schema)? {
116 let stats = PlIndexMap::from_iter(stats.column_stats().iter().map(|col| {
117 (
118 col.field_name().clone(),
119 crate::predicates::ColumnStatistics {
120 dtype: stats.schema().get(col.field_name()).unwrap().clone(),
121 min: col
122 .to_min()
123 .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
124 max: col
125 .to_max()
126 .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
127 null_count: col.null_count().map(|nc| nc as IdxSize),
128 },
129 )
130 }));
131 let pred_result = pred.can_skip_batch(
132 md.num_rows() as IdxSize,
133 predicate.live_columns.as_ref(),
134 stats,
135 );
136
137 match pred_result {
139 Err(PolarsError::ColumnNotFound(errstr)) => {
140 return Err(PolarsError::ColumnNotFound(errstr));
141 },
142 Ok(true) => should_read = false,
143 _ => {},
144 }
145 }
146 } else if let Some(pred) = predicate.predicate.as_stats_evaluator() {
147 if let Some(stats) = collect_statistics(md, schema)? {
148 let pred_result = pred.should_read(&stats);
149
150 match pred_result {
152 Err(PolarsError::ColumnNotFound(errstr)) => {
153 return Err(PolarsError::ColumnNotFound(errstr));
154 },
155 Ok(false) => should_read = false,
156 _ => {},
157 }
158 }
159 }
160
161 if config::verbose() {
162 if should_read {
163 eprintln!(
164 "parquet row group must be read, statistics not sufficient for predicate."
165 );
166 } else {
167 eprintln!(
168 "parquet row group can be skipped, the statistics were sufficient to apply the predicate."
169 );
170 }
171 }
172 }
173
174 Ok(should_read)
175}