1#![allow(unsafe_op_in_unsafe_fn)]
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use polars_core::datatypes::{DataType, Field};
6use polars_core::schema::{Schema, SchemaRef};
7use polars_error::PolarsResult;
8use polars_utils::pl_str::PlSmallStr;
9#[cfg(feature = "serde")]
10use serde::{Deserialize, Serialize};
11
12use crate::RowIndex;
13
14#[derive(Clone, Debug, PartialEq, Eq, Hash)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16pub struct CsvReadOptions {
17 pub path: Option<PathBuf>,
18 pub rechunk: bool,
20 pub n_threads: Option<usize>,
21 pub low_memory: bool,
22 pub n_rows: Option<usize>,
24 pub row_index: Option<RowIndex>,
25 pub columns: Option<Arc<[PlSmallStr]>>,
27 pub projection: Option<Arc<Vec<usize>>>,
28 pub schema: Option<SchemaRef>,
29 pub schema_overwrite: Option<SchemaRef>,
30 pub dtype_overwrite: Option<Arc<Vec<DataType>>>,
31 pub parse_options: Arc<CsvParseOptions>,
33 pub has_header: bool,
34 pub chunk_size: usize,
35 pub skip_rows: usize,
37 pub skip_lines: usize,
39 pub skip_rows_after_header: usize,
40 pub infer_schema_length: Option<usize>,
41 pub raise_if_empty: bool,
42 pub ignore_errors: bool,
43 pub fields_to_cast: Vec<Field>,
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, Hash)]
47#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
48pub struct CsvParseOptions {
49 pub separator: u8,
50 pub quote_char: Option<u8>,
51 pub eol_char: u8,
52 pub encoding: CsvEncoding,
53 pub null_values: Option<NullValues>,
54 pub missing_is_null: bool,
55 pub truncate_ragged_lines: bool,
56 pub comment_prefix: Option<CommentPrefix>,
57 pub try_parse_dates: bool,
58 pub decimal_comma: bool,
59}
60
61impl Default for CsvReadOptions {
62 fn default() -> Self {
63 Self {
64 path: None,
65
66 rechunk: false,
67 n_threads: None,
68 low_memory: false,
69
70 n_rows: None,
71 row_index: None,
72
73 columns: None,
74 projection: None,
75 schema: None,
76 schema_overwrite: None,
77 dtype_overwrite: None,
78
79 parse_options: Default::default(),
80 has_header: true,
81 chunk_size: 1 << 18,
82 skip_rows: 0,
83 skip_lines: 0,
84 skip_rows_after_header: 0,
85 infer_schema_length: Some(100),
86 raise_if_empty: true,
87 ignore_errors: false,
88 fields_to_cast: vec![],
89 }
90 }
91}
92
93impl Default for CsvParseOptions {
95 fn default() -> Self {
96 Self {
97 separator: b',',
98 quote_char: Some(b'"'),
99 eol_char: b'\n',
100 encoding: Default::default(),
101 null_values: None,
102 missing_is_null: true,
103 truncate_ragged_lines: false,
104 comment_prefix: None,
105 try_parse_dates: false,
106 decimal_comma: false,
107 }
108 }
109}
110
111impl CsvReadOptions {
112 pub fn get_parse_options(&self) -> Arc<CsvParseOptions> {
113 self.parse_options.clone()
114 }
115
116 pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
117 self.path = path.map(|p| p.into());
118 self
119 }
120
121 pub fn with_rechunk(mut self, rechunk: bool) -> Self {
123 self.rechunk = rechunk;
124 self
125 }
126
127 pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
130 self.n_threads = n_threads;
131 self
132 }
133
134 pub fn with_low_memory(mut self, low_memory: bool) -> Self {
136 self.low_memory = low_memory;
137 self
138 }
139
140 pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
142 self.n_rows = n_rows;
143 self
144 }
145
146 pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
148 self.row_index = row_index;
149 self
150 }
151
152 pub fn with_columns(mut self, columns: Option<Arc<[PlSmallStr]>>) -> Self {
154 self.columns = columns;
155 self
156 }
157
158 pub fn with_projection(mut self, projection: Option<Arc<Vec<usize>>>) -> Self {
161 self.projection = projection;
162 self
163 }
164
165 pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
169 self.schema = schema;
170 self
171 }
172
173 pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
175 self.schema_overwrite = schema_overwrite;
176 self
177 }
178
179 pub fn with_dtype_overwrite(mut self, dtype_overwrite: Option<Arc<Vec<DataType>>>) -> Self {
182 self.dtype_overwrite = dtype_overwrite;
183 self
184 }
185
186 pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self {
189 self.parse_options = Arc::new(parse_options);
190 self
191 }
192
193 pub fn with_has_header(mut self, has_header: bool) -> Self {
195 self.has_header = has_header;
196 self
197 }
198
199 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
201 self.chunk_size = chunk_size;
202 self
203 }
204
205 pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
209 self.skip_rows = skip_rows;
210 self
211 }
212
213 pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
217 self.skip_lines = skip_lines;
218 self
219 }
220
221 pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
223 self.skip_rows_after_header = skip_rows_after_header;
224 self
225 }
226
227 pub fn with_infer_schema_length(mut self, infer_schema_length: Option<usize>) -> Self {
229 self.infer_schema_length = infer_schema_length;
230 self
231 }
232
233 pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
236 self.raise_if_empty = raise_if_empty;
237 self
238 }
239
240 pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
242 self.ignore_errors = ignore_errors;
243 self
244 }
245
246 pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
248 mut self,
249 map_func: F,
250 ) -> Self {
251 let parse_options = Arc::unwrap_or_clone(self.parse_options);
252 self.parse_options = Arc::new(map_func(parse_options));
253 self
254 }
255}
256
257impl CsvParseOptions {
258 pub fn with_separator(mut self, separator: u8) -> Self {
261 self.separator = separator;
262 self
263 }
264
265 pub fn with_quote_char(mut self, quote_char: Option<u8>) -> Self {
268 self.quote_char = quote_char;
269 self
270 }
271
272 pub fn with_eol_char(mut self, eol_char: u8) -> Self {
274 self.eol_char = eol_char;
275 self
276 }
277
278 pub fn with_encoding(mut self, encoding: CsvEncoding) -> Self {
280 self.encoding = encoding;
281 self
282 }
283
284 pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
289 self.null_values = null_values;
290 self
291 }
292
293 pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self {
295 self.missing_is_null = missing_is_null;
296 self
297 }
298
299 pub fn with_truncate_ragged_lines(mut self, truncate_ragged_lines: bool) -> Self {
301 self.truncate_ragged_lines = truncate_ragged_lines;
302 self
303 }
304
305 pub fn with_comment_prefix<T: Into<CommentPrefix>>(
308 mut self,
309 comment_prefix: Option<T>,
310 ) -> Self {
311 self.comment_prefix = comment_prefix.map(Into::into);
312 self
313 }
314
315 pub fn with_try_parse_dates(mut self, try_parse_dates: bool) -> Self {
318 self.try_parse_dates = try_parse_dates;
319 self
320 }
321
322 pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
324 self.decimal_comma = decimal_comma;
325 self
326 }
327}
328
329#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
330#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
331pub enum CsvEncoding {
332 #[default]
334 Utf8,
335 LossyUtf8,
337}
338
339#[derive(Clone, Debug, Eq, PartialEq, Hash)]
340#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
341pub enum CommentPrefix {
342 Single(u8),
344 Multi(PlSmallStr),
347}
348
349impl CommentPrefix {
350 pub fn new_single(prefix: u8) -> Self {
352 CommentPrefix::Single(prefix)
353 }
354
355 pub fn new_multi(prefix: PlSmallStr) -> Self {
357 CommentPrefix::Multi(prefix)
358 }
359
360 pub fn new_from_str(prefix: &str) -> Self {
362 if prefix.len() == 1 && prefix.chars().next().unwrap().is_ascii() {
363 let c = prefix.as_bytes()[0];
364 CommentPrefix::Single(c)
365 } else {
366 CommentPrefix::Multi(PlSmallStr::from_str(prefix))
367 }
368 }
369}
370
371impl From<&str> for CommentPrefix {
372 fn from(value: &str) -> Self {
373 Self::new_from_str(value)
374 }
375}
376
377#[derive(Clone, Debug, Eq, PartialEq, Hash)]
378#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
379pub enum NullValues {
380 AllColumnsSingle(PlSmallStr),
382 AllColumns(Vec<PlSmallStr>),
384 Named(Vec<(PlSmallStr, PlSmallStr)>),
386}
387
388impl NullValues {
389 pub fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
390 Ok(match self {
391 NullValues::AllColumnsSingle(v) => NullValuesCompiled::AllColumnsSingle(v),
392 NullValues::AllColumns(v) => NullValuesCompiled::AllColumns(v),
393 NullValues::Named(v) => {
394 let mut null_values = vec![PlSmallStr::from_static(""); schema.len()];
395 for (name, null_value) in v {
396 let i = schema.try_index_of(&name)?;
397 null_values[i] = null_value;
398 }
399 NullValuesCompiled::Columns(null_values)
400 },
401 })
402 }
403}
404
405#[derive(Debug, Clone)]
406pub enum NullValuesCompiled {
407 AllColumnsSingle(PlSmallStr),
409 AllColumns(Vec<PlSmallStr>),
411 Columns(Vec<PlSmallStr>),
413}
414
415impl NullValuesCompiled {
416 pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
420 use NullValuesCompiled::*;
421 match self {
422 AllColumnsSingle(v) => v.as_bytes() == field,
423 AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
424 Columns(v) => {
425 debug_assert!(index < v.len());
426 v.get_unchecked(index).as_bytes() == field
427 },
428 }
429 }
430}