use std::fs::File;
use std::path::PathBuf;
use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
use super::options::CsvReadOptions;
use super::read_impl::batched::to_batched_owned;
use super::read_impl::CoreReader;
use super::{infer_file_schema, BatchedCsvReader, OwnedBatchedCsvReader};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::shared::SerReader;
use crate::utils::{get_reader_bytes, resolve_homedir};
/// Reads a CSV source into a [`DataFrame`] (via [`SerReader::finish`]) or into
/// batches (via `batched_borrowed` / `batched`).
///
/// `R` is any byte source that can be memory-mapped / read
/// (see [`MmapBytesReader`]). Construct through [`SerReader::new`] or from
/// [`CsvReadOptions::try_into_reader_with_file_path`] /
/// [`CsvReadOptions::into_reader_with_file_handle`].
#[must_use]
pub struct CsvReader<R>
where
    R: MmapBytesReader,
{
    // The underlying byte source the CSV is parsed from.
    reader: R,
    // All parsing/reading configuration (schema, projection, separators, ...).
    options: CsvReadOptions,
    // Optional row filter handed down to the core reader so rows can be
    // skipped during parsing (set via `_with_predicate`).
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
}
impl<R> CsvReader<R>
where
    R: MmapBytesReader,
{
    /// Attach (or clear) a predicate that is pushed down into the core reader
    /// to filter rows while parsing.
    pub fn _with_predicate(self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
        Self { predicate, ..self }
    }

    /// Set the schema to parse with, replacing any previously configured one.
    pub(crate) fn with_schema(mut self, schema: SchemaRef) -> Self {
        let _ = self.options.schema.replace(schema);
        self
    }

    /// The currently configured schema, if any (cheap `Arc` clone).
    pub(crate) fn get_schema(&self) -> Option<SchemaRef> {
        self.options.schema.as_ref().map(Arc::clone)
    }
}
impl CsvReadOptions {
    /// Open the configured file and build a file-backed [`CsvReader`].
    ///
    /// The path may come from `self.path` or from the `path` argument, but
    /// not both; `~` is expanded via `resolve_homedir`.
    ///
    /// # Panics
    /// If both `self.path` and `path` are set, or if neither is (impl errors).
    ///
    /// # Errors
    /// Propagates the I/O error when the file cannot be opened.
    pub fn try_into_reader_with_file_path(
        mut self,
        path: Option<PathBuf>,
    ) -> PolarsResult<CsvReader<File>> {
        match (&self.path, path) {
            (Some(_), Some(_)) => {
                panic!("impl error: only 1 of self.path or the path parameter is to be non-null")
            },
            (Some(_), None) => {},
            (None, path) => self.path = path,
        }
        assert!(
            self.path.is_some(),
            "impl error: either one of self.path or the path parameter is to be non-null"
        );

        // `unwrap` cannot fail: presence was asserted just above.
        let path = resolve_homedir(self.path.as_ref().unwrap());
        let reader = polars_utils::open_file(path)?;

        Ok(CsvReader {
            reader,
            options: self,
            predicate: None,
        })
    }

