1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::IpcField;
5use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
6use polars_core::prelude::*;
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9
10use crate::prelude::*;
11use crate::shared::schema_to_arrow_checked;
12
/// Configuration for writing data in the IPC format.
///
/// [`IpcWriterOptions::to_writer`] turns these options into an [`IpcWriter`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcWriterOptions {
    /// Compression applied to the written data; `None` requests no compression.
    pub compression: Option<IpcCompression>,
    /// Compatibility level used when converting the schema and the chunks to Arrow.
    pub compat_level: CompatLevel,
    /// Chunk size setting.
    ///
    /// NOTE(review): not consumed anywhere in this file; presumably the number of rows per
    /// written chunk — confirm against the callers.
    pub chunk_size: IdxSize,
}
24
25impl Default for IpcWriterOptions {
26 fn default() -> Self {
27 Self {
28 compression: None,
29 compat_level: CompatLevel::newest(),
30 chunk_size: 1 << 18,
31 }
32 }
33}
34
35impl IpcWriterOptions {
36 pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
37 IpcWriter::new(writer).with_compression(self.compression)
38 }
39}
40
/// Writer that serializes a `DataFrame` to the IPC file format.
///
/// Created through `SerWriter::new` and configured with the `with_*` builder methods.
#[must_use]
pub struct IpcWriter<W> {
    /// Destination the encoded bytes are written to.
    pub(super) writer: W,
    /// Compression handed to the underlying file writer; `None` means no compression.
    pub(super) compression: Option<IpcCompression>,
    /// Compatibility level used for the schema conversion and when iterating chunks.
    pub(super) compat_level: CompatLevel,
    /// Whether `finish` aligns the chunks with `align_chunks_par` (`true`) or with
    /// `align_chunks` (`false`).
    pub(super) parallel: bool,
    /// Optional custom schema metadata that `finish` passes on to the file writer.
    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
}
76
77impl<W: Write> IpcWriter<W> {
78 pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
80 self.compression = compression;
81 self
82 }
83
84 pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
85 self.compat_level = compat_level;
86 self
87 }
88
89 pub fn with_parallel(mut self, parallel: bool) -> Self {
90 self.parallel = parallel;
91 self
92 }
93
94 pub fn batched(
95 self,
96 schema: &Schema,
97 ipc_fields: Vec<IpcField>,
98 ) -> PolarsResult<BatchedWriter<W>> {
99 let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
100 let mut writer = write::FileWriter::new(
101 self.writer,
102 Arc::new(schema),
103 Some(ipc_fields),
104 WriteOptions {
105 compression: self.compression.map(|c| c.into()),
106 },
107 );
108 writer.start()?;
109
110 Ok(BatchedWriter {
111 writer,
112 compat_level: self.compat_level,
113 })
114 }
115
116 pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
118 self.custom_schema_metadata = Some(custom_metadata);
119 }
120}
121
122impl<W> SerWriter<W> for IpcWriter<W>
123where
124 W: Write,
125{
126 fn new(writer: W) -> Self {
127 IpcWriter {
128 writer,
129 compression: None,
130 compat_level: CompatLevel::newest(),
131 parallel: true,
132 custom_schema_metadata: None,
133 }
134 }
135
136 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
137 let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
138 let mut ipc_writer = write::FileWriter::try_new(
139 &mut self.writer,
140 Arc::new(schema),
141 None,
142 WriteOptions {
143 compression: self.compression.map(|c| c.into()),
144 },
145 )?;
146 if let Some(custom_metadata) = &self.custom_schema_metadata {
147 ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
148 }
149
150 if self.parallel {
151 df.align_chunks_par();
152 } else {
153 df.align_chunks();
154 }
155 let iter = df.iter_chunks(self.compat_level, true);
156
157 for batch in iter {
158 ipc_writer.write(&batch, None)?
159 }
160 ipc_writer.finish()?;
161 Ok(())
162 }
163}
164
/// Writer for emitting an IPC file incrementally.
///
/// Obtained from `IpcWriter::batched`; call `finish` once everything has been written.
pub struct BatchedWriter<W: Write> {
    /// Underlying file writer; `IpcWriter::batched` has already called `start` on it.
    writer: write::FileWriter<W>,
    /// Compatibility level used when iterating the chunks of a `DataFrame`.
    compat_level: CompatLevel,
}
169
170impl<W: Write> BatchedWriter<W> {
171 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
176 let iter = df.iter_chunks(self.compat_level, true);
177 for batch in iter {
178 self.writer.write(&batch, None)?
179 }
180 Ok(())
181 }
182
183 pub fn write_encoded(
188 &mut self,
189 dictionaries: &[EncodedData],
190 message: &EncodedData,
191 ) -> PolarsResult<()> {
192 self.writer.write_encoded(dictionaries, message)
193 }
194
195 pub fn write_encoded_dictionaries(
196 &mut self,
197 encoded_dictionaries: &[EncodedData],
198 ) -> PolarsResult<()> {
199 self.writer.write_encoded_dictionaries(encoded_dictionaries)
200 }
201
202 pub fn finish(&mut self) -> PolarsResult<()> {
204 self.writer.finish()?;
205 Ok(())
206 }
207}
208
/// Compression codecs that can be selected for IPC writing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IpcCompression {
    /// LZ4 compression.
    LZ4,
    /// ZSTD compression at the given level.
    ZSTD(polars_utils::compression::ZstdLevel),
}
219
220impl Default for IpcCompression {
221 fn default() -> Self {
222 Self::ZSTD(Default::default())
223 }
224}
225
226impl From<IpcCompression> for write::Compression {
227 fn from(value: IpcCompression) -> Self {
228 match value {
229 IpcCompression::LZ4 => write::Compression::LZ4,
230 IpcCompression::ZSTD(level) => write::Compression::ZSTD(level),
231 }
232 }
233}