1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_buffer::Buffer;
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::pl_path::PlRefPath;
10use polars_utils::select::select_unpredictable;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::Builder;
15use super::options::{CommentPrefix, NullValuesCompiled};
16use super::splitfields::SplitFields;
17use crate::prelude::CsvReadOptions;
18use crate::prelude::streaming::read_until_start_and_infer_schema;
19use crate::utils::compression::ByteSourceReader;
20use crate::utils::stream_buf_reader::ReaderSource;
21
22#[allow(clippy::too_many_arguments)]
25pub fn count_rows(
26 path: PlRefPath,
27 quote_char: Option<u8>,
28 comment_prefix: Option<&CommentPrefix>,
29 eol_char: u8,
30 has_header: bool,
31 skip_lines: usize,
32 skip_rows_before_header: usize,
33 skip_rows_after_header: usize,
34 raise_if_empty: bool,
35) -> PolarsResult<usize> {
36 let file = if path.has_scheme() || polars_config::config().force_async() {
37 feature_gated!("cloud", {
38 crate::file_cache::FILE_CACHE
39 .get_entry(path)
40 .unwrap()
42 .try_open_assume_latest()?
43 })
44 } else {
45 polars_utils::open_file(path.as_std_path())?
46 };
47
48 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
49
50 count_rows_from_slice_par(
51 Buffer::from_owner(mmap),
52 quote_char,
53 comment_prefix,
54 eol_char,
55 has_header,
56 skip_lines,
57 skip_rows_before_header,
58 skip_rows_after_header,
59 raise_if_empty,
60 )
61}
62
/// Counts the rows available from `reader`, analysing chunks in parallel on `POOL`.
///
/// The parsing arguments are packed into a `CsvReadOptions` and passed to
/// `read_until_start_and_infer_schema`, whose second return value (`leftover`) is fed
/// into the first `read_next_slice` call below.
///
/// Two strategies are used:
/// * without a comment prefix the reader is consumed slice by slice and every slice is
///   analysed as soon as it is produced;
/// * with a comment prefix the remaining input is read in one go and cut into
///   `BYTES_PER_CHUNK`-sized windows that are re-aligned to `eol_char` boundaries.
///
/// Each chunk yields two `LineStats` (one per "starts inside a quoted string"
/// hypothesis); these are folded in order to obtain the final count. A trailing row
/// that is not terminated by `eol_char` is counted as well.
///
/// # Errors
///
/// Propagates errors from `read_until_start_and_infer_schema` and from
/// `read_next_slice`.
#[allow(clippy::too_many_arguments)]
pub fn count_rows_from_reader_par(
    mut reader: ByteSourceReader<ReaderSource>,
    quote_char: Option<u8>,
    comment_prefix: Option<&CommentPrefix>,
    eol_char: u8,
    has_header: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    raise_if_empty: bool,
    decompressed_size_hint: Option<usize>,
) -> PolarsResult<usize> {
    let reader_options = CsvReadOptions {
        parse_options: Arc::new(CsvParseOptions {
            quote_char,
            comment_prefix: comment_prefix.cloned(),
            eol_char,
            ..Default::default()
        }),
        has_header,
        skip_lines,
        skip_rows: skip_rows_before_header,
        skip_rows_after_header,
        raise_if_empty,
        ..Default::default()
    };

    // Only the leftover bytes are needed here; the first tuple element is discarded.
    // NOTE(review): `leftover` presumably holds data already read past the header/skipped
    // rows — confirm against `read_until_start_and_infer_schema`.
    let (_, mut leftover) = read_until_start_and_infer_schema(
        &reader_options,
        None,
        decompressed_size_hint,
        None,
        &mut reader,
    )?;

    // A tiny chunk size in debug builds so that multi-chunk paths are exercised by
    // small inputs.
    const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) {
        128
    } else {
        512 * 1024
    };

    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
    POOL.install(|| {
        let mut states = Vec::new();
        let eof_unterminated_row;

        if comment_prefix.is_none() {
            // Streaming path: slices are produced sequentially by the closure below
            // and analysed in parallel through `par_bridge`.
            let mut last_slice = Buffer::new();
            let mut err = None;

            let streaming_iter = std::iter::from_fn(|| {
                let (slice, read_n) =
                    match reader.read_next_slice(&leftover, BYTES_PER_CHUNK, Some(BYTES_PER_CHUNK))
                    {
                        Ok(tup) => tup,
                        Err(e) => {
                            // Remember the error and end the iteration; it is
                            // returned once the parallel collection has finished.
                            err = Some(e);
                            return None;
                        },
                    };

                // The leftover is only handed to the reader once.
                leftover = Buffer::new();
                if slice.is_empty() && read_n == 0 {
                    return None;
                }

                // Keep the most recent slice for the end-of-file check below.
                last_slice = slice.clone();
                Some(slice)
            });

            states = streaming_iter
                .enumerate()
                .par_bridge()
                .map(|(id, slice)| (count.analyze_chunk(&slice), id))
                .collect::<Vec<_>>();

            if let Some(e) = err {
                return Err(e.into());
            }

            // `par_bridge` does not preserve order, but the fold below depends on the
            // chunks being visited in input order.
            states.sort_by_key(|(_, id)| *id);

            eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix);
        } else {
            // Comment-aware path: read everything that is left in a single call.
            let (bytes, _) =
                reader.read_next_slice(&leftover, usize::MAX, decompressed_size_hint)?;

            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
            (0..num_chunks)
                .into_par_iter()
                .map(|chunk_idx| {
                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());

                    // Every chunk but the first starts right after the first
                    // `eol_char` inside its window (the search ignores quoting).
                    if start_offset != 0 {
                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
                            .iter()
                            .position(|b| *b == eol_char)
                        {
                            start_offset += nl_off + 1;
                        } else {
                            // No line starts in this window; contribute empty stats.
                            return (count.analyze_chunk(&[]), 0);
                        }
                    }

                    // The chunk extends through the first `eol_char` found at or
                    // after the start of the next window, or to the end of the input.
                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
                        .iter()
                        .position(|b| *b == eol_char)
                    {
                        next_start_offset + nl_off + 1
                    } else {
                        bytes.len()
                    };

                    (count.analyze_chunk(&bytes[start_offset..stop_offset]), 0)
                })
                .collect_into_vec(&mut states);

            eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix);
        }

        // Fold the per-chunk results: pick the stats matching the quote state at the
        // start of the chunk and carry the resulting end state to the next chunk.
        let mut n = 0;
        let mut in_string = false;
        for (pair, _) in states {
            n += pair[in_string as usize].newline_count;
            in_string = pair[in_string as usize].end_inside_string;
        }
        n += eof_unterminated_row as usize;
        Ok(n)
    })
}
205
206#[allow(clippy::too_many_arguments)]
210pub fn count_rows_from_slice_par(
211 buffer: Buffer<u8>,
212 quote_char: Option<u8>,
213 comment_prefix: Option<&CommentPrefix>,
214 eol_char: u8,
215 has_header: bool,
216 skip_lines: usize,
217 skip_rows_before_header: usize,
218 skip_rows_after_header: usize,
219 raise_if_empty: bool,
220) -> PolarsResult<usize> {
221 const ASSUMED_COMPRESSION_RATIO: usize = 4;
222
223 let buffer_len = buffer.len();
224 let reader = ByteSourceReader::from_memory(buffer)?;
225 let decompressed_size_hint = Some(
226 buffer_len
227 * reader
228 .compression()
229 .map_or(1, |_| ASSUMED_COMPRESSION_RATIO),
230 );
231
232 count_rows_from_reader_par(
233 reader,
234 quote_char,
235 comment_prefix,
236 eol_char,
237 has_header,
238 skip_lines,
239 skip_rows_before_header,
240 skip_rows_after_header,
241 raise_if_empty,
242 decompressed_size_hint,
243 )
244}
245
246#[inline]
250pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
251 match comment_prefix {
252 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
253 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
254 None => false,
255 }
256}
257
258pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
261 let pos = memchr::memchr(eol_char, input)? + 1;
262 if input.len() - pos == 0 {
263 return None;
264 }
265 Some(pos)
266}
267
/// Heuristically searches `input` for a position at which a new line starts.
///
/// Candidate positions are the bytes following an `eol_char`. When `expected_fields`
/// is given, a candidate is only accepted if the line starting there and the next two
/// lines (as split by `SplitLines`) look like rows with the expected number of
/// fields. When `expected_fields` is `None`, the first candidate that is followed by
/// any line is returned.
///
/// Returns `None` when the input is empty, when no further `eol_char` exists, when the
/// `eol_char` found is the last byte, or after three rejected line groups.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Checks whether `line` plausibly is a row with `expected_fields` fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field that itself contains at least `expected_fields`
            // separator/eol bytes means the split is not trustworthy.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // Accept `count == expected_fields` or `count == expected_fields - 1`;
        // `wrapping_sub` turns `count > expected_fields` into a huge value, which is
        // rejected.
        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of candidates whose first line was accepted but whose follow-up lines
    // were not.
    let mut rejected_line_groups = 0u8;

    // Bytes of the original input that have been skipped so far.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // After 255 iterations, relax the expectation by one field.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos` is at most `input.len()` (asserted above).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // Confirm the candidate with up to two more lines.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced in this branch, so the
                        // following iterations re-evaluate the same candidate until the
                        // rejection limit is hit — confirm this is intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // SAFETY: `pos < input.len()` because the `input.len() - pos == 0`
                    // case returned above, hence `pos + 1 <= input.len()`.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // Without a field expectation any following line is good enough.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
