polars_utils/
pl_serialize.rs

//! Centralized Polars serialization entry point.
//!
//! Currently provides two serialization schemes:
//! - Self-describing (and thus more forward compatible) MessagePack, activated with `FC: true`
//! - Compact bincode, activated with `FC: false`
6use polars_error::{PolarsResult, to_compute_err};
7
8fn serialize_impl<W, T, const FC: bool>(writer: W, value: &T) -> PolarsResult<()>
9where
10    W: std::io::Write,
11    T: serde::ser::Serialize,
12{
13    if FC {
14        let mut s = rmp_serde::Serializer::new(writer).with_struct_map();
15        value.serialize(&mut s).map_err(to_compute_err)
16    } else {
17        bincode::serialize_into(writer, value).map_err(to_compute_err)
18    }
19}
20
21pub fn deserialize_impl<T, R, const FC: bool>(reader: R) -> PolarsResult<T>
22where
23    T: serde::de::DeserializeOwned,
24    R: std::io::Read,
25{
26    if FC {
27        rmp_serde::from_read(reader).map_err(to_compute_err)
28    } else {
29        bincode::deserialize_from(reader).map_err(to_compute_err)
30    }
31}
32
/// Options for serializing the outermost value.
///
/// Mainly used to enable compression when serializing the final outer value.
/// For intermediate serialization steps, use the free functions in this module
/// instead.
#[derive(Debug, Clone)]
pub struct SerializeOptions {
    /// Whether the serialized payload is zlib-compressed.
    compression: bool,
}
39
40impl SerializeOptions {
41    pub fn with_compression(mut self, compression: bool) -> Self {
42        self.compression = compression;
43        self
44    }
45
46    pub fn serialize_into_writer<W, T, const FC: bool>(
47        &self,
48        writer: W,
49        value: &T,
50    ) -> PolarsResult<()>
51    where
52        W: std::io::Write,
53        T: serde::ser::Serialize,
54    {
55        if self.compression {
56            let writer = flate2::write::ZlibEncoder::new(writer, flate2::Compression::fast());
57            serialize_impl::<_, _, FC>(writer, value)
58        } else {
59            serialize_impl::<_, _, FC>(writer, value)
60        }
61    }
62
63    pub fn deserialize_from_reader<T, R, const FC: bool>(&self, reader: R) -> PolarsResult<T>
64    where
65        T: serde::de::DeserializeOwned,
66        R: std::io::Read,
67    {
68        if self.compression {
69            deserialize_impl::<_, _, FC>(flate2::read::ZlibDecoder::new(reader))
70        } else {
71            deserialize_impl::<_, _, FC>(reader)
72        }
73    }
74
75    pub fn serialize_to_bytes<T, const FC: bool>(&self, value: &T) -> PolarsResult<Vec<u8>>
76    where
77        T: serde::ser::Serialize,
78    {
79        let mut v = vec![];
80
81        self.serialize_into_writer::<_, _, FC>(&mut v, value)?;
82
83        Ok(v)
84    }
85}
86
87#[allow(clippy::derivable_impls)]
88impl Default for SerializeOptions {
89    fn default() -> Self {
90        Self { compression: false }
91    }
92}
93
94pub fn serialize_into_writer<W, T, const FC: bool>(writer: W, value: &T) -> PolarsResult<()>
95where
96    W: std::io::Write,
97    T: serde::ser::Serialize,
98{
99    serialize_impl::<_, _, FC>(writer, value)
100}
101
102pub fn deserialize_from_reader<T, R, const FC: bool>(reader: R) -> PolarsResult<T>
103where
104    T: serde::de::DeserializeOwned,
105    R: std::io::Read,
106{
107    deserialize_impl::<_, _, FC>(reader)
108}
109
110pub fn serialize_to_bytes<T, const FC: bool>(value: &T) -> PolarsResult<Vec<u8>>
111where
112    T: serde::ser::Serialize,
113{
114    let mut v = vec![];
115
116    serialize_into_writer::<_, _, FC>(&mut v, value)?;
117
118    Ok(v)
119}
120
121/// Potentially avoids copying memory compared to a naive `Vec::<u8>::deserialize`.
122///
123/// This is essentially boilerplate for visiting bytes without copying where possible.
124pub fn deserialize_map_bytes<'de, D, O>(
125    deserializer: D,
126    mut func: impl for<'b> FnMut(std::borrow::Cow<'b, [u8]>) -> O,
127) -> Result<O, D::Error>
128where
129    D: serde::de::Deserializer<'de>,
130{
131    // Lets us avoid monomorphizing the visitor
132    let mut out: Option<O> = None;
133    struct V<'f>(&'f mut (dyn for<'b> FnMut(std::borrow::Cow<'b, [u8]>)));
134
135    deserializer.deserialize_bytes(V(&mut |v| drop(out.replace(func(v)))))?;
136
137    return Ok(out.unwrap());
138
139    impl<'de> serde::de::Visitor<'de> for V<'_> {
140        type Value = ();
141
142        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
143            formatter.write_str("deserialize_map_bytes")
144        }
145
146        fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
147        where
148            E: serde::de::Error,
149        {
150            self.0(std::borrow::Cow::Borrowed(v));
151            Ok(())
152        }
153
154        fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
155        where
156            E: serde::de::Error,
157        {
158            self.0(std::borrow::Cow::Owned(v));
159            Ok(())
160        }
161
162        fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
163        where
164            A: serde::de::SeqAccess<'de>,
165        {
166            // This is not ideal, but we hit here if the serialization format is JSON.
167            let bytes = std::iter::from_fn(|| seq.next_element::<u8>().transpose())
168                .collect::<Result<Vec<_>, A::Error>>()?;
169
170            self.0(std::borrow::Cow::Owned(bytes));
171            Ok(())
172        }
173    }
174}
175
#[cfg(test)]
mod tests {
    #[test]
    fn test_serde_skip_enum() {
        #[derive(Default, Debug, PartialEq)]
        struct MyType(Option<usize>);

        // Note: serde(skip) must be at the end of enums
        #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
        enum Enum {
            A,
            #[serde(skip)]
            B(MyType),
        }

        impl Default for Enum {
            fn default() -> Self {
                Self::B(MyType(None))
            }
        }

        let v = Enum::A;
        let b = super::serialize_to_bytes::<_, false>(&v).unwrap();
        let r: Enum = super::deserialize_from_reader::<_, _, false>(b.as_slice()).unwrap();

        assert_eq!(r, v);

        let v = Enum::A;
        let b = super::SerializeOptions::default()
            .serialize_to_bytes::<_, false>(&v)
            .unwrap();
        let r: Enum = super::SerializeOptions::default()
            .deserialize_from_reader::<_, _, false>(b.as_slice())
            .unwrap();

        assert_eq!(r, v);
    }

