use core::{get_file_chunks_json, json_lines};
use std::num::NonZeroUsize;

use arrow::array::StructArray;
use polars_core::POOL;
use polars_core::prelude::*;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

pub(crate) mod buffer;
pub mod core;

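/// Infers a [`Schema`] from NDJSON data.
///
/// The dtype of each line read from `reader` is inferred (inspecting at most
/// `infer_schema_len` lines when a limit is given), and the per-line dtypes
/// are unified into a single struct supertype whose fields become the
/// returned schema. An error is raised if the unified dtype is not a struct,
/// i.e. if the lines are not JSON objects.
///
/// A minimal usage sketch (marked `ignore` since it is not compiled here; it
/// assumes the function is in scope):
///
/// ```ignore
/// use std::io::Cursor;
///
/// // Two NDJSON lines with the same two fields.
/// let mut reader = Cursor::new(&b"{\"a\": 1, \"b\": \"x\"}\n{\"a\": 2, \"b\": \"y\"}\n"[..]);
/// let schema = infer_schema(&mut reader, None).unwrap();
/// assert_eq!(schema.len(), 2);
/// ```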
pub fn infer_schema<R: std::io::BufRead>(
    reader: &mut R,
    infer_schema_len: Option<NonZeroUsize>,
) -> PolarsResult<Schema> {
    let dtypes = polars_json::ndjson::iter_unique_dtypes(reader, infer_schema_len)?;
    let dtype =
        crate::json::infer::dtypes_to_supertype(dtypes.map(|dt| DataType::from_arrow_dtype(&dt)))?;

    if !matches!(&dtype, DataType::Struct(_)) {
        polars_bail!(ComputeError: "NDJSON line expected to contain a JSON object, got: {dtype}");
    }

    let schema = StructArray::get_fields(&dtype.to_arrow(CompatLevel::newest()))
        .iter()
        .map(Into::<Field>::into)
        .collect();
    Ok(schema)
}

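/// Counts the NDJSON rows in `full_bytes` in parallel.
///
/// The buffer is split into up to `n_threads` chunks (defaulting to the size
/// of the global rayon pool) and the per-chunk counts are summed on that
/// pool; a single chunk is counted serially via [`count_rows`].
///
/// A minimal usage sketch (marked `ignore` since it is not compiled here):
///
/// ```ignore
/// let bytes = b"{\"a\": 1}\n{\"a\": 2}\n{\"a\": 3}\n";
/// assert_eq!(count_rows_par(bytes, None), 3);
/// ```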
pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
    let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
    let file_chunks = get_file_chunks_json(full_bytes, n_threads);

    if file_chunks.len() == 1 {
        count_rows(full_bytes)
    } else {
        let iter = file_chunks
            .into_par_iter()
            .map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));

        POOL.install(|| iter.sum())
    }
}

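/// Counts the NDJSON rows in `full_bytes` serially by iterating over its JSON
/// lines.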
pub fn count_rows(full_bytes: &[u8]) -> usize {
    json_lines(full_bytes).count()
}