polars_io/csv/read/
parser.rs

1use memchr::memchr2_iter;
2use polars_core::prelude::*;
3use polars_core::{POOL, config};
4use polars_error::feature_gated;
5use polars_utils::mmap::MMapSemaphore;
6use polars_utils::plpath::PlPathRef;
7use polars_utils::select::select_unpredictable;
8use rayon::prelude::*;
9
10use super::CsvParseOptions;
11use super::buffer::Buffer;
12use super::options::{CommentPrefix, NullValuesCompiled};
13use super::splitfields::SplitFields;
14use crate::prelude::_csv_read_internal::find_starting_point;
15use crate::utils::compression::maybe_decompress_bytes;
16
17/// Read the number of rows without parsing columns
18/// useful for count(*) queries
19#[allow(clippy::too_many_arguments)]
20pub fn count_rows(
21    addr: PlPathRef<'_>,
22    quote_char: Option<u8>,
23    comment_prefix: Option<&CommentPrefix>,
24    eol_char: u8,
25    has_header: bool,
26    skip_lines: usize,
27    skip_rows_before_header: usize,
28    skip_rows_after_header: usize,
29) -> PolarsResult<usize> {
30    let file = match addr
31        .as_local_path()
32        .and_then(|v| (!config::force_async()).then_some(v))
33    {
34        None => feature_gated!("cloud", {
35            crate::file_cache::FILE_CACHE
36                .get_entry(addr)
37                // Safety: This was initialized by schema inference.
38                .unwrap()
39                .try_open_assume_latest()?
40        }),
41        Some(path) => polars_utils::open_file(path)?,
42    };
43
44    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
45    let owned = &mut vec![];
46    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
47
48    count_rows_from_slice_par(
49        reader_bytes,
50        quote_char,
51        comment_prefix,
52        eol_char,
53        has_header,
54        skip_lines,
55        skip_rows_before_header,
56        skip_rows_after_header,
57    )
58}
59
60/// Read the number of rows without parsing columns
61/// useful for count(*) queries
62#[allow(clippy::too_many_arguments)]
63pub fn count_rows_from_slice_par(
64    mut bytes: &[u8],
65    quote_char: Option<u8>,
66    comment_prefix: Option<&CommentPrefix>,
67    eol_char: u8,
68    has_header: bool,
69    skip_lines: usize,
70    skip_rows_before_header: usize,
71    skip_rows_after_header: usize,
72) -> PolarsResult<usize> {
73    let start_offset = find_starting_point(
74        bytes,
75        quote_char,
76        eol_char,
77        // schema_len
78        // NOTE: schema_len is normally required to differentiate handling a leading blank line
79        // between case (a) when schema_len == 1 (as an empty string) vs case (b) when
80        // schema_len > 1 (as a blank line to be ignored).
81        // We skip blank lines, even when UFT8-BOM is present and schema_len == 1.
82        usize::MAX,
83        skip_lines,
84        skip_rows_before_header,
85        skip_rows_after_header,
86        comment_prefix,
87        has_header,
88    )?;
89    bytes = &bytes[start_offset..];
90
91    #[cfg(debug_assertions)]
92    const BYTES_PER_CHUNK: usize = 128;
93    #[cfg(not(debug_assertions))]
94    const BYTES_PER_CHUNK: usize = 1 << 16;
95
96    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
97    POOL.install(|| {
98        let mut states = Vec::new();
99        if comment_prefix.is_none() {
100            bytes
101                .par_chunks(BYTES_PER_CHUNK)
102                .map(|chunk| count.analyze_chunk(chunk))
103                .collect_into_vec(&mut states);
104        } else {
105            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
106            (0..num_chunks)
107                .into_par_iter()
108                .map(|chunk_idx| {
109                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
110                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
111
112                    if start_offset != 0 {
113                        // Ensure we start at the start of a line.
114                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
115                            .iter()
116                            .position(|b| *b == b'\n')
117                        {
118                            start_offset += nl_off + 1;
119                        } else {
120                            return count.analyze_chunk(&[]);
121                        }
122                    }
123
124                    let stop_offset = if let Some(nl_off) =
125                        bytes[next_start_offset..].iter().position(|b| *b == b'\n')
126                    {
127                        next_start_offset + nl_off + 1
128                    } else {
129                        bytes.len()
130                    };
131
132                    count.analyze_chunk(&bytes[start_offset..stop_offset])
133                })
134                .collect_into_vec(&mut states);
135        }
136
137        let mut n = 0;
138        let mut in_string = false;
139        for pair in states {
140            n += pair[in_string as usize].newline_count;
141            in_string = pair[in_string as usize].end_inside_string;
142        }
143        if let Some(last) = bytes.last() {
144            n += (*last != eol_char) as usize;
145        }
146        Ok(n)
147    })
148}
149
150/// Read the number of rows without parsing columns, assuming bytes is at a
151/// newline starting point. Does not deal with start/header.
152pub fn count_rows_from_slice_raw(
153    bytes: &[u8],
154    quote_char: Option<u8>,
155    comment_prefix: Option<&CommentPrefix>,
156    eol_char: u8,
157) -> PolarsResult<usize> {
158    Ok(
159        CountLines::new(quote_char, eol_char, comment_prefix.cloned())
160            .count(bytes)
161            .0,
162    )
163}
164
165/// Skip the utf-8 Byte Order Mark.
166/// credits to csv-core
167pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
168    if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
169        &input[3..]
170    } else {
171        input
172    }
173}
174
175/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
176///
177/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
178#[inline]
179pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
180    match comment_prefix {
181        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
182        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
183        None => false,
184    }
185}
186
187/// Find the nearest next line position.
188/// Does not check for new line characters embedded in String fields.
189pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
190    let pos = memchr::memchr(eol_char, input)? + 1;
191    if input.len() - pos == 0 {
192        return None;
193    }
194    Some(pos)
195}
196
197pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
198    for _ in 0..skip {
199        if let Some(pos) = next_line_position_naive(input, eol_char) {
200            input = &input[pos..];
201        } else {
202            return input;
203        }
204    }
205    input
206}
207
/// Find the nearest next line position that is not embedded in a String field.
///
/// Starting from `input`, candidate line starts are located right after each
/// `eol_char`. When `expected_fields` is `Some`, a candidate is only accepted
/// if the line starting there, and up to two lines after it, have a plausible
/// number of fields. When it is `None`, the first candidate that is followed
/// by any data is accepted.
///
/// Returns the offset (relative to the original `input`) of the accepted line
/// start, or `None` if the input is empty, no suitable `eol_char` is found, or
/// the candidates were rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Heuristic check whether `line` has roughly `expected_fields` fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field containing that many separators / line endings
            // means the line was split at the wrong place.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // if the latest field is missing
        // e.g.:
        // a,b,c
        // vala,valb,
        // SplitFields returns a count that is 1 less
        // Therefore we accept:
        // expected == count
        // and
        // expected == count + 1
        // (a `count` larger than `expected` wraps around and is rejected)
        expected_fields.wrapping_sub(count) <= 1
    }

    // we check 3 subsequent lines for `accept_line` before we accept
    // if 3 groups are rejected we reject completely
    let mut rejected_line_groups = 0u8;

    // Bytes of the original input that have already been dropped from `input`.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // headers might have an extra value
        // So if we have churned through enough lines
        // we try one field less.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the offset of the byte right after the next `eol_char`.
        let pos = memchr::memchr(eol_char, input)? + 1;
        // The `eol_char` is the last byte: there is no next line.
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos` is at most `input.len()` (index of a found byte + 1).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // count the fields, and determine if they are equal to what we expect from the schema
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the
                        // next iteration appears to re-examine the same candidate
                        // until the rejection limit is hit - confirm this is intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // SAFETY: `pos < input.len()` was established by the early
                    // return above, so `pos + 1 <= input.len()`.
                    // NOTE(review): this skips one byte past the start of the
                    // rejected line (`pos + 1` rather than `pos`).
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // don't count the fields
            (Some(_), None) => return Some(total_pos + pos),
            // no line could be read after the line ending
            _ => return None,
        }
    }
}
305
306#[inline(always)]
307pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
308    b == eol_char || b == b'\r'
309}
310
311#[inline(always)]
312pub(super) fn is_whitespace(b: u8) -> bool {
313    b == b' ' || b == b'\t'
314}
315
316/// May have false-positives, but not false negatives.
317#[inline(always)]
318pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
319    // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
320    // <= 32. In that range there aren't a lot of other common symbols (besides
321    // newline), so this is a quick test which can be worth doing to avoid the
322    // exact test.
323    b <= 32
324}
325
/// Drop the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns an empty slice when every byte satisfies `f` (or `input` is empty).
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
338
/// Remove whitespace from the start of the buffer.
///
/// Only the bytes accepted by `is_whitespace` (space and tab) are skipped,
/// so that the byte stream starts with
///     'field_1,field_2'
/// and not with
///     ' \tfield_1,field_2'
/// Line endings are not skipped by this function.
#[inline]
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
    skip_condition(input, is_whitespace)
}
348
/// Remove line-ending bytes from the start of the buffer.
///
/// Skips every leading byte for which `is_line_ending` holds, i.e. `eol_char`
/// and `\r`, and returns the remainder.
#[inline]
pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
    skip_condition(input, |b| is_line_ending(b, eol_char))
}
353
/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the file in lines naively as
///
/// ```text
///    for line in bytes.split(b'\n') {
/// ```
///
/// This will fail when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a valid string field that contains multiple lines.
pub(super) struct SplitLines<'a> {
    // Input that has not been yielded yet.
    v: &'a [u8],
    // Quote byte; falls back to `"` when no quote char was supplied
    // (in which case `quoting` is false and it is not acted upon).
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Bitmask of line endings found in the last SIMD block that have not been
    // yielded yet, shifted so that bit 0 corresponds to the first byte of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    // Number of bytes of `v` already scanned without finding a line ending
    // during the current SIMD `next` call.
    total_index: usize,
    // Whether quote handling is enabled.
    quoting: bool,
    // Optional prefix marking comment lines.
    comment_prefix: Option<&'a CommentPrefix>,
}
377
// Number of bytes processed per SIMD block; matches the lane count of `SimdVec`
// and the bit width of the `u64` masks used below.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

