polars_io/csv/read/streaming.rs

use std::cmp;
use std::iter::Iterator;
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars_buffer::Buffer;
use polars_core::prelude::Schema;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_ensure};

use crate::csv::read::schema_inference::infer_file_schema_impl;
use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
use crate::prelude::{CsvParseOptions, CsvReadOptions};
use crate::utils::compression::CompressedReader;

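/// Callback invoked with the raw bytes of the first content row.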
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;

/// Reads bytes from `reader` until the CSV starting point (as determined by the options) is
/// reached.
///
/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
/// leftover bytes plus `reader.read_next_slice` are guaranteed to start at the first real content
/// row.
///
/// `inspect_first_content_row_fn` allows looking at the first content row, which is where parsing
/// will start. Beware that even if the function is provided, it's *not* guaranteed to be called,
/// since the CSV may be incomplete.
///
/// The reading is done in an iterative streaming fashion.
///
/// This function isn't perf-critical but would increase binary size, so don't inline it.
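///
/// A minimal usage sketch (illustrative only, not compiled; constructing the `CompressedReader`
/// (`reader`) is elided):
///
/// ```ignore
/// let options = CsvReadOptions::default();
///
/// // Capture the first content row while the stream is scanned.
/// let mut first_row: Option<Vec<u8>> = None;
/// let inspect: InspectContentFn<'_> = Box::new(|row| first_row = Some(row.to_vec()));
///
/// let (schema, leftover) =
///     read_until_start_and_infer_schema(&options, None, Some(inspect), &mut reader)?;
/// // `leftover` + subsequent `reader.read_next_slice` calls start at the first content row.
/// ```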
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Overestimating here is better than underestimating.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    #[derive(Copy, Clone)]
    enum State {
        // Ordered so that each state can only occur after the ones before it.
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        // The payload tracks whether the header line has been skipped yet.
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
    // quote escape rules.
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // skip_lines shouldn't skip extra comments before the header, so go directly to the
        // SkipHeader state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        reader
            .total_len_estimate()
            .saturating_div(ESTIMATED_BYTES_PER_ROW)
    }));

    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
    // `Vec`, which would lead to quadratic copying if we didn't factor `infer_schema_length` into
    // the initial read size. We have to retain the row memory for schema inference and also for
    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
    // full input anyway, so we can do so once and avoid re-copying.
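    // E.g. with `infer_schema_length = Some(10_000)` the initial read size is at least
    // 10_000 * 200 = 2 MB.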
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

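                        // Comment lines don't count toward the number of rows to skip.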
                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}

/// How `for_each_line_from_reader`'s callback consumed a line.
enum LineUse {
    /// The line was fully handled; its bytes don't need to be retained.
    ConsumeDiscard,
    /// The line was handled, but its bytes (and everything after them) must be retained in the
    /// returned leftover.
    ConsumeKeep,
    /// Stop iterating; the current line is left unconsumed.
    Done,
}

/// Iterates over valid CSV lines produced by `reader`, calling `line_fn` on each complete line.
///
/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
/// can't be constructed.
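///
/// A sketch of the callback contract (illustrative only, not compiled; `parse_options`,
/// `initial_read_size` and `reader` are assumed to exist): keep the first line, then stop, so the
/// remaining bytes come back as leftover.
///
/// ```ignore
/// let mut n = 0;
/// let leftover = for_each_line_from_reader(
///     &parse_options,
///     true,
///     Buffer::new(),
///     initial_read_size,
///     &mut reader,
///     |_line| {
///         n += 1;
///         Ok(if n < 2 { LineUse::ConsumeKeep } else { LineUse::Done })
///     },
/// )?;
/// // `leftover` starts at the first kept line and includes everything after it.
/// ```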
fn for_each_line_from_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    reader: &mut CompressedReader,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_CHUNK_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_CHUNK_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
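    // `Some` once a line has been kept: from then on `prev_leftover` retains everything from the
    // first kept line onward, and the offset tracks how far into that retained buffer the line
    // iteration has already advanced.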
    let mut retain_offset = None;

    loop {
        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
            // Strip a leading UTF-8 byte-order mark, if present.
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }

        // Map a `&[u8]` line (borrowed from `slice`) back to an owned sub-`Buffer` of `slice`.
        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length`
        // lines, but we only want to decompress the input once, so we grow a `Buffer` that will be
        // returned as leftover.
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
        // we iterate everything except the last line.
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

        // EOF reached: the last line will have no continuation on the next call to
        // `read_next_slice`, so it can be handed to `line_fn` as-is.
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                // `unconsumed_offset` was computed with the full `slice` as the base reference;
                // compensate for the retained start offset.
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
            // it's more efficient to hand in as little as possible.
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
            read_size *= 4;
        }
    }
}

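/// Skips `skip_lines` lines by searching for raw `eol_char` bytes, ignoring CSV quoting rules
/// (the skipped lines may not follow them). Returns the leftover bytes starting right after the
/// last skipped line. Errors if the input has fewer lines and `raise_if_empty` is set; otherwise
/// returns an empty buffer.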
fn skip_lines_naive(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    reader: &mut CompressedReader,
) -> PolarsResult<Buffer<u8>> {
    let mut prev_leftover = Buffer::new();

    if skip_lines == 0 {
        return Ok(prev_leftover);
    }

    let mut remaining = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        let mut bytes: &[u8] = &slice;

        'inner: loop {
            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
                read_size = read_size.saturating_mul(2);
                break 'inner;
            };
            pos = cmp::min(pos + 1, bytes.len());

            bytes = &bytes[pos..];
            remaining -= 1;

            if remaining == 0 {
                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
                prev_leftover = slice.sliced(unconsumed_offset..);
                return Ok(prev_leftover);
            }
        }

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            } else {
                return Ok(Buffer::new());
            }
        }

        // The remaining bytes were already searched for EOLs, so don't hand them back as leftover
        // or they'd be scanned again.
        prev_leftover = Buffer::new();

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}

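/// Infers the schema from the collected header/content lines, then applies the user-provided
/// overrides in order: full `schema`, `dtype_overwrite` (positional), and finally the projected
/// schema's dtypes.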
fn infer_schema(
    header_line: &Option<Buffer<u8>>,
    content_lines: &[Buffer<u8>],
    infer_all_as_str: bool,
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
) -> PolarsResult<Schema> {
    let has_no_inference_data = if options.has_header {
        header_line.is_none()
    } else {
        content_lines.is_empty()
    };

    if options.raise_if_empty && has_no_inference_data {
        polars_bail!(NoData: "empty CSV");
    }

    let mut inferred_schema = if has_no_inference_data {
        Schema::default()
    } else {
        infer_file_schema_impl(
            header_line,
            content_lines,
            infer_all_as_str,
            &options.parse_options,
            options.schema_overwrite.as_deref(),
        )
    };

    if let Some(schema) = &options.schema {
        // Note: the user can provide a schema with more columns; they will simply
        // be projected as NULL.
        // TODO: Should maybe expose a missing_columns parameter to the API for this.
        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
            polars_bail!(
                SchemaMismatch:
                "provided schema does not match number of columns in file ({} != {} in file)",
                schema.len(),
                inferred_schema.len(),
            );
        }

        if options.parse_options.truncate_ragged_lines {
            inferred_schema = Arc::unwrap_or_clone(schema.clone());
        } else {
            // Keep the provided column names but pair them with the inferred dtypes.
            inferred_schema = schema
                .iter_names()
                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                .map(|(name, dtype)| (name.clone(), dtype))
                .collect();
        }
    }

    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
        for (i, dtype) in dtypes.iter().enumerate() {
            inferred_schema.set_dtype_at_index(i, dtype.clone());
        }
    }

    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
    // with temporal types. This can be improved to better choose between the 2 dtypes.
    if let Some(projected_schema) = projected_schema {
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }
    }

    Ok(inferred_schema)
}