365
366#[inline(always)]
367pub(super) fn is_whitespace(b: u8) -> bool {
368 b == b' ' || b == b'\t'
369}
370
371#[inline(always)]
373pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
374 b <= 32
379}
380
/// Drops the leading bytes of `input` for which `f` returns `true`.
///
/// Returns the sub-slice starting at the first byte that does not satisfy `f`; the
/// result is empty when every byte satisfies it (or when `input` is empty).
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
393
394#[inline]
400pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
401 skip_condition(input, is_whitespace)
402}
403
/// Iterator over the lines of a byte slice.
///
/// Lines are separated by `eol_char`; when quoting is enabled, `eol_char` bytes that
/// occur between quote characters do not end a line. The returned lines do not
/// include the terminating `eol_char`.
pub struct SplitLines<'a> {
    /// The input that has not been returned yet.
    v: &'a [u8],
    /// Quote byte; only honoured when `quoting` is `true`.
    quote_char: u8,
    /// Line terminator byte.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bit mask of line ends found in the last SIMD block that have not been returned
    /// yet; bit `i` refers to `v[i]`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` of the block currently being scanned by the SIMD path.
    total_index: usize,
    /// Whether quote handling is enabled (a quote character was supplied).
    quoting: bool,
    /// Optional comment prefix; lines starting with it are split without regard to
    /// quoting.
    comment_prefix: Option<&'a CommentPrefix>,
}
427
/// Number of bytes processed per SIMD block; matches the lane count of `SimdVec` and
/// the bit width of the `u64` masks derived from it.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Vector type used for the byte comparisons in the SIMD code paths.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
438
439impl<'a> SplitLines<'a> {
440 pub fn new(
441 slice: &'a [u8],
442 quote_char: Option<u8>,
443 eol_char: u8,
444 comment_prefix: Option<&'a CommentPrefix>,
445 ) -> Self {
446 let quoting = quote_char.is_some();
447 let quote_char = quote_char.unwrap_or(b'\"');
448 #[cfg(feature = "simd")]
449 let simd_eol_char = SimdVec::splat(eol_char);
450 #[cfg(feature = "simd")]
451 let simd_quote_char = SimdVec::splat(quote_char);
452 Self {
453 v: slice,
454 quote_char,
455 eol_char,
456 #[cfg(feature = "simd")]
457 simd_eol_char,
458 #[cfg(feature = "simd")]
459 simd_quote_char,
460 #[cfg(feature = "simd")]
461 previous_valid_eols: 0,
462 total_index: 0,
463 quoting,
464 comment_prefix,
465 }
466 }
467}
468
469impl<'a> SplitLines<'a> {
470 fn next_scalar(&mut self) -> Option<&'a [u8]> {
472 if self.v.is_empty() {
473 return None;
474 }
475 if is_comment_line(self.v, self.comment_prefix) {
476 return self.next_comment_line();
477 }
478 {
479 let mut pos = 0u32;
480 let mut iter = self.v.iter();
481 let mut in_field = false;
482 loop {
483 match iter.next() {
484 Some(&c) => {
485 pos += 1;
486
487 if self.quoting && c == self.quote_char {
488 in_field = !in_field;
492 }
493 else if c == self.eol_char && !in_field {
495 break;
496 }
497 },
498 None => {
499 let remainder = self.v;
500 self.v = &[];
501 return Some(remainder);
502 },
503 }
504 }
505
506 unsafe {
507 debug_assert!((pos as usize) <= self.v.len());
508
509 let ret = Some(
511 self.v
512 .get_unchecked(..(self.total_index + pos as usize - 1)),
513 );
514 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
516 ret
517 }
518 }
519 }
520 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
521 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
522 unsafe {
523 let ret = Some(self.v.get_unchecked(..(pos - 1)));
525 self.v = self.v.get_unchecked(pos..);
527 ret
528 }
529 } else {
530 let remainder = self.v;
531 self.v = &[];
532 Some(remainder)
533 }
534 }
535}
536
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Scalar build: every line is produced by `next_scalar`.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD build: scans the input in blocks of `SIMD_SIZE` bytes and falls back to a
    /// byte-by-byte scan for the tail. Inputs with a comment prefix are handled
    /// entirely by `next_scalar`.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Fast path: a previous block already located further line ends.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Drop the consumed bit and re-base the mask onto the new start of `v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quote state carried from one block to the next; a line starts outside of
        // any quoted field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Running XOR over the quote bits; combined with the carried-in
                    // state it yields, per byte, whether the byte is outside quotes.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state of the last byte is the carry for the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Remember the remaining line ends of this block. Shifting a
                    // `u64` by 64 is not allowed, hence the special case.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Tail of at most `SIMD_SIZE` bytes: scan byte by byte, continuing
                // with the quote state carried over from the SIMD blocks.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            }
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No line end left: return the whole remainder.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` counts bytes of the tail, so `total_index` is added to
                    // obtain an index into `self.v`.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
665
/// Counts line terminators in byte chunks, optionally honouring quoting and comment
/// lines.
pub struct CountLines {
    /// Quote byte; only honoured when `quoting` is `true`.
    quote_char: u8,
    /// Line terminator byte.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote handling is enabled (a quote character was supplied).
    quoting: bool,
    /// Optional comment prefix; when set, the comment-aware scalar scan is used.
    comment_prefix: Option<CommentPrefix>,
}
676
/// Result of analysing one chunk under a fixed assumption about the quote state at
/// the start of the chunk.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line terminators counted in the chunk.
    pub newline_count: usize,
    /// Offset (within the chunk) of the last counted line terminator; stays `0` when
    /// none was counted.
    pub last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    pub end_inside_string: bool,
}
683
684impl CountLines {
685 pub fn new(
686 quote_char: Option<u8>,
687 eol_char: u8,
688 comment_prefix: Option<CommentPrefix>,
689 ) -> Self {
690 let quoting = quote_char.is_some();
691 let quote_char = quote_char.unwrap_or(b'\"');
692 #[cfg(feature = "simd")]
693 let simd_eol_char = SimdVec::splat(eol_char);
694 #[cfg(feature = "simd")]
695 let simd_quote_char = SimdVec::splat(quote_char);
696 Self {
697 quote_char,
698 eol_char,
699 #[cfg(feature = "simd")]
700 simd_eol_char,
701 #[cfg(feature = "simd")]
702 simd_quote_char,
703 quoting,
704 comment_prefix,
705 }
706 }
707
708 pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
717 let mut states = [
718 LineStats {
719 newline_count: 0,
720 last_newline_offset: 0,
721 end_inside_string: false,
722 },
723 LineStats {
724 newline_count: 0,
725 last_newline_offset: 0,
726 end_inside_string: false,
727 },
728 ];
729
730 if self.comment_prefix.is_some() {
732 states[0] = self.analyze_chunk_with_comment(bytes, false);
733 states[1] = self.analyze_chunk_with_comment(bytes, true);
734 return states;
735 }
736
737 #[allow(unused_assignments)]
739 let mut global_quote_parity = false;
740 let mut scan_offset = 0;
741
742 #[cfg(feature = "simd")]
743 {
744 let mut global_quote_parity_mask = 0;
746 while scan_offset + 64 <= bytes.len() {
747 let block: [u8; 64] = unsafe {
748 bytes
749 .get_unchecked(scan_offset..scan_offset + 64)
750 .try_into()
751 .unwrap_unchecked()
752 };
753 let simd_bytes = SimdVec::from(block);
754 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
755 if self.quoting {
756 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
757 let quote_parity =
758 prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
759 global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
760
761 let start_outside_string_eol_mask = eol_mask & !quote_parity;
762 states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
763 states[0].last_newline_offset = select_unpredictable(
764 start_outside_string_eol_mask != 0,
765 (scan_offset + 63)
766 .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
767 states[0].last_newline_offset,
768 );
769
770 let start_inside_string_eol_mask = eol_mask & quote_parity;
771 states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
772 states[1].last_newline_offset = select_unpredictable(
773 start_inside_string_eol_mask != 0,
774 (scan_offset + 63)
775 .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
776 states[1].last_newline_offset,
777 );
778 } else {
779 states[0].newline_count += eol_mask.count_ones() as usize;
780 states[0].last_newline_offset = select_unpredictable(
781 eol_mask != 0,
782 (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
783 states[0].last_newline_offset,
784 );
785 }
786
787 scan_offset += 64;
788 }
789
790 global_quote_parity = global_quote_parity_mask > 0;
791 }
792
793 while scan_offset < bytes.len() {
794 let c = unsafe { *bytes.get_unchecked(scan_offset) };
795 global_quote_parity ^= (c == self.quote_char) & self.quoting;
796
797 let state = &mut states[global_quote_parity as usize];
798 state.newline_count += (c == self.eol_char) as usize;
799 state.last_newline_offset =
800 select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
801
802 scan_offset += 1;
803 }
804
805 states[0].end_inside_string = global_quote_parity;
806 states[1].end_inside_string = !global_quote_parity;
807 states
808 }
809
810 fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
812 let pre_s = match self.comment_prefix.as_ref().unwrap() {
813 CommentPrefix::Single(pc) => core::slice::from_ref(pc),
814 CommentPrefix::Multi(ps) => ps.as_bytes(),
815 };
816
817 let mut state = LineStats::default();
818 let mut scan_offset = 0;
819 while scan_offset < bytes.len() {
820 while bytes[scan_offset..].starts_with(pre_s) {
822 scan_offset += pre_s.len();
823 let Some(nl_off) = bytes[scan_offset..]
824 .iter()
825 .position(|c| *c == self.eol_char)
826 else {
827 break;
828 };
829 scan_offset += nl_off + 1;
830 }
831
832 while scan_offset < bytes.len() {
833 let c = unsafe { *bytes.get_unchecked(scan_offset) };
834 in_string ^= (c == self.quote_char) & self.quoting;
835
836 if c == self.eol_char && !in_string {
837 state.newline_count += 1;
838 state.last_newline_offset = scan_offset;
839 scan_offset += 1;
840 break;
841 } else {
842 scan_offset += 1;
843 }
844 }
845 }
846
847 state.end_inside_string = in_string;
848 state
849 }
850
851 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
852 loop {
853 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
854
855 let (count, offset) = if self.comment_prefix.is_some() {
856 let stats = self.analyze_chunk_with_comment(b, false);
857 (stats.newline_count, stats.last_newline_offset)
858 } else {
859 self.count(b)
860 };
861
862 if count > 0 || b.len() == bytes.len() {
863 return (count, offset);
864 }
865
866 *chunk_size = chunk_size.saturating_mul(2);
867 }
868 }
869
870 pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
871 let stats = if self.comment_prefix.is_some() {
872 self.analyze_chunk_with_comment(bytes, false)
873 } else {
874 self.analyze_chunk(bytes)[0]
875 };
876
877 let mut count = stats.newline_count;
878 let mut offset = stats.last_newline_offset;
879
880 if count > 0 {
881 offset = cmp::min(offset + 1, bytes.len());
882 } else {
883 debug_assert!(offset == 0);
884 }
885
886 if is_eof {
887 count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref())
888 as usize;
889 offset = bytes.len();
890 }
891
892 (count, offset)
893 }
894
895 #[cfg(feature = "simd")]
897 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
898 let mut total_idx = 0;
899 let original_bytes = bytes;
900 let mut count = 0;
901 let mut position = 0;
902 let mut not_in_field_previous_iter = true;
903
904 loop {
905 let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
906
907 if bytes.len() > SIMD_SIZE {
908 let lane: [u8; SIMD_SIZE] = unsafe {
909 bytes
910 .get_unchecked(0..SIMD_SIZE)
911 .try_into()
912 .unwrap_unchecked()
913 };
914 let simd_bytes = SimdVec::from(lane);
915 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
916
917 let valid_eols = if self.quoting {
918 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
919 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
920
921 if not_in_field_previous_iter {
922 not_in_quote_field = !not_in_quote_field;
923 }
924 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
925 eol_mask & not_in_quote_field
926 } else {
927 eol_mask
928 };
929
930 if valid_eols != 0 {
931 count += valid_eols.count_ones() as usize;
932 position = total_idx + 63 - valid_eols.leading_zeros() as usize;
933 debug_assert_eq!(original_bytes[position], self.eol_char)
934 }
935 total_idx += SIMD_SIZE;
936 } else if bytes.is_empty() {
937 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
938 return (count, position);
939 } else {
940 let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
941
942 let (count, position) = if c > 0 {
943 (count + c, total_idx + o)
944 } else {
945 (count, position)
946 };
947 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
948
949 return (count, position);
950 }
951 }
952 }
953
954 #[cfg(not(feature = "simd"))]
955 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
956 self.count_no_simd(bytes, false)
957 }
958
959 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
960 let iter = bytes.iter();
961 let mut in_field = in_field;
962 let mut count = 0;
963 let mut position = 0;
964
965 for b in iter {
966 let c = *b;
967 if self.quoting && c == self.quote_char {
968 in_field = !in_field;
972 }
973 else if c == self.eol_char && !in_field {
975 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
976 count += 1;
977 }
978 }
979 debug_assert!(count == 0 || bytes[position] == self.eol_char);
980
981 (count, position)
982 }
983}
984
985fn ends_in_unterminated_row(
986 bytes: &[u8],
987 eol_char: u8,
988 comment_prefix: Option<&CommentPrefix>,
989) -> bool {
990 if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char {
991 let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0);
994 let last_line_is_comment_line = bytes
995 .get(last_new_line_post + 1..)
996 .map(|line| is_comment_line(line, comment_prefix))
997 .unwrap_or(false);
998
999 return !last_line_is_comment_line;
1000 }
1001
1002 false
1003}
1004
/// Returns the index of the first `needle` byte that lies outside of a quoted
/// section, or `None` if there is none.
///
/// Every `quote_char` toggles the quoted state before the byte itself is compared
/// with `needle`. The index is tracked as `usize`, so inputs of 4 GiB or more do not
/// overflow the counter.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }

        !in_field && c == needle
    })
}
1027
1028#[inline]
1029pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
1030 let pos = match quote {
1031 Some(quote) => find_quoted(bytes, quote, eol_char),
1032 None => bytes.iter().position(|x| *x == eol_char),
1033 };
1034 match pos {
1035 None => &[],
1036 Some(pos) => &bytes[pos + 1..],
1037 }
1038}
1039
1040#[inline]
1041pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
1042 if let Some(pos) = next_line_position_naive(input, eol_char) {
1043 unsafe { input.get_unchecked(pos..) }
1044 } else {
1045 &[]
1046 }
1047}
1048
1049#[allow(clippy::too_many_arguments)]
1061pub(super) fn parse_lines(
1062 mut bytes: &[u8],
1063 parse_options: &CsvParseOptions,
1064 offset: usize,
1065 ignore_errors: bool,
1066 null_values: Option<&NullValuesCompiled>,
1067 projection: &[usize],
1068 buffers: &mut [Builder],
1069 n_lines: usize,
1070 schema_len: usize,
1072 schema: &Schema,
1073) -> PolarsResult<usize> {
1074 assert!(
1075 !projection.is_empty(),
1076 "at least one column should be projected"
1077 );
1078 let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
1079 if projection.len() != schema_len {
1083 truncate_ragged_lines = true
1084 }
1085
1086 let start = bytes.as_ptr() as usize;
1088 let original_bytes_len = bytes.len();
1089 let n_lines = n_lines as u32;
1090
1091 let mut line_count = 0u32;
1092 loop {
1093 if line_count > n_lines {
1094 let end = bytes.as_ptr() as usize;
1095 return Ok(end - start);
1096 }
1097
1098 if bytes.is_empty() {
1099 return Ok(original_bytes_len);
1100 } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1101 let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1103 bytes = bytes_rem;
1104 continue;
1105 }
1106
1107 let mut projection_iter = projection.iter().copied();
1111 let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1112 let mut processed_fields = 0;
1113
1114 let mut iter = SplitFields::new(
1115 bytes,
1116 parse_options.separator,
1117 parse_options.quote_char,
1118 parse_options.eol_char,
1119 );
1120 let mut idx = 0u32;
1121 let mut read_sol = 0;
1122 loop {
1123 match iter.next() {
1124 None => {
1126 bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1127 break;
1128 },
1129 Some((mut field, needs_escaping)) => {
1130 let field_len = field.len();
1131
1132 read_sol += field_len + 1;
1134
1135 if idx == next_projected as u32 {
1136 unsafe {
1139 if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1140 field = field.get_unchecked(..field_len - 1);
1141 }
1142 }
1143
1144 debug_assert!(processed_fields < buffers.len());
1145 let buf = unsafe {
1146 buffers.get_unchecked_mut(processed_fields)
1148 };
1149 let mut add_null = false;
1150
1151 if let Some(null_values) = null_values {
1153 let field = if needs_escaping && !field.is_empty() {
1154 unsafe { field.get_unchecked(1..field.len() - 1) }
1155 } else {
1156 field
1157 };
1158
1159 add_null = unsafe { null_values.is_null(field, idx as usize) }
1162 }
1163 if add_null {
1164 buf.add_null(!parse_options.missing_is_null && field.is_empty())
1165 } else {
1166 buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1167 .map_err(|e| {
1168 let bytes_offset = offset + field.as_ptr() as usize - start;
1169 let unparsable = String::from_utf8_lossy(field);
1170 let column_name = schema.get_at_index(idx as usize).unwrap().0;
1171 polars_err!(
1172 ComputeError:
1173 "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1174 The current offset in the file is {} bytes.\n\
1175 \n\
1176 You might want to try:\n\
1177 - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1178 - specifying correct dtype with the `schema_overrides` argument\n\
1179 - setting `ignore_errors` to `True`,\n\
1180 - adding `{}` to the `null_values` list.\n\n\
1181 Original error: ```{}```",
1182 &unparsable,
1183 buf.dtype(),
1184 column_name,
1185 idx + 1,
1186 bytes_offset,
1187 &unparsable,
1188 e
1189 )
1190 })?;
1191 }
1192 processed_fields += 1;
1193
1194 match projection_iter.next() {
1196 Some(p) => next_projected = p,
1197 None => {
1198 if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1199 bytes = unsafe { bytes.get_unchecked(read_sol..) };
1200 } else {
1201 if !truncate_ragged_lines && read_sol < bytes.len() {
1202 polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1203
1204Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1205 }
1206 let bytes_rem = skip_this_line(
1207 unsafe { bytes.get_unchecked(read_sol - 1..) },
1208 parse_options.quote_char,
1209 parse_options.eol_char,
1210 );
1211 bytes = bytes_rem;
1212 }
1213 break;
1214 },
1215 }
1216 }
1217 idx += 1;
1218 },
1219 }
1220 }
1221
1222 while processed_fields < projection.len() {
1226 debug_assert!(processed_fields < buffers.len());
1227 let buf = unsafe {
1228 buffers.get_unchecked_mut(processed_fields)
1230 };
1231 buf.add_null(!parse_options.missing_is_null);
1232 processed_fields += 1;
1233 }
1234 line_count += 1;
1235 }
1236}
1237
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// Line ends inside quoted fields must not split a line, for both the default
    /// double quote and a custom single-quote character.
    #[test]
    fn test_splitlines() {
        // Double-quoted fields containing an embedded newline.
        let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
        let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
        assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), None);

        // Same input shape with a single quote as the quote character.
        let input2 = "1,'foo\n'\n2,'foo\n'\n";
        let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
        assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), None);
    }
}