1use std::fmt;
2use std::sync::Mutex;
3
4use polars_buffer::{Buffer, SharedStorage};
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
8#[cfg(feature = "polars-time")]
9use polars_time::prelude::*;
10use polars_utils::relaxed_cell::RelaxedCell;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::init_builders;
15use super::options::{CsvEncoding, NullValuesCompiled};
16use super::parser::{CountLines, is_comment_line, parse_lines};
17use super::reader::prepare_csv_schema;
18#[cfg(feature = "decompress")]
19use super::utils::decompress;
20use crate::RowIndex;
21use crate::csv::read::{CsvReadOptions, read_until_start_and_infer_schema_from_compressed_reader};
22use crate::mmap::ReaderBytes;
23use crate::predicates::PhysicalIoExpr;
24use crate::utils::compression::{CompressedReader, SupportedCompression};
25use crate::utils::update_row_counts2;
26
27pub fn cast_columns(
28 df: &mut DataFrame,
29 to_cast: &[Field],
30 parallel: bool,
31 ignore_errors: bool,
32) -> PolarsResult<()> {
33 let cast_fn = |c: &Column, fld: &Field| {
34 let out = match (c.dtype(), fld.dtype()) {
35 #[cfg(feature = "temporal")]
36 (DataType::String, DataType::Date) => c
37 .str()
38 .unwrap()
39 .as_date(None, false)
40 .map(|ca| ca.into_column()),
41 #[cfg(feature = "temporal")]
42 (DataType::String, DataType::Time) => c
43 .str()
44 .unwrap()
45 .as_time(None, false)
46 .map(|ca| ca.into_column()),
47 #[cfg(feature = "temporal")]
48 (DataType::String, DataType::Datetime(tu, _)) => c
49 .str()
50 .unwrap()
51 .as_datetime(
52 None,
53 *tu,
54 false,
55 false,
56 None,
57 &StringChunked::from_iter(std::iter::once("raise")),
58 )
59 .map(|ca| ca.into_column()),
60 (_, dt) => c.cast(dt),
61 }?;
62 if !ignore_errors && c.null_count() != out.null_count() {
63 handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
64 }
65 Ok(out)
66 };
67
68 if parallel {
69 let cols = POOL.install(|| {
70 df.columns()
71 .into_par_iter()
72 .map(|s| {
73 if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
74 cast_fn(s, fld)
75 } else {
76 Ok(s.clone())
77 }
78 })
79 .collect::<PolarsResult<Vec<_>>>()
80 })?;
81 *df = unsafe { DataFrame::new_unchecked(df.height(), cols) }
82 } else {
83 for fld in to_cast {
85 if let Some(idx) = df.get_column_index(fld.name()) {
87 df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
88 }
89 }
90 }
91 Ok(())
92}
93
/// Bundles the streaming reader with the buffers its input depends on, so
/// they share one lifetime and are dropped together.
struct ReaderBytesAndDependents<'a> {
    // Streaming reader over the (possibly compressed) CSV payload.
    compressed_reader: CompressedReader,
    // Bytes already pulled from the reader during schema inference but not
    // yet parsed; fed back into `read_next_slice` before reading more
    // (see `CoreReader::finish`).
    leftover: Buffer<u8>,
    // Held only to keep the backing memory alive: when the input was
    // `ReaderBytes::Borrowed`, the reader's buffer aliases this slice
    // (see the `from_slice_unchecked` call in `CoreReader::new`).
    _reader_bytes: ReaderBytes<'a>,
}
103
/// Low-level CSV reader: splits the input into line-aligned chunks, parses
/// them in parallel and vertically concatenates the resulting DataFrames.
pub(crate) struct CoreReader<'a> {
    // Input bytes plus reader state built from them; `take`n in `finish`.
    reader_bytes: Option<ReaderBytesAndDependents<'a>>,

    // Output schema (inferred or user supplied, with dtype overrides applied).
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    // Column indices to materialize; `None` means all columns.
    projection: Option<Vec<usize>>,
    // Initialized to 1 when the file has a header line, else 0 (see `new`).
    current_line: usize,
    ignore_errors: bool,
    // Maximum number of data rows to return (enforced in `finish`).
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    // Null markers compiled against `schema`.
    null_values: Option<NullValuesCompiled>,
    // Optional row-level filter applied to each parsed chunk.
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    // Fields whose columns are post-cast after parsing (see `cast_columns`).
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
}
123
impl fmt::Debug for CoreReader<'_> {
    // Only the cheap-to-print fields are shown; raw bytes, options and
    // callbacks are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}
133
impl<'a> CoreReader<'a> {
    /// Build a `CoreReader`: decompress the input if needed, infer or resolve
    /// the schema, apply dtype overrides, compile null values and translate a
    /// column-name selection into a positional projection.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        // Without the 'decompress' feature a compressed payload cannot be read.
        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        #[cfg(feature = "decompress")]
        {
            // Bound the decompression work by the number of lines actually
            // needed: skips + header + rows requested.
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }

