// polars_io/ipc/ipc_file.rs
use std::io::{Read, Seek};
use std::path::PathBuf;

use arrow::datatypes::{ArrowSchemaRef, Metadata};
use arrow::io::ipc::read::{self, get_row_count};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::pl_str::PlRefStr;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::shared::{ArrowReader, finish_reader};

#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcScanOptions {
    pub record_batch_statistics: bool,
}

#[expect(clippy::derivable_impls)]
impl Default for IpcScanOptions {
    fn default() -> Self {
        Self {
            record_batch_statistics: false,
        }
    }
}

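/// Read Arrow's IPC format into a `DataFrame`.
///
/// # Example
///
/// A minimal read sketch; `"file.ipc"` is a placeholder path, and `new`/`finish`
/// come from the `SerReader` trait implemented below.
///
/// ```no_run
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.ipc").expect("file not found");
///
///     IpcReader::new(file).finish()
/// }
/// ```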
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
    /// File or stream object.
    pub(super) reader: R,
    /// Aggregate the chunks to a single chunk after reading.
    rechunk: bool,
    pub(super) n_rows: Option<usize>,
    pub(super) projection: Option<Vec<usize>>,
    pub(crate) columns: Option<Vec<String>>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, PlRefStr)>,
    pub(super) row_index: Option<RowIndex>,
    pub(super) memory_map: Option<PathBuf>,
    // Cached after the first call to `get_metadata`.
    metadata: Option<read::FileMetadata>,
    schema: Option<ArrowSchemaRef>,
}

// The memory-map path errors on compressed IPC files; downgrade that specific
// error to a warning so the caller can fall back to a buffered read, and
// propagate every other error.
fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
    if let PolarsError::ComputeError(s) = &err {
        if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
            eprintln!(
                "Could not memory_map compressed IPC file, defaulting to normal read. \
                Toggle off 'memory_map' to silence this warning."
            );
            return Ok(());
        }
    }
    Err(err)
}

impl<R: MmapBytesReader> IpcReader<R> {
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
            self.schema = Some(metadata.schema.clone());
            self.metadata = Some(metadata);
        }
        Ok(self.metadata.as_ref().unwrap())
    }

    /// Get the Arrow schema of the IPC file; this is cheaper than creating a
    /// Polars schema.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.get_metadata()?;
        Ok(self.schema.as_ref().unwrap().clone())
    }

    /// Get the schema-level custom metadata of the IPC file.
    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
        self.get_metadata()?;
        Ok(self
            .metadata
            .as_ref()
            .and_then(|meta| meta.custom_schema_metadata.clone()))
    }

    /// Stop reading when `n` rows have been read.
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Columns to select/project.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, PlRefStr)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th column.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Memory-map the file if a path is passed; this only works for
    /// uncompressed files. The path is needed to register the memory-mapped file.
    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
        self.memory_map = path_buf;
        self
    }

    /// Finish the reader, applying scan-level optimizations such as predicate
    /// pushdown (used by the lazy engine).
    #[cfg(feature = "lazy")]
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        // Try the memory-mapped fast path first; fall back to a buffered read
        // if the file turns out to be compressed.
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

        // A by-name column selection is resolved to an index projection here.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }

        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };

        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
    }
}

impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
    R: Read + Seek,
{
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        self.next().map_or(Ok(None), |v| v.map(Some))
    }
}

impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            hive_partition_columns: None,
            include_file_path: None,
            projection: None,
            row_index: None,
            memory_map: None,
            metadata: None,
            schema: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_schema = if let Some(ref schema) = self.schema {
            schema.clone()
        } else {
            self.get_metadata()?.schema.clone()
        };
        let reader_schema = reader_schema.as_ref();

        let hive_partition_columns = self.hive_partition_columns.take();
        let include_file_path = self.include_file_path.take();

        let mut df = (|| {
            // An empty projection still needs the file's row count, so that
            // hive partition and file-path columns can be materialized with
            // the correct height.
            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
                let row_count = if let Some(v) = self.n_rows {
                    v
                } else {
                    get_row_count(&mut self.reader)? as usize
                };
                let mut df = DataFrame::empty_with_height(row_count);

                if let Some(ri) = &self.row_index {
                    unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
                }
                return PolarsResult::Ok(df);
            }

            if self.memory_map.is_some() && self.reader.to_file().is_some() {
                match self.finish_memmapped(None) {
                    Ok(df) => {
                        return Ok(df);
                    },
                    Err(err) => check_mmap_err(err)?,
                }
            }
            let rechunk = self.rechunk;
            let schema = self.get_metadata()?.schema.clone();

            if let Some(columns) = &self.columns {
                let prj = columns_to_projection(columns, schema.as_ref())?;
                self.projection = Some(prj);
            }

            let schema = if let Some(projection) = &self.projection {
                Arc::new(apply_projection(schema.as_ref(), projection))
            } else {
                schema
            };

            let metadata = self.get_metadata()?.clone();

            let ipc_reader =
                read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
            let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
            Ok(df)
        })()?;

        if let Some(hive_cols) = hive_partition_columns {
            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
        };

        if let Some((col, value)) = include_file_path {
            unsafe {
                df.push_column_unchecked(Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_str().into()),
                    ),
                    df.height(),
                ))
            };
        }

        Ok(df)
    }
}
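
// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not part of the upstream file): exercises the
// builder API above. The path `"data.ipc"`, the column names, and the
// row-index name are placeholders.
// ---------------------------------------------------------------------------
#[cfg(test)]
mod sketch {
    use std::fs::File;

    use super::*;

    #[test]
    #[ignore = "needs an IPC file on disk"]
    fn read_with_options() -> PolarsResult<()> {
        let file = File::open("data.ipc").expect("file not found");

        let df = IpcReader::new(file)
            // Select columns by name; resolved to an index projection internally.
            .with_columns(Some(vec!["a".to_string(), "b".to_string()]))
            // Stop after at most 100 rows.
            .with_n_rows(Some(100))
            // Prepend a row index column starting at 0.
            .with_row_index(Some(RowIndex {
                name: "row_nr".into(),
                offset: 0,
            }))
            .finish()?;

        assert!(df.height() <= 100);
        Ok(())
    }
}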