1use std::fmt;
2use std::sync::Mutex;
3
4use polars_core::POOL;
5use polars_core::prelude::*;
6use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
7#[cfg(feature = "polars-time")]
8use polars_time::prelude::*;
9use polars_utils::mmap::MemSlice;
10use polars_utils::relaxed_cell::RelaxedCell;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::buffer::init_buffers;
15use super::options::{CsvEncoding, NullValuesCompiled};
16use super::parser::{CountLines, is_comment_line, parse_lines};
17use super::reader::prepare_csv_schema;
18#[cfg(feature = "decompress")]
19use super::utils::decompress;
20use crate::RowIndex;
21use crate::csv::read::{CsvReadOptions, read_until_start_and_infer_schema};
22use crate::mmap::ReaderBytes;
23use crate::predicates::PhysicalIoExpr;
24use crate::utils::compression::{CompressedReader, SupportedCompression};
25use crate::utils::update_row_counts2;
26
27pub fn cast_columns(
28 df: &mut DataFrame,
29 to_cast: &[Field],
30 parallel: bool,
31 ignore_errors: bool,
32) -> PolarsResult<()> {
33 let cast_fn = |c: &Column, fld: &Field| {
34 let out = match (c.dtype(), fld.dtype()) {
35 #[cfg(feature = "temporal")]
36 (DataType::String, DataType::Date) => c
37 .str()
38 .unwrap()
39 .as_date(None, false)
40 .map(|ca| ca.into_column()),
41 #[cfg(feature = "temporal")]
42 (DataType::String, DataType::Time) => c
43 .str()
44 .unwrap()
45 .as_time(None, false)
46 .map(|ca| ca.into_column()),
47 #[cfg(feature = "temporal")]
48 (DataType::String, DataType::Datetime(tu, _)) => c
49 .str()
50 .unwrap()
51 .as_datetime(
52 None,
53 *tu,
54 false,
55 false,
56 None,
57 &StringChunked::from_iter(std::iter::once("raise")),
58 )
59 .map(|ca| ca.into_column()),
60 (_, dt) => c.cast(dt),
61 }?;
62 if !ignore_errors && c.null_count() != out.null_count() {
63 handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
64 }
65 Ok(out)
66 };
67
68 if parallel {
69 let cols = POOL.install(|| {
70 df.columns()
71 .into_par_iter()
72 .map(|s| {
73 if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
74 cast_fn(s, fld)
75 } else {
76 Ok(s.clone())
77 }
78 })
79 .collect::<PolarsResult<Vec<_>>>()
80 })?;
81 *df = unsafe { DataFrame::new_unchecked(df.height(), cols) }
82 } else {
83 for fld in to_cast {
85 if let Some(idx) = df.get_column_index(fld.name()) {
87 df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
88 }
89 }
90 }
91 Ok(())
92}
93
/// The input bytes of a `CoreReader` bundled with the values derived from them.
///
/// In `CoreReader::new`, a `ReaderBytes::Borrowed` input is transmuted to a `'static` slice
/// before `compressed_reader` is built from it. Keeping the original `ReaderBytes<'a>` in the
/// same struct ties that slice back to the lifetime `'a`.
///
/// Fields are dropped in declaration order, so `compressed_reader` and `leftover` are dropped
/// before `_reader_bytes`.
/// NOTE(review): keep this field order unless it is confirmed that the dependents never point
/// into memory owned by `_reader_bytes`.
struct ReaderBytesAndDependents<'a> {
    // Reader over the (possibly decompressed) input, built from a slice of `_reader_bytes`.
    compressed_reader: CompressedReader,
    // Bytes already pulled from `compressed_reader` by `read_until_start_and_infer_schema`
    // but not yet consumed; handed back to `read_next_slice` in `finish`.
    leftover: MemSlice,
    // Never read directly; it is held only to keep the input alive for `'a` (see above).
    _reader_bytes: ReaderBytes<'a>,
}
103
104pub(crate) struct CoreReader<'a> {
106 reader_bytes: Option<ReaderBytesAndDependents<'a>>,
107
108 schema: SchemaRef,
110 parse_options: CsvParseOptions,
111 projection: Option<Vec<usize>>,
113 current_line: usize,
115 ignore_errors: bool,
116 n_rows: Option<usize>,
117 n_threads: Option<usize>,
118 null_values: Option<NullValuesCompiled>,
119 predicate: Option<Arc<dyn PhysicalIoExpr>>,
120 to_cast: Vec<Field>,
121 row_index: Option<RowIndex>,
122}
123
124impl fmt::Debug for CoreReader<'_> {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("Reader")
127 .field("schema", &self.schema)
128 .field("projection", &self.projection)
129 .field("current_line", &self.current_line)
130 .finish()
131 }
132}
133
134impl<'a> CoreReader<'a> {
135 #[allow(clippy::too_many_arguments)]
136 pub(crate) fn new(
137 reader_bytes: ReaderBytes<'a>,
138 parse_options: Arc<CsvParseOptions>,
139 n_rows: Option<usize>,
140 skip_rows: usize,
141 skip_lines: usize,
142 mut projection: Option<Vec<usize>>,
143 max_records: Option<usize>,
144 has_header: bool,
145 ignore_errors: bool,
146 schema: Option<SchemaRef>,
147 columns: Option<Arc<[PlSmallStr]>>,
148 n_threads: Option<usize>,
149 schema_overwrite: Option<SchemaRef>,
150 dtype_overwrite: Option<Arc<Vec<DataType>>>,
151 predicate: Option<Arc<dyn PhysicalIoExpr>>,
152 mut to_cast: Vec<Field>,
153 skip_rows_after_header: usize,
154 row_index: Option<RowIndex>,
155 raise_if_empty: bool,
156 ) -> PolarsResult<CoreReader<'a>> {
157 let separator = parse_options.separator;
158
159 #[cfg(feature = "decompress")]
160 let mut reader_bytes = reader_bytes;
161
162 if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
163 polars_bail!(
164 ComputeError: "cannot read compressed CSV file; \
165 compile with feature 'decompress'"
166 );
167 }
168 #[cfg(feature = "decompress")]
172 {
173 let total_n_rows =
174 n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
175 if let Some(b) = decompress(
176 &reader_bytes,
177 total_n_rows,
178 separator,
179 parse_options.quote_char,
180 parse_options.eol_char,
181 ) {
182 reader_bytes = ReaderBytes::Owned(b.into());
183 }
184 }
185
186 let reader_slice = match &reader_bytes {
187 ReaderBytes::Borrowed(slice) => {
188 let static_slice = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) };
191 MemSlice::from_static(static_slice)
192 },
193 ReaderBytes::Owned(slice) => slice.clone(),
194 };
195 let mut compressed_reader = CompressedReader::try_new(reader_slice)?;
196
197 let read_options = CsvReadOptions {
198 parse_options: parse_options.clone(),
199 n_rows,
200 skip_rows,
201 skip_lines,
202 projection: projection.clone().map(Arc::new),
203 has_header,
204 ignore_errors,
205 schema: schema.clone(),
206 columns: columns.clone(),
207 n_threads,
208 schema_overwrite,
209 dtype_overwrite: dtype_overwrite.clone(),
210 fields_to_cast: to_cast.clone(),
211 skip_rows_after_header,
212 row_index: row_index.clone(),
213 raise_if_empty,
214 infer_schema_length: max_records,
215 ..Default::default()
216 };
217
218 let (inferred_schema, leftover) =
220 read_until_start_and_infer_schema(&read_options, None, None, &mut compressed_reader)?;
221
222 let mut schema = match schema {
223 Some(schema) => schema,
224 None => Arc::new(inferred_schema),
225 };
226 if let Some(dtypes) = dtype_overwrite {
227 polars_ensure!(
228 dtypes.len() <= schema.len(),
229 InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
230 );
231 let s = Arc::make_mut(&mut schema);
232 for (index, dt) in dtypes.iter().enumerate() {
233 s.set_dtype_at_index(index, dt.clone()).unwrap();
234 }
235 }
236
237 prepare_csv_schema(&mut schema, &mut to_cast)?;
238
239 let null_values = parse_options
241 .null_values
242 .as_ref()
243 .map(|nv| nv.clone().compile(&schema))
244 .transpose()?;
245
246 if let Some(cols) = columns {
247 let mut prj = Vec::with_capacity(cols.len());
248 for col in cols.as_ref() {
249 let i = schema.try_index_of(col)?;
250 prj.push(i);
251 }
252 projection = Some(prj);
253 }
254
255 Ok(CoreReader {
256 reader_bytes: Some(ReaderBytesAndDependents {
257 compressed_reader,
258 leftover,
259 _reader_bytes: reader_bytes,
260 }),
261 parse_options: (*parse_options).clone(),
262 schema,
263 projection,
264 current_line: usize::from(has_header),
265 ignore_errors,
266 n_rows,
267 n_threads,
268 null_values,
269 predicate,
270 to_cast,
271 row_index,
272 })
273 }
274
275 fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
276 self.projection
279 .take()
280 .map(|mut v| {
281 v.sort_unstable();
282 if let Some(idx) = v.last() {
283 polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
284 }
285 Ok(v)
286 })
287 .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
288 }
289
290 fn read_chunk(
291 &self,
292 bytes: &[u8],
293 projection: &[usize],
294 bytes_offset: usize,
295 capacity: usize,
296 starting_point_offset: Option<usize>,
297 stop_at_nbytes: usize,
298 ) -> PolarsResult<DataFrame> {
299 let mut df = read_chunk(
300 bytes,
301 &self.parse_options,
302 self.schema.as_ref(),
303 self.ignore_errors,
304 projection,
305 bytes_offset,
306 capacity,
307 self.null_values.as_ref(),
308 usize::MAX,
309 stop_at_nbytes,
310 starting_point_offset,
311 )?;
312
313 cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
314 Ok(df)
315 }
316
317 fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
322 let projection = self.get_projection()?;
323
324 if bytes.is_empty() {
326 let mut df = if projection.len() == self.schema.len() {
327 DataFrame::empty_with_schema(self.schema.as_ref())
328 } else {
329 DataFrame::empty_with_schema(
330 &projection
331 .iter()
332 .map(|&i| self.schema.get_at_index(i).unwrap())
333 .map(|(name, dtype)| Field {
334 name: name.clone(),
335 dtype: dtype.clone(),
336 })
337 .collect::<Schema>(),
338 )
339 };
340
341 cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
342
343 if let Some(ref row_index) = self.row_index {
344 df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
345 }
346 return Ok(df);
347 }
348
349 let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
350
351 let n_cols = projection.len();
366 const ALLOCATION_BUDGET: usize = 500_000;
368 let max_chunks_for_width = ALLOCATION_BUDGET / n_cols.max(1);
369 let n_parts_hint = std::cmp::min(n_threads * 16, max_chunks_for_width.max(n_threads));
370 let chunk_size = std::cmp::min(bytes.len() / n_parts_hint.max(1), 16 * 1024 * 1024);
371
372 #[cfg(debug_assertions)]
374 let min_chunk_size = 64;
375 #[cfg(not(debug_assertions))]
376 let min_chunk_size = 1024 * 4;
377
378 let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
379 let mut total_bytes_offset = 0;
380
381 let results = Arc::new(Mutex::new(vec![]));
382 let total_line_count = &RelaxedCell::new_usize(0);
384
385 let counter = CountLines::new(
386 self.parse_options.quote_char,
387 self.parse_options.eol_char,
388 None,
389 );
390 let mut total_offset = 0;
391 let mut previous_total_offset = 0;
392 let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
393 && self.schema.iter_fields().any(|f| f.dtype().is_string());
394
395 POOL.scope(|s| {
396 loop {
398 let b = unsafe { bytes.get_unchecked(total_offset..) };
399 if b.is_empty() {
400 break;
401 }
402 debug_assert!(
403 total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
404 );
405
406 let (count, position) = counter.find_next(b, &mut chunk_size);
409 debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);
410
411 let (b, count) = if count == 0
412 && unsafe {
413 std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
414 } {
415 total_offset = bytes.len();
416 let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
417 0
418 } else {
419 1
420 };
421 (b, c)
422 } else {
423 let end = total_offset + position + 1;
424 let b = unsafe { bytes.get_unchecked(total_offset..end) };
425
426 previous_total_offset = total_offset;
427 total_offset = end;
428 (b, count)
429 };
430
431 if !b.is_empty() {
433 let results = results.clone();
434 let projection = projection.as_ref();
435 let slf = &(*self);
436 s.spawn(move |_| {
437 if check_utf8 && !super::buffer::validate_utf8(b) {
438 let mut results = results.lock().unwrap();
439 results.push((
440 b.as_ptr() as usize,
441 Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
442 ));
443 return;
444 }
445
446 let result = slf
447 .read_chunk(b, projection, 0, count, Some(0), b.len())
448 .and_then(|mut df| {
449 if df.height() > count
451 || (df.height() < count
452 && slf.parse_options.comment_prefix.is_none())
453 {
454 let msg = format!(
456 "CSV malformed: expected {} rows, \
457 actual {} rows, in chunk starting at \
458 byte offset {}, length {}",
459 count,
460 df.height(),
461 previous_total_offset,
462 b.len()
463 );
464 if slf.ignore_errors {
465 polars_warn!("{}", msg);
466 } else {
467 polars_bail!(ComputeError: msg);
468 }
469 }
470
471 if slf.n_rows.is_some() {
472 total_line_count.fetch_add(df.height());
473 }
474
475 if let Some(rc) = &slf.row_index {
477 let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
479 Some(rc.offset)
480 } else {
481 None
482 };
483
484 unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
485 };
486
487 if let Some(predicate) = slf.predicate.as_ref() {
488 let s = predicate.evaluate_io(&df)?;
489 let mask = s.bool()?;
490 df = df.filter(mask)?;
491 }
492 Ok(df)
493 });
494
495 results.lock().unwrap().push((b.as_ptr() as usize, result));
496 });
497
498 if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
501 break;
502 }
503 }
504 total_bytes_offset += b.len();
505 }
506 });
507
508 let mut results = std::mem::take(&mut *results.lock().unwrap());
509 results.sort_unstable_by_key(|k| k.0);
510 let mut dfs = results
511 .into_iter()
512 .map(|k| k.1)
513 .collect::<PolarsResult<Vec<_>>>()?;
514
515 if let Some(rc) = &self.row_index {
516 update_row_counts2(&mut dfs, rc.offset)
517 };
518 accumulate_dataframes_vertical(dfs)
519 }
520
521 pub fn finish(mut self) -> PolarsResult<DataFrame> {
523 let mut reader_bytes = self.reader_bytes.take().unwrap();
524 let (body_bytes, _) = reader_bytes
525 .compressed_reader
526 .read_next_slice(&reader_bytes.leftover, usize::MAX)?;
527
528 let mut df = self.parse_csv(&body_bytes)?;
529
530 if let Some(n_rows) = self.n_rows {
533 if n_rows < df.height() {
534 df = df.slice(0, n_rows)
535 }
536 }
537 Ok(df)
538 }
539}
540
541#[allow(clippy::too_many_arguments)]
542pub fn read_chunk(
543 bytes: &[u8],
544 parse_options: &CsvParseOptions,
545 schema: &Schema,
546 ignore_errors: bool,
547 projection: &[usize],
548 bytes_offset_thread: usize,
549 capacity: usize,
550 null_values: Option<&NullValuesCompiled>,
551 chunk_size: usize,
552 stop_at_nbytes: usize,
553 starting_point_offset: Option<usize>,
554) -> PolarsResult<DataFrame> {
555 let mut read = bytes_offset_thread;
556 let mut buffers = init_buffers(
562 projection,
563 capacity + 1,
564 schema,
565 parse_options.quote_char,
566 parse_options.encoding,
567 parse_options.decimal_comma,
568 )?;
569
570 debug_assert!(projection.is_sorted());
571
572 let mut last_read = usize::MAX;
573 loop {
574 if read >= stop_at_nbytes || read == last_read {
575 break;
576 }
577 let local_bytes = &bytes[read..stop_at_nbytes];
578
579 last_read = read;
580 let offset = read + starting_point_offset.unwrap();
581 read += parse_lines(
582 local_bytes,
583 parse_options,
584 offset,
585 ignore_errors,
586 null_values,
587 projection,
588 &mut buffers,
589 chunk_size,
590 schema.len(),
591 schema,
592 )?;
593 }
594
595 let columns = buffers
596 .into_iter()
597 .map(|buf| buf.into_series().map(Column::from))
598 .collect::<PolarsResult<Vec<_>>>()?;
599 Ok(unsafe { DataFrame::new_unchecked_infer_height(columns) })
600}