polars_io/ipc/
ipc_file.rs1use std::io::{Read, Seek};
36use std::path::PathBuf;
37
38use arrow::datatypes::{ArrowSchemaRef, Metadata};
39use arrow::io::ipc::read::{self, get_row_count};
40use arrow::record_batch::RecordBatch;
41use polars_core::prelude::*;
42#[cfg(feature = "serde")]
43use serde::{Deserialize, Serialize};
44
45use crate::RowIndex;
46use crate::hive::materialize_hive_partitions;
47use crate::mmap::MmapBytesReader;
48use crate::predicates::PhysicalIoExpr;
49use crate::prelude::*;
50use crate::shared::{ArrowReader, finish_reader};
51
/// Options type for IPC scans.
///
/// This is a field-less unit struct, so all values are identical: they compare
/// equal and hash to the same value. Serde support is derived only when the
/// `serde` feature is enabled.
#[derive(Debug, Clone, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions;
55
56#[must_use]
73pub struct IpcReader<R: MmapBytesReader> {
74 pub(super) reader: R,
76 rechunk: bool,
78 pub(super) n_rows: Option<usize>,
79 pub(super) projection: Option<Vec<usize>>,
80 pub(crate) columns: Option<Vec<String>>,
81 hive_partition_columns: Option<Vec<Series>>,
82 include_file_path: Option<(PlSmallStr, Arc<str>)>,
83 pub(super) row_index: Option<RowIndex>,
84 pub(super) memory_map: Option<PathBuf>,
86 metadata: Option<read::FileMetadata>,
87 schema: Option<ArrowSchemaRef>,
88}
89
90fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
91 if let PolarsError::ComputeError(s) = &err {
92 if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
93 eprintln!(
94 "Could not memory_map compressed IPC file, defaulting to normal read. \
95 Toggle off 'memory_map' to silence this warning."
96 );
97 return Ok(());
98 }
99 }
100 Err(err)
101}
102
103impl<R: MmapBytesReader> IpcReader<R> {
104 fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
105 if self.metadata.is_none() {
106 let metadata = read::read_file_metadata(&mut self.reader)?;
107 self.schema = Some(metadata.schema.clone());
108 self.metadata = Some(metadata);
109 }
110 Ok(self.metadata.as_ref().unwrap())
111 }
112
113 pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
115 self.get_metadata()?;
116 Ok(self.schema.as_ref().unwrap().clone())
117 }
118
119 pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
121 self.get_metadata()?;
122 Ok(self
123 .metadata
124 .as_ref()
125 .and_then(|meta| meta.custom_schema_metadata.clone()))
126 }
127
128 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
130 self.n_rows = num_rows;
131 self
132 }
133
134 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
136 self.columns = columns;
137 self
138 }
139
140 pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
141 self.hive_partition_columns = columns;
142 self
143 }
144
145 pub fn with_include_file_path(
146 mut self,
147 include_file_path: Option<(PlSmallStr, Arc<str>)>,
148 ) -> Self {
149 self.include_file_path = include_file_path;
150 self
151 }
152
153 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
155 self.row_index = row_index;
156 self
157 }
158
159 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
162 self.projection = projection;
163 self
164 }
165
166 pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
169 self.memory_map = path_buf;
170 self
171 }
172
173 #[cfg(feature = "lazy")]
175 pub fn finish_with_scan_ops(
176 mut self,
177 predicate: Option<Arc<dyn PhysicalIoExpr>>,
178 verbose: bool,
179 ) -> PolarsResult<DataFrame> {
180 if self.memory_map.is_some() && self.reader.to_file().is_some() {
181 if verbose {
182 eprintln!("memory map ipc file")
183 }
184 match self.finish_memmapped(predicate.clone()) {
185 Ok(df) => return Ok(df),
186 Err(err) => check_mmap_err(err)?,
187 }
188 }
189 let rechunk = self.rechunk;
190 let metadata = read::read_file_metadata(&mut self.reader)?;
191
192 if let Some(columns) = &self.columns {
196 self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
197 }
198
199 let schema = if let Some(projection) = &self.projection {
200 Arc::new(apply_projection(&metadata.schema, projection))
201 } else {
202 metadata.schema.clone()
203 };
204
205 let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
206
207 finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
208 }
209}
210
211impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
212where
213 R: Read + Seek,
214{
215 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
216 self.next().map_or(Ok(None), |v| v.map(Some))
217 }
218}
219
220impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
221 fn new(reader: R) -> Self {
222 IpcReader {
223 reader,
224 rechunk: true,
225 n_rows: None,
226 columns: None,
227 hive_partition_columns: None,
228 include_file_path: None,
229 projection: None,
230 row_index: None,
231 memory_map: None,
232 metadata: None,
233 schema: None,
234 }
235 }
236
237 fn set_rechunk(mut self, rechunk: bool) -> Self {
238 self.rechunk = rechunk;
239 self
240 }
241
242 fn finish(mut self) -> PolarsResult<DataFrame> {
243 let reader_schema = if let Some(ref schema) = self.schema {
244 schema.clone()
245 } else {
246 self.get_metadata()?.schema.clone()
247 };
248 let reader_schema = reader_schema.as_ref();
249
250 let hive_partition_columns = self.hive_partition_columns.take();
251 let include_file_path = self.include_file_path.take();
252
253 let mut df = (|| {
256 if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
257 let row_count = if let Some(v) = self.n_rows {
258 v
259 } else {
260 get_row_count(&mut self.reader)? as usize
261 };
262 let mut df = DataFrame::empty_with_height(row_count);
263
264 if let Some(ri) = &self.row_index {
265 unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
266 }
267 return PolarsResult::Ok(df);
268 }
269
270 if self.memory_map.is_some() && self.reader.to_file().is_some() {
271 match self.finish_memmapped(None) {
272 Ok(df) => {
273 return Ok(df);
274 },
275 Err(err) => check_mmap_err(err)?,
276 }
277 }
278 let rechunk = self.rechunk;
279 let schema = self.get_metadata()?.schema.clone();
280
281 if let Some(columns) = &self.columns {
282 let prj = columns_to_projection(columns, schema.as_ref())?;
283 self.projection = Some(prj);
284 }
285
286 let schema = if let Some(projection) = &self.projection {
287 Arc::new(apply_projection(schema.as_ref(), projection))
288 } else {
289 schema
290 };
291
292 let metadata = self.get_metadata()?.clone();
293
294 let ipc_reader =
295 read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
296 let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
297 Ok(df)
298 })()?;
299
300 if let Some(hive_cols) = hive_partition_columns {
301 materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
302 };
303
304 if let Some((col, value)) = include_file_path {
305 unsafe {
306 df.with_column_unchecked(Column::new_scalar(
307 col,
308 Scalar::new(
309 DataType::String,
310 AnyValue::StringOwned(value.as_ref().into()),
311 ),
312 df.height(),
313 ))
314 };
315 }
316
317 Ok(df)
318 }
319}