1use std::sync::Arc;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
5use arrow::io::ipc::write::WriteOptions;
6use polars_error::{PolarsResult, polars_err, to_compute_err};
7use polars_utils::format_pl_smallstr;
8use polars_utils::pl_serialize::deserialize_map_bytes;
9use polars_utils::pl_str::PlSmallStr;
10use serde::de::Error;
11use serde::*;
12
13use crate::chunked_array::flags::StatisticsFlags;
14use crate::config;
15use crate::frame::chunk_df_for_writing;
16use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
17use crate::utils::accumulate_dataframes_vertical_unchecked;
18
/// Custom schema metadata key (`_PL_FLAGS`) used when (de)serializing a `DataFrame` to carry
/// the statistics flags of its columns alongside the IPC stream.
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
20
21impl DataFrame {
22 pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
23 let schema = self.schema();
24
25 if schema.iter_values().any(|x| x.is_object()) {
26 return Err(polars_err!(
27 ComputeError:
28 "serializing data of type Object is not supported",
29 ));
30 }
31
32 let mut ipc_writer =
33 arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
34
35 ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
36 self.get_columns().iter().map(|c| {
37 (
38 format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
39 PlSmallStr::from(c.get_flags().bits().to_string()),
40 )
41 }),
42 )));
43
44 ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
45 FLAGS_KEY,
46 serde_json::to_string(
47 &self
48 .iter()
49 .map(|s| s.get_flags().bits())
50 .collect::<Vec<u32>>(),
51 )
52 .map_err(to_compute_err)?
53 .into(),
54 )])));
55
56 ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
57
58 for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
59 {
60 ipc_writer.write(&batch, None)?;
61 }
62
63 ipc_writer.finish()?;
64
65 Ok(())
66 }
67
68 pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
69 let mut buf = vec![];
70 self.serialize_into_writer(&mut buf)?;
71
72 Ok(buf)
73 }
74
75 pub fn deserialize_from_reader(reader: &mut dyn std::io::Read) -> PolarsResult<Self> {
76 let mut md = read_stream_metadata(reader)?;
77
78 let custom_metadata = md.custom_schema_metadata.take();
79
80 let reader = StreamReader::new(reader, md, None);
81 let dfs = reader
82 .into_iter()
83 .map_while(|batch| match batch {
84 Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
85 Ok(StreamState::Waiting) => None,
86 Err(e) => Some(Err(e)),
87 })
88 .collect::<PolarsResult<Vec<DataFrame>>>()?;
89
90 if dfs.is_empty() {
91 return Ok(DataFrame::empty());
92 }
93 let mut df = accumulate_dataframes_vertical_unchecked(dfs);
94
95 (|| {
97 let custom_metadata = custom_metadata?;
98 let flags = custom_metadata.get(&FLAGS_KEY)?;
99
100 let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
101
102 let verbose = config::verbose();
103
104 if let Err(e) = &flags {
105 if verbose {
106 eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {}", e);
107 }
108 }
109
110 let flags = flags.ok()?;
111
112 if flags.len() != df.width() {
113 if verbose {
114 eprintln!(
115 "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
116 flags.len(),
117 df.width()
118 );
119 }
120
121 return None;
122 }
123
124 let mut n_set = 0;
125
126 for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
127 if let Some(flags) = StatisticsFlags::from_bits(v) {
128 n_set += c.set_flags(flags) as usize;
129 }
130 }
131
132 if verbose {
133 eprintln!(
134 "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
135 n_set,
136 df.width()
137 );
138 }
139
140 Some(())
141 })();
142
143 Ok(df)
144 }
145}
146
147impl Serialize for DataFrame {
148 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149 where
150 S: Serializer,
151 {
152 use serde::ser::Error;
153
154 let mut bytes = vec![];
155 self.clone()
156 .serialize_into_writer(&mut bytes)
157 .map_err(S::Error::custom)?;
158
159 serializer.serialize_bytes(bytes.as_slice())
160 }
161}
162
163impl<'de> Deserialize<'de> for DataFrame {
164 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
165 where
166 D: Deserializer<'de>,
167 {
168 deserialize_map_bytes(deserializer, |b| {
169 let v = &mut b.as_ref();
170 Self::deserialize_from_reader(v)
171 })?
172 .map_err(D::Error::custom)
173 }
174}