// Vector of 64 bytes compared lane-wise against the eol / quote characters.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
388
389impl<'a> SplitLines<'a> {
390    pub(super) fn new(
391        slice: &'a [u8],
392        quote_char: Option<u8>,
393        eol_char: u8,
394        comment_prefix: Option<&'a CommentPrefix>,
395    ) -> Self {
396        let quoting = quote_char.is_some();
397        let quote_char = quote_char.unwrap_or(b'\"');
398        #[cfg(feature = "simd")]
399        let simd_eol_char = SimdVec::splat(eol_char);
400        #[cfg(feature = "simd")]
401        let simd_quote_char = SimdVec::splat(quote_char);
402        Self {
403            v: slice,
404            quote_char,
405            eol_char,
406            #[cfg(feature = "simd")]
407            simd_eol_char,
408            #[cfg(feature = "simd")]
409            simd_quote_char,
410            #[cfg(feature = "simd")]
411            previous_valid_eols: 0,
412            total_index: 0,
413            quoting,
414            comment_prefix,
415        }
416    }
417}
418
419impl<'a> SplitLines<'a> {
420    // scalar as in non-simd
421    fn next_scalar(&mut self) -> Option<&'a [u8]> {
422        if self.v.is_empty() {
423            return None;
424        }
425        if is_comment_line(self.v, self.comment_prefix) {
426            return self.next_comment_line();
427        }
428        {
429            let mut pos = 0u32;
430            let mut iter = self.v.iter();
431            let mut in_field = false;
432            loop {
433                match iter.next() {
434                    Some(&c) => {
435                        pos += 1;
436
437                        if self.quoting && c == self.quote_char {
438                            // toggle between string field enclosure
439                            //      if we encounter a starting '"' -> in_field = true;
440                            //      if we encounter a closing '"' -> in_field = false;
441                            in_field = !in_field;
442                        }
443                        // if we are not in a string and we encounter '\n' we can stop at this position.
444                        else if c == self.eol_char && !in_field {
445                            break;
446                        }
447                    },
448                    None => {
449                        let remainder = self.v;
450                        self.v = &[];
451                        return Some(remainder);
452                    },
453                }
454            }
455
456            unsafe {
457                debug_assert!((pos as usize) <= self.v.len());
458
459                // return line up to this position
460                let ret = Some(
461                    self.v
462                        .get_unchecked(..(self.total_index + pos as usize - 1)),
463                );
464                // skip the '\n' token and update slice.
465                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
466                ret
467            }
468        }
469    }
470    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
471        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
472            unsafe {
473                // return line up to this position
474                let ret = Some(self.v.get_unchecked(..(pos - 1)));
475                // skip the '\n' token and update slice.
476                self.v = self.v.get_unchecked(pos..);
477                ret
478            }
479        } else {
480            let remainder = self.v;
481            self.v = &[];
482            Some(remainder)
483        }
484    }
485}
486
/// Iterates over the lines of the input, yielding each line without its
/// terminating `eol_char`. End-of-line bytes inside quoted fields do not split
/// a line when quoting is enabled.
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without SIMD support every line is found by the scalar scanner.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD line scanner: examines the input 64 bytes at a time and caches the
    /// line endings of the current block so that subsequent calls can be
    /// answered from the cached bitmask. Falls back to the scalar scanner when
    /// a comment prefix is configured, and to a byte-wise loop for the tail of
    /// the input.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // First check cached value
        if self.previous_valid_eols != 0 {
            // Bit `pos` set means: the byte at offset `pos` of `self.v` is a
            // line ending outside of a quoted field.
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // return line up to this position
                let ret = Some(self.v.get_unchecked(..pos));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        // Comment detection is only implemented in the scalar scanner.
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quote state carried across blocks: true while we are outside of a
        // quoted field at the end of the previously scanned block.
        let mut not_in_field_previous_iter = true;

        loop {
            // Part of the input that has not been scanned yet in this call.
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                // SAFETY: `bytes` holds more than `SIMD_SIZE` bytes, so the
                // sub-slice exists and has exactly `SIMD_SIZE` elements.
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Running parity of the quotes seen within this block.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the parity so that a set bit means "outside a quoted
                    // field", taking the state at the block start into account.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state after the last byte of this block seeds the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Shifting a u64 by 64 is not allowed, so handle the case
                    // where the line ending is the last byte of the block.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // return line up to this position
                        let ret = Some(self.v.get_unchecked(..pos));
                        // skip the '\n' token and update slice.
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line ending in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // At most `SIMD_SIZE` bytes left: scan them one by one.
                // Denotes if we are in a string field, started with a quote
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                // toggle between string field enclosure
                                //      if we encounter a starting '"' -> in_field = true;
                                //      if we encounter a closing '"' -> in_field = false;
                                in_field = !in_field;
                            }
                            // if we are not in a string and we encounter '\n' we can stop at this position.
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No line ending left: yield everything that remains.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // return line up to this position
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    // skip the '\n' token and update slice.
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
615
/// Counts line endings in CSV data, taking quoted fields (and, for chunk
/// analysis, comment lines) into account.
pub struct CountLines {
    // Quote byte; falls back to `"` when no quote char was supplied
    // (in which case `quoting` is false).
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Whether quote handling is enabled.
    quoting: bool,
    // Optional prefix marking comment lines.
    comment_prefix: Option<CommentPrefix>,
}
626
/// Result of analyzing one chunk of CSV data under a fixed assumption about
/// whether the chunk starts inside a quoted string.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    // Number of line endings found outside of quoted strings.
    newline_count: usize,
    // Offset within the chunk of the last such line ending (0 if none was found).
    last_newline_offset: usize,
    // Whether the chunk ends inside a quoted string.
    end_inside_string: bool,
}
633
634impl CountLines {
635    pub fn new(
636        quote_char: Option<u8>,
637        eol_char: u8,
638        comment_prefix: Option<CommentPrefix>,
639    ) -> Self {
640        let quoting = quote_char.is_some();
641        let quote_char = quote_char.unwrap_or(b'\"');
642        #[cfg(feature = "simd")]
643        let simd_eol_char = SimdVec::splat(eol_char);
644        #[cfg(feature = "simd")]
645        let simd_quote_char = SimdVec::splat(quote_char);
646        Self {
647            quote_char,
648            eol_char,
649            #[cfg(feature = "simd")]
650            simd_eol_char,
651            #[cfg(feature = "simd")]
652            simd_quote_char,
653            quoting,
654            comment_prefix,
655        }
656    }
657
    /// Analyzes a chunk of CSV data.
    ///
    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
    /// the first is assuming the start of the chunk is *not* inside a string,
    /// the second assuming the start is inside a string.
    ///
    /// Computing both variants lets chunks be analyzed independently; the
    /// caller selects the applicable one once the state at the end of the
    /// preceding chunk is known.
    ///
    /// If comment_prefix is not None the start of bytes must be at the start of
    /// a line (and thus not in the middle of a comment).
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        // If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // False if even number of quotes seen so far, true otherwise.
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // 0 if even number of quotes seen so far, u64::MAX otherwise.
            let mut global_quote_parity_mask = 0;
            while scan_offset + 64 <= bytes.len() {
                // SAFETY: the loop condition guarantees that 64 bytes are
                // available starting at `scan_offset`.
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Bit i is set when an odd number of quotes has been seen up
                    // to and including byte i (counting from the chunk start).
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // Broadcast the parity after the last byte to all bits
                    // (arithmetic shift of the top bit) for the next block.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Even parity: these line endings count if the chunk
                    // started outside of a string.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Odd parity: these line endings count if the chunk
                    // started inside of a string.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting only the "start outside a string" state
                    // is updated here.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Byte-wise scan of whatever the SIMD loop did not cover (everything,
        // when SIMD is disabled).
        while scan_offset < bytes.len() {
            // SAFETY: `scan_offset < bytes.len()` by the loop condition.
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // A line ending seen at even parity belongs to the "start outside"
            // state, one seen at odd parity to the "start inside" state.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }
759
760    // bytes must begin at the start of a line.
761    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
762        let pre_s = match self.comment_prefix.as_ref().unwrap() {
763            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
764            CommentPrefix::Multi(ps) => ps.as_bytes(),
765        };
766
767        let mut state = LineStats::default();
768        let mut scan_offset = 0;
769        while scan_offset < bytes.len() {
770            // Skip comment line if needed.
771            while bytes[scan_offset..].starts_with(pre_s) {
772                scan_offset += pre_s.len();
773                let Some(nl_off) = bytes[scan_offset..].iter().position(|c| *c == b'\n') else {
774                    break;
775                };
776                scan_offset += nl_off + 1;
777            }
778
779            while scan_offset < bytes.len() {
780                let c = unsafe { *bytes.get_unchecked(scan_offset) };
781                in_string ^= (c == self.quote_char) & self.quoting;
782
783                if c == self.eol_char && !in_string {
784                    state.newline_count += 1;
785                    state.last_newline_offset = scan_offset;
786                    scan_offset += 1;
787                    break;
788                } else {
789                    scan_offset += 1;
790                }
791            }
792        }
793
794        state.end_inside_string = in_string;
795        state
796    }
797
798    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
799        loop {
800            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
801
802            let (count, offset) = self.count(b);
803
804            if count > 0 || b.len() == bytes.len() {
805                return (count, offset);
806            }
807
808            *chunk_size *= 2;
809        }
810    }
811
    /// Returns count and offset to split for remainder in slice.
    ///
    /// `count` is the number of line endings found outside of quoted fields
    /// and the offset is the position in `bytes` of the last of them (0 when
    /// none was found). The input is processed in blocks of `SIMD_SIZE` bytes;
    /// the tail is handled by `count_no_simd`.
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        // Number of bytes of `original_bytes` processed so far.
        let mut total_idx = 0;
        let original_bytes = bytes;
        let mut count = 0;
        let mut position = 0;
        // Quote state carried across blocks: true while we are outside of a
        // quoted field at the end of the previously scanned block.
        let mut not_in_field_previous_iter = true;

        loop {
            // SAFETY: `total_idx` only grows by `SIMD_SIZE` after a block of
            // more than `SIMD_SIZE` remaining bytes was seen, so it never
            // exceeds the length of `original_bytes`.
            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };

            if bytes.len() > SIMD_SIZE {
                // SAFETY: `bytes` holds more than `SIMD_SIZE` bytes, so the
                // sub-slice exists and has exactly `SIMD_SIZE` elements.
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Running parity of the quotes seen within this block.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the parity so that a set bit means "outside a quoted
                    // field", taking the state at the block start into account.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state after the last byte of this block seeds the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    count += valid_eols.count_ones() as usize;
                    // The highest set bit marks the last line ending in this block.
                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
                    debug_assert_eq!(original_bytes[position], self.eol_char)
                }
                total_idx += SIMD_SIZE;
            } else if bytes.is_empty() {
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
                return (count, position);
            } else {
                // At most `SIMD_SIZE` bytes left: finish with the scalar
                // counter, passing along the current quote state.
                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);

                // Only adopt the tail's offset if the tail contained a line ending.
                let (count, position) = if c > 0 {
                    (count + c, total_idx + o)
                } else {
                    (count, position)
                };
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

                return (count, position);
            }
        }
    }
