
1use std::io::{Read, Seek, SeekFrom};
2use std::sync::Arc;
4use arrow::datatypes::ArrowSchemaRef;
5use polars_core::prelude::*;
6#[cfg(feature = "cloud")]
7use polars_core::utils::accumulate_dataframes_vertical_unchecked;
8use polars_parquet::read;
10#[cfg(feature = "cloud")]
11use super::async_impl::FetchRowGroupsFromObjectStore;
12#[cfg(feature = "cloud")]
13use super::async_impl::ParquetObjectStore;
14pub use super::read_impl::BatchedParquetReader;
15use super::read_impl::{FetchRowGroupsFromMmapReader, compute_row_group_range, read_parquet};
16#[cfg(feature = "cloud")]
17use super::utils::materialize_empty_df;
18use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
19use crate::RowIndex;
20#[cfg(feature = "cloud")]
21use crate::cloud::CloudOptions;
22use crate::mmap::MmapBytesReader;
23use crate::parquet::metadata::FileMetadataRef;
24use crate::predicates::ScanIOPredicate;
25use crate::prelude::*;
27/// Read Apache parquet format into a DataFrame.
29pub struct ParquetReader<R: Read + Seek> {
30    reader: R,
31    rechunk: bool,
32    slice: (usize, usize),
33    columns: Option<Vec<String>>,
34    projection: Option<Vec<usize>>,
35    parallel: ParallelStrategy,
36    schema: Option<ArrowSchemaRef>,
37    row_index: Option<RowIndex>,
38    low_memory: bool,
39    metadata: Option<FileMetadataRef>,
40    predicate: Option<ScanIOPredicate>,
41    hive_partition_columns: Option<Vec<Series>>,
42    include_file_path: Option<(PlSmallStr, Arc<str>)>,
43    use_statistics: bool,
46impl<R: MmapBytesReader> ParquetReader<R> {
47    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
48    /// enough, turn off parallelization.
49    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
50        self.low_memory = low_memory;
51        self
52    }
54    /// Read the parquet file in parallel (default). The single threaded reader consumes less memory.
55    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
56        self.parallel = parallel;
57        self
58    }
60    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
61        self.slice = slice.unwrap_or((0, usize::MAX));
62        self
63    }
65    /// Columns to select/ project
66    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
67        self.columns = columns;
68        self
69    }
71    /// Set the reader's column projection. This counts from 0, meaning that
72    /// `vec![0, 4]` would select the 1st and 5th column.
73    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
74        self.projection = projection;
75        self
76    }
78    /// Add a row index column.
79    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
80        self.row_index = row_index;
81        self
82    }
84    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
85    /// dtype, and sets the projection indices.
86    pub fn with_arrow_schema_projection(
87        mut self,
88        first_schema: &Arc<ArrowSchema>,
89        projected_arrow_schema: Option<&ArrowSchema>,
90        allow_missing_columns: bool,
91    ) -> PolarsResult<Self> {
92        let slf_schema = self.schema()?;
93        let slf_schema_width = slf_schema.len();
95        if allow_missing_columns {
96            // Must check the dtypes
97            ensure_matching_dtypes_if_found(
98                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
99                self.schema()?.as_ref(),
100            )?;
101            self.schema = Some(Arc::new(
102                first_schema
103                    .iter()
104                    .map(|(name, field)| {
105                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
106                    })
107                    .collect(),
108            ));
109        }
111        let schema = self.schema()?;
113        (|| {
114            if let Some(projected_arrow_schema) = projected_arrow_schema {
115                self.projection = projected_arrow_schema_to_projection_indices(
116                    schema.as_ref(),
117                    projected_arrow_schema,
118                )?;
119            } else {
120                if slf_schema_width > first_schema.len() {
121                    polars_bail!(
122                       SchemaMismatch:
123                       "parquet file contained extra columns and no selection was given"
124                    )
125                }
127                self.projection =
128                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
129            };
130            Ok(())
131        })()
132        .map_err(|e| {
133            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
134                e.wrap_msg(|s| {
135                    format!(
136                        "error with column selection, \
137                        consider enabling `allow_missing_columns`: {}",
138                        s
139                    )
140                })
141            } else {
142                e
143            }
144        })?;
146        Ok(self)
147    }
149    /// [`Schema`] of the file.
150    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
151        self.schema = Some(match &self.schema {
152            Some(schema) => schema.clone(),
153            None => {
154                let metadata = self.get_metadata()?;
155                Arc::new(read::infer_schema(metadata)?)
156            },
157        });
159        Ok(self.schema.clone().unwrap())
160    }
162    /// Use statistics in the parquet to determine if pages
163    /// can be skipped from reading.
164    pub fn use_statistics(mut self, toggle: bool) -> Self {
165        self.use_statistics = toggle;
166        self
167    }
169    /// Number of rows in the parquet file.
170    pub fn num_rows(&mut self) -> PolarsResult<usize> {
171        let metadata = self.get_metadata()?;
172        Ok(metadata.num_rows)
173    }
175    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
176        self.hive_partition_columns = columns;
177        self
178    }
180    pub fn with_include_file_path(
181        mut self,
182        include_file_path: Option<(PlSmallStr, Arc<str>)>,
183    ) -> Self {
184        self.include_file_path = include_file_path;
185        self
186    }
188    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
189        self.metadata = Some(metadata);
190    }
192    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
193        if self.metadata.is_none() {
194            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
195        }
196        Ok(self.metadata.as_ref().unwrap())
197    }
199    pub fn with_predicate(mut self, predicate: Option<ScanIOPredicate>) -> Self {
200        self.predicate = predicate;
201        self
202    }
205impl<R: MmapBytesReader + 'static> ParquetReader<R> {
206    pub fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
207        let metadata = self.get_metadata()?.clone();
208        let schema = self.schema()?;
210        // XXX: Can a parquet file starts at an offset?
211        self.reader.seek(SeekFrom::Start(0))?;
212        let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
213        BatchedParquetReader::new(
214            row_group_fetcher,
215            metadata,
216            schema,
217            self.slice,
218            self.projection,
219            self.predicate.clone(),
220            self.row_index,
221            chunk_size,
222            self.use_statistics,
223            self.hive_partition_columns,
224            self.include_file_path,
225            self.parallel,
226        )
227    }
230impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
231    /// Create a new [`ParquetReader`] from an existing `Reader`.
232    fn new(reader: R) -> Self {
233        ParquetReader {
234            reader,
235            rechunk: false,
236            slice: (0, usize::MAX),
237            columns: None,
238            projection: None,
239            parallel: Default::default(),
240            row_index: None,
241            low_memory: false,
242            metadata: None,
243            predicate: None,
244            schema: None,
245            use_statistics: true,
246            hive_partition_columns: None,
247            include_file_path: None,
248        }
249    }
251    fn set_rechunk(mut self, rechunk: bool) -> Self {
252        self.rechunk = rechunk;
253        self
254    }
256    fn finish(mut self) -> PolarsResult<DataFrame> {
257        let schema = self.schema()?;
258        let metadata = self.get_metadata()?.clone();
259        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);
261        if let Some(cols) = &self.columns {
262            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
263        }
265        let mut df = read_parquet(
266            self.reader,
267            self.slice,
268            self.projection.as_deref(),
269            &schema,
270            Some(metadata),
271            self.predicate.as_ref(),
272            self.parallel,
273            self.row_index,
274            self.use_statistics,
275            self.hive_partition_columns.as_deref(),
276        )?;
278        if self.rechunk {
279            df.as_single_chunk_par();
280        };
282        if let Some((col, value)) = &self.include_file_path {
283            unsafe {
284                df.with_column_unchecked(Column::new_scalar(
285                    col.clone(),
286                    Scalar::new(
287                        DataType::String,
288                        AnyValue::StringOwned(value.as_ref().into()),
289                    ),
290                    if df.width() > 0 { df.height() } else { n_rows },
291                ))
292            };
293        }
295        Ok(df)
296    }
/// A Parquet reader on top of the async object_store API. Only the batch reader is implemented since
/// parquet files on cloud storage tend to be big and slow to access.
#[cfg(feature = "cloud")]
pub struct ParquetAsyncReader {
    /// Object-store backed source of metadata and row groups.
    reader: ParquetObjectStore,
    /// Requested row window; `(0, usize::MAX)` when no slice was requested.
    slice: (usize, usize),
    /// Whether `finish` collapses the resulting frame into a single chunk.
    rechunk: bool,
    /// Zero-based indices of the columns to read.
    projection: Option<Vec<usize>>,
    /// Optional predicate forwarded to the row-group fetcher and batched reader.
    predicate: Option<ScanIOPredicate>,
    /// Optional row index column to add.
    row_index: Option<RowIndex>,
    /// Whether parquet statistics may be used to skip pages.
    use_statistics: bool,
    /// Optional hive partition columns forwarded to the batched reader.
    hive_partition_columns: Option<Vec<Series>>,
    /// Optional `(column name, value)` pair forwarded to the batched reader.
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
    /// Arrow schema; inferred from the file metadata on first use when unset.
    schema: Option<ArrowSchemaRef>,
    /// Parallelization strategy handed to the batched reader.
    parallel: ParallelStrategy,
}
#[cfg(feature = "cloud")]
impl ParquetAsyncReader {
    /// Create a reader for the parquet object at `uri`.
    ///
    /// `cloud_options` and `metadata` are forwarded to the object-store reader.
    /// All other options start at their defaults: no slice, no projection,
    /// no predicate, statistics enabled, rechunking disabled.
    pub async fn from_uri(
        uri: &str,
        cloud_options: Option<&CloudOptions>,
        metadata: Option<FileMetadataRef>,
    ) -> PolarsResult<ParquetAsyncReader> {
        Ok(ParquetAsyncReader {
            reader: ParquetObjectStore::from_uri(uri, cloud_options, metadata).await?,
            rechunk: false,
            slice: (0, usize::MAX),
            projection: None,
            row_index: None,
            predicate: None,
            use_statistics: true,
            hive_partition_columns: None,
            include_file_path: None,
            schema: None,
            parallel: Default::default(),
        })
    }

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    ///
    /// When `projected_arrow_schema` is `None`, `first_schema` is used for the projection and
    /// a file with more columns than `first_schema` is rejected with a `SchemaMismatch` error.
    ///
    /// When `allow_missing_columns` is set, the reader's schema is replaced by one that follows
    /// `first_schema`, taking the file's own field wherever the file has a column of that name.
    pub async fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema().await?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                slf_schema.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        // Re-read the schema: it may have just been replaced above.
        let schema = self.schema().await?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                       SchemaMismatch:
                       "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            // Only a missing column is worth the hint; other errors pass through untouched.
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider enabling `allow_missing_columns`: {}",
                        s
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

