polars_io/csv/read/
reader.rs

1use std::fs::File;
2use std::path::PathBuf;
3
4use polars_core::prelude::*;
5
6use super::options::CsvReadOptions;
7use super::read_impl::CoreReader;
8use crate::mmap::MmapBytesReader;
9use crate::path_utils::resolve_homedir;
10use crate::predicates::PhysicalIoExpr;
11use crate::shared::SerReader;
12use crate::utils::get_reader_bytes;
13
14/// Create a new DataFrame by reading a csv file.
15///
16/// # Example
17///
18/// ```
19/// use polars_core::prelude::*;
20/// use polars_io::prelude::*;
21/// use std::fs::File;
22///
23/// fn example() -> PolarsResult<DataFrame> {
24///     CsvReadOptions::default()
25///             .with_has_header(true)
26///             .try_into_reader_with_file_path(Some("iris.csv".into()))?
27///             .finish()
28/// }
29/// ```
30#[must_use]
31pub struct CsvReader<R>
32where
33    R: MmapBytesReader,
34{
35    /// File or Stream object.
36    reader: R,
37    /// Options for the CSV reader.
38    options: CsvReadOptions,
39    predicate: Option<Arc<dyn PhysicalIoExpr>>,
40}
41
42impl<R> CsvReader<R>
43where
44    R: MmapBytesReader,
45{
46    pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
47        self.predicate = predicate;
48        self
49    }
50}
51
52impl CsvReadOptions {
53    /// Creates a CSV reader using a file path.
54    ///
55    /// # Panics
56    /// If both self.path and the path parameter are non-null. Only one of them is
57    /// to be non-null.
58    pub fn try_into_reader_with_file_path(
59        mut self,
60        path: Option<PathBuf>,
61    ) -> PolarsResult<CsvReader<File>> {
62        if self.path.is_some() {
63            assert!(
64                path.is_none(),
65                "impl error: only 1 of self.path or the path parameter is to be non-null"
66            );
67        } else {
68            self.path = path;
69        };
70
71        assert!(
72            self.path.is_some(),
73            "impl error: either one of self.path or the path parameter is to be non-null"
74        );
75
76        let path = resolve_homedir(self.path.as_ref().unwrap());
77        let reader = polars_utils::open_file(&path)?;
78        let options = self;
79
80        Ok(CsvReader {
81            reader,
82            options,
83            predicate: None,
84        })
85    }
86
87    /// Creates a CSV reader using a file handle.
88    pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
89        let options = self;
90
91        CsvReader {
92            reader,
93            options,
94            predicate: Default::default(),
95        }
96    }
97}
98
99impl<R: MmapBytesReader> CsvReader<R> {
100    fn core_reader(&mut self) -> PolarsResult<CoreReader<'_>> {
101        let reader_bytes = get_reader_bytes(&mut self.reader)?;
102
103        let parse_options = self.options.get_parse_options();
104
105        CoreReader::new(
106            reader_bytes,
107            parse_options,
108            self.options.n_rows,
109            self.options.skip_rows,
110            self.options.skip_lines,
111            self.options.projection.clone().map(|x| x.as_ref().clone()),
112            self.options.infer_schema_length,
113            self.options.has_header,
114            self.options.ignore_errors,
115            self.options.schema.clone(),
116            self.options.columns.clone(),
117            self.options.n_threads,
118            self.options.schema_overwrite.clone(),
119            self.options.dtype_overwrite.clone(),
120            self.predicate.clone(),
121            self.options.fields_to_cast.clone(),
122            self.options.skip_rows_after_header,
123            self.options.row_index.clone(),
124            self.options.raise_if_empty,
125        )
126    }
127}
128
129impl<R> SerReader<R> for CsvReader<R>
130where
131    R: MmapBytesReader,
132{
133    /// Create a new CsvReader from a file/stream using default read options. To
134    /// use non-default read options, first construct [CsvReadOptions] and then use
135    /// any of the `(try)_into_` methods.
136    fn new(reader: R) -> Self {
137        CsvReader {
138            reader,
139            options: Default::default(),
140            predicate: None,
141        }
142    }
143
144    /// Read the file and create the DataFrame.
145    fn finish(mut self) -> PolarsResult<DataFrame> {
146        let rechunk = self.options.rechunk;
147        let low_memory = self.options.low_memory;
148
149        let csv_reader = self.core_reader()?;
150        let mut df = csv_reader.finish()?;
151
152        // Important that this rechunk is never done in parallel.
153        // As that leads to great memory overhead.
154        if rechunk && df.first_col_n_chunks() > 1 {
155            if low_memory {
156                df.as_single_chunk();
157            } else {
158                df.as_single_chunk_par();
159            }
160        }
161
162        Ok(df)
163    }
164}
165
166impl<R: MmapBytesReader> CsvReader<R> {
167    /// Sets custom CSV read options.
168    pub fn with_options(mut self, options: CsvReadOptions) -> Self {
169        self.options = options;
170        self
171    }
172}
173
174/// Splits datatypes that cannot be natively read into a `fields_to_cast` for
175/// post-read casting.
176pub fn prepare_csv_schema(
177    schema: &mut SchemaRef,
178    fields_to_cast: &mut Vec<Field>,
179) -> PolarsResult<()> {
180    // This branch we check if there are dtypes we cannot parse.
181    // We only support a few dtypes in the parser and later cast to the required dtype.
182    let mut changed = false;
183
184    let new_schema = schema
185        .iter_fields()
186        .map(|mut fld| {
187            use DataType::*;
188
189            let mut matched = true;
190
191            let out = match fld.dtype() {
192                Time => {
193                    fields_to_cast.push(fld.clone());
194                    fld.coerce(String);
195                    PolarsResult::Ok(fld)
196                },
197                _ => {
198                    matched = false;
199                    PolarsResult::Ok(fld)
200                },
201            }?;
202
203            changed |= matched;
204
205            PolarsResult::Ok(out)
206        })
207        .collect::<PolarsResult<Schema>>()?;
208
209    if changed {
210        *schema = Arc::new(new_schema);
211    }
212
213    Ok(())
214}