polars_core/serde/
df.rs

1use std::sync::Arc;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
5use arrow::io::ipc::write::WriteOptions;
6use polars_error::{PolarsResult, polars_err, to_compute_err};
7use polars_utils::format_pl_smallstr;
8use polars_utils::pl_serialize::deserialize_map_bytes;
9use polars_utils::pl_str::PlSmallStr;
10use serde::de::Error;
11use serde::*;
12
13use crate::chunked_array::flags::StatisticsFlags;
14use crate::config;
15use crate::frame::chunk_df_for_writing;
16use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
17use crate::utils::accumulate_dataframes_vertical_unchecked;
18
/// Key in the IPC custom schema metadata under which the columns' statistics
/// flags are stored as a JSON array of `u32` bit patterns.
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
20
21impl DataFrame {
22    pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
23        let schema = self.schema();
24
25        if schema.iter_values().any(|x| x.is_object()) {
26            return Err(polars_err!(
27                ComputeError:
28                "serializing data of type Object is not supported",
29            ));
30        }
31
32        let mut ipc_writer =
33            arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
34
35        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
36            self.get_columns().iter().map(|c| {
37                (
38                    format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
39                    PlSmallStr::from(c.get_flags().bits().to_string()),
40                )
41            }),
42        )));
43
44        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
45            FLAGS_KEY,
46            serde_json::to_string(
47                &self
48                    .iter()
49                    .map(|s| s.get_flags().bits())
50                    .collect::<Vec<u32>>(),
51            )
52            .map_err(to_compute_err)?
53            .into(),
54        )])));
55
56        ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
57
58        for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
59        {
60            ipc_writer.write(&batch, None)?;
61        }
62
63        ipc_writer.finish()?;
64
65        Ok(())
66    }
67
68    pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
69        let mut buf = vec![];
70        self.serialize_into_writer(&mut buf)?;
71
72        Ok(buf)
73    }
74
75    pub fn deserialize_from_reader(reader: &mut dyn std::io::Read) -> PolarsResult<Self> {
76        let mut md = read_stream_metadata(reader)?;
77
78        let custom_metadata = md.custom_schema_metadata.take();
79
80        let reader = StreamReader::new(reader, md, None);
81        let dfs = reader
82            .into_iter()
83            .map_while(|batch| match batch {
84                Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
85                Ok(StreamState::Waiting) => None,
86                Err(e) => Some(Err(e)),
87            })
88            .collect::<PolarsResult<Vec<DataFrame>>>()?;
89
90        if dfs.is_empty() {
91            return Ok(DataFrame::empty());
92        }
93        let mut df = accumulate_dataframes_vertical_unchecked(dfs);
94
95        // Set custom metadata (fallible)
96        (|| {
97            let custom_metadata = custom_metadata?;
98            let flags = custom_metadata.get(&FLAGS_KEY)?;
99
100            let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
101
102            let verbose = config::verbose();
103
104            if let Err(e) = &flags {
105                if verbose {
106                    eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {}", e);
107                }
108            }
109
110            let flags = flags.ok()?;
111
112            if flags.len() != df.width() {
113                if verbose {
114                    eprintln!(
115                        "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
116                        flags.len(),
117                        df.width()
118                    );
119                }
120
121                return None;
122            }
123
124            let mut n_set = 0;
125
126            for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
127                if let Some(flags) = StatisticsFlags::from_bits(v) {
128                    n_set += c.set_flags(flags) as usize;
129                }
130            }
131
132            if verbose {
133                eprintln!(
134                    "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
135                    n_set,
136                    df.width()
137                );
138            }
139
140            Some(())
141        })();
142
143        Ok(df)
144    }
145}
146
147impl Serialize for DataFrame {
148    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149    where
150        S: Serializer,
151    {
152        use serde::ser::Error;
153
154        let mut bytes = vec![];
155        self.clone()
156            .serialize_into_writer(&mut bytes)
157            .map_err(S::Error::custom)?;
158
159        serializer.serialize_bytes(bytes.as_slice())
160    }
161}
162
163impl<'de> Deserialize<'de> for DataFrame {
164    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
165    where
166        D: Deserializer<'de>,
167    {
168        deserialize_map_bytes(deserializer, |b| {
169            let v = &mut b.as_ref();
170            Self::deserialize_from_reader(v)
171        })?
172        .map_err(D::Error::custom)
173    }
174}