// polars_lazy/scan/csv.rs

1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::utils::compression::maybe_decompress_bytes;
10use polars_io::utils::get_reader_bytes;
11use polars_io::{HiveOptions, RowIndex};
12use polars_utils::mmap::MemSlice;
13use polars_utils::slice_enum::Slice;
14
15use crate::prelude::*;
16
/// Builder for a lazy CSV scan over one or more sources.
///
/// Configure it with the `with_*` methods, then turn it into a [`LazyFrame`] through
/// `LazyFileListReader::finish`.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// Paths, file handles or in-memory buffers to scan.
    sources: ScanSources,
    /// Whether paths are expanded using globbing rules.
    glob: bool,
    /// Whether the DataFrame is cached after reading.
    cache: bool,
    /// CSV reading and parsing options.
    read_options: CsvReadOptions,
    /// Cloud configuration passed to path expansion and to the scan arguments.
    cloud_options: Option<CloudOptions>,
    /// Forwarded to the scan arguments; presumably the name of a column that records each
    /// row's source path — confirm against `UnifiedScanArgs`.
    include_file_paths: Option<PlSmallStr>,
}
27
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Apply `map_func` to the current [`CsvParseOptions`] and store the result.
    ///
    /// The parse-option builders in this impl (separator, quote char, encoding, ...) are thin
    /// wrappers around this method.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Create a reader over a list of paths.
    #[must_use]
    pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Create a reader over `sources` with globbing and caching enabled, default read
    /// options, no cloud options and no `include_file_paths` setting.
    #[must_use]
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Create a reader over a single path.
    #[must_use]
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
    }

    /// Skip this number of rows after the header location.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper
    /// bound `n` cannot be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Set the number of rows to use when inferring the csv schema.
    /// The default is 100 rows.
    /// Setting to [None] will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Continue with next batch when a ParserError is encountered.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the CSV file's schema.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
    /// Note that by row we mean valid CSV, encoding and comments are respected.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
    /// We don't respect CSV escaping when skipping lines.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a
    /// subset of the total schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the CSV file has headers.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the chunk size used by the parser. This influences performance.
    /// This can be used as a way to reduce memory usage during the parsing at the cost of
    /// performance.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the CSV file's column separator as a byte character.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix for this instance. Lines starting with this prefix will be
    /// ignored.
    ///
    /// A one-byte prefix becomes [`CommentPrefix::Single`]; anything else becomes
    /// [`CommentPrefix::Multi`].
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        // Classify the prefix once, outside the closure: `map_parse_options` requires an
        // `Fn`, so the closure may only clone the already-built value.
        let comment_prefix = comment_prefix.map(|s| {
            // A one-byte UTF-8 string is necessarily a single ASCII character.
            if s.len() == 1 {
                CommentPrefix::Single(s.as_bytes()[0])
            } else {
                CommentPrefix::Multi(s)
            }
        });
        self.map_parse_options(|opts| opts.with_comment_prefix(comment_prefix.clone()))
    }

    /// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting
    /// is disabled.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the `char` used as end of line. The default is `b'\n'`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set values that will be interpreted as missing/null.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Treat missing fields as null.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Cache the DataFrame after reading.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Reduce memory usage at the expense of performance.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set the [`CsvEncoding`].
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Automatically try to parse dates/datetimes and time.
    /// If parsing fails, columns remain of dtype [`DataType::String`].
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Raise an error if CSV is empty (otherwise return an empty frame).
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Truncate lines that are longer than the schema.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Set the `decimal_comma` parse option (presumably: parse floats with `,` instead of
    /// `.` as the decimal separator — confirm against `CsvParseOptions`).
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Expand path given via globbing rules.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the [`CloudOptions`] passed to path expansion and to the scan arguments.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infer the schema from the first source, let `f` modify it, and pin the result as this
    /// reader's schema.
    ///
    /// Only the first path/file/buffer is read for inference. Inference uses the current
    /// parse options, `infer_schema_length`, `has_header`, `skip_rows`, `skip_lines`,
    /// `skip_rows_after_header` and `raise_if_empty`.
    ///
    /// The dtype overwrite set via [`Self::with_dtype_overwrite`] is applied *after* `f`,
    /// because its names may refer to columns as renamed by `f`.
    ///
    /// Important! Call this last in the builder chain: the schema is inferred right here,
    /// so options set afterwards do not influence it.
    ///
    /// # Errors
    ///
    /// Returns an error if there are no sources, if the first source cannot be opened,
    /// decompressed or have its schema inferred, or if `f` fails.
    pub fn with_schema_modify<F>(self, f: F) -> PolarsResult<Self>
    where
        F: FnOnce(Schema) -> PolarsResult<Schema>,
    {
        let infer_schema = |bytes: MemSlice| {
            let parse_options = self.read_options.get_parse_options();

            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // The dtype overwrite is applied below, after `f` has run.
                    None,
                    self.read_options.skip_rows,
                    self.read_options.skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        let schema = match &self.sources {
            ScanSources::Paths(paths) => {
                // TODO: Path expansion should happen when converting to the IR
                // https://github.com/pola-rs/polars/issues/17634
                let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                infer_schema(MemSlice::from_file(&polars_utils::open_file(path)?)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // The dtypes set may be for the new names produced by `f`, so apply them only now.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Set `include_file_paths`, which is forwarded to the scan arguments (presumably the
    /// name of a column holding each row's source path — confirm against `UnifiedScanArgs`).
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
321
// Gated like the struct and its inherent impl, so the three items are always compiled together.
#[cfg(feature = "csv")]
impl LazyFileListReader for LazyCsvReader {
    /// Get the final [LazyFrame].
    ///
    /// Builds a single CSV scan over all sources. `n_rows` is turned into a positive
    /// pre-slice starting at offset 0, and hive partitioning is disabled.
    fn finish(self) -> PolarsResult<LazyFrame> {
        let rechunk = self.rechunk();
        let row_index = self.row_index().cloned();
        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });

        let lf: LazyFrame = DslBuilder::scan_csv(
            self.sources,
            self.read_options,
            UnifiedScanArgs {
                schema: None,
                cloud_options: self.cloud_options,
                hive_options: HiveOptions::new_disabled(),
                rechunk,
                cache: self.cache,
                glob: self.glob,
                projection: None,
                row_index,
                pre_slice,
                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
                missing_columns_policy: MissingColumnsPolicy::Raise,
                include_file_paths: self.include_file_paths,
            },
        )?
        .build()
        .into();
        Ok(lf)
    }

    /// Not used by this reader: the `finish` implementation in this impl does not call it,
    /// and reaching it panics.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        unreachable!();
    }

    /// Whether paths are expanded using globbing rules.
    fn glob(&self) -> bool {
        self.glob
    }

    /// The sources this reader scans.
    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    /// Replace the sources this reader scans.
    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper
    /// bound `n` cannot be guaranteed.
    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.read_options.n_rows = n_rows.into();
        self
    }

    /// Add (or, with `None`, remove) a row index column.
    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.read_options.row_index = row_index.into();
        self
    }

    /// Whether the memory is rechunked to contiguous chunks when parsing is done.
    fn rechunk(&self) -> bool {
        self.read_options.rechunk
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.read_options.rechunk = rechunk;
        self
    }

    /// The row limit set via `with_n_rows`, if any.
    fn n_rows(&self) -> Option<usize> {
        self.read_options.n_rows
    }

    /// Return the row index settings.
    fn row_index(&self) -> Option<&RowIndex> {
        self.read_options.row_index.as_ref()
    }

    /// Concatenate the per-source frames vertically.
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        let args = UnionArgs {
            rechunk: self.rechunk(),
            // Set to false, as the csv parser already has full thread utilization.
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        concat_impl(&lfs, args)
    }

    /// [CloudOptions] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}
414        self.cloud_options.as_ref()
415    }
416}