870
871    #[cfg(not(feature = "simd"))]
872    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
873        self.count_no_simd(bytes, false)
874    }
875
876    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
877        let iter = bytes.iter();
878        let mut in_field = in_field;
879        let mut count = 0;
880        let mut position = 0;
881
882        for b in iter {
883            let c = *b;
884            if self.quoting && c == self.quote_char {
885                // toggle between string field enclosure
886                //      if we encounter a starting '"' -> in_field = true;
887                //      if we encounter a closing '"' -> in_field = false;
888                in_field = !in_field;
889            }
890            // If we are not in a string and we encounter '\n' we can stop at this position.
891            else if c == self.eol_char && !in_field {
892                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
893                count += 1;
894            }
895        }
896        debug_assert!(count == 0 || bytes[position] == self.eol_char);
897
898        (count, position)
899    }
900}
901
/// Finds the first `needle` byte that lies outside a quoted section of `bytes`.
///
/// Every `quote_char` toggles the quoted state *before* the byte is compared with
/// `needle`, so a `needle` equal to `quote_char` matches on a closing quote.
///
/// Returns the index of the match, or `None` when `bytes` contains no unquoted `needle`.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so the index cannot wrap on inputs larger than
    // `u32::MAX` bytes.
    bytes.iter().position(|&c| {
        if c == quote_char {
            // toggle between string field enclosure
            //      if we encounter a starting '"' -> in_field = true;
            //      if we encounter a closing '"' -> in_field = false;
            in_field = !in_field;
        }

        !in_field && c == needle
    })
}
924
925#[inline]
926pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
927    let pos = match quote {
928        Some(quote) => find_quoted(bytes, quote, eol_char),
929        None => bytes.iter().position(|x| *x == eol_char),
930    };
931    match pos {
932        None => &[],
933        Some(pos) => &bytes[pos + 1..],
934    }
935}
936
937#[inline]
938pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
939    if let Some(pos) = next_line_position_naive(input, eol_char) {
940        unsafe { input.get_unchecked(pos..) }
941    } else {
942        &[]
943    }
944}
945
946/// Parse CSV.
947///
948/// # Arguments
949/// * `bytes` - input to parse
950/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
951///   thread has a different offset.
952/// * `projection` - Indices of the columns to project.
953/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
954///   fields are written to the buffers. The UTF8 data will be parsed later.
955///
956/// Returns the number of bytes parsed successfully.
957#[allow(clippy::too_many_arguments)]
958pub(super) fn parse_lines(
959    mut bytes: &[u8],
960    parse_options: &CsvParseOptions,
961    offset: usize,
962    ignore_errors: bool,
963    null_values: Option<&NullValuesCompiled>,
964    projection: &[usize],
965    buffers: &mut [Buffer],
966    n_lines: usize,
967    // length of original schema
968    schema_len: usize,
969    schema: &Schema,
970) -> PolarsResult<usize> {
971    assert!(
972        !projection.is_empty(),
973        "at least one column should be projected"
974    );
975    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
976    // During projection pushdown we are not checking other csv fields.
977    // This would be very expensive and we don't care as we only want
978    // the projected columns.
979    if projection.len() != schema_len {
980        truncate_ragged_lines = true
981    }
982
983    // we use the pointers to track the no of bytes read.
984    let start = bytes.as_ptr() as usize;
985    let original_bytes_len = bytes.len();
986    let n_lines = n_lines as u32;
987
988    let mut line_count = 0u32;
989    loop {
990        if line_count > n_lines {
991            let end = bytes.as_ptr() as usize;
992            return Ok(end - start);
993        }
994
995        if bytes.is_empty() {
996            return Ok(original_bytes_len);
997        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
998            // deal with comments
999            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1000            bytes = bytes_rem;
1001            continue;
1002        }
1003
1004        // Every line we only need to parse the columns that are projected.
1005        // Therefore we check if the idx of the field is in our projected columns.
1006        // If it is not, we skip the field.
1007        let mut projection_iter = projection.iter().copied();
1008        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1009        let mut processed_fields = 0;
1010
1011        let mut iter = SplitFields::new(
1012            bytes,
1013            parse_options.separator,
1014            parse_options.quote_char,
1015            parse_options.eol_char,
1016        );
1017        let mut idx = 0u32;
1018        let mut read_sol = 0;
1019        loop {
1020            match iter.next() {
1021                // end of line
1022                None => {
1023                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1024                    break;
1025                },
1026                Some((mut field, needs_escaping)) => {
1027                    let field_len = field.len();
1028
1029                    // +1 is the split character that is consumed by the iterator.
1030                    read_sol += field_len + 1;
1031
1032                    if idx == next_projected as u32 {
1033                        // the iterator is finished when it encounters a `\n`
1034                        // this could be preceded by a '\r'
1035                        unsafe {
1036                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1037                                field = field.get_unchecked(..field_len - 1);
1038                            }
1039                        }
1040
1041                        debug_assert!(processed_fields < buffers.len());
1042                        let buf = unsafe {
1043                            // SAFETY: processed fields index can never exceed the projection indices.
1044                            buffers.get_unchecked_mut(processed_fields)
1045                        };
1046                        let mut add_null = false;
1047
1048                        // if we have null values argument, check if this field equal null value
1049                        if let Some(null_values) = null_values {
1050                            let field = if needs_escaping && !field.is_empty() {
1051                                unsafe { field.get_unchecked(1..field.len() - 1) }
1052                            } else {
1053                                field
1054                            };
1055
1056                            // SAFETY:
1057                            // process fields is in bounds
1058                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1059                        }
1060                        if add_null {
1061                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1062                        } else {
1063                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1064                                .map_err(|e| {
1065                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1066                                    let unparsable = String::from_utf8_lossy(field);
1067                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1068                                    polars_err!(
1069                                        ComputeError:
1070                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1071                                        The current offset in the file is {} bytes.\n\
1072                                        \n\
1073                                        You might want to try:\n\
1074                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1075                                        - specifying correct dtype with the `schema_overrides` argument\n\
1076                                        - setting `ignore_errors` to `True`,\n\
1077                                        - adding `{}` to the `null_values` list.\n\n\
1078                                        Original error: ```{}```",
1079                                        &unparsable,
1080                                        buf.dtype(),
1081                                        column_name,
1082                                        idx + 1,
1083                                        bytes_offset,
1084                                        &unparsable,
1085                                        e
1086                                    )
1087                                })?;
1088                        }
1089                        processed_fields += 1;
1090
1091                        // if we have all projected columns we are done with this line
1092                        match projection_iter.next() {
1093                            Some(p) => next_projected = p,
1094                            None => {
1095                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1096                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
1097                                } else {
1098                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1099                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1100
1101Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1102                                    }
1103                                    let bytes_rem = skip_this_line(
1104                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1105                                        parse_options.quote_char,
1106                                        parse_options.eol_char,
1107                                    );
1108                                    bytes = bytes_rem;
1109                                }
1110                                break;
1111                            },
1112                        }
1113                    }
1114                    idx += 1;
1115                },
1116            }
1117        }
1118
1119        // there can be lines that miss fields (also the comma values)
1120        // this means the splitter won't process them.
1121        // We traverse them to read them as null values.
1122        while processed_fields < projection.len() {
1123            debug_assert!(processed_fields < buffers.len());
1124            let buf = unsafe {
1125                // SAFETY: processed fields index can never exceed the projection indices.
1126                buffers.get_unchecked_mut(processed_fields)
1127            };
1128            buf.add_null(!parse_options.missing_is_null);
1129            processed_fields += 1;
1130        }
1131        line_count += 1;
1132    }
1133}
1134
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// A line end inside a quoted field must not split the line, for both `"` and `'`
    /// as quote characters.
    #[test]
    fn test_splitlines() {
        // (input, quote character, expected lines)
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for line in expected {
                assert_eq!(lines.next(), Some(line.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}