polars_io/json/mod.rs
1//! # (De)serialize JSON files.
2//!
3//! ## Read JSON to a DataFrame
4//!
5//! ## Example
6//!
7//! ```
8//! use polars_core::prelude::*;
9//! use polars_io::prelude::*;
10//! use std::io::Cursor;
11//! use std::num::NonZeroUsize;
12//!
13//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
14//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
15//! {"a":2, "b":0.6, "c":false, "d":"text"}
16//! {"a":1, "b":2.0, "c":false, "d":"4"}
17//! {"a":7, "b":-3.5, "c":true, "d":"4"}
18//! {"a":1, "b":0.6, "c":false, "d":"text"}
19//! {"a":1, "b":2.0, "c":false, "d":"4"}
20//! {"a":5, "b":-3.5, "c":true, "d":"4"}
21//! {"a":1, "b":0.6, "c":false, "d":"text"}
22//! {"a":1, "b":2.0, "c":false, "d":"4"}
23//! {"a":1, "b":-3.5, "c":true, "d":"4"}
24//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
25//! let file = Cursor::new(basic_json);
26//! let df = JsonReader::new(file)
27//! .with_json_format(JsonFormat::JsonLines)
28//! .infer_schema_len(NonZeroUsize::new(3))
29//! .with_batch_size(NonZeroUsize::new(3).unwrap())
30//! .finish()
31//! .unwrap();
32//!
33//! println!("{:?}", df);
34//! ```
35//! >>> Outputs:
36//!
37//! ```text
38//! +-----+--------+-------+--------+
39//! | a | b | c | d |
40//! | --- | --- | --- | --- |
41//! | i64 | f64 | bool | str |
42//! +=====+========+=======+========+
43//! | 1 | 2 | false | "4" |
44//! +-----+--------+-------+--------+
45//! | -10 | -3.5e0 | true | "4" |
46//! +-----+--------+-------+--------+
47//! | 2 | 0.6 | false | "text" |
48//! +-----+--------+-------+--------+
49//! | 1 | 2 | false | "4" |
50//! +-----+--------+-------+--------+
51//! | 7 | -3.5e0 | true | "4" |
52//! +-----+--------+-------+--------+
53//! | 1 | 0.6 | false | "text" |
54//! +-----+--------+-------+--------+
55//! | 1 | 2 | false | "4" |
56//! +-----+--------+-------+--------+
57//! | 5 | -3.5e0 | true | "4" |
58//! +-----+--------+-------+--------+
59//! | 1 | 0.6 | false | "text" |
60//! +-----+--------+-------+--------+
61//! | 1 | 2 | false | "4" |
62//! +-----+--------+-------+--------+
63//! ```
64//!
65pub(crate) mod infer;
66
67use std::io::Write;
68use std::num::NonZeroUsize;
69use std::ops::Deref;
70
71use arrow::legacy::conversion::chunk_to_struct;
72use polars_core::error::to_compute_err;
73use polars_core::prelude::*;
74use polars_error::{PolarsResult, polars_bail};
75use polars_json::json::write::FallibleStreamingIterator;
76#[cfg(feature = "serde")]
77use serde::{Deserialize, Serialize};
78use simd_json::BorrowedValue;
79
80use crate::mmap::{MmapBytesReader, ReaderBytes};
81use crate::prelude::*;
82
/// Options for writing JSON files.
///
/// Currently empty: no options are configurable yet; this is a placeholder so
/// fields can be added later without changing the writer API surface.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}
87
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
// Common derives added: a public, fieldless enum should be `Debug`/comparable,
// consistent with `JsonWriterOptions` above; all are backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
108
/// Writes a DataFrame to JSON.
///
/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
/// before serializing.
#[must_use]
pub struct JsonWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    /// Output layout (JSON array vs. JSON Lines); selected via [`JsonWriter::with_json_format`].
    json_format: JsonFormat,
}
122
impl<W: Write> JsonWriter<W> {
    /// Set the output format (`Json` array or `JsonLines`). The default set by
    /// [`SerWriter::new`] is [`JsonFormat::JsonLines`].
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
        self.json_format = format;
        self
    }
}
129
130impl<W> SerWriter<W> for JsonWriter<W>
131where
132 W: Write,
133{
134 /// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
135 /// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
136 fn new(buffer: W) -> Self {
137 JsonWriter {
138 buffer,
139 json_format: JsonFormat::JsonLines,
140 }
141 }
142
143 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
144 df.align_chunks_par();
145 let fields = df
146 .iter()
147 .map(|s| {
148 #[cfg(feature = "object")]
149 polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
150 Ok(s.field().to_arrow(CompatLevel::newest()))
151 })
152 .collect::<PolarsResult<Vec<_>>>()?;
153 let batches = df
154 .iter_chunks(CompatLevel::newest(), false)
155 .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
156
157 match self.json_format {
158 JsonFormat::JsonLines => {
159 let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
160 let writer =
161 polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
162 writer.collect::<PolarsResult<()>>()?;
163 },
164 JsonFormat::Json => {
165 let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
166 polars_json::json::write::write(&mut self.buffer, serializer)?;
167 },
168 }
169
170 Ok(())
171 }
172}
173
/// Writes DataFrames to a sink as NDJSON (JSON Lines), one batch at a time.
pub struct BatchedWriter<W: Write> {
    /// Destination for the serialized NDJSON blocks.
    writer: W,
}
177
178impl<W> BatchedWriter<W>
179where
180 W: Write,
181{
182 pub fn new(writer: W) -> Self {
183 BatchedWriter { writer }
184 }
185 /// Write a batch to the json writer.
186 ///
187 /// # Panics
188 /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
189 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
190 let fields = df
191 .iter()
192 .map(|s| {
193 #[cfg(feature = "object")]
194 polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
195 Ok(s.field().to_arrow(CompatLevel::newest()))
196 })
197 .collect::<PolarsResult<Vec<_>>>()?;
198 let chunks = df.iter_chunks(CompatLevel::newest(), false);
199 let batches =
200 chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
201 let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
202 while let Some(block) = serializer.next()? {
203 self.writer.write_all(block)?;
204 }
205 Ok(())
206 }
207}
208
/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    /// Byte source the JSON is parsed from.
    reader: R,
    /// Combine all chunks into a single chunk after reading (applied in the JSON Lines path).
    rechunk: bool,
    /// Emit nulls instead of failing on parse errors (JSON Lines only).
    ignore_errors: bool,
    /// Max number of records to inspect for schema inference; `None` = all.
    infer_schema_len: Option<NonZeroUsize>,
    // NOTE(review): `batch_size` is set by `with_batch_size` but not referenced by
    // `finish` in this module (the JSON Lines chunk size there is hardcoded) — confirm usage.
    batch_size: NonZeroUsize,
    /// Columns to keep after deserialization; `None` keeps all.
    projection: Option<Vec<PlSmallStr>>,
    /// Full schema supplied by the caller, replacing inference.
    schema: Option<SchemaRef>,
    /// Partial schema merged over the inferred/supplied one.
    schema_overwrite: Option<&'a Schema>,
    /// Input format: JSON array or JSON Lines.
    json_format: JsonFormat,
}
225
226pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
227 if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
228 // UTF-8 BOM
229 Ok(&bytes[3..])
230 } else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
231 // UTF-16 BOM
232 polars_bail!(ComputeError: "utf-16 not supported")
233 } else {
234 Ok(bytes)
235 }
236}
237impl<R> SerReader<R> for JsonReader<'_, R>
238where
239 R: MmapBytesReader,
240{
241 fn new(reader: R) -> Self {
242 JsonReader {
243 reader,
244 rechunk: true,
245 ignore_errors: false,
246 infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
247 batch_size: NonZeroUsize::new(8192).unwrap(),
248 projection: None,
249 schema: None,
250 schema_overwrite: None,
251 json_format: JsonFormat::Json,
252 }
253 }
254
255 fn set_rechunk(mut self, rechunk: bool) -> Self {
256 self.rechunk = rechunk;
257 self
258 }
259
260 /// Take the SerReader and return a parsed DataFrame.
261 ///
262 /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
263 /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
264 /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
265 fn finish(mut self) -> PolarsResult<DataFrame> {
266 let pre_rb: ReaderBytes = (&mut self.reader).into();
267 let bytes = remove_bom(pre_rb.deref())?;
268 let rb = ReaderBytes::Borrowed(bytes);
269 let out = match self.json_format {
270 JsonFormat::Json => {
271 polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
272 let mut bytes = rb.deref().to_vec();
273 let owned = &mut vec![];
274 compression::maybe_decompress_bytes(&bytes, owned)?;
275 // the easiest way to avoid ownership issues is by implicitly figuring out if
276 // decompression happened (owned is only populated on decompress), then pick which bytes to parse
277 let json_value = if owned.is_empty() {
278 simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
279 } else {
280 simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
281 };
282 if let BorrowedValue::Array(array) = &json_value {
283 if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
284 return Ok(DataFrame::empty());
285 }
286 }
287
288 let allow_extra_fields_in_struct = self.schema.is_some();
289
290 // struct type
291 let dtype = if let Some(mut schema) = self.schema {
292 if let Some(overwrite) = self.schema_overwrite {
293 let mut_schema = Arc::make_mut(&mut schema);
294 overwrite_schema(mut_schema, overwrite)?;
295 }
296
297 DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
298 } else {
299 // infer
300 let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
301 infer::json_values_to_supertype(
302 values,
303 self.infer_schema_len
304 .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
305 )?
306 .to_arrow(CompatLevel::newest())
307 } else {
308 polars_json::json::infer(&json_value)?
309 };
310
311 if let Some(overwrite) = self.schema_overwrite {
312 let ArrowDataType::Struct(fields) = inner_dtype else {
313 polars_bail!(ComputeError: "can only deserialize json objects")
314 };
315
316 let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
317 overwrite_schema(&mut schema, overwrite)?;
318
319 DataType::Struct(
320 schema
321 .into_iter()
322 .map(|(name, dt)| Field::new(name, dt))
323 .collect(),
324 )
325 .to_arrow(CompatLevel::newest())
326 } else {
327 inner_dtype
328 }
329 };
330
331 let dtype = if let BorrowedValue::Array(_) = &json_value {
332 ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
333 PlSmallStr::from_static("item"),
334 dtype,
335 true,
336 )))
337 } else {
338 dtype
339 };
340
341 let arr = polars_json::json::deserialize(
342 &json_value,
343 dtype,
344 allow_extra_fields_in_struct,
345 )?;
346 let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
347 || polars_err!(ComputeError: "can only deserialize json objects"),
348 )?;
349 DataFrame::try_from(arr.clone())
350 },
351 JsonFormat::JsonLines => {
352 let mut json_reader = CoreJsonReader::new(
353 rb,
354 None,
355 self.schema,
356 self.schema_overwrite,
357 None,
358 1024, // sample size
359 NonZeroUsize::new(1 << 18).unwrap(),
360 false,
361 self.infer_schema_len,
362 self.ignore_errors,
363 None,
364 None,
365 None,
366 )?;
367 let mut df: DataFrame = json_reader.as_df()?;
368 if self.rechunk {
369 df.as_single_chunk_par();
370 }
371 Ok(df)
372 },
373 }?;
374
375 // TODO! Ensure we don't materialize the columns we don't need
376 if let Some(proj) = self.projection.as_deref() {
377 out.select(proj.iter().cloned())
378 } else {
379 Ok(out)
380 }
381 }
382}
383
384impl<'a, R> JsonReader<'a, R>
385where
386 R: MmapBytesReader,
387{
388 /// Set the JSON file's schema
389 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
390 self.schema = Some(schema);
391 self
392 }
393
394 /// Overwrite parts of the inferred schema.
395 pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
396 self.schema_overwrite = Some(schema);
397 self
398 }
399
400 /// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
401 /// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
402 ///
403 /// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
404 /// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
405 ///
406 /// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
407 /// from JSON (unlike CSVs, there is no header row to inspect for column names).
408 pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
409 self.infer_schema_len = max_records;
410 self
411 }
412
413 /// Set the batch size (number of records to load at one time)
414 ///
415 /// This heavily influences loading time.
416 pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
417 self.batch_size = batch_size;
418 self
419 }
420
421 /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
422 /// columns are kept.
423 ///
424 /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
425 /// then dropping the ones you don't want.
426 pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
427 self.projection = projection;
428 self
429 }
430
431 pub fn with_json_format(mut self, format: JsonFormat) -> Self {
432 self.json_format = format;
433 self
434 }
435
436 /// Return a `null` if an error occurs during parsing.
437 pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
438 self.ignore_errors = ignore;
439 self
440 }
441}