polars_io/csv/read/
reader.rs1use std::fs::File;
2use std::path::PathBuf;
3
4use polars_core::prelude::*;
5
6use super::options::CsvReadOptions;
7use super::read_impl::CoreReader;
8use crate::mmap::MmapBytesReader;
9use crate::path_utils::resolve_homedir;
10use crate::predicates::PhysicalIoExpr;
11use crate::shared::SerReader;
12use crate::utils::get_reader_bytes;
13
14#[must_use]
31pub struct CsvReader<R>
32where
33 R: MmapBytesReader,
34{
35 reader: R,
37 options: CsvReadOptions,
39 predicate: Option<Arc<dyn PhysicalIoExpr>>,
40}
41
42impl<R> CsvReader<R>
43where
44 R: MmapBytesReader,
45{
46 pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
47 self.predicate = predicate;
48 self
49 }
50}
51
52impl CsvReadOptions {
53 pub fn try_into_reader_with_file_path(
59 mut self,
60 path: Option<PathBuf>,
61 ) -> PolarsResult<CsvReader<File>> {
62 if self.path.is_some() {
63 assert!(
64 path.is_none(),
65 "impl error: only 1 of self.path or the path parameter is to be non-null"
66 );
67 } else {
68 self.path = path;
69 };
70
71 assert!(
72 self.path.is_some(),
73 "impl error: either one of self.path or the path parameter is to be non-null"
74 );
75
76 let path = resolve_homedir(self.path.as_ref().unwrap());
77 let reader = polars_utils::open_file(&path)?;
78 let options = self;
79
80 Ok(CsvReader {
81 reader,
82 options,
83 predicate: None,
84 })
85 }
86
87 pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
89 let options = self;
90
91 CsvReader {
92 reader,
93 options,
94 predicate: Default::default(),
95 }
96 }
97}
98
99impl<R: MmapBytesReader> CsvReader<R> {
100 fn core_reader(&mut self) -> PolarsResult<CoreReader<'_>> {
101 let reader_bytes = get_reader_bytes(&mut self.reader)?;
102
103 let parse_options = self.options.get_parse_options();
104
105 CoreReader::new(
106 reader_bytes,
107 parse_options,
108 self.options.n_rows,
109 self.options.skip_rows,
110 self.options.skip_lines,
111 self.options.projection.clone().map(|x| x.as_ref().clone()),
112 self.options.infer_schema_length,
113 self.options.has_header,
114 self.options.ignore_errors,
115 self.options.schema.clone(),
116 self.options.columns.clone(),
117 self.options.n_threads,
118 self.options.schema_overwrite.clone(),
119 self.options.dtype_overwrite.clone(),
120 self.predicate.clone(),
121 self.options.fields_to_cast.clone(),
122 self.options.skip_rows_after_header,
123 self.options.row_index.clone(),
124 self.options.raise_if_empty,
125 )
126 }
127}
128
129impl<R> SerReader<R> for CsvReader<R>
130where
131 R: MmapBytesReader,
132{
133 fn new(reader: R) -> Self {
137 CsvReader {
138 reader,
139 options: Default::default(),
140 predicate: None,
141 }
142 }
143
144 fn finish(mut self) -> PolarsResult<DataFrame> {
146 let rechunk = self.options.rechunk;
147 let low_memory = self.options.low_memory;
148
149 let csv_reader = self.core_reader()?;
150 let mut df = csv_reader.finish()?;
151
152 if rechunk && df.first_col_n_chunks() > 1 {
155 if low_memory {
156 df.as_single_chunk();
157 } else {
158 df.as_single_chunk_par();
159 }
160 }
161
162 Ok(df)
163 }
164}
165
166impl<R: MmapBytesReader> CsvReader<R> {
167 pub fn with_options(mut self, options: CsvReadOptions) -> Self {
169 self.options = options;
170 self
171 }
172}
173
174pub fn prepare_csv_schema(
177 schema: &mut SchemaRef,
178 fields_to_cast: &mut Vec<Field>,
179) -> PolarsResult<()> {
180 let mut changed = false;
183
184 let new_schema = schema
185 .iter_fields()
186 .map(|mut fld| {
187 use DataType::*;
188
189 let mut matched = true;
190
191 let out = match fld.dtype() {
192 Time => {
193 fields_to_cast.push(fld.clone());
194 fld.coerce(String);
195 PolarsResult::Ok(fld)
196 },
197 _ => {
198 matched = false;
199 PolarsResult::Ok(fld)
200 },
201 }?;
202
203 changed |= matched;
204
205 PolarsResult::Ok(out)
206 })
207 .collect::<PolarsResult<Schema>>()?;
208
209 if changed {
210 *schema = Arc::new(new_schema);
211 }
212
213 Ok(())
214}