polars_lazy/scan/
ndjson.rs

1use std::num::NonZeroUsize;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use polars_core::prelude::*;
6use polars_io::cloud::CloudOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_plan::dsl::{CastColumnsPolicy, DslPlan, FileScan, MissingColumnsPolicy, ScanSources};
9use polars_plan::prelude::{NDJsonReadOptions, UnifiedScanArgs};
10use polars_utils::slice_enum::Slice;
11
12use crate::prelude::LazyFrame;
13use crate::scan::file_list_reader::LazyFileListReader;
14
/// Builder describing a lazy scan over newline-delimited JSON sources.
///
/// Options are set through the consuming `with_*` methods; the configured
/// reader is turned into a [`LazyFrame`] by [`LazyFileListReader::finish`].
#[derive(Clone)]
pub struct LazyJsonLineReader {
    /// Sources to scan; moved unchanged into the scan plan by `finish`.
    pub(crate) sources: ScanSources,
    /// Batch size requested through `with_batch_size`.
    // NOTE(review): `finish` in this file never reads this field and uses a
    // fixed chunk size instead — confirm whether that is intentional.
    pub(crate) batch_size: Option<NonZeroUsize>,
    /// Reduce memory usage at the expense of performance.
    pub(crate) low_memory: bool,
    /// Rechunk the memory to contiguous chunks when parsing is done.
    pub(crate) rechunk: bool,
    /// Explicit schema; forwarded to `NDJsonReadOptions::schema`.
    pub(crate) schema: Option<SchemaRef>,
    /// Forwarded to `NDJsonReadOptions::schema_overwrite`.
    // NOTE(review): presumably overrides the dtypes of selected columns on top
    // of the inferred schema — confirm against `NDJsonReadOptions`.
    pub(crate) schema_overwrite: Option<SchemaRef>,
    /// Optional row index column to add to the output.
    pub(crate) row_index: Option<RowIndex>,
    /// Number of rows used for schema inference (constructors default to 100).
    pub(crate) infer_schema_length: Option<NonZeroUsize>,
    /// Optional row limit; `finish` turns it into a slice starting at offset 0.
    pub(crate) n_rows: Option<usize>,
    /// Set values as `Null` if parsing fails because of schema mismatches.
    pub(crate) ignore_errors: bool,
    /// Forwarded to `UnifiedScanArgs::include_file_paths`.
    // NOTE(review): presumably the name of an output column holding each row's
    // source path — confirm.
    pub(crate) include_file_paths: Option<PlSmallStr>,
    /// Cloud options; forwarded to `UnifiedScanArgs::cloud_options`.
    pub(crate) cloud_options: Option<CloudOptions>,
}
30
31impl LazyJsonLineReader {
32    pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
33        Self::new_with_sources(ScanSources::Paths(paths))
34    }
35
36    pub fn new_with_sources(sources: ScanSources) -> Self {
37        LazyJsonLineReader {
38            sources,
39            batch_size: None,
40            low_memory: false,
41            rechunk: false,
42            schema: None,
43            schema_overwrite: None,
44            row_index: None,
45            infer_schema_length: NonZeroUsize::new(100),
46            ignore_errors: false,
47            n_rows: None,
48            include_file_paths: None,
49            cloud_options: None,
50        }
51    }
52
53    pub fn new(path: impl AsRef<Path>) -> Self {
54        Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
55    }
56
57    /// Add a row index column.
58    #[must_use]
59    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
60        self.row_index = row_index;
61        self
62    }
63
64    /// Set values as `Null` if parsing fails because of schema mismatches.
65    #[must_use]
66    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
67        self.ignore_errors = ignore_errors;
68        self
69    }
70    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
71    /// be guaranteed.
72    #[must_use]
73    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
74        self.n_rows = num_rows;
75        self
76    }
77    /// Set the number of rows to use when inferring the json schema.
78    /// the default is 100 rows.
79    /// Ignored when the schema is specified explicitly using [`Self::with_schema`].
80    /// Setting to `None` will do a full table scan, very slow.
81    #[must_use]
82    pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
83        self.infer_schema_length = num_rows;
84        self
85    }
86    /// Set the JSON file's schema
87    #[must_use]
88    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
89        self.schema = schema;
90        self
91    }
92
93    /// Set the JSON file's schema
94    #[must_use]
95    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
96        self.schema_overwrite = schema_overwrite;
97        self
98    }
99
100    /// Reduce memory usage at the expense of performance
101    #[must_use]
102    pub fn low_memory(mut self, toggle: bool) -> Self {
103        self.low_memory = toggle;
104        self
105    }
106
107    #[must_use]
108    pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
109        self.batch_size = batch_size;
110        self
111    }
112
113    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
114        self.cloud_options = cloud_options;
115        self
116    }
117
118    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
119        self.include_file_paths = include_file_paths;
120        self
121    }
122}
123
124impl LazyFileListReader for LazyJsonLineReader {
125    fn finish(self) -> PolarsResult<LazyFrame> {
126        let unified_scan_args = UnifiedScanArgs {
127            schema: None,
128            cloud_options: self.cloud_options,
129            hive_options: HiveOptions::new_disabled(),
130            rechunk: self.rechunk,
131            cache: false,
132            glob: true,
133            projection: None,
134            row_index: self.row_index,
135            pre_slice: self.n_rows.map(|len| Slice::Positive { offset: 0, len }),
136            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
137            missing_columns_policy: MissingColumnsPolicy::Raise,
138            include_file_paths: self.include_file_paths,
139        };
140
141        let options = NDJsonReadOptions {
142            n_threads: None,
143            infer_schema_length: self.infer_schema_length,
144            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
145            low_memory: self.low_memory,
146            ignore_errors: self.ignore_errors,
147            schema: self.schema,
148            schema_overwrite: self.schema_overwrite,
149        };
150
151        let scan_type = Box::new(FileScan::NDJson { options });
152
153        Ok(LazyFrame::from(DslPlan::Scan {
154            sources: self.sources,
155            file_info: None,
156            unified_scan_args: Box::new(unified_scan_args),
157            scan_type,
158            cached_ir: Default::default(),
159        }))
160    }
161
162    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
163        unreachable!();
164    }
165
166    fn sources(&self) -> &ScanSources {
167        &self.sources
168    }
169
170    fn with_sources(mut self, sources: ScanSources) -> Self {
171        self.sources = sources;
172        self
173    }
174
175    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
176        self.n_rows = n_rows.into();
177        self
178    }
179
180    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
181        self.row_index = row_index.into();
182        self
183    }
184
185    fn rechunk(&self) -> bool {
186        self.rechunk
187    }
188
189    /// Rechunk the memory to contiguous chunks when parsing is done.
190    fn with_rechunk(mut self, toggle: bool) -> Self {
191        self.rechunk = toggle;
192        self
193    }
194
195    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
196    /// be guaranteed.
197    fn n_rows(&self) -> Option<usize> {
198        self.n_rows
199    }
200
201    /// Add a row index column.
202    fn row_index(&self) -> Option<&RowIndex> {
203        self.row_index.as_ref()
204    }
205
206    /// [CloudOptions] used to list files.
207    fn cloud_options(&self) -> Option<&CloudOptions> {
208        self.cloud_options.as_ref()
209    }
210}