polars_lazy/scan/

#[cfg(feature = "csv")]
use arrow::buffer::Buffer;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::csv::read::{
    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
};
use polars_io::path_utils::expand_paths;
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_io::utils::get_reader_bytes;
use polars_io::{HiveOptions, RowIndex};
use polars_utils::mmap::MemSlice;
use polars_utils::plpath::PlPath;
use polars_utils::slice_enum::Slice;

use crate::prelude::*;

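/// Lazily scan a CSV file (or set of files) into a [`LazyFrame`].
///
/// A minimal usage sketch; the builder methods are the ones defined below, while the
/// `"data.csv"` path and the `PlPath::new` constructor call are illustrative assumptions:
///
/// ```ignore
/// use polars_lazy::prelude::*;
/// use polars_utils::plpath::PlPath;
///
/// // Build the lazy scan, tweak a few parse options, then materialize the plan.
/// let df = LazyCsvReader::new(PlPath::new("data.csv"))
///     .with_has_header(true)
///     .with_separator(b';')
///     .with_n_rows(Some(1_000))
///     .finish()?
///     .collect()?;
/// ```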
#[derive(Clone)]
#[cfg(feature = "csv")]
pub struct LazyCsvReader {
    sources: ScanSources,
    glob: bool,
    cache: bool,
    read_options: CsvReadOptions,
    cloud_options: Option<CloudOptions>,
    include_file_paths: Option<PlSmallStr>,
}

#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Map the [`CsvParseOptions`] of this reader through a closure. Re-exported here
    /// so the parse-option setters below stay short.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

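    /// Create a reader that scans multiple CSV paths at once. A brief sketch; the file
    /// names are illustrative assumptions and the files are assumed to share a
    /// compatible schema:
    ///
    /// ```ignore
    /// use arrow::buffer::Buffer;
    /// use polars_utils::plpath::PlPath;
    ///
    /// let paths = Buffer::from_iter([PlPath::new("a.csv"), PlPath::new("b.csv")]);
    /// let lf = LazyCsvReader::new_paths(paths).finish()?;
    /// ```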
    pub fn new_paths(paths: Buffer<PlPath>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    pub fn new(path: PlPath) -> Self {
        Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
    }

    /// Skip this number of rows after the header location.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column.
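    ///
    /// For example, to add an index column named `"row_nr"` that starts at 0
    /// (a sketch; the column name is an arbitrary choice):
    ///
    /// ```ignore
    /// let reader = reader.with_row_index(Some(RowIndex {
    ///     name: "row_nr".into(),
    ///     offset: 0,
    /// }));
    /// ```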
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Set the number of rows to use when inferring the csv schema.
    /// The default is 100 rows.
    /// Setting to [None] will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Continue with the next batch when a ParserError is encountered.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the CSV file's schema
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
    /// Note that a "row" here means a valid CSV row: encoding and comments are respected.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
    /// We don't respect CSV escaping when skipping lines.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
    /// of the total schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the CSV file has headers
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Set the chunk size used by the parser. Smaller chunks reduce memory usage
    /// during parsing at the cost of performance.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the CSV file's column separator as a byte character
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
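    ///
    /// A single ASCII byte (e.g. `"#"`) is matched as [`CommentPrefix::Single`]; longer
    /// prefixes (e.g. `"//"`) fall back to [`CommentPrefix::Multi`]. A brief sketch:
    ///
    /// ```ignore
    /// // Ignore lines starting with '#'.
    /// let reader = reader.with_comment_prefix(Some("#".into()));
    /// ```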
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            opts.with_comment_prefix(comment_prefix.clone().map(|s| {
                if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
                    CommentPrefix::Single(s.as_bytes()[0])
                } else {
                    CommentPrefix::Multi(s)
                }
            }))
        })
    }

    /// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting is disabled.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the `char` used as end of line. The default is `b'\n'`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set values that will be interpreted as missing/null.
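    ///
    /// For instance, to treat both `"NA"` and `"null"` as missing in every column
    /// (a sketch that assumes the `NullValues::AllColumns` variant; the literals are arbitrary):
    ///
    /// ```ignore
    /// let reader = reader.with_null_values(Some(NullValues::AllColumns(vec![
    ///     "NA".into(),
    ///     "null".into(),
    /// ])));
    /// ```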
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Treat missing fields as null.
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Cache the DataFrame after reading.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Reduce memory usage at the expense of performance
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set [`CsvEncoding`]
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Automatically try to parse dates/datetimes and times.
    /// If parsing fails, columns remain of dtype [`DataType::String`].
    #[cfg(feature = "temporal")]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Raise an error if CSV is empty (otherwise return an empty frame)
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Truncate lines that are longer than the schema.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Parse floats using a comma as the decimal separator.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Expand paths given via globbing rules.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the [`CloudOptions`] used to access cloud storage.
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Modify the schema before we run the lazy scan.
    ///
    /// Important: call this function last in the builder!
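    ///
    /// A sketch of overriding a single dtype through the inferred schema (the `"id"`
    /// column name and `"data.csv"` path are illustrative assumptions):
    ///
    /// ```ignore
    /// let lf = LazyCsvReader::new(PlPath::new("data.csv"))
    ///     .with_schema_modify(|mut schema| {
    ///         // Parse the hypothetical "id" column as Int64 instead of the inferred dtype.
    ///         schema.with_column("id".into(), DataType::Int64);
    ///         Ok(schema)
    ///     })?
    ///     .finish()?;
    /// ```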
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let n_threads = self.read_options.n_threads;

        let infer_schema = |bytes: MemSlice| {
            let skip_rows = self.read_options.skip_rows;
            let skip_lines = self.read_options.skip_lines;
            let parse_options = self.read_options.get_parse_options();

            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // the schema overwrite is passed as None here; overwrites are applied after the schema is modified
                    None,
                    skip_rows,
                    skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                // TODO: Path expansion should happen when converting to the IR
                // https://github.com/pola-rs/polars/issues/17634
                let paths = expand_paths(
                    &paths[..],
                    self.glob(),
                    &[], // hidden_file_prefix
                    &mut self.cloud_options,
                )?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                infer_schema(MemSlice::from_file(&polars_utils::open_file(
                    path.as_ref().as_local_path().unwrap(),
                )?)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        self.read_options.n_threads = n_threads;
        let mut schema = f(schema)?;

        // the dtype overwrites may refer to the new column names, so apply them again
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Include a column with the given name containing the path of the source file for each row.
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}

impl LazyFileListReader for LazyCsvReader {
    /// Get the final [LazyFrame].
    fn finish(self) -> PolarsResult<LazyFrame> {
        let rechunk = self.rechunk();
        let row_index = self.row_index().cloned();
        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });

        let lf: LazyFrame = DslBuilder::scan_csv(
            self.sources,
            self.read_options,
            UnifiedScanArgs {
                schema: None,
                cloud_options: self.cloud_options,
                hive_options: HiveOptions::new_disabled(),
                rechunk,
                cache: self.cache,
                glob: self.glob,
                hidden_file_prefix: None,
                projection: None,
                column_mapping: None,
                default_values: None,
                row_index,
                pre_slice,
                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
                missing_columns_policy: MissingColumnsPolicy::Raise,
                extra_columns_policy: ExtraColumnsPolicy::Raise,
                include_file_paths: self.include_file_paths,
                deletion_files: None,
                table_statistics: None,
            },
        )?
        .build()
        .into();
        Ok(lf)
    }

    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        unreachable!();
    }

    fn glob(&self) -> bool {
        self.glob
    }

    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.read_options.n_rows = n_rows.into();
        self
    }

    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.read_options.row_index = row_index.into();
        self
    }

    fn rechunk(&self) -> bool {
        self.read_options.rechunk
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.read_options.rechunk = rechunk;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    fn n_rows(&self) -> Option<usize> {
        self.read_options.n_rows
    }

    /// Return the row index settings.
    fn row_index(&self) -> Option<&RowIndex> {
        self.read_options.row_index.as_ref()
    }

    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        // `parallel` is set to false, as the CSV parser already has full thread utilization
        let args = UnionArgs {
            rechunk: self.rechunk(),
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        concat_impl(&lfs, args)
    }

    /// [CloudOptions] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}