polars_io/json/
mod.rs

1//! # (De)serialize JSON files.
2//!
3//! ## Read JSON to a DataFrame
4//!
//! ### Example
6//!
7//! ```
8//! use polars_core::prelude::*;
9//! use polars_io::prelude::*;
10//! use std::io::Cursor;
11//! use std::num::NonZeroUsize;
12//!
13//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
14//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
15//! {"a":2, "b":0.6, "c":false, "d":"text"}
16//! {"a":1, "b":2.0, "c":false, "d":"4"}
17//! {"a":7, "b":-3.5, "c":true, "d":"4"}
18//! {"a":1, "b":0.6, "c":false, "d":"text"}
19//! {"a":1, "b":2.0, "c":false, "d":"4"}
20//! {"a":5, "b":-3.5, "c":true, "d":"4"}
21//! {"a":1, "b":0.6, "c":false, "d":"text"}
22//! {"a":1, "b":2.0, "c":false, "d":"4"}
23//! {"a":1, "b":-3.5, "c":true, "d":"4"}
24//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
25//! let file = Cursor::new(basic_json);
26//! let df = JsonReader::new(file)
27//! .with_json_format(JsonFormat::JsonLines)
28//! .infer_schema_len(NonZeroUsize::new(3))
29//! .with_batch_size(NonZeroUsize::new(3).unwrap())
30//! .finish()
31//! .unwrap();
32//!
33//! println!("{:?}", df);
34//! ```
//! Outputs something like (the exact table layout depends on the Polars version and display settings):
36//!
37//! ```text
38//! +-----+--------+-------+--------+
39//! | a   | b      | c     | d      |
40//! | --- | ---    | ---   | ---    |
41//! | i64 | f64    | bool  | str    |
42//! +=====+========+=======+========+
43//! | 1   | 2      | false | "4"    |
44//! +-----+--------+-------+--------+
45//! | -10 | -3.5e0 | true  | "4"    |
46//! +-----+--------+-------+--------+
47//! | 2   | 0.6    | false | "text" |
48//! +-----+--------+-------+--------+
49//! | 1   | 2      | false | "4"    |
50//! +-----+--------+-------+--------+
51//! | 7   | -3.5e0 | true  | "4"    |
52//! +-----+--------+-------+--------+
53//! | 1   | 0.6    | false | "text" |
54//! +-----+--------+-------+--------+
55//! | 1   | 2      | false | "4"    |
56//! +-----+--------+-------+--------+
57//! | 5   | -3.5e0 | true  | "4"    |
58//! +-----+--------+-------+--------+
59//! | 1   | 0.6    | false | "text" |
60//! +-----+--------+-------+--------+
61//! | 1   | 2      | false | "4"    |
62//! +-----+--------+-------+--------+
63//! ```
64//!
65pub(crate) mod infer;
66
67use std::io::Write;
68use std::num::NonZeroUsize;
69use std::ops::Deref;
70
71use arrow::array::LIST_VALUES_NAME;
72use arrow::legacy::conversion::chunk_to_struct;
73use polars_core::chunked_array::cast::CastOptions;
74use polars_core::error::to_compute_err;
75use polars_core::prelude::*;
76use polars_error::{PolarsResult, polars_bail};
77use polars_json::json::write::FallibleStreamingIterator;
78#[cfg(feature = "serde")]
79use serde::{Deserialize, Serialize};
80use simd_json::BorrowedValue;
81
82use crate::mmap::{MmapBytesReader, ReaderBytes};
83use crate::prelude::*;
84
/// Options for writing JSON.
///
/// This type currently carries no options.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}
89
90/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
91/// or `JsonLines` (each row output on a separate line).
92///
93/// In either case, each row is serialized as a JSON object whose keys are the column names and
94/// whose values are the row's corresponding values.
95pub enum JsonFormat {
96    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
97    /// the DataFrame.
98    ///
99    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
100    Json,
101    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
102    /// number of rows in the DataFrame.
103    ///
104    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
105    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
106    ///
107    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
108    JsonLines,
109}
110
111/// Writes a DataFrame to JSON.
112///
113/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
114/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
115/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
116/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
117/// before serializing.
118#[must_use]
119pub struct JsonWriter<W: Write> {
120    /// File or Stream handler
121    buffer: W,
122    json_format: JsonFormat,
123}
124
125impl<W: Write> JsonWriter<W> {
126    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
127        self.json_format = format;
128        self
129    }
130}
131
132impl<W> SerWriter<W> for JsonWriter<W>
133where
134    W: Write,
135{
136    /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
137    /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
138    fn new(buffer: W) -> Self {
139        JsonWriter {
140            buffer,
141            json_format: JsonFormat::JsonLines,
142        }
143    }
144
145    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
146        df.align_chunks_par();
147        let fields = df
148            .iter()
149            .map(|s| {
150                #[cfg(feature = "object")]
151                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
152                Ok(s.field().to_arrow(CompatLevel::newest()))
153            })
154            .collect::<PolarsResult<Vec<_>>>()?;
155        let batches = df
156            .iter_chunks(CompatLevel::newest(), false)
157            .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
158
159        match self.json_format {
160            JsonFormat::JsonLines => {
161                let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
162                let writer =
163                    polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
164                writer.collect::<PolarsResult<()>>()?;
165            },
166            JsonFormat::Json => {
167                let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
168                polars_json::json::write::write(&mut self.buffer, serializer)?;
169            },
170        }
171
172        Ok(())
173    }
174}
175
176pub struct BatchedWriter<W: Write> {
177    writer: W,
178}
179
180impl<W> BatchedWriter<W>
181where
182    W: Write,
183{
184    pub fn new(writer: W) -> Self {
185        BatchedWriter { writer }
186    }
187    /// Write a batch to the json writer.
188    ///
189    /// # Panics
190    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
191    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
192        let fields = df
193            .iter()
194            .map(|s| {
195                #[cfg(feature = "object")]
196                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
197                Ok(s.field().to_arrow(CompatLevel::newest()))
198            })
199            .collect::<PolarsResult<Vec<_>>>()?;
200        let chunks = df.iter_chunks(CompatLevel::newest(), false);
201        let batches =
202            chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
203        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
204        while let Some(block) = serializer.next()? {
205            self.writer.write_all(block)?;
206        }
207        Ok(())
208    }
209}
210
211/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
212#[must_use]
213pub struct JsonReader<'a, R>
214where
215    R: MmapBytesReader,
216{
217    reader: R,
218    rechunk: bool,
219    ignore_errors: bool,
220    infer_schema_len: Option<NonZeroUsize>,
221    batch_size: NonZeroUsize,
222    projection: Option<Vec<PlSmallStr>>,
223    schema: Option<SchemaRef>,
224    schema_overwrite: Option<&'a Schema>,
225    json_format: JsonFormat,
226}
227
228pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
229    if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
230        // UTF-8 BOM
231        Ok(&bytes[3..])
232    } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
233        // UTF-16 BOM
234        polars_bail!(ComputeError: "utf-16 not supported")
235    } else {
236        Ok(bytes)
237    }
238}
239impl<R> SerReader<R> for JsonReader<'_, R>
240where
241    R: MmapBytesReader,
242{
243    fn new(reader: R) -> Self {
244        JsonReader {
245            reader,
246            rechunk: true,
247            ignore_errors: false,
248            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
249            batch_size: NonZeroUsize::new(8192).unwrap(),
250            projection: None,
251            schema: None,
252            schema_overwrite: None,
253            json_format: JsonFormat::Json,
254        }
255    }
256
257    fn set_rechunk(mut self, rechunk: bool) -> Self {
258        self.rechunk = rechunk;
259        self
260    }
261
262    /// Take the SerReader and return a parsed DataFrame.
263    ///
264    /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
265    /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
266    /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
267    fn finish(mut self) -> PolarsResult<DataFrame> {
268        let pre_rb: ReaderBytes = (&mut self.reader).into();
269        let bytes = remove_bom(pre_rb.deref())?;
270        let rb = ReaderBytes::Borrowed(bytes);
271        let out = match self.json_format {
272            JsonFormat::Json => {
273                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
274                let mut bytes = rb.deref().to_vec();
275                let owned = &mut vec![];
276                compression::maybe_decompress_bytes(&bytes, owned)?;
277                // the easiest way to avoid ownership issues is by implicitly figuring out if
278                // decompression happened (owned is only populated on decompress), then pick which bytes to parse
279                let json_value = if owned.is_empty() {
280                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
281                } else {
282                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
283                };
284                if let BorrowedValue::Array(array) = &json_value {
285                    if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
286                        return Ok(DataFrame::empty());
287                    }
288                }
289
290                let allow_extra_fields_in_struct = self.schema.is_some();
291
292                let mut schema = if let Some(schema) = self.schema {
293                    Arc::unwrap_or_clone(schema)
294                } else {
295                    // Infer.
296                    let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
297                        infer::json_values_to_supertype(
298                            values,
299                            self.infer_schema_len
300                                .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
301                        )?
302                    } else {
303                        DataType::from_arrow_dtype(&polars_json::json::infer(&json_value)?)
304                    };
305
306                    let DataType::Struct(fields) = inner_dtype else {
307                        polars_bail!(ComputeError: "can only deserialize json objects")
308                    };
309
310                    Schema::from_iter(fields)
311                };
312
313                if let Some(overwrite) = self.schema_overwrite {
314                    overwrite_schema(&mut schema, overwrite)?;
315                }
316
317                let mut needs_cast = false;
318                let deserialize_schema = schema
319                    .iter()
320                    .map(|(name, dt)| {
321                        Field::new(
322                            name.clone(),
323                            dt.clone().map_leaves(&mut |leaf_dt| {
324                                // Deserialize enums and categoricals as strings first.
325                                match leaf_dt {
326                                    #[cfg(feature = "dtype-categorical")]
327                                    DataType::Enum(..) | DataType::Categorical(..) => {
328                                        needs_cast = true;
329                                        DataType::String
330                                    },
331                                    leaf_dt => leaf_dt,
332                                }
333                            }),
334                        )
335                    })
336                    .collect();
337
338                let arrow_dtype =
339                    DataType::Struct(deserialize_schema).to_arrow(CompatLevel::newest());
340
341                let arrow_dtype = if let BorrowedValue::Array(_) = &json_value {
342                    ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
343                        LIST_VALUES_NAME,
344                        arrow_dtype,
345                        true,
346                    )))
347                } else {
348                    arrow_dtype
349                };
350
351                let arr = polars_json::json::deserialize(
352                    &json_value,
353                    arrow_dtype,
354                    allow_extra_fields_in_struct,
355                )?;
356                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
357                    || polars_err!(ComputeError: "can only deserialize json objects"),
358                )?;
359
360                let mut df = DataFrame::try_from(arr.clone())?;
361                if needs_cast {
362                    unsafe {
363                        for (col, dt) in df.get_columns_mut().iter_mut().zip(schema.iter_values()) {
364                            *col = col.cast_with_options(
365                                dt,
366                                if self.ignore_errors {
367                                    CastOptions::NonStrict
368                                } else {
369                                    CastOptions::Strict
370                                },
371                            )?;
372                        }
373                        df.clear_schema();
374                    }
375                }
376                PolarsResult::Ok(df)
377            },
378            JsonFormat::JsonLines => {
379                let mut json_reader = CoreJsonReader::new(
380                    rb,
381                    None,
382                    self.schema,
383                    self.schema_overwrite,
384                    None,
385                    1024, // sample size
386                    NonZeroUsize::new(1 << 18).unwrap(),
387                    false,
388                    self.infer_schema_len,
389                    self.ignore_errors,
390                    None,
391                    None,
392                    None,
393                )?;
394                let mut df: DataFrame = json_reader.as_df()?;
395                if self.rechunk {
396                    df.as_single_chunk_par();
397                }
398                Ok(df)
399            },
400        }?;
401
402        // TODO! Ensure we don't materialize the columns we don't need
403        if let Some(proj) = self.projection.as_deref() {
404            out.select(proj.iter().cloned())
405        } else {
406            Ok(out)
407        }
408    }
409}
410
411impl<'a, R> JsonReader<'a, R>
412where
413    R: MmapBytesReader,
414{
415    /// Set the JSON file's schema
416    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
417        self.schema = Some(schema);
418        self
419    }
420
421    /// Overwrite parts of the inferred schema.
422    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
423        self.schema_overwrite = Some(schema);
424        self
425    }
426
427    /// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
428    /// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
429    ///
430    /// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
431    /// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
432    ///
433    /// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
434    /// from JSON (unlike CSVs, there is no header row to inspect for column names).
435    pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
436        self.infer_schema_len = max_records;
437        self
438    }
439
440    /// Set the batch size (number of records to load at one time)
441    ///
442    /// This heavily influences loading time.
443    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
444        self.batch_size = batch_size;
445        self
446    }
447
448    /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
449    /// columns are kept.
450    ///
451    /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
452    /// then dropping the ones you don't want.
453    pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
454        self.projection = projection;
455        self
456    }
457
458    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
459        self.json_format = format;
460        self
461    }
462
463    /// Return a `null` if an error occurs during parsing.
464    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
465        self.ignore_errors = ignore;
466        self
467    }
468}