polars_lazy/scan/
ndjson.rs1use std::num::NonZeroUsize;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use polars_core::prelude::*;
6use polars_io::cloud::CloudOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_plan::dsl::{DslPlan, FileScan, ScanSources};
9use polars_plan::prelude::{FileScanOptions, NDJsonReadOptions};
10
11use crate::prelude::LazyFrame;
12use crate::scan::file_list_reader::LazyFileListReader;
13
14#[derive(Clone)]
15pub struct LazyJsonLineReader {
16 pub(crate) sources: ScanSources,
17 pub(crate) batch_size: Option<NonZeroUsize>,
18 pub(crate) low_memory: bool,
19 pub(crate) rechunk: bool,
20 pub(crate) schema: Option<SchemaRef>,
21 pub(crate) schema_overwrite: Option<SchemaRef>,
22 pub(crate) row_index: Option<RowIndex>,
23 pub(crate) infer_schema_length: Option<NonZeroUsize>,
24 pub(crate) n_rows: Option<usize>,
25 pub(crate) ignore_errors: bool,
26 pub(crate) include_file_paths: Option<PlSmallStr>,
27 pub(crate) cloud_options: Option<CloudOptions>,
28}
29
30impl LazyJsonLineReader {
31 pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
32 Self::new_with_sources(ScanSources::Paths(paths))
33 }
34
35 pub fn new_with_sources(sources: ScanSources) -> Self {
36 LazyJsonLineReader {
37 sources,
38 batch_size: None,
39 low_memory: false,
40 rechunk: false,
41 schema: None,
42 schema_overwrite: None,
43 row_index: None,
44 infer_schema_length: NonZeroUsize::new(100),
45 ignore_errors: false,
46 n_rows: None,
47 include_file_paths: None,
48 cloud_options: None,
49 }
50 }
51
52 pub fn new(path: impl AsRef<Path>) -> Self {
53 Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
54 }
55
56 #[must_use]
58 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
59 self.row_index = row_index;
60 self
61 }
62
63 #[must_use]
65 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
66 self.ignore_errors = ignore_errors;
67 self
68 }
69 #[must_use]
72 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
73 self.n_rows = num_rows;
74 self
75 }
76 #[must_use]
81 pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
82 self.infer_schema_length = num_rows;
83 self
84 }
85 #[must_use]
87 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
88 self.schema = schema;
89 self
90 }
91
92 #[must_use]
94 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
95 self.schema_overwrite = schema_overwrite;
96 self
97 }
98
99 #[must_use]
101 pub fn low_memory(mut self, toggle: bool) -> Self {
102 self.low_memory = toggle;
103 self
104 }
105
106 #[must_use]
107 pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
108 self.batch_size = batch_size;
109 self
110 }
111
112 pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
113 self.cloud_options = cloud_options;
114 self
115 }
116
117 pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
118 self.include_file_paths = include_file_paths;
119 self
120 }
121}
122
123impl LazyFileListReader for LazyJsonLineReader {
124 fn finish(self) -> PolarsResult<LazyFrame> {
125 let file_options = Box::new(FileScanOptions {
126 pre_slice: self.n_rows.map(|x| (0, x)),
127 with_columns: None,
128 cache: false,
129 row_index: self.row_index,
130 rechunk: self.rechunk,
131 file_counter: 0,
132 hive_options: HiveOptions {
133 enabled: Some(false),
134 hive_start_idx: 0,
135 schema: None,
136 try_parse_dates: true,
137 },
138 glob: true,
139 include_file_paths: self.include_file_paths,
140 allow_missing_columns: false,
141 });
142
143 let options = NDJsonReadOptions {
144 n_threads: None,
145 infer_schema_length: self.infer_schema_length,
146 chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
147 low_memory: self.low_memory,
148 ignore_errors: self.ignore_errors,
149 schema: self.schema,
150 schema_overwrite: self.schema_overwrite,
151 };
152
153 let scan_type = Box::new(FileScan::NDJson {
154 options,
155 cloud_options: self.cloud_options,
156 });
157
158 Ok(LazyFrame::from(DslPlan::Scan {
159 sources: self.sources,
160 file_info: None,
161 file_options,
162 scan_type,
163 cached_ir: Default::default(),
164 }))
165 }
166
167 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
168 unreachable!();
169 }
170
171 fn sources(&self) -> &ScanSources {
172 &self.sources
173 }
174
175 fn with_sources(mut self, sources: ScanSources) -> Self {
176 self.sources = sources;
177 self
178 }
179
180 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
181 self.n_rows = n_rows.into();
182 self
183 }
184
185 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
186 self.row_index = row_index.into();
187 self
188 }
189
190 fn rechunk(&self) -> bool {
191 self.rechunk
192 }
193
194 fn with_rechunk(mut self, toggle: bool) -> Self {
196 self.rechunk = toggle;
197 self
198 }
199
200 fn n_rows(&self) -> Option<usize> {
203 self.n_rows
204 }
205
206 fn row_index(&self) -> Option<&RowIndex> {
208 self.row_index.as_ref()
209 }
210
211 fn cloud_options(&self) -> Option<&CloudOptions> {
213 self.cloud_options.as_ref()
214 }
215}