Skip to main content

polars_io/csv/read/
streaming.rs

1use std::cmp;
2use std::iter::Iterator;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use polars_buffer::Buffer;
7use polars_core::prelude::Schema;
8use polars_core::schema::SchemaRef;
9use polars_error::{PolarsResult, polars_bail, polars_ensure};
10
11use crate::csv::read::schema_inference::infer_file_schema_impl;
12use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
13use crate::prelude::{CsvParseOptions, CsvReadOptions};
14use crate::utils::compression::{ByteSourceReader, CompressedReader};
15use crate::utils::stream_buf_reader::ReaderSource;
16
/// Callback that is handed the raw bytes of the first content row, i.e. the row at which actual
/// parsing will start.
///
/// The lifetime `'a` allows the closure to borrow state from the caller's stack frame.
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
18
19/// Reads bytes from `reader` until the CSV starting point is reached depending on the options.
20///
21/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
22/// leftover bytes + `reader.read_next_slice` is guaranteed to start at first real content row.
23///
24/// `inspect_first_content_row_fn` allows looking at the first content row, this is where parsing
25/// will start. Beware even if the function is provided it's *not* guaranteed that the returned
26/// value will be `Some`, since it the CSV may be incomplete.
27///
28/// The reading is done in an iterative streaming fashion
29///
30/// This function isn't perf critical but would increase binary-size so don't inline it.
31#[inline(never)]
32pub fn read_until_start_and_infer_schema_from_compressed_reader(
33    options: &CsvReadOptions,
34    projected_schema: Option<SchemaRef>,
35    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
36    reader: &mut CompressedReader,
37) -> PolarsResult<(Schema, Buffer<u8>)> {
38    // It's better to be above than below here.
39    const ESTIMATED_BYTES_PER_ROW: usize = 200;
40
41    #[derive(Copy, Clone)]
42    enum State {
43        // Ordered so that all states only happen after the ones before it.
44        SkipEmpty,
45        SkipRowsBeforeHeader(usize),
46        SkipHeader(bool),
47        SkipRowsAfterHeader(usize),
48        ContentInspect,
49        InferCollect,
50        Done,
51    }
52
53    polars_ensure!(
54        !(options.skip_lines != 0 && options.skip_rows != 0),
55        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
56    );
57
58    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
59    // quote escape rules.
60    let prev_leftover = skip_lines_naive_from_compressed_reader(
61        options.parse_options.eol_char,
62        options.skip_lines,
63        options.raise_if_empty,
64        reader,
65    )?;
66
67    let mut state = if options.has_header {
68        State::SkipEmpty
69    } else if options.skip_lines != 0 {
70        // skip_lines shouldn't skip extra comments before the header, so directly go to SkipHeader
71        // state.
72        State::SkipHeader(false)
73    } else {
74        State::SkipRowsBeforeHeader(options.skip_rows)
75    };
76
77    let comment_prefix = options.parse_options.comment_prefix.as_ref();
78    let infer_schema_length = if options.schema.is_some() {
79        // Don't actually infer if the schema is set.
80        Some(0)
81    } else {
82        options.infer_schema_length
83    };
84
85    let mut header_line = None;
86    let mut content_lines = Vec::with_capacity(infer_schema_length.unwrap_or_else(|| {
87        reader
88            .total_len_estimate()
89            .saturating_div(ESTIMATED_BYTES_PER_ROW)
90    }));
91
92    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
93    // `Vec` which would lead to quadratic copying if we don't factor in `infer_schema_length` into
94    // the initial read size. We have to retain the row memory for schema inference and also for
95    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
96    // full input anyway so we can do so once and avoid re-copying.
97    let initial_read_size = infer_schema_length
98        .map(|isl| {
99            cmp::max(
100                CompressedReader::initial_read_size(),
101                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
102            )
103        })
104        .unwrap_or(usize::MAX);
105
106    let leftover = for_each_line_from_reader_from_compressed_reader(
107        &options.parse_options,
108        true,
109        prev_leftover,
110        initial_read_size,
111        reader,
112        |mem_slice_line| {
113            let line = &*mem_slice_line;
114
115            let done = loop {
116                match &mut state {
117                    State::SkipEmpty => {
118                        if line.is_empty() || line == b"\r" {
119                            break LineUse::ConsumeDiscard;
120                        }
121
122                        state = State::SkipRowsBeforeHeader(options.skip_rows);
123                    },
124                    State::SkipRowsBeforeHeader(remaining) => {
125                        let is_comment = is_comment_line(line, comment_prefix);
126
127                        if *remaining == 0 && !is_comment {
128                            state = State::SkipHeader(false);
129                            continue;
130                        }
131
132                        *remaining -= !is_comment as usize;
133                        break LineUse::ConsumeDiscard;
134                    },
135                    State::SkipHeader(did_skip) => {
136                        if !options.has_header || *did_skip {
137                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
138                            continue;
139                        }
140
141                        header_line = Some(mem_slice_line.clone());
142                        *did_skip = true;
143                        break LineUse::ConsumeDiscard;
144                    },
145                    State::SkipRowsAfterHeader(remaining) => {
146                        let is_comment = is_comment_line(line, comment_prefix);
147
148                        if *remaining == 0 && !is_comment {
149                            state = State::ContentInspect;
150                            continue;
151                        }
152
153                        *remaining -= !is_comment as usize;
154                        break LineUse::ConsumeDiscard;
155                    },
156                    State::ContentInspect => {
157                        if let Some(func) = &mut inspect_first_content_row_fn {
158                            func(line);
159                        }
160
161                        state = State::InferCollect;
162                    },
163                    State::InferCollect => {
164                        if !is_comment_line(line, comment_prefix) {
165                            content_lines.push(mem_slice_line.clone());
166                            if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
167                                state = State::Done;
168                                continue;
169                            }
170                        }
171
172                        break LineUse::ConsumeKeep;
173                    },
174                    State::Done => {
175                        break LineUse::Done;
176                    },
177                }
178            };
179
180            Ok(done)
181        },
182    )?;
183
184    let infer_all_as_str = infer_schema_length == Some(0);
185
186    let inferred_schema = infer_schema(
187        &header_line,
188        &content_lines,
189        infer_all_as_str,
190        options,
191        projected_schema,
192    )?;
193
194    Ok((inferred_schema, leftover))
195}
196
197/// Reads bytes from `reader` until the CSV starting point is reached depending on the options.
198///
199/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
200/// leftover bytes + `reader.read_next_slice` is guaranteed to start at first real content row.
201///
202/// `inspect_first_content_row_fn` allows looking at the first content row, this is where parsing
203/// will start. Beware even if the function is provided it's *not* guaranteed that the returned
204/// value will be `Some`, since it the CSV may be incomplete.
205///
206/// The reading is done in an iterative streaming fashion
207///
208/// This function isn't perf critical but would increase binary-size so don't inline it.
209#[inline(never)]
210pub fn read_until_start_and_infer_schema(
211    options: &CsvReadOptions,
212    projected_schema: Option<SchemaRef>,
213    decompressed_file_size_hint: Option<usize>,
214    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
215    reader: &mut ByteSourceReader<ReaderSource>,
216) -> PolarsResult<(Schema, Buffer<u8>)> {
217    // It's better to be above than below here.
218    const ESTIMATED_BYTES_PER_ROW: usize = 200;
219
220    #[derive(Copy, Clone)]
221    enum State {
222        // Ordered so that all states only happen after the ones before it.
223        SkipEmpty,
224        SkipRowsBeforeHeader(usize),
225        SkipHeader(bool),
226        SkipRowsAfterHeader(usize),
227        ContentInspect,
228        InferCollect,
229        Done,
230    }
231
232    polars_ensure!(
233        !(options.skip_lines != 0 && options.skip_rows != 0),
234        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
235    );
236
237    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
238    // quote escape rules.
239    let prev_leftover = skip_lines_naive(
240        options.parse_options.eol_char,
241        options.skip_lines,
242        options.raise_if_empty,
243        decompressed_file_size_hint,
244        reader,
245    )?;
246
247    let mut state = if options.has_header {
248        State::SkipEmpty
249    } else if options.skip_lines != 0 {
250        // skip_lines shouldn't skip extra comments before the header, so directly go to SkipHeader
251        // state.
252        State::SkipHeader(false)
253    } else {
254        State::SkipRowsBeforeHeader(options.skip_rows)
255    };
256
257    let comment_prefix = options.parse_options.comment_prefix.as_ref();
258    let infer_schema_length = if options.schema.is_some() {
259        // Don't actually infer if the schema is set.
260        Some(0)
261    } else {
262        options.infer_schema_length
263    };
264
265    let mut header_line = None;
266    let mut content_lines = Vec::with_capacity(infer_schema_length.unwrap_or_else(|| {
267        decompressed_file_size_hint
268            .map(|size| size.saturating_div(ESTIMATED_BYTES_PER_ROW))
269            .unwrap_or(100)
270    }));
271
272    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
273    // `Vec` which would lead to quadratic copying if we don't factor in `infer_schema_length` into
274    // the initial read size. We have to retain the row memory for schema inference and also for
275    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
276    // full input anyway so we can do so once and avoid re-copying.
277    let initial_read_size = infer_schema_length
278        .map(|isl| {
279            cmp::max(
280                CompressedReader::initial_read_size(),
281                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
282            )
283        })
284        .unwrap_or(usize::MAX);
285
286    let leftover = for_each_line_from_reader(
287        &options.parse_options,
288        true,
289        prev_leftover,
290        initial_read_size,
291        decompressed_file_size_hint,
292        reader,
293        |mem_slice_line| {
294            let line = &*mem_slice_line;
295
296            let done = loop {
297                match &mut state {
298                    State::SkipEmpty => {
299                        if line.is_empty() || line == b"\r" {
300                            break LineUse::ConsumeDiscard;
301                        }
302
303                        state = State::SkipRowsBeforeHeader(options.skip_rows);
304                    },
305                    State::SkipRowsBeforeHeader(remaining) => {
306                        let is_comment = is_comment_line(line, comment_prefix);
307
308                        if *remaining == 0 && !is_comment {
309                            state = State::SkipHeader(false);
310                            continue;
311                        }
312
313                        *remaining -= !is_comment as usize;
314                        break LineUse::ConsumeDiscard;
315                    },
316                    State::SkipHeader(did_skip) => {
317                        if !options.has_header || *did_skip {
318                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
319                            continue;
320                        }
321
322                        header_line = Some(mem_slice_line.clone());
323                        *did_skip = true;
324                        break LineUse::ConsumeDiscard;
325                    },
326                    State::SkipRowsAfterHeader(remaining) => {
327                        let is_comment = is_comment_line(line, comment_prefix);
328
329                        if *remaining == 0 && !is_comment {
330                            state = State::ContentInspect;
331                            continue;
332                        }
333
334                        *remaining -= !is_comment as usize;
335                        break LineUse::ConsumeDiscard;
336                    },
337                    State::ContentInspect => {
338                        if let Some(func) = &mut inspect_first_content_row_fn {
339                            func(line);
340                        }
341
342                        state = State::InferCollect;
343                    },
344                    State::InferCollect => {
345                        if !is_comment_line(line, comment_prefix) {
346                            content_lines.push(mem_slice_line.clone());
347                            if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
348                                state = State::Done;
349                                continue;
350                            }
351                        }
352
353                        break LineUse::ConsumeKeep;
354                    },
355                    State::Done => {
356                        break LineUse::Done;
357                    },
358                }
359            };
360
361            Ok(done)
362        },
363    )?;
364
365    let infer_all_as_str = infer_schema_length == Some(0);
366
367    let inferred_schema = infer_schema(
368        &header_line,
369        &content_lines,
370        infer_all_as_str,
371        options,
372        projected_schema,
373    )?;
374
375    Ok((inferred_schema, leftover))
376}
377
/// Verdict of a line callback, telling the line iterator what to do with the line it was handed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineUse {
    /// The line is consumed and its bytes are not part of the returned leftover.
    ConsumeDiscard,
    /// The line is consumed, but its bytes are retained: the returned leftover starts at the
    /// first line that was kept.
    ConsumeKeep,
    /// Stop iterating. The line is not consumed and stays in the returned leftover.
    Done,
}
383
384/// Iterate over valid CSV lines produced by reader.
385///
386/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
387/// can't be constructed.
388fn for_each_line_from_reader_from_compressed_reader(
389    parse_options: &CsvParseOptions,
390    is_file_start: bool,
391    mut prev_leftover: Buffer<u8>,
392    initial_read_size: usize,
393    reader: &mut CompressedReader,
394    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
395) -> PolarsResult<Buffer<u8>> {
396    let mut is_first_line = is_file_start;
397
398    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
399        .map(|x| {
400            x.parse::<NonZeroUsize>()
401                .unwrap_or_else(|_| {
402                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
403                })
404                .get()
405        })
406        .ok();
407
408    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
409    let mut retain_offset = None;
410
411    loop {
412        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
413        if slice.is_empty() {
414            return Ok(Buffer::new());
415        }
416
417        if is_first_line {
418            is_first_line = false;
419            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
420            if slice.get(0..3) == UTF8_BOM_MARKER {
421                slice = slice.sliced(3..);
422            }
423        }
424
425        let line_to_sub_slice = |line: &[u8]| {
426            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
427            slice.clone().sliced(start..(start + line.len()))
428        };
429
430        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length` lines, but we only want to decompress the input once, so we grow a `Buffer` that will be returned as leftover.
431        let effective_slice = if let Some(offset) = retain_offset {
432            slice.clone().sliced(offset..)
433        } else {
434            slice.clone()
435        };
436
437        let mut lines = SplitLines::new(
438            &effective_slice,
439            parse_options.quote_char,
440            parse_options.eol_char,
441            parse_options.comment_prefix.as_ref(),
442        );
443        let Some(mut prev_line) = lines.next() else {
444            read_size = read_size.saturating_mul(2);
445            prev_leftover = slice;
446            continue;
447        };
448
449        let mut should_ret = false;
450
451        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
452        // we iterate everything except the last line.
453        for next_line in lines {
454            match line_fn(line_to_sub_slice(prev_line))? {
455                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
456                LineUse::ConsumeKeep => {
457                    if retain_offset.is_none() {
458                        let retain_start_offset =
459                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
460                        prev_leftover = slice.clone().sliced(retain_start_offset..);
461                        retain_offset = Some(0);
462                    }
463                },
464                LineUse::Done => {
465                    should_ret = true;
466                    break;
467                },
468            }
469            prev_line = next_line;
470        }
471
472        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
473
474        // EOF file reached, the last line will have no continuation on the next call to
475        // `read_next_slice`.
476        if bytes_read < read_size {
477            match line_fn(line_to_sub_slice(prev_line))? {
478                LineUse::ConsumeDiscard => {
479                    debug_assert!(retain_offset.is_none());
480                    unconsumed_offset += prev_line.len();
481                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
482                        unconsumed_offset += 1;
483                    }
484                },
485                LineUse::ConsumeKeep | LineUse::Done => (),
486            }
487            should_ret = true;
488        }
489
490        if let Some(offset) = &mut retain_offset {
491            if *offset == 0 {
492                // `unconsumed_offset` was computed with the full `slice` as base reference
493                // compensate retained offset.
494                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
495            } else {
496                prev_leftover = slice;
497                *offset += unconsumed_offset;
498            }
499        } else {
500            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
501            // it's more efficient to hand in as little as possible.
502            prev_leftover = slice.sliced(unconsumed_offset..);
503        }
504
505        if should_ret {
506            return Ok(prev_leftover);
507        }
508
509        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
510            read_size *= 4;
511        }
512    }
513}
514
515/// Iterate over valid CSV lines produced by reader.
516///
517/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
518/// can't be constructed.
519fn for_each_line_from_reader(
520    parse_options: &CsvParseOptions,
521    is_file_start: bool,
522    mut prev_leftover: Buffer<u8>,
523    initial_read_size: usize,
524    decompressed_file_size_hint: Option<usize>,
525    reader: &mut ByteSourceReader<ReaderSource>,
526    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
527) -> PolarsResult<Buffer<u8>> {
528    let mut is_first_line = is_file_start;
529
530    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
531        .map(|x| {
532            x.parse::<NonZeroUsize>()
533                .unwrap_or_else(|_| {
534                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
535                })
536                .get()
537        })
538        .ok();
539
540    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
541    let mut retain_offset = None;
542
543    loop {
544        let (mut slice, bytes_read) =
545            reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
546        if slice.is_empty() {
547            return Ok(Buffer::new());
548        }
549
550        if is_first_line {
551            is_first_line = false;
552            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
553            if slice.get(0..3) == UTF8_BOM_MARKER {
554                slice = slice.sliced(3..);
555            }
556        }
557
558        let line_to_sub_slice = |line: &[u8]| {
559            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
560            slice.clone().sliced(start..(start + line.len()))
561        };
562
563        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length` lines, but we only want to decompress the input once, so we grow a `Buffer` that will be returned as leftover.
564        let effective_slice = if let Some(offset) = retain_offset {
565            slice.clone().sliced(offset..)
566        } else {
567            slice.clone()
568        };
569
570        let mut lines = SplitLines::new(
571            &effective_slice,
572            parse_options.quote_char,
573            parse_options.eol_char,
574            parse_options.comment_prefix.as_ref(),
575        );
576        let Some(mut prev_line) = lines.next() else {
577            read_size = read_size.saturating_mul(2);
578            prev_leftover = slice;
579            continue;
580        };
581
582        let mut should_ret = false;
583
584        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
585        // we iterate everything except the last line.
586        for next_line in lines {
587            match line_fn(line_to_sub_slice(prev_line))? {
588                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
589                LineUse::ConsumeKeep => {
590                    if retain_offset.is_none() {
591                        let retain_start_offset =
592                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
593                        prev_leftover = slice.clone().sliced(retain_start_offset..);
594                        retain_offset = Some(0);
595                    }
596                },
597                LineUse::Done => {
598                    should_ret = true;
599                    break;
600                },
601            }
602            prev_line = next_line;
603        }
604
605        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
606
607        // EOF file reached, the last line will have no continuation on the next call to
608        // `read_next_slice`.
609        if bytes_read < read_size {
610            match line_fn(line_to_sub_slice(prev_line))? {
611                LineUse::ConsumeDiscard => {
612                    debug_assert!(retain_offset.is_none());
613                    unconsumed_offset += prev_line.len();
614                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
615                        unconsumed_offset += 1;
616                    }
617                },
618                LineUse::ConsumeKeep | LineUse::Done => (),
619            }
620            should_ret = true;
621        }
622
623        if let Some(offset) = &mut retain_offset {
624            if *offset == 0 {
625                // `unconsumed_offset` was computed with the full `slice` as base reference
626                // compensate retained offset.
627                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
628            } else {
629                prev_leftover = slice;
630                *offset += unconsumed_offset;
631            }
632        } else {
633            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
634            // it's more efficient to hand in as little as possible.
635            prev_leftover = slice.sliced(unconsumed_offset..);
636        }
637
638        if should_ret {
639            return Ok(prev_leftover);
640        }
641
642        if read_size < ByteSourceReader::<ReaderSource>::ideal_read_size()
643            && fixed_read_size.is_none()
644        {
645            read_size *= 4;
646        }
647    }
648}
649
650fn skip_lines_naive_from_compressed_reader(
651    eol_char: u8,
652    skip_lines: usize,
653    raise_if_empty: bool,
654    reader: &mut CompressedReader,
655) -> PolarsResult<Buffer<u8>> {
656    let mut prev_leftover = Buffer::new();
657
658    if skip_lines == 0 {
659        return Ok(prev_leftover);
660    }
661
662    let mut remaining = skip_lines;
663    let mut read_size = CompressedReader::initial_read_size();
664
665    loop {
666        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
667        let mut bytes: &[u8] = &slice;
668
669        'inner: loop {
670            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
671                read_size = read_size.saturating_mul(2);
672                break 'inner;
673            };
674            pos = cmp::min(pos + 1, bytes.len());
675
676            bytes = &bytes[pos..];
677            remaining -= 1;
678
679            if remaining == 0 {
680                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
681                prev_leftover = slice.sliced(unconsumed_offset..);
682                return Ok(prev_leftover);
683            }
684        }
685
686        if bytes_read == 0 {
687            if raise_if_empty {
688                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
689            } else {
690                return Ok(Buffer::new());
691            }
692        }
693
694        // No need to search for naive eol twice in the leftover.
695        prev_leftover = Buffer::new();
696
697        if read_size < CompressedReader::ideal_read_size() {
698            read_size *= 4;
699        }
700    }
701}
702
703fn skip_lines_naive(
704    eol_char: u8,
705    skip_lines: usize,
706    raise_if_empty: bool,
707    decompressed_file_size_hint: Option<usize>,
708    reader: &mut ByteSourceReader<ReaderSource>,
709) -> PolarsResult<Buffer<u8>> {
710    let mut prev_leftover = Buffer::new();
711
712    if skip_lines == 0 {
713        return Ok(prev_leftover);
714    }
715
716    let mut remaining = skip_lines;
717    let mut read_size = CompressedReader::initial_read_size();
718
719    loop {
720        let (slice, bytes_read) =
721            reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
722        let mut bytes: &[u8] = &slice;
723
724        'inner: loop {
725            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
726                read_size = read_size.saturating_mul(2);
727                break 'inner;
728            };
729            pos = cmp::min(pos + 1, bytes.len());
730
731            bytes = &bytes[pos..];
732            remaining -= 1;
733
734            if remaining == 0 {
735                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
736                prev_leftover = slice.sliced(unconsumed_offset..);
737                return Ok(prev_leftover);
738            }
739        }
740
741        if bytes_read == 0 {
742            if raise_if_empty {
743                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
744            } else {
745                return Ok(Buffer::new());
746            }
747        }
748
749        // No need to search for naive eol twice in the leftover.
750        prev_leftover = Buffer::new();
751
752        if read_size < CompressedReader::ideal_read_size() {
753            read_size *= 4;
754        }
755    }
756}
757
758fn infer_schema(
759    header_line: &Option<Buffer<u8>>,
760    content_lines: &[Buffer<u8>],
761    infer_all_as_str: bool,
762    options: &CsvReadOptions,
763    projected_schema: Option<SchemaRef>,
764) -> PolarsResult<Schema> {
765    let has_no_inference_data = if options.has_header {
766        header_line.is_none()
767    } else {
768        content_lines.is_empty()
769    };
770
771    if options.raise_if_empty && has_no_inference_data {
772        polars_bail!(NoData: "empty CSV");
773    }
774
775    let mut inferred_schema = if has_no_inference_data {
776        Schema::default()
777    } else {
778        infer_file_schema_impl(
779            header_line,
780            content_lines,
781            infer_all_as_str,
782            &options.parse_options,
783            options.schema_overwrite.as_deref(),
784        )
785    };
786
787    if let Some(schema) = &options.schema {
788        // Note: User can provide schema with more columns, they will simply
789        // be projected as NULL.
790        // TODO: Should maybe expose a missing_columns parameter to the API for this.
791        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
792            polars_bail!(
793                SchemaMismatch:
794                "provided schema does not match number of columns in file ({} != {} in file)",
795                schema.len(),
796                inferred_schema.len(),
797            );
798        }
799
800        if options.parse_options.truncate_ragged_lines {
801            inferred_schema = Arc::unwrap_or_clone(schema.clone());
802        } else {
803            inferred_schema = schema
804                .iter_names()
805                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
806                .map(|(name, dtype)| (name.clone(), dtype))
807                .collect();
808        }
809    }
810
811    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
812        for (i, dtype) in dtypes.iter().enumerate() {
813            inferred_schema.set_dtype_at_index(i, dtype.clone());
814        }
815    }
816
817    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
818    // with temporal types. This can be improved to better choose between the 2 dtypes.
819    if let Some(projected_schema) = projected_schema {
820        for (name, inferred_dtype) in inferred_schema.iter_mut() {
821            if let Some(projected_dtype) = projected_schema.get(name) {
822                *inferred_dtype = projected_dtype.clone();
823            }
824        }
825    }
826
827    Ok(inferred_schema)
828}