use std::path::PathBuf;
use std::sync::Arc;
use polars_core::datatypes::{DataType, Field};
use polars_core::schema::{Schema, SchemaRef};
use polars_error::PolarsResult;
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use crate::RowIndex;
/// Reader-level configuration for loading CSV data.
///
/// Values are normally obtained from `CsvReadOptions::default()` and then
/// adjusted with the chained `with_*` setters. Parser-level settings
/// (separator, quoting, null handling, ...) live in the nested
/// [`CsvParseOptions`] stored in `parse_options`.
///
/// NOTE(review): field notes that say "presumably" are inferred from the field
/// name only; confirm them against the reader implementation.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvReadOptions {
/// Optional filesystem path, presumably of the CSV file to read. Default: `None`.
pub path: Option<PathBuf>,
/// Default: `false`. Presumably asks for the result to be rechunked after reading.
pub rechunk: bool,
/// Default: `None`. Presumably an override for the number of threads used.
pub n_threads: Option<usize>,
/// Default: `false`. Presumably trades speed for a lower memory footprint.
pub low_memory: bool,
/// Default: `None`. Presumably an upper bound on the number of rows read.
pub n_rows: Option<usize>,
/// Optional [`RowIndex`] configuration. Default: `None`.
pub row_index: Option<RowIndex>,
/// Optional shared list of column names, presumably the columns to select.
/// Default: `None`.
pub columns: Option<Arc<[PlSmallStr]>>,
/// Optional shared list of column positions, presumably the columns to
/// select by index. Default: `None`.
pub projection: Option<Arc<Vec<usize>>>,
/// Optional schema. Default: `None`. Presumably used instead of inference.
pub schema: Option<SchemaRef>,
/// Optional schema. Default: `None`. Presumably overrides inferred dtypes
/// for the columns it names.
pub schema_overwrite: Option<SchemaRef>,
/// Optional list of dtypes. Default: `None`. Presumably overrides inferred
/// dtypes positionally.
pub dtype_overwrite: Option<Arc<Vec<DataType>>>,
/// Shared parser-level settings; replaced via `with_parse_options` or
/// transformed via `map_parse_options`.
pub parse_options: Arc<CsvParseOptions>,
/// Default: `true`. Presumably whether the first row holds column names.
pub has_header: bool,
/// Default: `1 << 18` (262,144). NOTE(review): unit is not visible here.
pub chunk_size: usize,
/// Default: `0`. Presumably the number of leading rows to skip.
pub skip_rows: usize,
/// Default: `0`. Presumably the number of rows to skip after the header.
pub skip_rows_after_header: usize,
/// Default: `Some(100)`. Presumably the number of rows sampled for schema
/// inference.
pub infer_schema_length: Option<usize>,
/// Default: `true`. Presumably makes empty input an error.
pub raise_if_empty: bool,
/// Default: `false`. Presumably tolerates parse errors instead of failing.
pub ignore_errors: bool,
/// Default: empty. Presumably fields to cast after parsing — TODO confirm.
pub fields_to_cast: Vec<Field>,
}
/// Parser-level CSV settings, shared by [`CsvReadOptions`] behind an `Arc`.
///
/// Values are normally obtained from `CsvParseOptions::default()` and then
/// adjusted with the chained `with_*` setters.
///
/// NOTE(review): field notes that say "presumably" are inferred from the field
/// name only; confirm them against the parser implementation.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvParseOptions {
/// Field separator byte. Default: `b','`.
pub separator: u8,
/// Optional quote byte. Default: `Some(b'"')`.
pub quote_char: Option<u8>,
/// End-of-line byte. Default: `b'\n'`.
pub eol_char: u8,
/// Input text encoding. Default: `CsvEncoding::default()`.
pub encoding: CsvEncoding,
/// Optional description of which strings are read as null. Default: `None`.
pub null_values: Option<NullValues>,
/// Default: `true`. Presumably treats missing/empty fields as null.
pub missing_is_null: bool,
/// Default: `false`. Presumably truncates rows with too many fields instead
/// of raising an error.
pub truncate_ragged_lines: bool,
/// Optional comment marker. Default: `None`. Presumably lines starting with
/// it are skipped.
pub comment_prefix: Option<CommentPrefix>,
/// Default: `false`. Presumably enables automatic date parsing.
pub try_parse_dates: bool,
/// Default: `false`. Presumably parses `,` as the decimal separator.
pub decimal_comma: bool,
}
impl Default for CsvReadOptions {
fn default() -> Self {
Self {
path: None,
rechunk: false,
n_threads: None,
low_memory: false,
n_rows: None,
row_index: None,
columns: None,
projection: None,
schema: None,
schema_overwrite: None,
dtype_overwrite: None,
parse_options: Default::default(),
has_header: true,
chunk_size: 1 << 18,
skip_rows: 0,
skip_rows_after_header: 0,
infer_schema_length: Some(100),
raise_if_empty: true,
ignore_errors: false,
fields_to_cast: vec![],
}
}
}
impl Default for CsvParseOptions {
fn default() -> Self {
Self {
separator: b',',
quote_char: Some(b'"'),
eol_char: b'\n',
encoding: Default::default(),
null_values: None,
missing_is_null: true,
truncate_ragged_lines: false,
comment_prefix: None,
try_parse_dates: false,
decimal_comma: false,
}
}
}
impl CsvReadOptions {
    /// Returns another handle to the shared parse options.
    ///
    /// Only the `Arc` is cloned; the underlying [`CsvParseOptions`] is not copied.
    pub fn get_parse_options(&self) -> Arc<CsvParseOptions> {
        Arc::clone(&self.parse_options)
    }

