1use std::io::Cursor;
2use std::num::NonZeroUsize;
3
4pub use arrow::array::StructArray;
5use num_traits::pow::Pow;
6use polars_core::prelude::*;
7use polars_core::runtime::RAYON;
8use polars_core::utils::accumulate_dataframes_vertical;
9use rayon::prelude::*;
10
11use crate::RowIndex;
12use crate::mmap::ReaderBytes;
13use crate::ndjson::buffer::*;
14use crate::predicates::PhysicalIoExpr;
15use crate::prelude::*;
/// Line terminator byte searched for when locating NDJSON line boundaries.
const NEWLINE: u8 = b'\n';
/// Byte that must directly precede a line terminator for it to count as the end of a JSON object.
const CLOSING_BRACKET: u8 = b'}';
18
/// Reader that turns a buffer of newline-delimited JSON into a `DataFrame`.
///
/// Built with `CoreJsonReader::new` and driven with `as_df`.
pub(crate) struct CoreJsonReader<'a> {
    /// Input bytes. Stored as an `Option` because `as_df` takes them out when it runs.
    reader_bytes: Option<ReaderBytes<'a>>,
    /// Optional cap on the number of rows in the returned frame.
    n_rows: Option<usize>,
    /// Schema handed to `parse_ndjson` for every chunk.
    schema: SchemaRef,
    /// Requested thread count; `as_df` falls back to the `RAYON` pool size when `None`.
    n_threads: Option<usize>,
    /// Number of lines passed to `get_line_stats_json` when estimating line lengths.
    sample_size: usize,
    /// Buffer capacity hint used per chunk when `low_memory` is set.
    chunk_size: NonZeroUsize,
    /// Selects `chunk_size` instead of the estimated rows-per-thread as the capacity hint.
    low_memory: bool,
    /// Forwarded to `parse_ndjson`.
    ignore_errors: bool,
    /// Optional row-index column to add; its `offset` is advanced after parsing.
    row_index: Option<&'a mut RowIndex>,
    /// Optional filter evaluated on every parsed chunk.
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    /// Optional list of column names to select from every parsed chunk.
    projection: Option<Arc<[PlSmallStr]>>,
}
32impl<'a> CoreJsonReader<'a> {
33 #[allow(clippy::too_many_arguments)]
34 pub(crate) fn new(
35 reader_bytes: ReaderBytes<'a>,
36 n_rows: Option<usize>,
37 schema: Option<SchemaRef>,
38 schema_overwrite: Option<&Schema>,
39 n_threads: Option<usize>,
40 sample_size: usize,
41 chunk_size: NonZeroUsize,
42 low_memory: bool,
43 infer_schema_len: Option<NonZeroUsize>,
44 ignore_errors: bool,
45 row_index: Option<&'a mut RowIndex>,
46 predicate: Option<Arc<dyn PhysicalIoExpr>>,
47 projection: Option<Arc<[PlSmallStr]>>,
48 ) -> PolarsResult<CoreJsonReader<'a>> {
49 let reader_bytes = reader_bytes;
50
51 let mut schema = match schema {
52 Some(schema) => schema,
53 None => {
54 let bytes: &[u8] = &reader_bytes;
55 let mut cursor = Cursor::new(bytes);
56 Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
57 },
58 };
59 if let Some(overwriting_schema) = schema_overwrite {
60 let schema = Arc::make_mut(&mut schema);
61 overwrite_schema(schema, overwriting_schema)?;
62 }
63
64 Ok(CoreJsonReader {
65 reader_bytes: Some(reader_bytes),
66 schema,
67 sample_size,
68 n_rows,
69 n_threads,
70 chunk_size,
71 low_memory,
72 ignore_errors,
73 row_index,
74 predicate,
75 projection,
76 })
77 }
78
79 fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
80 let mut bytes = bytes;
81 let mut total_rows = 128;
82
83 if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
84 let line_length_upper_bound = mean + 1.1 * std;
85
86 total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
87 if let Some(n_rows) = self.n_rows {
88 total_rows = std::cmp::min(n_rows, total_rows);
89 let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;
91
92 if n_bytes < bytes.len() {
93 if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
94 bytes = &bytes[..n_bytes + pos]
95 }
96 }
97 }
98 }
99
100 if total_rows <= 128 {
101 n_threads = 1;
102 }
103
104 let rows_per_thread = total_rows / n_threads;
105
106 let max_proxy = bytes.len() / n_threads / 2;
107 let capacity = if self.low_memory {
108 usize::from(self.chunk_size)
109 } else {
110 std::cmp::min(rows_per_thread, max_proxy)
111 };
112 let file_chunks = get_file_chunks_json(bytes, n_threads);
113
114 let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
115 let (mut dfs, prepredicate_heights) = RAYON.install(|| {
116 file_chunks
117 .into_par_iter()
118 .map(|(start_pos, stop_at_nbytes)| {
119 let mut local_df = parse_ndjson(
120 &bytes[start_pos..stop_at_nbytes],
121 Some(capacity),
122 &self.schema,
123 self.ignore_errors,
124 )?;
125
126 let prepredicate_height = local_df.height() as IdxSize;
127 if let Some(projection) = self.projection.as_deref() {
128 local_df = local_df.select(projection.iter().cloned())?;
129 }
130
131 if let Some(row_index) = row_index {
132 local_df = local_df
133 .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
134 }
135
136 if let Some(predicate) = &self.predicate {
137 let s = predicate.evaluate_io(&local_df)?;
138 let mask = s.bool()?;
139 local_df = local_df.filter(mask)?;
140 }
141
142 Ok((local_df, prepredicate_height))
143 })
144 .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
145 })?;
146
147 if let Some(ref mut row_index) = self.row_index {
148 update_row_counts3(&mut dfs, &prepredicate_heights, 0);
149 row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
150 }
151
152 accumulate_dataframes_vertical(dfs)
153 }
154
155 pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
156 let n_threads = self
157 .n_threads
158 .unwrap_or_else(|| RAYON.current_num_threads());
159
160 let reader_bytes = self.reader_bytes.take().unwrap();
161
162 let mut df = self.parse_json(n_threads, &reader_bytes)?;
163
164 if let Some(n_rows) = self.n_rows {
167 if n_rows < df.height() {
168 df = df.slice(0, n_rows)
169 }
170 }
171 Ok(df)
172 }
173}
174
175#[inline(always)]
176fn parse_impl(
177 bytes: &[u8],
178 buffers: &mut PlIndexMap<BufferKey, Buffer>,
179 scratch: &mut Scratch,
180 ignore_errors: bool,
181) -> PolarsResult<usize> {
182 scratch.json.clear();
183 scratch.json.extend_from_slice(bytes);
184 let n = scratch.json.len();
185 let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
186 .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
187 match value {
188 simd_json::BorrowedValue::Object(value) => {
189 buffers.iter_mut().try_for_each(|(s, inner)| {
190 match s.0.map_lookup(&value) {
191 Some(v) => inner.add(v)?,
192 None => inner.add_null(),
193 }
194 PolarsResult::Ok(())
195 })?;
196 },
197 _ if ignore_errors => {
198 buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
199 },
200 v => {
201 polars_bail!(ComputeError: "NDJSON line expected to contain JSON object: {v}");
202 },
203 };
204 Ok(n)
205}
206
/// Reusable working memory for `parse_impl`, so parsing many lines does not
/// allocate fresh buffers for each one.
#[derive(Default)]
struct Scratch {
    /// Copy of the line currently being parsed; passed mutably to the JSON parser.
    json: Vec<u8>,
    /// `simd_json` buffers passed to the parser on every call.
    buffers: simd_json::Buffers,
}
212
213pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
214 bytes
220 .split(|&byte| byte == b'\n')
221 .filter(|bytes| is_json_line(bytes))
222}
223
/// Returns `true` when `bytes` contains at least one byte that is not a space,
/// a tab or a carriage return.
///
/// An empty slice therefore yields `false`.
#[inline]
pub fn is_json_line(bytes: &[u8]) -> bool {
    let only_blank = bytes
        .iter()
        .all(|&byte| byte == b' ' || byte == b'\t' || byte == b'\r');
    !only_blank
}
230
231fn parse_lines(
232 bytes: &[u8],
233 buffers: &mut PlIndexMap<BufferKey, Buffer>,
234 ignore_errors: bool,
235) -> PolarsResult<()> {
236 let mut scratch = Scratch::default();
237
238 let iter = json_lines(bytes);
239 for bytes in iter {
240 parse_impl(bytes, buffers, &mut scratch, ignore_errors)?;
241 }
242 Ok(())
243}
244
245pub fn parse_ndjson(
246 bytes: &[u8],
247 n_rows_hint: Option<usize>,
248 schema: &Schema,
249 ignore_errors: bool,
250) -> PolarsResult<DataFrame> {
251 let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));
252
253 let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
254 parse_lines(bytes, &mut buffers, ignore_errors)?;
255
256 DataFrame::new_infer_height(
257 buffers
258 .into_values()
259 .map(|buf| Ok(buf.into_series()?.into_column()))
260 .collect::<PolarsResult<_>>()
261 .map_err(|e| match e {
262 PolarsError::ComputeError(..) => e,
265 PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
266 e => e,
267 })?,
268 )
269}
270
271pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
272 if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
273 (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
274 } else {
275 estimate_n_lines_in_chunk(file_bytes)
276 }
277}
278
/// Estimates the number of lines in `chunk` from its first and last non-empty lines.
///
/// The result is the chunk length divided (rounding up) by the longer of those two
/// lines. Returns `1` when the chunk has no non-empty line.
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    fn is_newline(byte: &u8) -> bool {
        *byte == b'\n'
    }
    fn has_content(line: &&[u8]) -> bool {
        !line.is_empty()
    }

    let Some(first) = chunk.split(is_newline).find(has_content) else {
        return 1;
    };
    // A first non-empty line exists, so scanning from the back finds one as well.
    let last = chunk.rsplit(is_newline).find(has_content).unwrap_or(first);

    let longest = first.len().max(last.len());
    chunk.len().div_ceil(longest)
}
297
298pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
302 let pos = memchr::memchr(NEWLINE, input)?;
303 if pos == 0 {
304 return Some(1);
305 }
306
307 let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
308 if is_closing_bracket {
309 Some(pos + 1)
310 } else {
311 None
312 }
313}
314
315pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
317 let mut lengths = Vec::with_capacity(n_lines);
318
319 let mut bytes_trunc;
320 let n_lines_per_iter = n_lines / 2;
321
322 let mut n_read = 0;
323
324 let bytes_len = bytes.len();
325
326 for offset in [0, (bytes_len as f32 * 0.75) as usize] {
328 bytes_trunc = &bytes[offset..];
329 let pos = next_line_position_naive_json(bytes_trunc)?;
330 if pos >= bytes_len {
331 return None;
332 }
333 bytes_trunc = &bytes_trunc[pos + 1..];
334
335 for _ in offset..(offset + n_lines_per_iter) {
336 let pos = next_line_position_naive_json(bytes_trunc);
337 if let Some(pos) = pos {
338 lengths.push(pos);
339 let next_bytes = &bytes_trunc[pos..];
340 if next_bytes.is_empty() {
341 return None;
342 }
343 bytes_trunc = next_bytes;
344 n_read += pos;
345 } else {
346 break;
347 }
348 }
349 }
350
351 let n_samples = lengths.len();
352 let mean = (n_read as f32) / (n_samples as f32);
353 let mut std = 0.0;
354 for &len in lengths.iter() {
355 std += (len as f32 - mean).pow(2.0)
356 }
357 std = (std / n_samples as f32).sqrt();
358 Some((mean, std))
359}
360
361pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
362 let mut last_pos = 0;
363 let total_len = bytes.len();
364 let chunk_size = total_len / n_threads;
365 let mut offsets = Vec::with_capacity(n_threads);
366 for _ in 0..n_threads {
367 let search_pos = last_pos + chunk_size;
368
369 if search_pos >= bytes.len() {
370 break;
371 }
372
373 let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
374 Some(pos) => search_pos + pos,
375 None => {
376 break;
377 },
378 };
379 offsets.push((last_pos, end_pos));
380 last_pos = end_pos;
381 }
382 offsets.push((last_pos, total_len));
383 offsets
384}