polars_io/csv/read/
streaming.rs

1use std::cmp;
2use std::iter::Iterator;
3use std::sync::Arc;
4
5use polars_core::prelude::Schema;
6use polars_core::schema::SchemaRef;
7use polars_error::{PolarsResult, polars_bail, polars_ensure};
8use polars_utils::mmap::MemSlice;
9
10use crate::csv::read::schema_inference::infer_file_schema_impl;
11use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
12use crate::prelude::{CsvParseOptions, CsvReadOptions};
13use crate::utils::compression::CompressedReader;
14
/// Callback invoked with the raw bytes of a single row; used by
/// `read_until_start_and_infer_schema` to let the caller peek at the first content row.
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
16
17/// Reads bytes from `reader` until the CSV starting point is reached depending on the options.
18///
19/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
20/// leftover bytes + `reader.read_next_slice` is guaranteed to start at first real content row.
21///
22/// `inspect_first_content_row_fn` allows looking at the first content row, this is where parsing
23/// will start. Beware even if the function is provided it's *not* guaranteed that the returned
24/// value will be `Some`, since it the CSV may be incomplete.
25///
26/// The reading is done in an iterative streaming fashion
27///
28/// This function isn't perf critical but would increase binary-size so don't inline it.
29#[inline(never)]
30pub fn read_until_start_and_infer_schema(
31    options: &CsvReadOptions,
32    projected_schema: Option<SchemaRef>,
33    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
34    reader: &mut CompressedReader,
35) -> PolarsResult<(Schema, MemSlice)> {
36    #[derive(Copy, Clone)]
37    enum State {
38        // Ordered so that all states only happen after the ones before it.
39        SkipEmpty,
40        SkipRowsBeforeHeader(usize),
41        SkipHeader(bool),
42        SkipRowsAfterHeader(usize),
43        ContentInspect,
44        InferCollect,
45        Done,
46    }
47
48    polars_ensure!(
49        !(options.skip_lines != 0 && options.skip_rows != 0),
50        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
51    );
52
53    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
54    // quote escape rules.
55    let prev_leftover = skip_lines_naive(
56        options.parse_options.eol_char,
57        options.skip_lines,
58        options.raise_if_empty,
59        reader,
60    )?;
61
62    let mut state = if options.has_header {
63        State::SkipEmpty
64    } else if options.skip_lines != 0 {
65        // skip_lines shouldn't skip extra comments before the header, so directly go to SkipHeader
66        // state.
67        State::SkipHeader(false)
68    } else {
69        State::SkipRowsBeforeHeader(options.skip_rows)
70    };
71
72    let comment_prefix = options.parse_options.comment_prefix.as_ref();
73    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);
74
75    let mut header_line = None;
76    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or(256));
77
78    let leftover = for_each_line_from_reader(
79        &options.parse_options,
80        true,
81        prev_leftover,
82        reader,
83        |mem_slice_line| {
84            let line = &*mem_slice_line;
85
86            let done = loop {
87                match &mut state {
88                    State::SkipEmpty => {
89                        if line.is_empty() || line == b"\r" {
90                            break LineUse::ConsumeDiscard;
91                        }
92
93                        state = State::SkipRowsBeforeHeader(options.skip_rows);
94                    },
95                    State::SkipRowsBeforeHeader(remaining) => {
96                        let is_comment = is_comment_line(line, comment_prefix);
97
98                        if *remaining == 0 && !is_comment {
99                            state = State::SkipHeader(false);
100                            continue;
101                        }
102
103                        *remaining -= !is_comment as usize;
104                        break LineUse::ConsumeDiscard;
105                    },
106                    State::SkipHeader(did_skip) => {
107                        if !options.has_header || *did_skip {
108                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
109                            continue;
110                        }
111
112                        header_line = Some(mem_slice_line.clone());
113                        *did_skip = true;
114                        break LineUse::ConsumeDiscard;
115                    },
116                    State::SkipRowsAfterHeader(remaining) => {
117                        let is_comment = is_comment_line(line, comment_prefix);
118
119                        if *remaining == 0 && !is_comment {
120                            state = State::ContentInspect;
121                            continue;
122                        }
123
124                        *remaining -= !is_comment as usize;
125                        break LineUse::ConsumeDiscard;
126                    },
127                    State::ContentInspect => {
128                        if let Some(func) = &mut inspect_first_content_row_fn {
129                            func(line);
130                        }
131
132                        state = State::InferCollect;
133                    },
134                    State::InferCollect => {
135                        if !is_comment_line(line, comment_prefix) {
136                            content_lines.push(mem_slice_line.clone());
137                            if content_lines.len() >= infer_schema_length {
138                                state = State::Done;
139                                continue;
140                            }
141                        }
142
143                        break LineUse::ConsumeKeep;
144                    },
145                    State::Done => {
146                        break LineUse::Done;
147                    },
148                }
149            };
150
151            Ok(done)
152        },
153    )?;
154
155    let infer_all_as_str = infer_schema_length == 0;
156
157    let inferred_schema = infer_schema(
158        &header_line,
159        &content_lines,
160        infer_all_as_str,
161        options,
162        projected_schema,
163    )?;
164
165    Ok((inferred_schema, leftover))
166}
167
/// Verdict returned by `line_fn` in `for_each_line_from_reader` for each line it is handed.
enum LineUse {
    // Line was processed; its bytes do not need to be retained.
    ConsumeDiscard,
    // Line was processed but its bytes must stay in the leftover slice that is eventually
    // returned (used while collecting inference rows that parsing will re-read).
    ConsumeKeep,
    // Stop iterating; the current line is left unconsumed.
    Done,
}
173
174/// Iterate over valid CSV lines produced by reader.
175///
176/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `MemSlice`
177/// can't be constructed.
178fn for_each_line_from_reader(
179    parse_options: &CsvParseOptions,
180    is_file_start: bool,
181    mut prev_leftover: MemSlice,
182    reader: &mut CompressedReader,
183    mut line_fn: impl FnMut(MemSlice) -> PolarsResult<LineUse>,
184) -> PolarsResult<MemSlice> {
185    let mut is_first_line = is_file_start;
186
187    // Since this is used for schema inference, we want to avoid needlessly large reads at first.
188    let mut read_size = 128 * 1024;
189    let mut retain_offset = None;
190
191    loop {
192        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
193        if slice.is_empty() {
194            return Ok(MemSlice::EMPTY);
195        }
196
197        if is_first_line {
198            is_first_line = false;
199            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
200            if slice.get(0..3) == UTF8_BOM_MARKER {
201                slice = slice.slice(3..slice.len());
202            }
203        }
204
205        let line_to_sub_slice = |line: &[u8]| {
206            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
207            slice.slice(start..(start + line.len()))
208        };
209
210        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length` lines, but we only want to decompress the input once, so we grow a `MemSlice` that will be returned as leftover.
211        let effective_slice = if let Some(offset) = retain_offset {
212            slice.slice(offset..slice.len())
213        } else {
214            slice.clone()
215        };
216
217        let mut lines = SplitLines::new(
218            &effective_slice,
219            parse_options.quote_char,
220            parse_options.eol_char,
221            parse_options.comment_prefix.as_ref(),
222        );
223        let Some(mut prev_line) = lines.next() else {
224            read_size *= 2;
225            prev_leftover = slice;
226            continue;
227        };
228
229        let mut should_ret = false;
230
231        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
232        // we iterate everything except the last line.
233        for next_line in lines {
234            match line_fn(line_to_sub_slice(prev_line))? {
235                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
236                LineUse::ConsumeKeep => {
237                    retain_offset
238                        .get_or_insert(prev_line.as_ptr() as usize - slice.as_ptr() as usize);
239                },
240                LineUse::Done => {
241                    should_ret = true;
242                    break;
243                },
244            }
245            prev_line = next_line;
246        }
247
248        let mut unconsumed_offset = prev_line.as_ptr() as usize - slice.as_ptr() as usize;
249
250        // EOF file reached, the last line will have no continuation on the next call to
251        // `read_next_slice`.
252        if bytes_read == 0 {
253            match line_fn(line_to_sub_slice(prev_line))? {
254                LineUse::ConsumeDiscard => {
255                    unconsumed_offset += prev_line.len();
256                    if slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
257                        unconsumed_offset += 1;
258                    }
259                },
260                LineUse::ConsumeKeep | LineUse::Done => (),
261            }
262            should_ret = true;
263        }
264
265        if retain_offset.is_some() {
266            prev_leftover = slice;
267        } else {
268            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
269            // it's more efficient to hand in as little as possible.
270            prev_leftover = slice.slice(unconsumed_offset..slice.len());
271        }
272
273        if should_ret {
274            let leftover = prev_leftover.slice(retain_offset.unwrap_or(0)..prev_leftover.len());
275            return Ok(leftover);
276        }
277    }
278}
279
280fn skip_lines_naive(
281    eol_char: u8,
282    skip_lines: usize,
283    raise_if_empty: bool,
284    reader: &mut CompressedReader,
285) -> PolarsResult<MemSlice> {
286    let mut prev_leftover = MemSlice::EMPTY;
287
288    if skip_lines == 0 {
289        return Ok(prev_leftover);
290    }
291
292    let mut remaining = skip_lines;
293    // Since this is used for schema inference, we want to avoid needlessly large reads at first.
294    let mut read_size = 128 * 1024;
295
296    loop {
297        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
298        let mut bytes: &[u8] = &slice;
299
300        'inner: loop {
301            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
302                read_size *= 2;
303                break 'inner;
304            };
305            pos = cmp::min(pos + 1, bytes.len());
306
307            bytes = &bytes[pos..];
308            remaining -= 1;
309
310            if remaining == 0 {
311                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
312                prev_leftover = slice.slice(unconsumed_offset..slice.len());
313                return Ok(prev_leftover);
314            }
315        }
316
317        if bytes_read == 0 {
318            if raise_if_empty {
319                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
320            } else {
321                return Ok(MemSlice::EMPTY);
322            }
323        }
324
325        // No need to search for naive eol twice in the leftover.
326        prev_leftover = MemSlice::EMPTY;
327    }
328}
329
330fn infer_schema(
331    header_line: &Option<MemSlice>,
332    content_lines: &[MemSlice],
333    infer_all_as_str: bool,
334    options: &CsvReadOptions,
335    projected_schema: Option<SchemaRef>,
336) -> PolarsResult<Schema> {
337    let has_no_inference_data = if options.has_header {
338        header_line.is_none()
339    } else {
340        content_lines.is_empty()
341    };
342
343    if options.raise_if_empty && has_no_inference_data {
344        polars_bail!(NoData: "empty CSV");
345    }
346
347    let mut inferred_schema = if has_no_inference_data {
348        Schema::default()
349    } else {
350        infer_file_schema_impl(
351            header_line,
352            content_lines,
353            infer_all_as_str,
354            &options.parse_options,
355            options.schema_overwrite.as_deref(),
356        )?
357    };
358
359    if let Some(schema) = &options.schema {
360        // Note: User can provide schema with more columns, they will simply
361        // be projected as NULL.
362        // TODO: Should maybe expose a missing_columns parameter to the API for this.
363        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
364            polars_bail!(
365                SchemaMismatch:
366                "provided schema does not match number of columns in file ({} != {} in file)",
367                schema.len(),
368                inferred_schema.len(),
369            );
370        }
371
372        if options.parse_options.truncate_ragged_lines {
373            inferred_schema = Arc::unwrap_or_clone(schema.clone());
374        } else {
375            inferred_schema = schema
376                .iter_names()
377                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
378                .map(|(name, dtype)| (name.clone(), dtype))
379                .collect();
380        }
381    }
382
383    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
384        for (i, dtype) in dtypes.iter().enumerate() {
385            inferred_schema.set_dtype_at_index(i, dtype.clone());
386        }
387    }
388
389    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
390    // with temporal types. This can be improved to better choose between the 2 dtypes.
391    if let Some(projected_schema) = projected_schema {
392        for (name, inferred_dtype) in inferred_schema.iter_mut() {
393            if let Some(projected_dtype) = projected_schema.get(name) {
394                *inferred_dtype = projected_dtype.clone();
395            }
396        }
397    }
398
399    Ok(inferred_schema)
400}