polars_io/ndjson/
core.rs

use std::fs::File;
use std::io::Cursor;
use std::num::NonZeroUsize;
use std::path::PathBuf;

pub use arrow::array::StructArray;
use num_traits::pow::Pow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use rayon::prelude::*;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::ndjson::buffer::*;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::{RowIndex, SerReader};

const NEWLINE: u8 = b'\n';
const CLOSING_BRACKET: u8 = b'}';

#[must_use]
pub struct JsonLineReader<'a, R>
where
    R: MmapBytesReader,
{
    reader: R,
    rechunk: bool,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    infer_schema_len: Option<NonZeroUsize>,
    chunk_size: NonZeroUsize,
    schema: Option<SchemaRef>,
    schema_overwrite: Option<&'a Schema>,
    path: Option<PathBuf>,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}

impl<'a, R> JsonLineReader<'a, R>
where
    R: 'a + MmapBytesReader,
{
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
        self.schema_overwrite = Some(schema);
        self
    }

    pub fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
        self.predicate = predicate;
        self
    }

    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_row_index(mut self, row_index: Option<&'a mut RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    pub fn infer_schema_len(mut self, infer_schema_len: Option<NonZeroUsize>) -> Self {
        self.infer_schema_len = infer_schema_len;
        self
    }

    pub fn with_n_threads(mut self, n: Option<usize>) -> Self {
        self.n_threads = n;
        self
    }

    pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
        self.path = path.map(|p| p.into());
        self
    }
    /// Sets the chunk size used by the parser. This influences performance.
    pub fn with_chunk_size(mut self, chunk_size: Option<NonZeroUsize>) -> Self {
        if let Some(chunk_size) = chunk_size {
            self.chunk_size = chunk_size;
        }

        self
    }
    /// Reduce memory consumption at the expense of performance.
    pub fn low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    /// Set values as `Null` if parsing fails because of schema mismatches.
    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
        self.ignore_errors = ignore_errors;
        self
    }

    pub fn count(mut self) -> PolarsResult<usize> {
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        json_reader.count()
    }
}

impl JsonLineReader<'_, File> {
    /// This is the recommended way to create a JSON-lines reader, as it allows for
    /// the fastest parsing.
    pub fn from_path<P: Into<PathBuf>>(path: P) -> PolarsResult<Self> {
        let path = crate::resolve_homedir(&path.into());
        let f = polars_utils::open_file(&path)?;
        Ok(Self::new(f).with_path(Some(path)))
    }
}
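
// A minimal usage sketch (hypothetical path and options; `finish` comes from the
// `SerReader` impl below):
//
//     let df = JsonLineReader::from_path("data.ndjson")?
//         .with_n_rows(Some(1_000))
//         .infer_schema_len(NonZeroUsize::new(100))
//         .finish()?;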
impl<R> SerReader<R> for JsonLineReader<'_, R>
where
    R: MmapBytesReader,
{
    /// Create a new JsonLineReader from a file/stream.
    fn new(reader: R) -> Self {
        JsonLineReader {
            reader,
            rechunk: true,
            n_rows: None,
            n_threads: None,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            schema: None,
            schema_overwrite: None,
            path: None,
            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
            low_memory: false,
            ignore_errors: false,
            row_index: None,
            predicate: None,
            projection: None,
        }
    }
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let mut json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        let mut df: DataFrame = json_reader.as_df()?;
        if rechunk && df.first_col_n_chunks() > 1 {
            df.as_single_chunk_par();
        }
        Ok(df)
    }
}

/// Internal NDJSON reader: owns the raw bytes plus the resolved schema and parsing
/// options, and backs both `count` and `finish` on `JsonLineReader`.
pub(crate) struct CoreJsonReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    n_rows: Option<usize>,
    schema: SchemaRef,
    n_threads: Option<usize>,
    sample_size: usize,
    chunk_size: NonZeroUsize,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}
impl<'a> CoreJsonReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        n_rows: Option<usize>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<&Schema>,
        n_threads: Option<usize>,
        sample_size: usize,
        chunk_size: NonZeroUsize,
        low_memory: bool,
        infer_schema_len: Option<NonZeroUsize>,
        ignore_errors: bool,
        row_index: Option<&'a mut RowIndex>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        projection: Option<Arc<[PlSmallStr]>>,
    ) -> PolarsResult<CoreJsonReader<'a>> {
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let bytes: &[u8] = &reader_bytes;
                let mut cursor = Cursor::new(bytes);
                Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
            },
        };
        if let Some(overwriting_schema) = schema_overwrite {
            let schema = Arc::make_mut(&mut schema);
            overwrite_schema(schema, overwriting_schema)?;
        }

        Ok(CoreJsonReader {
            reader_bytes: Some(reader_bytes),
            schema,
            sample_size,
            n_rows,
            n_threads,
            chunk_size,
            low_memory,
            ignore_errors,
            row_index,
            predicate,
            projection,
        })
    }

    fn count(mut self) -> PolarsResult<usize> {
        let bytes = self.reader_bytes.take().unwrap();
        Ok(super::count_rows_par(&bytes, self.n_threads))
    }

    fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let mut bytes = bytes;
        let mut total_rows = 128;

        if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
            let line_length_upper_bound = mean + 1.1 * std;

            total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
            if let Some(n_rows) = self.n_rows {
                total_rows = std::cmp::min(n_rows, total_rows);
                // The guessed upper bound of the number of bytes in the file.
                let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

                if n_bytes < bytes.len() {
                    if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
                        bytes = &bytes[..n_bytes + pos]
                    }
                }
            }
        }

        if total_rows <= 128 {
            n_threads = 1;
        }

        let rows_per_thread = total_rows / n_threads;

        // Cap the pre-allocated buffer capacity at half of this thread's share of the bytes.
        let max_proxy = bytes.len() / n_threads / 2;
        let capacity = if self.low_memory {
            usize::from(self.chunk_size)
        } else {
            std::cmp::min(rows_per_thread, max_proxy)
        };
        let file_chunks = get_file_chunks_json(bytes, n_threads);

        let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
        let (mut dfs, prepredicate_heights) = POOL.install(|| {
            file_chunks
                .into_par_iter()
                .map(|(start_pos, stop_at_nbytes)| {
                    let mut local_df = parse_ndjson(
                        &bytes[start_pos..stop_at_nbytes],
                        Some(capacity),
                        &self.schema,
                        self.ignore_errors,
                    )?;

                    let prepredicate_height = local_df.height() as IdxSize;
                    if let Some(projection) = self.projection.as_deref() {
                        local_df = local_df.select(projection.iter().cloned())?;
                    }

                    if let Some(row_index) = row_index {
                        local_df = local_df
                            .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
                    }

                    if let Some(predicate) = &self.predicate {
                        let s = predicate.evaluate_io(&local_df)?;
                        let mask = s.bool()?;
                        local_df = local_df.filter(mask)?;
                    }

                    Ok((local_df, prepredicate_height))
                })
                .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
        })?;

        if let Some(ref mut row_index) = self.row_index {
            update_row_counts3(&mut dfs, &prepredicate_heights, 0);
            row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
        }

        accumulate_dataframes_vertical(dfs)
    }

    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_json(n_threads, &reader_bytes)?;

        // If multi-threaded, the number of rows was determined probabilistically,
        // so slice to the requested number of rows where possible.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

/// Parse a single NDJSON line into the per-column `buffers`. A line that is not a
/// JSON object pushes a null into every buffer; an object missing a key pushes a
/// null into that key's buffer. Returns the number of bytes in the line.
#[inline(always)]
fn parse_impl(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    scratch: &mut Scratch,
) -> PolarsResult<usize> {
    scratch.json.clear();
    scratch.json.extend_from_slice(bytes);
    let n = scratch.json.len();
    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
    match value {
        simd_json::BorrowedValue::Object(value) => {
            buffers.iter_mut().try_for_each(|(s, inner)| {
                match s.0.map_lookup(&value) {
                    Some(v) => inner.add(v)?,
                    None => inner.add_null(),
                }
                PolarsResult::Ok(())
            })?;
        },
        _ => {
            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
        },
    };
    Ok(n)
}

/// Reusable scratch allocations for `simd_json`, shared across lines to avoid
/// allocating per line.
#[derive(Default)]
struct Scratch {
    json: Vec<u8>,
    buffers: simd_json::Buffers,
}

pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    // This previously used `serde_json`'s `RawValue` to deserialize chunks without fully
    // deserializing them. However, that convenience comes at a cost: serde_json allocates,
    // parses, and does UTF-8 validation, all things we don't need because simd_json handles
    // them. Also, `serde_json::StreamDeserializer` has a more ambitious goal: it wants to
    // parse potentially *non-delimited* sequences of JSON values, while we know our values
    // are line-delimited. It turns out custom splitting is very easy and gives a nice
    // performance boost.
    bytes.split(|&byte| byte == b'\n').filter(|&bytes| {
        bytes
            .iter()
            .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r'))
    })
}
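
// For example, on the (hypothetical) input b"{\"a\":1}\n \n{\"a\":2}\n", `json_lines`
// yields exactly the two slices `{"a":1}` and `{"a":2}`; the whitespace-only line and
// the trailing empty slice are filtered out.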

fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
    let mut scratch = Scratch::default();

    let iter = json_lines(bytes);
    for bytes in iter {
        parse_impl(bytes, buffers, &mut scratch)?;
    }
    Ok(())
}

pub fn parse_ndjson(
    bytes: &[u8],
    n_rows_hint: Option<usize>,
    schema: &Schema,
    ignore_errors: bool,
) -> PolarsResult<DataFrame> {
    let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));

    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
    parse_lines(bytes, &mut buffers)?;

    DataFrame::new(
        buffers
            .into_values()
            .map(|buf| Ok(buf.into_series()?.into_column()))
            .collect::<PolarsResult<_>>()
            .map_err(|e| match e {
                // Nested types raise SchemaMismatch instead of ComputeError; we map it
                // back here to be consistent.
                PolarsError::ComputeError(..) => e,
                PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
                e => e,
            })?,
    )
}
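
// A minimal sketch of direct use (hypothetical field name and dtype):
//
//     let schema = Schema::from_iter([Field::new("a".into(), DataType::Int64)]);
//     let df = parse_ndjson(b"{\"a\":1}\n{\"a\":2}\n", None, &schema, false)?;
//     assert_eq!(df.height(), 2);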

pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
    if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
        (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
    } else {
        estimate_n_lines_in_chunk(file_bytes)
    }
}

/// Total len divided by max len of first and last non-empty lines. This is intended
/// to be cheaper than `estimate_n_lines_in_file`.
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    chunk
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |x| {
            chunk.len().div_ceil(
                x.len().max(
                    chunk
                        .rsplit(|&c| c == b'\n')
                        .find(|x| !x.is_empty())
                        .unwrap()
                        .len(),
                ),
            )
        })
}
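
// E.g. for the (hypothetical) chunk b"{\"a\":1}\n{\"bb\":22}\n": the first non-empty
// line is 7 bytes and the last is 9 bytes, so the estimate is 18.div_ceil(9) = 2 lines.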

/// Find the nearest next line position.
/// Does not check for newline characters embedded in String fields;
/// this just looks for `}\n`.
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
    let pos = memchr::memchr(NEWLINE, input)?;
    if pos == 0 {
        return Some(1);
    }

    let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
    if is_closing_bracket {
        Some(pos + 1)
    } else {
        None
    }
}
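
// E.g. in b"1}\n{\"a\":2}" the newline at index 2 is preceded by `}`, so this
// returns Some(3), the offset at which the next line starts.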

/// Get the mean and standard deviation of the line lengths, in bytes.
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);

    let mut bytes_trunc;
    let n_lines_per_iter = n_lines / 2;

    let mut n_read = 0;

    let bytes_len = bytes.len();

    // Sample from the start of the file and from 75% of the way into it.
    for offset in [0, (bytes_len as f32 * 0.75) as usize] {
        bytes_trunc = &bytes[offset..];
        let pos = next_line_position_naive_json(bytes_trunc)?;
        if pos >= bytes_len {
            return None;
        }
        bytes_trunc = &bytes_trunc[pos + 1..];

        for _ in offset..(offset + n_lines_per_iter) {
            let pos = next_line_position_naive_json(bytes_trunc);
            if let Some(pos) = pos {
                lengths.push(pos);
                let next_bytes = &bytes_trunc[pos..];
                if next_bytes.is_empty() {
                    return None;
                }
                bytes_trunc = next_bytes;
                n_read += pos;
            } else {
                break;
            }
        }
    }

    let n_samples = lengths.len();
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0);
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}

/// Split `bytes` into roughly `n_threads` (start, stop) byte-offset pairs whose
/// boundaries fall on `}\n` line boundaries, so each chunk can be parsed
/// independently. Together the pairs cover the whole input.
pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
    let mut last_pos = 0;
    let total_len = bytes.len();
    let chunk_size = total_len / n_threads;
    let mut offsets = Vec::with_capacity(n_threads);
    for _ in 0..n_threads {
        let search_pos = last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push((last_pos, end_pos));
        last_pos = end_pos;
    }
    offsets.push((last_pos, total_len));
    offsets
}
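
// E.g. with 4 threads over well-formed NDJSON, each boundary search starts
// `total_len / 4` bytes past the previous boundary and snaps forward to the next
// `}\n`, so every chunk begins at a line start and the final chunk always ends at
// `total_len`.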