Skip to main content

polars_io/csv/read/
read_impl.rs

1use std::fmt;
2use std::sync::Mutex;
3
4use polars_buffer::{Buffer, SharedStorage};
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
8#[cfg(feature = "polars-time")]
9use polars_time::prelude::*;
10use polars_utils::relaxed_cell::RelaxedCell;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::init_builders;
15use super::options::{CsvEncoding, NullValuesCompiled};
16use super::parser::{CountLines, is_comment_line, parse_lines};
17use super::reader::prepare_csv_schema;
18#[cfg(feature = "decompress")]
19use super::utils::decompress;
20use crate::RowIndex;
21use crate::csv::read::{CsvReadOptions, read_until_start_and_infer_schema_from_compressed_reader};
22use crate::mmap::ReaderBytes;
23use crate::predicates::PhysicalIoExpr;
24use crate::utils::compression::{CompressedReader, SupportedCompression};
25use crate::utils::update_row_counts2;
26
27pub fn cast_columns(
28    df: &mut DataFrame,
29    to_cast: &[Field],
30    parallel: bool,
31    ignore_errors: bool,
32) -> PolarsResult<()> {
33    let cast_fn = |c: &Column, fld: &Field| {
34        let out = match (c.dtype(), fld.dtype()) {
35            #[cfg(feature = "temporal")]
36            (DataType::String, DataType::Date) => c
37                .str()
38                .unwrap()
39                .as_date(None, false)
40                .map(|ca| ca.into_column()),
41            #[cfg(feature = "temporal")]
42            (DataType::String, DataType::Time) => c
43                .str()
44                .unwrap()
45                .as_time(None, false)
46                .map(|ca| ca.into_column()),
47            #[cfg(feature = "temporal")]
48            (DataType::String, DataType::Datetime(tu, _)) => c
49                .str()
50                .unwrap()
51                .as_datetime(
52                    None,
53                    *tu,
54                    false,
55                    false,
56                    None,
57                    &StringChunked::from_iter(std::iter::once("raise")),
58                )
59                .map(|ca| ca.into_column()),
60            (_, dt) => c.cast(dt),
61        }?;
62        if !ignore_errors && c.null_count() != out.null_count() {
63            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
64        }
65        Ok(out)
66    };
67
68    if parallel {
69        let cols = POOL.install(|| {
70            df.columns()
71                .into_par_iter()
72                .map(|s| {
73                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
74                        cast_fn(s, fld)
75                    } else {
76                        Ok(s.clone())
77                    }
78                })
79                .collect::<PolarsResult<Vec<_>>>()
80        })?;
81        *df = unsafe { DataFrame::new_unchecked(df.height(), cols) }
82    } else {
83        // cast to the original dtypes in the schema
84        for fld in to_cast {
85            // field may not be projected
86            if let Some(idx) = df.get_column_index(fld.name()) {
87                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
88            }
89        }
90    }
91    Ok(())
92}
93
94struct ReaderBytesAndDependents<'a> {
95    // Ensure lifetime dependents are dropped before `reader_bytes`, since their drop impls
96    // could access themselves, this is achieved by placing them before `reader_bytes`.
97    // SAFETY: This is lifetime bound to `reader_bytes`
98    compressed_reader: CompressedReader,
99    // SAFETY: This is lifetime bound to `reader_bytes`
100    leftover: Buffer<u8>,
101    _reader_bytes: ReaderBytes<'a>,
102}
103
104/// CSV file reader
105pub(crate) struct CoreReader<'a> {
106    reader_bytes: Option<ReaderBytesAndDependents<'a>>,
107
108    /// Explicit schema for the CSV file
109    schema: SchemaRef,
110    parse_options: CsvParseOptions,
111    /// Optional projection for which columns to load (zero-based column indices)
112    projection: Option<Vec<usize>>,
113    /// Current line number, used in error reporting
114    current_line: usize,
115    ignore_errors: bool,
116    n_rows: Option<usize>,
117    n_threads: Option<usize>,
118    null_values: Option<NullValuesCompiled>,
119    predicate: Option<Arc<dyn PhysicalIoExpr>>,
120    to_cast: Vec<Field>,
121    row_index: Option<RowIndex>,
122}
123
124impl fmt::Debug for CoreReader<'_> {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.debug_struct("Reader")
127            .field("schema", &self.schema)
128            .field("projection", &self.projection)
129            .field("current_line", &self.current_line)
130            .finish()
131    }
132}
133
134impl<'a> CoreReader<'a> {
135    #[allow(clippy::too_many_arguments)]
136    pub(crate) fn new(
137        reader_bytes: ReaderBytes<'a>,
138        parse_options: Arc<CsvParseOptions>,
139        n_rows: Option<usize>,
140        skip_rows: usize,
141        skip_lines: usize,
142        mut projection: Option<Vec<usize>>,
143        max_records: Option<usize>,
144        has_header: bool,
145        ignore_errors: bool,
146        schema: Option<SchemaRef>,
147        columns: Option<Arc<[PlSmallStr]>>,
148        n_threads: Option<usize>,
149        schema_overwrite: Option<SchemaRef>,
150        dtype_overwrite: Option<Arc<Vec<DataType>>>,
151        predicate: Option<Arc<dyn PhysicalIoExpr>>,
152        mut to_cast: Vec<Field>,
153        skip_rows_after_header: usize,
154        row_index: Option<RowIndex>,
155        raise_if_empty: bool,
156    ) -> PolarsResult<CoreReader<'a>> {
157        let separator = parse_options.separator;
158
159        #[cfg(feature = "decompress")]
160        let mut reader_bytes = reader_bytes;
161
162        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
163            polars_bail!(
164                ComputeError: "cannot read compressed CSV file; \
165                compile with feature 'decompress'"
166            );
167        }
168        // We keep track of the inferred schema bool
169        // In case the file is compressed this schema inference is wrong and has to be done
170        // again after decompression.
171        #[cfg(feature = "decompress")]
172        {
173            let total_n_rows =
174                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
175            if let Some(b) = decompress(
176                &reader_bytes,
177                total_n_rows,
178                separator,
179                parse_options.quote_char,
180                parse_options.eol_char,
181            ) {
182                reader_bytes = ReaderBytes::Owned(b.into());
183            }
184        }
185
186        let reader_slice = match &reader_bytes {
187            ReaderBytes::Borrowed(slice) => {
188                // SAFETY: The produced slice and derived slices MUST not live longer than
189                // `reader_bytes`. TODO use `scan_csv` to implement `read_csv`.
190                let ss = unsafe { SharedStorage::from_slice_unchecked(slice) };
191                Buffer::from_storage(ss)
192            },
193            ReaderBytes::Owned(slice) => slice.clone(),
194        };
195        let mut compressed_reader = CompressedReader::try_new(reader_slice)?;
196
197        let read_options = CsvReadOptions {
198            parse_options: parse_options.clone(),
199            n_rows,
200            skip_rows,
201            skip_lines,
202            projection: projection.clone().map(Arc::new),
203            has_header,
204            ignore_errors,
205            schema: schema.clone(),
206            columns: columns.clone(),
207            n_threads,
208            schema_overwrite,
209            dtype_overwrite: dtype_overwrite.clone(),
210            fields_to_cast: to_cast.clone(),
211            skip_rows_after_header,
212            row_index: row_index.clone(),
213            raise_if_empty,
214            infer_schema_length: max_records,
215            ..Default::default()
216        };
217
218        // Since this is also used to skip to the start, always call it.
219        let (inferred_schema, leftover) = read_until_start_and_infer_schema_from_compressed_reader(
220            &read_options,
221            None,
222            None,
223            &mut compressed_reader,
224        )?;
225
226        let mut schema = match schema {
227            Some(schema) => schema,
228            None => Arc::new(inferred_schema),
229        };
230        if let Some(dtypes) = dtype_overwrite {
231            polars_ensure!(
232                dtypes.len() <= schema.len(),
233                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
234            );
235            let s = Arc::make_mut(&mut schema);
236            for (index, dt) in dtypes.iter().enumerate() {
237                s.set_dtype_at_index(index, dt.clone()).unwrap();
238            }
239        }
240
241        prepare_csv_schema(&mut schema, &mut to_cast)?;
242
243        // Create a null value for every column
244        let null_values = parse_options
245            .null_values
246            .as_ref()
247            .map(|nv| nv.clone().compile(&schema))
248            .transpose()?;
249
250        if let Some(cols) = columns {
251            let mut prj = Vec::with_capacity(cols.len());
252            for col in cols.as_ref() {
253                let i = schema.try_index_of(col)?;
254                prj.push(i);
255            }
256            projection = Some(prj);
257        }
258
259        Ok(CoreReader {
260            reader_bytes: Some(ReaderBytesAndDependents {
261                compressed_reader,
262                leftover,
263                _reader_bytes: reader_bytes,
264            }),
265            parse_options: (*parse_options).clone(),
266            schema,
267            projection,
268            current_line: usize::from(has_header),
269            ignore_errors,
270            n_rows,
271            n_threads,
272            null_values,
273            predicate,
274            to_cast,
275            row_index,
276        })
277    }
278
279    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
280        // we also need to sort the projection to have predictable output.
281        // the `parse_lines` function expects this.
282        self.projection
283            .take()
284            .map(|mut v| {
285                v.sort_unstable();
286                if let Some(idx) = v.last() {
287                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
288                }
289                Ok(v)
290            })
291            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
292    }
293
294    fn read_chunk(
295        &self,
296        bytes: &[u8],
297        projection: &[usize],
298        bytes_offset: usize,
299        capacity: usize,
300        starting_point_offset: Option<usize>,
301        stop_at_nbytes: usize,
302    ) -> PolarsResult<DataFrame> {
303        let mut df = read_chunk(
304            bytes,
305            &self.parse_options,
306            self.schema.as_ref(),
307            self.ignore_errors,
308            projection,
309            bytes_offset,
310            capacity,
311            self.null_values.as_ref(),
312            usize::MAX,
313            stop_at_nbytes,
314            starting_point_offset,
315        )?;
316
317        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
318        Ok(df)
319    }
320
321    // The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
322    // Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
323    // In case malformed CSV is detected, a warning or an error will be issued.
324    // Not all malformed CSV will be detected, as that would impact performance.
325    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
326        let projection = self.get_projection()?;
327
328        // An empty file with a schema should return an empty DataFrame with that schema
329        if bytes.is_empty() {
330            let mut df = if projection.len() == self.schema.len() {
331                DataFrame::empty_with_schema(self.schema.as_ref())
332            } else {
333                DataFrame::empty_with_schema(
334                    &projection
335                        .iter()
336                        .map(|&i| self.schema.get_at_index(i).unwrap())
337                        .map(|(name, dtype)| Field {
338                            name: name.clone(),
339                            dtype: dtype.clone(),
340                        })
341                        .collect::<Schema>(),
342                )
343            };
344
345            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
346
347            if let Some(ref row_index) = self.row_index {
348                df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
349            }
350            return Ok(df);
351        }
352
353        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
354
355        // This is chosen by benchmarking on ny city trip csv dataset.
356        // We want small enough chunks such that threads start working as soon as possible
357        // But we also want them large enough, so that we have less chunks related overhead.
358        // We minimize chunks to 16 MB to still fit L3 cache.
359        //
360        // Width-aware adjustment: For wide data (many columns), per-chunk overhead
361        // (allocating column buffers) becomes significant. Each chunk must allocate
362        // O(n_cols) buffers, so total allocation overhead is O(n_chunks * n_cols).
363        // To keep this bounded, we limit n_chunks such that n_chunks * n_cols <= threshold.
364        // With threshold ~500K, this gives:
365        //   - 100 cols: up to 5000 chunks (no practical limit)
366        //   - 1000 cols: up to 500 chunks
367        //   - 10000 cols: up to 50 chunks
368        //   - 30000 cols: up to 16 chunks
369        let n_cols = projection.len();
370        // Empirically determined to balance allocation overhead and parallelism.
371        const ALLOCATION_BUDGET: usize = 500_000;
372        let max_chunks_for_width = ALLOCATION_BUDGET / n_cols.max(1);
373        let n_parts_hint = std::cmp::min(n_threads * 16, max_chunks_for_width.max(n_threads));
374        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint.max(1), 16 * 1024 * 1024);
375
376        // Use a small min chunk size to catch failures in tests.
377        #[cfg(debug_assertions)]
378        let min_chunk_size = 64;
379        #[cfg(not(debug_assertions))]
380        let min_chunk_size = 1024 * 4;
381
382        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
383        let mut total_bytes_offset = 0;
384
385        let results = Arc::new(Mutex::new(vec![]));
386        // We have to do this after parsing as there can be comments.
387        let total_line_count = &RelaxedCell::new_usize(0);
388
389        let counter = CountLines::new(
390            self.parse_options.quote_char,
391            self.parse_options.eol_char,
392            None,
393        );
394        let mut total_offset = 0;
395        let mut previous_total_offset = 0;
396        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
397            && self.schema.iter_fields().any(|f| f.dtype().is_string());
398
399        POOL.scope(|s| {
400            // Pass 1: identify chunks for parallel processing (line parsing).
401            loop {
402                let b = unsafe { bytes.get_unchecked(total_offset..) };
403                if b.is_empty() {
404                    break;
405                }
406                debug_assert!(
407                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
408                );
409
410                // Count is the number of rows for the next chunk. In case of malformed CSV data,
411                // count may not be as expected.
412                let (count, position) = counter.find_next(b, &mut chunk_size);
413                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);
414
415                let (b, count) = if count == 0
416                    && unsafe {
417                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
418                    } {
419                    total_offset = bytes.len();
420                    let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
421                        0
422                    } else {
423                        1
424                    };
425                    (b, c)
426                } else {
427                    let end = total_offset + position + 1;
428                    let b = unsafe { bytes.get_unchecked(total_offset..end) };
429
430                    previous_total_offset = total_offset;
431                    total_offset = end;
432                    (b, count)
433                };
434
435                // Pass 2: process each individual chunk in parallel (field parsing)
436                if !b.is_empty() {
437                    let results = results.clone();
438                    let projection = projection.as_ref();
439                    let slf = &(*self);
440                    s.spawn(move |_| {
441                        if check_utf8 && !super::builder::validate_utf8(b) {
442                            let mut results = results.lock().unwrap();
443                            results.push((
444                                b.as_ptr() as usize,
445                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
446                            ));
447                            return;
448                        }
449
450                        let result = slf
451                            .read_chunk(b, projection, 0, count, Some(0), b.len())
452                            .and_then(|mut df| {
453                                // Check malformed
454                                if df.height() > count
455                                    || (df.height() < count
456                                        && slf.parse_options.comment_prefix.is_none())
457                                {
458                                    // Note: in case data is malformed, df.height() is more likely to be correct than count.
459                                    let msg = format!(
460                                        "CSV malformed: expected {} rows, \
461                                        actual {} rows, in chunk starting at \
462                                        byte offset {}, length {}",
463                                        count,
464                                        df.height(),
465                                        previous_total_offset,
466                                        b.len()
467                                    );
468                                    if slf.ignore_errors {
469                                        polars_warn!("{msg}");
470                                    } else {
471                                        polars_bail!(ComputeError: msg)
472                                    }
473                                }
474
475                                if slf.n_rows.is_some() {
476                                    total_line_count.fetch_add(df.height());
477                                }
478
479                                // We cannot use the line count as there can be comments in the lines so we must correct line counts later.
480                                if let Some(rc) = &slf.row_index {
481                                    // is first chunk
482                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
483                                        Some(rc.offset)
484                                    } else {
485                                        None
486                                    };
487
488                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
489                                };
490
491                                if let Some(predicate) = slf.predicate.as_ref() {
492                                    let s = predicate.evaluate_io(&df)?;
493                                    let mask = s.bool()?;
494                                    df = df.filter(mask)?;
495                                }
496                                Ok(df)
497                            });
498
499                        results.lock().unwrap().push((b.as_ptr() as usize, result));
500                    });
501
502                    // Check just after we spawned a chunk. That mean we processed all data up until
503                    // row count.
504                    if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
505                        break;
506                    }
507                }
508                total_bytes_offset += b.len();
509            }
510        });
511
512        let mut results = std::mem::take(&mut *results.lock().unwrap());
513        results.sort_unstable_by_key(|k| k.0);
514        let mut dfs = results
515            .into_iter()
516            .map(|k| k.1)
517            .collect::<PolarsResult<Vec<_>>>()?;
518
519        if let Some(rc) = &self.row_index {
520            update_row_counts2(&mut dfs, rc.offset)
521        };
522        accumulate_dataframes_vertical(dfs)
523    }
524
525    /// Read the csv into a DataFrame. The predicate can come from a lazy physical plan.
526    pub fn finish(mut self) -> PolarsResult<DataFrame> {
527        let mut reader_bytes = self.reader_bytes.take().unwrap();
528        let (body_bytes, _) = reader_bytes
529            .compressed_reader
530            .read_next_slice(&reader_bytes.leftover, usize::MAX)?;
531
532        let mut df = self.parse_csv(&body_bytes)?;
533
534        // if multi-threaded the n_rows was probabilistically determined.
535        // Let's slice to correct number of rows if possible.
536        if let Some(n_rows) = self.n_rows {
537            if n_rows < df.height() {
538                df = df.slice(0, n_rows)
539            }
540        }
541        Ok(df)
542    }
543}
544
545#[allow(clippy::too_many_arguments)]
546pub fn read_chunk(
547    bytes: &[u8],
548    parse_options: &CsvParseOptions,
549    schema: &Schema,
550    ignore_errors: bool,
551    projection: &[usize],
552    bytes_offset_thread: usize,
553    capacity: usize,
554    null_values: Option<&NullValuesCompiled>,
555    chunk_size: usize,
556    stop_at_nbytes: usize,
557    starting_point_offset: Option<usize>,
558) -> PolarsResult<DataFrame> {
559    let mut read = bytes_offset_thread;
560    // There's an off-by-one error somewhere in the reading code, where it reads
561    // one more item than the requested capacity. Given the batch sizes are
562    // approximate (sometimes they're smaller), this isn't broken, but it does
563    // mean a bunch of extra allocation and copying. So we allocate a
564    // larger-by-one buffer so the size is more likely to be accurate.
565    let mut buffers = init_builders(
566        projection,
567        capacity + 1,
568        schema,
569        parse_options.quote_char,
570        parse_options.encoding,
571        parse_options.decimal_comma,
572    )?;
573
574    debug_assert!(projection.is_sorted());
575
576    let mut last_read = usize::MAX;
577    loop {
578        if read >= stop_at_nbytes || read == last_read {
579            break;
580        }
581        let local_bytes = &bytes[read..stop_at_nbytes];
582
583        last_read = read;
584        let offset = read + starting_point_offset.unwrap();
585        read += parse_lines(
586            local_bytes,
587            parse_options,
588            offset,
589            ignore_errors,
590            null_values,
591            projection,
592            &mut buffers,
593            chunk_size,
594            schema.len(),
595            schema,
596        )?;
597    }
598
599    let columns = buffers
600        .into_iter()
601        .map(|buf| buf.into_series().map(Column::from))
602        .collect::<PolarsResult<Vec<_>>>()?;
603    Ok(unsafe { DataFrame::new_unchecked_infer_height(columns) })
604}