        let reader_slice = match &reader_bytes {
            ReaderBytes::Borrowed(slice) => {
                // SAFETY: `reader_bytes` is moved into the returned struct as
                // `_reader_bytes` below, which appears to keep the borrowed
                // slice alive for as long as this buffer is in use —
                // NOTE(review): confirm against `from_slice_unchecked`'s
                // contract.
                let ss = unsafe { SharedStorage::from_slice_unchecked(slice) };
                Buffer::from_storage(ss)
            },
            ReaderBytes::Owned(slice) => slice.clone(),
        };
        let mut compressed_reader = CompressedReader::try_new(reader_slice)?;

        // Re-pack the arguments into `CsvReadOptions` for schema inference.
        let read_options = CsvReadOptions {
            parse_options: parse_options.clone(),
            n_rows,
            skip_rows,
            skip_lines,
            projection: projection.clone().map(Arc::new),
            has_header,
            ignore_errors,
            schema: schema.clone(),
            columns: columns.clone(),
            n_threads,
            schema_overwrite,
            dtype_overwrite: dtype_overwrite.clone(),
            fields_to_cast: to_cast.clone(),
            skip_rows_after_header,
            row_index: row_index.clone(),
            raise_if_empty,
            infer_schema_length: max_records,
            ..Default::default()
        };

        // Skips to the start of the data and infers the schema; `leftover`
        // holds bytes already consumed from the reader during inference.
        let (inferred_schema, leftover) = read_until_start_and_infer_schema_from_compressed_reader(
            &read_options,
            None,
            None,
            &mut compressed_reader,
        )?;

