polars_io/parquet/read/reader.rs

use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;

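/// Reads Apache Parquet format into a `DataFrame`.
///
/// A minimal usage sketch; `data.parquet` is a placeholder path, and
/// `ParquetReader::new`/`finish` come from the `SerReader` impl below:
///
/// ```no_run
/// use std::fs::File;
///
/// use polars_io::prelude::*;
///
/// let file = File::open("data.parquet").unwrap();
/// let df = ParquetReader::new(file).finish().unwrap();
/// ```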
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    rechunk: bool,
    slice: (usize, usize),
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    metadata: Option<FileMetadataRef>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
}

impl<R: MmapBytesReader> ParquetReader<R> {
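    /// Try to reduce memory pressure at the expense of performance.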
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

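    /// Set the [`ParallelStrategy`] used when deserializing the file.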
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

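    /// Read only an `(offset, length)` slice of the rows; `None` reads everything.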
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

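    /// Columns to select/project, by name.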
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

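    /// Set the column projection by index, counting from 0.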
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

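    /// Add a row index column to the output.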
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

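    /// Check that the file schema is compatible with `projected_arrow_schema`
    /// (or `first_schema` if no projection is given) and set the projection
    /// indices accordingly.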
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Columns that do exist in the file must still have a matching dtype.
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider enabling `allow_missing_columns`: {}",
                        s
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

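    /// The Arrow schema of the file; inferred from the metadata on first call.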
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

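    /// Number of rows in the parquet file, as reported by the file metadata.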
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

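    /// Attach hive partition columns to the output.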
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

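    /// Add a column containing the source file path to the output.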
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

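    /// Provide already-known file metadata so it does not have to be read from the reader.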
    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

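    /// Get the file metadata, reading and caching it on first access.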
    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
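    /// Create a new [`ParquetReader`] from an existing reader.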
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            schema: None,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

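    /// Read the file into a `DataFrame`, applying the configured projection,
    /// slice, row index, hive partition columns, and file path column.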
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.parallel,
            self.row_index,
            self.hive_partition_columns.as_deref(),
        )?;

        if self.rechunk {
            df.as_single_chunk_par();
        };

        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_ref().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}