1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
5use polars_core::prelude::*;
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9use crate::prelude::*;
10use crate::shared::schema_to_arrow_checked;
11
/// Serializable configuration for writing Arrow IPC files.
///
/// Turned into an [`IpcWriter`] via [`IpcWriterOptions::to_writer`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcWriterOptions {
    /// Compression codec for the written file; `None` means no codec is configured.
    pub compression: Option<IpcCompression>,
    /// Compatibility level used when converting the schema and data to Arrow.
    pub compat_level: CompatLevel,
    /// Chunk size setting (the `Default` impl uses `1 << 18`).
    /// NOTE(review): not consumed by `to_writer`; presumably the number of rows per written
    /// batch for callers that chunk the frame themselves — confirm against callers.
    pub chunk_size: IdxSize,
}
22
23impl Default for IpcWriterOptions {
24 fn default() -> Self {
25 Self {
26 compression: None,
27 compat_level: CompatLevel::newest(),
28 chunk_size: 1 << 18,
29 }
30 }
31}
32
33impl IpcWriterOptions {
34 pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
35 IpcWriter::new(writer).with_compression(self.compression)
36 }
37}
38
/// Writes a [`DataFrame`] to an Arrow IPC file.
///
/// Create it with [`SerWriter::new`], configure it with the `with_*` builder methods, then
/// either call [`SerWriter::finish`] to write a whole frame in one go or
/// [`IpcWriter::batched`] to append frames incrementally.
#[must_use]
pub struct IpcWriter<W> {
    /// Sink the encoded IPC bytes are written to.
    pub(super) writer: W,
    /// Codec placed into the Arrow `WriteOptions`; `None` unless set via `with_compression`.
    pub(super) compression: Option<IpcCompression>,
    /// Compat level used for the Arrow schema conversion and for `iter_chunks`.
    pub(super) compat_level: CompatLevel,
    /// In `SerWriter::finish`, selects `align_chunks_par` (true) over `align_chunks` (false).
    pub(super) parallel: bool,
    /// Optional custom schema metadata handed to the underlying Arrow file writer.
    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
}
74
75impl<W: Write> IpcWriter<W> {
76 pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
78 self.compression = compression;
79 self
80 }
81
82 pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
83 self.compat_level = compat_level;
84 self
85 }
86
87 pub fn with_parallel(mut self, parallel: bool) -> Self {
88 self.parallel = parallel;
89 self
90 }
91
92 pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
93 let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
94 let mut writer = write::FileWriter::new(
95 self.writer,
96 Arc::new(schema),
97 None,
98 WriteOptions {
99 compression: self.compression.map(|c| c.into()),
100 },
101 );
102 writer.start()?;
103
104 Ok(BatchedWriter {
105 writer,
106 compat_level: self.compat_level,
107 })
108 }
109
110 pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
112 self.custom_schema_metadata = Some(custom_metadata);
113 }
114}
115
116impl<W> SerWriter<W> for IpcWriter<W>
117where
118 W: Write,
119{
120 fn new(writer: W) -> Self {
121 IpcWriter {
122 writer,
123 compression: None,
124 compat_level: CompatLevel::newest(),
125 parallel: true,
126 custom_schema_metadata: None,
127 }
128 }
129
130 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
131 let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
132 let mut ipc_writer = write::FileWriter::try_new(
133 &mut self.writer,
134 Arc::new(schema),
135 None,
136 WriteOptions {
137 compression: self.compression.map(|c| c.into()),
138 },
139 )?;
140 if let Some(custom_metadata) = &self.custom_schema_metadata {
141 ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
142 }
143
144 if self.parallel {
145 df.align_chunks_par();
146 } else {
147 df.align_chunks();
148 }
149 let iter = df.iter_chunks(self.compat_level, true);
150
151 for batch in iter {
152 ipc_writer.write(&batch, None)?
153 }
154 ipc_writer.finish()?;
155 Ok(())
156 }
157}
158
/// Incremental IPC file writer returned by [`IpcWriter::batched`].
///
/// The underlying Arrow file writer has already been started when this is returned. Call
/// [`BatchedWriter::finish`] after the last batch to finalize it.
pub struct BatchedWriter<W: Write> {
    /// Started Arrow IPC file writer that receives every batch.
    writer: write::FileWriter<W>,
    /// Compat level used when converting each [`DataFrame`] into record batches.
    compat_level: CompatLevel,
}
163
164impl<W: Write> BatchedWriter<W> {
165 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
170 let iter = df.iter_chunks(self.compat_level, true);
171 for batch in iter {
172 self.writer.write(&batch, None)?
173 }
174 Ok(())
175 }
176
177 pub fn write_encoded(
182 &mut self,
183 dictionaries: &[EncodedData],
184 message: &EncodedData,
185 ) -> PolarsResult<()> {
186 self.writer.write_encoded(dictionaries, message)?;
187 Ok(())
188 }
189
190 pub fn finish(&mut self) -> PolarsResult<()> {
192 self.writer.finish()?;
193 Ok(())
194 }
195}
196
/// Compression codec for IPC output, converted into the Arrow writer's `write::Compression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum IpcCompression {
    /// LZ4 compression (maps to `write::Compression::LZ4`).
    LZ4,
    /// Zstandard compression (maps to `write::Compression::ZSTD`); the default variant.
    #[default]
    ZSTD,
}
207
208impl From<IpcCompression> for write::Compression {
209 fn from(value: IpcCompression) -> Self {
210 match value {
211 IpcCompression::LZ4 => write::Compression::LZ4,
212 IpcCompression::ZSTD => write::Compression::ZSTD,
213 }
214 }
215}