polars_io/json/
mod.rs

//! # (De)serialize JSON files.
//!
//! ## Read JSON to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//! use std::num::NonZeroUsize;
//!
//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
//! {"a":2, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":7, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":5, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":1, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
//! let file = Cursor::new(basic_json);
//! let df = JsonReader::new(file)
//!     .with_json_format(JsonFormat::JsonLines)
//!     .infer_schema_len(NonZeroUsize::new(3))
//!     .with_batch_size(NonZeroUsize::new(3).unwrap())
//!     .finish()
//!     .unwrap();
//!
//! println!("{:?}", df);
//! ```
//! Outputs:
//!
//! ```text
//! +-----+--------+-------+--------+
//! | a   | b      | c     | d      |
//! | --- | ---    | ---   | ---    |
//! | i64 | f64    | bool  | str    |
//! +=====+========+=======+========+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | -10 | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 2   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 7   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 5   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! ```
//!
pub(crate) mod infer;

use std::io::Write;
use std::num::NonZeroUsize;
use std::ops::Deref;

use arrow::legacy::conversion::chunk_to_struct;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_bail};
use polars_json::json::write::FallibleStreamingIterator;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}

/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
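///
/// For example, a two-row frame serializes as follows (illustrative data):
///
/// ```text
/// // JsonFormat::Json
/// [{"a":1,"b":"x"},{"a":2,"b":"y"}]
///
/// // JsonFormat::JsonLines
/// {"a":1,"b":"x"}
/// {"a":2,"b":"y"}
/// ```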
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one line
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}

/// Writes a DataFrame to JSON.
///
/// Under the hood, this uses [`polars_json`], Polars' fork of
/// [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
/// Types that are not JSON primitives, such as Date and DateTime, are generally serialized as their
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
/// before serializing.
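///
/// # Example
///
/// A minimal sketch of writing a frame as JSON Lines; any `Write` target works,
/// a `Vec<u8>` is used here for illustration:
///
/// ```no_run
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn write_ndjson(df: &mut DataFrame) -> PolarsResult<Vec<u8>> {
///     let mut buf = Vec::new();
///     JsonWriter::new(&mut buf)
///         .with_json_format(JsonFormat::JsonLines)
///         .finish(df)?;
///     Ok(buf)
/// }
/// ```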
#[must_use]
pub struct JsonWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    json_format: JsonFormat,
}

impl<W: Write> JsonWriter<W> {
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
        self.json_format = format;
        self
    }
}

impl<W> SerWriter<W> for JsonWriter<W>
where
    W: Write,
{
    /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
    /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
    fn new(buffer: W) -> Self {
        JsonWriter {
            buffer,
            json_format: JsonFormat::JsonLines,
        }
    }

    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        df.align_chunks_par();
        let fields = df
            .iter()
            .map(|s| {
                #[cfg(feature = "object")]
                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
                Ok(s.field().to_arrow(CompatLevel::newest()))
            })
            .collect::<PolarsResult<Vec<_>>>()?;
        let batches = df
            .iter_chunks(CompatLevel::newest(), false)
            .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));

        match self.json_format {
            JsonFormat::JsonLines => {
                let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
                let writer =
                    polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
                writer.collect::<PolarsResult<()>>()?;
            },
            JsonFormat::Json => {
                let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
                polars_json::json::write::write(&mut self.buffer, serializer)?;
            },
        }

        Ok(())
    }
}

pub struct BatchedWriter<W: Write> {
    writer: W,
}

impl<W> BatchedWriter<W>
where
    W: Write,
{
    pub fn new(writer: W) -> Self {
        BatchedWriter { writer }
    }

    /// Write a batch to the json writer.
    ///
    /// # Panics
    /// Panics if the chunks in the given [`DataFrame`] are not aligned.
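    ///
    /// # Example
    ///
    /// A sketch of streaming several frames into one NDJSON buffer (frame
    /// construction elided; assumes this module is reachable as `polars_io::json`):
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::json::BatchedWriter;
    ///
    /// fn stream(frames: &[DataFrame], out: &mut Vec<u8>) -> PolarsResult<()> {
    ///     let mut writer = BatchedWriter::new(out);
    ///     for df in frames {
    ///         writer.write_batch(df)?;
    ///     }
    ///     Ok(())
    /// }
    /// ```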
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
        let fields = df
            .iter()
            .map(|s| {
                #[cfg(feature = "object")]
                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
                Ok(s.field().to_arrow(CompatLevel::newest()))
            })
            .collect::<PolarsResult<Vec<_>>>()?;
        let chunks = df.iter_chunks(CompatLevel::newest(), false);
        let batches =
            chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
        while let Some(block) = serializer.next()? {
            self.writer.write_all(block)?;
        }
        Ok(())
    }
}

/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
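///
/// # Example
///
/// A sketch reading a JSON array from an in-memory buffer (the default format
/// is [`JsonFormat::Json`]; hypothetical data):
///
/// ```no_run
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
/// use std::io::Cursor;
///
/// let json = r#"[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]"#;
/// let df = JsonReader::new(Cursor::new(json)).finish().unwrap();
/// ```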
#[must_use]
pub struct JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    reader: R,
    rechunk: bool,
    ignore_errors: bool,
    infer_schema_len: Option<NonZeroUsize>,
    batch_size: NonZeroUsize,
    projection: Option<Vec<PlSmallStr>>,
    schema: Option<SchemaRef>,
    schema_overwrite: Option<&'a Schema>,
    json_format: JsonFormat,
}

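/// Strip a leading UTF-8 byte-order mark from `bytes`; errors on UTF-16 input.
///
/// A doctest sketch (assumes this module is reachable as `polars_io::json`):
///
/// ```no_run
/// use polars_io::json::remove_bom;
///
/// let bytes = b"\xEF\xBB\xBF{\"a\":1}";
/// assert_eq!(remove_bom(bytes).unwrap(), b"{\"a\":1}".as_slice());
/// ```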
pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
    if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
        // UTF-8 BOM
        Ok(&bytes[3..])
    } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
        // UTF-16 BOM
        polars_bail!(ComputeError: "utf-16 not supported")
    } else {
        Ok(bytes)
    }
}

impl<R> SerReader<R> for JsonReader<'_, R>
where
    R: MmapBytesReader,
{
    fn new(reader: R) -> Self {
        JsonReader {
            reader,
            rechunk: true,
            ignore_errors: false,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            batch_size: NonZeroUsize::new(8192).unwrap(),
            projection: None,
            schema: None,
            schema_overwrite: None,
            json_format: JsonFormat::Json,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Take the SerReader and return a parsed DataFrame.
    ///
    /// Because JSON values specify their types (number, string, etc.), no upcasting or conversion is performed between
    /// incompatible types in the input. In the event that a column contains mixed dtypes, it is unspecified whether an
    /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let pre_rb: ReaderBytes = (&mut self.reader).into();
        let bytes = remove_bom(pre_rb.deref())?;
        let rb = ReaderBytes::Borrowed(bytes);
        let out = match self.json_format {
            JsonFormat::Json => {
                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
                let mut bytes = rb.deref().to_vec();
                let owned = &mut vec![];
                compression::maybe_decompress_bytes(&bytes, owned)?;
                // The easiest way to avoid ownership issues is to figure out implicitly whether
                // decompression happened (`owned` is only populated on decompression) and pick
                // which bytes to parse accordingly.
                let json_value = if owned.is_empty() {
                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
                } else {
                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
                };
                if let BorrowedValue::Array(array) = &json_value {
                    if array.is_empty() && self.schema.is_none() && self.schema_overwrite.is_none()
                    {
                        return Ok(DataFrame::empty());
                    }
                }

                let allow_extra_fields_in_struct = self.schema.is_some();

                // struct type
                let dtype = if let Some(mut schema) = self.schema {
                    if let Some(overwrite) = self.schema_overwrite {
                        let mut_schema = Arc::make_mut(&mut schema);
                        overwrite_schema(mut_schema, overwrite)?;
                    }

                    DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
                } else {
                    // infer
                    let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
                        infer::json_values_to_supertype(
                            values,
                            self.infer_schema_len
                                .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
                        )?
                        .to_arrow(CompatLevel::newest())
                    } else {
                        polars_json::json::infer(&json_value)?
                    };

                    if let Some(overwrite) = self.schema_overwrite {
                        let ArrowDataType::Struct(fields) = inner_dtype else {
                            polars_bail!(ComputeError: "can only deserialize json objects")
                        };

                        let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
                        overwrite_schema(&mut schema, overwrite)?;

                        DataType::Struct(
                            schema
                                .into_iter()
                                .map(|(name, dt)| Field::new(name, dt))
                                .collect(),
                        )
                        .to_arrow(CompatLevel::newest())
                    } else {
                        inner_dtype
                    }
                };

                let dtype = if let BorrowedValue::Array(_) = &json_value {
                    ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
                        PlSmallStr::from_static("item"),
                        dtype,
                        true,
                    )))
                } else {
                    dtype
                };

                let arr = polars_json::json::deserialize(
                    &json_value,
                    dtype,
                    allow_extra_fields_in_struct,
                )?;
                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
                    || polars_err!(ComputeError: "can only deserialize json objects"),
                )?;
                DataFrame::try_from(arr.clone())
            },
            JsonFormat::JsonLines => {
                let mut json_reader = CoreJsonReader::new(
                    rb,
                    None,
                    self.schema,
                    self.schema_overwrite,
                    None,
                    1024, // sample size
                    NonZeroUsize::new(1 << 18).unwrap(),
                    false,
                    self.infer_schema_len,
                    self.ignore_errors,
                    None,
                    None,
                    None,
                )?;
                let mut df: DataFrame = json_reader.as_df()?;
                if self.rechunk {
                    df.as_single_chunk_par();
                }
                Ok(df)
            },
        }?;

        // TODO! Ensure we don't materialize the columns we don't need
        if let Some(proj) = self.projection.as_deref() {
            out.select(proj.iter().cloned())
        } else {
            Ok(out)
        }
    }
}

impl<'a, R> JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    /// Set the JSON file's schema
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Overwrite parts of the inferred schema.
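    ///
    /// A sketch that forces column "a" to `Float64` while the rest of the schema
    /// is inferred (hypothetical data):
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::prelude::*;
    /// use std::io::Cursor;
    ///
    /// let overwrite = Schema::from_iter([Field::new("a".into(), DataType::Float64)]);
    /// let df = JsonReader::new(Cursor::new(r#"{"a": 1, "b": "x"}"#))
    ///     .with_json_format(JsonFormat::JsonLines)
    ///     .with_schema_overwrite(&overwrite)
    ///     .finish()
    ///     .unwrap();
    /// ```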
    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
        self.schema_overwrite = Some(schema);
        self
    }

    /// Set the maximum number of records the reader scans in order to infer the schema of the file.
    ///
    /// `max_records = None` scans the entire input to infer the schema; `Some(1)` looks only at the
    /// first record, `Some(2)` at the first two records, and so on.
    ///
    /// A schema can never be inferred from zero records, which is why the count is a
    /// [`NonZeroUsize`]: unlike CSV, JSON has no header row to inspect for column names.
    pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
        self.infer_schema_len = max_records;
        self
    }

    /// Set the batch size (number of records to load at one time).
    ///
    /// This heavily influences loading time.
    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
    /// columns are kept.
    ///
    /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
    /// then dropping the ones you don't want.
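    ///
    /// A sketch keeping only columns "a" and "b" (hypothetical column names):
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::prelude::*;
    /// use std::io::Cursor;
    ///
    /// let json = r#"{"a": 1, "b": "x", "c": true}"#;
    /// let df = JsonReader::new(Cursor::new(json))
    ///     .with_json_format(JsonFormat::JsonLines)
    ///     .with_projection(Some(vec!["a".into(), "b".into()]))
    ///     .finish()
    ///     .unwrap();
    /// ```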
    pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
        self.json_format = format;
        self
    }

    /// Replace values that fail to parse with `null` (only supported for [`JsonFormat::JsonLines`]).
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.ignore_errors = ignore;
        self
    }
}