polars_io/csv/read/
parser.rs

1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::utils::compression::maybe_decompress_bytes;
19
20/// Read the number of rows without parsing columns
21/// useful for count(*) queries
22pub fn count_rows(
23    path: &Path,
24    separator: u8,
25    quote_char: Option<u8>,
26    comment_prefix: Option<&CommentPrefix>,
27    eol_char: u8,
28    has_header: bool,
29) -> PolarsResult<usize> {
30    let file = if is_cloud_url(path) || config::force_async() {
31        feature_gated!("cloud", {
32            crate::file_cache::FILE_CACHE
33                .get_entry(path.to_str().unwrap())
34                // Safety: This was initialized by schema inference.
35                .unwrap()
36                .try_open_assume_latest()?
37        })
38    } else {
39        polars_utils::open_file(path)?
40    };
41
42    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
43    let owned = &mut vec![];
44    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
45
46    count_rows_from_slice_par(
47        reader_bytes,
48        separator,
49        quote_char,
50        comment_prefix,
51        eol_char,
52        has_header,
53    )
54}
55
56/// Read the number of rows without parsing columns
57/// useful for count(*) queries
58pub fn count_rows_from_slice_par(
59    mut bytes: &[u8],
60    separator: u8,
61    quote_char: Option<u8>,
62    comment_prefix: Option<&CommentPrefix>,
63    eol_char: u8,
64    has_header: bool,
65) -> PolarsResult<usize> {
66    for _ in 0..bytes.len() {
67        if bytes[0] != eol_char {
68            break;
69        }
70
71        bytes = &bytes[1..];
72    }
73
74    const MIN_ROWS_PER_THREAD: usize = 1024;
75    let max_threads = POOL.current_num_threads();
76
77    // Determine if parallelism is beneficial and how many threads
78    let n_threads = get_line_stats(
79        bytes,
80        MIN_ROWS_PER_THREAD,
81        eol_char,
82        None,
83        separator,
84        quote_char,
85    )
86    .map(|(mean, std)| {
87        let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
88        (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
89    })
90    .unwrap_or(1);
91
92    if n_threads == 1 {
93        return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
94    }
95
96    let file_chunks: Vec<(usize, usize)> =
97        get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
98
99    let iter = file_chunks.into_par_iter().map(|(start, stop)| {
100        let bytes = &bytes[start..stop];
101
102        if comment_prefix.is_some() {
103            SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
104                .filter(|line| !is_comment_line(line, comment_prefix))
105                .count()
106        } else {
107            CountLines::new(quote_char, eol_char).count(bytes).0
108        }
109    });
110
111    let n: usize = POOL.install(|| iter.sum());
112
113    Ok(n - (has_header as usize))
114}
115
116/// Read the number of rows without parsing columns
117pub fn count_rows_from_slice(
118    mut bytes: &[u8],
119    quote_char: Option<u8>,
120    comment_prefix: Option<&CommentPrefix>,
121    eol_char: u8,
122    has_header: bool,
123) -> PolarsResult<usize> {
124    for _ in 0..bytes.len() {
125        if bytes[0] != eol_char {
126            break;
127        }
128
129        bytes = &bytes[1..];
130    }
131
132    let n = if comment_prefix.is_some() {
133        SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
134            .filter(|line| !is_comment_line(line, comment_prefix))
135            .count()
136    } else {
137        CountLines::new(quote_char, eol_char).count(bytes).0
138    };
139
140    Ok(n - (has_header as usize))
141}
142
143/// Skip the utf-8 Byte Order Mark.
144/// credits to csv-core
145pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
146    if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
147        &input[3..]
148    } else {
149        input
150    }
151}
152
153/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
154///
155/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
156#[inline]
157pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
158    match comment_prefix {
159        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
160        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
161        None => false,
162    }
163}
164
165/// Find the nearest next line position.
166/// Does not check for new line characters embedded in String fields.
167pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
168    let pos = memchr::memchr(eol_char, input)? + 1;
169    if input.len() - pos == 0 {
170        return None;
171    }
172    Some(pos)
173}
174
175pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
176    for _ in 0..skip {
177        if let Some(pos) = next_line_position_naive(input, eol_char) {
178            input = &input[pos..];
179        } else {
180            return input;
181        }
182    }
183    input
184}
185
/// Find the nearest next line position that is not embedded in a String field.
///
/// Returns an offset, relative to the `input` passed in, of a byte directly after an
/// `eol_char` that is judged to start a real record. Returns `None` if no such
/// position is found.
///
/// * Without `expected_fields`, the first candidate is returned. A candidate is the
///   byte after the first `eol_char`, provided that `eol_char` is not the last byte.
/// * With `expected_fields`, the candidate line and up to two following lines must
///   each pass `accept_line`. Those lines are split quote-aware by [`SplitLines`].
///   A candidate whose first line is rejected is skipped and the search continues.
///
/// `None` is returned in three cases:
/// * no further `eol_char` exists;
/// * the `eol_char` found is the last byte of the remaining input;
/// * three line groups have been rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Heuristic check that `line` has a plausible number of fields.
    //
    // A line is rejected as soon as one field contains `expected_fields` or more
    // separator/eol bytes. Presumably this indicates the split started inside a
    // quoted value.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // if the latest field is missing
        // e.g.:
        // a,b,c
        // vala,valb,
        // SplitFields returns a count that is 1 less
        // Therefore we accept:
        // count == expected
        // and
        // count == expected - 1
        // (`wrapping_sub` turns `count > expected` into a huge value, which fails.)
        expected_fields.wrapping_sub(count) <= 1
    }

    // we check 3 subsequent lines for `accept_line` before we accept
    // if 3 groups are rejected we reject completely
    let mut rejected_line_groups = 0u8;

    // Offset of the current `input` within the `input` originally passed in.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // headers might have an extra value
        // So if we have churned through enough lines
        // we try one field less.
        // (`lines_checked` wraps, so this triggers once every 256 iterations.)
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // Candidate: the byte right after the next `eol_char`.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // count the fields, and determine if they are equal to what we expect from the schema
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The candidate line is plausible; also require the next (up to)
                    // two lines to be plausible.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced here. The next iteration
                        // re-examines the same candidate and reaches the same verdict.
                        // In effect, the first rejected group yields `None` after three
                        // identical passes. Confirm whether advancing was intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    // Skip past the `eol_char` and the first byte of the rejected
                    // candidate, then search for the next `eol_char` from there.
                    debug_assert!(pos < input.len());
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // don't count the fields
            (Some(_), None) => return Some(total_pos + pos),
            // SplitLines produced no line for the candidate; give up.
            _ => return None,
        }
    }
}
283
284pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
285    b == eol_char || b == b'\r'
286}
287
288pub(super) fn is_whitespace(b: u8) -> bool {
289    b == b' ' || b == b'\t'
290}
291
/// Returns the suffix of `input` starting at the first byte for which `f` is false.
/// The result is empty if `f` holds for every byte.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let skipped = input.iter().take_while(|&&b| f(b)).count();
    &input[skipped..]
}
304
305/// Remove whitespace from the start of buffer.
306/// Makes sure that the bytes stream starts with
307///     'field_1,field_2'
308/// and not with
309///     '\nfield_1,field_1'
310#[inline]
311pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
312    skip_condition(input, is_whitespace)
313}
314
315#[inline]
316pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
317    skip_condition(input, |b| is_line_ending(b, eol_char))
318}
319
320/// Get the mean and standard deviation of length of lines in bytes
321pub(super) fn get_line_stats(
322    bytes: &[u8],
323    n_lines: usize,
324    eol_char: u8,
325    expected_fields: Option<usize>,
326    separator: u8,
327    quote_char: Option<u8>,
328) -> Option<(f32, f32)> {
329    let mut lengths = Vec::with_capacity(n_lines);
330
331    let mut bytes_trunc;
332    let n_lines_per_iter = n_lines / 2;
333
334    let mut n_read = 0;
335
336    // sample from start and 75% in the file
337    for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
338        bytes_trunc = &bytes[offset..];
339        let pos = next_line_position(
340            bytes_trunc,
341            expected_fields,
342            separator,
343            quote_char,
344            eol_char,
345        )?;
346        bytes_trunc = &bytes_trunc[pos + 1..];
347
348        for _ in offset..(offset + n_lines_per_iter) {
349            let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
350            n_read += pos;
351            lengths.push(pos);
352            bytes_trunc = &bytes_trunc[pos..];
353        }
354    }
355
356    let n_samples = lengths.len();
357
358    let mean = (n_read as f32) / (n_samples as f32);
359    let mut std = 0.0;
360    for &len in lengths.iter() {
361        std += (len as f32 - mean).pow(2.0)
362    }
363    std = (std / n_samples as f32).sqrt();
364    Some((mean, std))
365}
366
/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the file in lines naively as
///
/// ```text
///    for line in bytes.split(b'\n') {
/// ```
///
/// This will fail when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a valid string field, that contains multiple lines.
///
/// Yielded lines do not include their terminating `eol_char`; an unterminated final
/// line is yielded as-is. Lines that start with `comment_prefix` are split naively
/// (quotes inside them are not considered).
pub(super) struct SplitLines<'a> {
    // Remaining input that has not been yielded yet.
    v: &'a [u8],
    // Quote byte; only compared when `quoting` is true.
    quote_char: u8,
    eol_char: u8,
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Bitmask of eol positions (outside quotes) from the last SIMD lane that were
    // not yielded yet; bit i corresponds to `v[i]`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    // Scan offset into `v` used while searching for the next line.
    total_index: usize,
    // Whether quote tracking is enabled (a quote char was configured).
    quoting: bool,
    comment_prefix: Option<&'a CommentPrefix>,
}
390
/// Number of bytes processed per SIMD lane (must match the width of [`SimdVec`]).
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
// Portable SIMD types and comparison traits (`simd_eq`, `to_bitmask`).
#[cfg(feature = "simd")]
use std::simd::prelude::*;