    /// Sets `path`, converting the supplied value into a [`PathBuf`].
    pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
        self.path = path.map(Into::into);
        self
    }

    /// Sets `rechunk`.
    pub fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Sets `n_threads`.
    pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
        self.n_threads = n_threads;
        self
    }

    /// Sets `low_memory`.
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Sets `n_rows`.
    pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
        self.n_rows = n_rows;
        self
    }

    /// Sets `row_index`.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Sets `columns`.
    pub fn with_columns(mut self, columns: Option<Arc<[PlSmallStr]>>) -> Self {
        self.columns = columns;
        self
    }

    /// Sets `projection`.
    pub fn with_projection(mut self, projection: Option<Arc<Vec<usize>>>) -> Self {
        self.projection = projection;
        self
    }

    /// Sets `schema`.
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.schema = schema;
        self
    }

    /// Sets `schema_overwrite`.
    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
        self.schema_overwrite = schema_overwrite;
        self
    }

    /// Sets `dtype_overwrite`.
    pub fn with_dtype_overwrite(mut self, dtype_overwrite: Option<Arc<Vec<DataType>>>) -> Self {
        self.dtype_overwrite = dtype_overwrite;
        self
    }

    /// Replaces the parse options wholesale, wrapping the value in a new `Arc`.
    ///
    /// Use [`CsvReadOptions::map_parse_options`] to tweak the existing options
    /// instead of replacing them.
    pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self {
        self.parse_options = Arc::new(parse_options);
        self
    }

    /// Sets `has_header`.
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Sets `chunk_size`.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Sets `skip_rows`.
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.skip_rows = skip_rows;
        self
    }

    /// Sets `skip_rows_after_header`.
    pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
        self.skip_rows_after_header = skip_rows_after_header;
        self
    }

    /// Sets `infer_schema_length`.
    pub fn with_infer_schema_length(mut self, infer_schema_length: Option<usize>) -> Self {
        self.infer_schema_length = infer_schema_length;
        self
    }

    /// Sets `raise_if_empty`.
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.raise_if_empty = raise_if_empty;
        self
    }

    /// Sets `ignore_errors`.
    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
        self.ignore_errors = ignore_errors;
        self
    }

    /// Transforms the current parse options with `map_func`.
    ///
    /// The options are taken out of their `Arc` (cloned only when the `Arc` is
    /// shared with another owner), passed through `map_func`, and the result is
    /// stored in a fresh `Arc`.
    ///
    /// `map_func` is invoked exactly once, so any `FnOnce` is accepted; closures
    /// that were valid under the previous `Fn` bound remain valid.
    pub fn map_parse_options<F: FnOnce(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        let parse_options = Arc::unwrap_or_clone(self.parse_options);
        self.parse_options = Arc::new(map_func(parse_options));
        self
    }
}
impl CsvParseOptions {
    /// Returns the options with `separator` replaced.
    pub fn with_separator(self, separator: u8) -> Self {
        Self { separator, ..self }
    }

