1#[cfg(feature = "csv")]
2use polars_buffer::Buffer;
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6 CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_path::PlRefPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::*;
15
/// Builder for a lazy CSV scan over one or more [`ScanSources`].
///
/// Configure it with the `with_*` methods, then build a `LazyFrame` through
/// the `LazyFileListReader::finish` implementation.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// Inputs to scan: paths, open files, or in-memory buffers.
    sources: ScanSources,
    /// Glob flag passed to path expansion and to the unified scan arguments.
    glob: bool,
    /// Forwarded as the `cache` field of the unified scan arguments.
    cache: bool,
    /// CSV read options, including the nested parse options.
    read_options: CsvReadOptions,
    /// Cloud storage options used when expanding / reading paths, if any.
    cloud_options: Option<CloudOptions>,
    /// Forwarded as `include_file_paths` to the scan; presumably the name of a
    /// column that records each row's source path (confirm against the scan).
    include_file_paths: Option<PlSmallStr>,
}
26
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Applies `map_func` to the CSV parse options and stores the result.
    ///
    /// The parse-option setters on this builder (separator, quote char,
    /// comment prefix, ...) all delegate to this method.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Creates a reader over a list of paths.
    pub fn new_paths(paths: Buffer<PlRefPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Creates a reader over arbitrary scan sources.
    ///
    /// Globbing and caching start enabled, read and cloud options start at
    /// their defaults, and no file-path column is requested.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Creates a reader over a single path.
    pub fn new(path: PlRefPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Sets the `skip_rows_after_header` read option.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Sets (or clears) the row index added to the output.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Sets the `n_rows` limit; `finish` also forwards it as a leading slice.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the `n_threads` read option.
    #[must_use]
    pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
        self.read_options.n_threads = n_threads;
        self
    }

    /// Sets the `infer_schema_length` read option.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Sets the `ignore_errors` read option.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Sets (or clears) the `schema` read option.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Sets the `skip_rows` read option.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Sets the `skip_lines` read option.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Sets `schema_overwrite`: per-column dtypes that take precedence.
    ///
    /// `with_schema_modify` re-applies these dtypes on top of the modified
    /// schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Sets the `has_header` read option.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the `chunk_size` read option.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Sets the field separator byte in the parse options.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Sets (or clears) the comment prefix in the parse options.
    ///
    /// A one-byte prefix is stored as `CommentPrefix::Single`; anything longer
    /// is stored as `CommentPrefix::Multi`.
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            // `map_parse_options` takes an `Fn`, so the captured prefix is cloned per call.
            opts.with_comment_prefix(comment_prefix.clone().map(|prefix| {
                // `len` counts UTF-8 bytes, and a one-byte UTF-8 string is always
                // a single ASCII character, so no separate ASCII check is needed.
                if prefix.len() == 1 {
                    CommentPrefix::Single(prefix.as_bytes()[0])
                } else {
                    CommentPrefix::Multi(prefix)
                }
            }))
        })
    }

    /// Sets (or clears) the quote character in the parse options.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Sets the end-of-line byte in the parse options.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Sets (or clears) the null-value markers in the parse options.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Sets the `missing_is_null` parse option.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Sets the `cache` flag forwarded to the scan.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Sets the `low_memory` read option.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Sets the text encoding in the parse options.
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Sets the `try_parse_dates` parse option.
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Sets the `raise_if_empty` read option.
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Sets the `truncate_ragged_lines` parse option.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Sets the `decimal_comma` parse option.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Sets the glob flag used for path expansion and the scan.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Sets (or clears) the cloud storage options.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infers the schema from the first source, lets `f` modify it, and fixes
    /// the result as this reader's schema.
    ///
    /// Inference uses the current read options and only inspects the first
    /// source:
    /// - `Paths`: the list is expanded first (honouring `glob`), then the
    ///   first path is opened and memory-mapped.
    /// - `Files`: the first file is memory-mapped.
    /// - `Buffers`: the first buffer is used directly.
    ///
    /// Any dtypes set via `with_dtype_overwrite` are applied on top of the
    /// schema returned by `f`.
    ///
    /// # Errors
    ///
    /// Returns an error if there are no sources, if path expansion fails, if a
    /// file cannot be opened or memory-mapped, if schema inference fails, or
    /// if `f` returns an error.
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        // Compressed input decodes to more bytes than it occupies; scale the
        // size hint by this assumed ratio when compression is detected.
        const ASSUMED_COMPRESSION_RATIO: usize = 4;

        let infer_schema = |bytes: Buffer<u8>| {
            use polars_io::prelude::streaming::read_until_start_and_infer_schema;
            use polars_io::utils::compression::ByteSourceReader;

            let bytes_len = bytes.len();
            let mut reader = ByteSourceReader::from_memory(bytes)?;
            let ratio = reader
                .compression()
                .map_or(1, |_| ASSUMED_COMPRESSION_RATIO);
            // Saturate rather than overflow for very large inputs.
            let decompressed_size_hint = Some(bytes_len.saturating_mul(ratio));

            let (inferred_schema, _) = read_until_start_and_infer_schema(
                &self.read_options,
                None,
                decompressed_size_hint,
                None,
                &mut reader,
            )?;

            PolarsResult::Ok(inferred_schema)
        };

        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                use polars_io::pl_async::get_runtime;

                let paths = get_runtime().block_on(expand_paths(
                    &paths[..],
                    self.glob,
                    &[],
                    &mut self.cloud_options,
                ))?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                // NOTE(review): opened via `polars_utils::open_file`, so this
                // presumably only supports local paths — confirm for cloud URLs.
                let file = polars_utils::open_file(path.as_std_path())?;
                let mmap = MMapSemaphore::new_from_file(&file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                let mmap = MMapSemaphore::new_from_file(file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // Explicit dtype overwrites win over whatever `f` produced.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Sets (or clears) the `include_file_paths` value forwarded to the scan.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
338
// Gated like the `LazyCsvReader` struct and its inherent impl, so this impl is
// never compiled without the type it targets.
#[cfg(feature = "csv")]
impl LazyFileListReader for LazyCsvReader {
    /// Builds the lazy CSV scan plan from the configured sources and options.
    ///
    /// `n_rows` is forwarded as a leading slice (`pre_slice`), and the row
    /// index and rechunk settings are copied into the unified scan arguments.
    /// Schema, projection and column mapping are left unset, and hive
    /// partitioning is disabled.
    fn finish(self) -> PolarsResult<LazyFrame> {
        // Read these through the getters before `self`'s fields are moved below.
        let rechunk = self.rechunk();
        let row_index = self.row_index().cloned();
        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });

        let lf: LazyFrame = DslBuilder::scan_csv(
            self.sources,
            self.read_options,
            UnifiedScanArgs {
                schema: None,
                cloud_options: self.cloud_options,
                hive_options: HiveOptions::new_disabled(),
                rechunk,
                cache: self.cache,
                glob: self.glob,
                hidden_file_prefix: None,
                projection: None,
                column_mapping: None,
                default_values: None,
                row_index,
                pre_slice,
                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
                missing_columns_policy: MissingColumnsPolicy::Raise,
                extra_columns_policy: ExtraColumnsPolicy::Raise,
                include_file_paths: self.include_file_paths,
                deletion_files: None,
                table_statistics: None,
                row_count: None,
            },
        )?
        .build()
        .into();
        Ok(lf)
    }

    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        // `finish` forwards `glob` to the scan itself, so this entry point is
        // not expected to be reached for CSV.
        unreachable!();
    }

    /// Returns the glob flag.
    fn glob(&self) -> bool {
        self.glob
    }

    /// Returns the configured scan sources.
    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    /// Replaces the scan sources.
    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    /// Sets the `n_rows` read option.
    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.read_options.n_rows = n_rows.into();
        self
    }

    /// Sets (or clears) the row index read option.
    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.read_options.row_index = row_index.into();
        self
    }

    /// Returns the `rechunk` read option.
    fn rechunk(&self) -> bool {
        self.read_options.rechunk
    }

    /// Sets the `rechunk` read option.
    fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.read_options.rechunk = rechunk;
        self
    }

    /// Returns the `n_rows` read option.
    fn n_rows(&self) -> Option<usize> {
        self.read_options.n_rows
    }

    /// Returns the row index read option, if set.
    fn row_index(&self) -> Option<&RowIndex> {
        self.read_options.row_index.as_ref()
    }

    /// Concatenates per-source frames sequentially as a partitioned dataset,
    /// without supertype casting, using this reader's rechunk setting.
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        let args = UnionArgs {
            rechunk: self.rechunk(),
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        // Resolves to the free function `concat_impl`, not this method.
        concat_impl(&lfs, args)
    }

    /// Returns the cloud storage options, if any.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}