polars_io/ndjson/mod.rs

use core::{get_file_chunks_json, json_lines};
use std::num::NonZeroUsize;

use arrow::array::StructArray;
use polars_core::POOL;
use polars_core::prelude::*;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

pub(crate) mod buffer;
pub mod core;

/// Infer the schema of an NDJSON source by reading up to `infer_schema_len`
/// lines (or all of them if `None`) and computing a supertype over the dtypes
/// observed for each field.
pub fn infer_schema<R: std::io::BufRead>(
    reader: &mut R,
    infer_schema_len: Option<NonZeroUsize>,
) -> PolarsResult<Schema> {
    // Collect the unique dtypes seen across the sampled lines.
    let dtypes = polars_json::ndjson::iter_unique_dtypes(reader, infer_schema_len)?;
    // Reduce them to a single struct supertype that can represent every line.
    let dtype =
        crate::json::infer::dtypes_to_supertype(dtypes.map(|dt| DataType::from_arrow_dtype(&dt)))?;
    // The supertype's struct fields become the schema's columns.
    let schema = StructArray::get_fields(&dtype.to_arrow(CompatLevel::newest()))
        .iter()
        .map(Into::<Field>::into)
        .collect();
    Ok(schema)
}
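
// A minimal usage sketch of `infer_schema` (illustrative, not upstream code):
// any `BufRead` source works, e.g. a `std::io::Cursor` over an in-memory
// buffer. The input bytes and the expected field count are assumptions about
// this particular example, not guarantees of the API.
//
//     use std::io::Cursor;
//     use std::num::NonZeroUsize;
//
//     let ndjson = b"{\"a\": 1, \"b\": \"x\"}\n{\"a\": 2, \"b\": \"y\"}\n";
//     let schema = infer_schema(&mut Cursor::new(&ndjson[..]), NonZeroUsize::new(100))?;
//     assert_eq!(schema.len(), 2); // one field per observed JSON key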

/// Count the number of rows. The slice passed must represent the entire file.
/// This may parallelize the work using rayon.
///
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
    let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
    let file_chunks = get_file_chunks_json(full_bytes, n_threads);

    if file_chunks.len() == 1 {
        // A single chunk: count sequentially and skip the thread-pool overhead.
        count_rows(full_bytes)
    } else {
        // Count each chunk independently; chunk boundaries fall on line breaks,
        // so every row is counted exactly once.
        let iter = file_chunks
            .into_par_iter()
            .map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));

        POOL.install(|| iter.sum())
    }
}

/// Count the number of rows. The slice passed must represent the entire file.
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows(full_bytes: &[u8]) -> usize {
    json_lines(full_bytes).count()
}
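
#[cfg(test)]
mod tests {
    use super::*;

    // A hedged sketch of how the two counters relate (illustrative, not part
    // of the upstream test suite); it assumes each newline-separated JSON
    // object counts as one row. The input omits a trailing newline so the
    // expected count does not depend on how empty lines are handled.
    #[test]
    fn count_rows_sequential_and_parallel_agree() {
        let ndjson = b"{\"a\": 1}\n{\"a\": 2}\n{\"a\": 3}";
        assert_eq!(count_rows(ndjson), 3);
        // `None` lets `count_rows_par` fall back to rayon's current thread count.
        assert_eq!(count_rows_par(ndjson, None), 3);
    }
}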