// polars_io/json/mod.rs
1//! # (De)serialize JSON files.
2//!
3//! ## Read JSON to a DataFrame
4//!
5//! ## Example
6//!
7//! ```
8//! use polars_core::prelude::*;
9//! use polars_io::prelude::*;
10//! use std::io::Cursor;
11//! use std::num::NonZeroUsize;
12//!
13//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
14//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
15//! {"a":2, "b":0.6, "c":false, "d":"text"}
16//! {"a":1, "b":2.0, "c":false, "d":"4"}
17//! {"a":7, "b":-3.5, "c":true, "d":"4"}
18//! {"a":1, "b":0.6, "c":false, "d":"text"}
19//! {"a":1, "b":2.0, "c":false, "d":"4"}
20//! {"a":5, "b":-3.5, "c":true, "d":"4"}
21//! {"a":1, "b":0.6, "c":false, "d":"text"}
22//! {"a":1, "b":2.0, "c":false, "d":"4"}
23//! {"a":1, "b":-3.5, "c":true, "d":"4"}
24//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
25//! let file = Cursor::new(basic_json);
26//! let df = JsonReader::new(file)
27//! .with_json_format(JsonFormat::JsonLines)
28//! .infer_schema_len(NonZeroUsize::new(3))
29//! .with_batch_size(NonZeroUsize::new(3).unwrap())
30//! .finish()
31//! .unwrap();
32//!
33//! println!("{:?}", df);
34//! ```
//! Outputs:
36//!
37//! ```text
38//! +-----+--------+-------+--------+
39//! | a | b | c | d |
40//! | --- | --- | --- | --- |
41//! | i64 | f64 | bool | str |
42//! +=====+========+=======+========+
43//! | 1 | 2 | false | "4" |
44//! +-----+--------+-------+--------+
45//! | -10 | -3.5e0 | true | "4" |
46//! +-----+--------+-------+--------+
47//! | 2 | 0.6 | false | "text" |
48//! +-----+--------+-------+--------+
49//! | 1 | 2 | false | "4" |
50//! +-----+--------+-------+--------+
51//! | 7 | -3.5e0 | true | "4" |
52//! +-----+--------+-------+--------+
53//! | 1 | 0.6 | false | "text" |
54//! +-----+--------+-------+--------+
55//! | 1 | 2 | false | "4" |
56//! +-----+--------+-------+--------+
57//! | 5 | -3.5e0 | true | "4" |
58//! +-----+--------+-------+--------+
59//! | 1 | 0.6 | false | "text" |
60//! +-----+--------+-------+--------+
61//! | 1 | 2 | false | "4" |
62//! +-----+--------+-------+--------+
63//! ```
64//!
65pub(crate) mod infer;
66
67use std::io::Write;
68use std::num::NonZeroUsize;
69use std::ops::Deref;
70
71use arrow::array::LIST_VALUES_NAME;
72use arrow::legacy::conversion::chunk_to_struct;
73use polars_core::chunked_array::cast::CastOptions;
74use polars_core::error::to_compute_err;
75use polars_core::prelude::*;
76use polars_error::{PolarsResult, polars_bail};
77use polars_json::json::write::FallibleStreamingIterator;
78use simd_json::BorrowedValue;
79
80use crate::mmap::{MmapBytesReader, ReaderBytes};
81use crate::prelude::*;
82
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame. This format is also known as NDJSON (newline-delimited JSON).
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
103
/// Writes a DataFrame to JSON.
///
/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
/// before serializing.
#[must_use]
pub struct JsonWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    /// Output format; defaults to [`JsonFormat::JsonLines`] (set in [`SerWriter::new`]).
    json_format: JsonFormat,
}
117
impl<W: Write> JsonWriter<W> {
    /// Set the output format (`Json` or `JsonLines`) used when the DataFrame is written.
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
        self.json_format = format;
        self
    }
}
124
125impl<W> SerWriter<W> for JsonWriter<W>
126where
127 W: Write,
128{
129 /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
130 /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
131 fn new(buffer: W) -> Self {
132 JsonWriter {
133 buffer,
134 json_format: JsonFormat::JsonLines,
135 }
136 }
137
138 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
139 df.align_chunks_par();
140 let fields = df.columns()
141 .iter()
142 .map(|s| {
143 #[cfg(feature = "object")]
144 polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
145 Ok(s.field().to_arrow(CompatLevel::newest()))
146 })
147 .collect::<PolarsResult<Vec<_>>>()?;
148 let batches = df
149 .iter_chunks(CompatLevel::newest(), false)
150 .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
151
152 match self.json_format {
153 JsonFormat::JsonLines => {
154 let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
155 let writer =
156 polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
157 writer.collect::<PolarsResult<()>>()?;
158 },
159 JsonFormat::Json => {
160 let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
161 polars_json::json::write::write(&mut self.buffer, serializer)?;
162 },
163 }
164
165 Ok(())
166 }
167}
168
/// Writes DataFrame batches to an underlying writer in JSON Lines (NDJSON) format,
/// one [`BatchedWriter::write_batch`] call per batch.
pub struct BatchedWriter<W: Write> {
    // Destination for the serialized NDJSON bytes.
    writer: W,
}
172
173impl<W> BatchedWriter<W>
174where
175 W: Write,
176{
177 pub fn new(writer: W) -> Self {
178 BatchedWriter { writer }
179 }
180 /// Write a batch to the json writer.
181 ///
182 /// # Panics
183 /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
184 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
185 let fields = df.columns()
186 .iter()
187 .map(|s| {
188 #[cfg(feature = "object")]
189 polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
190 Ok(s.field().to_arrow(CompatLevel::newest()))
191 })
192 .collect::<PolarsResult<Vec<_>>>()?;
193 let chunks = df.iter_chunks(CompatLevel::newest(), false);
194 let batches =
195 chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
196 let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
197 while let Some(block) = serializer.next()? {
198 self.writer.write_all(block)?;
199 }
200 Ok(())
201 }
202}
203
/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    /// Source of the JSON bytes.
    reader: R,
    /// If `true`, combine chunks into contiguous memory after reading (JSON Lines path only).
    rechunk: bool,
    /// Replace values that fail to parse with `null`; only supported for JSON Lines.
    ignore_errors: bool,
    /// Maximum number of records inspected for schema inference; `None` scans everything.
    infer_schema_len: Option<NonZeroUsize>,
    /// Number of records to load at one time.
    /// NOTE(review): not read in the visible `finish` implementation — confirm it is still used.
    batch_size: NonZeroUsize,
    /// Optional set of column names to keep after deserialization.
    projection: Option<Vec<PlSmallStr>>,
    /// Full schema to use instead of inferring one.
    schema: Option<SchemaRef>,
    /// Partial schema overriding inferred dtypes for matching columns.
    schema_overwrite: Option<&'a Schema>,
    /// Input format; defaults to [`JsonFormat::Json`].
    json_format: JsonFormat,
}
220
221pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
222 if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
223 // UTF-8 BOM
224 Ok(&bytes[3..])
225 } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
226 // UTF-16 BOM
227 polars_bail!(ComputeError: "utf-16 not supported")
228 } else {
229 Ok(bytes)
230 }
231}
232impl<R> SerReader<R> for JsonReader<'_, R>
233where
234 R: MmapBytesReader,
235{
236 fn new(reader: R) -> Self {
237 JsonReader {
238 reader,
239 rechunk: true,
240 ignore_errors: false,
241 infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
242 batch_size: NonZeroUsize::new(8192).unwrap(),
243 projection: None,
244 schema: None,
245 schema_overwrite: None,
246 json_format: JsonFormat::Json,
247 }
248 }
249
250 fn set_rechunk(mut self, rechunk: bool) -> Self {
251 self.rechunk = rechunk;
252 self
253 }
254
255 /// Take the SerReader and return a parsed DataFrame.
256 ///
257 /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
258 /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
259 /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
260 fn finish(mut self) -> PolarsResult<DataFrame> {
261 let pre_rb: ReaderBytes = (&mut self.reader).into();
262 let bytes = remove_bom(pre_rb.deref())?;
263 let rb = ReaderBytes::Borrowed(bytes);
264 let out = match self.json_format {
265 JsonFormat::Json => {
266 polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
267 let mut bytes = rb.deref().to_vec();
268 let owned = &mut vec![];
269 #[expect(deprecated)] // JSON is not a row-format
270 compression::maybe_decompress_bytes(&bytes, owned)?;
271 // the easiest way to avoid ownership issues is by implicitly figuring out if
272 // decompression happened (owned is only populated on decompress), then pick which bytes to parse
273 let json_value = if owned.is_empty() {
274 simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
275 } else {
276 simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
277 };
278 if let BorrowedValue::Array(array) = &json_value {
279 if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
280 return Ok(DataFrame::empty());
281 }
282 }
283
284 let allow_extra_fields_in_struct = self.schema.is_some();
285
286 let mut schema = if let Some(schema) = self.schema {
287 Arc::unwrap_or_clone(schema)
288 } else {
289 // Infer.
290 let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
291 infer::json_values_to_supertype(
292 values,
293 self.infer_schema_len
294 .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
295 )?
296 } else {
297 DataType::from_arrow_dtype(&polars_json::json::infer(&json_value)?)
298 };
299
300 let DataType::Struct(fields) = inner_dtype else {
301 polars_bail!(ComputeError: "can only deserialize json objects")
302 };
303
304 Schema::from_iter(fields)
305 };
306
307 if let Some(overwrite) = self.schema_overwrite {
308 overwrite_schema(&mut schema, overwrite)?;
309 }
310
311 let mut needs_cast = false;
312 let deserialize_schema = schema
313 .iter()
314 .map(|(name, dt)| {
315 Field::new(
316 name.clone(),
317 dt.clone().map_leaves(&mut |leaf_dt| {
318 // Deserialize enums and categoricals as strings first.
319 match leaf_dt {
320 #[cfg(feature = "dtype-categorical")]
321 DataType::Enum(..) | DataType::Categorical(..) => {
322 needs_cast = true;
323 DataType::String
324 },
325 leaf_dt => leaf_dt,
326 }
327 }),
328 )
329 })
330 .collect();
331
332 let arrow_dtype =
333 DataType::Struct(deserialize_schema).to_arrow(CompatLevel::newest());
334
335 let arrow_dtype = if let BorrowedValue::Array(_) = &json_value {
336 ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
337 LIST_VALUES_NAME,
338 arrow_dtype,
339 true,
340 )))
341 } else {
342 arrow_dtype
343 };
344
345 let arr = polars_json::json::deserialize(
346 &json_value,
347 arrow_dtype,
348 allow_extra_fields_in_struct,
349 )?;
350
351 let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
352 || polars_err!(ComputeError: "can only deserialize json objects"),
353 )?;
354
355 let mut df = DataFrame::try_from(arr.clone())?;
356
357 if df.width() == 0 && df.height() <= 1 {
358 // read_json("{}")
359 unsafe { df.set_height(0) };
360 }
361
362 if needs_cast {
363 for (col, dt) in unsafe { df.columns_mut() }
364 .iter_mut()
365 .zip(schema.iter_values())
366 {
367 *col = col.cast_with_options(
368 dt,
369 if self.ignore_errors {
370 CastOptions::NonStrict
371 } else {
372 CastOptions::Strict
373 },
374 )?;
375 }
376 }
377
378 df
379 },
380 JsonFormat::JsonLines => {
381 let mut json_reader = CoreJsonReader::new(
382 rb,
383 None,
384 self.schema,
385 self.schema_overwrite,
386 None,
387 1024, // sample size
388 NonZeroUsize::new(1 << 18).unwrap(),
389 false,
390 self.infer_schema_len,
391 self.ignore_errors,
392 None,
393 None,
394 None,
395 )?;
396 let mut df: DataFrame = json_reader.as_df()?;
397 if self.rechunk {
398 df.rechunk_mut_par();
399 }
400
401 df
402 },
403 };
404
405 // TODO! Ensure we don't materialize the columns we don't need
406 if let Some(proj) = self.projection.as_deref() {
407 out.select(proj.iter().cloned())
408 } else {
409 Ok(out)
410 }
411 }
412}
413
impl<'a, R> JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    /// Set the JSON file's schema
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Overwrite parts of the inferred schema.
    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
        self.schema_overwrite = Some(schema);
        self
    }

    /// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
    /// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
    ///
    /// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
    /// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
    ///
    /// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
    /// from JSON (unlike CSVs, there is no header row to inspect for column names).
    pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
        self.infer_schema_len = max_records;
        self
    }

    /// Set the batch size (number of records to load at one time)
    ///
    /// This heavily influences loading time.
    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
    /// columns are kept.
    ///
    /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
    /// then dropping the ones you don't want.
    pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
        self.projection = projection;
        self
    }

    /// Set whether the input is a single JSON array ([`JsonFormat::Json`], the default)
    /// or newline-delimited records ([`JsonFormat::JsonLines`]).
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
        self.json_format = format;
        self
    }

    /// Return a `null` if an error occurs during parsing.
    /// Only supported when reading [`JsonFormat::JsonLines`]; enabling it with
    /// [`JsonFormat::Json`] causes `finish` to return an error.
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.ignore_errors = ignore;
        self
    }
}