polars_io/ipc/
ipc_stream.rs1use std::io::{Read, Write};
37use std::path::PathBuf;
38
39use arrow::datatypes::Metadata;
40use arrow::io::ipc::read::{StreamMetadata, StreamState};
41use arrow::io::ipc::write::WriteOptions;
42use arrow::io::ipc::{read, write};
43use polars_core::frame::chunk_df_for_writing;
44use polars_core::prelude::*;
45
46use crate::prelude::*;
47use crate::shared::{ArrowReader, finish_reader};
48
49#[must_use]
66pub struct IpcStreamReader<R> {
67 reader: R,
69 rechunk: bool,
71 n_rows: Option<usize>,
72 projection: Option<Vec<usize>>,
73 columns: Option<Vec<String>>,
74 row_index: Option<RowIndex>,
75 metadata: Option<StreamMetadata>,
76}
77
78impl<R: Read> IpcStreamReader<R> {
79 pub fn schema(&mut self) -> PolarsResult<Schema> {
81 Ok(Schema::from_arrow_schema(&self.metadata()?.schema))
82 }
83
84 pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
86 Ok(self.metadata()?.schema)
87 }
88
89 pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
91 Ok(self.metadata()?.custom_schema_metadata.map(Arc::new))
92 }
93
94 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
96 self.n_rows = num_rows;
97 self
98 }
99
100 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
102 self.columns = columns;
103 self
104 }
105
106 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
108 self.row_index = row_index;
109 self
110 }
111
112 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
115 self.projection = projection;
116 self
117 }
118
119 fn metadata(&mut self) -> PolarsResult<StreamMetadata> {
120 match &self.metadata {
121 None => {
122 let metadata = read::read_stream_metadata(&mut self.reader)?;
123 self.metadata = Option::from(metadata.clone());
124 Ok(metadata)
125 },
126 Some(md) => Ok(md.clone()),
127 }
128 }
129}
130
131impl<R> ArrowReader for read::StreamReader<R>
132where
133 R: Read,
134{
135 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
136 self.next().map_or(Ok(None), |v| match v {
137 Ok(stream_state) => match stream_state {
138 StreamState::Waiting => Ok(None),
139 StreamState::Some(chunk) => Ok(Some(chunk)),
140 },
141 Err(err) => Err(err),
142 })
143 }
144}
145
146impl<R> SerReader<R> for IpcStreamReader<R>
147where
148 R: Read,
149{
150 fn new(reader: R) -> Self {
151 IpcStreamReader {
152 reader,
153 rechunk: true,
154 n_rows: None,
155 columns: None,
156 projection: None,
157 row_index: None,
158 metadata: None,
159 }
160 }
161
162 fn set_rechunk(mut self, rechunk: bool) -> Self {
163 self.rechunk = rechunk;
164 self
165 }
166
167 fn finish(mut self) -> PolarsResult<DataFrame> {
168 let rechunk = self.rechunk;
169 let metadata = self.metadata()?;
170 let schema = &metadata.schema;
171
172 if let Some(columns) = self.columns {
173 let prj = columns_to_projection(&columns, schema)?;
174 self.projection = Some(prj);
175 }
176
177 let schema = if let Some(projection) = &self.projection {
178 apply_projection(&metadata.schema, projection)
179 } else {
180 metadata.schema.clone()
181 };
182
183 let ipc_reader =
184 read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
185 finish_reader(
186 ipc_reader,
187 rechunk,
188 self.n_rows,
189 None,
190 &schema,
191 self.row_index,
192 )
193 }
194}
195
196#[must_use]
224pub struct IpcStreamWriter<W> {
225 writer: W,
226 compression: Option<IpcCompression>,
227 compat_level: CompatLevel,
228 custom_schema_metadata: Option<Arc<Metadata>>,
230}
231
232use arrow::record_batch::RecordBatch;
233
234use crate::RowIndex;
235
236impl<W> IpcStreamWriter<W> {
237 pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
239 self.compression = compression;
240 self
241 }
242
243 pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
244 self.compat_level = compat_level;
245 self
246 }
247
248 pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
250 self.custom_schema_metadata = Some(custom_metadata);
251 }
252}
253
254impl<W> SerWriter<W> for IpcStreamWriter<W>
255where
256 W: Write,
257{
258 fn new(writer: W) -> Self {
259 IpcStreamWriter {
260 writer,
261 compression: None,
262 compat_level: CompatLevel::oldest(),
263 custom_schema_metadata: None,
264 }
265 }
266
267 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
268 let mut ipc_stream_writer = write::StreamWriter::new(
269 &mut self.writer,
270 WriteOptions {
271 compression: self.compression.map(|c| c.into()),
272 },
273 );
274
275 if let Some(custom_metadata) = &self.custom_schema_metadata {
276 ipc_stream_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
277 }
278
279 ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
280 let df = chunk_df_for_writing(df, 512 * 512)?;
281 let iter = df.iter_chunks(self.compat_level, true);
282
283 for batch in iter {
284 ipc_stream_writer.write(&batch, None)?
285 }
286 ipc_stream_writer.finish()?;
287 Ok(())
288 }
289}
290
291pub struct IpcStreamWriterOption {
292 compression: Option<IpcCompression>,
293 extension: PathBuf,
294}
295
296impl IpcStreamWriterOption {
297 pub fn new() -> Self {
298 Self {
299 compression: None,
300 extension: PathBuf::from(".ipc"),
301 }
302 }
303
304 pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
306 self.compression = compression;
307 self
308 }
309
310 pub fn with_extension(mut self, extension: PathBuf) -> Self {
312 self.extension = extension;
313 self
314 }
315}
316
317impl Default for IpcStreamWriterOption {
318 fn default() -> Self {
319 Self::new()
320 }
321}