polars_io/ipc/
write.rs

1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::IpcField;
5use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
6use polars_core::prelude::*;
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9
10use crate::prelude::*;
11use crate::shared::schema_to_arrow_checked;
12
13#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
14#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
15#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
16pub struct IpcWriterOptions {
17    /// Data page compression
18    pub compression: Option<IpcCompression>,
19    /// Compatibility level
20    pub compat_level: CompatLevel,
21    /// Size of each written chunk.
22    pub chunk_size: IdxSize,
23}
24
25impl Default for IpcWriterOptions {
26    fn default() -> Self {
27        Self {
28            compression: None,
29            compat_level: CompatLevel::newest(),
30            chunk_size: 1 << 18,
31        }
32    }
33}
34
35impl IpcWriterOptions {
36    pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
37        IpcWriter::new(writer).with_compression(self.compression)
38    }
39}
40
41/// Write a DataFrame to Arrow's IPC format
42///
43/// # Example
44///
45/// ```
46/// use polars_core::prelude::*;
47/// use polars_io::ipc::IpcWriter;
48/// use std::fs::File;
49/// use polars_io::SerWriter;
50///
51/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
52///     let mut file = File::create("file.ipc").expect("could not create file");
53///
54///     let mut writer = IpcWriter::new(&mut file);
55///
56///     let custom_metadata = [
57///         ("first_name".into(), "John".into()),
58///         ("last_name".into(), "Doe".into()),
59///     ]
60///     .into_iter()
61///     .collect();
62///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
63///     writer.finish(df)
64/// }
65///
66/// ```
67#[must_use]
68pub struct IpcWriter<W> {
69    pub(super) writer: W,
70    pub(super) compression: Option<IpcCompression>,
71    /// Polars' flavor of arrow. This might be temporary.
72    pub(super) compat_level: CompatLevel,
73    pub(super) parallel: bool,
74    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
75}
76
77impl<W: Write> IpcWriter<W> {
78    /// Set the compression used. Defaults to None.
79    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
80        self.compression = compression;
81        self
82    }
83
84    pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
85        self.compat_level = compat_level;
86        self
87    }
88
89    pub fn with_parallel(mut self, parallel: bool) -> Self {
90        self.parallel = parallel;
91        self
92    }
93
94    pub fn batched(
95        self,
96        schema: &Schema,
97        ipc_fields: Vec<IpcField>,
98    ) -> PolarsResult<BatchedWriter<W>> {
99        let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
100        let mut writer = write::FileWriter::new(
101            self.writer,
102            Arc::new(schema),
103            Some(ipc_fields),
104            WriteOptions {
105                compression: self.compression.map(|c| c.into()),
106            },
107        );
108        writer.start()?;
109
110        Ok(BatchedWriter {
111            writer,
112            compat_level: self.compat_level,
113        })
114    }
115
116    /// Sets custom schema metadata. Must be called before `start` is called
117    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
118        self.custom_schema_metadata = Some(custom_metadata);
119    }
120}
121
122impl<W> SerWriter<W> for IpcWriter<W>
123where
124    W: Write,
125{
126    fn new(writer: W) -> Self {
127        IpcWriter {
128            writer,
129            compression: None,
130            compat_level: CompatLevel::newest(),
131            parallel: true,
132            custom_schema_metadata: None,
133        }
134    }
135
136    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
137        let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
138        let mut ipc_writer = write::FileWriter::try_new(
139            &mut self.writer,
140            Arc::new(schema),
141            None,
142            WriteOptions {
143                compression: self.compression.map(|c| c.into()),
144            },
145        )?;
146        if let Some(custom_metadata) = &self.custom_schema_metadata {
147            ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
148        }
149
150        if self.parallel {
151            df.align_chunks_par();
152        } else {
153            df.align_chunks();
154        }
155        let iter = df.iter_chunks(self.compat_level, true);
156
157        for batch in iter {
158            ipc_writer.write(&batch, None)?
159        }
160        ipc_writer.finish()?;
161        Ok(())
162    }
163}
164
165pub struct BatchedWriter<W: Write> {
166    writer: write::FileWriter<W>,
167    compat_level: CompatLevel,
168}
169
170impl<W: Write> BatchedWriter<W> {
171    /// Write a batch to the ipc writer.
172    ///
173    /// # Panics
174    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
175    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
176        let iter = df.iter_chunks(self.compat_level, true);
177        for batch in iter {
178            self.writer.write(&batch, None)?
179        }
180        Ok(())
181    }
182
183    /// Write a encoded data to the ipc writer.
184    ///
185    /// # Panics
186    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
187    pub fn write_encoded(
188        &mut self,
189        dictionaries: &[EncodedData],
190        message: &EncodedData,
191    ) -> PolarsResult<()> {
192        self.writer.write_encoded(dictionaries, message)
193    }
194
195    pub fn write_encoded_dictionaries(
196        &mut self,
197        encoded_dictionaries: &[EncodedData],
198    ) -> PolarsResult<()> {
199        self.writer.write_encoded_dictionaries(encoded_dictionaries)
200    }
201
202    /// Writes the footer of the IPC file.
203    pub fn finish(&mut self) -> PolarsResult<()> {
204        self.writer.finish()?;
205        Ok(())
206    }
207}
208
209/// Compression codec
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
211#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
212#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
213pub enum IpcCompression {
214    /// LZ4 (framed)
215    LZ4,
216    /// ZSTD
217    ZSTD(polars_utils::compression::ZstdLevel),
218}
219
220impl Default for IpcCompression {
221    fn default() -> Self {
222        Self::ZSTD(Default::default())
223    }
224}
225
226impl From<IpcCompression> for write::Compression {
227    fn from(value: IpcCompression) -> Self {
228        match value {
229            IpcCompression::LZ4 => write::Compression::LZ4,
230            IpcCompression::ZSTD(level) => write::Compression::ZSTD(level),
231        }
232    }
233}