polars_io/csv/read/
reader.rs1use std::fs::File;
2use std::path::PathBuf;
3
4use polars_core::prelude::*;
5
6use super::options::CsvReadOptions;
7use super::read_impl::CoreReader;
8use super::read_impl::batched::to_batched_owned;
9use super::{BatchedCsvReader, OwnedBatchedCsvReader};
10use crate::mmap::MmapBytesReader;
11use crate::path_utils::resolve_homedir;
12use crate::predicates::PhysicalIoExpr;
13use crate::shared::SerReader;
14use crate::utils::get_reader_bytes;
15
16#[must_use]
33pub struct CsvReader<R>
34where
35 R: MmapBytesReader,
36{
37 reader: R,
39 options: CsvReadOptions,
41 predicate: Option<Arc<dyn PhysicalIoExpr>>,
42}
43
44impl<R> CsvReader<R>
45where
46 R: MmapBytesReader,
47{
48 pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
49 self.predicate = predicate;
50 self
51 }
52
53 pub(crate) fn with_schema(mut self, schema: SchemaRef) -> Self {
55 self.options.schema = Some(schema);
56 self
57 }
58}
59
60impl CsvReadOptions {
61 pub fn try_into_reader_with_file_path(
67 mut self,
68 path: Option<PathBuf>,
69 ) -> PolarsResult<CsvReader<File>> {
70 if self.path.is_some() {
71 assert!(
72 path.is_none(),
73 "impl error: only 1 of self.path or the path parameter is to be non-null"
74 );
75 } else {
76 self.path = path;
77 };
78
79 assert!(
80 self.path.is_some(),
81 "impl error: either one of self.path or the path parameter is to be non-null"
82 );
83
84 let path = resolve_homedir(self.path.as_ref().unwrap());
85 let reader = polars_utils::open_file(&path)?;
86 let options = self;
87
88 Ok(CsvReader {
89 reader,
90 options,
91 predicate: None,
92 })
93 }
94
95 pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
97 let options = self;
98
99 CsvReader {
100 reader,
101 options,
102 predicate: Default::default(),
103 }
104 }
105}
106
107impl<R: MmapBytesReader> CsvReader<R> {
108 fn core_reader(&mut self) -> PolarsResult<CoreReader> {
109 let reader_bytes = get_reader_bytes(&mut self.reader)?;
110
111 let parse_options = self.options.get_parse_options();
112
113 CoreReader::new(
114 reader_bytes,
115 parse_options,
116 self.options.n_rows,
117 self.options.skip_rows,
118 self.options.skip_lines,
119 self.options.projection.clone().map(|x| x.as_ref().clone()),
120 self.options.infer_schema_length,
121 self.options.has_header,
122 self.options.ignore_errors,
123 self.options.schema.clone(),
124 self.options.columns.clone(),
125 self.options.n_threads,
126 self.options.schema_overwrite.clone(),
127 self.options.dtype_overwrite.clone(),
128 self.options.chunk_size,
129 self.predicate.clone(),
130 self.options.fields_to_cast.clone(),
131 self.options.skip_rows_after_header,
132 self.options.row_index.clone(),
133 self.options.raise_if_empty,
134 )
135 }
136
137 pub fn batched_borrowed(&mut self) -> PolarsResult<BatchedCsvReader> {
138 let csv_reader = self.core_reader()?;
139 csv_reader.batched()
140 }
141}
142
143impl CsvReader<Box<dyn MmapBytesReader>> {
144 pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
145 if let Some(schema) = schema {
146 self = self.with_schema(schema);
147 }
148
149 to_batched_owned(self)
150 }
151}
152
153impl<R> SerReader<R> for CsvReader<R>
154where
155 R: MmapBytesReader,
156{
157 fn new(reader: R) -> Self {
161 CsvReader {
162 reader,
163 options: Default::default(),
164 predicate: None,
165 }
166 }
167
168 fn finish(mut self) -> PolarsResult<DataFrame> {
170 let rechunk = self.options.rechunk;
171 let low_memory = self.options.low_memory;
172
173 let csv_reader = self.core_reader()?;
174 let mut df = csv_reader.finish()?;
175
176 if rechunk && df.first_col_n_chunks() > 1 {
179 if low_memory {
180 df.as_single_chunk();
181 } else {
182 df.as_single_chunk_par();
183 }
184 }
185
186 Ok(df)
187 }
188}
189
190pub fn prepare_csv_schema(
196 schema: &mut SchemaRef,
197 fields_to_cast: &mut Vec<Field>,
198) -> PolarsResult<bool> {
199 let mut _has_categorical = false;
202
203 let mut changed = false;
204
205 let new_schema = schema
206 .iter_fields()
207 .map(|mut fld| {
208 use DataType::*;
209
210 let mut matched = true;
211
212 let out = match fld.dtype() {
213 Time => {
214 fields_to_cast.push(fld.clone());
215 fld.coerce(String);
216 PolarsResult::Ok(fld)
217 },
218 #[cfg(feature = "dtype-categorical")]
219 Categorical(_, _) => {
220 _has_categorical = true;
221 PolarsResult::Ok(fld)
222 },
223 #[cfg(feature = "dtype-decimal")]
224 Decimal(precision, scale) => match (precision, scale) {
225 (_, Some(_)) => {
226 fields_to_cast.push(fld.clone());
227 fld.coerce(String);
228 PolarsResult::Ok(fld)
229 },
230 _ => Err(PolarsError::ComputeError(
231 "'scale' must be set when reading csv column as Decimal".into(),
232 )),
233 },
234 _ => {
235 matched = false;
236 PolarsResult::Ok(fld)
237 },
238 }?;
239
240 changed |= matched;
241
242 PolarsResult::Ok(out)
243 })
244 .collect::<PolarsResult<Schema>>()?;
245
246 if changed {
247 *schema = Arc::new(new_schema);
248 }
249
250 Ok(_has_categorical)
251}