    /// Build a [`CsvReader`] over an already-open byte source; no predicate
    /// is attached.
    pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
        CsvReader {
            reader,
            options: self,
            predicate: None,
        }
    }
}
impl<R: MmapBytesReader> CsvReader<R> {
    /// Build the low-level [`CoreReader`] that performs the actual parsing.
    ///
    /// `schema` is the schema to parse with (possibly derived from a schema
    /// overwrite); `to_cast` lists fields that are parsed as a different dtype
    /// first and cast afterwards (see `prepare_schema_overwrite`).
    fn core_reader(
        &mut self,
        schema: Option<SchemaRef>,
        to_cast: Vec<Field>,
    ) -> PolarsResult<CoreReader> {
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let parse_options = self.options.get_parse_options();
        // NOTE: `CoreReader::new` takes positional arguments — keep this list
        // in exactly the order its definition declares.
        CoreReader::new(
            reader_bytes,
            self.options.n_rows,
            self.options.skip_rows,
            self.options.projection.clone().map(|x| x.as_ref().clone()),
            self.options.infer_schema_length,
            Some(parse_options.separator),
            self.options.has_header,
            self.options.ignore_errors,
            self.options.schema.clone(),
            self.options.columns.clone(),
            parse_options.encoding,
            self.options.n_threads,
            schema,
            self.options.dtype_overwrite.clone(),
            self.options.sample_size,
            self.options.chunk_size,
            self.options.low_memory,
            parse_options.comment_prefix.clone(),
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.null_values.clone(),
            parse_options.missing_is_null,
            self.predicate.clone(),
            to_cast,
            self.options.skip_rows_after_header,
            self.options.row_index.clone(),
            parse_options.try_parse_dates,
            self.options.raise_if_empty,
            parse_options.truncate_ragged_lines,
            parse_options.decimal_comma,
        )
    }
    /// Split a user-supplied schema overwrite into
    /// - the schema to parse with,
    /// - the fields to cast after parsing (`Time`; `Decimal` with a scale,
    ///   which is read as `String` first),
    /// - whether any overwritten column is Categorical (the caller must then
    ///   hold the global string cache while reading).
    ///
    /// # Errors
    /// `ComputeError` when a `Decimal` overwrite has no scale set.
    fn prepare_schema_overwrite(
        &self,
        overwriting_schema: &Schema,
    ) -> PolarsResult<(Schema, Vec<Field>, bool)> {
        let mut to_cast = Vec::with_capacity(overwriting_schema.len());
        // Leading underscores: these are only read under certain feature gates.
        let mut _has_categorical = false;
        let mut _err: Option<PolarsError> = None;
        #[allow(unused_mut)]
        let schema = overwriting_schema
            .iter_fields()
            .filter_map(|mut fld| {
                use DataType::*;
                match fld.data_type() {
                    // Time is not parsed directly; drop it from the parse
                    // schema and cast the column afterwards.
                    Time => {
                        to_cast.push(fld);
                        None
                    },
                    #[cfg(feature = "dtype-categorical")]
                    Categorical(_, _) => {
                        _has_categorical = true;
                        Some(fld)
                    },
                    #[cfg(feature = "dtype-decimal")]
                    Decimal(precision, scale) => match (precision, scale) {
                        // With a scale: parse as String, cast to Decimal later.
                        (_, Some(_)) => {
                            to_cast.push(fld.clone());
                            fld.coerce(String);
                            Some(fld)
                        },
                        // Scale is mandatory; record the error and surface it
                        // after the iteration finishes.
                        _ => {
                            _err = Some(PolarsError::ComputeError(
                                "'scale' must be set when reading csv column as Decimal".into(),
                            ));
                            None
                        },
                    },
                    _ => Some(fld),
                }
            })
            .collect::<Schema>();
        if let Some(err) = _err {
            Err(err)
        } else {
            Ok((schema, to_cast, _has_categorical))
        }
    }
    /// Create a batched reader borrowing from `self`, honoring any configured
    /// schema overwrite (including its post-parse casts and categorical
    /// string-cache requirement).
    pub fn batched_borrowed(&mut self) -> PolarsResult<BatchedCsvReader> {
        if let Some(schema) = self.options.schema_overwrite.as_deref() {
            let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema)?;
            let schema = Arc::new(schema);
            let csv_reader = self.core_reader(Some(schema), to_cast)?;
            csv_reader.batched(has_cat)
        } else {
            let csv_reader = self.core_reader(self.options.schema.clone(), vec![])?;
            csv_reader.batched(false)
        }
    }
}
impl CsvReader<Box<dyn MmapBytesReader>> {
    /// Convert this reader into an owned batched CSV reader.
    ///
    /// When `schema` is `None`, one is inferred from the start of the input
    /// using the configured parse options.
    pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
        let schema = match schema {
            Some(schema) => schema,
            None => {
                // No schema supplied: infer one from the file contents.
                let parse_options = self.options.get_parse_options();
                let reader_bytes = get_reader_bytes(&mut self.reader)?;
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    parse_options.separator,
                    self.options.infer_schema_length,
                    self.options.has_header,
                    None,
                    self.options.skip_rows,
                    self.options.skip_rows_after_header,
                    parse_options.comment_prefix.as_ref(),
                    parse_options.quote_char,
                    parse_options.eol_char,
                    parse_options.null_values.as_ref(),
                    parse_options.try_parse_dates,
                    self.options.raise_if_empty,
                    &mut self.options.n_threads,
                    parse_options.decimal_comma,
                )?;
                Arc::new(inferred_schema)
            },
        };
        Ok(to_batched_owned(self.with_schema(schema)))
    }
}
impl<R> SerReader<R> for CsvReader<R>
where
    R: MmapBytesReader,
{
    /// Create a reader over `reader` with default options and no predicate.
    fn new(reader: R) -> Self {
        CsvReader {
            reader,
            options: Default::default(),
            predicate: None,
        }
    }
    /// Read the whole CSV source into a [`DataFrame`].
    ///
    /// Applies the configured schema overwrite (with its post-parse casts),
    /// optionally rechunks the result, and — with the `temporal` feature —
    /// re-parses string columns as dates unless their dtype was pinned by the
    /// user.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.options.rechunk;
        let schema_overwrite = self.options.schema_overwrite.clone();
        let low_memory = self.options.low_memory;
        // Holds the global string cache alive while categorical columns are
        // built; must stay in scope until the DataFrame is materialized.
        #[cfg(feature = "dtype-categorical")]
        let mut _cat_lock = None;
        let mut df = if let Some(schema) = schema_overwrite.as_deref() {
            let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema)?;
            #[cfg(feature = "dtype-categorical")]
            if _has_cat {
                _cat_lock = Some(polars_core::StringCacheHolder::hold())
            }
            let mut csv_reader = self.core_reader(Some(Arc::new(schema)), to_cast)?;
            csv_reader.as_df()?
        } else {
            // No overwrite: still need the string cache if the full schema
            // contains any categorical column.
            #[cfg(feature = "dtype-categorical")]
            {
                let has_cat = self
                    .options
                    .schema
                    .clone()
                    .map(|schema| {
                        schema
                            .iter_dtypes()
                            .any(|dtype| matches!(dtype, DataType::Categorical(_, _)))
                    })
                    .unwrap_or(false);
                if has_cat {
                    _cat_lock = Some(polars_core::StringCacheHolder::hold())
                }
            }
            let mut csv_reader = self.core_reader(self.options.schema.clone(), vec![])?;
            csv_reader.as_df()?
        };
        // Merge chunks into one; the parallel variant trades memory for speed.
        if rechunk && df.n_chunks() > 1 {
            if low_memory {
                df.as_single_chunk();
            } else {
                df.as_single_chunk_par();
            }
        }
        #[cfg(feature = "temporal")]
        {
            let parse_options = self.options.get_parse_options();
            if parse_options.try_parse_dates {
                // `fixed_schema` = columns whose dtype the user pinned (via
                // schema overwrite or dtype overwrite); `parse_dates` leaves
                // those untouched.
                let fixed_schema = match (schema_overwrite, self.options.dtype_overwrite) {
                    (Some(schema), _) => schema,
                    (None, Some(dtypes)) => {
                        let schema = dtypes
                            .iter()
                            .zip(df.get_column_names())
                            .map(|(dtype, name)| Field::new(name, dtype.clone()))
                            .collect::<Schema>();
                        Arc::new(schema)
                    },
                    _ => Arc::default(),
                };
                df = parse_dates(df, &fixed_schema)
            }
        }
        Ok(df)
    }
}
/// Attempt to parse `String` columns as temporal types (currently `Time`,
/// when the `dtype-time` feature is enabled).
///
/// Columns whose name appears in `fixed_schema` were explicitly typed by the
/// user and are left untouched. Columns are processed in parallel on the
/// global rayon pool.
#[cfg(feature = "temporal")]
fn parse_dates(mut df: DataFrame, fixed_schema: &Schema) -> DataFrame {
    use polars_core::POOL;

    // Take the columns out of the frame so they can be moved into the
    // parallel iterator without cloning.
    let cols = unsafe { std::mem::take(df.get_columns_mut()) }
        .into_par_iter()
        .map(|s| {
            // Only string columns are candidates for temporal parsing.
            if !matches!(s.dtype(), DataType::String) {
                return s;
            }
            // A user-pinned dtype wins over inference: leave the column as-is.
            // (Previously this check ran after an unconditional `s.str()`
            // borrow that was dead code when `dtype-time` is disabled.)
            if fixed_schema.index_of(s.name()).is_some() {
                return s;
            }
            #[cfg(feature = "dtype-time")]
            {
                // `str()` cannot fail: the dtype was checked above.
                let ca = s.str().unwrap();
                if let Ok(ca) = ca.as_time(None, false) {
                    return ca.into_series();
                }
            }
            s
        });
    let cols = POOL.install(|| cols.collect::<Vec<_>>());
    // SAFETY: the columns originate from a valid `DataFrame`; parsing only
    // re-types columns element-wise, so names and lengths are unchanged.
    unsafe { DataFrame::new_no_checks(cols) }
}