polars_io/ipc/
write.rs

1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
5use polars_core::prelude::*;
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9use crate::prelude::*;
10use crate::shared::schema_to_arrow_checked;
11
12#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
13#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
14pub struct IpcWriterOptions {
15    /// Data page compression
16    pub compression: Option<IpcCompression>,
17    /// Compatibility level
18    pub compat_level: CompatLevel,
19    /// Size of each written chunk.
20    pub chunk_size: IdxSize,
21}
22
23impl Default for IpcWriterOptions {
24    fn default() -> Self {
25        Self {
26            compression: None,
27            compat_level: CompatLevel::newest(),
28            chunk_size: 1 << 18,
29        }
30    }
31}
32
33impl IpcWriterOptions {
34    pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
35        IpcWriter::new(writer).with_compression(self.compression)
36    }
37}
38
39/// Write a DataFrame to Arrow's IPC format
40///
41/// # Example
42///
43/// ```
44/// use polars_core::prelude::*;
45/// use polars_io::ipc::IpcWriter;
46/// use std::fs::File;
47/// use polars_io::SerWriter;
48///
49/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
50///     let mut file = File::create("file.ipc").expect("could not create file");
51///
52///     let mut writer = IpcWriter::new(&mut file);
53///
54///     let custom_metadata = [
55///         ("first_name".into(), "John".into()),
56///         ("last_name".into(), "Doe".into()),
57///     ]
58///     .into_iter()
59///     .collect();
60///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
61///     writer.finish(df)
62/// }
63///
64/// ```
65#[must_use]
66pub struct IpcWriter<W> {
67    pub(super) writer: W,
68    pub(super) compression: Option<IpcCompression>,
69    /// Polars' flavor of arrow. This might be temporary.
70    pub(super) compat_level: CompatLevel,
71    pub(super) parallel: bool,
72    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
73}
74
75impl<W: Write> IpcWriter<W> {
76    /// Set the compression used. Defaults to None.
77    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
78        self.compression = compression;
79        self
80    }
81
82    pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
83        self.compat_level = compat_level;
84        self
85    }
86
87    pub fn with_parallel(mut self, parallel: bool) -> Self {
88        self.parallel = parallel;
89        self
90    }
91
92    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
93        let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
94        let mut writer = write::FileWriter::new(
95            self.writer,
96            Arc::new(schema),
97            None,
98            WriteOptions {
99                compression: self.compression.map(|c| c.into()),
100            },
101        );
102        writer.start()?;
103
104        Ok(BatchedWriter {
105            writer,
106            compat_level: self.compat_level,
107        })
108    }
109
110    /// Sets custom schema metadata. Must be called before `start` is called
111    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
112        self.custom_schema_metadata = Some(custom_metadata);
113    }
114}
115
116impl<W> SerWriter<W> for IpcWriter<W>
117where
118    W: Write,
119{
120    fn new(writer: W) -> Self {
121        IpcWriter {
122            writer,
123            compression: None,
124            compat_level: CompatLevel::newest(),
125            parallel: true,
126            custom_schema_metadata: None,
127        }
128    }
129
130    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
131        let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
132        let mut ipc_writer = write::FileWriter::try_new(
133            &mut self.writer,
134            Arc::new(schema),
135            None,
136            WriteOptions {
137                compression: self.compression.map(|c| c.into()),
138            },
139        )?;
140        if let Some(custom_metadata) = &self.custom_schema_metadata {
141            ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
142        }
143
144        if self.parallel {
145            df.align_chunks_par();
146        } else {
147            df.align_chunks();
148        }
149        let iter = df.iter_chunks(self.compat_level, true);
150
151        for batch in iter {
152            ipc_writer.write(&batch, None)?
153        }
154        ipc_writer.finish()?;
155        Ok(())
156    }
157}
158
159pub struct BatchedWriter<W: Write> {
160    writer: write::FileWriter<W>,
161    compat_level: CompatLevel,
162}
163
164impl<W: Write> BatchedWriter<W> {
165    /// Write a batch to the ipc writer.
166    ///
167    /// # Panics
168    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
169    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
170        let iter = df.iter_chunks(self.compat_level, true);
171        for batch in iter {
172            self.writer.write(&batch, None)?
173        }
174        Ok(())
175    }
176
177    /// Write a encoded data to the ipc writer.
178    ///
179    /// # Panics
180    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
181    pub fn write_encoded(
182        &mut self,
183        dictionaries: &[EncodedData],
184        message: &EncodedData,
185    ) -> PolarsResult<()> {
186        self.writer.write_encoded(dictionaries, message)?;
187        Ok(())
188    }
189
190    /// Writes the footer of the IPC file.
191    pub fn finish(&mut self) -> PolarsResult<()> {
192        self.writer.finish()?;
193        Ok(())
194    }
195}
196
/// Compression codec applied to IPC data. Defaults to [`IpcCompression::ZSTD`].
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum IpcCompression {
    /// LZ4 (framed)
    LZ4,
    /// ZSTD
    #[default]
    ZSTD,
}
207
208impl From<IpcCompression> for write::Compression {
209    fn from(value: IpcCompression) -> Self {
210        match value {
211            IpcCompression::LZ4 => write::Compression::LZ4,
212            IpcCompression::ZSTD => write::Compression::ZSTD,
213        }
214    }
215}