Skip to main content

polars_io/ndjson/
buffer.rs

1use std::fmt::Write;
2use std::hash::{Hash, Hasher};
3
4use polars_core::frame::row::AnyValueBuffer;
5use polars_core::prelude::*;
6#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
7use polars_time::prelude::string::Pattern;
8#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
9use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
10use polars_utils::format_pl_smallstr;
11use simd_json::prelude::*;
12use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
13
14#[derive(Debug, Clone, PartialEq)]
15pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
16impl Eq for BufferKey<'_> {}
17
18impl Hash for BufferKey<'_> {
19    fn hash<H: Hasher>(&self, state: &mut H) {
20        self.0.key().hash(state)
21    }
22}
23
24pub(crate) struct Buffer<'a> {
25    name: &'a str,
26    ignore_errors: bool,
27    buf: AnyValueBuffer<'a>,
28}
29
30impl Buffer<'_> {
31    pub fn into_series(self) -> PolarsResult<Series> {
32        let mut buf = self.buf;
33        let mut s = buf.reset(0, !self.ignore_errors)?;
34        s.rename(PlSmallStr::from_str(self.name));
35        Ok(s)
36    }
37
38    #[inline]
39    pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
40        use AnyValueBuffer::*;
41        match &mut self.buf {
42            _ if value.is_null() => {
43                self.buf.add(AnyValue::Null);
44                Ok(())
45            },
46            Boolean(buf) => {
47                match value {
48                    Value::Static(StaticNode::Bool(v)) => buf.append_value(*v),
49                    Value::Static(StaticNode::Null) => buf.append_null(),
50                    _ if self.ignore_errors => buf.append_null(),
51                    v => {
52                        polars_bail!(ComputeError: "cannot parse '{}' ({}) as Boolean", v, v.value_type())
53                    },
54                }
55                Ok(())
56            },
57            Int32(buf) => {
58                let v =
59                    deserialize_numeric::<Int32Type>(value, value.as_i32(), self.ignore_errors)?;
60                buf.append_option(v);
61                Ok(())
62            },
63            Int64(buf) => {
64                let v =
65                    deserialize_numeric::<Int64Type>(value, value.as_i64(), self.ignore_errors)?;
66                buf.append_option(v);
67                Ok(())
68            },
69            UInt64(buf) => {
70                let v =
71                    deserialize_numeric::<UInt64Type>(value, value.as_u64(), self.ignore_errors)?;
72                buf.append_option(v);
73                Ok(())
74            },
75            UInt32(buf) => {
76                let v =
77                    deserialize_numeric::<UInt32Type>(value, value.as_u32(), self.ignore_errors)?;
78                buf.append_option(v);
79                Ok(())
80            },
81            Float32(buf) => {
82                let v = deserialize_numeric::<Float32Type>(
83                    value,
84                    value.cast_f64().map(|f| f as f32),
85                    self.ignore_errors,
86                )?;
87                buf.append_option(v);
88                Ok(())
89            },
90            Float64(buf) => {
91                let v = deserialize_numeric::<Float64Type>(
92                    value,
93                    value.cast_f64(),
94                    self.ignore_errors,
95                )?;
96                buf.append_option(v);
97                Ok(())
98            },
99            String(buf) => {
100                match value {
101                    Value::String(v) => buf.append_value(v),
102                    // Forcibly convert to String using the Display impl.
103                    v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
104                }
105                Ok(())
106            },
107            #[cfg(feature = "dtype-datetime")]
108            Datetime(buf, tu, _) => {
109                let v =
110                    deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
111                buf.append_option(v);
112                Ok(())
113            },
114            #[cfg(feature = "dtype-date")]
115            Date(buf) => {
116                let v = deserialize_datetime::<Int32Type>(
117                    value,
118                    "Date",
119                    self.ignore_errors,
120                    TimeUnit::Microseconds, // ignored
121                )?;
122                buf.append_option(v);
123                Ok(())
124            },
125            All(dtype, buf) => {
126                let av = deserialize_all(value, dtype, self.ignore_errors)?;
127                buf.push(av);
128                Ok(())
129            },
130            Null(builder) => {
131                if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
132                    polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
133                };
134
135                builder.append_null();
136                Ok(())
137            },
138            _ => panic!("unexpected dtype when deserializing ndjson"),
139        }
140    }
141
142    pub fn add_null(&mut self) {
143        self.buf.add(AnyValue::Null).expect("should not fail");
144    }
145}
146pub(crate) fn init_buffers(
147    schema: &Schema,
148    capacity: usize,
149    ignore_errors: bool,
150) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
151    schema
152        .iter()
153        .map(|(name, dtype)| {
154            let av_buf = (dtype, capacity).into();
155            let key = KnownKey::from(name.as_str());
156            Ok((
157                BufferKey(key),
158                Buffer {
159                    name,
160                    buf: av_buf,
161                    ignore_errors,
162                },
163            ))
164        })
165        .collect()
166}
167
168fn deserialize_numeric<T: PolarsNumericType>(
169    value: &Value,
170    n: Option<T::Native>,
171    ignore_errors: bool,
172) -> PolarsResult<Option<T::Native>> {
173    match n {
174        Some(v) => Ok(Some(v)),
175        None if ignore_errors => Ok(None),
176        None => Err(
177            polars_err!(ComputeError: "cannot parse '{}' ({}) as {:?}", value, value.value_type(), T::get_static_dtype()),
178        ),
179    }
180}
181
/// Parses a JSON string into the physical integer value of a temporal type.
///
/// The format pattern is inferred per value with `infer_pattern_single`, and `tu` is handed to
/// the inferrer (the `Date` callers pass a placeholder unit they mark as ignored). JSON `null`
/// yields `Ok(None)`.
///
/// # Errors
/// Any other value (a non-string, or a string whose pattern cannot be inferred or parsed) yields
/// `Ok(None)` when `ignore_errors` is set and a `ComputeError` naming `type_name` otherwise.
// Called from both the `Datetime` and the `Date` deserializers, so it must exist when either
// feature is enabled (the same gate as the `polars_time` imports it relies on).
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::Static(StaticNode::Null) => return Ok(None),
        Value::String(val) => {
            if let Some(pattern) = infer_pattern_single(val) {
                if let Ok(mut infer) = DatetimeInfer::try_from_with_unit(pattern, Some(tu)) {
                    if let Some(v) = infer.parse(val) {
                        return Ok(Some(v));
                    }
                }
            }
        },
        _ => {},
    }

    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", value, value.value_type(), type_name)
}
213
214fn deserialize_all<'a>(
215    json: &Value,
216    dtype: &DataType,
217    ignore_errors: bool,
218) -> PolarsResult<AnyValue<'a>> {
219    if let Value::Static(StaticNode::Null) = json {
220        return Ok(AnyValue::Null);
221    }
222    match dtype {
223        #[cfg(feature = "dtype-datetime")]
224        DataType::Date => {
225            let value = deserialize_datetime::<Int32Type>(
226                json,
227                "Date",
228                ignore_errors,
229                TimeUnit::Microseconds, // ignored
230            )?;
231            return Ok(if let Some(value) = value {
232                AnyValue::Date(value)
233            } else {
234                AnyValue::Null
235            });
236        },
237        #[cfg(feature = "dtype-datetime")]
238        DataType::Datetime(tu, tz) => {
239            let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
240            return Ok(if let Some(value) = value {
241                AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
242            } else {
243                AnyValue::Null
244            });
245        },
246        #[cfg(feature = "dtype-f16")]
247        dt @ DataType::Float16 => {
248            use num_traits::AsPrimitive;
249            use polars_utils::float16::pf16;
250
251            return match json.cast_f64() {
252                Some(v) => Ok(AnyValue::Float16(AsPrimitive::<pf16>::as_(v))),
253                None if ignore_errors => Ok(AnyValue::Null),
254                None => {
255                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
256                },
257            };
258        },
259        dt @ DataType::Float32 => {
260            return match json.cast_f64() {
261                Some(v) => Ok(AnyValue::Float32(v as f32)),
262                None if ignore_errors => Ok(AnyValue::Null),
263                None => {
264                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
265                },
266            };
267        },
268        dt @ DataType::Float64 => {
269            return match json.cast_f64() {
270                Some(v) => Ok(AnyValue::Float64(v)),
271                None if ignore_errors => Ok(AnyValue::Null),
272                None => {
273                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
274                },
275            };
276        },
277        DataType::String => {
278            return Ok(match json {
279                Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
280                v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
281            });
282        },
283        dt if dt.is_primitive_numeric() => {
284            return match json.as_i128() {
285                Some(v) => Ok(AnyValue::Int128(v).into_static()),
286                None if ignore_errors => Ok(AnyValue::Null),
287                None => {
288                    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
289                },
290            };
291        },
292        _ => {},
293    }
294
295    let out = match json {
296        Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
297        Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
298        Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
299        Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
300        Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
301        Value::Array(arr) => {
302            let Some(inner_dtype) = dtype.inner_dtype() else {
303                if ignore_errors {
304                    return Ok(AnyValue::Null);
305                }
306                polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
307            };
308            let vals: Vec<AnyValue> = arr
309                .iter()
310                .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
311                .collect::<PolarsResult<_>>()?;
312            let strict = !ignore_errors;
313            let s =
314                Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
315            AnyValue::List(s)
316        },
317        #[cfg(feature = "dtype-struct")]
318        Value::Object(doc) => {
319            if let DataType::Struct(fields) = dtype {
320                let document = &**doc;
321
322                let vals = fields
323                    .iter()
324                    .map(|field| {
325                        if let Some(value) = document.get(field.name.as_str()) {
326                            deserialize_all(value, &field.dtype, ignore_errors)
327                        } else {
328                            Ok(AnyValue::Null)
329                        }
330                    })
331                    .collect::<PolarsResult<Vec<_>>>()?;
332                AnyValue::StructOwned(Box::new((vals, fields.clone())))
333            } else {
334                if ignore_errors {
335                    return Ok(AnyValue::Null);
336                }
337                polars_bail!(
338                    ComputeError: "expected {} in json value, got object", dtype,
339                );
340            }
341        },
342        val => AnyValue::StringOwned(format!("{val:#?}").into()),
343    };
344    Ok(out)
345}
346
347/// Wrapper for serde_json's `Value` with a human-friendly Display impl for nested types:
348///
349/// * Default: `{"x": Static(U64(1))}`
350/// * ValueDisplay: `{x: 1}`
351///
352/// This intended for reading in arbitrary `Value` types into a String type. Note that the output
353/// is not guaranteed to be valid JSON as we don't do any escaping of e.g. quote/newline values.
354struct ValueDisplay<'a>(&'a Value<'a>);
355
356impl std::fmt::Display for ValueDisplay<'_> {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        use std::fmt::Display;
359
360        use Value::*;
361
362        match self.0 {
363            Static(s) => Display::fmt(s, f),
364            String(s) => {
365                f.write_char('"')?;
366
367                let s: &mut &[u8] = &mut s.as_bytes();
368
369                while !s.is_empty() {
370                    f.write_str({
371                        let i = memchr::memchr3(b'"', b'\n', b'\r', s);
372
373                        // Safety: If `i` is `Some(_)`, it points to an ASCII char.
374                        unsafe {
375                            str::from_utf8_unchecked(s.split_off(..i.unwrap_or(s.len())).unwrap())
376                        }
377                    })?;
378
379                    if let Some(&[c]) = s.split_off(..1) {
380                        match c {
381                            b'"' => f.write_str(r#"\""#)?,
382                            b'\n' => f.write_str(r#"\n"#)?,
383                            b'\r' => f.write_str(r#"\r"#)?,
384                            _ => unreachable!(),
385                        }
386                    }
387                }
388
389                f.write_char('"')?;
390
391                Ok(())
392            },
393            Array(a) => {
394                f.write_char('[')?;
395
396                let mut iter = a.iter();
397
398                for v in (&mut iter).take(1) {
399                    ValueDisplay(v).fmt(f)?;
400                }
401
402                for v in iter {
403                    f.write_str(", ")?;
404                    ValueDisplay(v).fmt(f)?;
405                }
406
407                f.write_char(']')
408            },
409            Object(o) => {
410                f.write_char('{')?;
411
412                let mut iter = o.iter();
413
414                for (k, v) in (&mut iter).take(1) {
415                    f.write_char('"')?;
416
417                    f.write_str(k)?;
418                    f.write_str(r#"": "#)?;
419                    ValueDisplay(v).fmt(f)?;
420                }
421
422                for (k, v) in iter {
423                    f.write_str(r#", ""#)?;
424
425                    f.write_str(k)?;
426                    f.write_str(r#"": "#)?;
427                    ValueDisplay(v).fmt(f)?;
428                }
429
430                f.write_char('}')
431            },
432        }
433    }
434}