Skip to main content

polars_lazy/scan/
csv.rs

1#[cfg(feature = "csv")]
2use polars_buffer::Buffer;
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_path::PlRefPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::*;
15
/// Builder for lazily scanning CSV data: accumulates scan/parse options and
/// produces a `LazyFrame` through the [`LazyFileListReader`] implementation.
#[derive(Clone)]
#[cfg(feature = "csv")]
pub struct LazyCsvReader {
    // Paths, open files, or in-memory buffers to scan.
    sources: ScanSources,
    // Expand paths via globbing rules (defaults to `true` in `new_with_sources`).
    glob: bool,
    // Cache the DataFrame after reading (defaults to `true` in `new_with_sources`).
    cache: bool,
    // CSV-specific read and parse options.
    read_options: CsvReadOptions,
    // Options for cloud-backed paths; `None` for purely local scans.
    cloud_options: Option<CloudOptions>,
    // When set, forwarded to the scan as the name of a file-path column.
    include_file_paths: Option<PlSmallStr>,
    // Policy for columns missing from some files; `None` means use the default.
    missing_columns_policy: Option<MissingColumnsPolicy>,
}
27
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Apply `map_func` to the inner [`CsvParseOptions`].
    /// Shorthand used by the parse-related builder methods below.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Create a reader over the given paths with default options.
    pub fn new_paths(paths: Buffer<PlRefPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Create a reader over arbitrary [`ScanSources`] with default options
    /// (globbing and caching enabled).
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
            missing_columns_policy: None,
        }
    }

    /// Create a reader over a single path with default options.
    pub fn new(path: PlRefPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Skip this number of rows after the header location.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the number of threads used for CSV parsing.
    #[must_use]
    pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
        self.read_options.n_threads = n_threads;
        self
    }

    /// Set the number of rows to use when inferring the csv schema.
    /// The default is 100 rows.
    /// Setting to [None] will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Continue with next batch when a ParserError is encountered.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the CSV file's schema
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
    /// Note that by row we mean valid CSV, encoding and comments are respected.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
    /// We don't respect CSV escaping when skipping lines.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
    /// of the total schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the CSV file has headers
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the chunk size used by the parser. This influences performance.
    /// This can be used as a way to reduce memory usage during the parsing at the cost of performance.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the CSV file's column separator as a byte character
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            opts.with_comment_prefix(comment_prefix.clone().map(|s| {
                // A single ASCII byte gets the cheaper single-byte variant;
                // anything else is treated as a multi-byte prefix.
                if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
                    CommentPrefix::Single(s.as_bytes()[0])
                } else {
                    CommentPrefix::Multi(s)
                }
            }))
        })
    }

    /// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting is disabled.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the `char` used as end of line. The default is `b'\n'`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set values that will be interpreted as missing/ null.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Treat missing fields as null.
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Cache the DataFrame after reading.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Reduce memory usage at the expense of performance
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set [`CsvEncoding`]
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Automatically try to parse dates/datetimes and time.
    /// If parsing fails, columns remain of dtype [`DataType::String`].
    #[cfg(feature = "temporal")]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Raise an error if CSV is empty (otherwise return an empty frame)
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Truncate lines that are longer than the schema.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Parse floats with a comma as the decimal separator.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    #[must_use]
    /// Expand path given via globbing rules.
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the [`CloudOptions`] used for cloud-backed sources.
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Modify a schema before we run the lazy scanning.
    ///
    /// Important! Run this function latest in the builder!
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        // Used to size the decompressed-bytes hint when the input is compressed.
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        // NOTE(review): `n_threads` is saved here and written back after
        // inference; nothing visible in this function mutates it in between —
        // TODO confirm whether the restore is still needed.
        let n_threads = self.read_options.n_threads;

        // Infer the schema from a single in-memory byte source (possibly an
        // mmap'ed file), respecting the current read options.
        let infer_schema = |bytes: Buffer<u8>| {
            use polars_io::prelude::streaming::read_until_start_and_infer_schema;
            use polars_io::utils::compression::ByteSourceReader;

            let bytes_len = bytes.len();
            let mut reader = ByteSourceReader::from_memory(bytes)?;
            // If the source is compressed, assume it decompresses to roughly
            // ASSUMED_COMPRESSION_RATIO times its size; otherwise use the raw length.
            let decompressed_size_hint = Some(
                bytes_len
                    * reader
                        .compression()
                        .map_or(1, |_| ASSUMED_COMPRESSION_RATIO),
            );

            let (inferred_schema, _) = read_until_start_and_infer_schema(
                &self.read_options,
                None,
                decompressed_size_hint,
                None,
                &mut reader,
            )?;

            PolarsResult::Ok(inferred_schema)
        };

        // Inference only looks at the FIRST source; subsequent sources are
        // assumed to share its schema.
        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                // TODO: Path expansion should happen when converting to the IR
                // https://github.com/pola-rs/polars/issues/17634

                use polars_io::pl_async::get_runtime;

                let paths = get_runtime().block_on(expand_paths(
                    &paths[..],
                    self.glob(),
                    &[], // hidden_file_prefix
                    &mut self.cloud_options,
                ))?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                let file = polars_utils::open_file(path.as_std_path())?;
                let mmap = MMapSemaphore::new_from_file(&file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                let mmap = MMapSemaphore::new_from_file(file)?;
                infer_schema(Buffer::from_owner(mmap))?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        self.read_options.n_threads = n_threads;
        // Let the caller rewrite the inferred schema (e.g. rename columns).
        let mut schema = f(schema)?;

        // the dtypes set may be for the new names, so update again
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// When set, include a column with this name containing the path of the
    /// source file for each row.
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }

    /// Set the policy applied when expected columns are missing from a file;
    /// `None` falls back to the default policy at scan time.
    #[must_use]
    pub fn with_missing_columns_policy(mut self, policy: Option<MissingColumnsPolicy>) -> Self {
        self.missing_columns_policy = policy;
        self
    }
}
346
347impl LazyFileListReader for LazyCsvReader {
348    /// Get the final [LazyFrame].
349    fn finish(self) -> PolarsResult<LazyFrame> {
350        let rechunk = self.rechunk();
351        let row_index = self.row_index().cloned();
352        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
353
354        let missing_columns_policy = self.missing_columns_policy.unwrap_or_default();
355
356        let lf: LazyFrame = DslBuilder::scan_csv(
357            self.sources,
358            self.read_options,
359            UnifiedScanArgs {
360                schema: None,
361                cloud_options: self.cloud_options,
362                hive_options: HiveOptions::new_disabled(),
363                rechunk,
364                cache: self.cache,
365                glob: self.glob,
366                hidden_file_prefix: None,
367                projection: None,
368                column_mapping: None,
369                default_values: None,
370                row_index,
371                pre_slice,
372                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
373                missing_columns_policy,
374                extra_columns_policy: ExtraColumnsPolicy::Raise,
375                include_file_paths: self.include_file_paths,
376                deletion_files: None,
377                table_statistics: None,
378                row_count: None,
379            },
380        )?
381        .build()
382        .into();
383        Ok(lf)
384    }
385
386    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
387        unreachable!();
388    }
389
390    fn glob(&self) -> bool {
391        self.glob
392    }
393
394    fn sources(&self) -> &ScanSources {
395        &self.sources
396    }
397
398    fn with_sources(mut self, sources: ScanSources) -> Self {
399        self.sources = sources;
400        self
401    }
402
403    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
404        self.read_options.n_rows = n_rows.into();
405        self
406    }
407
408    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
409        self.read_options.row_index = row_index.into();
410        self
411    }
412
413    fn rechunk(&self) -> bool {
414        self.read_options.rechunk
415    }
416
417    /// Rechunk the memory to contiguous chunks when parsing is done.
418    fn with_rechunk(mut self, rechunk: bool) -> Self {
419        self.read_options.rechunk = rechunk;
420        self
421    }
422
423    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
424    /// be guaranteed.
425    fn n_rows(&self) -> Option<usize> {
426        self.read_options.n_rows
427    }
428
429    /// Return the row index settings.
430    fn row_index(&self) -> Option<&RowIndex> {
431        self.read_options.row_index.as_ref()
432    }
433
434    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
435        // set to false, as the csv parser has full thread utilization
436        let args = UnionArgs {
437            rechunk: self.rechunk(),
438            parallel: false,
439            to_supertypes: false,
440            from_partitioned_ds: true,
441            ..Default::default()
442        };
443        concat_impl(&lfs, args)
444    }
445
446    /// [CloudOptions] used to list files.
447    fn cloud_options(&self) -> Option<&CloudOptions> {
448        self.cloud_options.as_ref()
449    }
450}