use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
#[cfg(feature = "cloud")]
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_parquet::read;

#[cfg(feature = "cloud")]
use super::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use super::async_impl::ParquetObjectStore;
pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{FetchRowGroupsFromMmapReader, compute_row_group_range, read_parquet};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::predicates::ScanIOPredicate;
use crate::prelude::*;

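/// Read Apache parquet format into a DataFrame.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```no_run
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("data.parquet")?;
///     ParquetReader::new(file).finish()
/// }
/// ```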
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    rechunk: bool,
    slice: (usize, usize),
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    metadata: Option<FileMetadataRef>,
    predicate: Option<ScanIOPredicate>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
    use_statistics: bool,
}

impl<R: MmapBytesReader> ParquetReader<R> {
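    /// Try to reduce memory pressure at the expense of performance.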
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

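    /// Read the parquet file in parallel (default). The single threaded reader
    /// consumes less memory.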
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

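    /// Limit the read to an `(offset, length)` slice of rows. Defaults to reading all rows.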
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

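    /// Columns to select/project by name.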
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

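    /// Set the reader's column projection: a vector of 0-based column indices,
    /// e.g. `vec![0, 4]` selects the 1st and 5th column.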
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

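    /// Add a row index column.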
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

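    /// Checks that the file contains all the columns in `projected_arrow_schema`
    /// with the same dtype, and sets the projection indices. With
    /// `allow_missing_columns`, columns missing from the file are filled in from
    /// `first_schema` instead of raising an error.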
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider enabling `allow_missing_columns`: {}",
                        s
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

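    /// [`ArrowSchema`] of the file, inferred from the footer metadata and cached.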
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

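    /// Use statistics in the parquet file to determine if pages can be skipped
    /// from reading.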
    pub fn use_statistics(mut self, toggle: bool) -> Self {
        self.use_statistics = toggle;
        self
    }

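    /// Number of rows in the parquet file, taken from the file metadata.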
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }

    pub fn with_predicate(mut self, predicate: Option<ScanIOPredicate>) -> Self {
        self.predicate = predicate;
        self
    }
}

impl<R: MmapBytesReader + 'static> ParquetReader<R> {
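    /// Convert this reader into a [`BatchedParquetReader`] that reads row
    /// groups in batches of `chunk_size` rows.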
    pub fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
        let metadata = self.get_metadata()?.clone();
        let schema = self.schema()?;

        // Rewind the reader: reading the metadata moved the cursor to the footer.
        self.reader.seek(SeekFrom::Start(0))?;
        let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
        BatchedParquetReader::new(
            row_group_fetcher,
            metadata,
            schema,
            self.slice,
            self.projection,
            self.predicate.clone(),
            self.row_index,
            chunk_size,
            self.use_statistics,
            self.hive_partition_columns,
            self.include_file_path,
            self.parallel,
        )
    }
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            predicate: None,
            schema: None,
            use_statistics: true,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.predicate.as_ref(),
            self.parallel,
            self.row_index,
            self.use_statistics,
            self.hive_partition_columns.as_deref(),
        )?;

        if self.rechunk {
            df.as_single_chunk_par();
        };

        // Materialize the file path as a scalar column if requested.
        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_ref().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}

#[cfg(feature = "cloud")]
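/// A Parquet reader on top of the async object_store API. Only supports batch reading.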
pub struct ParquetAsyncReader {
    reader: ParquetObjectStore,
    slice: (usize, usize),
    rechunk: bool,
    projection: Option<Vec<usize>>,
    predicate: Option<ScanIOPredicate>,
    row_index: Option<RowIndex>,
    use_statistics: bool,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
    schema: Option<ArrowSchemaRef>,
    parallel: ParallelStrategy,
}

#[cfg(feature = "cloud")]
impl ParquetAsyncReader {
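    /// Create a new [`ParquetAsyncReader`] from a URI backed by an object store,
    /// optionally reusing already-fetched file metadata.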
    pub async fn from_uri(
        uri: &str,
        cloud_options: Option<&CloudOptions>,
        metadata: Option<FileMetadataRef>,
    ) -> PolarsResult<ParquetAsyncReader> {
        Ok(ParquetAsyncReader {
            reader: ParquetObjectStore::from_uri(uri, cloud_options, metadata).await?,
            rechunk: false,
            slice: (0, usize::MAX),
            projection: None,
            row_index: None,
            predicate: None,
            use_statistics: true,
            hive_partition_columns: None,
            include_file_path: None,
            schema: None,
            parallel: Default::default(),
        })
    }

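    /// Async counterpart of [`ParquetReader::with_arrow_schema_projection`].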
    pub async fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema().await?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema().await?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema().await?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider enabling `allow_missing_columns`: {}",
                        s
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

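    /// Fetch the file footer metadata (if not cached) and infer the [`ArrowSchema`].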
    pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match self.schema.as_ref() {
            Some(schema) => Arc::clone(schema),
            None => {
                let metadata = self.reader.get_metadata().await?;
                let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
                Arc::new(arrow_schema)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
        self.reader.num_rows().await
    }

    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    pub fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_predicate(mut self, predicate: Option<ScanIOPredicate>) -> Self {
        self.predicate = predicate;
        self
    }

    pub fn use_statistics(mut self, toggle: bool) -> Self {
        self.use_statistics = toggle;
        self
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

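    /// Convert this reader into a [`BatchedParquetReader`] that fetches row
    /// groups from the object store and decodes them in batches of `chunk_size`
    /// rows.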
    pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
        let metadata = self.reader.get_metadata().await?.clone();
        let schema = match self.schema {
            Some(schema) => schema,
            None => self.schema().await?,
        };
        let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
            self.reader,
            schema.clone(),
            self.projection.as_deref(),
            self.predicate.clone(),
            compute_row_group_range(
                0,
                metadata.row_groups.len(),
                self.slice,
                &metadata.row_groups,
            ),
            &metadata.row_groups,
        )?
        .into();
        BatchedParquetReader::new(
            row_group_fetcher,
            metadata,
            schema,
            self.slice,
            self.projection,
            self.predicate.clone(),
            self.row_index,
            chunk_size,
            self.use_statistics,
            self.hive_partition_columns,
            self.include_file_path,
            self.parallel,
        )
    }

    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        self.reader.get_metadata().await
    }

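    /// Read the whole file into a single [`DataFrame`], returning an empty
    /// DataFrame with the correct schema if no row groups are read.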
    pub async fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let metadata = self.get_metadata().await?.clone();
        let reader_schema = self.schema().await?;
        let row_index = self.row_index.clone();
        let hive_partition_columns = self.hive_partition_columns.clone();
        let projection = self.projection.clone();

        // chunk_size of usize::MAX means each row group is emitted as a single chunk.
        let reader = self.batched(usize::MAX).await?;
        let n_batches = metadata.row_groups.len();
        let mut iter = reader.iter(n_batches);

        let mut chunks = Vec::with_capacity(n_batches);
        while let Some(result) = iter.next_().await {
            chunks.push(result?)
        }
        if chunks.is_empty() {
            return Ok(materialize_empty_df(
                projection.as_deref(),
                reader_schema.as_ref(),
                hive_partition_columns.as_deref(),
                row_index.as_ref(),
            ));
        }
        let mut df = accumulate_dataframes_vertical_unchecked(chunks);

        if rechunk {
            df.as_single_chunk_par();
        }
        Ok(df)
    }
}