1#[cfg(feature = "csv")]
2use polars_buffer::Buffer;
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6 CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_path::PlRefPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::*;
15
/// Builder for a lazily evaluated CSV scan.
///
/// Configure it through the `with_*` methods; the collected settings are handed
/// to the DSL scan when the reader is finished into a [`LazyFrame`].
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// The CSV input: paths, already-open files, or in-memory buffers.
    sources: ScanSources,
    /// Whether paths are expanded as glob patterns.
    glob: bool,
    /// Forwarded as the scan's `cache` flag.
    cache: bool,
    /// Reading and parsing options for the CSV data.
    read_options: CsvReadOptions,
    /// Cloud-storage options, if any were configured.
    cloud_options: Option<CloudOptions>,
    /// Optional column name under which source file paths are included.
    include_file_paths: Option<PlSmallStr>,
    /// Missing-column handling; `None` falls back to the policy's default.
    missing_columns_policy: Option<MissingColumnsPolicy>,
}
27
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Apply `map_func` to this reader's CSV parse options and return the
    /// updated reader.
    ///
    /// Every parse-option setter on this type (`with_separator`,
    /// `with_quote_char`, …) is implemented on top of this method.
    #[must_use]
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Create a reader over a set of paths.
    pub fn new_paths(paths: Buffer<PlRefPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Create a reader over arbitrary scan sources.
    ///
    /// Globbing and caching start enabled. Read options and cloud options
    /// start at their `Default` values. No file-path column or
    /// missing-columns policy is set.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
            missing_columns_policy: None,
        }
    }

    /// Create a reader over a single path.
    pub fn new(path: PlRefPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Skip this number of rows after the header row.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column (or remove it by passing `None`).
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Limit the number of rows read; `None` removes the limit.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Set the number of threads used for parsing (`None` leaves it unset).
    #[must_use]
    pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
        self.read_options.n_threads = n_threads;
        self
    }

    /// Set how many rows are used for schema inference.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Set the `ignore_errors` read option.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the schema of the CSV input (`None` means it is inferred).
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip this number of rows at the start of the input.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip this number of lines at the start of the input.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Override the dtypes of the columns named in `schema`.
    ///
    /// The given schema may name only a subset of the columns. In
    /// [`Self::with_schema_modify`] these overrides are applied after the
    /// user's modification function.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the first row is a header row.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Set the `chunk_size` read option.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the column separator byte.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix (`None` disables comment handling).
    ///
    /// A prefix that is exactly one byte long is stored as
    /// [`CommentPrefix::Single`]; anything else becomes [`CommentPrefix::Multi`].
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            opts.with_comment_prefix(comment_prefix.clone().map(|prefix| {
                // A one-byte UTF-8 string is necessarily ASCII, so checking the
                // byte length alone is sufficient for the single-byte form.
                match prefix.as_bytes() {
                    &[byte] => CommentPrefix::Single(byte),
                    _ => CommentPrefix::Multi(prefix),
                }
            }))
        })
    }

    /// Set the quote character (`None` disables quoting).
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the end-of-line byte.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set the values that are interpreted as null.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Set the `missing_is_null` parse option.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Set the scan's `cache` flag.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Set the `low_memory` read option.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set the text encoding of the input.
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Set whether to try parsing date/time-like strings.
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Set the `raise_if_empty` read option.
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Set the `truncate_ragged_lines` parse option.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Set whether `,` is used as the decimal separator.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Enable or disable glob expansion of paths.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the cloud-storage options.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infer the schema of the first source, pass it through `f`, and fix the
    /// result as this reader's schema.
    ///
    /// Only the first source is inspected:
    /// - paths are expanded (honouring the glob setting), and the first result
    ///   is opened as a local file and memory-mapped;
    /// - open files are memory-mapped;
    /// - buffers are used directly.
    ///
    /// Path expansion receives `&mut` access to the cloud options and may
    /// update them. Dtype overrides from [`Self::with_dtype_overwrite`] are
    /// applied on top of whatever `f` returns.
    ///
    /// # Errors
    /// Fails when there is no source, when the first source cannot be opened
    /// or mapped, when schema inference fails, or when `f` fails.
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let inferred_schema = self.infer_first_source_schema()?;
        let mut schema = f(inferred_schema)?;

        // Explicit dtype overrides win over the user-modified schema.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Infer the schema from the first entry of `self.sources`.
    fn infer_first_source_schema(&mut self) -> PolarsResult<Schema> {
        match self.sources.clone() {
            ScanSources::Paths(paths) => {
                use polars_io::pl_async::get_runtime;

                let paths = get_runtime().block_on(expand_paths(
                    &paths[..],
                    self.glob,
                    &[],
                    &mut self.cloud_options,
                ))?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                // NOTE(review): the path is opened via the local filesystem;
                // remote (cloud) paths presumably are not supported here.
                let file = polars_utils::open_file(path.as_std_path())?;
                let mmap = MMapSemaphore::new_from_file(&file)?;
                Self::infer_schema_from_bytes(&self.read_options, Buffer::from_owner(mmap))
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                let mmap = MMapSemaphore::new_from_file(file)?;
                Self::infer_schema_from_bytes(&self.read_options, Buffer::from_owner(mmap))
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                Self::infer_schema_from_bytes(&self.read_options, buffer.clone())
            },
        }
    }

    /// Infer a schema from `bytes` using the settings in `read_options`.
    fn infer_schema_from_bytes(
        read_options: &CsvReadOptions,
        bytes: Buffer<u8>,
    ) -> PolarsResult<Schema> {
        use polars_io::prelude::streaming::read_until_start_and_infer_schema;
        use polars_io::utils::compression::ByteSourceReader;

        // Assumed expansion factor of compressed input; only used to size the
        // decompression hint.
        const ASSUMED_COMPRESSION_RATIO: usize = 4;

        let input_len = bytes.len();
        let mut reader = ByteSourceReader::from_memory(bytes)?;
        let ratio = reader
            .compression()
            .map_or(1, |_| ASSUMED_COMPRESSION_RATIO);
        // Saturate rather than overflow: this is only a hint, and on 32-bit
        // targets a large input times the ratio could exceed `usize::MAX`.
        let decompressed_size_hint = Some(input_len.saturating_mul(ratio));

        let (inferred_schema, _) = read_until_start_and_infer_schema(
            read_options,
            None,
            decompressed_size_hint,
            None,
            &mut reader,
        )?;

        Ok(inferred_schema)
    }

    /// Include the source file path of each row under the given column name.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }

    /// Set the missing-columns policy (`None` uses the policy's default).
    #[must_use]
    pub fn with_missing_columns_policy(mut self, policy: Option<MissingColumnsPolicy>) -> Self {
        self.missing_columns_policy = policy;
        self
    }
}
346
347impl LazyFileListReader for LazyCsvReader {
348 fn finish(self) -> PolarsResult<LazyFrame> {
350 let rechunk = self.rechunk();
351 let row_index = self.row_index().cloned();
352 let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
353
354 let missing_columns_policy = self.missing_columns_policy.unwrap_or_default();
355
356 let lf: LazyFrame = DslBuilder::scan_csv(
357 self.sources,
358 self.read_options,
359 UnifiedScanArgs {
360 schema: None,
361 cloud_options: self.cloud_options,
362 hive_options: HiveOptions::new_disabled(),
363 rechunk,
364 cache: self.cache,
365 glob: self.glob,
366 hidden_file_prefix: None,
367 projection: None,
368 column_mapping: None,
369 default_values: None,
370 row_index,
371 pre_slice,
372 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
373 missing_columns_policy,
374 extra_columns_policy: ExtraColumnsPolicy::Raise,
375 include_file_paths: self.include_file_paths,
376 deletion_files: None,
377 table_statistics: None,
378 row_count: None,
379 },
380 )?
381 .build()
382 .into();
383 Ok(lf)
384 }
385
386 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
387 unreachable!();
388 }
389
390 fn glob(&self) -> bool {
391 self.glob
392 }
393
394 fn sources(&self) -> &ScanSources {
395 &self.sources
396 }
397
398 fn with_sources(mut self, sources: ScanSources) -> Self {
399 self.sources = sources;
400 self
401 }
402
403 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
404 self.read_options.n_rows = n_rows.into();
405 self
406 }
407
408 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
409 self.read_options.row_index = row_index.into();
410 self
411 }
412
413 fn rechunk(&self) -> bool {
414 self.read_options.rechunk
415 }
416
417 fn with_rechunk(mut self, rechunk: bool) -> Self {
419 self.read_options.rechunk = rechunk;
420 self
421 }
422
423 fn n_rows(&self) -> Option<usize> {
426 self.read_options.n_rows
427 }
428
429 fn row_index(&self) -> Option<&RowIndex> {
431 self.read_options.row_index.as_ref()
432 }
433
434 fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
435 let args = UnionArgs {
437 rechunk: self.rechunk(),
438 parallel: false,
439 to_supertypes: false,
440 from_partitioned_ds: true,
441 ..Default::default()
442 };
443 concat_impl(&lfs, args)
444 }
445
446 fn cloud_options(&self) -> Option<&CloudOptions> {
448 self.cloud_options.as_ref()
449 }
450}