// polars_io/csv/read/reader.rs

1use std::fs::File;
2use std::path::PathBuf;
3
4use polars_core::prelude::*;
5
6use super::options::CsvReadOptions;
7use super::read_impl::CoreReader;
8use super::read_impl::batched::to_batched_owned;
9use super::{BatchedCsvReader, OwnedBatchedCsvReader};
10use crate::mmap::MmapBytesReader;
11use crate::path_utils::resolve_homedir;
12use crate::predicates::PhysicalIoExpr;
13use crate::shared::SerReader;
14use crate::utils::get_reader_bytes;
15
16/// Create a new DataFrame by reading a csv file.
17///
18/// # Example
19///
20/// ```
21/// use polars_core::prelude::*;
22/// use polars_io::prelude::*;
23/// use std::fs::File;
24///
25/// fn example() -> PolarsResult<DataFrame> {
26///     CsvReadOptions::default()
27///             .with_has_header(true)
28///             .try_into_reader_with_file_path(Some("iris.csv".into()))?
29///             .finish()
30/// }
31/// ```
32#[must_use]
33pub struct CsvReader<R>
34where
35    R: MmapBytesReader,
36{
37    /// File or Stream object.
38    reader: R,
39    /// Options for the CSV reader.
40    options: CsvReadOptions,
41    predicate: Option<Arc<dyn PhysicalIoExpr>>,
42}
43
44impl<R> CsvReader<R>
45where
46    R: MmapBytesReader,
47{
48    pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
49        self.predicate = predicate;
50        self
51    }
52
53    // TODO: Investigate if we can remove this
54    pub(crate) fn with_schema(mut self, schema: SchemaRef) -> Self {
55        self.options.schema = Some(schema);
56        self
57    }
58}
59
60impl CsvReadOptions {
61    /// Creates a CSV reader using a file path.
62    ///
63    /// # Panics
64    /// If both self.path and the path parameter are non-null. Only one of them is
65    /// to be non-null.
66    pub fn try_into_reader_with_file_path(
67        mut self,
68        path: Option<PathBuf>,
69    ) -> PolarsResult<CsvReader<File>> {
70        if self.path.is_some() {
71            assert!(
72                path.is_none(),
73                "impl error: only 1 of self.path or the path parameter is to be non-null"
74            );
75        } else {
76            self.path = path;
77        };
78
79        assert!(
80            self.path.is_some(),
81            "impl error: either one of self.path or the path parameter is to be non-null"
82        );
83
84        let path = resolve_homedir(self.path.as_ref().unwrap());
85        let reader = polars_utils::open_file(&path)?;
86        let options = self;
87
88        Ok(CsvReader {
89            reader,
90            options,
91            predicate: None,
92        })
93    }
94
95    /// Creates a CSV reader using a file handle.
96    pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
97        let options = self;
98
99        CsvReader {
100            reader,
101            options,
102            predicate: Default::default(),
103        }
104    }
105}
106
107impl<R: MmapBytesReader> CsvReader<R> {
108    fn core_reader(&mut self) -> PolarsResult<CoreReader> {
109        let reader_bytes = get_reader_bytes(&mut self.reader)?;
110
111        let parse_options = self.options.get_parse_options();
112
113        CoreReader::new(
114            reader_bytes,
115            parse_options,
116            self.options.n_rows,
117            self.options.skip_rows,
118            self.options.skip_lines,
119            self.options.projection.clone().map(|x| x.as_ref().clone()),
120            self.options.infer_schema_length,
121            self.options.has_header,
122            self.options.ignore_errors,
123            self.options.schema.clone(),
124            self.options.columns.clone(),
125            self.options.n_threads,
126            self.options.schema_overwrite.clone(),
127            self.options.dtype_overwrite.clone(),
128            self.options.chunk_size,
129            self.predicate.clone(),
130            self.options.fields_to_cast.clone(),
131            self.options.skip_rows_after_header,
132            self.options.row_index.clone(),
133            self.options.raise_if_empty,
134        )
135    }
136
137    pub fn batched_borrowed(&mut self) -> PolarsResult<BatchedCsvReader> {
138        let csv_reader = self.core_reader()?;
139        csv_reader.batched()
140    }
141}
142
143impl CsvReader<Box<dyn MmapBytesReader>> {
144    pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
145        if let Some(schema) = schema {
146            self = self.with_schema(schema);
147        }
148
149        to_batched_owned(self)
150    }
151}
152
153impl<R> SerReader<R> for CsvReader<R>
154where
155    R: MmapBytesReader,
156{
157    /// Create a new CsvReader from a file/stream using default read options. To
158    /// use non-default read options, first construct [CsvReadOptions] and then use
159    /// any of the `(try)_into_` methods.
160    fn new(reader: R) -> Self {
161        CsvReader {
162            reader,
163            options: Default::default(),
164            predicate: None,
165        }
166    }
167
168    /// Read the file and create the DataFrame.
169    fn finish(mut self) -> PolarsResult<DataFrame> {
170        let rechunk = self.options.rechunk;
171        let low_memory = self.options.low_memory;
172
173        let csv_reader = self.core_reader()?;
174        let mut df = csv_reader.finish()?;
175
176        // Important that this rechunk is never done in parallel.
177        // As that leads to great memory overhead.
178        if rechunk && df.first_col_n_chunks() > 1 {
179            if low_memory {
180                df.as_single_chunk();
181            } else {
182                df.as_single_chunk_par();
183            }
184        }
185
186        Ok(df)
187    }
188}
189
190/// Splits datatypes that cannot be natively read into a `fields_to_cast` for
191/// post-read casting.
192///
193/// # Returns
194/// `has_categorical`
195pub fn prepare_csv_schema(
196    schema: &mut SchemaRef,
197    fields_to_cast: &mut Vec<Field>,
198) -> PolarsResult<bool> {
199    // This branch we check if there are dtypes we cannot parse.
200    // We only support a few dtypes in the parser and later cast to the required dtype
201    let mut _has_categorical = false;
202
203    let mut changed = false;
204
205    let new_schema = schema
206        .iter_fields()
207        .map(|mut fld| {
208            use DataType::*;
209
210            let mut matched = true;
211
212            let out = match fld.dtype() {
213                Time => {
214                    fields_to_cast.push(fld.clone());
215                    fld.coerce(String);
216                    PolarsResult::Ok(fld)
217                },
218                #[cfg(feature = "dtype-categorical")]
219                Categorical(_, _) => {
220                    _has_categorical = true;
221                    PolarsResult::Ok(fld)
222                },
223                #[cfg(feature = "dtype-decimal")]
224                Decimal(precision, scale) => match (precision, scale) {
225                    (_, Some(_)) => {
226                        fields_to_cast.push(fld.clone());
227                        fld.coerce(String);
228                        PolarsResult::Ok(fld)
229                    },
230                    _ => Err(PolarsError::ComputeError(
231                        "'scale' must be set when reading csv column as Decimal".into(),
232                    )),
233                },
234                _ => {
235                    matched = false;
236                    PolarsResult::Ok(fld)
237                },
238            }?;
239
240            changed |= matched;
241
242            PolarsResult::Ok(out)
243        })
244        .collect::<PolarsResult<Schema>>()?;
245
246    if changed {
247        *schema = Arc::new(new_schema);
248    }
249
250    Ok(_has_categorical)
251}