polars_core/serde/
df.rs

1use std::io::{Read, Seek};
2use std::sync::Arc;
3
4use arrow::datatypes::Metadata;
5use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
6use arrow::io::ipc::write::WriteOptions;
7use polars_error::{PolarsResult, polars_err, to_compute_err};
8use polars_utils::format_pl_smallstr;
9use polars_utils::pl_serialize::deserialize_map_bytes;
10use polars_utils::pl_str::PlSmallStr;
11use serde::de::Error;
12use serde::*;
13
14use crate::chunked_array::flags::StatisticsFlags;
15use crate::config;
16use crate::frame::chunk_df_for_writing;
17use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
18use crate::utils::accumulate_dataframes_vertical_unchecked;
19
/// Custom schema metadata key used for the column statistics flags that
/// `DataFrame::serialize_into_writer` writes and `DataFrame::deserialize_from_reader` reads.
20const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
21
22impl DataFrame {
23    pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
24        let schema = self.schema();
25
26        if schema.iter_values().any(|x| x.is_object()) {
27            return Err(polars_err!(
28                ComputeError:
29                "serializing data of type Object is not supported",
30            ));
31        }
32
33        let mut ipc_writer =
34            arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
35
36        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
37            self.get_columns().iter().map(|c| {
38                (
39                    format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
40                    PlSmallStr::from(c.get_flags().bits().to_string()),
41                )
42            }),
43        )));
44
45        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
46            FLAGS_KEY,
47            serde_json::to_string(
48                &self
49                    .iter()
50                    .map(|s| s.get_flags().bits())
51                    .collect::<Vec<u32>>(),
52            )
53            .map_err(to_compute_err)?
54            .into(),
55        )])));
56
57        ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
58
59        for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
60        {
61            ipc_writer.write(&batch, None)?;
62        }
63
64        ipc_writer.finish()?;
65
66        Ok(())
67    }
68
69    pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
70        let mut buf = vec![];
71        self.serialize_into_writer(&mut buf)?;
72
73        Ok(buf)
74    }
75
76    pub fn deserialize_from_reader<T: Read + Seek>(reader: &mut T) -> PolarsResult<Self> {
77        let mut md = read_stream_metadata(reader)?;
78
79        let custom_metadata = md.custom_schema_metadata.take();
80
81        let reader = StreamReader::new(reader, md, None);
82        let dfs = reader
83            .into_iter()
84            .map_while(|batch| match batch {
85                Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
86                Ok(StreamState::Waiting) => None,
87                Err(e) => Some(Err(e)),
88            })
89            .collect::<PolarsResult<Vec<DataFrame>>>()?;
90
91        if dfs.is_empty() {
92            return Ok(DataFrame::empty());
93        }
94        let mut df = accumulate_dataframes_vertical_unchecked(dfs);
95
96        // Set custom metadata (fallible)
97        (|| {
98            let custom_metadata = custom_metadata?;
99            let flags = custom_metadata.get(&FLAGS_KEY)?;
100
101            let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
102
103            let verbose = config::verbose();
104
105            if let Err(e) = &flags {
106                if verbose {
107                    eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
108                }
109            }
110
111            let flags = flags.ok()?;
112
113            if flags.len() != df.width() {
114                if verbose {
115                    eprintln!(
116                        "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
117                        flags.len(),
118                        df.width()
119                    );
120                }
121
122                return None;
123            }
124
125            let mut n_set = 0;
126
127            for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
128                if let Some(flags) = StatisticsFlags::from_bits(v) {
129                    n_set += c.set_flags(flags) as usize;
130                }
131            }
132
133            if verbose {
134                eprintln!(
135                    "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
136                    n_set,
137                    df.width()
138                );
139            }
140
141            Some(())
142        })();
143
144        Ok(df)
145    }
146}
147
148impl Serialize for DataFrame {
149    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
150    where
151        S: Serializer,
152    {
153        use serde::ser::Error;
154
155        let mut bytes = vec![];
156        self.clone()
157            .serialize_into_writer(&mut bytes)
158            .map_err(S::Error::custom)?;
159
160        serializer.serialize_bytes(bytes.as_slice())
161    }
162}
163
164impl<'de> Deserialize<'de> for DataFrame {
165    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
166    where
167        D: Deserializer<'de>,
168    {
169        deserialize_map_bytes(deserializer, |b| {
170            let v = &mut b.as_ref();
171            let mut reader = std::io::Cursor::new(v);
172            Self::deserialize_from_reader(&mut reader)
173        })?
174        .map_err(D::Error::custom)
175    }
176}
177
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    /// Short schema name.
    fn schema_name() -> String {
        String::from("DataFrame")
    }

    /// Schema identifier: this module's path followed by `::DataFrame`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        const ID: &str = concat!(module_path!(), "::", "DataFrame");
        ID.into()
    }

    /// Reuses the schema of `Vec<u8>`, matching the byte-string form used by the serde impls.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}
191}