// polars_lazy/scan/ndjson.rs
use std::num::NonZeroUsize;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use polars_core::prelude::*;
6use polars_io::cloud::CloudOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_plan::dsl::{CastColumnsPolicy, DslPlan, FileScan, MissingColumnsPolicy, ScanSources};
9use polars_plan::prelude::{NDJsonReadOptions, UnifiedScanArgs};
10use polars_utils::slice_enum::Slice;
11
12use crate::prelude::LazyFrame;
13use crate::scan::file_list_reader::LazyFileListReader;
14
15#[derive(Clone)]
16pub struct LazyJsonLineReader {
17 pub(crate) sources: ScanSources,
18 pub(crate) batch_size: Option<NonZeroUsize>,
19 pub(crate) low_memory: bool,
20 pub(crate) rechunk: bool,
21 pub(crate) schema: Option<SchemaRef>,
22 pub(crate) schema_overwrite: Option<SchemaRef>,
23 pub(crate) row_index: Option<RowIndex>,
24 pub(crate) infer_schema_length: Option<NonZeroUsize>,
25 pub(crate) n_rows: Option<usize>,
26 pub(crate) ignore_errors: bool,
27 pub(crate) include_file_paths: Option<PlSmallStr>,
28 pub(crate) cloud_options: Option<CloudOptions>,
29}
30
31impl LazyJsonLineReader {
32 pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
33 Self::new_with_sources(ScanSources::Paths(paths))
34 }
35
36 pub fn new_with_sources(sources: ScanSources) -> Self {
37 LazyJsonLineReader {
38 sources,
39 batch_size: None,
40 low_memory: false,
41 rechunk: false,
42 schema: None,
43 schema_overwrite: None,
44 row_index: None,
45 infer_schema_length: NonZeroUsize::new(100),
46 ignore_errors: false,
47 n_rows: None,
48 include_file_paths: None,
49 cloud_options: None,
50 }
51 }
52
53 pub fn new(path: impl AsRef<Path>) -> Self {
54 Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
55 }
56
57 #[must_use]
59 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
60 self.row_index = row_index;
61 self
62 }
63
64 #[must_use]
66 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
67 self.ignore_errors = ignore_errors;
68 self
69 }
70 #[must_use]
73 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
74 self.n_rows = num_rows;
75 self
76 }
77 #[must_use]
82 pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
83 self.infer_schema_length = num_rows;
84 self
85 }
86 #[must_use]
88 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
89 self.schema = schema;
90 self
91 }
92
93 #[must_use]
95 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
96 self.schema_overwrite = schema_overwrite;
97 self
98 }
99
100 #[must_use]
102 pub fn low_memory(mut self, toggle: bool) -> Self {
103 self.low_memory = toggle;
104 self
105 }
106
107 #[must_use]
108 pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
109 self.batch_size = batch_size;
110 self
111 }
112
113 pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
114 self.cloud_options = cloud_options;
115 self
116 }
117
118 pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
119 self.include_file_paths = include_file_paths;
120 self
121 }
122}
123
124impl LazyFileListReader for LazyJsonLineReader {
125 fn finish(self) -> PolarsResult<LazyFrame> {
126 let unified_scan_args = UnifiedScanArgs {
127 schema: None,
128 cloud_options: self.cloud_options,
129 hive_options: HiveOptions::new_disabled(),
130 rechunk: self.rechunk,
131 cache: false,
132 glob: true,
133 projection: None,
134 row_index: self.row_index,
135 pre_slice: self.n_rows.map(|len| Slice::Positive { offset: 0, len }),
136 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
137 missing_columns_policy: MissingColumnsPolicy::Raise,
138 include_file_paths: self.include_file_paths,
139 };
140
141 let options = NDJsonReadOptions {
142 n_threads: None,
143 infer_schema_length: self.infer_schema_length,
144 chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
145 low_memory: self.low_memory,
146 ignore_errors: self.ignore_errors,
147 schema: self.schema,
148 schema_overwrite: self.schema_overwrite,
149 };
150
151 let scan_type = Box::new(FileScan::NDJson { options });
152
153 Ok(LazyFrame::from(DslPlan::Scan {
154 sources: self.sources,
155 file_info: None,
156 unified_scan_args: Box::new(unified_scan_args),
157 scan_type,
158 cached_ir: Default::default(),
159 }))
160 }
161
162 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
163 unreachable!();
164 }
165
166 fn sources(&self) -> &ScanSources {
167 &self.sources
168 }
169
170 fn with_sources(mut self, sources: ScanSources) -> Self {
171 self.sources = sources;
172 self
173 }
174
175 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
176 self.n_rows = n_rows.into();
177 self
178 }
179
180 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
181 self.row_index = row_index.into();
182 self
183 }
184
185 fn rechunk(&self) -> bool {
186 self.rechunk
187 }
188
189 fn with_rechunk(mut self, toggle: bool) -> Self {
191 self.rechunk = toggle;
192 self
193 }
194
195 fn n_rows(&self) -> Option<usize> {
198 self.n_rows
199 }
200
201 fn row_index(&self) -> Option<&RowIndex> {
203 self.row_index.as_ref()
204 }
205
206 fn cloud_options(&self) -> Option<&CloudOptions> {
208 self.cloud_options.as_ref()
209 }
210}