// Used to turn a bitmask of quote positions into an "inside quotes" mask.
#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// 64 lanes of `u8`, one per input byte of a SIMD block.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
401
402impl<'a> SplitLines<'a> {
403    pub(super) fn new(
404        slice: &'a [u8],
405        quote_char: Option<u8>,
406        eol_char: u8,
407        comment_prefix: Option<&'a CommentPrefix>,
408    ) -> Self {
409        let quoting = quote_char.is_some();
410        let quote_char = quote_char.unwrap_or(b'\"');
411        #[cfg(feature = "simd")]
412        let simd_eol_char = SimdVec::splat(eol_char);
413        #[cfg(feature = "simd")]
414        let simd_quote_char = SimdVec::splat(quote_char);
415        Self {
416            v: slice,
417            quote_char,
418            eol_char,
419            #[cfg(feature = "simd")]
420            simd_eol_char,
421            #[cfg(feature = "simd")]
422            simd_quote_char,
423            #[cfg(feature = "simd")]
424            previous_valid_eols: 0,
425            total_index: 0,
426            quoting,
427            comment_prefix,
428        }
429    }
430}
431
impl<'a> SplitLines<'a> {
    /// Returns the next line using a byte-by-byte (non-SIMD) scan.
    ///
    /// Comment lines are handed to `next_comment_line`. Otherwise the line ends at the
    /// first `eol_char` outside a quoted field; that `eol_char` is consumed but not
    /// returned. If the input ends first, the remainder is returned as the last line.
    // scalar as in non-simd
    fn next_scalar(&mut self) -> Option<&'a [u8]> {
        if self.v.is_empty() {
            return None;
        }
        if is_comment_line(self.v, self.comment_prefix) {
            return self.next_comment_line();
        }
        {
            // Bytes consumed so far, including the terminating `eol_char`.
            // NOTE(review): `u32` would overflow for a single line longer than
            // `u32::MAX` bytes.
            let mut pos = 0u32;
            let mut iter = self.v.iter();
            let mut in_field = false;
            loop {
                match iter.next() {
                    Some(&c) => {
                        pos += 1;

                        if self.quoting && c == self.quote_char {
                            // toggle between string field enclosure
                            //      if we encounter a starting '"' -> in_field = true;
                            //      if we encounter a closing '"' -> in_field = false;
                            in_field = !in_field;
                        }
                        // if we are not in a string and we encounter '\n' we can stop at this position.
                        else if c == self.eol_char && !in_field {
                            break;
                        }
                    },
                    None => {
                        // No terminating eol: the rest of the input is the last line.
                        let remainder = self.v;
                        self.v = &[];
                        return Some(remainder);
                    },
                }
            }

            // `pos` counts from the start of `self.v` and the loop broke on one of its
            // bytes, so `1 <= pos <= self.v.len()`. `total_index` is only advanced by
            // the SIMD `next`. In the visible callers, that path is not taken when a
            // comment prefix routes here, so `total_index` is still 0.
            unsafe {
                debug_assert!((pos as usize) <= self.v.len());

                // return line up to this position
                let ret = Some(
                    self.v
                        .get_unchecked(..(self.total_index + pos as usize - 1)),
                );
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                ret
            }
        }
    }

    /// Returns the next line without considering quotes (used for comment lines).
    ///
    /// If no next line start is found, the whole remainder is returned. This includes
    /// the case of an `eol_char` as the very last byte; that byte is then part of the
    /// returned slice.
    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
            // `pos` is just past an eol and strictly less than `self.v.len()`.
            unsafe {
                // return line up to this position
                let ret = Some(self.v.get_unchecked(..(pos - 1)));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos..);
                ret
            }
        } else {
            let remainder = self.v;
            self.v = &[];
            Some(remainder)
        }
    }
}
499
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD line splitting.
    ///
    /// Scans 64-byte lanes for `eol_char`, masking out positions inside quoted fields.
    /// The eol positions of a lane that are not consumed yet are cached in
    /// `previous_valid_eols` for later calls. Inputs with a comment prefix use the
    /// scalar path instead.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // First check cached value
        // Bit i of `previous_valid_eols` marks a valid eol at `self.v[i]`.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Cached masks come from a right shift of at least 1, so bit 63 is clear
            // and `pos + 1 < 64`. Shifting keeps the mask relative to the new `self.v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // return line up to this position
                let ret = Some(self.v.get_unchecked(..pos));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // A new line always starts outside a quoted field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            // Use a full lane only while strictly more than SIMD_SIZE bytes remain;
            // the tail is handled by the scalar loop below.
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Presumably bit i = XOR of quote bits 0..=i, i.e. "inside quotes
                    // relative to the lane start" (TODO confirm against clmul docs).
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // Carry the quote state after the lane's last byte to the next lane.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Shifting a u64 by 64 would overflow; nothing remains in the lane.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // return line up to this position
                        let ret = Some(self.v.get_unchecked(..pos));
                        // skip the '\n' token and update slice.
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Denotes if we are in a string field, started with a quote
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                // toggle between string field enclosure
                                //      if we encounter a starting '"' -> in_field = true;
                                //      if we encounter a closing '"' -> in_field = false;
                                in_field = !in_field;
                            }
                            // if we are not in a string and we encounter '\n' we can stop at this position.
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No terminating eol: the rest of the input is the last line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                // `pos` is relative to `bytes`, which starts at `total_index` in `self.v`.
                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // return line up to this position
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    // skip the '\n' token and update slice.
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
628
/// Counts end-of-line bytes that are not inside quoted fields, without materialising
/// the lines themselves.
pub struct CountLines {
    // Quote byte; has no effect when `quoting` is false.
    quote_char: u8,
    eol_char: u8,
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Whether quote tracking is enabled (a quote char was configured).
    quoting: bool,
}
638
/// Newline statistics for one chunk, as produced by [`CountLines::analyze_chunk`].
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    // Number of `eol_char` bytes outside quoted fields.
    newline_count: usize,
    // Offset within the chunk of the last such `eol_char` (0 if there is none).
    last_newline_offset: usize,
    // Whether the chunk ends inside a quoted field.
    end_inside_string: bool,
}
645
646impl CountLines {
647    pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
648        let quoting = quote_char.is_some();
649        let quote_char = quote_char.unwrap_or(b'\"');
650        #[cfg(feature = "simd")]
651        let simd_eol_char = SimdVec::splat(eol_char);
652        #[cfg(feature = "simd")]
653        let simd_quote_char = SimdVec::splat(quote_char);
654        Self {
655            quote_char,
656            eol_char,
657            #[cfg(feature = "simd")]
658            simd_eol_char,
659            #[cfg(feature = "simd")]
660            simd_quote_char,
661            quoting,
662        }
663    }
664
665    /// Analyzes a chunk of CSV data.
666    ///
667    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
668    /// the first is assuming the start of the chunk is *not* inside a string,
669    /// the second assuming the start is inside a string.
670    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
671        let mut scan_offset = 0;
672        let mut states = [
673            LineStats {
674                newline_count: 0,
675                last_newline_offset: 0,
676                end_inside_string: false,
677            },
678            LineStats {
679                newline_count: 0,
680                last_newline_offset: 0,
681                end_inside_string: false,
682            },
683        ];
684
685        // false if even number of quotes seen so far, true otherwise.
686        #[allow(unused_assignments)]
687        let mut global_quote_parity = false;
688
689        #[cfg(feature = "simd")]
690        {
691            // 0 if even number of quotes seen so far, u64::MAX otherwise.
692            let mut global_quote_parity_mask = 0;
693            while scan_offset + 64 <= bytes.len() {
694                let block: [u8; 64] = unsafe {
695                    bytes
696                        .get_unchecked(scan_offset..scan_offset + 64)
697                        .try_into()
698                        .unwrap_unchecked()
699                };
700                let simd_bytes = SimdVec::from(block);
701                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
702                if self.quoting {
703                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
704                    let quote_parity =
705                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
706                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
707
708                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
709                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
710                    states[0].last_newline_offset = select_unpredictable(
711                        start_outside_string_eol_mask != 0,
712                        (scan_offset + 63)
713                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
714                        states[0].last_newline_offset,
715                    );
716
717                    let start_inside_string_eol_mask = eol_mask & quote_parity;
718                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
719                    states[1].last_newline_offset = select_unpredictable(
720                        start_inside_string_eol_mask != 0,
721                        (scan_offset + 63)
722                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
723                        states[1].last_newline_offset,
724                    );
725                } else {
726                    states[0].newline_count += eol_mask.count_ones() as usize;
727                    states[0].last_newline_offset = select_unpredictable(
728                        eol_mask != 0,
729                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
730                        states[0].last_newline_offset,
731                    );
732                }
733
734                scan_offset += 64;
735            }
736
737            global_quote_parity = global_quote_parity_mask > 0;
738        }
739
740        while scan_offset < bytes.len() {
741            let c = unsafe { *bytes.get_unchecked(scan_offset) };
742            global_quote_parity ^= (c == self.quote_char) & self.quoting;
743
744            let state = &mut states[global_quote_parity as usize];
745            state.newline_count += (c == self.eol_char) as usize;
746            state.last_newline_offset =
747                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
748
749            scan_offset += 1;
750        }
751
752        states[0].end_inside_string = global_quote_parity;
753        states[1].end_inside_string = !global_quote_parity;
754        states
755    }
756
757    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
758        loop {
759            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
760
761            let (count, offset) = self.count(b);
762
763            if count > 0 || b.len() == bytes.len() {
764                return (count, offset);
765            }
766
767            *chunk_size *= 2;
768        }
769    }
770
771    /// Returns count and offset to split for remainder in slice.
772    #[cfg(feature = "simd")]
773    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
774        let mut total_idx = 0;
775        let original_bytes = bytes;
776        let mut count = 0;
777        let mut position = 0;
778        let mut not_in_field_previous_iter = true;
779
780        loop {
781            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
782
783            if bytes.len() > SIMD_SIZE {
784                let lane: [u8; SIMD_SIZE] = unsafe {
785                    bytes
786                        .get_unchecked(0..SIMD_SIZE)
787                        .try_into()
788                        .unwrap_unchecked()
789                };
790                let simd_bytes = SimdVec::from(lane);
791                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
792
793                let valid_eols = if self.quoting {
794                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
795                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
796
797                    if not_in_field_previous_iter {
798                        not_in_quote_field = !not_in_quote_field;
799                    }
800                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
801                    eol_mask & not_in_quote_field
802                } else {
803                    eol_mask
804                };
805
806                if valid_eols != 0 {
807                    count += valid_eols.count_ones() as usize;
808                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
809                    debug_assert_eq!(original_bytes[position], self.eol_char)
810                }
811                total_idx += SIMD_SIZE;
812            } else if bytes.is_empty() {
813                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
814                return (count, position);
815            } else {
816                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
817
818                let (count, position) = if c > 0 {
819                    (count + c, total_idx + o)
820                } else {
821                    (count, position)
822                };
823                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
824
825                return (count, position);
826            }
827        }
828    }
829
830    #[cfg(not(feature = "simd"))]
831    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
832        self.count_no_simd(bytes, false)
833    }
834
835    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
836        let iter = bytes.iter();
837        let mut in_field = in_field;
838        let mut count = 0;
839        let mut position = 0;
840
841        for b in iter {
842            let c = *b;
843            if self.quoting && c == self.quote_char {
844                // toggle between string field enclosure
845                //      if we encounter a starting '"' -> in_field = true;
846                //      if we encounter a closing '"' -> in_field = false;
847                in_field = !in_field;
848            }
849            // If we are not in a string and we encounter '\n' we can stop at this position.
850            else if c == self.eol_char && !in_field {
851                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
852                count += 1;
853            }
854        }
855        debug_assert!(count == 0 || bytes[position] == self.eol_char);
856
857        (count, position)
858    }
859}
860
/// Returns the index of the first `needle` byte in `bytes` that is not inside a
/// `quote_char`-delimited field, or `None` if there is none.
///
/// Every `quote_char` toggles the in-field state. An escaped quote (`""`) therefore
/// toggles twice and leaves the state unchanged.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;
    // `position` yields a `usize` index. This avoids the overflow a `u32` counter
    // would hit on inputs longer than `u32::MAX` bytes.
    bytes.iter().position(|&c| {
        if c == quote_char {
            // toggle between string field enclosure
            //      if we encounter a starting '"' -> in_field = true;
            //      if we encounter a closing '"' -> in_field = false;
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
883
884#[inline]
885pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
886    let pos = match quote {
887        Some(quote) => find_quoted(bytes, quote, eol_char),
888        None => bytes.iter().position(|x| *x == eol_char),
889    };
890    match pos {
891        None => &[],
892        Some(pos) => &bytes[pos + 1..],
893    }
894}
895
896#[inline]
897pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
898    if let Some(pos) = next_line_position_naive(input, eol_char) {
899        unsafe { input.get_unchecked(pos..) }
900    } else {
901        &[]
902    }
903}
904
905/// Parse CSV.
906///
907/// # Arguments
908/// * `bytes` - input to parse
909/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
910///   thread has a different offset.
911/// * `projection` - Indices of the columns to project.
912/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
913///   fields are written to the buffers. The UTF8 data will be parsed later.
914#[allow(clippy::too_many_arguments)]
915pub(super) fn parse_lines(
916    mut bytes: &[u8],
917    parse_options: &CsvParseOptions,
918    offset: usize,
919    ignore_errors: bool,
920    null_values: Option<&NullValuesCompiled>,
921    projection: &[usize],
922    buffers: &mut [Buffer],
923    n_lines: usize,
924    // length of original schema
925    schema_len: usize,
926    schema: &Schema,
927) -> PolarsResult<usize> {
928    assert!(
929        !projection.is_empty(),
930        "at least one column should be projected"
931    );
932    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
933    // During projection pushdown we are not checking other csv fields.
934    // This would be very expensive and we don't care as we only want
935    // the projected columns.
936    if projection.len() != schema_len {
937        truncate_ragged_lines = true
938    }
939
940    // we use the pointers to track the no of bytes read.
941    let start = bytes.as_ptr() as usize;
942    let original_bytes_len = bytes.len();
943    let n_lines = n_lines as u32;
944
945    let mut line_count = 0u32;
946    loop {
947        if line_count > n_lines {
948            let end = bytes.as_ptr() as usize;
949            return Ok(end - start);
950        }
951
952        if bytes.is_empty() {
953            return Ok(original_bytes_len);
954        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
955            // deal with comments
956            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
957            bytes = bytes_rem;
958            continue;
959        }
960
961        // Every line we only need to parse the columns that are projected.
962        // Therefore we check if the idx of the field is in our projected columns.
963        // If it is not, we skip the field.
964        let mut projection_iter = projection.iter().copied();
965        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
966        let mut processed_fields = 0;
967
968        let mut iter = SplitFields::new(
969            bytes,
970            parse_options.separator,
971            parse_options.quote_char,
972            parse_options.eol_char,
973        );
974        let mut idx = 0u32;
975        let mut read_sol = 0;
976        loop {
977            match iter.next() {
978                // end of line
979                None => {
980                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
981                    break;
982                },
983                Some((mut field, needs_escaping)) => {
984                    let field_len = field.len();
985
986                    // +1 is the split character that is consumed by the iterator.
987                    read_sol += field_len + 1;
988
989                    if idx == next_projected as u32 {
990                        // the iterator is finished when it encounters a `\n`
991                        // this could be preceded by a '\r'
992                        unsafe {
993                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
994                                field = field.get_unchecked(..field_len - 1);
995                            }
996                        }
997
998                        debug_assert!(processed_fields < buffers.len());
999                        let buf = unsafe {
1000                            // SAFETY: processed fields index can never exceed the projection indices.
1001                            buffers.get_unchecked_mut(processed_fields)
1002                        };
1003                        let mut add_null = false;
1004
1005                        // if we have null values argument, check if this field equal null value
1006                        if let Some(null_values) = null_values {
1007                            let field = if needs_escaping && !field.is_empty() {
1008                                unsafe { field.get_unchecked(1..field.len() - 1) }
1009                            } else {
1010                                field
1011                            };
1012
1013                            // SAFETY:
1014                            // process fields is in bounds
1015                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1016                        }
1017                        if add_null {
1018                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1019                        } else {
1020                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1021                                .map_err(|e| {
1022                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1023                                    let unparsable = String::from_utf8_lossy(field);
1024                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1025                                    polars_err!(
1026                                        ComputeError:
1027                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1028                                        The current offset in the file is {} bytes.\n\
1029                                        \n\
1030                                        You might want to try:\n\
1031                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1032                                        - specifying correct dtype with the `schema_overrides` argument\n\
1033                                        - setting `ignore_errors` to `True`,\n\
1034                                        - adding `{}` to the `null_values` list.\n\n\
1035                                        Original error: ```{}```",
1036                                        &unparsable,
1037                                        buf.dtype(),
1038                                        column_name,
1039                                        idx + 1,
1040                                        bytes_offset,
1041                                        &unparsable,
1042                                        e
1043                                    )
1044                                })?;
1045                        }
1046                        processed_fields += 1;
1047
1048                        // if we have all projected columns we are done with this line
1049                        match projection_iter.next() {
1050                            Some(p) => next_projected = p,
1051                            None => {
1052                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1053                                    bytes = &bytes[read_sol..];
1054                                } else {
1055                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1056                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1057
1058Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1059                                    }
1060                                    let bytes_rem = skip_this_line(
1061                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1062                                        parse_options.quote_char,
1063                                        parse_options.eol_char,
1064                                    );
1065                                    bytes = bytes_rem;
1066                                }
1067                                break;
1068                            },
1069                        }
1070                    }
1071                    idx += 1;
1072                },
1073            }
1074        }
1075
1076        // there can be lines that miss fields (also the comma values)
1077        // this means the splitter won't process them.
1078        // We traverse them to read them as null values.
1079        while processed_fields < projection.len() {
1080            debug_assert!(processed_fields < buffers.len());
1081            let buf = unsafe {
1082                // SAFETY: processed fields index can never exceed the projection indices.
1083                buffers.get_unchecked_mut(processed_fields)
1084            };
1085            buf.add_null(!parse_options.missing_is_null);
1086            processed_fields += 1;
1087        }
1088        line_count += 1;
1089    }
1090}
1091
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// An eol character inside a quoted field must not split the line, for both `"` and `'`
    /// as the quote character.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            ("1,\"foo\n\"\n2,\"foo\n\"\n", b'"', ["1,\"foo\n\"", "2,\"foo\n\""]),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}