polars_io/ipc/
ipc_file.rs1use std::io::{Read, Seek};
36use std::path::PathBuf;
37
38use arrow::datatypes::{ArrowSchemaRef, Metadata};
39use arrow::io::ipc::read::{self, get_row_count};
40use arrow::record_batch::RecordBatch;
41use polars_core::prelude::*;
42use polars_utils::pl_str::PlRefStr;
43#[cfg(feature = "serde")]
44use serde::{Deserialize, Serialize};
45
46use crate::RowIndex;
47use crate::hive::materialize_hive_partitions;
48use crate::mmap::MmapBytesReader;
49use crate::predicates::PhysicalIoExpr;
50use crate::prelude::*;
51use crate::shared::{ArrowReader, finish_reader};
52
/// Options for scanning Arrow IPC files.
///
/// This is a unit struct: it currently carries no settings.
#[derive(Debug, Clone, Hash, PartialEq)]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions;

// `Default` is written by hand instead of derived; the `expect` acknowledges
// clippy's `derivable_impls` lint. NOTE(review): the reason for not deriving is
// not visible in this file — keep the braced `Self {}` body so the lint (and
// therefore the expectation) keeps firing.
#[expect(clippy::derivable_impls)]
impl Default for IpcScanOptions {
    fn default() -> Self {
        Self {}
    }
}
64
65#[must_use]
82pub struct IpcReader<R: MmapBytesReader> {
83 pub(super) reader: R,
85 rechunk: bool,
87 pub(super) n_rows: Option<usize>,
88 pub(super) projection: Option<Vec<usize>>,
89 pub(crate) columns: Option<Vec<String>>,
90 hive_partition_columns: Option<Vec<Series>>,
91 include_file_path: Option<(PlSmallStr, PlRefStr)>,
92 pub(super) row_index: Option<RowIndex>,
93 pub(super) memory_map: Option<PathBuf>,
95 metadata: Option<read::FileMetadata>,
96 schema: Option<ArrowSchemaRef>,
97}
98
99fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
100 if let PolarsError::ComputeError(s) = &err {
101 if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
102 eprintln!(
103 "Could not memory_map compressed IPC file, defaulting to normal read. \
104 Toggle off 'memory_map' to silence this warning."
105 );
106 return Ok(());
107 }
108 }
109 Err(err)
110}
111
112impl<R: MmapBytesReader> IpcReader<R> {
113 fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
114 if self.metadata.is_none() {
115 let metadata = read::read_file_metadata(&mut self.reader)?;
116 self.schema = Some(metadata.schema.clone());
117 self.metadata = Some(metadata);
118 }
119 Ok(self.metadata.as_ref().unwrap())
120 }
121
122 pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
124 self.get_metadata()?;
125 Ok(self.schema.as_ref().unwrap().clone())
126 }
127
128 pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
130 self.get_metadata()?;
131 Ok(self
132 .metadata
133 .as_ref()
134 .and_then(|meta| meta.custom_schema_metadata.clone()))
135 }
136
137 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
139 self.n_rows = num_rows;
140 self
141 }
142
143 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
145 self.columns = columns;
146 self
147 }
148
149 pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
150 self.hive_partition_columns = columns;
151 self
152 }
153
154 pub fn with_include_file_path(
155 mut self,
156 include_file_path: Option<(PlSmallStr, PlRefStr)>,
157 ) -> Self {
158 self.include_file_path = include_file_path;
159 self
160 }
161
162 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
164 self.row_index = row_index;
165 self
166 }
167
168 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
171 self.projection = projection;
172 self
173 }
174
175 pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
178 self.memory_map = path_buf;
179 self
180 }
181
182 #[cfg(feature = "lazy")]
184 pub fn finish_with_scan_ops(
185 mut self,
186 predicate: Option<Arc<dyn PhysicalIoExpr>>,
187 verbose: bool,
188 ) -> PolarsResult<DataFrame> {
189 if self.memory_map.is_some() && self.reader.to_file().is_some() {
190 if verbose {
191 eprintln!("memory map ipc file")
192 }
193 match self.finish_memmapped(predicate.clone()) {
194 Ok(df) => return Ok(df),
195 Err(err) => check_mmap_err(err)?,
196 }
197 }
198 let rechunk = self.rechunk;
199 let metadata = read::read_file_metadata(&mut self.reader)?;
200
201 if let Some(columns) = &self.columns {
205 self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
206 }
207
208 let schema = if let Some(projection) = &self.projection {
209 Arc::new(apply_projection(&metadata.schema, projection))
210 } else {
211 metadata.schema.clone()
212 };
213
214 let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
215
216 finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
217 }
218}
219
220impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
221where
222 R: Read + Seek,
223{
224 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
225 self.next().map_or(Ok(None), |v| v.map(Some))
226 }
227}
228
229impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
230 fn new(reader: R) -> Self {
231 IpcReader {
232 reader,
233 rechunk: true,
234 n_rows: None,
235 columns: None,
236 hive_partition_columns: None,
237 include_file_path: None,
238 projection: None,
239 row_index: None,
240 memory_map: None,
241 metadata: None,
242 schema: None,
243 }
244 }
245
246 fn set_rechunk(mut self, rechunk: bool) -> Self {
247 self.rechunk = rechunk;
248 self
249 }
250
251 fn finish(mut self) -> PolarsResult<DataFrame> {
252 let reader_schema = if let Some(ref schema) = self.schema {
253 schema.clone()
254 } else {
255 self.get_metadata()?.schema.clone()
256 };
257 let reader_schema = reader_schema.as_ref();
258
259 let hive_partition_columns = self.hive_partition_columns.take();
260 let include_file_path = self.include_file_path.take();
261
262 let mut df = (|| {
265 if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
266 let row_count = if let Some(v) = self.n_rows {
267 v
268 } else {
269 get_row_count(&mut self.reader)? as usize
270 };
271 let mut df = DataFrame::empty_with_height(row_count);
272
273 if let Some(ri) = &self.row_index {
274 unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
275 }
276 return PolarsResult::Ok(df);
277 }
278
279 if self.memory_map.is_some() && self.reader.to_file().is_some() {
280 match self.finish_memmapped(None) {
281 Ok(df) => {
282 return Ok(df);
283 },
284 Err(err) => check_mmap_err(err)?,
285 }
286 }
287 let rechunk = self.rechunk;
288 let schema = self.get_metadata()?.schema.clone();
289
290 if let Some(columns) = &self.columns {
291 let prj = columns_to_projection(columns, schema.as_ref())?;
292 self.projection = Some(prj);
293 }
294
295 let schema = if let Some(projection) = &self.projection {
296 Arc::new(apply_projection(schema.as_ref(), projection))
297 } else {
298 schema
299 };
300
301 let metadata = self.get_metadata()?.clone();
302
303 let ipc_reader =
304 read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
305 let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
306 Ok(df)
307 })()?;
308
309 if let Some(hive_cols) = hive_partition_columns {
310 materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
311 };
312
313 if let Some((col, value)) = include_file_path {
314 unsafe {
315 df.push_column_unchecked(Column::new_scalar(
316 col,
317 Scalar::new(
318 DataType::String,
319 AnyValue::StringOwned(value.as_str().into()),
320 ),
321 df.height(),
322 ))
323 };
324 }
325
326 Ok(df)
327 }
328}