use core::{get_file_chunks_json, json_lines};
use std::num::NonZeroUsize;

use arrow::array::StructArray;
use polars_core::POOL;
use polars_core::prelude::*;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

pub(crate) mod buffer;
pub mod core;
pub fn infer_schema<R: std::io::BufRead>(
    reader: &mut R,
    infer_schema_len: Option<NonZeroUsize>,
) -> PolarsResult<Schema> {
    // Collect each distinct dtype seen across the first `infer_schema_len`
    // lines (or all lines if `None`).
    let dtypes = polars_json::ndjson::iter_unique_dtypes(reader, infer_schema_len)?;
    // Resolve the observed dtypes to a single supertype.
    let dtype =
        crate::json::infer::dtypes_to_supertype(dtypes.map(|dt| DataType::from_arrow_dtype(&dt)))?;
    // The supertype is a Struct; its fields become the schema's columns.
    let schema = StructArray::get_fields(&dtype.to_arrow(CompatLevel::newest()))
        .iter()
        .map(Into::<Field>::into)
        .collect();
    Ok(schema)
}
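
// A hedged usage sketch: `std::io::Cursor` implements `BufRead`, so a schema
// can be inferred from an in-memory NDJSON buffer. The sample rows and the
// `Float64` outcome are illustrative assumptions, not fixtures from this crate.
//
//     let ndjson = b"{\"x\": 1}\n{\"x\": 2.5}\n";
//     let mut reader = std::io::Cursor::new(&ndjson[..]);
//     let schema = infer_schema(&mut reader, NonZeroUsize::new(100))?;
//     // `x` resolves to the supertype of Int64 and Float64, i.e. Float64.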

/// Count the number of rows. The slice passed must represent the entire file.
/// This will potentially parallelize using rayon.
///
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
    let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
    let file_chunks = get_file_chunks_json(full_bytes, n_threads);

    if file_chunks.len() == 1 {
        count_rows(full_bytes)
    } else {
        // Chunk boundaries from `get_file_chunks_json` fall on line
        // boundaries, so the per-chunk counts can simply be summed.
        let iter = file_chunks
            .into_par_iter()
            .map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));

        POOL.install(|| iter.sum())
    }
}
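
// A hedged sketch, assuming the whole file is already in memory (the path is
// hypothetical; any `&[u8]` covering the entire file works, e.g. a memory map):
//
//     let full_bytes = std::fs::read("data.ndjson")?;
//     // `None` falls back to the global rayon pool's thread count.
//     let n_rows = count_rows_par(&full_bytes, None);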

/// Count the number of rows. The slice passed must represent the entire file.
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows(full_bytes: &[u8]) -> usize {
    json_lines(full_bytes).count()
}
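
// A minimal sanity check, as a sketch: the payload below has no trailing
// newline, so it splits into exactly three non-empty lines regardless of how
// `json_lines` treats empty trailing segments, and the parallel count must
// agree with the single-threaded one. The sample rows are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn count_rows_parallel_agrees_with_serial() {
        let bytes = b"{\"a\": 1}\n{\"a\": 2}\n{\"a\": 3}";
        assert_eq!(count_rows(bytes), 3);
        // `None` uses the global pool's thread count; an explicit thread
        // count exercises the chunked path on the same input.
        assert_eq!(count_rows_par(bytes, None), count_rows(bytes));
        assert_eq!(count_rows_par(bytes, Some(2)), 3);
    }
}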