polars_io/csv/read/
parser.rs

1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::utils::compression::maybe_decompress_bytes;
19
20/// Read the number of rows without parsing columns
21/// useful for count(*) queries
22pub fn count_rows(
23    path: &Path,
24    separator: u8,
25    quote_char: Option<u8>,
26    comment_prefix: Option<&CommentPrefix>,
27    eol_char: u8,
28    has_header: bool,
29) -> PolarsResult<usize> {
30    let file = if is_cloud_url(path) || config::force_async() {
31        feature_gated!("cloud", {
32            crate::file_cache::FILE_CACHE
33                .get_entry(path.to_str().unwrap())
34                // Safety: This was initialized by schema inference.
35                .unwrap()
36                .try_open_assume_latest()?
37        })
38    } else {
39        polars_utils::open_file(path)?
40    };
41
42    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
43    let owned = &mut vec![];
44    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
45
46    count_rows_from_slice_par(
47        reader_bytes,
48        separator,
49        quote_char,
50        comment_prefix,
51        eol_char,
52        has_header,
53    )
54}
55
56/// Read the number of rows without parsing columns
57/// useful for count(*) queries
58pub fn count_rows_from_slice_par(
59    mut bytes: &[u8],
60    separator: u8,
61    quote_char: Option<u8>,
62    comment_prefix: Option<&CommentPrefix>,
63    eol_char: u8,
64    has_header: bool,
65) -> PolarsResult<usize> {
66    for _ in 0..bytes.len() {
67        if bytes[0] != eol_char {
68            break;
69        }
70
71        bytes = &bytes[1..];
72    }
73
74    const MIN_ROWS_PER_THREAD: usize = 1024;
75    let max_threads = POOL.current_num_threads();
76
77    // Determine if parallelism is beneficial and how many threads
78    let n_threads = get_line_stats(
79        bytes,
80        MIN_ROWS_PER_THREAD,
81        eol_char,
82        None,
83        separator,
84        quote_char,
85    )
86    .map(|(mean, std)| {
87        let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
88        (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
89    })
90    .unwrap_or(1);
91
92    if n_threads == 1 {
93        return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
94    }
95
96    let file_chunks: Vec<(usize, usize)> =
97        get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
98
99    let iter = file_chunks.into_par_iter().map(|(start, stop)| {
100        let bytes = &bytes[start..stop];
101
102        if comment_prefix.is_some() {
103            SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
104                .filter(|line| !is_comment_line(line, comment_prefix))
105                .count()
106        } else {
107            CountLines::new(quote_char, eol_char).count(bytes).0
108                + bytes.last().is_some_and(|x| *x != b'\n') as usize
109        }
110    });
111
112    let n: usize = POOL.install(|| iter.sum());
113
114    Ok(n - (has_header as usize))
115}
116
117/// Read the number of rows without parsing columns
118pub fn count_rows_from_slice(
119    mut bytes: &[u8],
120    quote_char: Option<u8>,
121    comment_prefix: Option<&CommentPrefix>,
122    eol_char: u8,
123    has_header: bool,
124) -> PolarsResult<usize> {
125    for _ in 0..bytes.len() {
126        if bytes[0] != eol_char {
127            break;
128        }
129
130        bytes = &bytes[1..];
131    }
132
133    let n = if comment_prefix.is_some() {
134        SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
135            .filter(|line| !is_comment_line(line, comment_prefix))
136            .count()
137    } else {
138        CountLines::new(quote_char, eol_char).count(bytes).0
139            + bytes.last().is_some_and(|x| *x != b'\n') as usize
140    };
141
142    Ok(n - (has_header as usize))
143}
144
145/// Skip the utf-8 Byte Order Mark.
146/// credits to csv-core
147pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
148    if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
149        &input[3..]
150    } else {
151        input
152    }
153}
154
155/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
156///
157/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
158#[inline]
159pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
160    match comment_prefix {
161        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
162        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
163        None => false,
164    }
165}
166
167/// Find the nearest next line position.
168/// Does not check for new line characters embedded in String fields.
169pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
170    let pos = memchr::memchr(eol_char, input)? + 1;
171    if input.len() - pos == 0 {
172        return None;
173    }
174    Some(pos)
175}
176
177pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
178    for _ in 0..skip {
179        if let Some(pos) = next_line_position_naive(input, eol_char) {
180            input = &input[pos..];
181        } else {
182            return input;
183        }
184    }
185    input
186}
187
/// Find the nearest next line position that is not embedded in a String field.
///
/// The input is scanned for `eol_char`; the bytes after it form a candidate
/// line start. When `expected_fields` is given, the candidate line and up to
/// two lines following it must each pass a field-count check before the
/// position is accepted. When it is `None`, the first candidate is accepted.
///
/// Returns `None` when `input` is empty, when no `eol_char` is found, when the
/// found `eol_char` is the final byte, when no line can be read after it, or
/// after three candidate groups were rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Checks whether `line` plausibly is a full record with `expected_fields`
    /// fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // Reject when a single field contains at least `expected_fields`
            // separator/eol bytes.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // if the latest field is missing
        // e.g.:
        // a,b,c
        // vala,valb,
        // SplitFields returns a count that is 1 less
        // Therefore we accept:
        // count == expected
        // and
        // count == expected - 1
        // (a `count` larger than `expected` wraps to a huge value and is rejected)
        expected_fields.wrapping_sub(count) <= 1
    }

    // we check 3 subsequent lines for `accept_line` before we accept
    // if 3 groups are rejected we reject completely
    let mut rejected_line_groups = 0u8;

    // Number of bytes of the original input that were already skipped.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // headers might have an extra value
        // So if we have churned through enough lines
        // we try one field less.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the offset one past the next eol byte.
        let pos = memchr::memchr(eol_char, input)? + 1;
        // The eol byte is the last byte: there is no next line.
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // `pos` is in bounds: it is at most `input.len()` (see the check above).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // count the fields, and determine if they are equal to what we expect from the schema
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The candidate line is fine; also require up to two
                    // following lines to be acceptable.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the
                        // next iteration appears to inspect the same candidate
                        // again until the rejection limit is hit - confirm intent.
                        rejected_line_groups += 1;
                    }
                } else {
                    // Candidate rejected: continue the search past it.
                    // `pos < input.len()` holds because of the early return above.
                    debug_assert!(pos < input.len());
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // don't count the fields
            (Some(_), None) => return Some(total_pos + pos),
            // no line could be read after the eol byte: give up
            _ => return None,
        }
    }
}
285
286pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
287    b == eol_char || b == b'\r'
288}
289
290pub(super) fn is_whitespace(b: u8) -> bool {
291    b == b' ' || b == b'\t'
292}
293
/// Drop the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns an empty slice when every byte satisfies `f` (or `input` is empty).
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        // First byte that must be kept.
        Some(first_kept) => &input[first_kept..],
        // Nothing to keep: everything matched, or the input was empty.
        None => &input[input.len()..],
    }
}
306
/// Remove whitespace from the start of the buffer.
///
/// Only bytes accepted by `is_whitespace` (spaces and tabs) are removed, so
/// that the byte stream starts with
///     'field_1,field_2'
/// and not with
///     '  field_1,field_2'
/// Leading line endings are not touched here; see `skip_line_ending`.
#[inline]
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
    skip_condition(input, is_whitespace)
}
316
/// Remove line-ending bytes from the start of the buffer.
///
/// A byte counts as a line ending when `is_line_ending` accepts it, i.e. it
/// is `eol_char` or a carriage return.
#[inline]
pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
    skip_condition(input, |b| is_line_ending(b, eol_char))
}
321
322/// Get the mean and standard deviation of length of lines in bytes
323pub(super) fn get_line_stats(
324    bytes: &[u8],
325    n_lines: usize,
326    eol_char: u8,
327    expected_fields: Option<usize>,
328    separator: u8,
329    quote_char: Option<u8>,
330) -> Option<(f32, f32)> {
331    let mut lengths = Vec::with_capacity(n_lines);
332
333    let mut bytes_trunc;
334    let n_lines_per_iter = n_lines / 2;
335
336    let mut n_read = 0;
337
338    // sample from start and 75% in the file
339    for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
340        bytes_trunc = &bytes[offset..];
341        let pos = next_line_position(
342            bytes_trunc,
343            expected_fields,
344            separator,
345            quote_char,
346            eol_char,
347        )?;
348        bytes_trunc = &bytes_trunc[pos + 1..];
349
350        for _ in offset..(offset + n_lines_per_iter) {
351            let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
352            n_read += pos;
353            lengths.push(pos);
354            bytes_trunc = &bytes_trunc[pos..];
355        }
356    }
357
358    let n_samples = lengths.len();
359
360    let mean = (n_read as f32) / (n_samples as f32);
361    let mut std = 0.0;
362    for &len in lengths.iter() {
363        std += (len as f32 - mean).pow(2.0)
364    }
365    std = (std / n_samples as f32).sqrt();
366    Some((mean, std))
367}
368
/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the file in lines naively as
///
/// ```text
///    for line in bytes.split(b'\n') {
/// ```
///
/// This would fail when string fields have embedded end-of-line characters.
/// For instance: "This is a valid field\nI have multiple lines" is a valid string field, that contains multiple lines.
pub(super) struct SplitLines<'a> {
    /// The input that has not been yielded yet.
    v: &'a [u8],
    /// Quote byte; only honoured when `quoting` is set.
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of line ends already found by the SIMD scan but not yet
    /// yielded; bit `i` refers to offset `i` in `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` up to which the current scan has progressed.
    total_index: usize,
    /// Whether quote characters are interpreted at all.
    quoting: bool,
    /// Optional comment prefix; lines starting with it are split without
    /// looking at quotes.
    comment_prefix: Option<&'a CommentPrefix>,
}
392
393#[cfg(feature = "simd")]
394const SIMD_SIZE: usize = 64;
395#[cfg(feature = "simd")]
396use std::simd::prelude::*;
397
398#[cfg(feature = "simd")]
399use polars_utils::clmul::prefix_xorsum_inclusive;
400
401#[cfg(feature = "simd")]
402type SimdVec = u8x64;
403
404impl<'a> SplitLines<'a> {
405    pub(super) fn new(
406        slice: &'a [u8],
407        quote_char: Option<u8>,
408        eol_char: u8,
409        comment_prefix: Option<&'a CommentPrefix>,
410    ) -> Self {
411        let quoting = quote_char.is_some();
412        let quote_char = quote_char.unwrap_or(b'\"');
413        #[cfg(feature = "simd")]
414        let simd_eol_char = SimdVec::splat(eol_char);
415        #[cfg(feature = "simd")]
416        let simd_quote_char = SimdVec::splat(quote_char);
417        Self {
418            v: slice,
419            quote_char,
420            eol_char,
421            #[cfg(feature = "simd")]
422            simd_eol_char,
423            #[cfg(feature = "simd")]
424            simd_quote_char,
425            #[cfg(feature = "simd")]
426            previous_valid_eols: 0,
427            total_index: 0,
428            quoting,
429            comment_prefix,
430        }
431    }
432}
433
434impl<'a> SplitLines<'a> {
435    // scalar as in non-simd
436    fn next_scalar(&mut self) -> Option<&'a [u8]> {
437        if self.v.is_empty() {
438            return None;
439        }
440        if is_comment_line(self.v, self.comment_prefix) {
441            return self.next_comment_line();
442        }
443        {
444            let mut pos = 0u32;
445            let mut iter = self.v.iter();
446            let mut in_field = false;
447            loop {
448                match iter.next() {
449                    Some(&c) => {
450                        pos += 1;
451
452                        if self.quoting && c == self.quote_char {
453                            // toggle between string field enclosure
454                            //      if we encounter a starting '"' -> in_field = true;
455                            //      if we encounter a closing '"' -> in_field = false;
456                            in_field = !in_field;
457                        }
458                        // if we are not in a string and we encounter '\n' we can stop at this position.
459                        else if c == self.eol_char && !in_field {
460                            break;
461                        }
462                    },
463                    None => {
464                        let remainder = self.v;
465                        self.v = &[];
466                        return Some(remainder);
467                    },
468                }
469            }
470
471            unsafe {
472                debug_assert!((pos as usize) <= self.v.len());
473
474                // return line up to this position
475                let ret = Some(
476                    self.v
477                        .get_unchecked(..(self.total_index + pos as usize - 1)),
478                );
479                // skip the '\n' token and update slice.
480                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
481                ret
482            }
483        }
484    }
485    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
486        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
487            unsafe {
488                // return line up to this position
489                let ret = Some(self.v.get_unchecked(..(pos - 1)));
490                // skip the '\n' token and update slice.
491                self.v = self.v.get_unchecked(pos..);
492                ret
493            }
494        } else {
495            let remainder = self.v;
496            self.v = &[];
497            Some(remainder)
498        }
499    }
500}
501
/// Yields the lines of the input one by one, without their terminating
/// `eol_char`. End-of-line bytes inside quoted fields do not split a line
/// when quoting is enabled.
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without SIMD support every line is produced by the scalar splitter.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD variant: scans the input in blocks of `SIMD_SIZE` bytes and caches
    /// the line ends found in a block so that following calls can return them
    /// without rescanning. Falls back to the scalar splitter when a comment
    /// prefix is configured, and to a byte-wise loop for the tail of the input.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // First check cached value
        if self.previous_valid_eols != 0 {
            // Lowest set bit = offset of the next line end relative to `self.v`.
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Re-base the remaining cached positions onto the slice that
            // starts right after this line end.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // return line up to this position
                let ret = Some(self.v.get_unchecked(..pos));
                // skip the '\n' token and update slice.
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        // Comment handling is only implemented in the scalar splitter.
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Tracks across blocks whether the scan is currently outside a quoted field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                // More than `SIMD_SIZE` bytes are left, so the block below is in bounds.
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit `i` is set when byte `i` of the block is an eol byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably a running XOR over the quote bits,
                    // i.e. the quote parity at every byte - confirm against
                    // `prefix_xorsum_inclusive`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state at the last byte of the block carries over to the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Cache the remaining line ends of this block, re-based
                    // onto the slice after the returned line. Shifting a
                    // `u64` by 64 is not allowed, hence the special case.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // return line up to this position
                        let ret = Some(self.v.get_unchecked(..pos));
                        // skip the '\n' token and update slice.
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block: continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // At most `SIMD_SIZE` bytes are left: finish byte by byte.
                // Denotes if we are in a string field, started with a quote
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                // toggle between string field enclosure
                                //      if we encounter a starting '"' -> in_field = true;
                                //      if we encounter a closing '"' -> in_field = false;
                                in_field = !in_field;
                            }
                            // if we are not in a string and we encounter '\n' we can stop at this position.
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No further line end: everything left is the final line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // return line up to this position
                    // (`pos` is relative to the tail, hence the `total_index` offset)
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    // skip the '\n' token and update slice.
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
630
/// Counts the line ends in a byte buffer, optionally ignoring those that are
/// enclosed in quotes.
pub struct CountLines {
    /// Quote byte; only honoured when `quoting` is set.
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote characters are interpreted at all.
    quoting: bool,
}
640
/// Result of analyzing one chunk under one assumption about whether the chunk
/// starts inside a quoted string (see `CountLines::analyze_chunk`).
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    /// Number of line ends counted in the chunk.
    newline_count: usize,
    /// Offset within the chunk of the last counted line end; stays `0` when
    /// none was counted.
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    end_inside_string: bool,
}
647
648impl CountLines {
649    pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
650        let quoting = quote_char.is_some();
651        let quote_char = quote_char.unwrap_or(b'\"');
652        #[cfg(feature = "simd")]
653        let simd_eol_char = SimdVec::splat(eol_char);
654        #[cfg(feature = "simd")]
655        let simd_quote_char = SimdVec::splat(quote_char);
656        Self {
657            quote_char,
658            eol_char,
659            #[cfg(feature = "simd")]
660            simd_eol_char,
661            #[cfg(feature = "simd")]
662            simd_quote_char,
663            quoting,
664        }
665    }
666
667    /// Analyzes a chunk of CSV data.
668    ///
669    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
670    /// the first is assuming the start of the chunk is *not* inside a string,
671    /// the second assuming the start is inside a string.
672    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
673        let mut scan_offset = 0;
674        let mut states = [
675            LineStats {
676                newline_count: 0,
677                last_newline_offset: 0,
678                end_inside_string: false,
679            },
680            LineStats {
681                newline_count: 0,
682                last_newline_offset: 0,
683                end_inside_string: false,
684            },
685        ];
686
687        // false if even number of quotes seen so far, true otherwise.
688        #[allow(unused_assignments)]
689        let mut global_quote_parity = false;
690
691        #[cfg(feature = "simd")]
692        {
693            // 0 if even number of quotes seen so far, u64::MAX otherwise.
694            let mut global_quote_parity_mask = 0;
695            while scan_offset + 64 <= bytes.len() {
696                let block: [u8; 64] = unsafe {
697                    bytes
698                        .get_unchecked(scan_offset..scan_offset + 64)
699                        .try_into()
700                        .unwrap_unchecked()
701                };
702                let simd_bytes = SimdVec::from(block);
703                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
704                if self.quoting {
705                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
706                    let quote_parity =
707                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
708                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
709
710                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
711                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
712                    states[0].last_newline_offset = select_unpredictable(
713                        start_outside_string_eol_mask != 0,
714                        (scan_offset + 63)
715                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
716                        states[0].last_newline_offset,
717                    );
718
719                    let start_inside_string_eol_mask = eol_mask & quote_parity;
720                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
721                    states[1].last_newline_offset = select_unpredictable(
722                        start_inside_string_eol_mask != 0,
723                        (scan_offset + 63)
724                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
725                        states[1].last_newline_offset,
726                    );
727                } else {
728                    states[0].newline_count += eol_mask.count_ones() as usize;
729                    states[0].last_newline_offset = select_unpredictable(
730                        eol_mask != 0,
731                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
732                        states[0].last_newline_offset,
733                    );
734                }
735
736                scan_offset += 64;
737            }
738
739            global_quote_parity = global_quote_parity_mask > 0;
740        }
741
742        while scan_offset < bytes.len() {
743            let c = unsafe { *bytes.get_unchecked(scan_offset) };
744            global_quote_parity ^= (c == self.quote_char) & self.quoting;
745
746            let state = &mut states[global_quote_parity as usize];
747            state.newline_count += (c == self.eol_char) as usize;
748            state.last_newline_offset =
749                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
750
751            scan_offset += 1;
752        }
753
754        states[0].end_inside_string = global_quote_parity;
755        states[1].end_inside_string = !global_quote_parity;
756        states
757    }
758
759    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
760        loop {
761            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
762
763            let (count, offset) = self.count(b);
764
765            if count > 0 || b.len() == bytes.len() {
766                return (count, offset);
767            }
768
769            *chunk_size *= 2;
770        }
771    }
772
773    /// Returns count and offset to split for remainder in slice.
774    #[cfg(feature = "simd")]
775    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
776        let mut total_idx = 0;
777        let original_bytes = bytes;
778        let mut count = 0;
779        let mut position = 0;
780        let mut not_in_field_previous_iter = true;
781
782        loop {
783            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
784
785            if bytes.len() > SIMD_SIZE {
786                let lane: [u8; SIMD_SIZE] = unsafe {
787                    bytes
788                        .get_unchecked(0..SIMD_SIZE)
789                        .try_into()
790                        .unwrap_unchecked()
791                };
792                let simd_bytes = SimdVec::from(lane);
793                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
794
795                let valid_eols = if self.quoting {
796                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
797                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
798
799                    if not_in_field_previous_iter {
800                        not_in_quote_field = !not_in_quote_field;
801                    }
802                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
803                    eol_mask & not_in_quote_field
804                } else {
805                    eol_mask
806                };
807
808                if valid_eols != 0 {
809                    count += valid_eols.count_ones() as usize;
810                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
811                    debug_assert_eq!(original_bytes[position], self.eol_char)
812                }
813                total_idx += SIMD_SIZE;
814            } else if bytes.is_empty() {
815                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
816                return (count, position);
817            } else {
818                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
819
820                let (count, position) = if c > 0 {
821                    (count + c, total_idx + o)
822                } else {
823                    (count, position)
824                };
825                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
826
827                return (count, position);
828            }
829        }
830    }
831
832    #[cfg(not(feature = "simd"))]
833    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
834        self.count_no_simd(bytes, false)
835    }
836
837    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
838        let iter = bytes.iter();
839        let mut in_field = in_field;
840        let mut count = 0;
841        let mut position = 0;
842
843        for b in iter {
844            let c = *b;
845            if self.quoting && c == self.quote_char {
846                // toggle between string field enclosure
847                //      if we encounter a starting '"' -> in_field = true;
848                //      if we encounter a closing '"' -> in_field = false;
849                in_field = !in_field;
850            }
851            // If we are not in a string and we encounter '\n' we can stop at this position.
852            else if c == self.eol_char && !in_field {
853                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
854                count += 1;
855            }
856        }
857        debug_assert!(count == 0 || bytes[position] == self.eol_char);
858
859        (count, position)
860    }
861}
862
/// Find the index of the first `needle` in `bytes` that is not enclosed in
/// `quote_char` quotes.
///
/// Every `quote_char` toggles the "inside a quoted field" state before the
/// byte itself is compared against `needle`. Returns `None` when no
/// unquoted `needle` exists.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` yields a `usize` index, so inputs larger than `u32::MAX`
    // bytes cannot overflow the index.
    bytes.iter().position(|&c| {
        if c == quote_char {
            // toggle between string field enclosure
            //      if we encounter a starting '"' -> in_field = true;
            //      if we encounter a closing '"' -> in_field = false;
            in_field = !in_field;
        }

        !in_field && c == needle
    })
}
885
886#[inline]
887pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
888    let pos = match quote {
889        Some(quote) => find_quoted(bytes, quote, eol_char),
890        None => bytes.iter().position(|x| *x == eol_char),
891    };
892    match pos {
893        None => &[],
894        Some(pos) => &bytes[pos + 1..],
895    }
896}
897
898#[inline]
899pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
900    if let Some(pos) = next_line_position_naive(input, eol_char) {
901        unsafe { input.get_unchecked(pos..) }
902    } else {
903        &[]
904    }
905}
906
907/// Parse CSV.
908///
909/// # Arguments
910/// * `bytes` - input to parse
911/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
912///   thread has a different offset.
913/// * `projection` - Indices of the columns to project.
914/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
915///   fields are written to the buffers. The UTF8 data will be parsed later.
916#[allow(clippy::too_many_arguments)]
917pub(super) fn parse_lines(
918    mut bytes: &[u8],
919    parse_options: &CsvParseOptions,
920    offset: usize,
921    ignore_errors: bool,
922    null_values: Option<&NullValuesCompiled>,
923    projection: &[usize],
924    buffers: &mut [Buffer],
925    n_lines: usize,
926    // length of original schema
927    schema_len: usize,
928    schema: &Schema,
929) -> PolarsResult<usize> {
930    assert!(
931        !projection.is_empty(),
932        "at least one column should be projected"
933    );
934    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
935    // During projection pushdown we are not checking other csv fields.
936    // This would be very expensive and we don't care as we only want
937    // the projected columns.
938    if projection.len() != schema_len {
939        truncate_ragged_lines = true
940    }
941
942    // we use the pointers to track the no of bytes read.
943    let start = bytes.as_ptr() as usize;
944    let original_bytes_len = bytes.len();
945    let n_lines = n_lines as u32;
946
947    let mut line_count = 0u32;
948    loop {
949        if line_count > n_lines {
950            let end = bytes.as_ptr() as usize;
951            return Ok(end - start);
952        }
953
954        if bytes.is_empty() {
955            return Ok(original_bytes_len);
956        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
957            // deal with comments
958            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
959            bytes = bytes_rem;
960            continue;
961        }
962
963        // Every line we only need to parse the columns that are projected.
964        // Therefore we check if the idx of the field is in our projected columns.
965        // If it is not, we skip the field.
966        let mut projection_iter = projection.iter().copied();
967        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
968        let mut processed_fields = 0;
969
970        let mut iter = SplitFields::new(
971            bytes,
972            parse_options.separator,
973            parse_options.quote_char,
974            parse_options.eol_char,
975        );
976        let mut idx = 0u32;
977        let mut read_sol = 0;
978        loop {
979            match iter.next() {
980                // end of line
981                None => {
982                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
983                    break;
984                },
985                Some((mut field, needs_escaping)) => {
986                    let field_len = field.len();
987
988                    // +1 is the split character that is consumed by the iterator.
989                    read_sol += field_len + 1;
990
991                    if idx == next_projected as u32 {
992                        // the iterator is finished when it encounters a `\n`
993                        // this could be preceded by a '\r'
994                        unsafe {
995                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
996                                field = field.get_unchecked(..field_len - 1);
997                            }
998                        }
999
1000                        debug_assert!(processed_fields < buffers.len());
1001                        let buf = unsafe {
1002                            // SAFETY: processed fields index can never exceed the projection indices.
1003                            buffers.get_unchecked_mut(processed_fields)
1004                        };
1005                        let mut add_null = false;
1006
1007                        // if we have null values argument, check if this field equal null value
1008                        if let Some(null_values) = null_values {
1009                            let field = if needs_escaping && !field.is_empty() {
1010                                unsafe { field.get_unchecked(1..field.len() - 1) }
1011                            } else {
1012                                field
1013                            };
1014
1015                            // SAFETY:
1016                            // process fields is in bounds
1017                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1018                        }
1019                        if add_null {
1020                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1021                        } else {
1022                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1023                                .map_err(|e| {
1024                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1025                                    let unparsable = String::from_utf8_lossy(field);
1026                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1027                                    polars_err!(
1028                                        ComputeError:
1029                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1030                                        The current offset in the file is {} bytes.\n\
1031                                        \n\
1032                                        You might want to try:\n\
1033                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1034                                        - specifying correct dtype with the `schema_overrides` argument\n\
1035                                        - setting `ignore_errors` to `True`,\n\
1036                                        - adding `{}` to the `null_values` list.\n\n\
1037                                        Original error: ```{}```",
1038                                        &unparsable,
1039                                        buf.dtype(),
1040                                        column_name,
1041                                        idx + 1,
1042                                        bytes_offset,
1043                                        &unparsable,
1044                                        e
1045                                    )
1046                                })?;
1047                        }
1048                        processed_fields += 1;
1049
1050                        // if we have all projected columns we are done with this line
1051                        match projection_iter.next() {
1052                            Some(p) => next_projected = p,
1053                            None => {
1054                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1055                                    bytes = &bytes[read_sol..];
1056                                } else {
1057                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1058                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1059
1060Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1061                                    }
1062                                    let bytes_rem = skip_this_line(
1063                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1064                                        parse_options.quote_char,
1065                                        parse_options.eol_char,
1066                                    );
1067                                    bytes = bytes_rem;
1068                                }
1069                                break;
1070                            },
1071                        }
1072                    }
1073                    idx += 1;
1074                },
1075            }
1076        }
1077
1078        // there can be lines that miss fields (also the comma values)
1079        // this means the splitter won't process them.
1080        // We traverse them to read them as null values.
1081        while processed_fields < projection.len() {
1082            debug_assert!(processed_fields < buffers.len());
1083            let buf = unsafe {
1084                // SAFETY: processed fields index can never exceed the projection indices.
1085                buffers.get_unchecked_mut(processed_fields)
1086            };
1087            buf.add_null(!parse_options.missing_is_null);
1088            processed_fields += 1;
1089        }
1090        line_count += 1;
1091    }
1092}
1093
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// An end-of-line byte inside a quoted field must not split the line,
    /// whichever quote character is configured.
    #[test]
    fn test_splitlines() {
        // (input, quote character, expected lines in order)
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for line in expected {
                assert_eq!(lines.next(), Some(line.as_bytes()));
            }
            // Nothing may follow the last expected line.
            assert_eq!(lines.next(), None);
        }
    }
}