1#[cfg(feature = "csv")]
2use arrow::buffer::Buffer;
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6 CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::utils::compression::maybe_decompress_bytes;
10use polars_io::utils::get_reader_bytes;
11use polars_io::{HiveOptions, RowIndex};
12use polars_utils::mmap::MemSlice;
13use polars_utils::plpath::PlPath;
14use polars_utils::slice_enum::Slice;
15
16use crate::prelude::*;
17
/// Builder that collects everything needed to lazily scan one or more CSV sources.
///
/// Configure it through the consuming `with_*` methods and turn it into a `LazyFrame` via
/// `LazyFileListReader::finish`.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// The inputs to scan (paths, open files or in-memory buffers).
    sources: ScanSources,
    /// Glob toggle; used when expanding paths and forwarded to the scan arguments.
    glob: bool,
    /// Cache toggle; forwarded to the scan arguments.
    cache: bool,
    /// CSV reading and parsing configuration.
    read_options: CsvReadOptions,
    /// Optional cloud configuration, used during path expansion and forwarded to the scan.
    cloud_options: Option<CloudOptions>,
    /// Optional name forwarded as the scan's `include_file_paths` argument.
    include_file_paths: Option<PlSmallStr>,
}
28
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Replaces the parse options with the result of applying `map_func` to them.
    ///
    /// Delegates to `CsvReadOptions::map_parse_options`. The `with_*` setters in this impl that
    /// adjust parser settings (separator, quoting, encoding, ...) are built on top of this.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Creates a reader over several paths.
    pub fn new_paths(paths: Buffer<PlPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Creates a reader over arbitrary `sources`.
    ///
    /// Defaults: `glob` and `cache` are enabled, read options and cloud options take their
    /// `Default` values, and `include_file_paths` is unset.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Creates a reader over a single path.
    pub fn new(path: PlPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Sets the `skip_rows_after_header` read option to `offset`.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Sets the `row_index` read option; `None` clears it.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Sets the `n_rows` read option; `None` clears it.
    ///
    /// `finish` turns this value into the scan's `pre_slice` (offset 0, length `n_rows`).
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the `infer_schema_length` read option.
    ///
    /// The value is passed to `infer_file_schema` by [`Self::with_schema_modify`].
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Sets the `ignore_errors` read option.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Sets the `schema` read option; `None` clears it.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Sets the `skip_rows` read option.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Sets the `skip_lines` read option.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Sets the `schema_overwrite` read option; `None` clears it.
    ///
    /// [`Self::with_schema_modify`] writes these columns over the inferred schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Sets the `has_header` read option.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the `chunk_size` read option.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Forwards `separator` to `CsvParseOptions::with_separator`.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Sets the comment prefix on the parse options; `None` clears it.
    ///
    /// A prefix that is exactly one byte long becomes `CommentPrefix::Single`; anything else
    /// (including the empty string) becomes `CommentPrefix::Multi`.
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            // The closure is `Fn`, so the captured prefix has to be cloned on every call.
            opts.with_comment_prefix(comment_prefix.clone().map(|prefix| {
                match prefix.as_bytes() {
                    // A one-byte UTF-8 string is necessarily a single ASCII character.
                    &[byte] => CommentPrefix::Single(byte),
                    _ => CommentPrefix::Multi(prefix),
                }
            }))
        })
    }

    /// Forwards `quote_char` to `CsvParseOptions::with_quote_char`.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Forwards `eol_char` to `CsvParseOptions::with_eol_char`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Forwards `null_values` to `CsvParseOptions::with_null_values`.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Forwards `missing_is_null` to `CsvParseOptions::with_missing_is_null`.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Sets the `cache` flag, which `finish` forwards to the scan arguments.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Sets the `low_memory` read option.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Forwards `encoding` to `CsvParseOptions::with_encoding`.
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Forwards `try_parse_dates` to `CsvParseOptions::with_try_parse_dates`.
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Sets the `raise_if_empty` read option.
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Forwards `truncate_ragged_lines` to `CsvParseOptions::with_truncate_ragged_lines`.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Forwards `decimal_comma` to `CsvParseOptions::with_decimal_comma`.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Sets the `glob` flag.
    ///
    /// The flag is passed to `expand_paths` in [`Self::with_schema_modify`] and forwarded to the
    /// scan arguments by `finish`.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Sets the cloud options; `None` clears them.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infers the schema of the first source, lets `f` rewrite it, and installs the result as
    /// this reader's schema.
    ///
    /// Inference uses the read options as they are *at the time of this call*, so options set
    /// afterwards are not reflected in the inferred schema; call this last in the builder chain.
    /// After `f` has run, every column of `schema_overwrite` (if set) is written into the schema.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// - there is no source to infer from,
    /// - the first expanded path is not a local path,
    /// - opening, decompressing or inferring from the source fails, or
    /// - `f` returns an error.
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let infer_schema = |bytes: MemSlice| {
            let skip_rows = self.read_options.skip_rows;
            let skip_lines = self.read_options.skip_lines;
            let parse_options = self.read_options.get_parse_options();

            // Scratch buffer that `maybe_decompress_bytes` may fill with decompressed data.
            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // NOTE(review): presumably the schema-overwrite argument; the overwrite is
                    // applied further down, after `f` has modified the schema.
                    None,
                    skip_rows,
                    skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                let paths = expand_paths(&paths[..], self.glob(), &[], &mut self.cloud_options)?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                // Only local files can be opened here; report anything else as an error
                // instead of panicking.
                let path = path.as_ref();
                let Some(local_path) = path.as_local_path() else {
                    polars_bail!(
                        ComputeError:
                        "cannot infer the schema in `with_schema_modify` from a non-local path"
                    );
                };

                infer_schema(MemSlice::from_file(&polars_utils::open_file(local_path)?)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // Explicit dtype overwrites take precedence over whatever `f` produced.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Sets `include_file_paths`, which `finish` forwards to the scan arguments; `None` clears
    /// it.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
329
330impl LazyFileListReader for LazyCsvReader {
331 fn finish(self) -> PolarsResult<LazyFrame> {
333 let rechunk = self.rechunk();
334 let row_index = self.row_index().cloned();
335 let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
336
337 let lf: LazyFrame = DslBuilder::scan_csv(
338 self.sources,
339 self.read_options,
340 UnifiedScanArgs {
341 schema: None,
342 cloud_options: self.cloud_options,
343 hive_options: HiveOptions::new_disabled(),
344 rechunk,
345 cache: self.cache,
346 glob: self.glob,
347 hidden_file_prefix: None,
348 projection: None,
349 column_mapping: None,
350 default_values: None,
351 row_index,
352 pre_slice,
353 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
354 missing_columns_policy: MissingColumnsPolicy::Raise,
355 extra_columns_policy: ExtraColumnsPolicy::Raise,
356 include_file_paths: self.include_file_paths,
357 deletion_files: None,
358 table_statistics: None,
359 },
360 )?
361 .build()
362 .into();
363 Ok(lf)
364 }
365
366 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
367 unreachable!();
368 }
369
370 fn glob(&self) -> bool {
371 self.glob
372 }
373
374 fn sources(&self) -> &ScanSources {
375 &self.sources
376 }
377
378 fn with_sources(mut self, sources: ScanSources) -> Self {
379 self.sources = sources;
380 self
381 }
382
383 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
384 self.read_options.n_rows = n_rows.into();
385 self
386 }
387
388 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
389 self.read_options.row_index = row_index.into();
390 self
391 }
392
393 fn rechunk(&self) -> bool {
394 self.read_options.rechunk
395 }
396
397 fn with_rechunk(mut self, rechunk: bool) -> Self {
399 self.read_options.rechunk = rechunk;
400 self
401 }
402
403 fn n_rows(&self) -> Option<usize> {
406 self.read_options.n_rows
407 }
408
409 fn row_index(&self) -> Option<&RowIndex> {
411 self.read_options.row_index.as_ref()
412 }
413
414 fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
415 let args = UnionArgs {
417 rechunk: self.rechunk(),
418 parallel: false,
419 to_supertypes: false,
420 from_partitioned_ds: true,
421 ..Default::default()
422 };
423 concat_impl(&lfs, args)
424 }
425
426 fn cloud_options(&self) -> Option<&CloudOptions> {
428 self.cloud_options.as_ref()
429 }
430}