1use std::io::{Read, Seek};
2use std::sync::Arc;
3
4use arrow::datatypes::Metadata;
5use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
6use arrow::io::ipc::write::WriteOptions;
7use polars_error::{PolarsResult, polars_err, to_compute_err};
8use polars_utils::format_pl_smallstr;
9use polars_utils::pl_serialize::deserialize_map_bytes;
10use polars_utils::pl_str::PlSmallStr;
11use serde::de::Error;
12use serde::*;
13
14use crate::chunked_array::flags::StatisticsFlags;
15use crate::config;
16use crate::frame::chunk_df_for_writing;
17use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
18use crate::utils::accumulate_dataframes_vertical_unchecked;
19
/// Key in the IPC stream's custom schema metadata under which
/// `DataFrame::serialize_into_writer` stores the per-column statistics flags,
/// encoded as a JSON array of `u32` bit values (one entry per column, in column order).
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
21
22impl DataFrame {
23 pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
24 let schema = self.schema();
25
26 if schema.iter_values().any(|x| x.is_object()) {
27 return Err(polars_err!(
28 ComputeError:
29 "serializing data of type Object is not supported",
30 ));
31 }
32
33 let mut ipc_writer =
34 arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
35
36 ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
37 self.get_columns().iter().map(|c| {
38 (
39 format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
40 PlSmallStr::from(c.get_flags().bits().to_string()),
41 )
42 }),
43 )));
44
45 ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
46 FLAGS_KEY,
47 serde_json::to_string(
48 &self
49 .iter()
50 .map(|s| s.get_flags().bits())
51 .collect::<Vec<u32>>(),
52 )
53 .map_err(to_compute_err)?
54 .into(),
55 )])));
56
57 ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
58
59 for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
60 {
61 ipc_writer.write(&batch, None)?;
62 }
63
64 ipc_writer.finish()?;
65
66 Ok(())
67 }
68
69 pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
70 let mut buf = vec![];
71 self.serialize_into_writer(&mut buf)?;
72
73 Ok(buf)
74 }
75
76 pub fn deserialize_from_reader<T: Read + Seek>(reader: &mut T) -> PolarsResult<Self> {
77 let mut md = read_stream_metadata(reader)?;
78
79 let custom_metadata = md.custom_schema_metadata.take();
80
81 let reader = StreamReader::new(reader, md, None);
82 let dfs = reader
83 .into_iter()
84 .map_while(|batch| match batch {
85 Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
86 Ok(StreamState::Waiting) => None,
87 Err(e) => Some(Err(e)),
88 })
89 .collect::<PolarsResult<Vec<DataFrame>>>()?;
90
91 if dfs.is_empty() {
92 return Ok(DataFrame::empty());
93 }
94 let mut df = accumulate_dataframes_vertical_unchecked(dfs);
95
96 (|| {
98 let custom_metadata = custom_metadata?;
99 let flags = custom_metadata.get(&FLAGS_KEY)?;
100
101 let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
102
103 let verbose = config::verbose();
104
105 if let Err(e) = &flags {
106 if verbose {
107 eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
108 }
109 }
110
111 let flags = flags.ok()?;
112
113 if flags.len() != df.width() {
114 if verbose {
115 eprintln!(
116 "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
117 flags.len(),
118 df.width()
119 );
120 }
121
122 return None;
123 }
124
125 let mut n_set = 0;
126
127 for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
128 if let Some(flags) = StatisticsFlags::from_bits(v) {
129 n_set += c.set_flags(flags) as usize;
130 }
131 }
132
133 if verbose {
134 eprintln!(
135 "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
136 n_set,
137 df.width()
138 );
139 }
140
141 Some(())
142 })();
143
144 Ok(df)
145 }
146}
147
148impl Serialize for DataFrame {
149 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
150 where
151 S: Serializer,
152 {
153 use serde::ser::Error;
154
155 let mut bytes = vec![];
156 self.clone()
157 .serialize_into_writer(&mut bytes)
158 .map_err(S::Error::custom)?;
159
160 serializer.serialize_bytes(bytes.as_slice())
161 }
162}
163
164impl<'de> Deserialize<'de> for DataFrame {
165 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
166 where
167 D: Deserializer<'de>,
168 {
169 deserialize_map_bytes(deserializer, |b| {
170 let v = &mut b.as_ref();
171 let mut reader = std::io::Cursor::new(v);
172 Self::deserialize_from_reader(&mut reader)
173 })?
174 .map_err(D::Error::custom)
175 }
176}
177
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    /// Name under which this type appears in generated JSON schemas.
    fn schema_name() -> String {
        String::from("DataFrame")
    }

    /// Unique schema id: the defining module path followed by `::DataFrame`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        concat!(module_path!(), "::", "DataFrame").into()
    }

    /// The `Serialize` impl emits the frame as an opaque byte blob, so its JSON
    /// schema is that of `Vec<u8>`.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}