polars_io/ipc/ipc_file.rs

use std::io::{Read, Seek};
use std::path::PathBuf;
use std::sync::Arc;

use arrow::datatypes::{ArrowSchemaRef, Metadata};
use arrow::io::ipc::read::{self, get_row_count};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::shared::{ArrowReader, finish_reader};

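/// Options for an IPC scan; currently a unit struct with no configurable fields.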
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcScanOptions;

#[expect(clippy::derivable_impls)]
impl Default for IpcScanOptions {
    fn default() -> Self {
        Self {}
    }
}

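/// Read Arrow's IPC format into a DataFrame.
///
/// # Example
///
/// A minimal sketch of reading an IPC file from disk (the path is illustrative):
///
/// ```
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::SerReader;
/// use polars_io::ipc::IpcReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.ipc").expect("file not found");
///
///     IpcReader::new(file).finish()
/// }
/// ```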
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
    pub(super) reader: R,
    /// Aggregates chunks afterwards to a single chunk.
    rechunk: bool,
    pub(super) n_rows: Option<usize>,
    pub(super) projection: Option<Vec<usize>>,
    pub(crate) columns: Option<Vec<String>>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
    pub(super) row_index: Option<RowIndex>,
    pub(super) memory_map: Option<PathBuf>,
    metadata: Option<read::FileMetadata>,
    schema: Option<ArrowSchemaRef>,
}

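/// If memory mapping failed because the file is compressed, emit a warning and
/// signal the caller to fall back to a normal buffered read; any other error is
/// propagated unchanged.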
fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
    if let PolarsError::ComputeError(s) = &err {
        if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
            eprintln!(
                "Could not memory_map compressed IPC file, defaulting to normal read. \
                Toggle off 'memory_map' to silence this warning."
            );
            return Ok(());
        }
    }
    Err(err)
}

impl<R: MmapBytesReader> IpcReader<R> {
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
            self.schema = Some(metadata.schema.clone());
            self.metadata = Some(metadata);
        }
        Ok(self.metadata.as_ref().unwrap())
    }

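    /// Get the schema of the file.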
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.get_metadata()?;
        Ok(self.schema.as_ref().unwrap().clone())
    }

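    /// Get the custom schema-level metadata stored in the file, if any.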
    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
        self.get_metadata()?;
        Ok(self
            .metadata
            .as_ref()
            .and_then(|meta| meta.custom_schema_metadata.clone()))
    }

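    /// Stop reading when `n` rows are read.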
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

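    /// Columns to select/project by name.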
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

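    /// Add a row index column.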
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

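    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would project the first and fifth column.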
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

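    /// Memory-map the underlying file at the given path. Only works for
    /// uncompressed IPC files backed by a file on disk.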
    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
        self.memory_map = path_buf;
        self
    }

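    /// Finish the reader as a scan, optionally pushing a predicate down into the read.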
    #[cfg(feature = "lazy")]
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

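        // Translate the selected column names into positional projection indices.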
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }

        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };

        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
    }
}

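// Adapter so the arrow IPC `FileReader` can be driven by `finish_reader`.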
impl<R: Read + Seek> ArrowReader for read::FileReader<R> {
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        self.next().map_or(Ok(None), |v| v.map(Some))
    }
}

impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            hive_partition_columns: None,
            include_file_path: None,
            projection: None,
            row_index: None,
            memory_map: None,
            metadata: None,
            schema: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_schema = if let Some(ref schema) = self.schema {
            schema.clone()
        } else {
            self.get_metadata()?.schema.clone()
        };
        let reader_schema = reader_schema.as_ref();

        let hive_partition_columns = self.hive_partition_columns.take();
        let include_file_path = self.include_file_path.take();

        let mut df = (|| {
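            // If only hive columns are projected, the resulting df is empty, but the
            // file's row count is still needed to give the hive columns the correct height.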
            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
                let row_count = if let Some(v) = self.n_rows {
                    v
                } else {
                    get_row_count(&mut self.reader)? as usize
                };
                let mut df = DataFrame::empty_with_height(row_count);

                if let Some(ri) = &self.row_index {
                    unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
                }
                return PolarsResult::Ok(df);
            }

            if self.memory_map.is_some() && self.reader.to_file().is_some() {
                match self.finish_memmapped(None) {
                    Ok(df) => {
                        return Ok(df);
                    },
                    Err(err) => check_mmap_err(err)?,
                }
            }
            let rechunk = self.rechunk;
            let schema = self.get_metadata()?.schema.clone();

            if let Some(columns) = &self.columns {
                let prj = columns_to_projection(columns, schema.as_ref())?;
                self.projection = Some(prj);
            }

            let schema = if let Some(projection) = &self.projection {
                Arc::new(apply_projection(schema.as_ref(), projection))
            } else {
                schema
            };

            let metadata = self.get_metadata()?.clone();

            let ipc_reader =
                read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
            let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
            Ok(df)
        })()?;

        if let Some(hive_cols) = hive_partition_columns {
            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
        };

        if let Some((col, value)) = include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_ref().into()),
                    ),
                    df.height(),
                ))
            };
        }

        Ok(df)
    }
}