polars_io/csv/read/
parser.rs

1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_core::prelude::*;
5use polars_core::{POOL, config};
6use polars_error::feature_gated;
7use polars_utils::mmap::{MMapSemaphore, MemSlice};
8use polars_utils::plpath::PlPathRef;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use crate::csv::read::read_until_start_and_infer_schema;
17use crate::prelude::CsvReadOptions;
18use crate::utils::compression::CompressedReader;
19
20/// Read the number of rows without parsing columns
21/// useful for count(*) queries
22#[allow(clippy::too_many_arguments)]
23pub fn count_rows(
24    addr: PlPathRef<'_>,
25    quote_char: Option<u8>,
26    comment_prefix: Option<&CommentPrefix>,
27    eol_char: u8,
28    has_header: bool,
29    skip_lines: usize,
30    skip_rows_before_header: usize,
31    skip_rows_after_header: usize,
32) -> PolarsResult<usize> {
33    let file = match addr
34        .as_local_path()
35        .and_then(|v| (!config::force_async()).then_some(v))
36    {
37        None => feature_gated!("cloud", {
38            crate::file_cache::FILE_CACHE
39                .get_entry(addr)
40                // Safety: This was initialized by schema inference.
41                .unwrap()
42                .try_open_assume_latest()?
43        }),
44        Some(path) => polars_utils::open_file(path)?,
45    };
46
47    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
48
49    count_rows_from_slice_par(
50        MemSlice::from_mmap(Arc::new(mmap)),
51        quote_char,
52        comment_prefix,
53        eol_char,
54        has_header,
55        skip_lines,
56        skip_rows_before_header,
57        skip_rows_after_header,
58    )
59}
60
61/// Read the number of rows without parsing columns
62/// useful for count(*) queries
63#[allow(clippy::too_many_arguments)]
64pub fn count_rows_from_slice_par(
65    mem_slice: MemSlice,
66    quote_char: Option<u8>,
67    comment_prefix: Option<&CommentPrefix>,
68    eol_char: u8,
69    has_header: bool,
70    skip_lines: usize,
71    skip_rows_before_header: usize,
72    skip_rows_after_header: usize,
73) -> PolarsResult<usize> {
74    // TODO: Improve default scan_csv path to match count perf or implement streaming count.
75    let mut reader = CompressedReader::try_new(mem_slice)?;
76
77    let reader_options = CsvReadOptions {
78        parse_options: Arc::new(CsvParseOptions {
79            quote_char,
80            comment_prefix: comment_prefix.cloned(),
81            eol_char,
82            ..Default::default()
83        }),
84        has_header,
85        skip_lines,
86        skip_rows: skip_rows_before_header,
87        skip_rows_after_header,
88        ..Default::default()
89    };
90
91    let (_, leftover) =
92        read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?;
93
94    // For the non-compressed case this is a zero-copy op.
95    let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?;
96
97    #[cfg(debug_assertions)]
98    const BYTES_PER_CHUNK: usize = 128;
99    #[cfg(not(debug_assertions))]
100    const BYTES_PER_CHUNK: usize = 1 << 16;
101
102    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
103    POOL.install(|| {
104        let mut states = Vec::new();
105        if comment_prefix.is_none() {
106            bytes
107                .par_chunks(BYTES_PER_CHUNK)
108                .map(|chunk| count.analyze_chunk(chunk))
109                .collect_into_vec(&mut states);
110        } else {
111            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
112            (0..num_chunks)
113                .into_par_iter()
114                .map(|chunk_idx| {
115                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
116                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
117
118                    if start_offset != 0 {
119                        // Ensure we start at the start of a line.
120                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
121                            .iter()
122                            .position(|b| *b == eol_char)
123                        {
124                            start_offset += nl_off + 1;
125                        } else {
126                            return count.analyze_chunk(&[]);
127                        }
128                    }
129
130                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
131                        .iter()
132                        .position(|b| *b == eol_char)
133                    {
134                        next_start_offset + nl_off + 1
135                    } else {
136                        bytes.len()
137                    };
138
139                    count.analyze_chunk(&bytes[start_offset..stop_offset])
140                })
141                .collect_into_vec(&mut states);
142        }
143
144        let mut n = 0;
145        let mut in_string = false;
146        for pair in states {
147            n += pair[in_string as usize].newline_count;
148            in_string = pair[in_string as usize].end_inside_string;
149        }
150        if let Some(last) = bytes.last()
151            && *last != eol_char
152            && (comment_prefix.is_none()
153                || !is_comment_line(
154                    bytes.rsplit(|c| *c == eol_char).next().unwrap(),
155                    comment_prefix,
156                ))
157        {
158            n += 1
159        }
160
161        Ok(n)
162    })
163}
164
165/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
166///
167/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
168#[inline]
169pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
170    match comment_prefix {
171        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
172        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
173        None => false,
174    }
175}
176
177/// Find the nearest next line position.
178/// Does not check for new line characters embedded in String fields.
179pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
180    let pos = memchr::memchr(eol_char, input)? + 1;
181    if input.len() - pos == 0 {
182        return None;
183    }
184    Some(pos)
185}
186
/// Find the nearest next line position that is not embedded in a String field.
///
/// Starting from `input`, candidate positions right after an `eol_char` are
/// probed. When `expected_fields` is given, a candidate is only accepted if the
/// line starting there and up to two following lines have a plausible field
/// count; otherwise the first candidate that yields a line is accepted.
///
/// Returns the offset (relative to the original `input`) of the accepted line
/// start, or `None` if `input` is empty, no further `eol_char` is found, an
/// `eol_char` is the last byte of the remaining input, or three candidate
/// groups were rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Decides whether `line` plausibly is a complete record of `expected_fields` fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field containing at least `expected_fields` separator/eol
            // bytes is taken as a sign that the line does not start at a real
            // record boundary.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // if the latest field is missing
        // e.g.:
        // a,b,c
        // vala,valb,
        // SplitFields returns a count that is 1 less
        // Therefore we accept:
        // count == expected
        // and
        // count == expected - 1
        // (`wrapping_sub` turns `count > expected` into a huge value, which is rejected).
        expected_fields.wrapping_sub(count) <= 1
    }

    // we check 3 subsequent lines for `accept_line` before we accept
    // if 3 groups are rejected we reject completely
    let mut rejected_line_groups = 0u8;

    // Bytes of the original input that were skipped so far.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // headers might have an extra value
        // So if we have churned through enough lines
        // we try one field less.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the offset right after the next eol in the remaining input.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // In bounds: `pos <= input.len()` because `memchr` returned an index inside `input`.
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // count the fields, and determine if they are equal to what we expect from the schema
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The candidate line looks fine; also require the next (up to) two lines to pass.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the next
                        // iteration probes the same candidate again until the rejection
                        // limit is reached -- confirm this is intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // In bounds: the early return above guarantees `pos < input.len()`,
                    // hence `pos + 1 <= input.len()`. Note that this skips one byte
                    // beyond the eol before searching for the next candidate.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // don't count the fields
            (Some(_), None) => return Some(total_pos + pos),
            // no line could be produced after the eol
            _ => return None,
        }
    }
}
284
285#[inline(always)]
286pub(super) fn is_whitespace(b: u8) -> bool {
287    b == b' ' || b == b'\t'
288}
289
290/// May have false-positives, but not false negatives.
291#[inline(always)]
292pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
293    // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
294    // <= 32. In that range there aren't a lot of other common symbols (besides
295    // newline), so this is a quick test which can be worth doing to avoid the
296    // exact test.
297    b <= 32
298}
299
/// Drops the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns the remaining suffix, which is empty when every byte satisfies `f`
/// or when `input` itself is empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
312
/// Remove whitespace from the start of buffer.
///
/// Only the bytes accepted by `is_whitespace` (`b' '` and `b'\t'`) are
/// stripped, so
///     ' \tfield_1,field_2'
/// becomes
///     'field_1,field_2'
/// Line-ending bytes are left in place: a buffer starting with '\n' is
/// returned unchanged.
#[inline]
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
    skip_condition(input, is_whitespace)
}
322
/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the file into lines naively as
///
/// ```text
///    for line in bytes.split(b'\n') {
/// ```
///
/// This fails when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a valid string field that contains multiple lines.
pub struct SplitLines<'a> {
    /// Input that has not been yielded yet; shrinks as lines are produced.
    v: &'a [u8],
    /// Quote byte; set to `b'"'` when quoting is disabled (see `quoting`).
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of further line ends already found by the last SIMD step,
    /// expressed relative to the current start of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` at which the SIMD scan currently reads.
    total_index: usize,
    /// Whether quote handling is enabled.
    quoting: bool,
    /// Optional prefix that marks a line as a comment.
    comment_prefix: Option<&'a CommentPrefix>,
}
346
/// Number of input bytes handled per SIMD step; equals the lane count of `SimdVec`.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// 64-lane byte vector used to compare a whole block of input against one byte value.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
357
358impl<'a> SplitLines<'a> {
359    pub fn new(
360        slice: &'a [u8],
361        quote_char: Option<u8>,
362        eol_char: u8,
363        comment_prefix: Option<&'a CommentPrefix>,
364    ) -> Self {
365        let quoting = quote_char.is_some();
366        let quote_char = quote_char.unwrap_or(b'\"');
367        #[cfg(feature = "simd")]
368        let simd_eol_char = SimdVec::splat(eol_char);
369        #[cfg(feature = "simd")]
370        let simd_quote_char = SimdVec::splat(quote_char);
371        Self {
372            v: slice,
373            quote_char,
374            eol_char,
375            #[cfg(feature = "simd")]
376            simd_eol_char,
377            #[cfg(feature = "simd")]
378            simd_quote_char,
379            #[cfg(feature = "simd")]
380            previous_valid_eols: 0,
381            total_index: 0,
382            quoting,
383            comment_prefix,
384        }
385    }
386}
387
388impl<'a> SplitLines<'a> {
389    // scalar as in non-simd
390    fn next_scalar(&mut self) -> Option<&'a [u8]> {
391        if self.v.is_empty() {
392            return None;
393        }
394        if is_comment_line(self.v, self.comment_prefix) {
395            return self.next_comment_line();
396        }
397        {
398            let mut pos = 0u32;
399            let mut iter = self.v.iter();
400            let mut in_field = false;
401            loop {
402                match iter.next() {
403                    Some(&c) => {
404                        pos += 1;
405
406                        if self.quoting && c == self.quote_char {
407                            // toggle between string field enclosure
408                            //      if we encounter a starting '"' -> in_field = true;
409                            //      if we encounter a closing '"' -> in_field = false;
410                            in_field = !in_field;
411                        }
412                        // if we are not in a string and we encounter '\n' we can stop at this position.
413                        else if c == self.eol_char && !in_field {
414                            break;
415                        }
416                    },
417                    None => {
418                        let remainder = self.v;
419                        self.v = &[];
420                        return Some(remainder);
421                    },
422                }
423            }
424
425            unsafe {
426                debug_assert!((pos as usize) <= self.v.len());
427
428                // return line up to this position
429                let ret = Some(
430                    self.v
431                        .get_unchecked(..(self.total_index + pos as usize - 1)),
432                );
433                // skip the '\n' token and update slice.
434                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
435                ret
436            }
437        }
438    }
439    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
440        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
441            unsafe {
442                // return line up to this position
443                let ret = Some(self.v.get_unchecked(..(pos - 1)));
444                // skip the '\n' token and update slice.
445                self.v = self.v.get_unchecked(pos..);
446                ret
447            }
448        } else {
449            let remainder = self.v;
450            self.v = &[];
451            Some(remainder)
452        }
453    }
454}
455
/// Yields the lines of the input one by one, without their terminating
/// `eol_char`; eol bytes inside quoted fields do not end a line when quoting is
/// enabled.
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Non-SIMD build: every line is produced by the scalar splitter.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD build: inspects 64 bytes at a time and caches the additional line
    /// ends discovered in a block so subsequent calls can return immediately.
    /// Falls back to the scalar splitter whenever a comment prefix is set.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // First check cached value
        if self.previous_valid_eols != 0 {
            // The lowest set bit is the next line end, relative to the start of `v`.
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Re-base the remaining cached line ends onto the new start of `v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // return line up to this position
                let ret = Some(self.v.get_unchecked(..pos));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        // Comment detection is only implemented in the scalar splitter.
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // A fresh scan starts at a line boundary, i.e. outside of any quoted field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            // Only take the SIMD path while more than one full block remains.
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit i is set when byte i of the block equals the eol byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Running XOR over the quote bits: bit i tells whether an odd
                    // number of quotes occurred in bytes 0..=i of this block.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // If the block starts outside a field, odd parity means "inside",
                    // so invert to make a set bit mean "outside a quoted field".
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state after the last byte carries over to the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Cache the remaining line ends of this block. Shifting by the
                    // full bit width is not allowed, hence the special case.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        // Translate the in-block position to an offset into `v`.
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // return line up to this position
                        let ret = Some(self.v.get_unchecked(..pos));
                        // skip the '\n' token and update slice.
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Scalar tail for the last (at most SIMD_SIZE) bytes.
                // Denotes if we are in a string field, started with a quote
                let mut in_field = !not_in_field_previous_iter;
                // Number of tail bytes consumed, i.e. relative to `total_index`.
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                // toggle between string field enclosure
                                //      if we encounter a starting '"' -> in_field = true;
                                //      if we encounter a closing '"' -> in_field = false;
                                in_field = !in_field;
                            }
                            // if we are not in a string and we encounter '\n' we can stop at this position.
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No further line end: the whole remainder is the last line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // return line up to this position
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    // skip the '\n' token and update slice.
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
584
/// Counts line terminators in CSV data while respecting quoted fields and,
/// optionally, comment lines.
pub struct CountLines {
    /// Quote byte; set to `b'"'` when quoting is disabled (see `quoting`).
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote handling is enabled.
    quoting: bool,
    /// Optional prefix that marks a line as a comment.
    comment_prefix: Option<CommentPrefix>,
}
595
/// Result of analyzing one chunk of CSV bytes under a fixed assumption about
/// whether the chunk starts inside a quoted string.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line terminators counted in the chunk.
    pub newline_count: usize,
    /// Offset within the chunk of the last counted line terminator; stays `0`
    /// when none was counted.
    pub last_newline_offset: usize,
    /// Whether the end of the chunk lies inside a quoted string.
    pub end_inside_string: bool,
}
602
603impl CountLines {
604    pub fn new(
605        quote_char: Option<u8>,
606        eol_char: u8,
607        comment_prefix: Option<CommentPrefix>,
608    ) -> Self {
609        let quoting = quote_char.is_some();
610        let quote_char = quote_char.unwrap_or(b'\"');
611        #[cfg(feature = "simd")]
612        let simd_eol_char = SimdVec::splat(eol_char);
613        #[cfg(feature = "simd")]
614        let simd_quote_char = SimdVec::splat(quote_char);
615        Self {
616            quote_char,
617            eol_char,
618            #[cfg(feature = "simd")]
619            simd_eol_char,
620            #[cfg(feature = "simd")]
621            simd_quote_char,
622            quoting,
623            comment_prefix,
624        }
625    }
626
    /// Analyzes a chunk of CSV data.
    ///
    /// Returns two [`LineStats`]: index 0 assumes the start of the chunk is
    /// *not* inside a string, index 1 assumes the start is inside a string.
    /// This lets chunks be analyzed independently and combined afterwards, once
    /// the state at the end of the preceding chunk is known.
    ///
    /// If comment_prefix is not None the start of bytes must be at the start of
    /// a line (and thus not in the middle of a comment).
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        // If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // False if even number of quotes seen so far, true otherwise.
        // (The initial value is overwritten after the SIMD loop when that loop
        // is compiled in, hence the lint allowance.)
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // 0 if even number of quotes seen so far, u64::MAX otherwise.
            let mut global_quote_parity_mask = 0;
            // Process all complete 64-byte blocks.
            while scan_offset + 64 <= bytes.len() {
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                // Bit i is set when byte i of the block equals the eol byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Bit i holds the parity of all quotes seen in the chunk up to
                    // and including byte i of this block.
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // Arithmetic shift broadcasts the top bit (parity after the last
                    // byte) to all 64 bits for use in the next block.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Even parity: these eols are outside a string if the chunk
                    // started outside one.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    // `scan_offset + 63 - leading_zeros` is the offset of the highest
                    // set bit. For an empty mask the value is meaningless (hence
                    // `wrapping_sub`); presumably `select_unpredictable` then keeps
                    // the previous offset (third argument).
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Odd parity: these eols are outside a string if the chunk
                    // started inside one.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting only the "started outside" variant is maintained.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Scalar pass over the remaining bytes (everything, without SIMD).
        while scan_offset < bytes.len() {
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // The parity selects the variant for which this byte is outside a string.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        // An odd number of quotes flips the in-string state between start and end.
        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }
728
729    // bytes must begin at the start of a line.
730    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
731        let pre_s = match self.comment_prefix.as_ref().unwrap() {
732            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
733            CommentPrefix::Multi(ps) => ps.as_bytes(),
734        };
735
736        let mut state = LineStats::default();
737        let mut scan_offset = 0;
738        while scan_offset < bytes.len() {
739            // Skip comment line if needed.
740            while bytes[scan_offset..].starts_with(pre_s) {
741                scan_offset += pre_s.len();
742                let Some(nl_off) = bytes[scan_offset..]
743                    .iter()
744                    .position(|c| *c == self.eol_char)
745                else {
746                    break;
747                };
748                scan_offset += nl_off + 1;
749            }
750
751            while scan_offset < bytes.len() {
752                let c = unsafe { *bytes.get_unchecked(scan_offset) };
753                in_string ^= (c == self.quote_char) & self.quoting;
754
755                if c == self.eol_char && !in_string {
756                    state.newline_count += 1;
757                    state.last_newline_offset = scan_offset;
758                    scan_offset += 1;
759                    break;
760                } else {
761                    scan_offset += 1;
762                }
763            }
764        }
765
766        state.end_inside_string = in_string;
767        state
768    }
769
770    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
771        loop {
772            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
773
774            let (count, offset) = if self.comment_prefix.is_some() {
775                let stats = self.analyze_chunk_with_comment(b, false);
776                (stats.newline_count, stats.last_newline_offset)
777            } else {
778                self.count(b)
779            };
780
781            if count > 0 || b.len() == bytes.len() {
782                return (count, offset);
783            }
784
785            *chunk_size = chunk_size.saturating_mul(2);
786        }
787    }
788
789    pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
790        let stats = if self.comment_prefix.is_some() {
791            self.analyze_chunk_with_comment(bytes, false)
792        } else {
793            self.analyze_chunk(bytes)[0]
794        };
795
796        let mut count = stats.newline_count;
797        let mut offset = stats.last_newline_offset;
798
799        if count > 0 {
800            offset = cmp::min(offset + 1, bytes.len());
801        } else {
802            debug_assert!(offset == 0);
803        }
804
805        if is_eof {
806            // +1 count correction for when last line does not end with '\n'
807            if !bytes.is_empty() && bytes.last().copied().unwrap() != self.eol_char {
808                // We can do a simple backwards-scan to find the start of last line if it is a
809                // comment line, since comment lines can't escape new-lines.
810                let last_new_line_post = memchr::memrchr(self.eol_char, bytes).unwrap_or(0);
811                let last_line_is_comment_line = bytes
812                    .get(last_new_line_post + 1..)
813                    .map(|line| is_comment_line(line, self.comment_prefix.as_ref()))
814                    .unwrap_or(false);
815
816                count += !last_line_is_comment_line as usize;
817            }
818            offset = bytes.len();
819        }
820
821        (count, offset)
822    }
823
    /// Returns count and offset to split for remainder in slice.
    ///
    /// `count` is the number of `eol_char` bytes outside quoted fields and the
    /// offset is the position of the last of them in `bytes` (`0` when none was
    /// found). The scan assumes `bytes` starts outside a quoted field and
    /// ignores `comment_prefix`.
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        // Offset of the block currently being examined.
        let mut total_idx = 0;
        let original_bytes = bytes;
        let mut count = 0;
        // Offset of the last valid eol found so far.
        let mut position = 0;
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };

            // Only take the SIMD path while more than one full block remains.
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit i is set when byte i of the block equals the eol byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Running XOR over the quote bits: bit i tells whether an odd
                    // number of quotes occurred in bytes 0..=i of this block.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // If the block starts outside a field, odd parity means "inside",
                    // so invert to make a set bit mean "outside a quoted field".
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state after the last byte carries over to the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    count += valid_eols.count_ones() as usize;
                    // Highest set bit = last valid eol inside this block.
                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
                    debug_assert_eq!(original_bytes[position], self.eol_char)
                }
                total_idx += SIMD_SIZE;
            } else if bytes.is_empty() {
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
                return (count, position);
            } else {
                // Scalar tail, started with the quoting state carried over from
                // the SIMD blocks. NOTE(review): the returned offset is treated as
                // relative to the tail (it is re-based with `total_idx` below) --
                // confirm against `count_no_simd`.
                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);

                // Only take over the tail's offset if it actually found a line end.
                let (count, position) = if c > 0 {
                    (count + c, total_idx + o)
                } else {
                    (count, position)
                };
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

                return (count, position);
            }
        }
    }
882
883    #[cfg(not(feature = "simd"))]
884    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
885        self.count_no_simd(bytes, false)
886    }
887
888    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
889        let iter = bytes.iter();
890        let mut in_field = in_field;
891        let mut count = 0;
892        let mut position = 0;
893
894        for b in iter {
895            let c = *b;
896            if self.quoting && c == self.quote_char {
897                // toggle between string field enclosure
898                //      if we encounter a starting '"' -> in_field = true;
899                //      if we encounter a closing '"' -> in_field = false;
900                in_field = !in_field;
901            }
902            // If we are not in a string and we encounter '\n' we can stop at this position.
903            else if c == self.eol_char && !in_field {
904                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
905                count += 1;
906            }
907        }
908        debug_assert!(count == 0 || bytes[position] == self.eol_char);
909
910        (count, position)
911    }
912}
913
/// Finds the first `needle` byte in `bytes` that is not enclosed in quotes.
///
/// Every `quote_char` toggles the quoted state: a starting quote enters a string field and a
/// closing quote leaves it. The scan starts outside of a field.
///
/// Returns the index of the first unquoted `needle`, or `None` when there is none. The
/// index is tracked as `usize`, so it stays correct for slices longer than `u32::MAX` bytes.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    bytes.iter().position(|&c| {
        if c == quote_char {
            // toggle between string field enclosure
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
936
937#[inline]
938pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
939    let pos = match quote {
940        Some(quote) => find_quoted(bytes, quote, eol_char),
941        None => bytes.iter().position(|x| *x == eol_char),
942    };
943    match pos {
944        None => &[],
945        Some(pos) => &bytes[pos + 1..],
946    }
947}
948
949#[inline]
950pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
951    if let Some(pos) = next_line_position_naive(input, eol_char) {
952        unsafe { input.get_unchecked(pos..) }
953    } else {
954        &[]
955    }
956}
957
958/// Parse CSV.
959///
960/// # Arguments
961/// * `bytes` - input to parse
962/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
963///   thread has a different offset.
964/// * `projection` - Indices of the columns to project.
965/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
966///   fields are written to the buffers. The UTF8 data will be parsed later.
967///
968/// Returns the number of bytes parsed successfully.
969#[allow(clippy::too_many_arguments)]
970pub(super) fn parse_lines(
971    mut bytes: &[u8],
972    parse_options: &CsvParseOptions,
973    offset: usize,
974    ignore_errors: bool,
975    null_values: Option<&NullValuesCompiled>,
976    projection: &[usize],
977    buffers: &mut [Buffer],
978    n_lines: usize,
979    // length of original schema
980    schema_len: usize,
981    schema: &Schema,
982) -> PolarsResult<usize> {
983    assert!(
984        !projection.is_empty(),
985        "at least one column should be projected"
986    );
987    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
988    // During projection pushdown we are not checking other csv fields.
989    // This would be very expensive and we don't care as we only want
990    // the projected columns.
991    if projection.len() != schema_len {
992        truncate_ragged_lines = true
993    }
994
995    // we use the pointers to track the no of bytes read.
996    let start = bytes.as_ptr() as usize;
997    let original_bytes_len = bytes.len();
998    let n_lines = n_lines as u32;
999
1000    let mut line_count = 0u32;
1001    loop {
1002        if line_count > n_lines {
1003            let end = bytes.as_ptr() as usize;
1004            return Ok(end - start);
1005        }
1006
1007        if bytes.is_empty() {
1008            return Ok(original_bytes_len);
1009        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1010            // deal with comments
1011            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1012            bytes = bytes_rem;
1013            continue;
1014        }
1015
1016        // Every line we only need to parse the columns that are projected.
1017        // Therefore we check if the idx of the field is in our projected columns.
1018        // If it is not, we skip the field.
1019        let mut projection_iter = projection.iter().copied();
1020        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1021        let mut processed_fields = 0;
1022
1023        let mut iter = SplitFields::new(
1024            bytes,
1025            parse_options.separator,
1026            parse_options.quote_char,
1027            parse_options.eol_char,
1028        );
1029        let mut idx = 0u32;
1030        let mut read_sol = 0;
1031        loop {
1032            match iter.next() {
1033                // end of line
1034                None => {
1035                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1036                    break;
1037                },
1038                Some((mut field, needs_escaping)) => {
1039                    let field_len = field.len();
1040
1041                    // +1 is the split character that is consumed by the iterator.
1042                    read_sol += field_len + 1;
1043
1044                    if idx == next_projected as u32 {
1045                        // the iterator is finished when it encounters a `\n`
1046                        // this could be preceded by a '\r'
1047                        unsafe {
1048                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1049                                field = field.get_unchecked(..field_len - 1);
1050                            }
1051                        }
1052
1053                        debug_assert!(processed_fields < buffers.len());
1054                        let buf = unsafe {
1055                            // SAFETY: processed fields index can never exceed the projection indices.
1056                            buffers.get_unchecked_mut(processed_fields)
1057                        };
1058                        let mut add_null = false;
1059
1060                        // if we have null values argument, check if this field equal null value
1061                        if let Some(null_values) = null_values {
1062                            let field = if needs_escaping && !field.is_empty() {
1063                                unsafe { field.get_unchecked(1..field.len() - 1) }
1064                            } else {
1065                                field
1066                            };
1067
1068                            // SAFETY:
1069                            // process fields is in bounds
1070                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1071                        }
1072                        if add_null {
1073                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1074                        } else {
1075                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1076                                .map_err(|e| {
1077                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1078                                    let unparsable = String::from_utf8_lossy(field);
1079                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1080                                    polars_err!(
1081                                        ComputeError:
1082                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1083                                        The current offset in the file is {} bytes.\n\
1084                                        \n\
1085                                        You might want to try:\n\
1086                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1087                                        - specifying correct dtype with the `schema_overrides` argument\n\
1088                                        - setting `ignore_errors` to `True`,\n\
1089                                        - adding `{}` to the `null_values` list.\n\n\
1090                                        Original error: ```{}```",
1091                                        &unparsable,
1092                                        buf.dtype(),
1093                                        column_name,
1094                                        idx + 1,
1095                                        bytes_offset,
1096                                        &unparsable,
1097                                        e
1098                                    )
1099                                })?;
1100                        }
1101                        processed_fields += 1;
1102
1103                        // if we have all projected columns we are done with this line
1104                        match projection_iter.next() {
1105                            Some(p) => next_projected = p,
1106                            None => {
1107                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1108                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
1109                                } else {
1110                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1111                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1112
1113Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1114                                    }
1115                                    let bytes_rem = skip_this_line(
1116                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1117                                        parse_options.quote_char,
1118                                        parse_options.eol_char,
1119                                    );
1120                                    bytes = bytes_rem;
1121                                }
1122                                break;
1123                            },
1124                        }
1125                    }
1126                    idx += 1;
1127                },
1128            }
1129        }
1130
1131        // there can be lines that miss fields (also the comma values)
1132        // this means the splitter won't process them.
1133        // We traverse them to read them as null values.
1134        while processed_fields < projection.len() {
1135            debug_assert!(processed_fields < buffers.len());
1136            let buf = unsafe {
1137                // SAFETY: processed fields index can never exceed the projection indices.
1138                buffers.get_unchecked_mut(processed_fields)
1139            };
1140            buf.add_null(!parse_options.missing_is_null);
1141            processed_fields += 1;
1142        }
1143        line_count += 1;
1144    }
1145}
1146
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// An end-of-line byte inside a quoted field must not split the line, whichever quote
    /// byte is configured.
    #[test]
    fn test_splitlines() {
        let cases = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote_char, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote_char), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            // Nothing is left once both lines were produced.
            assert_eq!(lines.next(), None);
        }
    }
}