// polars_io/json/mod.rs

//! # (De)serialize JSON files.
//!
//! ## Read JSON to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//! use std::num::NonZeroUsize;
//!
//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
//! {"a":2, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":7, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":5, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":1, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
//! let file = Cursor::new(basic_json);
//! let df = JsonReader::new(file)
//! .with_json_format(JsonFormat::JsonLines)
//! .infer_schema_len(NonZeroUsize::new(3))
//! .with_batch_size(NonZeroUsize::new(3).unwrap())
//! .finish()
//! .unwrap();
//!
//! println!("{:?}", df);
//! ```
//! Outputs:
//!
//! ```text
//! +-----+--------+-------+--------+
//! | a   | b      | c     | d      |
//! | --- | ---    | ---   | ---    |
//! | i64 | f64    | bool  | str    |
//! +=====+========+=======+========+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | -10 | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 2   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 7   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 5   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! ```
//!
pub(crate) mod infer;

use std::io::Write;
use std::num::NonZeroUsize;
use std::ops::Deref;

use arrow::array::LIST_VALUES_NAME;
use arrow::legacy::conversion::chunk_to_struct;
use polars_core::chunked_array::cast::CastOptions;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_bail};
use polars_json::json::write::FallibleStreamingIterator;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;
82
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
103
104/// Writes a DataFrame to JSON.
105///
106/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
107/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
108/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
109/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
110/// before serializing.
111#[must_use]
112pub struct JsonWriter<W: Write> {
113    /// File or Stream handler
114    buffer: W,
115    json_format: JsonFormat,
116}
117
118impl<W: Write> JsonWriter<W> {
119    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
120        self.json_format = format;
121        self
122    }
123}
124
125impl<W> SerWriter<W> for JsonWriter<W>
126where
127    W: Write,
128{
129    /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
130    /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
131    fn new(buffer: W) -> Self {
132        JsonWriter {
133            buffer,
134            json_format: JsonFormat::JsonLines,
135        }
136    }
137
138    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
139        df.align_chunks_par();
140        let fields = df.columns()
141            .iter()
142            .map(|s| {
143                #[cfg(feature = "object")]
144                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
145                Ok(s.field().to_arrow(CompatLevel::newest()))
146            })
147            .collect::<PolarsResult<Vec<_>>>()?;
148        let batches = df
149            .iter_chunks(CompatLevel::newest(), false)
150            .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
151
152        match self.json_format {
153            JsonFormat::JsonLines => {
154                let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
155                let writer =
156                    polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
157                writer.collect::<PolarsResult<()>>()?;
158            },
159            JsonFormat::Json => {
160                let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
161                polars_json::json::write::write(&mut self.buffer, serializer)?;
162            },
163        }
164
165        Ok(())
166    }
167}
168
/// Writes DataFrame batches incrementally as JSON Lines (NDJSON).
pub struct BatchedWriter<W: Write> {
    /// Sink receiving the serialized NDJSON bytes.
    writer: W,
}
172
173impl<W> BatchedWriter<W>
174where
175    W: Write,
176{
177    pub fn new(writer: W) -> Self {
178        BatchedWriter { writer }
179    }
180    /// Write a batch to the json writer.
181    ///
182    /// # Panics
183    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
184    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
185        let fields = df.columns()
186            .iter()
187            .map(|s| {
188                #[cfg(feature = "object")]
189                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
190                Ok(s.field().to_arrow(CompatLevel::newest()))
191            })
192            .collect::<PolarsResult<Vec<_>>>()?;
193        let chunks = df.iter_chunks(CompatLevel::newest(), false);
194        let batches =
195            chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
196        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
197        while let Some(block) = serializer.next()? {
198            self.writer.write_all(block)?;
199        }
200        Ok(())
201    }
202}
203
204/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
205#[must_use]
206pub struct JsonReader<'a, R>
207where
208    R: MmapBytesReader,
209{
210    reader: R,
211    rechunk: bool,
212    ignore_errors: bool,
213    infer_schema_len: Option<NonZeroUsize>,
214    batch_size: NonZeroUsize,
215    projection: Option<Vec<PlSmallStr>>,
216    schema: Option<SchemaRef>,
217    schema_overwrite: Option<&'a Schema>,
218    json_format: JsonFormat,
219}
220
221pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
222    if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
223        // UTF-8 BOM
224        Ok(&bytes[3..])
225    } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
226        // UTF-16 BOM
227        polars_bail!(ComputeError: "utf-16 not supported")
228    } else {
229        Ok(bytes)
230    }
231}
232impl<R> SerReader<R> for JsonReader<'_, R>
233where
234    R: MmapBytesReader,
235{
236    fn new(reader: R) -> Self {
237        JsonReader {
238            reader,
239            rechunk: true,
240            ignore_errors: false,
241            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
242            batch_size: NonZeroUsize::new(8192).unwrap(),
243            projection: None,
244            schema: None,
245            schema_overwrite: None,
246            json_format: JsonFormat::Json,
247        }
248    }
249
250    fn set_rechunk(mut self, rechunk: bool) -> Self {
251        self.rechunk = rechunk;
252        self
253    }
254
255    /// Take the SerReader and return a parsed DataFrame.
256    ///
257    /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
258    /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
259    /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
260    fn finish(mut self) -> PolarsResult<DataFrame> {
261        let pre_rb: ReaderBytes = (&mut self.reader).into();
262        let bytes = remove_bom(pre_rb.deref())?;
263        let rb = ReaderBytes::Borrowed(bytes);
264        let out = match self.json_format {
265            JsonFormat::Json => {
266                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
267                let mut bytes = rb.deref().to_vec();
268                let owned = &mut vec![];
269                #[expect(deprecated)] // JSON is not a row-format
270                compression::maybe_decompress_bytes(&bytes, owned)?;
271                // the easiest way to avoid ownership issues is by implicitly figuring out if
272                // decompression happened (owned is only populated on decompress), then pick which bytes to parse
273                let json_value = if owned.is_empty() {
274                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
275                } else {
276                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
277                };
278                if let BorrowedValue::Array(array) = &json_value {
279                    if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
280                        return Ok(DataFrame::empty());
281                    }
282                }
283
284                let allow_extra_fields_in_struct = self.schema.is_some();
285
286                let mut schema = if let Some(schema) = self.schema {
287                    Arc::unwrap_or_clone(schema)
288                } else {
289                    // Infer.
290                    let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
291                        infer::json_values_to_supertype(
292                            values,
293                            self.infer_schema_len
294                                .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
295                        )?
296                    } else {
297                        DataType::from_arrow_dtype(&polars_json::json::infer(&json_value)?)
298                    };
299
300                    let DataType::Struct(fields) = inner_dtype else {
301                        polars_bail!(ComputeError: "can only deserialize json objects")
302                    };
303
304                    Schema::from_iter(fields)
305                };
306
307                if let Some(overwrite) = self.schema_overwrite {
308                    overwrite_schema(&mut schema, overwrite)?;
309                }
310
311                let mut needs_cast = false;
312                let deserialize_schema = schema
313                    .iter()
314                    .map(|(name, dt)| {
315                        Field::new(
316                            name.clone(),
317                            dt.clone().map_leaves(&mut |leaf_dt| {
318                                // Deserialize enums and categoricals as strings first.
319                                match leaf_dt {
320                                    #[cfg(feature = "dtype-categorical")]
321                                    DataType::Enum(..) | DataType::Categorical(..) => {
322                                        needs_cast = true;
323                                        DataType::String
324                                    },
325                                    leaf_dt => leaf_dt,
326                                }
327                            }),
328                        )
329                    })
330                    .collect();
331
332                let arrow_dtype =
333                    DataType::Struct(deserialize_schema).to_arrow(CompatLevel::newest());
334
335                let arrow_dtype = if let BorrowedValue::Array(_) = &json_value {
336                    ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
337                        LIST_VALUES_NAME,
338                        arrow_dtype,
339                        true,
340                    )))
341                } else {
342                    arrow_dtype
343                };
344
345                let arr = polars_json::json::deserialize(
346                    &json_value,
347                    arrow_dtype,
348                    allow_extra_fields_in_struct,
349                )?;
350
351                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
352                    || polars_err!(ComputeError: "can only deserialize json objects"),
353                )?;
354
355                let mut df = DataFrame::try_from(arr.clone())?;
356
357                if df.width() == 0 && df.height() <= 1 {
358                    // read_json("{}")
359                    unsafe { df.set_height(0) };
360                }
361
362                if needs_cast {
363                    for (col, dt) in unsafe { df.columns_mut() }
364                        .iter_mut()
365                        .zip(schema.iter_values())
366                    {
367                        *col = col.cast_with_options(
368                            dt,
369                            if self.ignore_errors {
370                                CastOptions::NonStrict
371                            } else {
372                                CastOptions::Strict
373                            },
374                        )?;
375                    }
376                }
377
378                df
379            },
380            JsonFormat::JsonLines => {
381                let mut json_reader = CoreJsonReader::new(
382                    rb,
383                    None,
384                    self.schema,
385                    self.schema_overwrite,
386                    None,
387                    1024, // sample size
388                    NonZeroUsize::new(1 << 18).unwrap(),
389                    false,
390                    self.infer_schema_len,
391                    self.ignore_errors,
392                    None,
393                    None,
394                    None,
395                )?;
396                let mut df: DataFrame = json_reader.as_df()?;
397                if self.rechunk {
398                    df.rechunk_mut_par();
399                }
400
401                df
402            },
403        };
404
405        // TODO! Ensure we don't materialize the columns we don't need
406        if let Some(proj) = self.projection.as_deref() {
407            out.select(proj.iter().cloned())
408        } else {
409            Ok(out)
410        }
411    }
412}
413
414impl<'a, R> JsonReader<'a, R>
415where
416    R: MmapBytesReader,
417{
418    /// Set the JSON file's schema
419    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
420        self.schema = Some(schema);
421        self
422    }
423
424    /// Overwrite parts of the inferred schema.
425    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
426        self.schema_overwrite = Some(schema);
427        self
428    }
429
430    /// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
431    /// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
432    ///
433    /// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
434    /// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
435    ///
436    /// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
437    /// from JSON (unlike CSVs, there is no header row to inspect for column names).
438    pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
439        self.infer_schema_len = max_records;
440        self
441    }
442
443    /// Set the batch size (number of records to load at one time)
444    ///
445    /// This heavily influences loading time.
446    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
447        self.batch_size = batch_size;
448        self
449    }
450
451    /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
452    /// columns are kept.
453    ///
454    /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
455    /// then dropping the ones you don't want.
456    pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
457        self.projection = projection;
458        self
459    }
460
461    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
462        self.json_format = format;
463        self
464    }
465
466    /// Return a `null` if an error occurs during parsing.
467    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
468        self.ignore_errors = ignore;
469        self
470    }
471}