1use std::fmt::Write;
2use std::hash::{Hash, Hasher};
3
4use polars_core::frame::row::AnyValueBuffer;
5use polars_core::prelude::*;
6#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
7use polars_time::prelude::string::Pattern;
8#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
9use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
10use polars_utils::format_pl_smallstr;
11use simd_json::prelude::*;
12use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
13
14#[derive(Debug, Clone, PartialEq)]
15pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
16impl Eq for BufferKey<'_> {}
17
18impl Hash for BufferKey<'_> {
19 fn hash<H: Hasher>(&self, state: &mut H) {
20 self.0.key().hash(state)
21 }
22}
23
24pub(crate) struct Buffer<'a> {
25 name: &'a str,
26 ignore_errors: bool,
27 buf: AnyValueBuffer<'a>,
28}
29
30impl Buffer<'_> {
31 pub fn into_series(self) -> PolarsResult<Series> {
32 let mut buf = self.buf;
33 let mut s = buf.reset(0, !self.ignore_errors)?;
34 s.rename(PlSmallStr::from_str(self.name));
35 Ok(s)
36 }
37
38 #[inline]
39 pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
40 use AnyValueBuffer::*;
41 match &mut self.buf {
42 _ if value.is_null() => {
43 self.buf.add(AnyValue::Null);
44 Ok(())
45 },
46 Boolean(buf) => {
47 match value {
48 Value::Static(StaticNode::Bool(v)) => buf.append_value(*v),
49 Value::Static(StaticNode::Null) => buf.append_null(),
50 _ if self.ignore_errors => buf.append_null(),
51 v => {
52 polars_bail!(ComputeError: "cannot parse '{}' ({}) as Boolean", v, v.value_type())
53 },
54 }
55 Ok(())
56 },
57 Int32(buf) => {
58 let v =
59 deserialize_numeric::<Int32Type>(value, value.as_i32(), self.ignore_errors)?;
60 buf.append_option(v);
61 Ok(())
62 },
63 Int64(buf) => {
64 let v =
65 deserialize_numeric::<Int64Type>(value, value.as_i64(), self.ignore_errors)?;
66 buf.append_option(v);
67 Ok(())
68 },
69 UInt64(buf) => {
70 let v =
71 deserialize_numeric::<UInt64Type>(value, value.as_u64(), self.ignore_errors)?;
72 buf.append_option(v);
73 Ok(())
74 },
75 UInt32(buf) => {
76 let v =
77 deserialize_numeric::<UInt32Type>(value, value.as_u32(), self.ignore_errors)?;
78 buf.append_option(v);
79 Ok(())
80 },
81 Float32(buf) => {
82 let v = deserialize_numeric::<Float32Type>(
83 value,
84 value.cast_f64().map(|f| f as f32),
85 self.ignore_errors,
86 )?;
87 buf.append_option(v);
88 Ok(())
89 },
90 Float64(buf) => {
91 let v = deserialize_numeric::<Float64Type>(
92 value,
93 value.cast_f64(),
94 self.ignore_errors,
95 )?;
96 buf.append_option(v);
97 Ok(())
98 },
99 String(buf) => {
100 match value {
101 Value::String(v) => buf.append_value(v),
102 v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
104 }
105 Ok(())
106 },
107 #[cfg(feature = "dtype-datetime")]
108 Datetime(buf, tu, _) => {
109 let v =
110 deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
111 buf.append_option(v);
112 Ok(())
113 },
114 #[cfg(feature = "dtype-date")]
115 Date(buf) => {
116 let v = deserialize_datetime::<Int32Type>(
117 value,
118 "Date",
119 self.ignore_errors,
120 TimeUnit::Microseconds, )?;
122 buf.append_option(v);
123 Ok(())
124 },
125 All(dtype, buf) => {
126 let av = deserialize_all(value, dtype, self.ignore_errors)?;
127 buf.push(av);
128 Ok(())
129 },
130 Null(builder) => {
131 if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
132 polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
133 };
134
135 builder.append_null();
136 Ok(())
137 },
138 _ => panic!("unexpected dtype when deserializing ndjson"),
139 }
140 }
141
142 pub fn add_null(&mut self) {
143 self.buf.add(AnyValue::Null).expect("should not fail");
144 }
145}
146pub(crate) fn init_buffers(
147 schema: &Schema,
148 capacity: usize,
149 ignore_errors: bool,
150) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
151 schema
152 .iter()
153 .map(|(name, dtype)| {
154 let av_buf = (dtype, capacity).into();
155 let key = KnownKey::from(name.as_str());
156 Ok((
157 BufferKey(key),
158 Buffer {
159 name,
160 buf: av_buf,
161 ignore_errors,
162 },
163 ))
164 })
165 .collect()
166}
167
168fn deserialize_numeric<T: PolarsNumericType>(
169 value: &Value,
170 n: Option<T::Native>,
171 ignore_errors: bool,
172) -> PolarsResult<Option<T::Native>> {
173 match n {
174 Some(v) => Ok(Some(v)),
175 None if ignore_errors => Ok(None),
176 None => Err(
177 polars_err!(ComputeError: "cannot parse '{}' ({}) as {:?}", value, value.value_type(), T::get_static_dtype()),
178 ),
179 }
180}
181
/// Parses a JSON string into a temporal value of physical type `T`.
///
/// The pattern is inferred from the string itself and then used, together with
/// `tu`, to parse it. JSON `null` yields `Ok(None)`.
///
/// # Errors
///
/// Returns a `ComputeError` mentioning `type_name` when `value` is not a string
/// that can be parsed, unless `ignore_errors` is set, in which case `Ok(None)` is
/// returned instead.
// Gated like the imports it relies on: the `Date` code path (feature `dtype-date`)
// calls this function as well as the `Datetime` one.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::Static(StaticNode::Null) => return Ok(None),
        Value::String(val) => {
            let parsed = infer_pattern_single(val)
                .and_then(|pattern| {
                    DatetimeInfer::<T>::try_from_with_unit(pattern, Some(tu)).ok()
                })
                .and_then(|mut infer| infer.parse(val));
            if parsed.is_some() {
                return Ok(parsed);
            }
        },
        _ => {},
    }

    // Everything that reaches this point failed to parse.
    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", value, value.value_type(), type_name)
}
213
214fn deserialize_all<'a>(
215 json: &Value,
216 dtype: &DataType,
217 ignore_errors: bool,
218) -> PolarsResult<AnyValue<'a>> {
219 if let Value::Static(StaticNode::Null) = json {
220 return Ok(AnyValue::Null);
221 }
222 match dtype {
223 #[cfg(feature = "dtype-datetime")]
224 DataType::Date => {
225 let value = deserialize_datetime::<Int32Type>(
226 json,
227 "Date",
228 ignore_errors,
229 TimeUnit::Microseconds, )?;
231 return Ok(if let Some(value) = value {
232 AnyValue::Date(value)
233 } else {
234 AnyValue::Null
235 });
236 },
237 #[cfg(feature = "dtype-datetime")]
238 DataType::Datetime(tu, tz) => {
239 let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
240 return Ok(if let Some(value) = value {
241 AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
242 } else {
243 AnyValue::Null
244 });
245 },
246 #[cfg(feature = "dtype-f16")]
247 dt @ DataType::Float16 => {
248 use num_traits::AsPrimitive;
249 use polars_utils::float16::pf16;
250
251 return match json.cast_f64() {
252 Some(v) => Ok(AnyValue::Float16(AsPrimitive::<pf16>::as_(v))),
253 None if ignore_errors => Ok(AnyValue::Null),
254 None => {
255 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
256 },
257 };
258 },
259 dt @ DataType::Float32 => {
260 return match json.cast_f64() {
261 Some(v) => Ok(AnyValue::Float32(v as f32)),
262 None if ignore_errors => Ok(AnyValue::Null),
263 None => {
264 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
265 },
266 };
267 },
268 dt @ DataType::Float64 => {
269 return match json.cast_f64() {
270 Some(v) => Ok(AnyValue::Float64(v)),
271 None if ignore_errors => Ok(AnyValue::Null),
272 None => {
273 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
274 },
275 };
276 },
277 DataType::String => {
278 return Ok(match json {
279 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
280 v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
281 });
282 },
283 dt if dt.is_primitive_numeric() => {
284 return match json.as_i128() {
285 Some(v) => Ok(AnyValue::Int128(v).into_static()),
286 None if ignore_errors => Ok(AnyValue::Null),
287 None => {
288 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
289 },
290 };
291 },
292 _ => {},
293 }
294
295 let out = match json {
296 Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
297 Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
298 Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
299 Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
300 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
301 Value::Array(arr) => {
302 let Some(inner_dtype) = dtype.inner_dtype() else {
303 if ignore_errors {
304 return Ok(AnyValue::Null);
305 }
306 polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
307 };
308 let vals: Vec<AnyValue> = arr
309 .iter()
310 .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
311 .collect::<PolarsResult<_>>()?;
312 let strict = !ignore_errors;
313 let s =
314 Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
315 AnyValue::List(s)
316 },
317 #[cfg(feature = "dtype-struct")]
318 Value::Object(doc) => {
319 if let DataType::Struct(fields) = dtype {
320 let document = &**doc;
321
322 let vals = fields
323 .iter()
324 .map(|field| {
325 if let Some(value) = document.get(field.name.as_str()) {
326 deserialize_all(value, &field.dtype, ignore_errors)
327 } else {
328 Ok(AnyValue::Null)
329 }
330 })
331 .collect::<PolarsResult<Vec<_>>>()?;
332 AnyValue::StructOwned(Box::new((vals, fields.clone())))
333 } else {
334 if ignore_errors {
335 return Ok(AnyValue::Null);
336 }
337 polars_bail!(
338 ComputeError: "expected {} in json value, got object", dtype,
339 );
340 }
341 },
342 val => AnyValue::StringOwned(format!("{val:#?}").into()),
343 };
344 Ok(out)
345}
346
347struct ValueDisplay<'a>(&'a Value<'a>);
355
356impl std::fmt::Display for ValueDisplay<'_> {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 use std::fmt::Display;
359
360 use Value::*;
361
362 match self.0 {
363 Static(s) => Display::fmt(s, f),
364 String(s) => {
365 f.write_char('"')?;
366
367 let s: &mut &[u8] = &mut s.as_bytes();
368
369 while !s.is_empty() {
370 f.write_str({
371 let i = memchr::memchr3(b'"', b'\n', b'\r', s);
372
373 unsafe {
375 str::from_utf8_unchecked(s.split_off(..i.unwrap_or(s.len())).unwrap())
376 }
377 })?;
378
379 if let Some(&[c]) = s.split_off(..1) {
380 match c {
381 b'"' => f.write_str(r#"\""#)?,
382 b'\n' => f.write_str(r#"\n"#)?,
383 b'\r' => f.write_str(r#"\r"#)?,
384 _ => unreachable!(),
385 }
386 }
387 }
388
389 f.write_char('"')?;
390
391 Ok(())
392 },
393 Array(a) => {
394 f.write_char('[')?;
395
396 let mut iter = a.iter();
397
398 for v in (&mut iter).take(1) {
399 ValueDisplay(v).fmt(f)?;
400 }
401
402 for v in iter {
403 f.write_str(", ")?;
404 ValueDisplay(v).fmt(f)?;
405 }
406
407 f.write_char(']')
408 },
409 Object(o) => {
410 f.write_char('{')?;
411
412 let mut iter = o.iter();
413
414 for (k, v) in (&mut iter).take(1) {
415 f.write_char('"')?;
416
417 f.write_str(k)?;
418 f.write_str(r#"": "#)?;
419 ValueDisplay(v).fmt(f)?;
420 }
421
422 for (k, v) in iter {
423 f.write_str(r#", ""#)?;
424
425 f.write_str(k)?;
426 f.write_str(r#"": "#)?;
427 ValueDisplay(v).fmt(f)?;
428 }
429
430 f.write_char('}')
431 },
432 }
433 }
434}