1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::csv::read::{
6 CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
7};
8use polars_io::path_utils::expand_paths;
9use polars_io::utils::compression::maybe_decompress_bytes;
10use polars_io::utils::get_reader_bytes;
11use polars_io::{HiveOptions, RowIndex};
12use polars_utils::mmap::MemSlice;
13use polars_utils::slice_enum::Slice;
14
15use crate::prelude::*;
16
/// Builder for a lazily evaluated CSV scan.
///
/// Collects the scan sources and CSV read options; the `LazyFileListReader::finish`
/// implementation turns the collected state into a `LazyFrame`.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// Paths, open files or in-memory buffers to scan.
    sources: ScanSources,
    /// Whether glob expansion is applied to the source paths.
    glob: bool,
    /// Cache flag forwarded to the scan arguments.
    cache: bool,
    /// CSV reader configuration (schema, row skipping, parse options, ...).
    read_options: CsvReadOptions,
    /// Optional cloud configuration used when resolving the sources.
    cloud_options: Option<CloudOptions>,
    /// Optional name forwarded as `include_file_paths` to the scan arguments.
    include_file_paths: Option<PlSmallStr>,
}
27
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Applies `map_func` to the current `CsvParseOptions` and stores the result.
    ///
    /// The parse-level `with_*` setters of this builder are thin wrappers around this method.
    #[must_use]
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Creates a reader over several paths.
    pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Creates a reader over arbitrary scan sources.
    ///
    /// Globbing and caching start enabled, the read options take their defaults, and no
    /// cloud options or file-path column are set.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Creates a reader over a single path.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
    }

    /// Sets the `skip_rows_after_header` read option.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Sets (or clears, with `None`) the `row_index` read option.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Sets the `n_rows` read option.
    ///
    /// When the reader is finished, a `Some(len)` value becomes a positive pre-slice of
    /// length `len` starting at offset 0.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the `infer_schema_length` read option, which is handed to schema inference.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Sets the `ignore_errors` read option.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Sets (or clears, with `None`) the schema stored in the read options.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Sets the `skip_rows` read option.
    ///
    /// This is a separate setting from `skip_lines`; see `CsvReadOptions` for how the two differ.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Sets the `skip_lines` read option.
    ///
    /// This is a separate setting from `skip_rows`; see `CsvReadOptions` for how the two differ.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Sets the `schema_overwrite` read option.
    ///
    /// `with_schema_modify` applies these dtypes on top of the modified schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Sets the `has_header` read option.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the `chunk_size` read option.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Sets the field separator byte in the parse options.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Sets (or clears, with `None`) the comment prefix in the parse options.
    ///
    /// A prefix that is exactly one byte long is stored as `CommentPrefix::Single`;
    /// anything else (including the empty string) is stored as `CommentPrefix::Multi`.
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            // The mapper is only `Fn`, so it has to work on a copy of the prefix.
            opts.with_comment_prefix(comment_prefix.clone().map(|s| match s.as_bytes() {
                // A one-byte UTF-8 string is always ASCII, so no extra check is needed.
                [byte] => CommentPrefix::Single(*byte),
                _ => CommentPrefix::Multi(s),
            }))
        })
    }

    /// Sets (or clears, with `None`) the quote character in the parse options.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Sets the end-of-line character in the parse options.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Sets (or clears, with `None`) the null values in the parse options.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Sets the `missing_is_null` flag in the parse options.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Sets the `cache` flag that is forwarded to the scan arguments.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Sets the `low_memory` read option.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Sets the text encoding in the parse options.
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Sets the `try_parse_dates` flag in the parse options.
    #[must_use]
    #[cfg(feature = "temporal")]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Sets the `raise_if_empty` read option, which is handed to schema inference.
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Sets the `truncate_ragged_lines` flag in the parse options.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Sets the `decimal_comma` flag in the parse options.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Enables or disables glob expansion of the source paths.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Sets (or clears, with `None`) the cloud options used when resolving the sources.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infers the schema from the first source, lets `f` modify it, and stores the result
    /// as the reader's schema.
    ///
    /// After `f` has run, every dtype in `schema_overwrite` is written into the modified
    /// schema, so overwrites take effect on the columns as `f` left them.
    ///
    /// # Errors
    ///
    /// Returns an error when there is no source to infer from, when path expansion,
    /// opening or mapping the source, decompression or schema inference fails, or when
    /// `f` itself returns an error.
    pub fn with_schema_modify<F>(self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let infer_schema = |bytes: MemSlice| {
            let skip_rows = self.read_options.skip_rows;
            let skip_lines = self.read_options.skip_lines;
            let parse_options = self.read_options.get_parse_options();

            // Scratch buffer that receives the decompressed bytes when the input is compressed.
            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // NOTE(review): presumably the schema-overwrite slot; nothing is passed
                    // here because the overwrites are applied after `f` has run (below).
                    None,
                    skip_rows,
                    skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        // Only the first source is inspected.
        let schema = match &self.sources {
            ScanSources::Paths(paths) => {
                let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                infer_schema(MemSlice::from_file(&polars_utils::open_file(path)?)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // Apply the dtype overwrites on top of whatever `f` produced.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Sets (or clears, with `None`) the value forwarded as `include_file_paths` to the
    /// scan arguments (presumably the name of a column holding each row's source path).
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
321
322impl LazyFileListReader for LazyCsvReader {
323 fn finish(self) -> PolarsResult<LazyFrame> {
325 let rechunk = self.rechunk();
326 let row_index = self.row_index().cloned();
327 let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
328
329 let lf: LazyFrame = DslBuilder::scan_csv(
330 self.sources,
331 self.read_options,
332 UnifiedScanArgs {
333 schema: None,
334 cloud_options: self.cloud_options,
335 hive_options: HiveOptions::new_disabled(),
336 rechunk,
337 cache: self.cache,
338 glob: self.glob,
339 projection: None,
340 row_index,
341 pre_slice,
342 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
343 missing_columns_policy: MissingColumnsPolicy::Raise,
344 include_file_paths: self.include_file_paths,
345 },
346 )?
347 .build()
348 .into();
349 Ok(lf)
350 }
351
352 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
353 unreachable!();
354 }
355
356 fn glob(&self) -> bool {
357 self.glob
358 }
359
360 fn sources(&self) -> &ScanSources {
361 &self.sources
362 }
363
364 fn with_sources(mut self, sources: ScanSources) -> Self {
365 self.sources = sources;
366 self
367 }
368
369 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
370 self.read_options.n_rows = n_rows.into();
371 self
372 }
373
374 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
375 self.read_options.row_index = row_index.into();
376 self
377 }
378
379 fn rechunk(&self) -> bool {
380 self.read_options.rechunk
381 }
382
383 fn with_rechunk(mut self, rechunk: bool) -> Self {
385 self.read_options.rechunk = rechunk;
386 self
387 }
388
389 fn n_rows(&self) -> Option<usize> {
392 self.read_options.n_rows
393 }
394
395 fn row_index(&self) -> Option<&RowIndex> {
397 self.read_options.row_index.as_ref()
398 }
399
400 fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
401 let args = UnionArgs {
403 rechunk: self.rechunk(),
404 parallel: false,
405 to_supertypes: false,
406 from_partitioned_ds: true,
407 ..Default::default()
408 };
409 concat_impl(&lfs, args)
410 }
411
412 fn cloud_options(&self) -> Option<&CloudOptions> {
414 self.cloud_options.as_ref()
415 }
416}