1#![allow(unsafe_op_in_unsafe_fn)]
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use polars_core::datatypes::{DataType, Field};
6use polars_core::schema::{Schema, SchemaRef};
7use polars_error::PolarsResult;
8use polars_utils::pl_str::PlSmallStr;
9#[cfg(feature = "serde")]
10use serde::{Deserialize, Serialize};
11
12use crate::RowIndex;
13
14#[derive(Clone, Debug, PartialEq, Eq, Hash)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16pub struct CsvReadOptions {
17 pub path: Option<PathBuf>,
18 pub rechunk: bool,
20 pub n_threads: Option<usize>,
21 pub low_memory: bool,
22 pub n_rows: Option<usize>,
24 pub row_index: Option<RowIndex>,
25 pub columns: Option<Arc<[PlSmallStr]>>,
27 pub projection: Option<Arc<Vec<usize>>>,
28 pub schema: Option<SchemaRef>,
29 pub schema_overwrite: Option<SchemaRef>,
30 pub dtype_overwrite: Option<Arc<Vec<DataType>>>,
31 pub parse_options: Arc<CsvParseOptions>,
33 pub has_header: bool,
34 pub chunk_size: usize,
35 pub skip_rows: usize,
37 pub skip_lines: usize,
39 pub skip_rows_after_header: usize,
40 pub infer_schema_length: Option<usize>,
41 pub raise_if_empty: bool,
42 pub ignore_errors: bool,
43 pub fields_to_cast: Vec<Field>,
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, Hash)]
47#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
48pub struct CsvParseOptions {
49 pub separator: u8,
50 pub quote_char: Option<u8>,
51 pub eol_char: u8,
52 pub encoding: CsvEncoding,
53 pub null_values: Option<NullValues>,
54 pub missing_is_null: bool,
55 pub truncate_ragged_lines: bool,
56 pub comment_prefix: Option<CommentPrefix>,
57 pub try_parse_dates: bool,
58 pub decimal_comma: bool,
59}
60
61impl Default for CsvReadOptions {
62 fn default() -> Self {
63 Self {
64 path: None,
65
66 rechunk: false,
67 n_threads: None,
68 low_memory: false,
69
70 n_rows: None,
71 row_index: None,
72
73 columns: None,
74 projection: None,
75 schema: None,
76 schema_overwrite: None,
77 dtype_overwrite: None,
78
79 parse_options: Default::default(),
80 has_header: true,
81 chunk_size: 1 << 18,
82 skip_rows: 0,
83 skip_lines: 0,
84 skip_rows_after_header: 0,
85 infer_schema_length: Some(100),
86 raise_if_empty: true,
87 ignore_errors: false,
88 fields_to_cast: vec![],
89 }
90 }
91}
92
93impl Default for CsvParseOptions {
95 fn default() -> Self {
96 Self {
97 separator: b',',
98 quote_char: Some(b'"'),
99 eol_char: b'\n',
100 encoding: Default::default(),
101 null_values: None,
102 missing_is_null: true,
103 truncate_ragged_lines: false,
104 comment_prefix: None,
105 try_parse_dates: false,
106 decimal_comma: false,
107 }
108 }
109}
110
111impl CsvReadOptions {
112 pub fn get_parse_options(&self) -> Arc<CsvParseOptions> {
113 self.parse_options.clone()
114 }
115
116 pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
117 self.path = path.map(|p| p.into());
118 self
119 }
120
121 pub fn with_rechunk(mut self, rechunk: bool) -> Self {
123 self.rechunk = rechunk;
124 self
125 }
126
127 pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
130 self.n_threads = n_threads;
131 self
132 }
133
134 pub fn with_low_memory(mut self, low_memory: bool) -> Self {
136 self.low_memory = low_memory;
137 self
138 }
139
140 pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
142 self.n_rows = n_rows;
143 self
144 }
145
146 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
148 self.row_index = row_index;
149 self
150 }
151
152 pub fn with_columns(mut self, columns: Option<Arc<[PlSmallStr]>>) -> Self {
154 self.columns = columns;
155 self
156 }
157
158 pub fn with_projection(mut self, projection: Option<Arc<Vec<usize>>>) -> Self {
161 self.projection = projection;
162 self
163 }
164
165 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
169 self.schema = schema;
170 self
171 }
172
173 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
175 self.schema_overwrite = schema_overwrite;
176 self
177 }
178
179 pub fn with_dtype_overwrite(mut self, dtype_overwrite: Option<Arc<Vec<DataType>>>) -> Self {
182 self.dtype_overwrite = dtype_overwrite;
183 self
184 }
185
186 pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self {
189 self.parse_options = Arc::new(parse_options);
190 self
191 }
192
193 pub fn with_has_header(mut self, has_header: bool) -> Self {
195 self.has_header = has_header;
196 self
197 }
198
199 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
201 self.chunk_size = chunk_size;
202 self
203 }
204
205 pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
209 self.skip_rows = skip_rows;
210 self
211 }
212
213 pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
217 self.skip_lines = skip_lines;
218 self
219 }
220
221 pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
223 self.skip_rows_after_header = skip_rows_after_header;
224 self
225 }
226
227 pub fn with_infer_schema_length(mut self, infer_schema_length: Option<usize>) -> Self {
231 self.infer_schema_length = infer_schema_length;
232 self
233 }
234
235 pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
238 self.raise_if_empty = raise_if_empty;
239 self
240 }
241
242 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
244 self.ignore_errors = ignore_errors;
245 self
246 }
247
248 pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
250 mut self,
251 map_func: F,
252 ) -> Self {
253 let parse_options = Arc::unwrap_or_clone(self.parse_options);
254 self.parse_options = Arc::new(map_func(parse_options));
255 self
256 }
257}
258
259impl CsvParseOptions {
260 pub fn with_separator(mut self, separator: u8) -> Self {
263 self.separator = separator;
264 self
265 }
266
267 pub fn with_quote_char(mut self, quote_char: Option<u8>) -> Self {
270 self.quote_char = quote_char;
271 self
272 }
273
274 pub fn with_eol_char(mut self, eol_char: u8) -> Self {
276 self.eol_char = eol_char;
277 self
278 }
279
280 pub fn with_encoding(mut self, encoding: CsvEncoding) -> Self {
282 self.encoding = encoding;
283 self
284 }
285
286 pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
291 self.null_values = null_values;
292 self
293 }
294
295 pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self {
297 self.missing_is_null = missing_is_null;
298 self
299 }
300
301 pub fn with_truncate_ragged_lines(mut self, truncate_ragged_lines: bool) -> Self {
303 self.truncate_ragged_lines = truncate_ragged_lines;
304 self
305 }
306
307 pub fn with_comment_prefix<T: Into<CommentPrefix>>(
310 mut self,
311 comment_prefix: Option<T>,
312 ) -> Self {
313 self.comment_prefix = comment_prefix.map(Into::into);
314 self
315 }
316
317 pub fn with_try_parse_dates(mut self, try_parse_dates: bool) -> Self {
320 self.try_parse_dates = try_parse_dates;
321 self
322 }
323
324 pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
326 self.decimal_comma = decimal_comma;
327 self
328 }
329}
330
/// Text encoding assumed for CSV input.
///
/// `Utf8` is the default variant.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CsvEncoding {
    /// UTF-8 input.
    #[default]
    Utf8,
    /// Presumably UTF-8 input decoded lossily (invalid sequences tolerated) —
    /// confirm against the reader.
    LossyUtf8,
}
340
341#[derive(Clone, Debug, Eq, PartialEq, Hash)]
342#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
343pub enum CommentPrefix {
344 Single(u8),
346 Multi(PlSmallStr),
349}
350
351impl CommentPrefix {
352 pub fn new_single(prefix: u8) -> Self {
354 CommentPrefix::Single(prefix)
355 }
356
357 pub fn new_multi(prefix: PlSmallStr) -> Self {
359 CommentPrefix::Multi(prefix)
360 }
361
362 pub fn new_from_str(prefix: &str) -> Self {
364 if prefix.len() == 1 && prefix.chars().next().unwrap().is_ascii() {
365 let c = prefix.as_bytes()[0];
366 CommentPrefix::Single(c)
367 } else {
368 CommentPrefix::Multi(PlSmallStr::from_str(prefix))
369 }
370 }
371}
372
373impl From<&str> for CommentPrefix {
374 fn from(value: &str) -> Self {
375 Self::new_from_str(value)
376 }
377}
378
379#[derive(Clone, Debug, Eq, PartialEq, Hash)]
380#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
381pub enum NullValues {
382 AllColumnsSingle(PlSmallStr),
384 AllColumns(Vec<PlSmallStr>),
386 Named(Vec<(PlSmallStr, PlSmallStr)>),
388}
389
390impl NullValues {
391 pub fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
392 Ok(match self {
393 NullValues::AllColumnsSingle(v) => NullValuesCompiled::AllColumnsSingle(v),
394 NullValues::AllColumns(v) => NullValuesCompiled::AllColumns(v),
395 NullValues::Named(v) => {
396 let mut null_values = vec![PlSmallStr::from_static(""); schema.len()];
397 for (name, null_value) in v {
398 let i = schema.try_index_of(&name)?;
399 null_values[i] = null_value;
400 }
401 NullValuesCompiled::Columns(null_values)
402 },
403 })
404 }
405}
406
407#[derive(Debug, Clone)]
408pub enum NullValuesCompiled {
409 AllColumnsSingle(PlSmallStr),
411 AllColumns(Vec<PlSmallStr>),
413 Columns(Vec<PlSmallStr>),
415}
416
417impl NullValuesCompiled {
418 pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
422 use NullValuesCompiled::*;
423 match self {
424 AllColumnsSingle(v) => v.as_bytes() == field,
425 AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
426 Columns(v) => {
427 debug_assert!(index < v.len());
428 v.get_unchecked(index).as_bytes() == field
429 },
430 }
431 }
432}