1use std::borrow::Cow;
2
3use arrow::bitmap::Bitmap;
4use arrow::datatypes::ArrowSchemaRef;
5use polars_buffer::Buffer;
6use polars_core::chunked_array::builder::NullChunkedBuilder;
7use polars_core::config;
8use polars_core::prelude::*;
9use polars_core::runtime::RAYON;
10use polars_core::series::IsSorted;
11use polars_core::utils::accumulate_dataframes_vertical;
12use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
13use rayon::prelude::*;
14
15use super::mmap::mmap_columns;
16use super::utils::materialize_empty_df;
17use super::{ParallelStrategy, mmap};
18use crate::RowIndex;
19use crate::hive::materialize_hive_partitions;
20use crate::mmap::{MmapBytesReader, ReaderBytes};
21use crate::parquet::metadata::FileMetadataRef;
22use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
23use crate::utils::slice::split_slice_at_file;
24
25#[cfg(debug_assertions)]
26fn assert_dtypes(dtype: &ArrowDataType) {
29 use ArrowDataType as D;
30
31 match dtype {
32 D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),
34
35 D::Float16 => unreachable!(),
37
38 D::List(_) => unreachable!(),
40
41 D::Map(_, _) => unreachable!(),
43
44 D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
46 D::Extension(ext) => assert_dtypes(&ext.inner),
47 D::LargeList(inner) => assert_dtypes(&inner.dtype),
48 D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
49 D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),
50
51 _ => {},
52 }
53}
54
55fn should_copy_sortedness(dtype: &DataType) -> bool {
56 use DataType as D;
58
59 matches!(
60 dtype,
61 D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
62 )
63}
64
65pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
66 let Some((sorted_col, is_sorted)) = sorting_map.first() else {
67 return;
68 };
69 if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
70 return;
71 }
72 if config::verbose() {
73 eprintln!(
74 "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
75 series.name()
76 );
77 }
78
79 series.set_sorted_flag(*is_sorted);
80}
81
82pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
83 let capacity = md.sorting_columns().map_or(0, |s| s.len());
84 let mut sorting_map = Vec::with_capacity(capacity);
85
86 if let Some(sorting_columns) = md.sorting_columns() {
87 for sorting in sorting_columns {
88 sorting_map.push((
89 sorting.column_idx as usize,
90 if sorting.descending {
91 IsSorted::Descending
92 } else {
93 IsSorted::Ascending
94 },
95 ))
96 }
97 }
98
99 sorting_map
100}
101
102fn column_idx_to_series(
103 column_i: usize,
104 field_md: &[&ColumnChunkMetadata],
106 filter: Option<Filter>,
107 file_schema: &ArrowSchema,
108 store: &mmap::ColumnStore,
109) -> PolarsResult<(Series, Bitmap)> {
110 let field = file_schema.get_at_index(column_i).unwrap().1;
111
112 #[cfg(debug_assertions)]
113 {
114 assert_dtypes(field.dtype())
115 }
116 let columns = mmap_columns(store, field_md);
117 let (arrays, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
118 let series = Series::try_from((field, arrays))?;
119
120 Ok((series, pred_true_mask))
121}
122
123#[allow(clippy::too_many_arguments)]
124fn rg_to_dfs(
125 store: &mmap::ColumnStore,
126 previous_row_count: &mut IdxSize,
127 row_group_start: usize,
128 row_group_end: usize,
129 pre_slice: (usize, usize),
130 file_metadata: &FileMetadata,
131 schema: &ArrowSchemaRef,
132 row_index: Option<RowIndex>,
133 parallel: ParallelStrategy,
134 projection: &[usize],
135 hive_partition_columns: Option<&[Series]>,
136) -> PolarsResult<Vec<DataFrame>> {
137 if config::verbose() {
138 eprintln!("parquet scan with parallel = {parallel:?}");
139 }
140
141 if projection.is_empty() {
143 if let Some(row_index) = row_index {
144 let placeholder =
145 NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
146 return Ok(vec![
147 DataFrame::new_infer_height(vec![placeholder.into_series().into_column()])?
148 .with_row_index(
149 row_index.name.clone(),
150 Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
151 )?
152 .select(std::iter::once(row_index.name))?,
153 ]);
154 }
155 }
156
157 use ParallelStrategy as S;
158
159 match parallel {
160 S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
161 store,
162 previous_row_count,
163 row_group_start,
164 row_group_end,
165 pre_slice,
166 file_metadata,
167 schema,
168 row_index,
169 parallel,
170 projection,
171 hive_partition_columns,
172 ),
173 _ => rg_to_dfs_par_over_rg(
174 store,
175 row_group_start,
176 row_group_end,
177 previous_row_count,
178 pre_slice,
179 file_metadata,
180 schema,
181 row_index,
182 projection,
183 hive_partition_columns,
184 ),
185 }
186}
187
188#[allow(clippy::too_many_arguments)]
189fn rg_to_dfs_optionally_par_over_columns(
191 store: &mmap::ColumnStore,
192 previous_row_count: &mut IdxSize,
193 row_group_start: usize,
194 row_group_end: usize,
195 slice: (usize, usize),
196 file_metadata: &FileMetadata,
197 schema: &ArrowSchemaRef,
198 row_index: Option<RowIndex>,
199 parallel: ParallelStrategy,
200 projection: &[usize],
201 hive_partition_columns: Option<&[Series]>,
202) -> PolarsResult<Vec<DataFrame>> {
203 let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
204
205 let mut n_rows_processed: usize = (0..row_group_start)
206 .map(|i| file_metadata.row_groups[i].num_rows())
207 .sum();
208 let slice_end = slice.0 + slice.1;
209
210 for rg_idx in row_group_start..row_group_end {
211 let md = &file_metadata.row_groups[rg_idx];
212
213 let rg_slice =
214 split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
215 let current_row_count = md.num_rows() as IdxSize;
216
217 let sorting_map = create_sorting_map(md);
218
219 let f = |column_i: &usize| {
220 let (name, field) = schema.get_at_index(*column_i).unwrap();
221
222 let Some(iter) = md.columns_under_root_iter(name) else {
223 return Ok(Column::full_null(
224 name.clone(),
225 rg_slice.1,
226 &DataType::from_arrow_field(field),
227 ));
228 };
229
230 let part = iter.collect::<Vec<_>>();
231
232 let (mut series, _) = column_idx_to_series(
233 *column_i,
234 part.as_slice(),
235 Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
236 schema,
237 store,
238 )?;
239
240 try_set_sorted_flag(&mut series, *column_i, &sorting_map);
241 Ok(series.into_column())
242 };
243
244 let columns = if let ParallelStrategy::Columns = parallel {
245 RAYON.install(|| {
246 projection
247 .par_iter()
248 .map(f)
249 .collect::<PolarsResult<Vec<_>>>()
250 })?
251 } else {
252 projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
253 };
254
255 let mut df = unsafe { DataFrame::new_unchecked(rg_slice.1, columns) };
256 if let Some(rc) = &row_index {
257 unsafe {
258 df.with_row_index_mut(
259 rc.name.clone(),
260 Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
261 )
262 };
263 }
264
265 materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
266
267 *previous_row_count = previous_row_count
268 .checked_add(current_row_count)
269 .ok_or_else(|| {
270 polars_err!(
271 ComputeError: "Parquet file produces more than pow(2, 32) rows; \
272 consider compiling with polars-bigidx feature (pip install polars[rt64]), \
273 or set 'streaming'"
274 )
275 })?;
276 dfs.push(df);
277
278 if *previous_row_count as usize >= slice_end {
279 break;
280 }
281 }
282
283 Ok(dfs)
284}
285
286#[allow(clippy::too_many_arguments)]
287fn rg_to_dfs_par_over_rg(
289 store: &mmap::ColumnStore,
290 row_group_start: usize,
291 row_group_end: usize,
292 rows_read: &mut IdxSize,
293 slice: (usize, usize),
294 file_metadata: &FileMetadata,
295 schema: &ArrowSchemaRef,
296 row_index: Option<RowIndex>,
297 projection: &[usize],
298 hive_partition_columns: Option<&[Series]>,
299) -> PolarsResult<Vec<DataFrame>> {
300 let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
302
303 let mut n_rows_processed: usize = (0..row_group_start)
304 .map(|i| file_metadata.row_groups[i].num_rows())
305 .sum();
306 let slice_end = slice.0 + slice.1;
307
308 let mut rows_scanned: IdxSize;
312
313 if row_group_start > 0 {
314 rows_scanned = (0..row_group_start)
318 .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
319 .sum();
320 } else {
321 rows_scanned = 0;
322 }
323
324 for i in row_group_start..row_group_end {
325 let row_count_start = rows_scanned;
326 let rg_md = &file_metadata.row_groups[i];
327 let n_rows_this_file = rg_md.num_rows();
328 let rg_slice =
329 split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
330 rows_scanned = rows_scanned
331 .checked_add(n_rows_this_file as IdxSize)
332 .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
333
334 *rows_read += rg_slice.1 as IdxSize;
335
336 if rg_slice.1 == 0 {
337 continue;
338 }
339
340 row_groups.push((rg_md, rg_slice, row_count_start));
341 }
342
343 let dfs = RAYON.install(|| {
344 row_groups
347 .into_par_iter()
348 .map(|(md, slice, row_count_start)| {
349 if slice.1 == 0 {
350 return Ok(None);
351 }
352 #[cfg(debug_assertions)]
354 {
355 assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
356 }
357
358 let sorting_map = create_sorting_map(md);
359
360 let columns = projection
361 .iter()
362 .map(|column_i| {
363 let (name, field) = schema.get_at_index(*column_i).unwrap();
364
365 let Some(iter) = md.columns_under_root_iter(name) else {
366 return Ok(Column::full_null(
367 name.clone(),
368 md.num_rows(),
369 &DataType::from_arrow_field(field),
370 ));
371 };
372
373 let part = iter.collect::<Vec<_>>();
374
375 let (mut series, _) = column_idx_to_series(
376 *column_i,
377 part.as_slice(),
378 Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
379 schema,
380 store,
381 )?;
382
383 try_set_sorted_flag(&mut series, *column_i, &sorting_map);
384 Ok(series.into_column())
385 })
386 .collect::<PolarsResult<Vec<_>>>()?;
387
388 let mut df = unsafe { DataFrame::new_unchecked(slice.1, columns) };
389
390 if let Some(rc) = &row_index {
391 unsafe {
392 df.with_row_index_mut(
393 rc.name.clone(),
394 Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
395 )
396 };
397 }
398
399 materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
400
401 Ok(Some(df))
402 })
403 .collect::<PolarsResult<Vec<_>>>()
404 })?;
405 Ok(dfs.into_iter().flatten().collect())
406}
407
408#[allow(clippy::too_many_arguments)]
409pub fn read_parquet<R: MmapBytesReader>(
410 mut reader: R,
411 pre_slice: (usize, usize),
412 projection: Option<&[usize]>,
413 reader_schema: &ArrowSchemaRef,
414 metadata: Option<FileMetadataRef>,
415 mut parallel: ParallelStrategy,
416 row_index: Option<RowIndex>,
417 hive_partition_columns: Option<&[Series]>,
418) -> PolarsResult<DataFrame> {
419 if pre_slice.1 == 0 {
421 return Ok(materialize_empty_df(
422 projection,
423 reader_schema,
424 hive_partition_columns,
425 row_index.as_ref(),
426 ));
427 }
428
429 let file_metadata = metadata
430 .map(Ok)
431 .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
432 let n_row_groups = file_metadata.row_groups.len();
433
434 let materialized_projection = projection
435 .map(Cow::Borrowed)
436 .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));
437
438 if ParallelStrategy::Auto == parallel {
439 if n_row_groups > materialized_projection.len()
440 || n_row_groups > RAYON.current_num_threads()
441 {
442 parallel = ParallelStrategy::RowGroups;
443 } else {
444 parallel = ParallelStrategy::Columns;
445 }
446 }
447
448 if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
449 parallel = ParallelStrategy::None;
450 }
451
452 let reader = ReaderBytes::from(&mut reader);
453 Buffer::with_slice(&reader, |buf| {
454 let store = mmap::ColumnStore::Local(buf);
455 let dfs = rg_to_dfs(
456 &store,
457 &mut 0,
458 0,
459 n_row_groups,
460 pre_slice,
461 &file_metadata,
462 reader_schema,
463 row_index.clone(),
464 parallel,
465 &materialized_projection,
466 hive_partition_columns,
467 )?;
468
469 if dfs.is_empty() {
470 Ok(materialize_empty_df(
471 projection,
472 reader_schema,
473 hive_partition_columns,
474 row_index.as_ref(),
475 ))
476 } else {
477 accumulate_dataframes_vertical(dfs)
478 }
479 })
480}
481
482pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
483 let num_edges = mask.num_edges() as f64;
484 let rg_len = mask.len() as f64;
485
486 (num_edges / rg_len).clamp(0.0, 1.0)
503}
504
/// Policy deciding whether a predicate mask is applied by prefiltering.
///
/// The policy is interpreted by `PrefilterMaskSetting::should_prefilter`. It can be read from
/// the `POLARS_PQ_PREFILTERED_MASK` environment variable via
/// `PrefilterMaskSetting::init_from_env`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PrefilterMaskSetting {
    /// Decide per column from the mask cost and the column's data type.
    Auto,
    /// Always prefilter (`should_prefilter` returns `true`).
    Pre,
    /// Never prefilter (`should_prefilter` returns `false`).
    Post,
}
511
512impl PrefilterMaskSetting {
513 pub fn init_from_env() -> Self {
514 std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
515 "auto" => Self::Auto,
516 "pre" => Self::Pre,
517 "post" => Self::Post,
518 _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
519 })
520 }
521
522 pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
523 match self {
524 Self::Auto => {
525 let is_nested = dtype.is_nested();
528
529 !is_nested && prefilter_cost <= 0.01
531 },
532 Self::Pre => true,
533 Self::Post => false,
534 }
535 }
536}