// polars_io/parquet/read/reader.rs

use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;
15#[must_use]
17pub struct ParquetReader<R: Read + Seek> {
18 reader: R,
19 rechunk: bool,
20 slice: (usize, usize),
21 columns: Option<Vec<String>>,
22 projection: Option<Vec<usize>>,
23 parallel: ParallelStrategy,
24 schema: Option<ArrowSchemaRef>,
25 row_index: Option<RowIndex>,
26 low_memory: bool,
27 metadata: Option<FileMetadataRef>,
28 hive_partition_columns: Option<Vec<Series>>,
29 include_file_path: Option<(PlSmallStr, Arc<str>)>,
30}
31
32impl<R: MmapBytesReader> ParquetReader<R> {
33 pub fn set_low_memory(mut self, low_memory: bool) -> Self {
36 self.low_memory = low_memory;
37 self
38 }
39
40 pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
42 self.parallel = parallel;
43 self
44 }
45
46 pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
47 self.slice = slice.unwrap_or((0, usize::MAX));
48 self
49 }
50
51 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
53 self.columns = columns;
54 self
55 }
56
57 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
60 self.projection = projection;
61 self
62 }
63
64 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
66 self.row_index = row_index;
67 self
68 }
69
70 pub fn with_arrow_schema_projection(
73 mut self,
74 first_schema: &Arc<ArrowSchema>,
75 projected_arrow_schema: Option<&ArrowSchema>,
76 allow_missing_columns: bool,
77 ) -> PolarsResult<Self> {
78 let slf_schema = self.schema()?;
79 let slf_schema_width = slf_schema.len();
80
81 if allow_missing_columns {
82 ensure_matching_dtypes_if_found(
84 projected_arrow_schema.unwrap_or(first_schema.as_ref()),
85 self.schema()?.as_ref(),
86 )?;
87 self.schema = Some(Arc::new(
88 first_schema
89 .iter()
90 .map(|(name, field)| {
91 (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
92 })
93 .collect(),
94 ));
95 }
96
97 let schema = self.schema()?;
98
99 (|| {
100 if let Some(projected_arrow_schema) = projected_arrow_schema {
101 self.projection = projected_arrow_schema_to_projection_indices(
102 schema.as_ref(),
103 projected_arrow_schema,
104 )?;
105 } else {
106 if slf_schema_width > first_schema.len() {
107 polars_bail!(
108 SchemaMismatch:
109 "parquet file contained extra columns and no selection was given"
110 )
111 }
112
113 self.projection =
114 projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
115 };
116 Ok(())
117 })()
118 .map_err(|e| {
119 if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
120 e.wrap_msg(|s| {
121 format!(
122 "error with column selection, \
123 consider passing `missing_columns='insert'`: {s}"
124 )
125 })
126 } else {
127 e
128 }
129 })?;
130
131 Ok(self)
132 }
133
134 pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
136 self.schema = Some(match &self.schema {
137 Some(schema) => schema.clone(),
138 None => {
139 let metadata = self.get_metadata()?;
140 Arc::new(read::infer_schema(metadata)?)
141 },
142 });
143
144 Ok(self.schema.clone().unwrap())
145 }
146
147 pub fn num_rows(&mut self) -> PolarsResult<usize> {
149 let metadata = self.get_metadata()?;
150 Ok(metadata.num_rows)
151 }
152
153 pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
154 self.hive_partition_columns = columns;
155 self
156 }
157
158 pub fn with_include_file_path(
159 mut self,
160 include_file_path: Option<(PlSmallStr, Arc<str>)>,
161 ) -> Self {
162 self.include_file_path = include_file_path;
163 self
164 }
165
166 pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
167 self.metadata = Some(metadata);
168 }
169
170 pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
171 if self.metadata.is_none() {
172 self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
173 }
174 Ok(self.metadata.as_ref().unwrap())
175 }
176}
177
178impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
179 fn new(reader: R) -> Self {
181 ParquetReader {
182 reader,
183 rechunk: false,
184 slice: (0, usize::MAX),
185 columns: None,
186 projection: None,
187 parallel: Default::default(),
188 row_index: None,
189 low_memory: false,
190 metadata: None,
191 schema: None,
192 hive_partition_columns: None,
193 include_file_path: None,
194 }
195 }
196
197 fn set_rechunk(mut self, rechunk: bool) -> Self {
198 self.rechunk = rechunk;
199 self
200 }
201
202 fn finish(mut self) -> PolarsResult<DataFrame> {
203 let schema = self.schema()?;
204 let metadata = self.get_metadata()?.clone();
205 let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);
206
207 if let Some(cols) = &self.columns {
208 self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
209 }
210
211 let mut df = read_parquet(
212 self.reader,
213 self.slice,
214 self.projection.as_deref(),
215 &schema,
216 Some(metadata),
217 self.parallel,
218 self.row_index,
219 self.hive_partition_columns.as_deref(),
220 )?;
221
222 if self.rechunk {
223 df.as_single_chunk_par();
224 };
225
226 if let Some((col, value)) = &self.include_file_path {
227 unsafe {
228 df.with_column_unchecked(Column::new_scalar(
229 col.clone(),
230 Scalar::new(
231 DataType::String,
232 AnyValue::StringOwned(value.as_ref().into()),
233 ),
234 if df.width() > 0 { df.height() } else { n_rows },
235 ))
236 };
237 }
238
239 Ok(df)
240 }
241}