    /// Returns the options with `quote_char` replaced.
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        Self { quote_char, ..self }
    }

    /// Returns the options with `eol_char` replaced.
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        Self { eol_char, ..self }
    }

    /// Returns the options with `encoding` replaced.
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        Self { encoding, ..self }
    }

    /// Returns the options with `null_values` replaced.
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        Self {
            null_values,
            ..self
        }
    }

    /// Returns the options with `missing_is_null` replaced.
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        Self {
            missing_is_null,
            ..self
        }
    }

    /// Returns the options with `truncate_ragged_lines` replaced.
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        Self {
            truncate_ragged_lines,
            ..self
        }
    }

    /// Returns the options with `comment_prefix` replaced.
    ///
    /// Accepts anything convertible into a [`CommentPrefix`].
    pub fn with_comment_prefix<T: Into<CommentPrefix>>(
        self,
        comment_prefix: Option<T>,
    ) -> Self {
        let comment_prefix = comment_prefix.map(Into::into);
        Self {
            comment_prefix,
            ..self
        }
    }

    /// Returns the options with `try_parse_dates` replaced.
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        Self {
            try_parse_dates,
            ..self
        }
    }

    /// Returns the options with `decimal_comma` replaced.
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        Self {
            decimal_comma,
            ..self
        }
    }
}
/// Text encodings selectable for CSV input.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CsvEncoding {
/// UTF-8. This is the variant returned by `CsvEncoding::default()`.
#[default]
Utf8,
/// NOTE(review): presumably UTF-8 where invalid byte sequences are replaced
/// rather than rejected — confirm against the parser.
LossyUtf8,
}
/// Marker identifying comments in CSV input, stored either as one byte or as a
/// string.
///
/// NOTE(review): how the parser uses the marker is not visible in this file;
/// presumably lines that start with it are skipped.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CommentPrefix {
/// A prefix consisting of a single byte.
Single(u8),
/// A prefix stored as a string; `new_from_str` picks this variant whenever
/// the input is not exactly one byte long.
Multi(PlSmallStr),
}
impl CommentPrefix {
    /// Creates a single-byte comment prefix.
    pub fn new_single(prefix: u8) -> Self {
        Self::Single(prefix)
    }

    /// Creates a string comment prefix.
    pub fn new_multi(prefix: PlSmallStr) -> Self {
        Self::Multi(prefix)
    }

    /// Creates a comment prefix from a string slice.
    ///
    /// A one-byte input becomes [`CommentPrefix::Single`]; anything else,
    /// including the empty string, becomes [`CommentPrefix::Multi`].
    pub fn new_from_str(prefix: &str) -> Self {
        match prefix.as_bytes() {
            // A `&str` that is exactly one byte long is necessarily a single
            // ASCII character, so no separate ASCII check is needed.
            [only] => Self::Single(*only),
            _ => Self::Multi(PlSmallStr::from_str(prefix)),
        }
    }
}
impl From<&str> for CommentPrefix {
fn from(value: &str) -> Self {
Self::new_from_str(value)
}
}
/// User-facing description of which strings are read as null.
///
/// Converted into [`NullValuesCompiled`] by `NullValues::compile` once the
/// schema is known.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum NullValues {
/// One null marker applied to every column.
AllColumnsSingle(PlSmallStr),
/// Several null markers, each applied to every column.
AllColumns(Vec<PlSmallStr>),
/// `(column name, null marker)` pairs; `compile` resolves each name to its
/// position in the schema.
Named(Vec<(PlSmallStr, PlSmallStr)>),
}
impl NullValues {
    /// Resolves these null values against `schema`.
    ///
    /// The `AllColumnsSingle` and `AllColumns` variants are carried over
    /// unchanged. `Named` becomes a vector with one entry per schema column:
    /// named columns get their marker, all other columns keep the empty string.
    /// If a column is named more than once, the last marker wins.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Schema::try_index_of` when a name in `Named`
    /// cannot be resolved in `schema`.
    pub fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
        let compiled = match self {
            Self::AllColumnsSingle(marker) => NullValuesCompiled::AllColumnsSingle(marker),
            Self::AllColumns(markers) => NullValuesCompiled::AllColumns(markers),
            Self::Named(pairs) => {
                // Start with an empty-string marker for every column in the schema.
                let mut per_column = vec![PlSmallStr::from_static(""); schema.len()];
                for (column, marker) in pairs {
                    let position = schema.try_index_of(&column)?;
                    per_column[position] = marker;
                }
                NullValuesCompiled::Columns(per_column)
            },
        };
        Ok(compiled)
    }
}
/// Null markers in the form consulted by `NullValuesCompiled::is_null`;
/// produced by `NullValues::compile`.
#[derive(Debug, Clone)]
pub enum NullValuesCompiled {
/// A field is null when it equals this marker, whatever its column.
AllColumnsSingle(PlSmallStr),
/// A field is null when it equals any of these markers, whatever its column.
AllColumns(Vec<PlSmallStr>),
/// One marker per column, looked up by column index; `NullValues::compile`
/// builds it with one entry per schema column.
Columns(Vec<PlSmallStr>),
}
impl NullValuesCompiled {
pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
use NullValuesCompiled::*;
match self {
AllColumnsSingle(v) => v.as_bytes() == field,
AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
Columns(v) => {
debug_assert!(index < v.len());
v.get_unchecked(index).as_bytes() == field
},
}
}
}