Skip to main content

polars_io/csv/read/
parser.rs

1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_buffer::Buffer;
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::pl_path::PlRefPath;
10use polars_utils::select::select_unpredictable;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::Builder;
15use super::options::{CommentPrefix, NullValuesCompiled};
16use super::splitfields::SplitFields;
17use crate::prelude::CsvReadOptions;
18use crate::prelude::streaming::read_until_start_and_infer_schema;
19use crate::utils::compression::ByteSourceReader;
20use crate::utils::stream_buf_reader::ReaderSource;
21
22/// Read the number of rows without parsing columns
23/// useful for count(*) queries
24#[allow(clippy::too_many_arguments)]
25pub fn count_rows(
26    path: PlRefPath,
27    quote_char: Option<u8>,
28    comment_prefix: Option<&CommentPrefix>,
29    eol_char: u8,
30    has_header: bool,
31    skip_lines: usize,
32    skip_rows_before_header: usize,
33    skip_rows_after_header: usize,
34    raise_if_empty: bool,
35) -> PolarsResult<usize> {
36    let file = if path.has_scheme() || polars_config::config().force_async() {
37        feature_gated!("cloud", {
38            crate::file_cache::FILE_CACHE
39                .get_entry(path)
40                // Safety: This was initialized by schema inference.
41                .unwrap()
42                .try_open_assume_latest()?
43        })
44    } else {
45        polars_utils::open_file(path.as_std_path())?
46    };
47
48    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
49
50    count_rows_from_slice_par(
51        Buffer::from_owner(mmap),
52        quote_char,
53        comment_prefix,
54        eol_char,
55        has_header,
56        skip_lines,
57        skip_rows_before_header,
58        skip_rows_after_header,
59        raise_if_empty,
60    )
61}
62
/// Read the number of rows without parsing columns.
/// Useful for count(*) queries.
/// Supports transparent decompression. Does not support truncated compressed files.
///
/// The input is consumed through `reader`. After the header/skip handling done by
/// `read_until_start_and_infer_schema`, the remaining bytes are split into chunks that are
/// analyzed in parallel on `POOL`. Every chunk is analyzed under both assumptions ("starts
/// outside a quoted field" and "starts inside a quoted field"); the per-chunk results are
/// then combined sequentially, carrying the quote state from one chunk to the next.
///
/// `decompressed_size_hint` is forwarded to the reader as a size hint only.
///
/// # Errors
///
/// Returns an error if schema/start detection fails or if reading from `reader` fails.
#[allow(clippy::too_many_arguments)]
pub fn count_rows_from_reader_par(
    mut reader: ByteSourceReader<ReaderSource>,
    quote_char: Option<u8>,
    comment_prefix: Option<&CommentPrefix>,
    eol_char: u8,
    has_header: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    raise_if_empty: bool,
    decompressed_size_hint: Option<usize>,
) -> PolarsResult<usize> {
    // Only the options relevant for locating the first data row are set; everything else
    // keeps its default.
    let reader_options = CsvReadOptions {
        parse_options: Arc::new(CsvParseOptions {
            quote_char,
            comment_prefix: comment_prefix.cloned(),
            eol_char,
            ..Default::default()
        }),
        has_header,
        skip_lines,
        skip_rows: skip_rows_before_header,
        skip_rows_after_header,
        raise_if_empty,
        ..Default::default()
    };

    // The inferred schema is not needed here; `leftover` is passed to the first
    // `read_next_slice` call below.
    // NOTE(review): presumably `leftover` holds the already-read bytes that follow the
    // header/skipped rows -- confirm against `read_until_start_and_infer_schema`.
    let (_, mut leftover) = read_until_start_and_infer_schema(
        &reader_options,
        None,
        decompressed_size_hint,
        None,
        &mut reader,
    )?;

    // Debug builds use a tiny chunk size.
    // NOTE(review): presumably so that tests exercise the chunk-boundary logic -- confirm.
    const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) {
        128
    } else {
        512 * 1024
    };

    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
    POOL.install(|| {
        // One entry per chunk: the two-way analysis result and the chunk's sequence id.
        let mut states = Vec::new();
        let eof_unterminated_row;

        if comment_prefix.is_none() {
            // Streaming path: chunks are pulled from the reader one by one and analyzed in
            // parallel as they arrive.
            let mut last_slice = Buffer::new();
            let mut err = None;

            let streaming_iter = std::iter::from_fn(|| {
                let (slice, read_n) =
                    match reader.read_next_slice(&leftover, BYTES_PER_CHUNK, Some(BYTES_PER_CHUNK))
                    {
                        Ok(tup) => tup,
                        Err(e) => {
                            // The iterator cannot yield an error, so stash it and stop; it
                            // is surfaced right after the parallel collect.
                            err = Some(e);
                            return None;
                        },
                    };

                // The leftover is only relevant for the first read.
                leftover = Buffer::new();
                if slice.is_empty() && read_n == 0 {
                    return None;
                }

                // Remember the most recent chunk for the end-of-file check below.
                last_slice = slice.clone();
                Some(slice)
            });

            states = streaming_iter
                .enumerate()
                .par_bridge()
                .map(|(id, slice)| (count.analyze_chunk(&slice), id))
                .collect::<Vec<_>>();

            if let Some(e) = err {
                return Err(e.into());
            }

            // par_bridge does not guarantee order, but is mostly sorted so `slice::sort` is a
            // decent fit.
            states.sort_by_key(|(_, id)| *id);

            // Technically this is broken if the input has a comment line at the end that is longer
            // than `BYTES_PER_CHUNK`, but in practice this ought to be fine.
            eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix);
        } else {
            // For the non-compressed case this is a zero-copy op.
            // TODO: Implement streaming chunk logic.
            let (bytes, _) =
                reader.read_next_slice(&leftover, usize::MAX, decompressed_size_hint)?;

            // With a comment prefix every analyzed range has to begin at the start of a
            // line, so the fixed-size chunk grid is re-aligned to end-of-line bytes below.
            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
            (0..num_chunks)
                .into_par_iter()
                .map(|chunk_idx| {
                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());

                    if start_offset != 0 {
                        // Ensure we start at the start of a line.
                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
                            .iter()
                            .position(|b| *b == eol_char)
                        {
                            start_offset += nl_off + 1;
                        } else {
                            // No end-of-line byte in this grid cell: an earlier chunk's
                            // range extends across it, so contribute nothing here.
                            return (count.analyze_chunk(&[]), 0);
                        }
                    }

                    // Extend the range up to and including the first end-of-line byte at or
                    // after the next grid boundary; that is where the next non-empty chunk
                    // begins.
                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
                        .iter()
                        .position(|b| *b == eol_char)
                    {
                        next_start_offset + nl_off + 1
                    } else {
                        bytes.len()
                    };

                    // The id is unused in this branch; the results are collected in index
                    // order.
                    (count.analyze_chunk(&bytes[start_offset..stop_offset]), 0)
                })
                .collect_into_vec(&mut states);

            eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix);
        }

        // Stitch the chunks together: pick for each chunk the result matching the quote
        // state the previous chunk ended in.
        let mut n = 0;
        let mut in_string = false;
        for (pair, _) in states {
            n += pair[in_string as usize].newline_count;
            in_string = pair[in_string as usize].end_inside_string;
        }
        // A final row without a trailing end-of-line byte still counts as a row.
        n += eof_unterminated_row as usize;
        Ok(n)
    })
}
205
206/// Read the number of rows without parsing columns.
207/// Useful for count(*) queries.
208/// Supports transparent decompression.
209#[allow(clippy::too_many_arguments)]
210pub fn count_rows_from_slice_par(
211    buffer: Buffer<u8>,
212    quote_char: Option<u8>,
213    comment_prefix: Option<&CommentPrefix>,
214    eol_char: u8,
215    has_header: bool,
216    skip_lines: usize,
217    skip_rows_before_header: usize,
218    skip_rows_after_header: usize,
219    raise_if_empty: bool,
220) -> PolarsResult<usize> {
221    const ASSUMED_COMPRESSION_RATIO: usize = 4;
222
223    let buffer_len = buffer.len();
224    let reader = ByteSourceReader::from_memory(buffer)?;
225    let decompressed_size_hint = Some(
226        buffer_len
227            * reader
228                .compression()
229                .map_or(1, |_| ASSUMED_COMPRESSION_RATIO),
230    );
231
232    count_rows_from_reader_par(
233        reader,
234        quote_char,
235        comment_prefix,
236        eol_char,
237        has_header,
238        skip_lines,
239        skip_rows_before_header,
240        skip_rows_after_header,
241        raise_if_empty,
242        decompressed_size_hint,
243    )
244}
245
246/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
247///
248/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
249#[inline]
250pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
251    match comment_prefix {
252        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
253        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
254        None => false,
255    }
256}
257
258/// Find the nearest next line position.
259/// Does not check for new line characters embedded in String fields.
260pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
261    let pos = memchr::memchr(eol_char, input)? + 1;
262    if input.len() - pos == 0 {
263        return None;
264    }
265    Some(pos)
266}
267
/// Find the nearest next line position that is not embedded in a String field.
///
/// Starting from `input`, candidate positions are the bytes directly after an `eol_char`.
/// When `expected_fields` is given, a candidate is only accepted if the line starting there
/// and the (up to) two lines following it look like rows with the expected number of
/// fields; otherwise the first candidate that yields a line is accepted.
///
/// Returns the offset of the accepted position relative to the start of the original
/// `input`, or `None` if `input` is empty, no further `eol_char` is found, the `eol_char`
/// found is the last byte, or three candidate groups were rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Heuristic check that `line` has the field structure of a row with `expected_fields`
    /// fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field that itself contains that many separators / end-of-line bytes
            // indicates that we are not at a real line start.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // if the latest field is missing
        // e.g.:
        // a,b,c
        // vala,valb,
        // SplitFields returns a count that is 1 less
        // Therefore we accept:
        // expected == count
        // and
        // expected == count + 1
        // (`wrapping_sub` turns `count > expected` into a huge value, which is rejected).
        expected_fields.wrapping_sub(count) <= 1
    }

    // we check 3 subsequent lines for `accept_line` before we accept
    // if 3 groups are rejected we reject completely
    let mut rejected_line_groups = 0u8;

    // Number of bytes of the original `input` that have been skipped so far.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // headers might have an extra value
        // So if we have churned through enough lines
        // we try one field less.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the index directly after the first end-of-line byte.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // In bounds: `pos <= input.len()` because `pos` is the index of a found byte plus one.
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // count the fields, and determine if they are equal to what we expect from the schema
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The candidate line looks right; also require the next two lines (if
                    // present) to look right.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the next
                        // iteration examines the same candidate again -- confirm intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // In bounds: `pos < input.len()` was established above (the
                    // `input.len() - pos == 0` case returned early), so `pos + 1 <= len`.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // don't count the fields
            (Some(_), None) => return Some(total_pos + pos),
            // no line could be produced from the remaining input
            _ => return None,
        }
    }
}
365
366#[inline(always)]
367pub(super) fn is_whitespace(b: u8) -> bool {
368    b == b' ' || b == b'\t'
369}
370
371/// May have false-positives, but not false negatives.
372#[inline(always)]
373pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
374    // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
375    // <= 32. In that range there aren't a lot of other common symbols (besides
376    // newline), so this is a quick test which can be worth doing to avoid the
377    // exact test.
378    b <= 32
379}
380
/// Drop the longest prefix of `input` whose bytes all satisfy `f` and return the rest.
///
/// Returns an empty slice when every byte satisfies `f` (or when `input` is empty).
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let skipped = input.iter().take_while(|&&byte| f(byte)).count();
    &input[skipped..]
}
393
394/// Remove whitespace from the start of buffer.
395/// Makes sure that the bytes stream starts with
396///     'field_1,field_2'
397/// and not with
398///     '\nfield_1,field_1'
399#[inline]
400pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
401    skip_condition(input, is_whitespace)
402}
403
/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the file in lines naively as
///
/// ```text
///    for line in bytes.split(b'\n') {
/// ```
///
/// This will fail when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a valid string field that contains multiple lines.
pub struct SplitLines<'a> {
    /// The input that has not been yielded yet.
    v: &'a [u8],
    /// The quote byte; set to `"` when no quote character was supplied (see `quoting`).
    quote_char: u8,
    /// The byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every lane, for the SIMD comparison.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every lane, for the SIMD comparison.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of further line ends found in the most recently scanned SIMD block that have
    /// not been yielded yet; bit `i` refers to `v[i]`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` up to which the current line has already been scanned (SIMD path).
    total_index: usize,
    /// Whether quote handling is enabled at all.
    quoting: bool,
    /// Optional prefix marking comment lines; such lines are split without quote handling.
    comment_prefix: Option<&'a CommentPrefix>,
}
427
// Number of bytes that are compared per SIMD block.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

