// polars_io/ndjson/buffer.rs

use std::hash::{Hash, Hasher};

use arrow::types::NativeType;
use num_traits::NumCast;
use polars_core::frame::row::AnyValueBuffer;
use polars_core::prelude::*;
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
use polars_time::prelude::string::Pattern;
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
use polars_utils::format_pl_smallstr;
use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};

14#[derive(Debug, Clone, PartialEq)]
15pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
16impl Eq for BufferKey<'_> {}
17
18impl Hash for BufferKey<'_> {
19    fn hash<H: Hasher>(&self, state: &mut H) {
20        self.0.key().hash(state)
21    }
22}
23
24pub(crate) struct Buffer<'a> {
25    name: &'a str,
26    ignore_errors: bool,
27    buf: AnyValueBuffer<'a>,
28}
29
30impl Buffer<'_> {
31    pub fn into_series(self) -> PolarsResult<Series> {
32        let mut buf = self.buf;
33        let mut s = buf.reset(0, !self.ignore_errors)?;
34        s.rename(PlSmallStr::from_str(self.name));
35        Ok(s)
36    }
37
38    #[inline]
39    pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
40        use AnyValueBuffer::*;
41        match &mut self.buf {
42            Boolean(buf) => {
43                match value {
44                    Value::Static(StaticNode::Bool(b)) => buf.append_value(*b),
45                    Value::Static(StaticNode::Null) => buf.append_null(),
46                    _ if self.ignore_errors => buf.append_null(),
47                    v => polars_bail!(ComputeError: "cannot parse '{}' as Boolean", v),
48                }
49                Ok(())
50            },
51            Int32(buf) => {
52                let n = deserialize_number::<i32>(value, "Int32", self.ignore_errors)?;
53                match n {
54                    Some(v) => buf.append_value(v),
55                    None => buf.append_null(),
56                }
57                Ok(())
58            },
59            Int64(buf) => {
60                let n = deserialize_number::<i64>(value, "Int64", self.ignore_errors)?;
61                match n {
62                    Some(v) => buf.append_value(v),
63                    None => buf.append_null(),
64                }
65                Ok(())
66            },
67            UInt64(buf) => {
68                let n = deserialize_number::<u64>(value, "UInt64", self.ignore_errors)?;
69                match n {
70                    Some(v) => buf.append_value(v),
71                    None => buf.append_null(),
72                }
73                Ok(())
74            },
75            UInt32(buf) => {
76                let n = deserialize_number::<u32>(value, "UInt32", self.ignore_errors)?;
77                match n {
78                    Some(v) => buf.append_value(v),
79                    None => buf.append_null(),
80                }
81                Ok(())
82            },
83            Float32(buf) => {
84                let n = deserialize_number::<f32>(value, "Float32", self.ignore_errors)?;
85                match n {
86                    Some(v) => buf.append_value(v),
87                    None => buf.append_null(),
88                }
89                Ok(())
90            },
91            Float64(buf) => {
92                let n = deserialize_number::<f64>(value, "Float64", self.ignore_errors)?;
93                match n {
94                    Some(v) => buf.append_value(v),
95                    None => buf.append_null(),
96                }
97                Ok(())
98            },
99
100            String(buf) => {
101                match value {
102                    Value::String(v) => buf.append_value(v),
103                    Value::Static(StaticNode::Null) => buf.append_null(),
104                    // Forcibly convert to String using the Display impl.
105                    v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
106                }
107                Ok(())
108            },
109            #[cfg(feature = "dtype-datetime")]
110            Datetime(buf, _, _) => {
111                let v = deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors)?;
112                buf.append_option(v);
113                Ok(())
114            },
115            #[cfg(feature = "dtype-date")]
116            Date(buf) => {
117                let v = deserialize_datetime::<Int32Type>(value, "Date", self.ignore_errors)?;
118                buf.append_option(v);
119                Ok(())
120            },
121            All(dtype, buf) => {
122                let av = deserialize_all(value, dtype, self.ignore_errors)?;
123                buf.push(av);
124                Ok(())
125            },
126            Null(builder) => {
127                if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
128                    polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
129                };
130
131                builder.append_null();
132                Ok(())
133            },
134            _ => panic!("unexpected dtype when deserializing ndjson"),
135        }
136    }
137
138    pub fn add_null(&mut self) {
139        self.buf.add(AnyValue::Null).expect("should not fail");
140    }
141}
142pub(crate) fn init_buffers(
143    schema: &Schema,
144    capacity: usize,
145    ignore_errors: bool,
146) -> PolarsResult<PlIndexMap<BufferKey, Buffer>> {
147    schema
148        .iter()
149        .map(|(name, dtype)| {
150            let av_buf = (dtype, capacity).into();
151            let key = KnownKey::from(name.as_str());
152            Ok((
153                BufferKey(key),
154                Buffer {
155                    name,
156                    buf: av_buf,
157                    ignore_errors,
158                },
159            ))
160        })
161        .collect()
162}
163
164fn deserialize_number<T: NativeType + NumCast>(
165    value: &Value,
166    type_name: &str,
167    ignore_errors: bool,
168) -> PolarsResult<Option<T>> {
169    let to_result = |x: Option<T>| {
170        let out = if ignore_errors {
171            x
172        } else {
173            Some(x.ok_or_else(
174                || polars_err!(ComputeError: "cannot parse '{}' as {}", value, type_name),
175            )?)
176        };
177
178        Ok(out)
179    };
180
181    match value {
182        Value::Static(StaticNode::F64(f)) => to_result(num_traits::cast(*f)),
183        Value::Static(StaticNode::I64(i)) => to_result(num_traits::cast(*i)),
184        Value::Static(StaticNode::U64(u)) => to_result(num_traits::cast(*u)),
185        Value::Static(StaticNode::Bool(b)) => to_result(num_traits::cast(*b as i32)),
186        Value::Static(StaticNode::Null) => Ok(None),
187        _ => to_result(None),
188    }
189}
190
191#[cfg(feature = "dtype-datetime")]
192fn deserialize_datetime<T>(
193    value: &Value,
194    type_name: &str,
195    ignore_errors: bool,
196) -> PolarsResult<Option<T::Native>>
197where
198    T: PolarsNumericType,
199    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
200{
201    match value {
202        Value::String(val) => {
203            if let Some(pattern) = infer_pattern_single(val) {
204                if let Ok(mut infer) =
205                    DatetimeInfer::try_from_with_unit(pattern, Some(TimeUnit::Microseconds))
206                {
207                    if let Some(v) = infer.parse(val) {
208                        return Ok(Some(v));
209                    }
210                }
211            }
212        },
213        Value::Static(StaticNode::Null) => return Ok(None),
214        _ => {},
215    };
216
217    if ignore_errors {
218        return Ok(None);
219    }
220
221    polars_bail!(ComputeError: "cannot parse '{}' as {}", value, type_name)
222}
223
224fn deserialize_all<'a>(
225    json: &Value,
226    dtype: &DataType,
227    ignore_errors: bool,
228) -> PolarsResult<AnyValue<'a>> {
229    if let DataType::String = dtype {
230        return Ok(match json {
231            Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
232            Value::Static(StaticNode::Null) => AnyValue::Null,
233            v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
234        });
235    }
236
237    let out = match json {
238        Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
239        Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
240        Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
241        Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
242        Value::Static(StaticNode::Null) => AnyValue::Null,
243        Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
244        Value::Array(arr) => {
245            let Some(inner_dtype) = dtype.inner_dtype() else {
246                if ignore_errors {
247                    return Ok(AnyValue::Null);
248                }
249                polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
250            };
251            let vals: Vec<AnyValue> = arr
252                .iter()
253                .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
254                .collect::<PolarsResult<_>>()?;
255            let strict = !ignore_errors;
256            let s =
257                Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
258            AnyValue::List(s)
259        },
260        #[cfg(feature = "dtype-struct")]
261        Value::Object(doc) => {
262            if let DataType::Struct(fields) = dtype {
263                let document = &**doc;
264
265                let vals = fields
266                    .iter()
267                    .map(|field| {
268                        if let Some(value) = document.get(field.name.as_str()) {
269                            deserialize_all(value, &field.dtype, ignore_errors)
270                        } else {
271                            Ok(AnyValue::Null)
272                        }
273                    })
274                    .collect::<PolarsResult<Vec<_>>>()?;
275                AnyValue::StructOwned(Box::new((vals, fields.clone())))
276            } else {
277                if ignore_errors {
278                    return Ok(AnyValue::Null);
279                }
280                polars_bail!(
281                    ComputeError: "expected {} in json value, got object", dtype,
282                );
283            }
284        },
285        #[cfg(not(feature = "dtype-struct"))]
286        val => AnyValue::StringOwned(format!("{:#?}", val).into()),
287    };
288    Ok(out)
289}
290
291/// Wrapper for serde_json's `Value` with a human-friendly Display impl for nested types:
292///
293/// * Default: `{"x": Static(U64(1))}`
294/// * ValueDisplay: `{x: 1}`
295///
296/// This intended for reading in arbitrary `Value` types into a String type. Note that the output
297/// is not guaranteed to be valid JSON as we don't do any escaping of e.g. quote/newline values.
298struct ValueDisplay<'a>(&'a Value<'a>);
299
300impl std::fmt::Display for ValueDisplay<'_> {
301    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302        use Value::*;
303
304        match self.0 {
305            Static(s) => write!(f, "{s}"),
306            String(s) => write!(f, r#""{s}""#),
307            Array(a) => {
308                write!(f, "[")?;
309
310                let mut iter = a.iter();
311
312                for v in (&mut iter).take(1) {
313                    write!(f, "{}", ValueDisplay(v))?;
314                }
315
316                for v in iter {
317                    write!(f, ", {}", ValueDisplay(v))?;
318                }
319
320                write!(f, "]")
321            },
322            Object(o) => {
323                write!(f, "{{")?;
324
325                let mut iter = o.iter();
326
327                for (k, v) in (&mut iter).take(1) {
328                    write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
329                }
330
331                for (k, v) in iter {
332                    write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
333                }
334
335                write!(f, "}}")
336            },
337        }
338    }
339}