// polars_io/parquet/read/reader.rs
use std::io::{Read, Seek};
2use std::sync::Arc;
3
4use arrow::datatypes::ArrowSchemaRef;
5use polars_core::prelude::*;
6use polars_parquet::read;
7use polars_utils::pl_str::PlRefStr;
8
9use super::read_impl::read_parquet;
10use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
11use crate::RowIndex;
12use crate::mmap::MmapBytesReader;
13use crate::parquet::metadata::FileMetadataRef;
14use crate::prelude::*;
15
/// Reader that deserializes an Apache Parquet file into a [`DataFrame`].
///
/// Configured via its builder-style `with_*`/`set_*` methods; the actual read
/// happens in [`SerReader::finish`].
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    /// Underlying byte source for the parquet data.
    reader: R,
    /// Whether to rechunk the resulting `DataFrame` after reading.
    rechunk: bool,
    /// `(offset, length)` row slice to read; `(0, usize::MAX)` means "everything".
    slice: (usize, usize),
    /// By-name column selection; resolved to positional indices in `finish`.
    columns: Option<Vec<String>>,
    /// Positional column projection.
    projection: Option<Vec<usize>>,
    /// Strategy used for parallel decoding.
    parallel: ParallelStrategy,
    /// Cached arrow schema; inferred from the file metadata on first access.
    schema: Option<ArrowSchemaRef>,
    /// Optional row-index column to add to the output.
    row_index: Option<RowIndex>,
    /// Trade performance for lower memory pressure (forwarded to the read path).
    low_memory: bool,
    /// Cached parquet footer metadata; read lazily from `reader`.
    metadata: Option<FileMetadataRef>,
    /// Hive-partition columns to include in the output.
    hive_partition_columns: Option<Vec<Series>>,
    /// `(column name, file path)`; when set, a string column containing the
    /// source file path is appended to the output.
    include_file_path: Option<(PlSmallStr, PlRefStr)>,
}
32
33impl<R: MmapBytesReader> ParquetReader<R> {
34 pub fn set_low_memory(mut self, low_memory: bool) -> Self {
37 self.low_memory = low_memory;
38 self
39 }
40
41 pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
43 self.parallel = parallel;
44 self
45 }
46
47 pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
48 self.slice = slice.unwrap_or((0, usize::MAX));
49 self
50 }
51
52 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
54 self.columns = columns;
55 self
56 }
57
58 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
61 self.projection = projection;
62 self
63 }
64
65 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
67 self.row_index = row_index;
68 self
69 }
70
71 pub fn with_arrow_schema_projection(
74 mut self,
75 first_schema: &Arc<ArrowSchema>,
76 projected_arrow_schema: Option<&ArrowSchema>,
77 allow_missing_columns: bool,
78 ) -> PolarsResult<Self> {
79 let slf_schema = self.schema()?;
80 let slf_schema_width = slf_schema.len();
81
82 if allow_missing_columns {
83 ensure_matching_dtypes_if_found(
85 projected_arrow_schema.unwrap_or(first_schema.as_ref()),
86 self.schema()?.as_ref(),
87 )?;
88 self.schema = Some(Arc::new(
89 first_schema
90 .iter()
91 .map(|(name, field)| {
92 (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
93 })
94 .collect(),
95 ));
96 }
97
98 let schema = self.schema()?;
99
100 (|| {
101 if let Some(projected_arrow_schema) = projected_arrow_schema {
102 self.projection = projected_arrow_schema_to_projection_indices(
103 schema.as_ref(),
104 projected_arrow_schema,
105 )?;
106 } else {
107 if slf_schema_width > first_schema.len() {
108 polars_bail!(
109 SchemaMismatch:
110 "parquet file contained extra columns and no selection was given"
111 )
112 }
113
114 self.projection =
115 projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
116 };
117 Ok(())
118 })()
119 .map_err(|e| {
120 if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
121 e.wrap_msg(|s| {
122 format!(
123 "error with column selection, \
124 consider passing `missing_columns='insert'`: {s}"
125 )
126 })
127 } else {
128 e
129 }
130 })?;
131
132 Ok(self)
133 }
134
135 pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
137 self.schema = Some(match &self.schema {
138 Some(schema) => schema.clone(),
139 None => {
140 let metadata = self.get_metadata()?;
141 Arc::new(read::infer_schema(metadata)?)
142 },
143 });
144
145 Ok(self.schema.clone().unwrap())
146 }
147
148 pub fn num_rows(&mut self) -> PolarsResult<usize> {
150 let metadata = self.get_metadata()?;
151 Ok(metadata.num_rows)
152 }
153
154 pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
155 self.hive_partition_columns = columns;
156 self
157 }
158
159 pub fn with_include_file_path(
160 mut self,
161 include_file_path: Option<(PlSmallStr, PlRefStr)>,
162 ) -> Self {
163 self.include_file_path = include_file_path;
164 self
165 }
166
167 pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
168 self.metadata = Some(metadata);
169 }
170
171 pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
172 if self.metadata.is_none() {
173 self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
174 }
175 Ok(self.metadata.as_ref().unwrap())
176 }
177}
178
179impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
180 fn new(reader: R) -> Self {
182 ParquetReader {
183 reader,
184 rechunk: false,
185 slice: (0, usize::MAX),
186 columns: None,
187 projection: None,
188 parallel: Default::default(),
189 row_index: None,
190 low_memory: false,
191 metadata: None,
192 schema: None,
193 hive_partition_columns: None,
194 include_file_path: None,
195 }
196 }
197
198 fn set_rechunk(mut self, rechunk: bool) -> Self {
199 self.rechunk = rechunk;
200 self
201 }
202
203 fn finish(mut self) -> PolarsResult<DataFrame> {
204 let schema = self.schema()?;
205 let metadata = self.get_metadata()?.clone();
206 let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);
207
208 if let Some(cols) = &self.columns {
209 self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
210 }
211
212 let mut df = read_parquet(
213 self.reader,
214 self.slice,
215 self.projection.as_deref(),
216 &schema,
217 Some(metadata),
218 self.parallel,
219 self.row_index,
220 self.hive_partition_columns.as_deref(),
221 )?;
222
223 if self.rechunk {
224 df.rechunk_mut_par();
225 };
226
227 if let Some((col, value)) = &self.include_file_path {
228 unsafe {
229 df.push_column_unchecked(Column::new_scalar(
230 col.clone(),
231 Scalar::new(
232 DataType::String,
233 AnyValue::StringOwned(value.as_str().into()),
234 ),
235 if df.width() > 0 { df.height() } else { n_rows },
236 ))
237 };
238 }
239
240 Ok(df)
241 }
242}