use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

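/// Debug-only check that `dtype` contains no Arrow types the Parquet deserializer should never
/// produce (small-offset `Utf8`/`Binary`, `Float16`, `List`, `Map`), recursing into nested,
/// dictionary and extension types.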
#[cfg(debug_assertions)]
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        D::Float16 => unreachable!(),

        D::List(_) => unreachable!(),

        D::Map(_, _) => unreachable!(),

        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

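/// Whether the Parquet `SortingColumn` order may be copied over to the Polars sorted flag for
/// this dtype. Currently restricted to primitive integer types.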
fn should_copy_sortedness(dtype: &DataType) -> bool {
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

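/// Set the sorted flag on `series` if the row group's `SortingColumn` metadata marks the column
/// at `col_idx` as sorted and its dtype supports carrying the flag over.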
pub fn try_set_sorted_flag(
    series: &mut Series,
    col_idx: usize,
    sorting_map: &PlHashMap<usize, IsSorted>,
) {
    if let Some(is_sorted) = sorting_map.get(&col_idx) {
        if should_copy_sortedness(series.dtype()) {
            if config::verbose() {
                eprintln!(
                    "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
                    series.name()
                );
            }

            series.set_sorted_flag(*is_sorted);
        }
    }
}

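/// Build a map from column index to the sort order declared in the row group's `SortingColumn`
/// metadata.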
pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = PlHashMap::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            let prev_value = sorting_map.insert(
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            );

            debug_assert!(prev_value.is_none());
        }
    }

    sorting_map
}

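/// Deserialize the column chunks of a single projected column into a `Series`, together with the
/// bitmap of rows for which the optional predicate filter evaluated to true.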
fn column_idx_to_series(
    column_i: usize,
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

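/// Materialize the row groups in `row_group_start..row_group_end` into `DataFrame`s, dispatching
/// to either the column-parallel or the row-group-parallel implementation.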
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // Special path: nothing is projected, so only the row index column has to be produced.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

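/// Read the selected row groups sequentially, optionally parallelizing the per-column
/// deserialization within each row group over the thread pool.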
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

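/// Read the selected row groups in parallel over the thread pool, one task per row group.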
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

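/// Read a Parquet file into a `DataFrame`, resolving `ParallelStrategy::Auto` based on the
/// number of row groups, the projection width and the available threads.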
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path: an empty slice requests no rows at all.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // With multiple row groups, keep the string cache alive so categorical columns built from
    // different row groups remain compatible.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    // Erase the reader's borrow lifetime so the underlying bytes can be stored in the column
    // store for the duration of the read.
    let reader = ReaderBytes::from(&mut reader);
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

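/// Estimate how expensive prefiltering with `mask` would be: the number of bit transitions
/// (`num_edges`) in the mask relative to its length, clamped to `[0.0, 1.0]`.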
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    (num_edges / rg_len).clamp(0.0, 1.0)
}

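/// Strategy for applying the prefilter mask, configurable via the `POLARS_PQ_PREFILTERED_MASK`
/// environment variable; `Auto` decides per column from the estimated prefilter cost and whether
/// the dtype is nested.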
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                let is_nested = dtype.is_nested();

                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}