polars_io/csv/read/
read_impl.rs

1pub(super) mod batched;
2
3use std::fmt;
4use std::sync::Mutex;
5
6use polars_core::POOL;
7use polars_core::prelude::*;
8use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
9#[cfg(feature = "polars-time")]
10use polars_time::prelude::*;
11use polars_utils::relaxed_cell::RelaxedCell;
12use rayon::prelude::*;
13
14use super::CsvParseOptions;
15use super::buffer::init_buffers;
16use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
17use super::parser::{
18    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
19    skip_lines_naive, skip_this_line,
20};
21use super::reader::prepare_csv_schema;
22use super::schema_inference::infer_file_schema;
23#[cfg(feature = "decompress")]
24use super::utils::decompress;
25use crate::RowIndex;
26use crate::csv::read::parser::skip_this_line_naive;
27use crate::mmap::ReaderBytes;
28use crate::predicates::PhysicalIoExpr;
29use crate::utils::compression::SupportedCompression;
30use crate::utils::update_row_counts2;
31
32pub fn cast_columns(
33    df: &mut DataFrame,
34    to_cast: &[Field],
35    parallel: bool,
36    ignore_errors: bool,
37) -> PolarsResult<()> {
38    let cast_fn = |c: &Column, fld: &Field| {
39        let out = match (c.dtype(), fld.dtype()) {
40            #[cfg(feature = "temporal")]
41            (DataType::String, DataType::Date) => c
42                .str()
43                .unwrap()
44                .as_date(None, false)
45                .map(|ca| ca.into_column()),
46            #[cfg(feature = "temporal")]
47            (DataType::String, DataType::Time) => c
48                .str()
49                .unwrap()
50                .as_time(None, false)
51                .map(|ca| ca.into_column()),
52            #[cfg(feature = "temporal")]
53            (DataType::String, DataType::Datetime(tu, _)) => c
54                .str()
55                .unwrap()
56                .as_datetime(
57                    None,
58                    *tu,
59                    false,
60                    false,
61                    None,
62                    &StringChunked::from_iter(std::iter::once("raise")),
63                )
64                .map(|ca| ca.into_column()),
65            (_, dt) => c.cast(dt),
66        }?;
67        if !ignore_errors && c.null_count() != out.null_count() {
68            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
69        }
70        Ok(out)
71    };
72
73    if parallel {
74        let cols = POOL.install(|| {
75            df.get_columns()
76                .into_par_iter()
77                .map(|s| {
78                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
79                        cast_fn(s, fld)
80                    } else {
81                        Ok(s.clone())
82                    }
83                })
84                .collect::<PolarsResult<Vec<_>>>()
85        })?;
86        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
87    } else {
88        // cast to the original dtypes in the schema
89        for fld in to_cast {
90            // field may not be projected
91            if let Some(idx) = df.get_column_index(fld.name()) {
92                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
93            }
94        }
95
96        df.clear_schema();
97    }
98    Ok(())
99}
100
101/// CSV file reader
102pub(crate) struct CoreReader<'a> {
103    reader_bytes: Option<ReaderBytes<'a>>,
104    /// Explicit schema for the CSV file
105    schema: SchemaRef,
106    parse_options: CsvParseOptions,
107    /// Optional projection for which columns to load (zero-based column indices)
108    projection: Option<Vec<usize>>,
109    /// Current line number, used in error reporting
110    current_line: usize,
111    ignore_errors: bool,
112    skip_lines: usize,
113    skip_rows_before_header: usize,
114    // after the header, we need to take embedded lines into account
115    skip_rows_after_header: usize,
116    n_rows: Option<usize>,
117    n_threads: Option<usize>,
118    has_header: bool,
119    chunk_size: usize,
120    null_values: Option<NullValuesCompiled>,
121    predicate: Option<Arc<dyn PhysicalIoExpr>>,
122    to_cast: Vec<Field>,
123    row_index: Option<RowIndex>,
124}
125
126impl fmt::Debug for CoreReader<'_> {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        f.debug_struct("Reader")
129            .field("schema", &self.schema)
130            .field("projection", &self.projection)
131            .field("current_line", &self.current_line)
132            .finish()
133    }
134}
135
136impl<'a> CoreReader<'a> {
137    #[allow(clippy::too_many_arguments)]
138    pub(crate) fn new(
139        reader_bytes: ReaderBytes<'a>,
140        parse_options: Arc<CsvParseOptions>,
141        n_rows: Option<usize>,
142        skip_rows: usize,
143        skip_lines: usize,
144        mut projection: Option<Vec<usize>>,
145        max_records: Option<usize>,
146        has_header: bool,
147        ignore_errors: bool,
148        schema: Option<SchemaRef>,
149        columns: Option<Arc<[PlSmallStr]>>,
150        n_threads: Option<usize>,
151        schema_overwrite: Option<SchemaRef>,
152        dtype_overwrite: Option<Arc<Vec<DataType>>>,
153        chunk_size: usize,
154        predicate: Option<Arc<dyn PhysicalIoExpr>>,
155        mut to_cast: Vec<Field>,
156        skip_rows_after_header: usize,
157        row_index: Option<RowIndex>,
158        raise_if_empty: bool,
159    ) -> PolarsResult<CoreReader<'a>> {
160        let separator = parse_options.separator;
161
162        #[cfg(feature = "decompress")]
163        let mut reader_bytes = reader_bytes;
164
165        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
166            polars_bail!(
167                ComputeError: "cannot read compressed CSV file; \
168                compile with feature 'decompress'"
169            );
170        }
171        // We keep track of the inferred schema bool
172        // In case the file is compressed this schema inference is wrong and has to be done
173        // again after decompression.
174        #[cfg(feature = "decompress")]
175        {
176            let total_n_rows =
177                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
178            if let Some(b) = decompress(
179                &reader_bytes,
180                total_n_rows,
181                separator,
182                parse_options.quote_char,
183                parse_options.eol_char,
184            ) {
185                reader_bytes = ReaderBytes::Owned(b.into());
186            }
187        }
188
189        let mut schema = match schema {
190            Some(schema) => schema,
191            None => {
192                let (inferred_schema, _, _) = infer_file_schema(
193                    &reader_bytes,
194                    &parse_options,
195                    max_records,
196                    has_header,
197                    schema_overwrite.as_deref(),
198                    skip_rows,
199                    skip_lines,
200                    skip_rows_after_header,
201                    raise_if_empty,
202                )?;
203                Arc::new(inferred_schema)
204            },
205        };
206        if let Some(dtypes) = dtype_overwrite {
207            polars_ensure!(
208                dtypes.len() <= schema.len(),
209                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
210            );
211            let s = Arc::make_mut(&mut schema);
212            for (index, dt) in dtypes.iter().enumerate() {
213                s.set_dtype_at_index(index, dt.clone()).unwrap();
214            }
215        }
216
217        prepare_csv_schema(&mut schema, &mut to_cast)?;
218
219        // Create a null value for every column
220        let null_values = parse_options
221            .null_values
222            .as_ref()
223            .map(|nv| nv.clone().compile(&schema))
224            .transpose()?;
225
226        if let Some(cols) = columns {
227            let mut prj = Vec::with_capacity(cols.len());
228            for col in cols.as_ref() {
229                let i = schema.try_index_of(col)?;
230                prj.push(i);
231            }
232            projection = Some(prj);
233        }
234
235        Ok(CoreReader {
236            reader_bytes: Some(reader_bytes),
237            parse_options: (*parse_options).clone(),
238            schema,
239            projection,
240            current_line: usize::from(has_header),
241            ignore_errors,
242            skip_lines,
243            skip_rows_before_header: skip_rows,
244            skip_rows_after_header,
245            n_rows,
246            n_threads,
247            has_header,
248            chunk_size,
249            null_values,
250            predicate,
251            to_cast,
252            row_index,
253        })
254    }
255
256    fn find_starting_point<'b>(
257        &self,
258        bytes: &'b [u8],
259        quote_char: Option<u8>,
260        eol_char: u8,
261    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
262        let i = find_starting_point(
263            bytes,
264            quote_char,
265            eol_char,
266            self.schema.len(),
267            self.skip_lines,
268            self.skip_rows_before_header,
269            self.skip_rows_after_header,
270            self.parse_options.comment_prefix.as_ref(),
271            self.has_header,
272        )?;
273
274        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
275    }
276
277    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
278        // we also need to sort the projection to have predictable output.
279        // the `parse_lines` function expects this.
280        self.projection
281            .take()
282            .map(|mut v| {
283                v.sort_unstable();
284                if let Some(idx) = v.last() {
285                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
286                }
287                Ok(v)
288            })
289            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
290    }
291
292    fn read_chunk(
293        &self,
294        bytes: &[u8],
295        projection: &[usize],
296        bytes_offset: usize,
297        capacity: usize,
298        starting_point_offset: Option<usize>,
299        stop_at_nbytes: usize,
300    ) -> PolarsResult<DataFrame> {
301        let mut df = read_chunk(
302            bytes,
303            &self.parse_options,
304            self.schema.as_ref(),
305            self.ignore_errors,
306            projection,
307            bytes_offset,
308            capacity,
309            self.null_values.as_ref(),
310            usize::MAX,
311            stop_at_nbytes,
312            starting_point_offset,
313        )?;
314
315        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
316        Ok(df)
317    }
318
319    // The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
320    // Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
321    // In case malformed CSV is detected, a warning or an error will be issued.
322    // Not all malformed CSV will be detected, as that would impact performance.
323    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
324        let (bytes, _) = self.find_starting_point(
325            bytes,
326            self.parse_options.quote_char,
327            self.parse_options.eol_char,
328        )?;
329
330        let projection = self.get_projection()?;
331
332        // An empty file with a schema should return an empty DataFrame with that schema
333        if bytes.is_empty() {
334            let mut df = if projection.len() == self.schema.len() {
335                DataFrame::empty_with_schema(self.schema.as_ref())
336            } else {
337                DataFrame::empty_with_schema(
338                    &projection
339                        .iter()
340                        .map(|&i| self.schema.get_at_index(i).unwrap())
341                        .map(|(name, dtype)| Field {
342                            name: name.clone(),
343                            dtype: dtype.clone(),
344                        })
345                        .collect::<Schema>(),
346                )
347            };
348
349            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
350
351            if let Some(ref row_index) = self.row_index {
352                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
353            }
354            return Ok(df);
355        }
356
357        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
358
359        // This is chosen by benchmarking on ny city trip csv dataset.
360        // We want small enough chunks such that threads start working as soon as possible
361        // But we also want them large enough, so that we have less chunks related overhead, but
362        // We minimize chunks to 16 MB to still fit L3 cache.
363        let n_parts_hint = n_threads * 16;
364        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);
365
366        // Use a small min chunk size to catch failures in tests.
367        #[cfg(debug_assertions)]
368        let min_chunk_size = 64;
369        #[cfg(not(debug_assertions))]
370        let min_chunk_size = 1024 * 4;
371
372        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
373        let mut total_bytes_offset = 0;
374
375        let results = Arc::new(Mutex::new(vec![]));
376        // We have to do this after parsing as there can be comments.
377        let total_line_count = &RelaxedCell::new_usize(0);
378
379        #[cfg(not(target_family = "wasm"))]
380        let pool;
381        #[cfg(not(target_family = "wasm"))]
382        let pool = if n_threads == POOL.current_num_threads() {
383            &POOL
384        } else {
385            pool = rayon::ThreadPoolBuilder::new()
386                .num_threads(n_threads)
387                .build()
388                .map_err(|_| polars_err!(ComputeError: "could not spawn threads"))?;
389            &pool
390        };
391        #[cfg(target_family = "wasm")]
392        let pool = &POOL;
393
394        let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
395        let mut total_offset = 0;
396        let mut previous_total_offset = 0;
397        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
398            && self.schema.iter_fields().any(|f| f.dtype().is_string());
399
400        pool.scope(|s| {
401            // Pass 1: identify chunks for parallel processing (line parsing).
402            loop {
403                let b = unsafe { bytes.get_unchecked(total_offset..) };
404                if b.is_empty() {
405                    break;
406                }
407                debug_assert!(
408                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
409                );
410
411                // Count is the number of rows for the next chunk. In case of malformed CSV data,
412                // count may not be as expected.
413                let (count, position) = counter.find_next(b, &mut chunk_size);
414                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);
415
416                let (b, count) = if count == 0
417                    && unsafe {
418                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
419                    } {
420                    total_offset = bytes.len();
421                    (b, 1)
422                } else {
423                    if count == 0 {
424                        chunk_size *= 2;
425                        continue;
426                    }
427
428                    let end = total_offset + position + 1;
429                    let b = unsafe { bytes.get_unchecked(total_offset..end) };
430
431                    previous_total_offset = total_offset;
432                    total_offset = end;
433                    (b, count)
434                };
435
436                // Pass 2: process each individual chunk in parallel (field parsing)
437                if !b.is_empty() {
438                    let results = results.clone();
439                    let projection = projection.as_ref();
440                    let slf = &(*self);
441                    s.spawn(move |_| {
442                        if check_utf8 && !super::buffer::validate_utf8(b) {
443                            let mut results = results.lock().unwrap();
444                            results.push((
445                                b.as_ptr() as usize,
446                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
447                            ));
448                            return;
449                        }
450
451                        let result = slf
452                            .read_chunk(b, projection, 0, count, Some(0), b.len())
453                            .and_then(|mut df| {
454
455                                // Check malformed
456                                if df.height() > count || (df.height() < count && slf.parse_options.comment_prefix.is_none()) {
457                                    // Note: in case data is malformed, df.height() is more likely to be correct than count.
458                                    let msg = format!("CSV malformed: expected {} rows, actual {} rows, in chunk starting at byte offset {}, length {}",
459                                        count, df.height(), previous_total_offset, b.len());
460                                    if slf.ignore_errors {
461                                        polars_warn!(msg);
462                                    } else {
463                                        polars_bail!(ComputeError: msg);
464                                    }
465                                }
466
467                                if slf.n_rows.is_some() {
468                                    total_line_count.fetch_add(df.height());
469                                }
470
471                                // We cannot use the line count as there can be comments in the lines so we must correct line counts later.
472                                if let Some(rc) = &slf.row_index {
473                                    // is first chunk
474                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
475                                        Some(rc.offset)
476                                    } else {
477                                        None
478                                    };
479
480                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
481                                };
482
483                                if let Some(predicate) = slf.predicate.as_ref() {
484                                    let s = predicate.evaluate_io(&df)?;
485                                    let mask = s.bool()?;
486                                    df = df.filter(mask)?;
487                                }
488                                Ok(df)
489                            });
490
491                        results.lock().unwrap().push((b.as_ptr() as usize, result));
492                    });
493
494                    // Check just after we spawned a chunk. That mean we processed all data up until
495                    // row count.
496                    if self.n_rows.is_some()
497                        && total_line_count.load() > self.n_rows.unwrap()
498                    {
499                        break;
500                    }
501                }
502                total_bytes_offset += b.len();
503            }
504        });
505        let mut results = std::mem::take(&mut *results.lock().unwrap());
506        results.sort_unstable_by_key(|k| k.0);
507        let mut dfs = results
508            .into_iter()
509            .map(|k| k.1)
510            .collect::<PolarsResult<Vec<_>>>()?;
511
512        if let Some(rc) = &self.row_index {
513            update_row_counts2(&mut dfs, rc.offset)
514        };
515        accumulate_dataframes_vertical(dfs)
516    }
517
518    /// Read the csv into a DataFrame. The predicate can come from a lazy physical plan.
519    pub fn finish(mut self) -> PolarsResult<DataFrame> {
520        let reader_bytes = self.reader_bytes.take().unwrap();
521        let mut df = self.parse_csv(&reader_bytes)?;
522
523        // if multi-threaded the n_rows was probabilistically determined.
524        // Let's slice to correct number of rows if possible.
525        if let Some(n_rows) = self.n_rows {
526            if n_rows < df.height() {
527                df = df.slice(0, n_rows)
528            }
529        }
530        Ok(df)
531    }
532}
533
534#[allow(clippy::too_many_arguments)]
535pub fn read_chunk(
536    bytes: &[u8],
537    parse_options: &CsvParseOptions,
538    schema: &Schema,
539    ignore_errors: bool,
540    projection: &[usize],
541    bytes_offset_thread: usize,
542    capacity: usize,
543    null_values: Option<&NullValuesCompiled>,
544    chunk_size: usize,
545    stop_at_nbytes: usize,
546    starting_point_offset: Option<usize>,
547) -> PolarsResult<DataFrame> {
548    let mut read = bytes_offset_thread;
549    // There's an off-by-one error somewhere in the reading code, where it reads
550    // one more item than the requested capacity. Given the batch sizes are
551    // approximate (sometimes they're smaller), this isn't broken, but it does
552    // mean a bunch of extra allocation and copying. So we allocate a
553    // larger-by-one buffer so the size is more likely to be accurate.
554    let mut buffers = init_buffers(
555        projection,
556        capacity + 1,
557        schema,
558        parse_options.quote_char,
559        parse_options.encoding,
560        parse_options.decimal_comma,
561    )?;
562
563    debug_assert!(projection.is_sorted());
564
565    let mut last_read = usize::MAX;
566    loop {
567        if read >= stop_at_nbytes || read == last_read {
568            break;
569        }
570        let local_bytes = &bytes[read..stop_at_nbytes];
571
572        last_read = read;
573        let offset = read + starting_point_offset.unwrap();
574        read += parse_lines(
575            local_bytes,
576            parse_options,
577            offset,
578            ignore_errors,
579            null_values,
580            projection,
581            &mut buffers,
582            chunk_size,
583            schema.len(),
584            schema,
585        )?;
586    }
587
588    let columns = buffers
589        .into_iter()
590        .map(|buf| buf.into_series().map(Column::from))
591        .collect::<PolarsResult<Vec<_>>>()?;
592    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
593}
594
595#[allow(clippy::too_many_arguments)]
596pub fn find_starting_point(
597    mut bytes: &[u8],
598    quote_char: Option<u8>,
599    eol_char: u8,
600    schema_len: usize,
601    skip_lines: usize,
602    skip_rows_before_header: usize,
603    skip_rows_after_header: usize,
604    comment_prefix: Option<&CommentPrefix>,
605    has_header: bool,
606) -> PolarsResult<usize> {
607    let full_len = bytes.len();
608    let starting_point_offset = bytes.as_ptr() as usize;
609
610    bytes = if skip_lines > 0 {
611        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
612        skip_lines_naive(bytes, eol_char, skip_lines)
613    } else {
614        // Skip utf8 byte-order-mark (BOM)
615        bytes = skip_bom(bytes);
616
617        // \n\n can be a empty string row of a single column
618        // in other cases we skip it.
619        if schema_len > 1 {
620            bytes = skip_line_ending(bytes, eol_char)
621        }
622        bytes
623    };
624
625    // skip 'n' leading rows
626    if skip_rows_before_header > 0 {
627        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
628        let mut current_line = &bytes[..0];
629
630        for _ in 0..skip_rows_before_header {
631            current_line = split_lines
632                .next()
633                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
634        }
635
636        current_line = split_lines
637            .next()
638            .unwrap_or(&current_line[current_line.len()..]);
639        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
640    }
641
642    // skip lines that are comments
643    while is_comment_line(bytes, comment_prefix) {
644        bytes = skip_this_line_naive(bytes, eol_char);
645    }
646
647    // skip header row
648    if has_header {
649        bytes = skip_this_line(bytes, quote_char, eol_char);
650    }
651    // skip 'n' rows following the header
652    if skip_rows_after_header > 0 {
653        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
654        let mut current_line = &bytes[..0];
655
656        for _ in 0..skip_rows_after_header {
657            current_line = split_lines
658                .next()
659                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
660        }
661
662        current_line = split_lines
663            .next()
664            .unwrap_or(&current_line[current_line.len()..]);
665        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
666    }
667
668    Ok(
669        // Some of the functions we call may return `&'static []` instead of
670        // slices of `&bytes[..]`.
671        if bytes.is_empty() {
672            full_len
673        } else {
674            bytes.as_ptr() as usize - starting_point_offset
675        },
676    )
677}