polars_lazy/scan/ndjson.rs

use std::num::NonZeroUsize;

use arrow::buffer::Buffer;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_plan::dsl::{
    CastColumnsPolicy, DslPlan, ExtraColumnsPolicy, FileScanDsl, MissingColumnsPolicy, ScanSources,
};
use polars_plan::prelude::{NDJsonReadOptions, UnifiedScanArgs};
use polars_utils::plpath::PlPath;
use polars_utils::slice_enum::Slice;

use crate::prelude::LazyFrame;
use crate::scan::file_list_reader::LazyFileListReader;

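/// Lazily read from newline-delimited JSON (NDJSON / JSON Lines) sources.
///
/// A minimal usage sketch (the file name is hypothetical; it assumes this builder
/// and the `LazyFileListReader` trait are reachable through `polars_lazy::prelude`
/// and that `PlPath::new` accepts a plain path string):
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// use polars_lazy::prelude::*;
/// use polars_utils::plpath::PlPath;
///
/// let lf: LazyFrame = LazyJsonLineReader::new(PlPath::new("data/events.ndjson"))
///     .with_infer_schema_length(NonZeroUsize::new(1000))
///     .with_ignore_errors(true)
///     .finish()?;
/// ```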
#[derive(Clone)]
pub struct LazyJsonLineReader {
    pub(crate) sources: ScanSources,
    pub(crate) batch_size: Option<NonZeroUsize>,
    pub(crate) low_memory: bool,
    pub(crate) rechunk: bool,
    pub(crate) schema: Option<SchemaRef>,
    pub(crate) schema_overwrite: Option<SchemaRef>,
    pub(crate) row_index: Option<RowIndex>,
    pub(crate) infer_schema_length: Option<NonZeroUsize>,
    pub(crate) n_rows: Option<usize>,
    pub(crate) ignore_errors: bool,
    pub(crate) include_file_paths: Option<PlSmallStr>,
    pub(crate) cloud_options: Option<CloudOptions>,
}

impl LazyJsonLineReader {
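    /// Create a reader over multiple NDJSON files.
    ///
    /// A minimal sketch (the file names are hypothetical, and `PlPath::new` is
    /// assumed to accept a plain path string):
    ///
    /// ```ignore
    /// let reader = LazyJsonLineReader::new_paths(Buffer::from_iter([
    ///     PlPath::new("logs/2024-01-01.ndjson"),
    ///     PlPath::new("logs/2024-01-02.ndjson"),
    /// ]));
    /// ```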
    pub fn new_paths(paths: Buffer<PlPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyJsonLineReader {
            sources,
            batch_size: None,
            low_memory: false,
            rechunk: false,
            schema: None,
            schema_overwrite: None,
            row_index: None,
            infer_schema_length: NonZeroUsize::new(100),
            ignore_errors: false,
            n_rows: None,
            include_file_paths: None,
            cloud_options: None,
        }
    }

    pub fn new(path: PlPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Add a row index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Set values as `Null` if parsing fails because of schema mismatches.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
        self.ignore_errors = ignore_errors;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Set the number of rows to use when inferring the JSON schema.
    /// The default is 100 rows.
    /// Ignored when the schema is specified explicitly using [`Self::with_schema`].
    /// Setting this to `None` does a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
        self.infer_schema_length = num_rows;
        self
    }

    /// Set the JSON file's schema.
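    ///
    /// A minimal sketch (the column names are hypothetical; `Schema`, `Field`,
    /// `DataType` and `SchemaRef` are assumed to come from `polars_core::prelude`,
    /// and `reader` is an existing `LazyJsonLineReader`):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let schema: SchemaRef = Arc::new(Schema::from_iter([
    ///     Field::new("id".into(), DataType::Int64),
    ///     Field::new("message".into(), DataType::String),
    /// ]));
    /// let reader = reader.with_schema(Some(schema));
    /// ```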
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.schema = schema;
        self
    }

    /// Overwrite data types of columns in the inferred schema.
    #[must_use]
    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
        self.schema_overwrite = schema_overwrite;
        self
    }

    /// Reduce memory usage at the expense of performance.
    #[must_use]
    pub fn low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    /// Set the batch size (number of rows loaded at a time while parsing).
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the [`CloudOptions`] used when accessing cloud storage.
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Add a column containing the path of the source file for each row, using the given column name.
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}

impl LazyFileListReader for LazyJsonLineReader {
    fn finish(self) -> PolarsResult<LazyFrame> {
        let unified_scan_args = UnifiedScanArgs {
            schema: None,
            cloud_options: self.cloud_options,
            hive_options: HiveOptions::new_disabled(),
            rechunk: self.rechunk,
            cache: false,
            glob: true,
            hidden_file_prefix: None,
            projection: None,
            column_mapping: None,
            default_values: None,
            row_index: self.row_index,
            // Translate the row limit into a slice from the start of the scan.
            pre_slice: self.n_rows.map(|len| Slice::Positive { offset: 0, len }),
            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
            missing_columns_policy: MissingColumnsPolicy::Raise,
            extra_columns_policy: ExtraColumnsPolicy::Raise,
            include_file_paths: self.include_file_paths,
            deletion_files: None,
            table_statistics: None,
        };

        let options = NDJsonReadOptions {
            n_threads: None,
            infer_schema_length: self.infer_schema_length,
            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
            low_memory: self.low_memory,
            ignore_errors: self.ignore_errors,
            schema: self.schema,
            schema_overwrite: self.schema_overwrite,
        };

        let scan_type = Box::new(FileScanDsl::NDJson { options });

        Ok(LazyFrame::from(DslPlan::Scan {
            sources: self.sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type,
            cached_ir: Default::default(),
        }))
    }

    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        unreachable!();
    }

    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.n_rows = n_rows.into();
        self
    }

    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.row_index = row_index.into();
        self
    }

    fn rechunk(&self) -> bool {
        self.rechunk
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    fn n_rows(&self) -> Option<usize> {
        self.n_rows
    }

    /// Add a row index column.
    fn row_index(&self) -> Option<&RowIndex> {
        self.row_index.as_ref()
    }

    /// [`CloudOptions`] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}