polars_io/ipc/
ipc_stream.rs

1//! # (De)serializing Arrows Streaming IPC format.
2//!
3//! Arrow Streaming IPC is a [binary format](https://arrow.apache.org/docs/python/ipc.html).
4//! It is used for sending an arbitrary-length sequence of record batches.
5//! The format must be processed from start to end, and does not support random access.
6//! It is different from the IPC file format: if you can't deserialize a file with `IpcReader::new`, it's probably an IPC Stream file.
7//!
8//! ## Example
9//!
10//! ```rust
11//! use polars_core::prelude::*;
12//! use polars_io::prelude::*;
13//! use std::io::Cursor;
14//!
15//!
16//! let c0 = Column::new("days".into(), &[0, 1, 2, 3, 4]);
17//! let c1 = Column::new("temp".into(), &[22.1, 19.9, 7., 2., 3.]);
18//! let mut df = DataFrame::new(vec![c0, c1]).unwrap();
19//!
20//! // Create an in memory file handler.
21//! // Vec<u8>: Read + Write
22//! // Cursor<T>: Seek
23//!
24//! let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
25//!
26//! // write to the in memory buffer
27//! IpcStreamWriter::new(&mut buf).finish(&mut df).expect("ipc writer");
28//!
29//! // reset the buffers index after writing to the beginning of the buffer
30//! buf.set_position(0);
31//!
32//! // read the buffer into a DataFrame
33//! let df_read = IpcStreamReader::new(buf).finish().unwrap();
34//! assert!(df.equals(&df_read));
35//! ```
36use std::io::{Read, Write};
37use std::path::PathBuf;
38
39use arrow::datatypes::Metadata;
40use arrow::io::ipc::read::{StreamMetadata, StreamState};
41use arrow::io::ipc::write::WriteOptions;
42use arrow::io::ipc::{read, write};
43use polars_core::frame::chunk_df_for_writing;
44use polars_core::prelude::*;
45
46use crate::prelude::*;
47use crate::shared::{ArrowReader, finish_reader};
48
49/// Read Arrows Stream IPC format into a DataFrame
50///
51/// # Example
52/// ```
53/// use polars_core::prelude::*;
54/// use std::fs::File;
55/// use polars_io::ipc::IpcStreamReader;
56/// use polars_io::SerReader;
57///
58/// fn example() -> PolarsResult<DataFrame> {
59///     let file = File::open("file.ipc").expect("file not found");
60///
61///     IpcStreamReader::new(file)
62///         .finish()
63/// }
64/// ```
#[must_use]
pub struct IpcStreamReader<R> {
    /// File or Stream object
    reader: R,
    /// Aggregates chunks afterwards to a single chunk.
    rechunk: bool,
    /// Stop reading after this many rows; `None` reads the whole stream.
    n_rows: Option<usize>,
    /// Column selection by index; takes precedence once resolved from `columns`.
    projection: Option<Vec<usize>>,
    /// Column selection by name; translated into `projection` in `finish`.
    columns: Option<Vec<String>>,
    /// Optional row-index column appended to the output.
    row_index: Option<RowIndex>,
    /// Lazily-read stream metadata, cached after the first read from `reader`.
    metadata: Option<StreamMetadata>,
}
77
78impl<R: Read> IpcStreamReader<R> {
79    /// Get schema of the Ipc Stream File
80    pub fn schema(&mut self) -> PolarsResult<Schema> {
81        Ok(Schema::from_arrow_schema(&self.metadata()?.schema))
82    }
83
84    /// Get arrow schema of the Ipc Stream File, this is faster than creating a polars schema.
85    pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
86        Ok(self.metadata()?.schema)
87    }
88
89    /// Get schema-level custom metadata of the Ipc Stream file
90    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
91        Ok(self.metadata()?.custom_schema_metadata.map(Arc::new))
92    }
93
94    /// Stop reading when `n` rows are read.
95    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
96        self.n_rows = num_rows;
97        self
98    }
99
100    /// Columns to select/ project
101    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
102        self.columns = columns;
103        self
104    }
105
106    /// Add a row index column.
107    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
108        self.row_index = row_index;
109        self
110    }
111
112    /// Set the reader's column projection. This counts from 0, meaning that
113    /// `vec![0, 4]` would select the 1st and 5th column.
114    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
115        self.projection = projection;
116        self
117    }
118
119    fn metadata(&mut self) -> PolarsResult<StreamMetadata> {
120        match &self.metadata {
121            None => {
122                let metadata = read::read_stream_metadata(&mut self.reader)?;
123                self.metadata = Option::from(metadata.clone());
124                Ok(metadata)
125            },
126            Some(md) => Ok(md.clone()),
127        }
128    }
129}
130
131impl<R> ArrowReader for read::StreamReader<R>
132where
133    R: Read,
134{
135    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
136        self.next().map_or(Ok(None), |v| match v {
137            Ok(stream_state) => match stream_state {
138                StreamState::Waiting => Ok(None),
139                StreamState::Some(chunk) => Ok(Some(chunk)),
140            },
141            Err(err) => Err(err),
142        })
143    }
144}
145
146impl<R> SerReader<R> for IpcStreamReader<R>
147where
148    R: Read,
149{
150    fn new(reader: R) -> Self {
151        IpcStreamReader {
152            reader,
153            rechunk: true,
154            n_rows: None,
155            columns: None,
156            projection: None,
157            row_index: None,
158            metadata: None,
159        }
160    }
161
162    fn set_rechunk(mut self, rechunk: bool) -> Self {
163        self.rechunk = rechunk;
164        self
165    }
166
167    fn finish(mut self) -> PolarsResult<DataFrame> {
168        let rechunk = self.rechunk;
169        let metadata = self.metadata()?;
170        let schema = &metadata.schema;
171
172        if let Some(columns) = self.columns {
173            let prj = columns_to_projection(&columns, schema)?;
174            self.projection = Some(prj);
175        }
176
177        let schema = if let Some(projection) = &self.projection {
178            apply_projection(&metadata.schema, projection)
179        } else {
180            metadata.schema.clone()
181        };
182
183        let ipc_reader =
184            read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
185        finish_reader(
186            ipc_reader,
187            rechunk,
188            self.n_rows,
189            None,
190            &schema,
191            self.row_index,
192        )
193    }
194}
195
196/// Write a DataFrame to Arrow's Streaming IPC format
197///
198/// # Example
199///
200/// ```
201/// use polars_core::prelude::*;
202/// use polars_io::ipc::IpcStreamWriter;
203/// use std::fs::File;
204/// use polars_io::SerWriter;
205///
206/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
207///     let mut file = File::create("file.ipc").expect("could not create file");
208///
209///     let mut writer = IpcStreamWriter::new(&mut file);
210///
211///     let custom_metadata = [
212///         ("first_name".into(), "John".into()),
213///         ("last_name".into(), "Doe".into()),
214///     ]
215///     .into_iter()
216///     .collect();
217///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
218///
219///     writer.finish(df)
220/// }
221///
222/// ```
#[must_use]
pub struct IpcStreamWriter<W> {
    /// Destination the IPC stream is written to.
    writer: W,
    /// Optional per-batch compression codec; `None` writes uncompressed.
    compression: Option<IpcCompression>,
    /// Arrow compatibility level used when converting chunks for writing.
    compat_level: CompatLevel,
    /// Custom schema-level metadata
    custom_schema_metadata: Option<Arc<Metadata>>,
}
231
232use arrow::record_batch::RecordBatch;
233
234use crate::RowIndex;
235
236impl<W> IpcStreamWriter<W> {
237    /// Set the compression used. Defaults to None.
238    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
239        self.compression = compression;
240        self
241    }
242
243    pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
244        self.compat_level = compat_level;
245        self
246    }
247
248    /// Sets custom schema metadata. Must be called before `start` is called
249    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
250        self.custom_schema_metadata = Some(custom_metadata);
251    }
252}
253
254impl<W> SerWriter<W> for IpcStreamWriter<W>
255where
256    W: Write,
257{
258    fn new(writer: W) -> Self {
259        IpcStreamWriter {
260            writer,
261            compression: None,
262            compat_level: CompatLevel::oldest(),
263            custom_schema_metadata: None,
264        }
265    }
266
267    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
268        let mut ipc_stream_writer = write::StreamWriter::new(
269            &mut self.writer,
270            WriteOptions {
271                compression: self.compression.map(|c| c.into()),
272            },
273        );
274
275        if let Some(custom_metadata) = &self.custom_schema_metadata {
276            ipc_stream_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
277        }
278
279        ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
280        let df = chunk_df_for_writing(df, 512 * 512)?;
281        let iter = df.iter_chunks(self.compat_level, true);
282
283        for batch in iter {
284            ipc_stream_writer.write(&batch, None)?
285        }
286        ipc_stream_writer.finish()?;
287        Ok(())
288    }
289}
290
/// Options describing how an Arrow IPC stream file should be written.
pub struct IpcStreamWriterOption {
    /// Optional compression codec; `None` writes uncompressed.
    compression: Option<IpcCompression>,
    /// File extension to use for output paths. Defaults to ".ipc".
    extension: PathBuf,
}
295
296impl IpcStreamWriterOption {
297    pub fn new() -> Self {
298        Self {
299            compression: None,
300            extension: PathBuf::from(".ipc"),
301        }
302    }
303
304    /// Set the compression used. Defaults to None.
305    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
306        self.compression = compression;
307        self
308    }
309
310    /// Set the extension. Defaults to ".ipc".
311    pub fn with_extension(mut self, extension: PathBuf) -> Self {
312        self.extension = extension;
313        self
314    }
315}
316
317impl Default for IpcStreamWriterOption {
318    fn default() -> Self {
319        Self::new()
320    }
321}