polars_io/parquet/read/reader.rs

1use std::io::{Read, Seek};
2use std::sync::Arc;
3
4use arrow::datatypes::ArrowSchemaRef;
5use polars_core::prelude::*;
6use polars_parquet::read;
7
8use super::read_impl::read_parquet;
9use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
10use crate::RowIndex;
11use crate::mmap::MmapBytesReader;
12use crate::parquet::metadata::FileMetadataRef;
13use crate::prelude::*;
14
/// Read Apache parquet format into a DataFrame.
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    // Underlying byte source for the parquet data.
    reader: R,
    // Whether to combine the result into a single chunk after reading.
    rechunk: bool,
    // (offset, length) row window to read; defaults to (0, usize::MAX), i.e. everything.
    slice: (usize, usize),
    // By-name column selection; resolved to positional indices in `finish`.
    columns: Option<Vec<String>>,
    // Positional (0-based) column projection.
    projection: Option<Vec<usize>>,
    // Strategy for parallelizing the read (over columns, row groups, ...).
    parallel: ParallelStrategy,
    // Cached arrow schema; lazily inferred from file metadata when `None`.
    schema: Option<ArrowSchemaRef>,
    // Optional row-index (row-count) column to prepend.
    row_index: Option<RowIndex>,
    // Trade performance for lower memory pressure.
    low_memory: bool,
    // Cached parquet file metadata; lazily read from `reader` when `None`.
    metadata: Option<FileMetadataRef>,
    // Hive-partition columns to attach to the output (one value per column).
    hive_partition_columns: Option<Vec<Series>>,
    // If set, append a column `(name, path)` containing the source file path.
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
}
31
impl<R: MmapBytesReader> ParquetReader<R> {
    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
    /// enough, turn off parallelization.
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Read the parquet file in parallel (default). The single threaded reader consumes less memory.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    /// Restrict reading to an `(offset, length)` window of rows.
    /// `None` resets to the whole file, i.e. `(0, usize::MAX)`.
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Columns to select/ project
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th column.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            // Use `first_schema` as the baseline column set, preferring this
            // file's own dtype for any column that is actually present here.
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        // Immediately-invoked closure so any `?` failure below can be
        // intercepted once in the `map_err` and wrapped with a hint.
        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                // No explicit selection: the file must not contain columns
                // beyond those in `first_schema`.
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                       SchemaMismatch:
                       "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            // Only missing-column errors benefit from suggesting
            // `missing_columns='insert'`; pass other errors through untouched.
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider passing `missing_columns='insert'`: {s}"
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

    /// [`Schema`] of the file.
    ///
    /// Lazily inferred from the file metadata on first call, then cached.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    /// Number of rows in the parquet file.
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    /// Set hive-partition columns to attach to the output.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// If set, append a column named by the first element holding the given
    /// file path as its (repeated) value.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Inject pre-read file metadata, avoiding a metadata read from `reader`.
    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    /// File metadata; lazily read from `reader` on first call, then cached.
    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}
177
178impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
179    /// Create a new [`ParquetReader`] from an existing `Reader`.
180    fn new(reader: R) -> Self {
181        ParquetReader {
182            reader,
183            rechunk: false,
184            slice: (0, usize::MAX),
185            columns: None,
186            projection: None,
187            parallel: Default::default(),
188            row_index: None,
189            low_memory: false,
190            metadata: None,
191            schema: None,
192            hive_partition_columns: None,
193            include_file_path: None,
194        }
195    }
196
197    fn set_rechunk(mut self, rechunk: bool) -> Self {
198        self.rechunk = rechunk;
199        self
200    }
201
202    fn finish(mut self) -> PolarsResult<DataFrame> {
203        let schema = self.schema()?;
204        let metadata = self.get_metadata()?.clone();
205        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);
206
207        if let Some(cols) = &self.columns {
208            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
209        }
210
211        let mut df = read_parquet(
212            self.reader,
213            self.slice,
214            self.projection.as_deref(),
215            &schema,
216            Some(metadata),
217            self.parallel,
218            self.row_index,
219            self.hive_partition_columns.as_deref(),
220        )?;
221
222        if self.rechunk {
223            df.as_single_chunk_par();
224        };
225
226        if let Some((col, value)) = &self.include_file_path {
227            unsafe {
228                df.with_column_unchecked(Column::new_scalar(
229                    col.clone(),
230                    Scalar::new(
231                        DataType::String,
232                        AnyValue::StringOwned(value.as_ref().into()),
233                    ),
234                    if df.width() > 0 { df.height() } else { n_rows },
235                ))
236            };
237        }
238
239        Ok(df)
240    }
241}