polars_io/parquet/read/
reader.rs

use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;
use polars_utils::pl_str::PlRefStr;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;

/// Read Apache parquet format into a DataFrame.
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    /// Whether to rechunk the output into contiguous memory after reading.
    rechunk: bool,
    /// `(offset, length)` row slice to read.
    slice: (usize, usize),
    /// Column selection by name; resolved to `projection` indices in `finish`.
    columns: Option<Vec<String>>,
    /// Column selection by positional index.
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    /// Cached file metadata, read lazily on first access.
    metadata: Option<FileMetadataRef>,
    hive_partition_columns: Option<Vec<Series>>,
    /// Optional `(column name, file path)` appended as a constant string column.
    include_file_path: Option<(PlSmallStr, PlRefStr)>,
}
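// `ParquetReader` is constructed through `SerReader::new`; see the `SerReader`
// impl at the bottom of this file for the default settings.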

impl<R: MmapBytesReader> ParquetReader<R> {
    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
    /// enough, turn off parallelization.
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Read the parquet file in parallel (default). The single-threaded reader consumes less memory.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    /// Set the `(offset, length)` slice of rows to read; `None` reads all rows.
    /// E.g. `Some((100, 200))` skips the first 100 rows and then reads up to 200.
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Columns to select/project by name.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th column.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
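            // Adopt the unified field order and width of `first_schema`, but keep
            // this file's own dtype wherever the file actually contains the column.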
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

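        // An immediately-invoked closure, so `?` failures below can be intercepted
        // and wrapped with a hint suggesting `missing_columns='insert'`.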
        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider passing `missing_columns='insert'`: {s}"
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

    /// [`Schema`] of the file, inferred from the file metadata on first call and cached.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    /// Number of rows in the parquet file.
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    /// Set hive partition columns to append to the output.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// Include a column with the given name containing the source file path.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, PlRefStr)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Set already-known file metadata so it is not read from the file again.
    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    /// Get the file metadata, reading and caching it on first access.
    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            schema: None,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        // Clamp the slice end to the file's row count; `saturating_add` guards
        // against overflow when the slice length is `usize::MAX`.
        let n_rows = metadata.num_rows.min(self.slice.0.saturating_add(self.slice.1));

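        // Resolve a name-based column selection into positional projection indices.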
        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

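        // The actual decode work is delegated to `read_parquet` in `read_impl`.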
        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.parallel,
            self.row_index,
            self.hive_partition_columns.as_deref(),
        )?;

        if self.rechunk {
            df.rechunk_mut_par();
        }

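        // Append the file path as a constant string column; when the projection
        // produced no columns at all, fall back to the slice-clamped row count
        // for the column height.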
        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.push_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_str().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}
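
// A minimal usage sketch (hypothetical: `data.parquet` and the column names are
// illustrative, not part of this module):
//
//     use polars_io::prelude::*;
//
//     let file = std::fs::File::open("data.parquet")?;
//     let df = ParquetReader::new(file)
//         .with_columns(Some(vec!["a".to_string(), "b".to_string()]))
//         .with_slice(Some((0, 1_000)))
//         .set_rechunk(true)
//         .finish()?;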