polars_io/ndjson/
buffer.rs

1use std::hash::{Hash, Hasher};
2
3use polars_core::frame::row::AnyValueBuffer;
4use polars_core::prelude::*;
5#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
6use polars_time::prelude::string::Pattern;
7#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
8use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
9use polars_utils::format_pl_smallstr;
10use simd_json::prelude::*;
11use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
12
13#[derive(Debug, Clone, PartialEq)]
14pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
15impl Eq for BufferKey<'_> {}
16
17impl Hash for BufferKey<'_> {
18    fn hash<H: Hasher>(&self, state: &mut H) {
19        self.0.key().hash(state)
20    }
21}
22
23pub(crate) struct Buffer<'a> {
24    name: &'a str,
25    ignore_errors: bool,
26    buf: AnyValueBuffer<'a>,
27}
28
29impl Buffer<'_> {
30    pub fn into_series(self) -> PolarsResult<Series> {
31        let mut buf = self.buf;
32        let mut s = buf.reset(0, !self.ignore_errors)?;
33        s.rename(PlSmallStr::from_str(self.name));
34        Ok(s)
35    }
36
37    #[inline]
38    pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
39        use AnyValueBuffer::*;
40        match &mut self.buf {
41            _ if value.is_null() => {
42                self.buf.add(AnyValue::Null);
43                Ok(())
44            },
45            Boolean(buf) => {
46                match value {
47                    Value::Static(StaticNode::Bool(v)) => buf.append_value(*v),
48                    Value::Static(StaticNode::Null) => buf.append_null(),
49                    _ if self.ignore_errors => buf.append_null(),
50                    v => {
51                        polars_bail!(ComputeError: "cannot parse '{}' ({}) as Boolean", v, v.value_type())
52                    },
53                }
54                Ok(())
55            },
56            Int32(buf) => {
57                let v =
58                    deserialize_numeric::<Int32Type>(value, value.as_i32(), self.ignore_errors)?;
59                buf.append_option(v);
60                Ok(())
61            },
62            Int64(buf) => {
63                let v =
64                    deserialize_numeric::<Int64Type>(value, value.as_i64(), self.ignore_errors)?;
65                buf.append_option(v);
66                Ok(())
67            },
68            UInt64(buf) => {
69                let v =
70                    deserialize_numeric::<UInt64Type>(value, value.as_u64(), self.ignore_errors)?;
71                buf.append_option(v);
72                Ok(())
73            },
74            UInt32(buf) => {
75                let v =
76                    deserialize_numeric::<UInt32Type>(value, value.as_u32(), self.ignore_errors)?;
77                buf.append_option(v);
78                Ok(())
79            },
80            Float32(buf) => {
81                let v = deserialize_numeric::<Float32Type>(
82                    value,
83                    value.cast_f64().map(|f| f as f32),
84                    self.ignore_errors,
85                )?;
86                buf.append_option(v);
87                Ok(())
88            },
89            Float64(buf) => {
90                let v = deserialize_numeric::<Float64Type>(
91                    value,
92                    value.cast_f64(),
93                    self.ignore_errors,
94                )?;
95                buf.append_option(v);
96                Ok(())
97            },
98            String(buf) => {
99                match value {
100                    Value::String(v) => buf.append_value(v),
101                    // Forcibly convert to String using the Display impl.
102                    v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
103                }
104                Ok(())
105            },
106            #[cfg(feature = "dtype-datetime")]
107            Datetime(buf, tu, _) => {
108                let v =
109                    deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
110                buf.append_option(v);
111                Ok(())
112            },
113            #[cfg(feature = "dtype-date")]
114            Date(buf) => {
115                let v = deserialize_datetime::<Int32Type>(
116                    value,
117                    "Date",
118                    self.ignore_errors,
119                    TimeUnit::Microseconds, // ignored
120                )?;
121                buf.append_option(v);
122                Ok(())
123            },
124            All(dtype, buf) => {
125                let av = deserialize_all(value, dtype, self.ignore_errors)?;
126                buf.push(av);
127                Ok(())
128            },
129            Null(builder) => {
130                if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
131                    polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
132                };
133
134                builder.append_null();
135                Ok(())
136            },
137            _ => panic!("unexpected dtype when deserializing ndjson"),
138        }
139    }
140
141    pub fn add_null(&mut self) {
142        self.buf.add(AnyValue::Null).expect("should not fail");
143    }
144}
145pub(crate) fn init_buffers(
146    schema: &Schema,
147    capacity: usize,
148    ignore_errors: bool,
149) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
150    schema
151        .iter()
152        .map(|(name, dtype)| {
153            let av_buf = (dtype, capacity).into();
154            let key = KnownKey::from(name.as_str());
155            Ok((
156                BufferKey(key),
157                Buffer {
158                    name,
159                    buf: av_buf,
160                    ignore_errors,
161                },
162            ))
163        })
164        .collect()
165}
166
167fn deserialize_numeric<T: PolarsNumericType>(
168    value: &Value,
169    n: Option<T::Native>,
170    ignore_errors: bool,
171) -> PolarsResult<Option<T::Native>> {
172    match n {
173        Some(v) => Ok(Some(v)),
174        None if ignore_errors => Ok(None),
175        None => Err(
176            polars_err!(ComputeError: "cannot parse '{}' ({}) as {:?}", value, value.value_type(), T::get_static_dtype()),
177        ),
178    }
179}
180
/// Parses a JSON string into the physical value of a temporal column.
///
/// The pattern is inferred from the string itself and the parser is built with `tu` as the time
/// unit. Callers parsing `Date` values pass a placeholder unit (marked as ignored at the call
/// sites). `type_name` is only used in the error message.
///
/// Returns `Ok(None)` for JSON null, and for any unparsable value when `ignore_errors` is set.
///
/// # Errors
///
/// Returns a `ComputeError` when the value is not a parsable string and `ignore_errors` is not
/// set.
// Needed by both the `Date` and `Datetime` code paths, so it must be compiled when either
// feature is enabled; this matches the gating of the imports it relies on.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::String(val) => {
            if let Some(pattern) = infer_pattern_single(val) {
                if let Ok(mut infer) = DatetimeInfer::try_from_with_unit(pattern, Some(tu)) {
                    if let Some(v) = infer.parse(val) {
                        return Ok(Some(v));
                    }
                }
            }
        },
        Value::Static(StaticNode::Null) => return Ok(None),
        _ => {},
    };

    // Everything that reaches this point failed to parse.
    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", value, value.value_type(), type_name)
}
212
213fn deserialize_all<'a>(
214    json: &Value,
215    dtype: &DataType,
216    ignore_errors: bool,
217) -> PolarsResult<AnyValue<'a>> {
218    if let Value::Static(StaticNode::Null) = json {
219        return Ok(AnyValue::Null);
220    }
221    match dtype {
222        #[cfg(feature = "dtype-datetime")]
223        DataType::Date => {
224            let value = deserialize_datetime::<Int32Type>(
225                json,
226                "Date",
227                ignore_errors,
228                TimeUnit::Microseconds, // ignored
229            )?;
230            return Ok(if let Some(value) = value {
231                AnyValue::Date(value)
232            } else {
233                AnyValue::Null
234            });
235        },
236        #[cfg(feature = "dtype-datetime")]
237        DataType::Datetime(tu, tz) => {
238            let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
239            return Ok(if let Some(value) = value {
240                AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
241            } else {
242                AnyValue::Null
243            });
244        },
245        #[cfg(feature = "dtype-f16")]
246        dt @ DataType::Float16 => {
247            use num_traits::AsPrimitive;
248            use polars_utils::float16::pf16;
249
250            return match json.cast_f64() {
251                Some(v) => Ok(AnyValue::Float16(AsPrimitive::<pf16>::as_(v))),
252                None if ignore_errors => Ok(AnyValue::Null),
253                None => {
254                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
255                },
256            };
257        },
258        dt @ DataType::Float32 => {
259            return match json.cast_f64() {
260                Some(v) => Ok(AnyValue::Float32(v as f32)),
261                None if ignore_errors => Ok(AnyValue::Null),
262                None => {
263                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
264                },
265            };
266        },
267        dt @ DataType::Float64 => {
268            return match json.cast_f64() {
269                Some(v) => Ok(AnyValue::Float64(v)),
270                None if ignore_errors => Ok(AnyValue::Null),
271                None => {
272                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
273                },
274            };
275        },
276        DataType::String => {
277            return Ok(match json {
278                Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
279                v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
280            });
281        },
282        dt if dt.is_primitive_numeric() => {
283            return match json.as_i128() {
284                Some(v) => Ok(AnyValue::Int128(v).into_static()),
285                None if ignore_errors => Ok(AnyValue::Null),
286                None => {
287                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
288                },
289            };
290        },
291        _ => {},
292    }
293
294    let out = match json {
295        Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
296        Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
297        Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
298        Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
299        Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
300        Value::Array(arr) => {
301            let Some(inner_dtype) = dtype.inner_dtype() else {
302                if ignore_errors {
303                    return Ok(AnyValue::Null);
304                }
305                polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
306            };
307            let vals: Vec<AnyValue> = arr
308                .iter()
309                .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
310                .collect::<PolarsResult<_>>()?;
311            let strict = !ignore_errors;
312            let s =
313                Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
314            AnyValue::List(s)
315        },
316        #[cfg(feature = "dtype-struct")]
317        Value::Object(doc) => {
318            if let DataType::Struct(fields) = dtype {
319                let document = &**doc;
320
321                let vals = fields
322                    .iter()
323                    .map(|field| {
324                        if let Some(value) = document.get(field.name.as_str()) {
325                            deserialize_all(value, &field.dtype, ignore_errors)
326                        } else {
327                            Ok(AnyValue::Null)
328                        }
329                    })
330                    .collect::<PolarsResult<Vec<_>>>()?;
331                AnyValue::StructOwned(Box::new((vals, fields.clone())))
332            } else {
333                if ignore_errors {
334                    return Ok(AnyValue::Null);
335                }
336                polars_bail!(
337                    ComputeError: "expected {} in json value, got object", dtype,
338                );
339            }
340        },
341        val => AnyValue::StringOwned(format!("{val:#?}").into()),
342    };
343    Ok(out)
344}
345
346/// Wrapper for serde_json's `Value` with a human-friendly Display impl for nested types:
347///
348/// * Default: `{"x": Static(U64(1))}`
349/// * ValueDisplay: `{x: 1}`
350///
351/// This intended for reading in arbitrary `Value` types into a String type. Note that the output
352/// is not guaranteed to be valid JSON as we don't do any escaping of e.g. quote/newline values.
353struct ValueDisplay<'a>(&'a Value<'a>);
354
355impl std::fmt::Display for ValueDisplay<'_> {
356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357        use Value::*;
358
359        match self.0 {
360            Static(s) => write!(f, "{s}"),
361            String(s) => write!(f, r#""{s}""#),
362            Array(a) => {
363                write!(f, "[")?;
364
365                let mut iter = a.iter();
366
367                for v in (&mut iter).take(1) {
368                    write!(f, "{}", ValueDisplay(v))?;
369                }
370
371                for v in iter {
372                    write!(f, ", {}", ValueDisplay(v))?;
373                }
374
375                write!(f, "]")
376            },
377            Object(o) => {
378                write!(f, "{{")?;
379
380                let mut iter = o.iter();
381
382                for (k, v) in (&mut iter).take(1) {
383                    write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
384                }
385
386                for (k, v) in iter {
387                    write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
388                }
389
390                write!(f, "}}")
391            },
392        }
393    }
394}