Skip to main content

polars_io/csv/read/
streaming.rs

1use std::cmp;
2use std::iter::Iterator;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use polars_buffer::Buffer;
7use polars_core::prelude::Schema;
8use polars_core::schema::SchemaRef;
9use polars_error::{PolarsResult, polars_bail, polars_ensure};
10
11use crate::csv::read::schema_inference::infer_file_schema_impl;
12use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
13use crate::prelude::{CsvParseOptions, CsvReadOptions};
14use crate::utils::compression::{ByteSourceReader, CompressedReader};
15use crate::utils::stream_buf_reader::ReaderSource;
16
17pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
18
19/// Reads bytes from `reader` until the CSV starting point is reached depending on the options.
20///
21/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
22/// leftover bytes + `reader.read_next_slice` is guaranteed to start at first real content row.
23///
24/// `inspect_first_content_row_fn` allows looking at the first content row, this is where parsing
25/// will start. Beware even if the function is provided it's *not* guaranteed that the returned
26/// value will be `Some`, since it the CSV may be incomplete.
27///
28/// The reading is done in an iterative streaming fashion
29///
30/// This function isn't perf critical but would increase binary-size so don't inline it.
31#[inline(never)]
32pub fn read_until_start_and_infer_schema_from_compressed_reader(
33    options: &CsvReadOptions,
34    projected_schema: Option<SchemaRef>,
35    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
36    reader: &mut CompressedReader,
37) -> PolarsResult<(Schema, Buffer<u8>)> {
38    // It's better to be above than below here.
39    const ESTIMATED_BYTES_PER_ROW: usize = 200;
40
41    #[derive(Copy, Clone)]
42    enum State {
43        // Ordered so that all states only happen after the ones before it.
44        SkipEmpty,
45        SkipRowsBeforeHeader(usize),
46        SkipHeader(bool),
47        SkipRowsAfterHeader(usize),
48        ContentInspect,
49        InferCollect,
50        Done,
51    }
52
53    polars_ensure!(
54        !(options.skip_lines != 0 && options.skip_rows != 0),
55        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
56    );
57
58    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
59    // quote escape rules.
60    let prev_leftover = skip_lines_naive_from_compressed_reader(
61        options.parse_options.eol_char,
62        options.skip_lines,
63        options.raise_if_empty,
64        reader,
65    )?;
66
67    let mut state = if options.has_header {
68        State::SkipEmpty
69    } else if options.skip_lines != 0 {
70        // skip_lines shouldn't skip extra comments before the header, so directly go to SkipHeader
71        // state.
72        State::SkipHeader(false)
73    } else {
74        State::SkipRowsBeforeHeader(options.skip_rows)
75    };
76
77    let comment_prefix = options.parse_options.comment_prefix.as_ref();
78    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);
79
80    let mut header_line = None;
81    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
82        reader
83            .total_len_estimate()
84            .saturating_div(ESTIMATED_BYTES_PER_ROW)
85    }));
86
87    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
88    // `Vec` which would lead to quadratic copying if we don't factor in `infer_schema_length` into
89    // the initial read size. We have to retain the row memory for schema inference and also for
90    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
91    // full input anyway so we can do so once and avoid re-copying.
92    let initial_read_size = options
93        .infer_schema_length
94        .map(|isl| {
95            cmp::max(
96                CompressedReader::initial_read_size(),
97                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
98            )
99        })
100        .unwrap_or(usize::MAX);
101
102    let leftover = for_each_line_from_reader_from_compressed_reader(
103        &options.parse_options,
104        true,
105        prev_leftover,
106        initial_read_size,
107        reader,
108        |mem_slice_line| {
109            let line = &*mem_slice_line;
110
111            let done = loop {
112                match &mut state {
113                    State::SkipEmpty => {
114                        if line.is_empty() || line == b"\r" {
115                            break LineUse::ConsumeDiscard;
116                        }
117
118                        state = State::SkipRowsBeforeHeader(options.skip_rows);
119                    },
120                    State::SkipRowsBeforeHeader(remaining) => {
121                        let is_comment = is_comment_line(line, comment_prefix);
122
123                        if *remaining == 0 && !is_comment {
124                            state = State::SkipHeader(false);
125                            continue;
126                        }
127
128                        *remaining -= !is_comment as usize;
129                        break LineUse::ConsumeDiscard;
130                    },
131                    State::SkipHeader(did_skip) => {
132                        if !options.has_header || *did_skip {
133                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
134                            continue;
135                        }
136
137                        header_line = Some(mem_slice_line.clone());
138                        *did_skip = true;
139                        break LineUse::ConsumeDiscard;
140                    },
141                    State::SkipRowsAfterHeader(remaining) => {
142                        let is_comment = is_comment_line(line, comment_prefix);
143
144                        if *remaining == 0 && !is_comment {
145                            state = State::ContentInspect;
146                            continue;
147                        }
148
149                        *remaining -= !is_comment as usize;
150                        break LineUse::ConsumeDiscard;
151                    },
152                    State::ContentInspect => {
153                        if let Some(func) = &mut inspect_first_content_row_fn {
154                            func(line);
155                        }
156
157                        state = State::InferCollect;
158                    },
159                    State::InferCollect => {
160                        if !is_comment_line(line, comment_prefix) {
161                            content_lines.push(mem_slice_line.clone());
162                            if content_lines.len() >= infer_schema_length {
163                                state = State::Done;
164                                continue;
165                            }
166                        }
167
168                        break LineUse::ConsumeKeep;
169                    },
170                    State::Done => {
171                        break LineUse::Done;
172                    },
173                }
174            };
175
176            Ok(done)
177        },
178    )?;
179
180    let infer_all_as_str = infer_schema_length == 0;
181
182    let inferred_schema = infer_schema(
183        &header_line,
184        &content_lines,
185        infer_all_as_str,
186        options,
187        projected_schema,
188    )?;
189
190    Ok((inferred_schema, leftover))
191}
192
193/// Reads bytes from `reader` until the CSV starting point is reached depending on the options.
194///
195/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
196/// leftover bytes + `reader.read_next_slice` is guaranteed to start at first real content row.
197///
198/// `inspect_first_content_row_fn` allows looking at the first content row, this is where parsing
199/// will start. Beware even if the function is provided it's *not* guaranteed that the returned
200/// value will be `Some`, since it the CSV may be incomplete.
201///
202/// The reading is done in an iterative streaming fashion
203///
204/// This function isn't perf critical but would increase binary-size so don't inline it.
205#[inline(never)]
206pub fn read_until_start_and_infer_schema(
207    options: &CsvReadOptions,
208    projected_schema: Option<SchemaRef>,
209    decompressed_file_size_hint: Option<usize>,
210    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
211    reader: &mut ByteSourceReader<ReaderSource>,
212) -> PolarsResult<(Schema, Buffer<u8>)> {
213    // It's better to be above than below here.
214    const ESTIMATED_BYTES_PER_ROW: usize = 200;
215
216    #[derive(Copy, Clone)]
217    enum State {
218        // Ordered so that all states only happen after the ones before it.
219        SkipEmpty,
220        SkipRowsBeforeHeader(usize),
221        SkipHeader(bool),
222        SkipRowsAfterHeader(usize),
223        ContentInspect,
224        InferCollect,
225        Done,
226    }
227
228    polars_ensure!(
229        !(options.skip_lines != 0 && options.skip_rows != 0),
230        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
231    );
232
233    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
234    // quote escape rules.
235    let prev_leftover = skip_lines_naive(
236        options.parse_options.eol_char,
237        options.skip_lines,
238        options.raise_if_empty,
239        decompressed_file_size_hint,
240        reader,
241    )?;
242
243    let mut state = if options.has_header {
244        State::SkipEmpty
245    } else if options.skip_lines != 0 {
246        // skip_lines shouldn't skip extra comments before the header, so directly go to SkipHeader
247        // state.
248        State::SkipHeader(false)
249    } else {
250        State::SkipRowsBeforeHeader(options.skip_rows)
251    };
252
253    let comment_prefix = options.parse_options.comment_prefix.as_ref();
254    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);
255
256    let mut header_line = None;
257    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
258        decompressed_file_size_hint
259            .map(|size| size.saturating_div(ESTIMATED_BYTES_PER_ROW))
260            .unwrap_or(100)
261    }));
262
263    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
264    // `Vec` which would lead to quadratic copying if we don't factor in `infer_schema_length` into
265    // the initial read size. We have to retain the row memory for schema inference and also for
266    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
267    // full input anyway so we can do so once and avoid re-copying.
268    let initial_read_size = options
269        .infer_schema_length
270        .map(|isl| {
271            cmp::max(
272                CompressedReader::initial_read_size(),
273                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
274            )
275        })
276        .unwrap_or(usize::MAX);
277
278    let leftover = for_each_line_from_reader(
279        &options.parse_options,
280        true,
281        prev_leftover,
282        initial_read_size,
283        decompressed_file_size_hint,
284        reader,
285        |mem_slice_line| {
286            let line = &*mem_slice_line;
287
288            let done = loop {
289                match &mut state {
290                    State::SkipEmpty => {
291                        if line.is_empty() || line == b"\r" {
292                            break LineUse::ConsumeDiscard;
293                        }
294
295                        state = State::SkipRowsBeforeHeader(options.skip_rows);
296                    },
297                    State::SkipRowsBeforeHeader(remaining) => {
298                        let is_comment = is_comment_line(line, comment_prefix);
299
300                        if *remaining == 0 && !is_comment {
301                            state = State::SkipHeader(false);
302                            continue;
303                        }
304
305                        *remaining -= !is_comment as usize;
306                        break LineUse::ConsumeDiscard;
307                    },
308                    State::SkipHeader(did_skip) => {
309                        if !options.has_header || *did_skip {
310                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
311                            continue;
312                        }
313
314                        header_line = Some(mem_slice_line.clone());
315                        *did_skip = true;
316                        break LineUse::ConsumeDiscard;
317                    },
318                    State::SkipRowsAfterHeader(remaining) => {
319                        let is_comment = is_comment_line(line, comment_prefix);
320
321                        if *remaining == 0 && !is_comment {
322                            state = State::ContentInspect;
323                            continue;
324                        }
325
326                        *remaining -= !is_comment as usize;
327                        break LineUse::ConsumeDiscard;
328                    },
329                    State::ContentInspect => {
330                        if let Some(func) = &mut inspect_first_content_row_fn {
331                            func(line);
332                        }
333
334                        state = State::InferCollect;
335                    },
336                    State::InferCollect => {
337                        if !is_comment_line(line, comment_prefix) {
338                            content_lines.push(mem_slice_line.clone());
339                            if content_lines.len() >= infer_schema_length {
340                                state = State::Done;
341                                continue;
342                            }
343                        }
344
345                        break LineUse::ConsumeKeep;
346                    },
347                    State::Done => {
348                        break LineUse::Done;
349                    },
350                }
351            };
352
353            Ok(done)
354        },
355    )?;
356
357    let infer_all_as_str = infer_schema_length == 0;
358
359    let inferred_schema = infer_schema(
360        &header_line,
361        &content_lines,
362        infer_all_as_str,
363        options,
364        projected_schema,
365    )?;
366
367    Ok((inferred_schema, leftover))
368}
369
370enum LineUse {
371    ConsumeDiscard,
372    ConsumeKeep,
373    Done,
374}
375
376/// Iterate over valid CSV lines produced by reader.
377///
378/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
379/// can't be constructed.
380fn for_each_line_from_reader_from_compressed_reader(
381    parse_options: &CsvParseOptions,
382    is_file_start: bool,
383    mut prev_leftover: Buffer<u8>,
384    initial_read_size: usize,
385    reader: &mut CompressedReader,
386    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
387) -> PolarsResult<Buffer<u8>> {
388    let mut is_first_line = is_file_start;
389
390    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
391        .map(|x| {
392            x.parse::<NonZeroUsize>()
393                .unwrap_or_else(|_| {
394                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
395                })
396                .get()
397        })
398        .ok();
399
400    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
401    let mut retain_offset = None;
402
403    loop {
404        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
405        if slice.is_empty() {
406            return Ok(Buffer::new());
407        }
408
409        if is_first_line {
410            is_first_line = false;
411            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
412            if slice.get(0..3) == UTF8_BOM_MARKER {
413                slice = slice.sliced(3..);
414            }
415        }
416
417        let line_to_sub_slice = |line: &[u8]| {
418            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
419            slice.clone().sliced(start..(start + line.len()))
420        };
421
422        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length` lines, but we only want to decompress the input once, so we grow a `Buffer` that will be returned as leftover.
423        let effective_slice = if let Some(offset) = retain_offset {
424            slice.clone().sliced(offset..)
425        } else {
426            slice.clone()
427        };
428
429        let mut lines = SplitLines::new(
430            &effective_slice,
431            parse_options.quote_char,
432            parse_options.eol_char,
433            parse_options.comment_prefix.as_ref(),
434        );
435        let Some(mut prev_line) = lines.next() else {
436            read_size = read_size.saturating_mul(2);
437            prev_leftover = slice;
438            continue;
439        };
440
441        let mut should_ret = false;
442
443        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
444        // we iterate everything except the last line.
445        for next_line in lines {
446            match line_fn(line_to_sub_slice(prev_line))? {
447                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
448                LineUse::ConsumeKeep => {
449                    if retain_offset.is_none() {
450                        let retain_start_offset =
451                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
452                        prev_leftover = slice.clone().sliced(retain_start_offset..);
453                        retain_offset = Some(0);
454                    }
455                },
456                LineUse::Done => {
457                    should_ret = true;
458                    break;
459                },
460            }
461            prev_line = next_line;
462        }
463
464        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
465
466        // EOF file reached, the last line will have no continuation on the next call to
467        // `read_next_slice`.
468        if bytes_read < read_size {
469            match line_fn(line_to_sub_slice(prev_line))? {
470                LineUse::ConsumeDiscard => {
471                    debug_assert!(retain_offset.is_none());
472                    unconsumed_offset += prev_line.len();
473                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
474                        unconsumed_offset += 1;
475                    }
476                },
477                LineUse::ConsumeKeep | LineUse::Done => (),
478            }
479            should_ret = true;
480        }
481
482        if let Some(offset) = &mut retain_offset {
483            if *offset == 0 {
484                // `unconsumed_offset` was computed with the full `slice` as base reference
485                // compensate retained offset.
486                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
487            } else {
488                prev_leftover = slice;
489                *offset += unconsumed_offset;
490            }
491        } else {
492            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
493            // it's more efficient to hand in as little as possible.
494            prev_leftover = slice.sliced(unconsumed_offset..);
495        }
496
497        if should_ret {
498            return Ok(prev_leftover);
499        }
500
501        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
502            read_size *= 4;
503        }
504    }
505}
506
507/// Iterate over valid CSV lines produced by reader.
508///
509/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
510/// can't be constructed.
511fn for_each_line_from_reader(
512    parse_options: &CsvParseOptions,
513    is_file_start: bool,
514    mut prev_leftover: Buffer<u8>,
515    initial_read_size: usize,
516    decompressed_file_size_hint: Option<usize>,
517    reader: &mut ByteSourceReader<ReaderSource>,
518    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
519) -> PolarsResult<Buffer<u8>> {
520    let mut is_first_line = is_file_start;
521
522    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
523        .map(|x| {
524            x.parse::<NonZeroUsize>()
525                .unwrap_or_else(|_| {
526                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
527                })
528                .get()
529        })
530        .ok();
531
532    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
533    let mut retain_offset = None;
534
535    loop {
536        let (mut slice, bytes_read) =
537            reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
538        if slice.is_empty() {
539            return Ok(Buffer::new());
540        }
541
542        if is_first_line {
543            is_first_line = false;
544            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
545            if slice.get(0..3) == UTF8_BOM_MARKER {
546                slice = slice.sliced(3..);
547            }
548        }
549
550        let line_to_sub_slice = |line: &[u8]| {
551            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
552            slice.clone().sliced(start..(start + line.len()))
553        };
554
555        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length` lines, but we only want to decompress the input once, so we grow a `Buffer` that will be returned as leftover.
556        let effective_slice = if let Some(offset) = retain_offset {
557            slice.clone().sliced(offset..)
558        } else {
559            slice.clone()
560        };
561
562        let mut lines = SplitLines::new(
563            &effective_slice,
564            parse_options.quote_char,
565            parse_options.eol_char,
566            parse_options.comment_prefix.as_ref(),
567        );
568        let Some(mut prev_line) = lines.next() else {
569            read_size = read_size.saturating_mul(2);
570            prev_leftover = slice;
571            continue;
572        };
573
574        let mut should_ret = false;
575
576        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
577        // we iterate everything except the last line.
578        for next_line in lines {
579            match line_fn(line_to_sub_slice(prev_line))? {
580                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
581                LineUse::ConsumeKeep => {
582                    if retain_offset.is_none() {
583                        let retain_start_offset =
584                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
585                        prev_leftover = slice.clone().sliced(retain_start_offset..);
586                        retain_offset = Some(0);
587                    }
588                },
589                LineUse::Done => {
590                    should_ret = true;
591                    break;
592                },
593            }
594            prev_line = next_line;
595        }
596
597        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
598
599        // EOF file reached, the last line will have no continuation on the next call to
600        // `read_next_slice`.
601        if bytes_read < read_size {
602            match line_fn(line_to_sub_slice(prev_line))? {
603                LineUse::ConsumeDiscard => {
604                    debug_assert!(retain_offset.is_none());
605                    unconsumed_offset += prev_line.len();
606                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
607                        unconsumed_offset += 1;
608                    }
609                },
610                LineUse::ConsumeKeep | LineUse::Done => (),
611            }
612            should_ret = true;
613        }
614
615        if let Some(offset) = &mut retain_offset {
616            if *offset == 0 {
617                // `unconsumed_offset` was computed with the full `slice` as base reference
618                // compensate retained offset.
619                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
620            } else {
621                prev_leftover = slice;
622                *offset += unconsumed_offset;
623            }
624        } else {
625            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
626            // it's more efficient to hand in as little as possible.
627            prev_leftover = slice.sliced(unconsumed_offset..);
628        }
629
630        if should_ret {
631            return Ok(prev_leftover);
632        }
633
634        if read_size < ByteSourceReader::<ReaderSource>::ideal_read_size()
635            && fixed_read_size.is_none()
636        {
637            read_size *= 4;
638        }
639    }
640}
641
642fn skip_lines_naive_from_compressed_reader(
643    eol_char: u8,
644    skip_lines: usize,
645    raise_if_empty: bool,
646    reader: &mut CompressedReader,
647) -> PolarsResult<Buffer<u8>> {
648    let mut prev_leftover = Buffer::new();
649
650    if skip_lines == 0 {
651        return Ok(prev_leftover);
652    }
653
654    let mut remaining = skip_lines;
655    let mut read_size = CompressedReader::initial_read_size();
656
657    loop {
658        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
659        let mut bytes: &[u8] = &slice;
660
661        'inner: loop {
662            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
663                read_size = read_size.saturating_mul(2);
664                break 'inner;
665            };
666            pos = cmp::min(pos + 1, bytes.len());
667
668            bytes = &bytes[pos..];
669            remaining -= 1;
670
671            if remaining == 0 {
672                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
673                prev_leftover = slice.sliced(unconsumed_offset..);
674                return Ok(prev_leftover);
675            }
676        }
677
678        if bytes_read == 0 {
679            if raise_if_empty {
680                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
681            } else {
682                return Ok(Buffer::new());
683            }
684        }
685
686        // No need to search for naive eol twice in the leftover.
687        prev_leftover = Buffer::new();
688
689        if read_size < CompressedReader::ideal_read_size() {
690            read_size *= 4;
691        }
692    }
693}
694
695fn skip_lines_naive(
696    eol_char: u8,
697    skip_lines: usize,
698    raise_if_empty: bool,
699    decompressed_file_size_hint: Option<usize>,
700    reader: &mut ByteSourceReader<ReaderSource>,
701) -> PolarsResult<Buffer<u8>> {
702    let mut prev_leftover = Buffer::new();
703
704    if skip_lines == 0 {
705        return Ok(prev_leftover);
706    }
707
708    let mut remaining = skip_lines;
709    let mut read_size = CompressedReader::initial_read_size();
710
711    loop {
712        let (slice, bytes_read) =
713            reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
714        let mut bytes: &[u8] = &slice;
715
716        'inner: loop {
717            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
718                read_size = read_size.saturating_mul(2);
719                break 'inner;
720            };
721            pos = cmp::min(pos + 1, bytes.len());
722
723            bytes = &bytes[pos..];
724            remaining -= 1;
725
726            if remaining == 0 {
727                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
728                prev_leftover = slice.sliced(unconsumed_offset..);
729                return Ok(prev_leftover);
730            }
731        }
732
733        if bytes_read == 0 {
734            if raise_if_empty {
735                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
736            } else {
737                return Ok(Buffer::new());
738            }
739        }
740
741        // No need to search for naive eol twice in the leftover.
742        prev_leftover = Buffer::new();
743
744        if read_size < CompressedReader::ideal_read_size() {
745            read_size *= 4;
746        }
747    }
748}
749
750fn infer_schema(
751    header_line: &Option<Buffer<u8>>,
752    content_lines: &[Buffer<u8>],
753    infer_all_as_str: bool,
754    options: &CsvReadOptions,
755    projected_schema: Option<SchemaRef>,
756) -> PolarsResult<Schema> {
757    let has_no_inference_data = if options.has_header {
758        header_line.is_none()
759    } else {
760        content_lines.is_empty()
761    };
762
763    if options.raise_if_empty && has_no_inference_data {
764        polars_bail!(NoData: "empty CSV");
765    }
766
767    let mut inferred_schema = if has_no_inference_data {
768        Schema::default()
769    } else {
770        infer_file_schema_impl(
771            header_line,
772            content_lines,
773            infer_all_as_str,
774            &options.parse_options,
775            options.schema_overwrite.as_deref(),
776        )
777    };
778
779    if let Some(schema) = &options.schema {
780        // Note: User can provide schema with more columns, they will simply
781        // be projected as NULL.
782        // TODO: Should maybe expose a missing_columns parameter to the API for this.
783        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
784            polars_bail!(
785                SchemaMismatch:
786                "provided schema does not match number of columns in file ({} != {} in file)",
787                schema.len(),
788                inferred_schema.len(),
789            );
790        }
791
792        if options.parse_options.truncate_ragged_lines {
793            inferred_schema = Arc::unwrap_or_clone(schema.clone());
794        } else {
795            inferred_schema = schema
796                .iter_names()
797                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
798                .map(|(name, dtype)| (name.clone(), dtype))
799                .collect();
800        }
801    }
802
803    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
804        for (i, dtype) in dtypes.iter().enumerate() {
805            inferred_schema.set_dtype_at_index(i, dtype.clone());
806        }
807    }
808
809    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
810    // with temporal types. This can be improved to better choose between the 2 dtypes.
811    if let Some(projected_schema) = projected_schema {
812        for (name, inferred_dtype) in inferred_schema.iter_mut() {
813            if let Some(projected_dtype) = projected_schema.get(name) {
814                *inferred_dtype = projected_dtype.clone();
815            }
816        }
817    }
818
819    Ok(inferred_schema)
820}