// polars_lazy/scan/ndjson.rs

1use std::num::NonZeroUsize;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use polars_core::prelude::*;
6use polars_io::cloud::CloudOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_plan::dsl::{DslPlan, FileScan, ScanSources};
9use polars_plan::prelude::{FileScanOptions, NDJsonReadOptions};
10
11use crate::prelude::LazyFrame;
12use crate::scan::file_list_reader::LazyFileListReader;
13
14#[derive(Clone)]
15pub struct LazyJsonLineReader {
16    pub(crate) sources: ScanSources,
17    pub(crate) batch_size: Option<NonZeroUsize>,
18    pub(crate) low_memory: bool,
19    pub(crate) rechunk: bool,
20    pub(crate) schema: Option<SchemaRef>,
21    pub(crate) schema_overwrite: Option<SchemaRef>,
22    pub(crate) row_index: Option<RowIndex>,
23    pub(crate) infer_schema_length: Option<NonZeroUsize>,
24    pub(crate) n_rows: Option<usize>,
25    pub(crate) ignore_errors: bool,
26    pub(crate) include_file_paths: Option<PlSmallStr>,
27    pub(crate) cloud_options: Option<CloudOptions>,
28}
29
30impl LazyJsonLineReader {
31    pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
32        Self::new_with_sources(ScanSources::Paths(paths))
33    }
34
35    pub fn new_with_sources(sources: ScanSources) -> Self {
36        LazyJsonLineReader {
37            sources,
38            batch_size: None,
39            low_memory: false,
40            rechunk: false,
41            schema: None,
42            schema_overwrite: None,
43            row_index: None,
44            infer_schema_length: NonZeroUsize::new(100),
45            ignore_errors: false,
46            n_rows: None,
47            include_file_paths: None,
48            cloud_options: None,
49        }
50    }
51
52    pub fn new(path: impl AsRef<Path>) -> Self {
53        Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
54    }
55
56    /// Add a row index column.
57    #[must_use]
58    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
59        self.row_index = row_index;
60        self
61    }
62
63    /// Set values as `Null` if parsing fails because of schema mismatches.
64    #[must_use]
65    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
66        self.ignore_errors = ignore_errors;
67        self
68    }
69    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
70    /// be guaranteed.
71    #[must_use]
72    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
73        self.n_rows = num_rows;
74        self
75    }
76    /// Set the number of rows to use when inferring the json schema.
77    /// the default is 100 rows.
78    /// Ignored when the schema is specified explicitly using [`Self::with_schema`].
79    /// Setting to `None` will do a full table scan, very slow.
80    #[must_use]
81    pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
82        self.infer_schema_length = num_rows;
83        self
84    }
85    /// Set the JSON file's schema
86    #[must_use]
87    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
88        self.schema = schema;
89        self
90    }
91
92    /// Set the JSON file's schema
93    #[must_use]
94    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
95        self.schema_overwrite = schema_overwrite;
96        self
97    }
98
99    /// Reduce memory usage at the expense of performance
100    #[must_use]
101    pub fn low_memory(mut self, toggle: bool) -> Self {
102        self.low_memory = toggle;
103        self
104    }
105
106    #[must_use]
107    pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
108        self.batch_size = batch_size;
109        self
110    }
111
112    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
113        self.cloud_options = cloud_options;
114        self
115    }
116
117    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
118        self.include_file_paths = include_file_paths;
119        self
120    }
121}
122
123impl LazyFileListReader for LazyJsonLineReader {
124    fn finish(self) -> PolarsResult<LazyFrame> {
125        let file_options = Box::new(FileScanOptions {
126            pre_slice: self.n_rows.map(|x| (0, x)),
127            with_columns: None,
128            cache: false,
129            row_index: self.row_index,
130            rechunk: self.rechunk,
131            file_counter: 0,
132            hive_options: HiveOptions {
133                enabled: Some(false),
134                hive_start_idx: 0,
135                schema: None,
136                try_parse_dates: true,
137            },
138            glob: true,
139            include_file_paths: self.include_file_paths,
140            allow_missing_columns: false,
141        });
142
143        let options = NDJsonReadOptions {
144            n_threads: None,
145            infer_schema_length: self.infer_schema_length,
146            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
147            low_memory: self.low_memory,
148            ignore_errors: self.ignore_errors,
149            schema: self.schema,
150            schema_overwrite: self.schema_overwrite,
151        };
152
153        let scan_type = Box::new(FileScan::NDJson {
154            options,
155            cloud_options: self.cloud_options,
156        });
157
158        Ok(LazyFrame::from(DslPlan::Scan {
159            sources: self.sources,
160            file_info: None,
161            file_options,
162            scan_type,
163            cached_ir: Default::default(),
164        }))
165    }
166
167    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
168        unreachable!();
169    }
170
171    fn sources(&self) -> &ScanSources {
172        &self.sources
173    }
174
175    fn with_sources(mut self, sources: ScanSources) -> Self {
176        self.sources = sources;
177        self
178    }
179
180    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
181        self.n_rows = n_rows.into();
182        self
183    }
184
185    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
186        self.row_index = row_index.into();
187        self
188    }
189
190    fn rechunk(&self) -> bool {
191        self.rechunk
192    }
193
194    /// Rechunk the memory to contiguous chunks when parsing is done.
195    fn with_rechunk(mut self, toggle: bool) -> Self {
196        self.rechunk = toggle;
197        self
198    }
199
200    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
201    /// be guaranteed.
202    fn n_rows(&self) -> Option<usize> {
203        self.n_rows
204    }
205
206    /// Add a row index column.
207    fn row_index(&self) -> Option<&RowIndex> {
208        self.row_index.as_ref()
209    }
210
211    /// [CloudOptions] used to list files.
212    fn cloud_options(&self) -> Option<&CloudOptions> {
213        self.cloud_options.as_ref()
214    }
215}