polars_core/serde/
df.rs

1use std::io::{Read, Seek};
2use std::sync::Arc;
3
4use arrow::datatypes::Metadata;
5use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
6use arrow::io::ipc::write::WriteOptions;
7use polars_error::{PolarsResult, polars_err, to_compute_err};
8use polars_utils::format_pl_smallstr;
9use polars_utils::pl_serialize::deserialize_map_bytes;
10use polars_utils::pl_str::PlSmallStr;
11use serde::de::Error;
12use serde::*;
13
14use crate::chunked_array::flags::StatisticsFlags;
15use crate::config;
16use crate::frame::chunk_df_for_writing;
17use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
18use crate::schema::Schema;
19use crate::utils::accumulate_dataframes_vertical_unchecked;
20
21const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
22
23impl DataFrame {
24    pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
25        let schema = self.schema();
26
27        if schema.iter_values().any(|x| x.is_object()) {
28            return Err(polars_err!(
29                ComputeError:
30                "serializing data of type Object is not supported",
31            ));
32        }
33
34        let mut ipc_writer =
35            arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
36
37        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
38            self.columns().iter().map(|c| {
39                (
40                    format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
41                    PlSmallStr::from(c.get_flags().bits().to_string()),
42                )
43            }),
44        )));
45
46        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
47            FLAGS_KEY,
48            serde_json::to_string(
49                &self
50                    .columns()
51                    .iter()
52                    .map(|s| s.get_flags().bits())
53                    .collect::<Vec<u32>>(),
54            )
55            .map_err(to_compute_err)?
56            .into(),
57        )])));
58
59        ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
60
61        for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
62        {
63            ipc_writer.write(&batch, None)?;
64        }
65
66        ipc_writer.finish()?;
67
68        Ok(())
69    }
70
71    pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
72        let mut buf = vec![];
73        self.serialize_into_writer(&mut buf)?;
74
75        Ok(buf)
76    }
77
78    pub fn deserialize_from_reader<T: Read + Seek>(reader: &mut T) -> PolarsResult<Self> {
79        let mut md = read_stream_metadata(reader)?;
80        let pl_schema = Schema::from_arrow_schema(&md.schema);
81
82        let custom_metadata = md.custom_schema_metadata.take();
83
84        let reader = StreamReader::new(reader, md, None);
85        let dfs = reader
86            .into_iter()
87            .map_while(|batch| match batch {
88                Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
89                Ok(StreamState::Waiting) => None,
90                Err(e) => Some(Err(e)),
91            })
92            .collect::<PolarsResult<Vec<DataFrame>>>()?;
93
94        if dfs.is_empty() {
95            return Ok(DataFrame::empty_with_schema(&pl_schema));
96        }
97
98        let mut df = accumulate_dataframes_vertical_unchecked(dfs);
99
100        // Set custom metadata (fallible)
101        (|| {
102            let custom_metadata = custom_metadata?;
103            let flags = custom_metadata.get(&FLAGS_KEY)?;
104
105            let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
106
107            let verbose = config::verbose();
108
109            if let Err(e) = &flags {
110                if verbose {
111                    eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
112                }
113            }
114
115            let flags = flags.ok()?;
116
117            if flags.len() != df.width() {
118                if verbose {
119                    eprintln!(
120                        "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
121                        flags.len(),
122                        df.width()
123                    );
124                }
125
126                return None;
127            }
128
129            let mut n_set = 0;
130
131            for (c, v) in unsafe { df.columns_mut_retain_schema() }
132                .iter_mut()
133                .zip(flags)
134            {
135                if let Some(flags) = StatisticsFlags::from_bits(v) {
136                    n_set += c.set_flags(flags) as usize;
137                }
138            }
139
140            if verbose {
141                eprintln!(
142                    "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
143                    n_set,
144                    df.width()
145                );
146            }
147
148            Some(())
149        })();
150
151        Ok(df)
152    }
153}
154
155impl Serialize for DataFrame {
156    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
157    where
158        S: Serializer,
159    {
160        use serde::ser::Error;
161
162        let mut bytes = vec![];
163        self.clone()
164            .serialize_into_writer(&mut bytes)
165            .map_err(S::Error::custom)?;
166
167        serializer.serialize_bytes(bytes.as_slice())
168    }
169}
170
171impl<'de> Deserialize<'de> for DataFrame {
172    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173    where
174        D: Deserializer<'de>,
175    {
176        deserialize_map_bytes(deserializer, |b| {
177            let v = &mut b.as_ref();
178            let mut reader = std::io::Cursor::new(v);
179            Self::deserialize_from_reader(&mut reader)
180        })?
181        .map_err(D::Error::custom)
182    }
183}
184
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    /// Short schema name used for this type.
    fn schema_name() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed("DataFrame")
    }

    /// Schema identifier: this module's path followed by `::DataFrame`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed(concat!(module_path!(), "::DataFrame"))
    }

    /// Delegates to the JSON schema of `Vec<u8>`.
    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}