1use std::hash::{Hash, Hasher};
2
3use polars_core::frame::row::AnyValueBuffer;
4use polars_core::prelude::*;
5#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
6use polars_time::prelude::string::Pattern;
7#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
8use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
9use polars_utils::format_pl_smallstr;
10use simd_json::prelude::*;
11use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
12
/// Hashable wrapper around a simd-json [`KnownKey`].
///
/// It is the key type of the map returned by `init_buffers`, where each key is
/// built from a schema field name.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
// `Eq` is implemented by hand on top of the derived `PartialEq`.
impl Eq for BufferKey<'_> {}
16
17impl Hash for BufferKey<'_> {
18 fn hash<H: Hasher>(&self, state: &mut H) {
19 self.0.key().hash(state)
20 }
21}
22
/// Accumulates the values of one named column while JSON values are deserialized.
pub(crate) struct Buffer<'a> {
    /// Name given to the `Series` produced by `into_series`.
    name: &'a str,
    /// When `true`, values that cannot be parsed are stored as null instead of
    /// producing an error.
    ignore_errors: bool,
    /// Underlying typed value buffer.
    buf: AnyValueBuffer<'a>,
}
28
29impl Buffer<'_> {
30 pub fn into_series(self) -> PolarsResult<Series> {
31 let mut buf = self.buf;
32 let mut s = buf.reset(0, !self.ignore_errors)?;
33 s.rename(PlSmallStr::from_str(self.name));
34 Ok(s)
35 }
36
37 #[inline]
38 pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
39 use AnyValueBuffer::*;
40 match &mut self.buf {
41 _ if value.is_null() => {
42 self.buf.add(AnyValue::Null);
43 Ok(())
44 },
45 Boolean(buf) => {
46 match value {
47 Value::Static(StaticNode::Bool(v)) => buf.append_value(*v),
48 Value::Static(StaticNode::Null) => buf.append_null(),
49 _ if self.ignore_errors => buf.append_null(),
50 v => {
51 polars_bail!(ComputeError: "cannot parse '{}' ({}) as Boolean", v, v.value_type())
52 },
53 }
54 Ok(())
55 },
56 Int32(buf) => {
57 let v =
58 deserialize_numeric::<Int32Type>(value, value.as_i32(), self.ignore_errors)?;
59 buf.append_option(v);
60 Ok(())
61 },
62 Int64(buf) => {
63 let v =
64 deserialize_numeric::<Int64Type>(value, value.as_i64(), self.ignore_errors)?;
65 buf.append_option(v);
66 Ok(())
67 },
68 UInt64(buf) => {
69 let v =
70 deserialize_numeric::<UInt64Type>(value, value.as_u64(), self.ignore_errors)?;
71 buf.append_option(v);
72 Ok(())
73 },
74 UInt32(buf) => {
75 let v =
76 deserialize_numeric::<UInt32Type>(value, value.as_u32(), self.ignore_errors)?;
77 buf.append_option(v);
78 Ok(())
79 },
80 Float32(buf) => {
81 let v = deserialize_numeric::<Float32Type>(
82 value,
83 value.cast_f64().map(|f| f as f32),
84 self.ignore_errors,
85 )?;
86 buf.append_option(v);
87 Ok(())
88 },
89 Float64(buf) => {
90 let v = deserialize_numeric::<Float64Type>(
91 value,
92 value.cast_f64(),
93 self.ignore_errors,
94 )?;
95 buf.append_option(v);
96 Ok(())
97 },
98 String(buf) => {
99 match value {
100 Value::String(v) => buf.append_value(v),
101 v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
103 }
104 Ok(())
105 },
106 #[cfg(feature = "dtype-datetime")]
107 Datetime(buf, tu, _) => {
108 let v =
109 deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
110 buf.append_option(v);
111 Ok(())
112 },
113 #[cfg(feature = "dtype-date")]
114 Date(buf) => {
115 let v = deserialize_datetime::<Int32Type>(
116 value,
117 "Date",
118 self.ignore_errors,
119 TimeUnit::Microseconds, )?;
121 buf.append_option(v);
122 Ok(())
123 },
124 All(dtype, buf) => {
125 let av = deserialize_all(value, dtype, self.ignore_errors)?;
126 buf.push(av);
127 Ok(())
128 },
129 Null(builder) => {
130 if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
131 polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
132 };
133
134 builder.append_null();
135 Ok(())
136 },
137 _ => panic!("unexpected dtype when deserializing ndjson"),
138 }
139 }
140
141 pub fn add_null(&mut self) {
142 self.buf.add(AnyValue::Null).expect("should not fail");
143 }
144}
145pub(crate) fn init_buffers(
146 schema: &Schema,
147 capacity: usize,
148 ignore_errors: bool,
149) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
150 schema
151 .iter()
152 .map(|(name, dtype)| {
153 let av_buf = (dtype, capacity).into();
154 let key = KnownKey::from(name.as_str());
155 Ok((
156 BufferKey(key),
157 Buffer {
158 name,
159 buf: av_buf,
160 ignore_errors,
161 },
162 ))
163 })
164 .collect()
165}
166
167fn deserialize_numeric<T: PolarsNumericType>(
168 value: &Value,
169 n: Option<T::Native>,
170 ignore_errors: bool,
171) -> PolarsResult<Option<T::Native>> {
172 match n {
173 Some(v) => Ok(Some(v)),
174 None if ignore_errors => Ok(None),
175 None => Err(
176 polars_err!(ComputeError: "cannot parse '{}' ({}) as {:?}", value, value.value_type(), T::get_static_dtype()),
177 ),
178 }
179}
180
/// Parses a JSON `value` as a temporal value of physical type `T`.
///
/// * A JSON string is parsed by inferring a pattern from the string itself and
///   then applying that pattern with the time unit `tu`.
/// * A JSON null yields `Ok(None)`.
/// * Anything else, or a string that cannot be parsed, yields `Ok(None)` when
///   `ignore_errors` is set.
///
/// `type_name` is only used in the error message.
///
/// # Errors
/// Returns a `ComputeError` when the value cannot be parsed and `ignore_errors`
/// is `false`.
// Compiled when either temporal feature is enabled: the `Date` code path
// (feature `dtype-date`) calls this function as well, and the imports it relies
// on are gated on the same condition.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::String(val) => {
            if let Some(pattern) = infer_pattern_single(val) {
                if let Ok(mut infer) = DatetimeInfer::try_from_with_unit(pattern, Some(tu)) {
                    if let Some(v) = infer.parse(val) {
                        return Ok(Some(v));
                    }
                }
            }
            // Fall through: the string could not be parsed.
        },
        Value::Static(StaticNode::Null) => return Ok(None),
        _ => {},
    };

    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", value, value.value_type(), type_name)
}
212
213fn deserialize_all<'a>(
214 json: &Value,
215 dtype: &DataType,
216 ignore_errors: bool,
217) -> PolarsResult<AnyValue<'a>> {
218 if let Value::Static(StaticNode::Null) = json {
219 return Ok(AnyValue::Null);
220 }
221 match dtype {
222 #[cfg(feature = "dtype-datetime")]
223 DataType::Date => {
224 let value = deserialize_datetime::<Int32Type>(
225 json,
226 "Date",
227 ignore_errors,
228 TimeUnit::Microseconds, )?;
230 return Ok(if let Some(value) = value {
231 AnyValue::Date(value)
232 } else {
233 AnyValue::Null
234 });
235 },
236 #[cfg(feature = "dtype-datetime")]
237 DataType::Datetime(tu, tz) => {
238 let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
239 return Ok(if let Some(value) = value {
240 AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
241 } else {
242 AnyValue::Null
243 });
244 },
245 #[cfg(feature = "dtype-f16")]
246 dt @ DataType::Float16 => {
247 use num_traits::AsPrimitive;
248 use polars_utils::float16::pf16;
249
250 return match json.cast_f64() {
251 Some(v) => Ok(AnyValue::Float16(AsPrimitive::<pf16>::as_(v))),
252 None if ignore_errors => Ok(AnyValue::Null),
253 None => {
254 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
255 },
256 };
257 },
258 dt @ DataType::Float32 => {
259 return match json.cast_f64() {
260 Some(v) => Ok(AnyValue::Float32(v as f32)),
261 None if ignore_errors => Ok(AnyValue::Null),
262 None => {
263 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
264 },
265 };
266 },
267 dt @ DataType::Float64 => {
268 return match json.cast_f64() {
269 Some(v) => Ok(AnyValue::Float64(v)),
270 None if ignore_errors => Ok(AnyValue::Null),
271 None => {
272 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
273 },
274 };
275 },
276 DataType::String => {
277 return Ok(match json {
278 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
279 v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
280 });
281 },
282 dt if dt.is_primitive_numeric() => {
283 return match json.as_i128() {
284 Some(v) => Ok(AnyValue::Int128(v).into_static()),
285 None if ignore_errors => Ok(AnyValue::Null),
286 None => {
287 polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
288 },
289 };
290 },
291 _ => {},
292 }
293
294 let out = match json {
295 Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
296 Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
297 Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
298 Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
299 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
300 Value::Array(arr) => {
301 let Some(inner_dtype) = dtype.inner_dtype() else {
302 if ignore_errors {
303 return Ok(AnyValue::Null);
304 }
305 polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
306 };
307 let vals: Vec<AnyValue> = arr
308 .iter()
309 .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
310 .collect::<PolarsResult<_>>()?;
311 let strict = !ignore_errors;
312 let s =
313 Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
314 AnyValue::List(s)
315 },
316 #[cfg(feature = "dtype-struct")]
317 Value::Object(doc) => {
318 if let DataType::Struct(fields) = dtype {
319 let document = &**doc;
320
321 let vals = fields
322 .iter()
323 .map(|field| {
324 if let Some(value) = document.get(field.name.as_str()) {
325 deserialize_all(value, &field.dtype, ignore_errors)
326 } else {
327 Ok(AnyValue::Null)
328 }
329 })
330 .collect::<PolarsResult<Vec<_>>>()?;
331 AnyValue::StructOwned(Box::new((vals, fields.clone())))
332 } else {
333 if ignore_errors {
334 return Ok(AnyValue::Null);
335 }
336 polars_bail!(
337 ComputeError: "expected {} in json value, got object", dtype,
338 );
339 }
340 },
341 val => AnyValue::StringOwned(format!("{val:#?}").into()),
342 };
343 Ok(out)
344}
345
/// Wrapper around a JSON [`Value`] whose `Display` impl renders it as compact,
/// JSON-like text: strings are wrapped in double quotes, and array and object
/// entries are separated by `", "`.
struct ValueDisplay<'a>(&'a Value<'a>);
354
355impl std::fmt::Display for ValueDisplay<'_> {
356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357 use Value::*;
358
359 match self.0 {
360 Static(s) => write!(f, "{s}"),
361 String(s) => write!(f, r#""{s}""#),
362 Array(a) => {
363 write!(f, "[")?;
364
365 let mut iter = a.iter();
366
367 for v in (&mut iter).take(1) {
368 write!(f, "{}", ValueDisplay(v))?;
369 }
370
371 for v in iter {
372 write!(f, ", {}", ValueDisplay(v))?;
373 }
374
375 write!(f, "]")
376 },
377 Object(o) => {
378 write!(f, "{{")?;
379
380 let mut iter = o.iter();
381
382 for (k, v) in (&mut iter).take(1) {
383 write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
384 }
385
386 for (k, v) in iter {
387 write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
388 }
389
390 write!(f, "}}")
391 },
392 }
393 }
394}