        // A user-supplied schema wins over the inferred one.
        let mut schema = match schema {
            Some(schema) => schema,
            None => Arc::new(inferred_schema),
        };
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
            );
            // Positional overrides: the i-th dtype replaces the i-th field.
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        prepare_csv_schema(&mut schema, &mut to_cast)?;

        // Compile the null-value markers against the final schema.
        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        // A column-name selection overrides any positional projection.
        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(ReaderBytesAndDependents {
                compressed_reader,
                leftover,
                _reader_bytes: reader_bytes,
            }),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            n_rows,
            n_threads,
            null_values,
            predicate,
            to_cast,
            row_index,
        })
    }

    /// Take the projection out of `self`, sorted ascending and validated
    /// against the schema; defaults to all columns when none was set.
    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                // Only the largest index needs checking once sorted.
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }

    /// Parse one byte range into a DataFrame and apply the post-parse casts.
    /// Thin wrapper over the free function [`read_chunk`] with this reader's
    /// settings; `chunk_size` is `usize::MAX` so the range is parsed in one
    /// pass.
    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }

    /// Split `bytes` into line-aligned chunks, parse them in parallel on the
    /// global pool and concatenate the results in file order.
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let projection = self.get_projection()?;

        // Fast path: no data -> empty frame with the (projected) schema.
        if bytes.is_empty() {
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };

            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let n_cols = projection.len();
        // Cap the chunk count so total builder allocations (chunks * columns)
        // stay bounded for very wide files.
        const ALLOCATION_BUDGET: usize = 500_000;
        let max_chunks_for_width = ALLOCATION_BUDGET / n_cols.max(1);
        let n_parts_hint = std::cmp::min(n_threads * 16, max_chunks_for_width.max(n_threads));
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint.max(1), 16 * 1024 * 1024);

        // Tiny chunks in debug builds keep the chunking logic exercised by
        // small test inputs.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
        let mut total_bytes_offset = 0;

        // (chunk start address, parse result); sorted by address afterwards
        // to restore file order.
        let results = Arc::new(Mutex::new(vec![]));
        let total_line_count = &RelaxedCell::new_usize(0);

        let counter = CountLines::new(
            self.parse_options.quote_char,
            self.parse_options.eol_char,
            None,
        );
        let mut total_offset = 0;
        let mut previous_total_offset = 0;
        // UTF-8 validation is only needed when string columns are produced.
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());

        POOL.scope(|s| {
            loop {
                // SAFETY: `total_offset` is always a chunk boundary (0 or just
                // past an eol char), per the debug_assert below.
                let b = unsafe { bytes.get_unchecked(total_offset..) };
                if b.is_empty() {
                    break;
                }
                debug_assert!(
                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
                );

                // Quote-aware scan for a line boundary near `chunk_size` bytes;
                // `count` is the number of lines found.
                let (count, position) = counter.find_next(b, &mut chunk_size);
                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

                let (b, count) = if count == 0
                    && unsafe {
                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
                    } {
                    // Final chunk with no trailing eol: take everything left;
                    // it counts as one line unless it is only a comment.
                    total_offset = bytes.len();
                    let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
                        0
                    } else {
                        1
                    };
                    (b, c)
                } else {
                    let end = total_offset + position + 1;
                    let b = unsafe { bytes.get_unchecked(total_offset..end) };

                    previous_total_offset = total_offset;
                    total_offset = end;
                    (b, count)
                };

                if !b.is_empty() {
                    let results = results.clone();
                    let projection = projection.as_ref();
                    let slf = &(*self);
                    // Parse this chunk on the pool; results are keyed by the
                    // chunk's start address for later ordering.
                    s.spawn(move |_| {
                        if check_utf8 && !super::builder::validate_utf8(b) {
                            let mut results = results.lock().unwrap();
                            results.push((
                                b.as_ptr() as usize,
                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
                            ));
                            return;
                        }

                        let result = slf
                            .read_chunk(b, projection, 0, count, Some(0), b.len())
                            .and_then(|mut df| {
                                // More rows than counted lines is always
                                // malformed; fewer is tolerated only when a
                                // comment prefix may have swallowed lines.
                                if df.height() > count
                                    || (df.height() < count
                                        && slf.parse_options.comment_prefix.is_none())
                                {
                                    let msg = format!(
                                        "CSV malformed: expected {} rows, \
                                        actual {} rows, in chunk starting at \
                                        byte offset {}, length {}",
                                        count,
                                        df.height(),
                                        previous_total_offset,
                                        b.len()
                                    );
                                    if slf.ignore_errors {
                                        polars_warn!("{msg}");
                                    } else {
                                        polars_bail!(ComputeError: msg)
                                    }
                                }

                                // Track parsed rows for the early-stop check
                                // in the spawning loop below.
                                if slf.n_rows.is_some() {
                                    total_line_count.fetch_add(df.height());
                                }

                                if let Some(rc) = &slf.row_index {
                                    // Only the first chunk gets the caller's
                                    // offset; the rest are renumbered later by
                                    // `update_row_counts2`.
                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
                                        Some(rc.offset)
                                    } else {
                                        None
                                    };

                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
                                };

                                if let Some(predicate) = slf.predicate.as_ref() {
                                    let s = predicate.evaluate_io(&df)?;
                                    let mask = s.bool()?;
                                    df = df.filter(mask)?;
                                }
                                Ok(df)
                            });

                        results.lock().unwrap().push((b.as_ptr() as usize, result));
                    });

                    // Best-effort early stop once enough rows were parsed;
                    // `finish` slices the result to exactly `n_rows`.
                    if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
                        break;
                    }
                }
                total_bytes_offset += b.len();
            }
        });

        // Chunk start addresses are monotone in file order, so sorting by
        // address restores the original row order.
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }

    /// Consume the reader, parse all remaining bytes and return the final
    /// DataFrame, trimmed to `n_rows` if one was requested.
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        let mut reader_bytes = self.reader_bytes.take().unwrap();
        // Read the entire remaining payload in one slice, starting with the
        // bytes left over from schema inference.
        let (body_bytes, _) = reader_bytes
            .compressed_reader
            .read_next_slice(&reader_bytes.leftover, usize::MAX)?;

        let mut df = self.parse_csv(&body_bytes)?;

        // Parallel parsing may overshoot the row limit (chunks are parsed in
        // full), so trim here.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}
544
545#[allow(clippy::too_many_arguments)]
546pub fn read_chunk(
547 bytes: &[u8],
548 parse_options: &CsvParseOptions,
549 schema: &Schema,
550 ignore_errors: bool,
551 projection: &[usize],
552 bytes_offset_thread: usize,
553 capacity: usize,
554 null_values: Option<&NullValuesCompiled>,
555 chunk_size: usize,
556 stop_at_nbytes: usize,
557 starting_point_offset: Option<usize>,
558) -> PolarsResult<DataFrame> {
559 let mut read = bytes_offset_thread;
560 let mut buffers = init_builders(
566 projection,
567 capacity + 1,
568 schema,
569 parse_options.quote_char,
570 parse_options.encoding,
571 parse_options.decimal_comma,
572 )?;
573
574 debug_assert!(projection.is_sorted());
575
576 let mut last_read = usize::MAX;
577 loop {
578 if read >= stop_at_nbytes || read == last_read {
579 break;
580 }
581 let local_bytes = &bytes[read..stop_at_nbytes];
582
583 last_read = read;
584 let offset = read + starting_point_offset.unwrap();
585 read += parse_lines(
586 local_bytes,
587 parse_options,
588 offset,
589 ignore_errors,
590 null_values,
591 projection,
592 &mut buffers,
593 chunk_size,
594 schema.len(),
595 schema,
596 )?;
597 }
598
599 let columns = buffers
600 .into_iter()
601 .map(|buf| buf.into_series().map(Column::from))
602 .collect::<PolarsResult<Vec<_>>>()?;
603 Ok(unsafe { DataFrame::new_unchecked_infer_height(columns) })
604}