polars_lazy/scan/
ndjson.rs1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use polars_core::prelude::*;
5use polars_io::cloud::CloudOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_plan::dsl::{
8 CastColumnsPolicy, DslPlan, ExtraColumnsPolicy, FileScanDsl, MissingColumnsPolicy, ScanSources,
9};
10use polars_plan::prelude::{NDJsonReadOptions, UnifiedScanArgs};
11use polars_utils::plpath::PlPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::LazyFrame;
15use crate::scan::file_list_reader::LazyFileListReader;
16
17#[derive(Clone)]
18pub struct LazyJsonLineReader {
19 pub(crate) sources: ScanSources,
20 pub(crate) batch_size: Option<NonZeroUsize>,
21 pub(crate) low_memory: bool,
22 pub(crate) rechunk: bool,
23 pub(crate) schema: Option<SchemaRef>,
24 pub(crate) schema_overwrite: Option<SchemaRef>,
25 pub(crate) row_index: Option<RowIndex>,
26 pub(crate) infer_schema_length: Option<NonZeroUsize>,
27 pub(crate) n_rows: Option<usize>,
28 pub(crate) ignore_errors: bool,
29 pub(crate) include_file_paths: Option<PlSmallStr>,
30 pub(crate) cloud_options: Option<CloudOptions>,
31}
32
33impl LazyJsonLineReader {
34 pub fn new_paths(paths: Arc<[PlPath]>) -> Self {
35 Self::new_with_sources(ScanSources::Paths(paths))
36 }
37
38 pub fn new_with_sources(sources: ScanSources) -> Self {
39 LazyJsonLineReader {
40 sources,
41 batch_size: None,
42 low_memory: false,
43 rechunk: false,
44 schema: None,
45 schema_overwrite: None,
46 row_index: None,
47 infer_schema_length: NonZeroUsize::new(100),
48 ignore_errors: false,
49 n_rows: None,
50 include_file_paths: None,
51 cloud_options: None,
52 }
53 }
54
55 pub fn new(path: PlPath) -> Self {
56 Self::new_with_sources(ScanSources::Paths([path].into()))
57 }
58
59 #[must_use]
61 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
62 self.row_index = row_index;
63 self
64 }
65
66 #[must_use]
68 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
69 self.ignore_errors = ignore_errors;
70 self
71 }
72 #[must_use]
75 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
76 self.n_rows = num_rows;
77 self
78 }
79 #[must_use]
84 pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
85 self.infer_schema_length = num_rows;
86 self
87 }
88 #[must_use]
90 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
91 self.schema = schema;
92 self
93 }
94
95 #[must_use]
97 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
98 self.schema_overwrite = schema_overwrite;
99 self
100 }
101
102 #[must_use]
104 pub fn low_memory(mut self, toggle: bool) -> Self {
105 self.low_memory = toggle;
106 self
107 }
108
109 #[must_use]
110 pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
111 self.batch_size = batch_size;
112 self
113 }
114
115 pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
116 self.cloud_options = cloud_options;
117 self
118 }
119
120 pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
121 self.include_file_paths = include_file_paths;
122 self
123 }
124}
125
126impl LazyFileListReader for LazyJsonLineReader {
127 fn finish(self) -> PolarsResult<LazyFrame> {
128 let unified_scan_args = UnifiedScanArgs {
129 schema: None,
130 cloud_options: self.cloud_options,
131 hive_options: HiveOptions::new_disabled(),
132 rechunk: self.rechunk,
133 cache: false,
134 glob: true,
135 projection: None,
136 column_mapping: None,
137 default_values: None,
138 row_index: self.row_index,
139 pre_slice: self.n_rows.map(|len| Slice::Positive { offset: 0, len }),
140 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
141 missing_columns_policy: MissingColumnsPolicy::Raise,
142 extra_columns_policy: ExtraColumnsPolicy::Raise,
143 include_file_paths: self.include_file_paths,
144 deletion_files: None,
145 };
146
147 let options = NDJsonReadOptions {
148 n_threads: None,
149 infer_schema_length: self.infer_schema_length,
150 chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
151 low_memory: self.low_memory,
152 ignore_errors: self.ignore_errors,
153 schema: self.schema,
154 schema_overwrite: self.schema_overwrite,
155 };
156
157 let scan_type = Box::new(FileScanDsl::NDJson { options });
158
159 Ok(LazyFrame::from(DslPlan::Scan {
160 sources: self.sources,
161 unified_scan_args: Box::new(unified_scan_args),
162 scan_type,
163 cached_ir: Default::default(),
164 }))
165 }
166
167 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
168 unreachable!();
169 }
170
171 fn sources(&self) -> &ScanSources {
172 &self.sources
173 }
174
175 fn with_sources(mut self, sources: ScanSources) -> Self {
176 self.sources = sources;
177 self
178 }
179
180 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
181 self.n_rows = n_rows.into();
182 self
183 }
184
185 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
186 self.row_index = row_index.into();
187 self
188 }
189
190 fn rechunk(&self) -> bool {
191 self.rechunk
192 }
193
194 fn with_rechunk(mut self, toggle: bool) -> Self {
196 self.rechunk = toggle;
197 self
198 }
199
200 fn n_rows(&self) -> Option<usize> {
203 self.n_rows
204 }
205
206 fn row_index(&self) -> Option<&RowIndex> {
208 self.row_index.as_ref()
209 }
210
211 fn cloud_options(&self) -> Option<&CloudOptions> {
213 self.cloud_options.as_ref()
214 }
215}