polars_io/ndjson/
mod.rs

1use core::json_lines;
2use std::num::NonZeroUsize;
3
4use arrow::array::StructArray;
5use polars_core::prelude::*;
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9use crate::ExternalCompression;
10
11pub(crate) mod buffer;
12pub mod core;
13
14#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
17pub struct NDJsonWriterOptions {
18    pub compression: ExternalCompression,
19    #[cfg_attr(feature = "serde", serde(default))]
20    pub check_extension: bool,
21}
22
23pub fn infer_schema<R: std::io::BufRead>(
24    reader: &mut R,
25    infer_schema_len: Option<NonZeroUsize>,
26) -> PolarsResult<Schema> {
27    let dtypes = polars_json::ndjson::iter_unique_dtypes(reader, infer_schema_len)?;
28    let dtype =
29        crate::json::infer::dtypes_to_supertype(dtypes.map(|dt| DataType::from_arrow_dtype(&dt)))?;
30
31    if !matches!(&dtype, DataType::Struct(_)) {
32        polars_bail!(ComputeError: "NDJSON line expected to contain JSON object: {dtype}");
33    }
34
35    let schema = StructArray::get_fields(&dtype.to_arrow(CompatLevel::newest()))
36        .iter()
37        .map(Into::<Field>::into)
38        .collect();
39    Ok(schema)
40}
41
42/// Count the number of rows. The slice passed must represent the entire file.
43/// This does not check if the lines are valid NDJSON - it assumes that is the case.
44pub fn count_rows(full_bytes: &[u8]) -> usize {
45    json_lines(full_bytes).count()
46}