// Vector type used for the byte-wise comparisons; its 64 lanes yield a 64-bit mask.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
438
439impl<'a> SplitLines<'a> {
440    pub fn new(
441        slice: &'a [u8],
442        quote_char: Option<u8>,
443        eol_char: u8,
444        comment_prefix: Option<&'a CommentPrefix>,
445    ) -> Self {
446        let quoting = quote_char.is_some();
447        let quote_char = quote_char.unwrap_or(b'\"');
448        #[cfg(feature = "simd")]
449        let simd_eol_char = SimdVec::splat(eol_char);
450        #[cfg(feature = "simd")]
451        let simd_quote_char = SimdVec::splat(quote_char);
452        Self {
453            v: slice,
454            quote_char,
455            eol_char,
456            #[cfg(feature = "simd")]
457            simd_eol_char,
458            #[cfg(feature = "simd")]
459            simd_quote_char,
460            #[cfg(feature = "simd")]
461            previous_valid_eols: 0,
462            total_index: 0,
463            quoting,
464            comment_prefix,
465        }
466    }
467}
468
469impl<'a> SplitLines<'a> {
470    // scalar as in non-simd
471    fn next_scalar(&mut self) -> Option<&'a [u8]> {
472        if self.v.is_empty() {
473            return None;
474        }
475        if is_comment_line(self.v, self.comment_prefix) {
476            return self.next_comment_line();
477        }
478        {
479            let mut pos = 0u32;
480            let mut iter = self.v.iter();
481            let mut in_field = false;
482            loop {
483                match iter.next() {
484                    Some(&c) => {
485                        pos += 1;
486
487                        if self.quoting && c == self.quote_char {
488                            // toggle between string field enclosure
489                            //      if we encounter a starting '"' -> in_field = true;
490                            //      if we encounter a closing '"' -> in_field = false;
491                            in_field = !in_field;
492                        }
493                        // if we are not in a string and we encounter '\n' we can stop at this position.
494                        else if c == self.eol_char && !in_field {
495                            break;
496                        }
497                    },
498                    None => {
499                        let remainder = self.v;
500                        self.v = &[];
501                        return Some(remainder);
502                    },
503                }
504            }
505
506            unsafe {
507                debug_assert!((pos as usize) <= self.v.len());
508
509                // return line up to this position
510                let ret = Some(
511                    self.v
512                        .get_unchecked(..(self.total_index + pos as usize - 1)),
513                );
514                // skip the '\n' token and update slice.
515                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
516                ret
517            }
518        }
519    }
520    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
521        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
522            unsafe {
523                // return line up to this position
524                let ret = Some(self.v.get_unchecked(..(pos - 1)));
525                // skip the '\n' token and update slice.
526                self.v = self.v.get_unchecked(pos..);
527                ret
528            }
529        } else {
530            let remainder = self.v;
531            self.v = &[];
532            Some(remainder)
533        }
534    }
535}
536
/// Yields the lines of the input one by one, without their terminating `eol_char`.
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without the `simd` feature every line is produced by the scalar scan.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD variant: scans the input in blocks of `SIMD_SIZE` bytes and caches additional
    /// line ends found within a block so that following calls can return immediately.
    /// Inputs with a comment prefix always use the scalar scan.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // First check cached value
        if self.previous_valid_eols != 0 {
            // The lowest set bit is the position of the next line end relative to `self.v`.
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Drop the consumed bits so that the mask stays relative to the advanced `self.v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // return line up to this position
                let ret = Some(self.v.get_unchecked(..pos));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        // Every call starts at a line boundary, hence outside of any quoted field.
        self.total_index = 0;
        let mut not_in_field_previous_iter = true;

        loop {
            // The part of the current line that has not been scanned yet.
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                // More than one full block left: load the block (the length was checked
                // just above, so the conversion cannot fail).
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit `i` is set when byte `i` of the block equals the end-of-line byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit `i` of the result is the xor of quote bits
                    // `0..=i` -- confirm against `prefix_xorsum_inclusive`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the mask when the block starts outside of a quoted field, so that
                    // set bits mark positions outside of quotes.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state at the last byte of this block carries over to the next one.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Cache the remaining line ends of this block, relative to the position
                    // after the one returned now. Shifting a `u64` by 64 is not allowed,
                    // hence the special case for the last bit.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // return line up to this position
                        let ret = Some(self.v.get_unchecked(..pos));
                        // skip the '\n' token and update slice.
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // At most one block left: finish with a byte-by-byte scan.
                // Denotes if we are in a string field, started with a quote
                let mut in_field = !not_in_field_previous_iter;
                // Number of bytes of `bytes` consumed so far (one past the current index).
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                // toggle between string field enclosure
                                //      if we encounter a starting '"' -> in_field = true;
                                //      if we encounter a closing '"' -> in_field = false;
                                in_field = !in_field;
                            }
                            // if we are not in a string and we encounter '\n' we can stop at this position.
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No terminator left: the remainder is the final line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // return line up to this position
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    // skip the '\n' token and update slice.
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
665
/// Counts line terminators in raw CSV bytes, taking quoted fields and (optionally) comment
/// lines into account.
pub struct CountLines {
    /// The quote byte; set to `"` when no quote character was supplied (see `quoting`).
    quote_char: u8,
    /// The byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every lane, for the SIMD comparison.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every lane, for the SIMD comparison.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote handling is enabled at all.
    quoting: bool,
    /// Optional prefix marking comment lines.
    comment_prefix: Option<CommentPrefix>,
}
676
/// Result of analyzing one chunk of CSV bytes under a given assumption about whether the
/// chunk starts inside a quoted field.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line terminators found outside of quoted fields.
    pub newline_count: usize,
    /// Offset within the chunk of the last counted line terminator (0 if none was counted).
    pub last_newline_offset: usize,
    /// Whether the end of the chunk lies inside a quoted field.
    pub end_inside_string: bool,
}
683
684impl CountLines {
685    pub fn new(
686        quote_char: Option<u8>,
687        eol_char: u8,
688        comment_prefix: Option<CommentPrefix>,
689    ) -> Self {
690        let quoting = quote_char.is_some();
691        let quote_char = quote_char.unwrap_or(b'\"');
692        #[cfg(feature = "simd")]
693        let simd_eol_char = SimdVec::splat(eol_char);
694        #[cfg(feature = "simd")]
695        let simd_quote_char = SimdVec::splat(quote_char);
696        Self {
697            quote_char,
698            eol_char,
699            #[cfg(feature = "simd")]
700            simd_eol_char,
701            #[cfg(feature = "simd")]
702            simd_quote_char,
703            quoting,
704            comment_prefix,
705        }
706    }
707
    /// Analyzes a chunk of CSV data.
    ///
    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
    /// the first is assuming the start of the chunk is *not* inside a string,
    /// the second assuming the start is inside a string.
    ///
    /// If comment_prefix is not None the start of bytes must be at the start of
    /// a line (and thus not in the middle of a comment).
    ///
    /// Computing both variants allows chunks to be analyzed independently of each other;
    /// the caller selects the variant that matches the state the preceding chunk ended in.
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        // If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // False if even number of quotes seen so far, true otherwise.
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // 0 if even number of quotes seen so far, u64::MAX otherwise.
            let mut global_quote_parity_mask = 0;
            // Process all complete 64-byte blocks; the remainder is handled by the scalar
            // loop below.
            while scan_offset + 64 <= bytes.len() {
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                // Bit `i` is set when byte `i` of the block equals the end-of-line byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Per-byte quote parity within the block, combined with the parity
                    // carried over from the preceding blocks.
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // Broadcast the parity at the last byte (the top bit) to all bits via an
                    // arithmetic shift.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Assuming the chunk starts outside a string, line ends count where the
                    // parity is even.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    // The highest set bit is the last line end in this block; the value is
                    // only selected when the mask is non-zero.
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Assuming the chunk starts inside a string, line ends count where the
                    // parity is odd.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting every end-of-line byte terminates a line.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Scalar loop for the bytes that were not covered by the SIMD blocks (all bytes when
        // the `simd` feature is disabled).
        while scan_offset < bytes.len() {
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // The current parity selects the assumption under which this byte lies outside
            // of a string.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }
809
810    // bytes must begin at the start of a line.
811    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
812        let pre_s = match self.comment_prefix.as_ref().unwrap() {
813            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
814            CommentPrefix::Multi(ps) => ps.as_bytes(),
815        };
816
817        let mut state = LineStats::default();
818        let mut scan_offset = 0;
819        while scan_offset < bytes.len() {
820            // Skip comment line if needed.
821            while bytes[scan_offset..].starts_with(pre_s) {
822                scan_offset += pre_s.len();
823                let Some(nl_off) = bytes[scan_offset..]
824                    .iter()
825                    .position(|c| *c == self.eol_char)
826                else {
827                    break;
828                };
829                scan_offset += nl_off + 1;
830            }
831
832            while scan_offset < bytes.len() {
833                let c = unsafe { *bytes.get_unchecked(scan_offset) };
834                in_string ^= (c == self.quote_char) & self.quoting;
835
836                if c == self.eol_char && !in_string {
837                    state.newline_count += 1;
838                    state.last_newline_offset = scan_offset;
839                    scan_offset += 1;
840                    break;
841                } else {
842                    scan_offset += 1;
843                }
844            }
845        }
846
847        state.end_inside_string = in_string;
848        state
849    }
850
851    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
852        loop {
853            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
854
855            let (count, offset) = if self.comment_prefix.is_some() {
856                let stats = self.analyze_chunk_with_comment(b, false);
857                (stats.newline_count, stats.last_newline_offset)
858            } else {
859                self.count(b)
860            };
861
862            if count > 0 || b.len() == bytes.len() {
863                return (count, offset);
864            }
865
866            *chunk_size = chunk_size.saturating_mul(2);
867        }
868    }
869
870    pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
871        let stats = if self.comment_prefix.is_some() {
872            self.analyze_chunk_with_comment(bytes, false)
873        } else {
874            self.analyze_chunk(bytes)[0]
875        };
876
877        let mut count = stats.newline_count;
878        let mut offset = stats.last_newline_offset;
879
880        if count > 0 {
881            offset = cmp::min(offset + 1, bytes.len());
882        } else {
883            debug_assert!(offset == 0);
884        }
885
886        if is_eof {
887            count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref())
888                as usize;
889            offset = bytes.len();
890        }
891
892        (count, offset)
893    }
894
/// Counts the line terminators of `bytes` that lie outside quoted fields.
///
/// Returns `(count, position)` where `position` is the offset of the last
/// counted terminator, i.e. the offset to split at for the remainder of the
/// slice. `position` is `0` when `count` is `0`.
///
/// Input is processed in lanes of `SIMD_SIZE` bytes while more than
/// `SIMD_SIZE` bytes remain; the tail is handed to `count_no_simd`.
#[cfg(feature = "simd")]
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
    let input = bytes;
    let mut consumed = 0;
    let mut newlines = 0;
    let mut last_eol = 0;
    // Quote state carried from one lane to the next: `true` while the scan
    // is outside of a quoted field.
    let mut outside_quotes = true;

    loop {
        // SAFETY: `consumed` starts at 0 and only grows by `SIMD_SIZE` after
        // more than `SIMD_SIZE` bytes were seen to remain, so it never
        // exceeds `input.len()`.
        let rest = unsafe { input.get_unchecked(consumed..) };

        if rest.is_empty() {
            debug_assert!(newlines == 0 || input[last_eol] == self.eol_char);
            return (newlines, last_eol);
        }

        if rest.len() <= SIMD_SIZE {
            // Scalar tail, resuming from the carried quote state.
            let (tail_newlines, tail_eol) = self.count_no_simd(rest, !outside_quotes);
            if tail_newlines > 0 {
                newlines += tail_newlines;
                last_eol = consumed + tail_eol;
            }
            debug_assert!(newlines == 0 || input[last_eol] == self.eol_char);
            return (newlines, last_eol);
        }

        // SAFETY: `rest` holds more than `SIMD_SIZE` bytes, so the sub-slice
        // exists and has exactly `SIMD_SIZE` elements.
        let lane: [u8; SIMD_SIZE] = unsafe {
            rest.get_unchecked(0..SIMD_SIZE)
                .try_into()
                .unwrap_unchecked()
        };
        let lane_vec = SimdVec::from(lane);
        let eol_bits = lane_vec.simd_eq(self.simd_eol_char).to_bitmask();

        let unquoted_eols = if self.quoting {
            let quote_bits = lane_vec.simd_eq(self.simd_quote_char).to_bitmask();
            let parity = prefix_xorsum_inclusive(quote_bits);
            // Flip the running parity when the lane started outside of
            // quotes, so that set bits mark positions outside quoted fields.
            let outside_mask = if outside_quotes { !parity } else { parity };
            // The state at the lane's last byte carries into the next lane.
            outside_quotes = (outside_mask & (1 << (SIMD_SIZE - 1))) > 0;
            eol_bits & outside_mask
        } else {
            eol_bits
        };

        if unquoted_eols != 0 {
            newlines += unquoted_eols.count_ones() as usize;
            // NOTE(review): the literal 63 assumes a 64-bit mask, i.e.
            // `SIMD_SIZE == 64` — confirm against the constant's definition.
            last_eol = consumed + 63 - unquoted_eols.leading_zeros() as usize;
            debug_assert_eq!(input[last_eol], self.eol_char)
        }
        consumed += SIMD_SIZE;
    }
}
953
954    #[cfg(not(feature = "simd"))]
955    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
956        self.count_no_simd(bytes, false)
957    }
958
959    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
960        let iter = bytes.iter();
961        let mut in_field = in_field;
962        let mut count = 0;
963        let mut position = 0;
964
965        for b in iter {
966            let c = *b;
967            if self.quoting && c == self.quote_char {
968                // toggle between string field enclosure
969                //      if we encounter a starting '"' -> in_field = true;
970                //      if we encounter a closing '"' -> in_field = false;
971                in_field = !in_field;
972            }
973            // If we are not in a string and we encounter '\n' we can stop at this position.
974            else if c == self.eol_char && !in_field {
975                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
976                count += 1;
977            }
978        }
979        debug_assert!(count == 0 || bytes[position] == self.eol_char);
980
981        (count, position)
982    }
983}
984
985fn ends_in_unterminated_row(
986    bytes: &[u8],
987    eol_char: u8,
988    comment_prefix: Option<&CommentPrefix>,
989) -> bool {
990    if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char {
991        // We can do a simple backwards-scan to find the start of last line if it is a
992        // comment line, since comment lines can't escape new-lines.
993        let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0);
994        let last_line_is_comment_line = bytes
995            .get(last_new_line_post + 1..)
996            .map(|line| is_comment_line(line, comment_prefix))
997            .unwrap_or(false);
998
999        return !last_line_is_comment_line;
1000    }
1001
1002    false
1003}
1004
/// Returns the offset of the first `needle` in `bytes` that is not enclosed
/// by `quote_char`, or `None` if there is none.
///
/// Every `quote_char` toggles the quoted state before the byte is compared
/// with `needle`, so a `needle` equal to `quote_char` matches on a closing
/// quote.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so the offset cannot wrap on inputs
    // larger than `u32::MAX` bytes.
    bytes.iter().position(|&c| {
        // toggle between string field enclosure
        //      if we encounter a starting '"' -> in_field = true;
        //      if we encounter a closing '"' -> in_field = false;
        in_field ^= c == quote_char;
        !in_field && c == needle
    })
}
1027
1028#[inline]
1029pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
1030    let pos = match quote {
1031        Some(quote) => find_quoted(bytes, quote, eol_char),
1032        None => bytes.iter().position(|x| *x == eol_char),
1033    };
1034    match pos {
1035        None => &[],
1036        Some(pos) => &bytes[pos + 1..],
1037    }
1038}
1039
1040#[inline]
1041pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
1042    if let Some(pos) = next_line_position_naive(input, eol_char) {
1043        unsafe { input.get_unchecked(pos..) }
1044    } else {
1045        &[]
1046    }
1047}
1048
1049/// Parse CSV.
1050///
1051/// # Arguments
1052/// * `bytes` - input to parse
1053/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
1054///   thread has a different offset.
1055/// * `projection` - Indices of the columns to project.
1056/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
1057///   fields are written to the buffers. The UTF8 data will be parsed later.
1058///
1059/// Returns the number of bytes parsed successfully.
1060#[allow(clippy::too_many_arguments)]
1061pub(super) fn parse_lines(
1062    mut bytes: &[u8],
1063    parse_options: &CsvParseOptions,
1064    offset: usize,
1065    ignore_errors: bool,
1066    null_values: Option<&NullValuesCompiled>,
1067    projection: &[usize],
1068    buffers: &mut [Builder],
1069    n_lines: usize,
1070    // length of original schema
1071    schema_len: usize,
1072    schema: &Schema,
1073) -> PolarsResult<usize> {
1074    assert!(
1075        !projection.is_empty(),
1076        "at least one column should be projected"
1077    );
1078    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
1079    // During projection pushdown we are not checking other csv fields.
1080    // This would be very expensive and we don't care as we only want
1081    // the projected columns.
1082    if projection.len() != schema_len {
1083        truncate_ragged_lines = true
1084    }
1085
1086    // we use the pointers to track the no of bytes read.
1087    let start = bytes.as_ptr() as usize;
1088    let original_bytes_len = bytes.len();
1089    let n_lines = n_lines as u32;
1090
1091    let mut line_count = 0u32;
1092    loop {
1093        if line_count > n_lines {
1094            let end = bytes.as_ptr() as usize;
1095            return Ok(end - start);
1096        }
1097
1098        if bytes.is_empty() {
1099            return Ok(original_bytes_len);
1100        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1101            // deal with comments
1102            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1103            bytes = bytes_rem;
1104            continue;
1105        }
1106
1107        // Every line we only need to parse the columns that are projected.
1108        // Therefore we check if the idx of the field is in our projected columns.
1109        // If it is not, we skip the field.
1110        let mut projection_iter = projection.iter().copied();
1111        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1112        let mut processed_fields = 0;
1113
1114        let mut iter = SplitFields::new(
1115            bytes,
1116            parse_options.separator,
1117            parse_options.quote_char,
1118            parse_options.eol_char,
1119        );
1120        let mut idx = 0u32;
1121        let mut read_sol = 0;
1122        loop {
1123            match iter.next() {
1124                // end of line
1125                None => {
1126                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1127                    break;
1128                },
1129                Some((mut field, needs_escaping)) => {
1130                    let field_len = field.len();
1131
1132                    // +1 is the split character that is consumed by the iterator.
1133                    read_sol += field_len + 1;
1134
1135                    if idx == next_projected as u32 {
1136                        // the iterator is finished when it encounters a `\n`
1137                        // this could be preceded by a '\r'
1138                        unsafe {
1139                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1140                                field = field.get_unchecked(..field_len - 1);
1141                            }
1142                        }
1143
1144                        debug_assert!(processed_fields < buffers.len());
1145                        let buf = unsafe {
1146                            // SAFETY: processed fields index can never exceed the projection indices.
1147                            buffers.get_unchecked_mut(processed_fields)
1148                        };
1149                        let mut add_null = false;
1150
1151                        // if we have null values argument, check if this field equal null value
1152                        if let Some(null_values) = null_values {
1153                            let field = if needs_escaping && !field.is_empty() {
1154                                unsafe { field.get_unchecked(1..field.len() - 1) }
1155                            } else {
1156                                field
1157                            };
1158
1159                            // SAFETY:
1160                            // process fields is in bounds
1161                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1162                        }
1163                        if add_null {
1164                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1165                        } else {
1166                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1167                                .map_err(|e| {
1168                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1169                                    let unparsable = String::from_utf8_lossy(field);
1170                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1171                                    polars_err!(
1172                                        ComputeError:
1173                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1174                                        The current offset in the file is {} bytes.\n\
1175                                        \n\
1176                                        You might want to try:\n\
1177                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1178                                        - specifying correct dtype with the `schema_overrides` argument\n\
1179                                        - setting `ignore_errors` to `True`,\n\
1180                                        - adding `{}` to the `null_values` list.\n\n\
1181                                        Original error: ```{}```",
1182                                        &unparsable,
1183                                        buf.dtype(),
1184                                        column_name,
1185                                        idx + 1,
1186                                        bytes_offset,
1187                                        &unparsable,
1188                                        e
1189                                    )
1190                                })?;
1191                        }
1192                        processed_fields += 1;
1193
1194                        // if we have all projected columns we are done with this line
1195                        match projection_iter.next() {
1196                            Some(p) => next_projected = p,
1197                            None => {
1198                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1199                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
1200                                } else {
1201                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1202                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1203
1204Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1205                                    }
1206                                    let bytes_rem = skip_this_line(
1207                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1208                                        parse_options.quote_char,
1209                                        parse_options.eol_char,
1210                                    );
1211                                    bytes = bytes_rem;
1212                                }
1213                                break;
1214                            },
1215                        }
1216                    }
1217                    idx += 1;
1218                },
1219            }
1220        }
1221
1222        // there can be lines that miss fields (also the comma values)
1223        // this means the splitter won't process them.
1224        // We traverse them to read them as null values.
1225        while processed_fields < projection.len() {
1226            debug_assert!(processed_fields < buffers.len());
1227            let buf = unsafe {
1228                // SAFETY: processed fields index can never exceed the projection indices.
1229                buffers.get_unchecked_mut(processed_fields)
1230            };
1231            buf.add_null(!parse_options.missing_is_null);
1232            processed_fields += 1;
1233        }
1234        line_count += 1;
1235    }
1236}
1237
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// A newline enclosed in quotes must not split a line, whichever quote
    /// character is configured.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}