1use std::hash::{Hash, Hasher};
2
3use arrow::types::NativeType;
4use num_traits::NumCast;
5use polars_core::frame::row::AnyValueBuffer;
6use polars_core::prelude::*;
7#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
8use polars_time::prelude::string::Pattern;
9#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
10use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
11use polars_utils::format_pl_smallstr;
12use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
13
14#[derive(Debug, Clone, PartialEq)]
15pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
16impl Eq for BufferKey<'_> {}
17
18impl Hash for BufferKey<'_> {
19 fn hash<H: Hasher>(&self, state: &mut H) {
20 self.0.key().hash(state)
21 }
22}
23
/// Accumulates the values of a single column while ndjson rows are deserialized.
pub(crate) struct Buffer<'a> {
    /// Column name; applied to the resulting series in `into_series`.
    name: &'a str,
    /// When set, values that cannot be converted to the column type are
    /// generally stored as null instead of producing an error.
    ignore_errors: bool,
    /// Typed builder that parsed values are appended to.
    buf: AnyValueBuffer<'a>,
}
29
30impl Buffer<'_> {
31 pub fn into_series(self) -> PolarsResult<Series> {
32 let mut buf = self.buf;
33 let mut s = buf.reset(0, !self.ignore_errors)?;
34 s.rename(PlSmallStr::from_str(self.name));
35 Ok(s)
36 }
37
38 #[inline]
39 pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
40 use AnyValueBuffer::*;
41 match &mut self.buf {
42 Boolean(buf) => {
43 match value {
44 Value::Static(StaticNode::Bool(b)) => buf.append_value(*b),
45 Value::Static(StaticNode::Null) => buf.append_null(),
46 _ if self.ignore_errors => buf.append_null(),
47 v => polars_bail!(ComputeError: "cannot parse '{}' as Boolean", v),
48 }
49 Ok(())
50 },
51 Int32(buf) => {
52 let n = deserialize_number::<i32>(value, "Int32", self.ignore_errors)?;
53 match n {
54 Some(v) => buf.append_value(v),
55 None => buf.append_null(),
56 }
57 Ok(())
58 },
59 Int64(buf) => {
60 let n = deserialize_number::<i64>(value, "Int64", self.ignore_errors)?;
61 match n {
62 Some(v) => buf.append_value(v),
63 None => buf.append_null(),
64 }
65 Ok(())
66 },
67 UInt64(buf) => {
68 let n = deserialize_number::<u64>(value, "UInt64", self.ignore_errors)?;
69 match n {
70 Some(v) => buf.append_value(v),
71 None => buf.append_null(),
72 }
73 Ok(())
74 },
75 UInt32(buf) => {
76 let n = deserialize_number::<u32>(value, "UInt32", self.ignore_errors)?;
77 match n {
78 Some(v) => buf.append_value(v),
79 None => buf.append_null(),
80 }
81 Ok(())
82 },
83 Float32(buf) => {
84 let n = deserialize_number::<f32>(value, "Float32", self.ignore_errors)?;
85 match n {
86 Some(v) => buf.append_value(v),
87 None => buf.append_null(),
88 }
89 Ok(())
90 },
91 Float64(buf) => {
92 let n = deserialize_number::<f64>(value, "Float64", self.ignore_errors)?;
93 match n {
94 Some(v) => buf.append_value(v),
95 None => buf.append_null(),
96 }
97 Ok(())
98 },
99
100 String(buf) => {
101 match value {
102 Value::String(v) => buf.append_value(v),
103 Value::Static(StaticNode::Null) => buf.append_null(),
104 v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
106 }
107 Ok(())
108 },
109 #[cfg(feature = "dtype-datetime")]
110 Datetime(buf, _, _) => {
111 let v = deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors)?;
112 buf.append_option(v);
113 Ok(())
114 },
115 #[cfg(feature = "dtype-date")]
116 Date(buf) => {
117 let v = deserialize_datetime::<Int32Type>(value, "Date", self.ignore_errors)?;
118 buf.append_option(v);
119 Ok(())
120 },
121 All(dtype, buf) => {
122 let av = deserialize_all(value, dtype, self.ignore_errors)?;
123 buf.push(av);
124 Ok(())
125 },
126 Null(builder) => {
127 if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
128 polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
129 };
130
131 builder.append_null();
132 Ok(())
133 },
134 _ => panic!("unexpected dtype when deserializing ndjson"),
135 }
136 }
137
138 pub fn add_null(&mut self) {
139 self.buf.add(AnyValue::Null).expect("should not fail");
140 }
141}
142pub(crate) fn init_buffers(
143 schema: &Schema,
144 capacity: usize,
145 ignore_errors: bool,
146) -> PolarsResult<PlIndexMap<BufferKey, Buffer>> {
147 schema
148 .iter()
149 .map(|(name, dtype)| {
150 let av_buf = (dtype, capacity).into();
151 let key = KnownKey::from(name.as_str());
152 Ok((
153 BufferKey(key),
154 Buffer {
155 name,
156 buf: av_buf,
157 ignore_errors,
158 },
159 ))
160 })
161 .collect()
162}
163
164fn deserialize_number<T: NativeType + NumCast>(
165 value: &Value,
166 type_name: &str,
167 ignore_errors: bool,
168) -> PolarsResult<Option<T>> {
169 let to_result = |x: Option<T>| {
170 let out = if ignore_errors {
171 x
172 } else {
173 Some(x.ok_or_else(
174 || polars_err!(ComputeError: "cannot parse '{}' as {}", value, type_name),
175 )?)
176 };
177
178 Ok(out)
179 };
180
181 match value {
182 Value::Static(StaticNode::F64(f)) => to_result(num_traits::cast(*f)),
183 Value::Static(StaticNode::I64(i)) => to_result(num_traits::cast(*i)),
184 Value::Static(StaticNode::U64(u)) => to_result(num_traits::cast(*u)),
185 Value::Static(StaticNode::Bool(b)) => to_result(num_traits::cast(*b as i32)),
186 Value::Static(StaticNode::Null) => Ok(None),
187 _ => to_result(None),
188 }
189}
190
/// Parses a JSON string into the physical value of a temporal type `T`.
///
/// The pattern is inferred from the string itself with `infer_pattern_single`
/// and parsed using microsecond time units. JSON `null` yields `Ok(None)`.
///
/// The function is compiled when either temporal feature is enabled, because
/// both the `Datetime` and the `Date` buffer arms call it.
///
/// # Errors
/// If the value is not a string, no pattern can be inferred, or parsing fails,
/// a `ComputeError` naming `type_name` is returned, unless `ignore_errors` is
/// set, in which case the result is `Ok(None)`.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::Static(StaticNode::Null) => return Ok(None),
        Value::String(val) => {
            // Every step may fail; a failure falls through to the shared
            // error / ignore handling below.
            let parsed = infer_pattern_single(val)
                .and_then(|pattern| {
                    DatetimeInfer::<T>::try_from_with_unit(pattern, Some(TimeUnit::Microseconds))
                        .ok()
                })
                .and_then(|mut infer| infer.parse(val));
            if let Some(v) = parsed {
                return Ok(Some(v));
            }
        },
        _ => {},
    }

    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' as {}", value, type_name)
}
223
224fn deserialize_all<'a>(
225 json: &Value,
226 dtype: &DataType,
227 ignore_errors: bool,
228) -> PolarsResult<AnyValue<'a>> {
229 if let DataType::String = dtype {
230 return Ok(match json {
231 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
232 Value::Static(StaticNode::Null) => AnyValue::Null,
233 v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
234 });
235 }
236
237 let out = match json {
238 Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
239 Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
240 Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
241 Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
242 Value::Static(StaticNode::Null) => AnyValue::Null,
243 Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
244 Value::Array(arr) => {
245 let Some(inner_dtype) = dtype.inner_dtype() else {
246 if ignore_errors {
247 return Ok(AnyValue::Null);
248 }
249 polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
250 };
251 let vals: Vec<AnyValue> = arr
252 .iter()
253 .map(|val| deserialize_all(val, inner_dtype, ignore_errors))
254 .collect::<PolarsResult<_>>()?;
255 let strict = !ignore_errors;
256 let s =
257 Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
258 AnyValue::List(s)
259 },
260 #[cfg(feature = "dtype-struct")]
261 Value::Object(doc) => {
262 if let DataType::Struct(fields) = dtype {
263 let document = &**doc;
264
265 let vals = fields
266 .iter()
267 .map(|field| {
268 if let Some(value) = document.get(field.name.as_str()) {
269 deserialize_all(value, &field.dtype, ignore_errors)
270 } else {
271 Ok(AnyValue::Null)
272 }
273 })
274 .collect::<PolarsResult<Vec<_>>>()?;
275 AnyValue::StructOwned(Box::new((vals, fields.clone())))
276 } else {
277 if ignore_errors {
278 return Ok(AnyValue::Null);
279 }
280 polars_bail!(
281 ComputeError: "expected {} in json value, got object", dtype,
282 );
283 }
284 },
285 #[cfg(not(feature = "dtype-struct"))]
286 val => AnyValue::StringOwned(format!("{:#?}", val).into()),
287 };
288 Ok(out)
289}
290
291struct ValueDisplay<'a>(&'a Value<'a>);
299
300impl std::fmt::Display for ValueDisplay<'_> {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 use Value::*;
303
304 match self.0 {
305 Static(s) => write!(f, "{s}"),
306 String(s) => write!(f, r#""{s}""#),
307 Array(a) => {
308 write!(f, "[")?;
309
310 let mut iter = a.iter();
311
312 for v in (&mut iter).take(1) {
313 write!(f, "{}", ValueDisplay(v))?;
314 }
315
316 for v in iter {
317 write!(f, ", {}", ValueDisplay(v))?;
318 }
319
320 write!(f, "]")
321 },
322 Object(o) => {
323 write!(f, "{{")?;
324
325 let mut iter = o.iter();
326
327 for (k, v) in (&mut iter).take(1) {
328 write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
329 }
330
331 for (k, v) in iter {
332 write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
333 }
334
335 write!(f, "}}")
336 },
337 }
338 }
339}