polars_io/ndjson/core.rs

use std::io::Cursor;
use std::num::NonZeroUsize;

pub use arrow::array::StructArray;
use num_traits::pow::Pow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use rayon::prelude::*;

use crate::RowIndex;
use crate::mmap::ReaderBytes;
use crate::ndjson::buffer::*;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;

const NEWLINE: u8 = b'\n';
const CLOSING_BRACKET: u8 = b'}';
/// Low-level NDJSON reader that parses raw bytes into a `DataFrame`.
pub(crate) struct CoreJsonReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    n_rows: Option<usize>,
    schema: SchemaRef,
    n_threads: Option<usize>,
    sample_size: usize,
    chunk_size: NonZeroUsize,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}
impl<'a> CoreJsonReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        n_rows: Option<usize>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<&Schema>,
        n_threads: Option<usize>,
        sample_size: usize,
        chunk_size: NonZeroUsize,
        low_memory: bool,
        infer_schema_len: Option<NonZeroUsize>,
        ignore_errors: bool,
        row_index: Option<&'a mut RowIndex>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        projection: Option<Arc<[PlSmallStr]>>,
    ) -> PolarsResult<CoreJsonReader<'a>> {
        // Use the caller-provided schema if any, otherwise infer one from the data.
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let bytes: &[u8] = &reader_bytes;
                let mut cursor = Cursor::new(bytes);
                Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
            },
        };
        if let Some(overwriting_schema) = schema_overwrite {
            let schema = Arc::make_mut(&mut schema);
            overwrite_schema(schema, overwriting_schema)?;
        }

        Ok(CoreJsonReader {
            reader_bytes: Some(reader_bytes),
            schema,
            sample_size,
            n_rows,
            n_threads,
            chunk_size,
            low_memory,
            ignore_errors,
            row_index,
            predicate,
            projection,
        })
    }

    fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let mut bytes = bytes;
        let mut total_rows = 128;

        // Estimate the total row count from sampled line-length statistics; if `n_rows`
        // is set, truncate the byte slice to roughly that many lines.
        if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
            let line_length_upper_bound = mean + 1.1 * std;

            total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
            if let Some(n_rows) = self.n_rows {
                total_rows = std::cmp::min(n_rows, total_rows);
                // The guessed upper bound of the number of bytes covering `n_rows` lines.
                let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

                if n_bytes < bytes.len() {
                    if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
                        bytes = &bytes[..n_bytes + pos]
                    }
                }
            }
        }

        if total_rows <= 128 {
            n_threads = 1;
        }

        let rows_per_thread = total_rows / n_threads;

        let max_proxy = bytes.len() / n_threads / 2;
        let capacity = if self.low_memory {
            usize::from(self.chunk_size)
        } else {
            std::cmp::min(rows_per_thread, max_proxy)
        };
        let file_chunks = get_file_chunks_json(bytes, n_threads);

        let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
        // Parse each chunk in parallel; apply projection, row index and predicate per
        // chunk before the vertical concatenation below.
        let (mut dfs, prepredicate_heights) = POOL.install(|| {
            file_chunks
                .into_par_iter()
                .map(|(start_pos, stop_at_nbytes)| {
                    let mut local_df = parse_ndjson(
                        &bytes[start_pos..stop_at_nbytes],
                        Some(capacity),
                        &self.schema,
                        self.ignore_errors,
                    )?;

                    let prepredicate_height = local_df.height() as IdxSize;
                    if let Some(projection) = self.projection.as_deref() {
                        local_df = local_df.select(projection.iter().cloned())?;
                    }

                    if let Some(row_index) = row_index {
                        local_df = local_df
                            .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
                    }

                    if let Some(predicate) = &self.predicate {
                        let s = predicate.evaluate_io(&local_df)?;
                        let mask = s.bool()?;
                        local_df = local_df.filter(mask)?;
                    }

                    Ok((local_df, prepredicate_height))
                })
                .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
        })?;

        if let Some(ref mut row_index) = self.row_index {
            update_row_counts3(&mut dfs, &prepredicate_heights, 0);
            row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
        }

        accumulate_dataframes_vertical(dfs)
    }

    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_json(n_threads, &reader_bytes)?;

        // When multi-threaded, `n_rows` was determined probabilistically, so slice
        // down to the exact row count if we overshot.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

#[inline(always)]
fn parse_impl(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    scratch: &mut Scratch,
    ignore_errors: bool,
) -> PolarsResult<usize> {
    // simd_json parses in place, so copy the line into the reusable scratch buffer.
    scratch.json.clear();
    scratch.json.extend_from_slice(bytes);
    let n = scratch.json.len();
    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
    match value {
        simd_json::BorrowedValue::Object(value) => {
            // Fill each column buffer with the matching field, or null if it is absent.
            buffers.iter_mut().try_for_each(|(s, inner)| {
                match s.0.map_lookup(&value) {
                    Some(v) => inner.add(v)?,
                    None => inner.add_null(),
                }
                PolarsResult::Ok(())
            })?;
        },
        _ if ignore_errors => {
            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
        },
        v => {
            polars_bail!(ComputeError: "NDJSON line expected to contain JSON object: {v}");
        },
    };
    Ok(n)
}
204
205#[derive(Default)]
206struct Scratch {
207    json: Vec<u8>,
208    buffers: simd_json::Buffers,
209}

pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    // This previously used `serde_json`'s `RawValue` to deserialize chunks without fully
    // deserializing them. However, that convenience comes at a cost: serde_json allocates,
    // parses, and validates UTF-8, all of which we don't need since simd_json handles them.
    // Also, `serde_json::StreamDeserializer` has a more ambitious goal: it parses potentially
    // *non-delimited* sequences of JSON values, while we know our values are line-delimited.
    // Custom splitting turns out to be very easy and gives a nice performance boost.
    bytes
        .split(|&byte| byte == b'\n')
        .filter(|bytes| is_json_line(bytes))
}

/// Returns `true` if the line contains at least one byte that is not
/// whitespace (space, tab or carriage return).
#[inline]
pub fn is_json_line(bytes: &[u8]) -> bool {
    bytes
        .iter()
        .any(|byte| !matches!(*byte, b' ' | b'\t' | b'\r'))
}
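
// Illustrative sketch (not from the original source): how `json_lines` splits a buffer
// and filters whitespace-only lines. The test module and its name are hypothetical.
#[cfg(test)]
mod json_lines_sketch {
    use super::*;

    #[test]
    fn splits_and_filters_blank_lines() {
        let bytes = b"{\"a\":1}\n\n  \t\n{\"a\":2}\n";
        let lines: Vec<&[u8]> = json_lines(bytes).collect();
        // Empty and whitespace-only splits are rejected by `is_json_line`.
        assert_eq!(lines, vec![b"{\"a\":1}".as_slice(), b"{\"a\":2}".as_slice()]);
    }
}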

fn parse_lines(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let mut scratch = Scratch::default();

    for bytes in json_lines(bytes) {
        parse_impl(bytes, buffers, &mut scratch, ignore_errors)?;
    }
    Ok(())
}

pub fn parse_ndjson(
    bytes: &[u8],
    n_rows_hint: Option<usize>,
    schema: &Schema,
    ignore_errors: bool,
) -> PolarsResult<DataFrame> {
    let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));

    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
    parse_lines(bytes, &mut buffers, ignore_errors)?;

    DataFrame::new_infer_height(
        buffers
            .into_values()
            .map(|buf| Ok(buf.into_series()?.into_column()))
            .collect::<PolarsResult<_>>()
            .map_err(|e| match e {
                // Nested types raise SchemaMismatch instead of ComputeError; map it
                // back here to stay consistent.
                PolarsError::ComputeError(..) => e,
                PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
                e => e,
            })?,
    )
}
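
// Illustrative sketch (not from the original source): parsing a small NDJSON buffer
// with an explicit schema. `Field::new` taking a `PlSmallStr` name is assumed to match
// the current polars-core API; the test module and its name are hypothetical.
#[cfg(test)]
mod parse_ndjson_sketch {
    use super::*;

    #[test]
    fn parses_two_rows() -> PolarsResult<()> {
        let schema =
            Schema::from_iter([Field::new(PlSmallStr::from_static("a"), DataType::Int64)]);
        let df = parse_ndjson(b"{\"a\":1}\n{\"a\":2}\n", None, &schema, false)?;
        assert_eq!(df.height(), 2);
        Ok(())
    }
}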

/// Estimate the number of lines in the file from sampled line-length statistics,
/// falling back to the cheaper chunk-based estimate when sampling fails.
pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
    if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
        (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
    } else {
        estimate_n_lines_in_chunk(file_bytes)
    }
}

/// Total length divided by the max length of the first and last non-empty lines.
/// This is intended to be cheaper than `estimate_n_lines_in_file`.
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    chunk
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |x| {
            chunk.len().div_ceil(
                x.len().max(
                    chunk
                        .rsplit(|&c| c == b'\n')
                        .find(|x| !x.is_empty())
                        .unwrap()
                        .len(),
                ),
            )
        })
}
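
// Illustrative sketch (not from the original source): a worked example of the chunk
// estimate. The test module and its name are hypothetical.
#[cfg(test)]
mod estimate_sketch {
    use super::*;

    #[test]
    fn over_estimates_slightly() {
        // 3 lines of 2 bytes each plus 2 separators = 8 bytes total;
        // ceil(8 / 2) = 4, a slight over-estimate because the separators count too.
        assert_eq!(estimate_n_lines_in_chunk(b"{}\n{}\n{}"), 4);
    }
}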

/// Find the position just past the next line boundary.
/// Does not check for newline characters embedded in string fields;
/// it only looks for `}\n`.
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
    let pos = memchr::memchr(NEWLINE, input)?;
    if pos == 0 {
        return Some(1);
    }

    let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
    if is_closing_bracket {
        Some(pos + 1)
    } else {
        None
    }
}
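
// Illustrative sketch (not from the original source): the `}\n` heuristic in action.
// The test module and its name are hypothetical.
#[cfg(test)]
mod next_line_position_sketch {
    use super::*;

    #[test]
    fn only_matches_closing_bracket_before_newline() {
        // The newline at index 7 follows `}`, so the next line starts at index 8.
        assert_eq!(next_line_position_naive_json(b"{\"a\":1}\n{\"a\":2}"), Some(8));
        // A newline not preceded by `}` is rejected.
        assert_eq!(next_line_position_naive_json(b"abc\ndef"), None);
        // A leading newline is treated as an immediate boundary.
        assert_eq!(next_line_position_naive_json(b"\n{}"), Some(1));
    }
}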

/// Get the mean and standard deviation of line length, in bytes.
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);

    let mut bytes_trunc;
    let n_lines_per_iter = n_lines / 2;

    let mut n_read = 0;

    let bytes_len = bytes.len();

    // Sample half of the lines from the start of the file and half from 75% of the way in.
    for offset in [0, (bytes_len as f32 * 0.75) as usize] {
        bytes_trunc = &bytes[offset..];
        let pos = next_line_position_naive_json(bytes_trunc)?;
        if pos >= bytes_len {
            return None;
        }
        bytes_trunc = &bytes_trunc[pos + 1..];

        for _ in offset..(offset + n_lines_per_iter) {
            let pos = next_line_position_naive_json(bytes_trunc);
            if let Some(pos) = pos {
                lengths.push(pos);
                let next_bytes = &bytes_trunc[pos..];
                if next_bytes.is_empty() {
                    return None;
                }
                bytes_trunc = next_bytes;
                n_read += pos;
            } else {
                break;
            }
        }
    }

    let n_samples = lengths.len();
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0)
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}
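
// Illustrative sketch (not from the original source): the sampled statistics for a
// buffer of uniform 8-byte lines. Range assertions are used because the sampler
// measures from one byte past each anchor line, so the mean lands slightly under
// the true line length. The test module and its name are hypothetical.
#[cfg(test)]
mod line_stats_sketch {
    use super::*;

    #[test]
    fn uniform_lines_give_small_std() {
        let bytes = b"{\"a\":1}\n".repeat(20);
        let (mean, std) = get_line_stats_json(&bytes, 4).unwrap();
        // Every line occupies 8 bytes (including the newline).
        assert!(mean > 7.0 && mean <= 8.0);
        assert!(std < 1.0);
    }
}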

/// Split the buffer into `n_threads` byte ranges, moving each boundary forward
/// to the next `}\n` so chunks never cut a line in half.
pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
    let mut last_pos = 0;
    let total_len = bytes.len();
    let chunk_size = total_len / n_threads;
    let mut offsets = Vec::with_capacity(n_threads);
    for _ in 0..n_threads {
        let search_pos = last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push((last_pos, end_pos));
        last_pos = end_pos;
    }
    offsets.push((last_pos, total_len));
    offsets
}
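
// Illustrative sketch (not from the original source): the chunks partition the buffer
// contiguously and every internal boundary lands right after a `}\n`. The test module
// and its name are hypothetical.
#[cfg(test)]
mod file_chunks_sketch {
    use super::*;

    #[test]
    fn chunks_partition_the_buffer() {
        let bytes = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n{\"a\":4}\n";
        let chunks = get_file_chunks_json(bytes, 2);
        // For 4 lines of 8 bytes and 2 threads this yields [(0, 24), (24, 32)].
        assert_eq!(chunks.first().unwrap().0, 0);
        assert_eq!(chunks.last().unwrap().1, bytes.len());
        for pair in chunks.windows(2) {
            // Each chunk ends exactly where the next one begins.
            assert_eq!(pair[0].1, pair[1].0);
            // Each internal boundary is preceded by `}\n`.
            assert_eq!(&bytes[pair[0].1 - 2..pair[0].1], b"}\n");
        }
    }
}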