//! Lazy CSV scan builder (`polars_lazy/scan/csv.rs`).

1#[cfg(feature = "csv")]
2use polars_buffer::Buffer;
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_path::PlRefPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::*;
15
/// Lazy reader that scans one or more CSV sources and produces a [`LazyFrame`].
///
/// Configure via the builder-style `with_*` methods, then call
/// [`LazyFileListReader::finish`] to obtain the [`LazyFrame`].
#[derive(Clone)]
#[cfg(feature = "csv")]
pub struct LazyCsvReader {
    // Input locations: paths, open files or in-memory buffers to scan.
    sources: ScanSources,
    // Whether paths are expanded using globbing rules (default: true, see `new_with_sources`).
    glob: bool,
    // Cache the DataFrame after reading (default: true, see `new_with_sources`).
    cache: bool,
    // Options forwarded to the underlying CSV reader (separator, schema, skip rows, ...).
    read_options: CsvReadOptions,
    // Configuration/credentials used when the sources live in cloud storage.
    cloud_options: Option<CloudOptions>,
    // If set, a column with this name containing the source file path is included.
    include_file_paths: Option<PlSmallStr>,
}
26
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Apply `map_func` to the reader's [`CsvParseOptions`].
    ///
    /// Re-export to shorten code.
    #[must_use]
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Create a reader scanning the given paths.
    pub fn new_paths(paths: Buffer<PlRefPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Create a reader from generic [`ScanSources`] (paths, open files or in-memory buffers)
    /// with default options (globbing and caching enabled).
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Create a reader scanning a single path.
    pub fn new(path: PlRefPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Skip this number of rows after the header location.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the number of threads used for CSV parsing.
    #[must_use]
    pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
        self.read_options.n_threads = n_threads;
        self
    }

    /// Set the number of rows to use when inferring the csv schema.
    /// The default is 100 rows.
    /// Setting to [None] will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Continue with next batch when a ParserError is encountered.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the CSV file's schema
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
    /// Note that by row we mean valid CSV, encoding and comments are respected.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
    /// We don't respect CSV escaping when skipping lines.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
    /// of the total schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the CSV file has headers
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the chunk size used by the parser. This influences performance.
    /// This can be used as a way to reduce memory usage during the parsing at the cost of performance.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the CSV file's column separator as a byte character
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            // A single ASCII byte gets the cheaper `Single` variant; anything
            // else (multi-char or non-ASCII) falls back to `Multi`.
            opts.with_comment_prefix(comment_prefix.clone().map(|s| {
                if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
                    CommentPrefix::Single(s.as_bytes()[0])
                } else {
                    CommentPrefix::Multi(s)
                }
            }))
        })
    }

    /// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting is disabled.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the `char` used as end of line. The default is `b'\n'`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set values that will be interpreted as missing/ null.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Treat missing fields as null.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Cache the DataFrame after reading.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Reduce memory usage at the expense of performance
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set [`CsvEncoding`]
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Automatically try to parse dates/datetimes and time.
    /// If parsing fails, columns remain of dtype [`DataType::String`].
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Raise an error if CSV is empty (otherwise return an empty frame)
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Truncate lines that are longer than the schema.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Set whether to interpret a comma as the decimal separator when parsing numbers.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    #[must_use]
    /// Expand path given via globbing rules.
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the [`CloudOptions`] used when reading from cloud storage.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Modify a schema before we run the lazy scanning.
    ///
    /// Important! Run this function latest in the builder!
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        // NOTE(review): `n_threads` is saved here and restored after inference,
        // though nothing in between visibly modifies it — possibly a leftover;
        // confirm before removing.
        let n_threads = self.read_options.n_threads;

        // Infer the schema from the first source's raw bytes, transparently
        // handling compressed input via a size hint.
        let infer_schema = |bytes: Buffer<u8>| {
            use polars_io::prelude::streaming::read_until_start_and_infer_schema;
            use polars_io::utils::compression::ByteSourceReader;

            let bytes_len = bytes.len();
            let mut reader = ByteSourceReader::from_memory(bytes)?;
            let decompressed_size_hint = Some(
                bytes_len
                    * reader
                        .compression()
                        .map_or(1, |_| ASSUMED_COMPRESSION_RATIO),
            );

            let (inferred_schema, _) = read_until_start_and_infer_schema(
                &self.read_options,
                None,
                decompressed_size_hint,
                None,
                &mut reader,
            )?;

            PolarsResult::Ok(inferred_schema)
        };

        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                // TODO: Path expansion should happen when converting to the IR
                // https://github.com/pola-rs/polars/issues/17634

                use polars_io::pl_async::get_runtime;

                let paths = get_runtime().block_on(expand_paths(
                    &paths[..],
                    self.glob(),
                    &[], // hidden_file_prefix
                    &mut self.cloud_options,
                ))?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                let file = polars_utils::open_file(path.as_std_path())?;
                let mmap = MMapSemaphore::new_from_file(&file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                let mmap = MMapSemaphore::new_from_file(file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        self.read_options.n_threads = n_threads;
        let mut schema = f(schema)?;

        // the dtypes set may be for the new names, so update again
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Include the path of the source file(s) as a column with this name.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
338
impl LazyFileListReader for LazyCsvReader {
    /// Get the final [LazyFrame].
    ///
    /// Builds a single `scan_csv` DSL node covering all sources; hive
    /// partitioning is disabled and column-policy mismatches raise errors.
    fn finish(self) -> PolarsResult<LazyFrame> {
        let rechunk = self.rechunk();
        let row_index = self.row_index().cloned();
        // `n_rows` becomes a positive slice starting at offset 0.
        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });

        let lf: LazyFrame = DslBuilder::scan_csv(
            self.sources,
            self.read_options,
            UnifiedScanArgs {
                schema: None,
                cloud_options: self.cloud_options,
                hive_options: HiveOptions::new_disabled(),
                rechunk,
                cache: self.cache,
                glob: self.glob,
                hidden_file_prefix: None,
                projection: None,
                column_mapping: None,
                default_values: None,
                row_index,
                pre_slice,
                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
                missing_columns_policy: MissingColumnsPolicy::Raise,
                extra_columns_policy: ExtraColumnsPolicy::Raise,
                include_file_paths: self.include_file_paths,
                deletion_files: None,
                table_statistics: None,
                row_count: None,
            },
        )?
        .build()
        .into();
        Ok(lf)
    }

    // NOTE(review): presumably never called because `finish` above handles
    // all sources in one scan node rather than delegating per file — confirm
    // against the `LazyFileListReader` default implementation.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        unreachable!();
    }

    fn glob(&self) -> bool {
        self.glob
    }

    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.read_options.n_rows = n_rows.into();
        self
    }

    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.read_options.row_index = row_index.into();
        self
    }

    fn rechunk(&self) -> bool {
        self.read_options.rechunk
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.read_options.rechunk = rechunk;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    fn n_rows(&self) -> Option<usize> {
        self.read_options.n_rows
    }

    /// Return the row index settings.
    fn row_index(&self) -> Option<&RowIndex> {
        self.read_options.row_index.as_ref()
    }

    /// Concatenate per-file frames into one [`LazyFrame`].
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        // set to false, as the csv parser has full thread utilization
        let args = UnionArgs {
            rechunk: self.rechunk(),
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        concat_impl(&lfs, args)
    }

    /// [CloudOptions] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}