    /// Arrow schema of the file, inferred from the file metadata on the first
    /// call and cached afterwards.
    pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        if let Some(schema) = &self.schema {
            return Ok(Arc::clone(schema));
        }

        let metadata = self.reader.get_metadata().await?;
        let schema = Arc::new(polars_parquet::arrow::read::infer_schema(metadata)?);
        self.schema = Some(Arc::clone(&schema));
        Ok(schema)
    }

    /// Number of rows, as reported by the object-store reader.
    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
        self.reader.num_rows().await
    }

    /// Only positive offsets are supported for simplicity - the caller should
    /// translate negative offsets into the positive equivalent.
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Choose whether `finish` collapses the resulting frame into a single chunk.
    pub fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Set the zero-based indices of the columns to read.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Set the predicate forwarded to the row-group fetcher and batched reader.
    pub fn with_predicate(mut self, predicate: Option<ScanIOPredicate>) -> Self {
        self.predicate = predicate;
        self
    }

    /// Use statistics in the parquet to determine if pages
    /// can be skipped from reading.
    pub fn use_statistics(mut self, toggle: bool) -> Self {
        self.use_statistics = toggle;
        self
    }

    /// Set the hive partition columns forwarded to the batched reader.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// Set the `(column name, value)` pair forwarded to the batched reader.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Set the parallelization strategy handed to the batched reader.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    /// Turn this reader into a [`BatchedParquetReader`], forwarding `chunk_size`
    /// together with all options configured on this reader.
    pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
        let metadata = self.reader.get_metadata().await?.clone();
        // `schema()` already returns the cached schema when one is set.
        let schema = self.schema().await?;
        // row group fetcher deals with projection
        let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
            self.reader,
            schema.clone(),
            self.projection.as_deref(),
            self.predicate.clone(),
            compute_row_group_range(
                0,
                metadata.row_groups.len(),
                self.slice,
                &metadata.row_groups,
            ),
            &metadata.row_groups,
        )?
        .into();
        BatchedParquetReader::new(
            row_group_fetcher,
            metadata,
            schema,
            self.slice,
            self.projection,
            // Last use of the predicate: move it instead of cloning a second time.
            self.predicate,
            self.row_index,
            chunk_size,
            self.use_statistics,
            self.hive_partition_columns,
            self.include_file_path,
            self.parallel,
        )
    }

    /// File metadata, as provided by the object-store reader.
    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        self.reader.get_metadata().await
    }

    /// Read the file into a single [`DataFrame`] by draining the batched reader.
    ///
    /// If no batch is produced, an empty frame is built from the projection,
    /// schema, hive partition columns and row index instead.
    pub async fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let metadata = self.get_metadata().await?.clone();
        let reader_schema = self.schema().await?;
        // `batched` consumes `self`; keep what the empty-result path needs.
        let row_index = self.row_index.clone();
        let hive_partition_columns = self.hive_partition_columns.clone();
        let projection = self.projection.clone();

        // batched reader deals with slice pushdown
        let reader = self.batched(usize::MAX).await?;
        let n_batches = metadata.row_groups.len();
        let mut iter = reader.iter(n_batches);

        let mut chunks = Vec::with_capacity(n_batches);
        while let Some(result) = iter.next_().await {
            chunks.push(result?);
        }
        if chunks.is_empty() {
            return Ok(materialize_empty_df(
                projection.as_deref(),
                reader_schema.as_ref(),
                hive_partition_columns.as_deref(),
                row_index.as_ref(),
            ));
        }
        let mut df = accumulate_dataframes_vertical_unchecked(chunks);

        if rechunk {
            df.as_single_chunk_par();
        }
        Ok(df)
    }
}