polars_io/json/mod.rs
//! # (De)serialize JSON files.
//!
//! ## Read JSON to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//! use std::num::NonZeroUsize;
//!
//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
//! {"a":2, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":7, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":5, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":1, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
//! let file = Cursor::new(basic_json);
//! let df = JsonReader::new(file)
//!     .with_json_format(JsonFormat::JsonLines)
//!     .infer_schema_len(NonZeroUsize::new(3))
//!     .with_batch_size(NonZeroUsize::new(3).unwrap())
//!     .finish()
//!     .unwrap();
//!
//! println!("{:?}", df);
//! ```
//! Outputs:
//!
//! ```text
//! +-----+--------+-------+--------+
//! | a   | b      | c     | d      |
//! | --- | ---    | ---   | ---    |
//! | i64 | f64    | bool  | str    |
//! +=====+========+=======+========+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | -10 | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 2   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 7   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 5   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! ```
//!
65pub(crate) mod infer;
66
67use std::io::Write;
68use std::num::NonZeroUsize;
69use std::ops::Deref;
70
71use arrow::array::LIST_VALUES_NAME;
72use arrow::legacy::conversion::chunk_to_struct;
73use polars_core::error::to_compute_err;
74use polars_core::prelude::*;
75use polars_error::{PolarsResult, polars_bail};
76use polars_json::json::write::FallibleStreamingIterator;
77#[cfg(feature = "serde")]
78use serde::{Deserialize, Serialize};
79use simd_json::BorrowedValue;
80
81use crate::mmap::{MmapBytesReader, ReaderBytes};
82use crate::prelude::*;
83
/// Options controlling how a DataFrame is written as JSON.
///
/// Currently empty: JSON writing takes no configurable options, but the type
/// exists so a stable options struct can be passed through writer APIs (and
/// serialized when the `serde` / `dsl-schema` features are enabled).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}
88
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
// Derives added for parity with `JsonWriterOptions`; the enum is a plain
// fieldless tag, so `Copy`/`Eq`/`Hash` are free and purely additive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
109
/// Writes a DataFrame to JSON.
///
/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
/// before serializing.
#[must_use]
pub struct JsonWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    /// Output layout: one JSON array, or one object per line
    /// (defaults to `JsonFormat::JsonLines` in [`SerWriter::new`]).
    json_format: JsonFormat,
}
123
124impl<W: Write> JsonWriter<W> {
125 pub fn with_json_format(mut self, format: JsonFormat) -> Self {
126 self.json_format = format;
127 self
128 }
129}
130
impl<W> SerWriter<W> for JsonWriter<W>
where
    W: Write,
{
    /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
    /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
    fn new(buffer: W) -> Self {
        JsonWriter {
            buffer,
            json_format: JsonFormat::JsonLines,
        }
    }

    /// Serialize `df` into the underlying buffer in the configured [`JsonFormat`].
    ///
    /// # Errors
    /// Fails when a column has the `Object` dtype (not representable as JSON)
    /// or when writing to the underlying buffer fails.
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        // All columns must share identical chunk boundaries before chunk iteration.
        df.align_chunks_par();
        // One arrow field per column; these name the keys of each output object.
        let fields = df
            .iter()
            .map(|s| {
                #[cfg(feature = "object")]
                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
                Ok(s.field().to_arrow(CompatLevel::newest()))
            })
            .collect::<PolarsResult<Vec<_>>>()?;
        // Each aligned chunk becomes one struct array, which serializes to one
        // JSON object per row.
        let batches = df
            .iter_chunks(CompatLevel::newest(), false)
            .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));

        match self.json_format {
            JsonFormat::JsonLines => {
                let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
                let writer =
                    polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
                // Driving the iterator to completion performs the actual writes.
                writer.collect::<PolarsResult<()>>()?;
            },
            JsonFormat::Json => {
                let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
                polars_json::json::write::write(&mut self.buffer, serializer)?;
            },
        }

        Ok(())
    }
}
174
/// Incrementally writes DataFrame batches as JSON Lines (one object per row).
pub struct BatchedWriter<W: Write> {
    /// Underlying output sink.
    writer: W,
}
178
impl<W> BatchedWriter<W>
where
    W: Write,
{
    /// Create a batched JSON-lines writer over the given output sink.
    pub fn new(writer: W) -> Self {
        BatchedWriter { writer }
    }
    /// Write a batch to the json writer.
    ///
    /// Rows are emitted in the JSON Lines format, one JSON object per row.
    ///
    /// # Panics
    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
        // One arrow field per column; `Object` columns cannot be represented in JSON.
        let fields = df
            .iter()
            .map(|s| {
                #[cfg(feature = "object")]
                polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
                Ok(s.field().to_arrow(CompatLevel::newest()))
            })
            .collect::<PolarsResult<Vec<_>>>()?;
        // Convert each chunk to a struct array so rows serialize as JSON objects.
        let chunks = df.iter_chunks(CompatLevel::newest(), false);
        let batches =
            chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
        // Stream serialized blocks straight into the sink as they are produced.
        while let Some(block) = serializer.next()? {
            self.writer.write_all(block)?;
        }
        Ok(())
    }
}
209
/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    /// Source of the JSON bytes.
    reader: R,
    /// Rechunk the DataFrame into contiguous memory after parsing (default: true).
    rechunk: bool,
    /// Replace values that fail to parse with `null` instead of erroring
    /// (only supported for [`JsonFormat::JsonLines`]).
    ignore_errors: bool,
    /// Maximum number of records inspected for schema inference; `None` reads all.
    infer_schema_len: Option<NonZeroUsize>,
    /// Number of records to load at one time.
    batch_size: NonZeroUsize,
    /// Columns to keep after deserialization; `None` keeps all.
    projection: Option<Vec<PlSmallStr>>,
    /// Caller-supplied full schema; skips inference when set.
    schema: Option<SchemaRef>,
    /// Partial schema overriding the inferred dtypes of the named columns.
    schema_overwrite: Option<&'a Schema>,
    /// Input layout: a single JSON document or JSON Lines.
    json_format: JsonFormat,
}
226
227pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
228 if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
229 // UTF-8 BOM
230 Ok(&bytes[3..])
231 } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
232 // UTF-16 BOM
233 polars_bail!(ComputeError: "utf-16 not supported")
234 } else {
235 Ok(bytes)
236 }
237}
impl<R> SerReader<R> for JsonReader<'_, R>
where
    R: MmapBytesReader,
{
    /// Create a reader with defaults: rechunk on, errors not ignored, schema
    /// inference over the first 100 values, and [`JsonFormat::Json`].
    fn new(reader: R) -> Self {
        JsonReader {
            reader,
            rechunk: true,
            ignore_errors: false,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            batch_size: NonZeroUsize::new(8192).unwrap(),
            projection: None,
            schema: None,
            schema_overwrite: None,
            json_format: JsonFormat::Json,
        }
    }

    /// Set whether the parsed DataFrame is rechunked into contiguous memory.
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Take the SerReader and return a parsed DataFrame.
    ///
    /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
    /// incompatible types in the input. In the event that a column contains mixed dtypes, it is unspecified whether an
    /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        // Strip a UTF-8 BOM (and reject UTF-16 input) before parsing.
        let pre_rb: ReaderBytes = (&mut self.reader).into();
        let bytes = remove_bom(pre_rb.deref())?;
        let rb = ReaderBytes::Borrowed(bytes);
        let out = match self.json_format {
            JsonFormat::Json => {
                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
                let mut bytes = rb.deref().to_vec();
                let owned = &mut vec![];
                compression::maybe_decompress_bytes(&bytes, owned)?;
                // the easiest way to avoid ownership issues is by implicitly figuring out if
                // decompression happened (owned is only populated on decompress), then pick which bytes to parse
                let json_value = if owned.is_empty() {
                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
                } else {
                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
                };
                // An empty top-level array with no user-provided schema yields an empty frame.
                // NOTE(review): `&` is a non-short-circuiting boolean AND here; result is the
                // same as `&&` but the idiomatic form would be `&&`.
                if let BorrowedValue::Array(array) = &json_value {
                    if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
                        return Ok(DataFrame::empty());
                    }
                }

                // With an explicit schema, fields present in the data but absent
                // from the schema are tolerated rather than rejected.
                let allow_extra_fields_in_struct = self.schema.is_some();

                // struct type
                let dtype = if let Some(mut schema) = self.schema {
                    // Explicit schema: apply any partial overwrite, then use as-is.
                    if let Some(overwrite) = self.schema_overwrite {
                        let mut_schema = Arc::make_mut(&mut schema);
                        overwrite_schema(mut_schema, overwrite)?;
                    }

                    DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
                } else {
                    // infer
                    let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
                        // Top-level array: infer a common supertype across up to
                        // `infer_schema_len` elements (all of them when unset).
                        infer::json_values_to_supertype(
                            values,
                            self.infer_schema_len
                                .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
                        )?
                        .to_arrow(CompatLevel::newest())
                    } else {
                        polars_json::json::infer(&json_value)?
                    };

                    if let Some(overwrite) = self.schema_overwrite {
                        // Overwrites only make sense for object-shaped values.
                        let ArrowDataType::Struct(fields) = inner_dtype else {
                            polars_bail!(ComputeError: "can only deserialize json objects")
                        };

                        let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
                        overwrite_schema(&mut schema, overwrite)?;

                        DataType::Struct(
                            schema
                                .into_iter()
                                .map(|(name, dt)| Field::new(name, dt))
                                .collect(),
                        )
                        .to_arrow(CompatLevel::newest())
                    } else {
                        inner_dtype
                    }
                };

                // A top-level array deserializes as a (large) list of structs; wrap accordingly.
                let dtype = if let BorrowedValue::Array(_) = &json_value {
                    ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
                        LIST_VALUES_NAME,
                        dtype,
                        true,
                    )))
                } else {
                    dtype
                };

                let arr = polars_json::json::deserialize(
                    &json_value,
                    dtype,
                    allow_extra_fields_in_struct,
                )?;
                // Columns come from the struct fields; any other shape is not tabular.
                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
                    || polars_err!(ComputeError: "can only deserialize json objects"),
                )?;
                DataFrame::try_from(arr.clone())
            },
            JsonFormat::JsonLines => {
                // Delegate to the NDJSON core reader, which parses line by line.
                let mut json_reader = CoreJsonReader::new(
                    rb,
                    None,
                    self.schema,
                    self.schema_overwrite,
                    None,
                    1024, // sample size
                    NonZeroUsize::new(1 << 18).unwrap(),
                    false,
                    self.infer_schema_len,
                    self.ignore_errors,
                    None,
                    None,
                    None,
                )?;
                let mut df: DataFrame = json_reader.as_df()?;
                if self.rechunk {
                    df.as_single_chunk_par();
                }
                Ok(df)
            },
        }?;

        // TODO! Ensure we don't materialize the columns we don't need
        if let Some(proj) = self.projection.as_deref() {
            out.select(proj.iter().cloned())
        } else {
            Ok(out)
        }
    }
}
384
385impl<'a, R> JsonReader<'a, R>
386where
387 R: MmapBytesReader,
388{
389 /// Set the JSON file's schema
390 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
391 self.schema = Some(schema);
392 self
393 }
394
395 /// Overwrite parts of the inferred schema.
396 pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
397 self.schema_overwrite = Some(schema);
398 self
399 }
400
401 /// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
402 /// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
403 ///
404 /// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
405 /// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
406 ///
407 /// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
408 /// from JSON (unlike CSVs, there is no header row to inspect for column names).
409 pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
410 self.infer_schema_len = max_records;
411 self
412 }
413
414 /// Set the batch size (number of records to load at one time)
415 ///
416 /// This heavily influences loading time.
417 pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
418 self.batch_size = batch_size;
419 self
420 }
421
422 /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
423 /// columns are kept.
424 ///
425 /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
426 /// then dropping the ones you don't want.
427 pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
428 self.projection = projection;
429 self
430 }
431
432 pub fn with_json_format(mut self, format: JsonFormat) -> Self {
433 self.json_format = format;
434 self
435 }
436
437 /// Return a `null` if an error occurs during parsing.
438 pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
439 self.ignore_errors = ignore;
440 self
441 }
442}