use std::fs::File;
use std::io::Cursor;
use std::num::NonZeroUsize;
use std::path::PathBuf;

pub use arrow::array::StructArray;
use num_traits::pow::Pow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use rayon::prelude::*;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::ndjson::buffer::*;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::{RowIndex, SerReader};

const NEWLINE: u8 = b'\n';
const CLOSING_BRACKET: u8 = b'}';

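/// Reads NDJSON (newline-delimited JSON) into a [`DataFrame`], one JSON
/// object per line.
///
/// A minimal usage sketch, marked `ignore` since it assumes an NDJSON file at
/// `data.ndjson` and that this reader and [`SerReader`] are re-exported
/// through the crate prelude:
///
/// ```ignore
/// use polars_io::prelude::*;
///
/// fn read() -> PolarsResult<DataFrame> {
///     JsonLineReader::from_path("data.ndjson")?
///         .with_n_rows(Some(100))
///         .finish()
/// }
/// ```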
#[must_use]
pub struct JsonLineReader<'a, R>
where
    R: MmapBytesReader,
{
    reader: R,
    rechunk: bool,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    infer_schema_len: Option<NonZeroUsize>,
    chunk_size: NonZeroUsize,
    schema: Option<SchemaRef>,
    schema_overwrite: Option<&'a Schema>,
    path: Option<PathBuf>,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}

impl<'a, R> JsonLineReader<'a, R>
where
    R: 'a + MmapBytesReader,
{
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
        self.schema_overwrite = Some(schema);
        self
    }

    pub fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
        self.predicate = predicate;
        self
    }

    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_row_index(mut self, row_index: Option<&'a mut RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    pub fn infer_schema_len(mut self, infer_schema_len: Option<NonZeroUsize>) -> Self {
        self.infer_schema_len = infer_schema_len;
        self
    }

    pub fn with_n_threads(mut self, n: Option<usize>) -> Self {
        self.n_threads = n;
        self
    }

    pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
        self.path = path.map(|p| p.into());
        self
    }

    pub fn with_chunk_size(mut self, chunk_size: Option<NonZeroUsize>) -> Self {
        if let Some(chunk_size) = chunk_size {
            self.chunk_size = chunk_size;
        }

        self
    }

    pub fn low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
        self.ignore_errors = ignore_errors;
        self
    }

    pub fn count(mut self) -> PolarsResult<usize> {
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample_size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        json_reader.count()
    }
}

impl JsonLineReader<'_, File> {
    pub fn from_path<P: Into<PathBuf>>(path: P) -> PolarsResult<Self> {
        let path = crate::resolve_homedir(&path.into());
        let f = polars_utils::open_file(&path)?;
        Ok(Self::new(f).with_path(Some(path)))
    }
}

impl<R> SerReader<R> for JsonLineReader<'_, R>
where
    R: MmapBytesReader,
{
    fn new(reader: R) -> Self {
        JsonLineReader {
            reader,
            rechunk: true,
            n_rows: None,
            n_threads: None,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            schema: None,
            schema_overwrite: None,
            path: None,
            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
            low_memory: false,
            ignore_errors: false,
            row_index: None,
            predicate: None,
            projection: None,
        }
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let mut json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample_size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        let mut df: DataFrame = json_reader.as_df()?;
        if rechunk && df.first_col_n_chunks() > 1 {
            df.as_single_chunk_par();
        }
        Ok(df)
    }
}

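/// Internal NDJSON driver: owns the input bytes and performs schema
/// inference, chunking, and parallel parsing.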
pub(crate) struct CoreJsonReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    n_rows: Option<usize>,
    schema: SchemaRef,
    n_threads: Option<usize>,
    sample_size: usize,
    chunk_size: NonZeroUsize,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}
impl<'a> CoreJsonReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        n_rows: Option<usize>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<&Schema>,
        n_threads: Option<usize>,
        sample_size: usize,
        chunk_size: NonZeroUsize,
        low_memory: bool,
        infer_schema_len: Option<NonZeroUsize>,
        ignore_errors: bool,
        row_index: Option<&'a mut RowIndex>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        projection: Option<Arc<[PlSmallStr]>>,
    ) -> PolarsResult<CoreJsonReader<'a>> {
        // Infer the schema from the leading lines when none was provided.
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let bytes: &[u8] = &reader_bytes;
                let mut cursor = Cursor::new(bytes);
                Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
            },
        };
        if let Some(overwriting_schema) = schema_overwrite {
            let schema = Arc::make_mut(&mut schema);
            overwrite_schema(schema, overwriting_schema)?;
        }

        Ok(CoreJsonReader {
            reader_bytes: Some(reader_bytes),
            schema,
            sample_size,
            n_rows,
            n_threads,
            chunk_size,
            low_memory,
            ignore_errors,
            row_index,
            predicate,
            projection,
        })
    }

    fn count(mut self) -> PolarsResult<usize> {
        let bytes = self.reader_bytes.take().unwrap();
        Ok(super::count_rows_par(&bytes, self.n_threads))
    }

    fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let mut bytes = bytes;
        let mut total_rows = 128;

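        // Estimate the total number of rows from sampled line-length
        // statistics. If an `n_rows` limit is set, truncate `bytes` just past
        // the estimated cut-off (snapped to a line boundary) so we don't
        // parse more than needed.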
        if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
            let line_length_upper_bound = mean + 1.1 * std;

            total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
            if let Some(n_rows) = self.n_rows {
                total_rows = std::cmp::min(n_rows, total_rows);
                let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

                if n_bytes < bytes.len() {
                    if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
                        bytes = &bytes[..n_bytes + pos]
                    }
                }
            }
        }

        if total_rows <= 128 {
            n_threads = 1;
        }

        let rows_per_thread = total_rows / n_threads;

        let max_proxy = bytes.len() / n_threads / 2;
        let capacity = if self.low_memory {
            usize::from(self.chunk_size)
        } else {
            std::cmp::min(rows_per_thread, max_proxy)
        };
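        // Split the input at naive line boundaries into one contiguous byte
        // range per thread; each range is parsed into its own DataFrame in
        // parallel below.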
        let file_chunks = get_file_chunks_json(bytes, n_threads);

        let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
        let (mut dfs, prepredicate_heights) = POOL.install(|| {
            file_chunks
                .into_par_iter()
                .map(|(start_pos, stop_at_nbytes)| {
                    let mut local_df = parse_ndjson(
                        &bytes[start_pos..stop_at_nbytes],
                        Some(capacity),
                        &self.schema,
                        self.ignore_errors,
                    )?;

                    // Height before predicate filtering: row-index offsets
                    // must count every parsed row, including rows a predicate
                    // later drops.
                    let prepredicate_height = local_df.height() as IdxSize;
                    if let Some(projection) = self.projection.as_deref() {
                        local_df = local_df.select(projection.iter().cloned())?;
                    }

                    if let Some(row_index) = row_index {
                        local_df = local_df
                            .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
                    }

                    if let Some(predicate) = &self.predicate {
                        let s = predicate.evaluate_io(&local_df)?;
                        let mask = s.bool()?;
                        local_df = local_df.filter(mask)?;
                    }

                    Ok((local_df, prepredicate_height))
                })
                .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
        })?;

        if let Some(ref mut row_index) = self.row_index {
            // Make the per-chunk row indices globally consecutive and advance
            // the caller's offset by the total number of parsed rows.
            update_row_counts3(&mut dfs, &prepredicate_heights, 0);
            row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
        }

        accumulate_dataframes_vertical(dfs)
    }

    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_json(n_threads, &reader_bytes)?;

        // The byte cut-off in `parse_json` is only an estimate, so an
        // `n_rows` limit may still be overshot; enforce it exactly here.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

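/// Parses a single NDJSON line with `simd_json` and appends each looked-up
/// field to its column buffer; a line that is not a JSON object appends a
/// null to every column. Returns the number of bytes in the line.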
#[inline(always)]
fn parse_impl(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    scratch: &mut Scratch,
) -> PolarsResult<usize> {
    scratch.json.clear();
    scratch.json.extend_from_slice(bytes);
    let n = scratch.json.len();
    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
    match value {
        simd_json::BorrowedValue::Object(value) => {
            buffers.iter_mut().try_for_each(|(s, inner)| {
                match s.0.map_lookup(&value) {
                    Some(v) => inner.add(v)?,
                    None => inner.add_null(),
                }
                PolarsResult::Ok(())
            })?;
        },
        _ => {
            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
        },
    };
    Ok(n)
}

/// Reusable scratch space for `simd_json` so per-line allocations are
/// amortized across the lines of a chunk.
#[derive(Default)]
struct Scratch {
    json: Vec<u8>,
    buffers: simd_json::Buffers,
}

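/// Splits `bytes` on `\n` and yields only the lines that contain at least one
/// non-whitespace byte, skipping empty and whitespace-only lines.
///
/// An illustrative sketch, marked `ignore` since the public path of this
/// module may differ:
///
/// ```ignore
/// let lines: Vec<&[u8]> = json_lines(b"{\"a\":1}\n\n{\"a\":2}\n").collect();
/// assert_eq!(lines.len(), 2);
/// ```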
pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    bytes.split(|&byte| byte == b'\n').filter(|&bytes| {
        bytes
            .iter()
            .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r'))
    })
}

fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
    let mut scratch = Scratch::default();

    let iter = json_lines(bytes);
    for bytes in iter {
        parse_impl(bytes, buffers, &mut scratch)?;
    }
    Ok(())
}

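/// Parses a chunk of NDJSON bytes into a [`DataFrame`] with the given
/// `schema`. `n_rows_hint` pre-sizes the per-column buffers; when absent, the
/// row count is estimated from the chunk itself.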
pub fn parse_ndjson(
    bytes: &[u8],
    n_rows_hint: Option<usize>,
    schema: &Schema,
    ignore_errors: bool,
) -> PolarsResult<DataFrame> {
    let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));

    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
    parse_lines(bytes, &mut buffers)?;

    DataFrame::new(
        buffers
            .into_values()
            .map(|buf| Ok(buf.into_series()?.into_column()))
            .collect::<PolarsResult<_>>()
            .map_err(|e| match e {
                PolarsError::ComputeError(..) => e,
                // Normalize schema mismatches to compute errors so callers
                // see a consistent error variant for malformed lines.
                PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
                e => e,
            })?,
    )
}

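/// Estimates the number of lines in `file_bytes` from sampled line-length
/// statistics, falling back to a per-chunk estimate when sampling fails.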
pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
    if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
        (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
    } else {
        estimate_n_lines_in_chunk(file_bytes)
    }
}

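/// Estimates the number of lines in `chunk` by dividing its length by the
/// longer of its first and last non-empty lines, a cheap proxy for the
/// typical line length.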
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    chunk
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |x| {
            chunk.len().div_ceil(
                x.len().max(
                    chunk
                        .rsplit(|&c| c == b'\n')
                        .find(|x| !x.is_empty())
                        // Safe: a non-empty line was found from the front, so
                        // one also exists searching from the back.
                        .unwrap()
                        .len(),
                ),
            )
        })
}

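/// Returns the byte offset just past the next newline, but only when the byte
/// preceding the newline is `}`; this is a naive check that the newline
/// terminates a JSON object rather than occurring inside one. A newline at
/// offset 0 is treated as a boundary.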
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
    let pos = memchr::memchr(NEWLINE, input)?;
    if pos == 0 {
        return Some(1);
    }

    let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
    if is_closing_bracket {
        Some(pos + 1)
    } else {
        None
    }
}

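/// Samples up to `n_lines` line lengths from `bytes` and returns their
/// `(mean, std)` in bytes, or `None` when sampling runs off the end of the
/// buffer.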
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);

    let mut bytes_trunc;
    let n_lines_per_iter = n_lines / 2;

    let mut n_read = 0;

    let bytes_len = bytes.len();

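    // Take half the samples from the head of the buffer and half from roughly
    // 75% of the way in, to reduce bias from locally atypical lines.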
    for offset in [0, (bytes_len as f32 * 0.75) as usize] {
        bytes_trunc = &bytes[offset..];
        let pos = next_line_position_naive_json(bytes_trunc)?;
        if pos >= bytes_len {
            return None;
        }
        bytes_trunc = &bytes_trunc[pos + 1..];

        for _ in offset..(offset + n_lines_per_iter) {
            let pos = next_line_position_naive_json(bytes_trunc);
            if let Some(pos) = pos {
                lengths.push(pos);
                let next_bytes = &bytes_trunc[pos..];
                if next_bytes.is_empty() {
                    return None;
                }
                bytes_trunc = next_bytes;
                n_read += pos;
            } else {
                break;
            }
        }
    }

    let n_samples = lengths.len();
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0)
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}

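/// Splits `bytes` into up to `n_threads` `(start, end)` byte ranges, moving
/// each boundary forward to the next naive JSON line boundary. The final
/// range always extends to the end of the buffer.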
pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
    let mut last_pos = 0;
    let total_len = bytes.len();
    let chunk_size = total_len / n_threads;
    let mut offsets = Vec::with_capacity(n_threads);
    for _ in 0..n_threads {
        let search_pos = last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push((last_pos, end_pos));
        last_pos = end_pos;
    }
    offsets.push((last_pos, total_len));
    offsets
}