1#![allow(unsafe_op_in_unsafe_fn)]
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use polars_core::datatypes::{DataType, Field};
6use polars_core::schema::{Schema, SchemaRef};
7use polars_error::PolarsResult;
8use polars_utils::pl_str::PlSmallStr;
9#[cfg(feature = "serde")]
10use serde::{Deserialize, Serialize};
11
12use crate::RowIndex;
13
14#[derive(Clone, Debug, PartialEq, Eq, Hash)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
17pub struct CsvReadOptions {
18 pub path: Option<PathBuf>,
19 pub rechunk: bool,
21 pub n_threads: Option<usize>,
22 pub low_memory: bool,
23 pub n_rows: Option<usize>,
25 pub row_index: Option<RowIndex>,
26 pub columns: Option<Arc<[PlSmallStr]>>,
28 pub projection: Option<Arc<Vec<usize>>>,
29 pub schema: Option<SchemaRef>,
30 pub schema_overwrite: Option<SchemaRef>,
31 pub dtype_overwrite: Option<Arc<Vec<DataType>>>,
32 pub parse_options: Arc<CsvParseOptions>,
34 pub has_header: bool,
35 pub chunk_size: usize,
36 pub skip_rows: usize,
38 pub skip_lines: usize,
40 pub skip_rows_after_header: usize,
41 pub infer_schema_length: Option<usize>,
42 pub raise_if_empty: bool,
43 pub ignore_errors: bool,
44 pub fields_to_cast: Vec<Field>,
45}
46
47#[derive(Clone, Debug, PartialEq, Eq, Hash)]
48#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
49#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
50pub struct CsvParseOptions {
51 pub separator: u8,
52 pub quote_char: Option<u8>,
53 pub eol_char: u8,
54 pub encoding: CsvEncoding,
55 pub null_values: Option<NullValues>,
56 pub missing_is_null: bool,
57 pub truncate_ragged_lines: bool,
58 pub comment_prefix: Option<CommentPrefix>,
59 pub try_parse_dates: bool,
60 pub decimal_comma: bool,
61}
62
63impl Default for CsvReadOptions {
64 fn default() -> Self {
65 Self {
66 path: None,
67
68 rechunk: false,
69 n_threads: None,
70 low_memory: false,
71
72 n_rows: None,
73 row_index: None,
74
75 columns: None,
76 projection: None,
77 schema: None,
78 schema_overwrite: None,
79 dtype_overwrite: None,
80
81 parse_options: Default::default(),
82 has_header: true,
83 chunk_size: 1 << 18,
84 skip_rows: 0,
85 skip_lines: 0,
86 skip_rows_after_header: 0,
87 infer_schema_length: Some(100),
88 raise_if_empty: true,
89 ignore_errors: false,
90 fields_to_cast: vec![],
91 }
92 }
93}
94
95impl Default for CsvParseOptions {
97 fn default() -> Self {
98 Self {
99 separator: b',',
100 quote_char: Some(b'"'),
101 eol_char: b'\n',
102 encoding: Default::default(),
103 null_values: None,
104 missing_is_null: true,
105 truncate_ragged_lines: false,
106 comment_prefix: None,
107 try_parse_dates: false,
108 decimal_comma: false,
109 }
110 }
111}
112
113impl CsvReadOptions {
114 pub fn get_parse_options(&self) -> Arc<CsvParseOptions> {
115 self.parse_options.clone()
116 }
117
118 pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
119 self.path = path.map(|p| p.into());
120 self
121 }
122
123 pub fn with_rechunk(mut self, rechunk: bool) -> Self {
125 self.rechunk = rechunk;
126 self
127 }
128
129 pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
132 self.n_threads = n_threads;
133 self
134 }
135
136 pub fn with_low_memory(mut self, low_memory: bool) -> Self {
138 self.low_memory = low_memory;
139 self
140 }
141
142 pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
144 self.n_rows = n_rows;
145 self
146 }
147
148 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
150 self.row_index = row_index;
151 self
152 }
153
154 pub fn with_columns(mut self, columns: Option<Arc<[PlSmallStr]>>) -> Self {
156 self.columns = columns;
157 self
158 }
159
160 pub fn with_projection(mut self, projection: Option<Arc<Vec<usize>>>) -> Self {
163 self.projection = projection;
164 self
165 }
166
167 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
171 self.schema = schema;
172 self
173 }
174
175 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
177 self.schema_overwrite = schema_overwrite;
178 self
179 }
180
181 pub fn with_dtype_overwrite(mut self, dtype_overwrite: Option<Arc<Vec<DataType>>>) -> Self {
184 self.dtype_overwrite = dtype_overwrite;
185 self
186 }
187
188 pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self {
191 self.parse_options = Arc::new(parse_options);
192 self
193 }
194
195 pub fn with_has_header(mut self, has_header: bool) -> Self {
197 self.has_header = has_header;
198 self
199 }
200
201 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
203 self.chunk_size = chunk_size;
204 self
205 }
206
207 pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
211 self.skip_rows = skip_rows;
212 self
213 }
214
215 pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
219 self.skip_lines = skip_lines;
220 self
221 }
222
223 pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
225 self.skip_rows_after_header = skip_rows_after_header;
226 self
227 }
228
229 pub fn with_infer_schema_length(mut self, infer_schema_length: Option<usize>) -> Self {
233 self.infer_schema_length = infer_schema_length;
234 self
235 }
236
237 pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
240 self.raise_if_empty = raise_if_empty;
241 self
242 }
243
244 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
246 self.ignore_errors = ignore_errors;
247 self
248 }
249
250 pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
252 mut self,
253 map_func: F,
254 ) -> Self {
255 let parse_options = Arc::unwrap_or_clone(self.parse_options);
256 self.parse_options = Arc::new(map_func(parse_options));
257 self
258 }
259}
260
261impl CsvParseOptions {
262 pub fn with_separator(mut self, separator: u8) -> Self {
265 self.separator = separator;
266 self
267 }
268
269 pub fn with_quote_char(mut self, quote_char: Option<u8>) -> Self {
272 self.quote_char = quote_char;
273 self
274 }
275
276 pub fn with_eol_char(mut self, eol_char: u8) -> Self {
278 self.eol_char = eol_char;
279 self
280 }
281
282 pub fn with_encoding(mut self, encoding: CsvEncoding) -> Self {
284 self.encoding = encoding;
285 self
286 }
287
288 pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
293 self.null_values = null_values;
294 self
295 }
296
297 pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self {
299 self.missing_is_null = missing_is_null;
300 self
301 }
302
303 pub fn with_truncate_ragged_lines(mut self, truncate_ragged_lines: bool) -> Self {
305 self.truncate_ragged_lines = truncate_ragged_lines;
306 self
307 }
308
309 pub fn with_comment_prefix<T: Into<CommentPrefix>>(
312 mut self,
313 comment_prefix: Option<T>,
314 ) -> Self {
315 self.comment_prefix = comment_prefix.map(Into::into);
316 self
317 }
318
319 pub fn with_try_parse_dates(mut self, try_parse_dates: bool) -> Self {
322 self.try_parse_dates = try_parse_dates;
323 self
324 }
325
326 pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
328 self.decimal_comma = decimal_comma;
329 self
330 }
331}
332
/// Text encoding choices for the CSV parse options.
///
/// `Utf8` is the variant returned by `CsvEncoding::default()`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum CsvEncoding {
    /// UTF-8 encoding; the default variant.
    #[default]
    Utf8,
    /// Lossy UTF-8 encoding.
    ///
    /// NOTE(review): presumably bytes that are not valid UTF-8 are replaced
    /// rather than rejected; the handling lives in the reader, confirm there.
    LossyUtf8,
}
343
344#[derive(Clone, Debug, Eq, PartialEq, Hash)]
345#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
346#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
347pub enum CommentPrefix {
348 Single(u8),
350 Multi(PlSmallStr),
353}
354
355impl CommentPrefix {
356 pub fn new_single(prefix: u8) -> Self {
358 CommentPrefix::Single(prefix)
359 }
360
361 pub fn new_multi(prefix: PlSmallStr) -> Self {
363 CommentPrefix::Multi(prefix)
364 }
365
366 pub fn new_from_str(prefix: &str) -> Self {
368 if prefix.len() == 1 && prefix.chars().next().unwrap().is_ascii() {
369 let c = prefix.as_bytes()[0];
370 CommentPrefix::Single(c)
371 } else {
372 CommentPrefix::Multi(PlSmallStr::from_str(prefix))
373 }
374 }
375}
376
377impl From<&str> for CommentPrefix {
378 fn from(value: &str) -> Self {
379 Self::new_from_str(value)
380 }
381}
382
383#[derive(Clone, Debug, Eq, PartialEq, Hash)]
384#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
385#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
386pub enum NullValues {
387 AllColumnsSingle(PlSmallStr),
389 AllColumns(Vec<PlSmallStr>),
391 Named(Vec<(PlSmallStr, PlSmallStr)>),
393}
394
395impl NullValues {
396 pub fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
397 Ok(match self {
398 NullValues::AllColumnsSingle(v) => NullValuesCompiled::AllColumnsSingle(v),
399 NullValues::AllColumns(v) => NullValuesCompiled::AllColumns(v),
400 NullValues::Named(v) => {
401 let mut null_values = vec![PlSmallStr::from_static(""); schema.len()];
402 for (name, null_value) in v {
403 let i = schema.try_index_of(&name)?;
404 null_values[i] = null_value;
405 }
406 NullValuesCompiled::Columns(null_values)
407 },
408 })
409 }
410}
411
412#[derive(Debug, Clone)]
413pub enum NullValuesCompiled {
414 AllColumnsSingle(PlSmallStr),
416 AllColumns(Vec<PlSmallStr>),
418 Columns(Vec<PlSmallStr>),
420}
421
422impl NullValuesCompiled {
423 pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
427 use NullValuesCompiled::*;
428 match self {
429 AllColumnsSingle(v) => v.as_bytes() == field,
430 AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
431 Columns(v) => {
432 debug_assert!(index < v.len());
433 v.get_unchecked(index).as_bytes() == field
434 },
435 }
436 }
437}