    #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
    struct Payload {
        name: String,
        values: Vec<u64>,
    }

    fn payload() -> Payload {
        Payload {
            name: "polars".to_string(),
            values: (0u64..256).collect(),
        }
    }

    /// Round-trips through the self-describing (`FC = true`) scheme without compression.
    #[test]
    fn test_roundtrip_self_describing() {
        let v = payload();
        let b = super::serialize_to_bytes::<_, true>(&v).unwrap();
        let r: Payload = super::deserialize_from_reader::<_, _, true>(b.as_slice()).unwrap();

        assert_eq!(r, v);
    }

    /// Round-trips compressed output for both schemes; this exercises the path that must
    /// finish the zlib stream before the bytes are read back.
    #[test]
    fn test_roundtrip_compressed() {
        let v = payload();
        let opts = super::SerializeOptions::default().with_compression(true);

        let b = opts.serialize_to_bytes::<_, false>(&v).unwrap();
        let r: Payload = opts
            .deserialize_from_reader::<_, _, false>(b.as_slice())
            .unwrap();
        assert_eq!(r, v);

        let b = opts.serialize_to_bytes::<_, true>(&v).unwrap();
        let r: Payload = opts
            .deserialize_from_reader::<_, _, true>(b.as_slice())
            .unwrap();
        assert_eq!(r, v);
    }
}