1use memchr::memchr2_iter;
2use polars_core::prelude::*;
3use polars_core::{POOL, config};
4use polars_error::feature_gated;
5use polars_utils::mmap::MMapSemaphore;
6use polars_utils::plpath::PlPathRef;
7use polars_utils::select::select_unpredictable;
8use rayon::prelude::*;
9
10use super::CsvParseOptions;
11use super::buffer::Buffer;
12use super::options::{CommentPrefix, NullValuesCompiled};
13use super::splitfields::SplitFields;
14use crate::prelude::_csv_read_internal::find_starting_point;
15use crate::utils::compression::maybe_decompress_bytes;
16
17#[allow(clippy::too_many_arguments)]
20pub fn count_rows(
21 addr: PlPathRef<'_>,
22 quote_char: Option<u8>,
23 comment_prefix: Option<&CommentPrefix>,
24 eol_char: u8,
25 has_header: bool,
26 skip_lines: usize,
27 skip_rows_before_header: usize,
28 skip_rows_after_header: usize,
29) -> PolarsResult<usize> {
30 let file = match addr
31 .as_local_path()
32 .and_then(|v| (!config::force_async()).then_some(v))
33 {
34 None => feature_gated!("cloud", {
35 crate::file_cache::FILE_CACHE
36 .get_entry(addr)
37 .unwrap()
39 .try_open_assume_latest()?
40 }),
41 Some(path) => polars_utils::open_file(path)?,
42 };
43
44 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
45 let owned = &mut vec![];
46 let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
47
48 count_rows_from_slice_par(
49 reader_bytes,
50 quote_char,
51 comment_prefix,
52 eol_char,
53 has_header,
54 skip_lines,
55 skip_rows_before_header,
56 skip_rows_after_header,
57 )
58}
59
60#[allow(clippy::too_many_arguments)]
63pub fn count_rows_from_slice_par(
64 mut bytes: &[u8],
65 quote_char: Option<u8>,
66 comment_prefix: Option<&CommentPrefix>,
67 eol_char: u8,
68 has_header: bool,
69 skip_lines: usize,
70 skip_rows_before_header: usize,
71 skip_rows_after_header: usize,
72) -> PolarsResult<usize> {
73 let start_offset = find_starting_point(
74 bytes,
75 quote_char,
76 eol_char,
77 usize::MAX,
83 skip_lines,
84 skip_rows_before_header,
85 skip_rows_after_header,
86 comment_prefix,
87 has_header,
88 )?;
89 bytes = &bytes[start_offset..];
90
91 #[cfg(debug_assertions)]
92 const BYTES_PER_CHUNK: usize = 128;
93 #[cfg(not(debug_assertions))]
94 const BYTES_PER_CHUNK: usize = 1 << 16;
95
96 let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
97 POOL.install(|| {
98 let mut states = Vec::new();
99 if comment_prefix.is_none() {
100 bytes
101 .par_chunks(BYTES_PER_CHUNK)
102 .map(|chunk| count.analyze_chunk(chunk))
103 .collect_into_vec(&mut states);
104 } else {
105 let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
106 (0..num_chunks)
107 .into_par_iter()
108 .map(|chunk_idx| {
109 let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
110 let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
111
112 if start_offset != 0 {
113 if let Some(nl_off) = bytes[start_offset..next_start_offset]
115 .iter()
116 .position(|b| *b == b'\n')
117 {
118 start_offset += nl_off + 1;
119 } else {
120 return count.analyze_chunk(&[]);
121 }
122 }
123
124 let stop_offset = if let Some(nl_off) =
125 bytes[next_start_offset..].iter().position(|b| *b == b'\n')
126 {
127 next_start_offset + nl_off + 1
128 } else {
129 bytes.len()
130 };
131
132 count.analyze_chunk(&bytes[start_offset..stop_offset])
133 })
134 .collect_into_vec(&mut states);
135 }
136
137 let mut n = 0;
138 let mut in_string = false;
139 for pair in states {
140 n += pair[in_string as usize].newline_count;
141 in_string = pair[in_string as usize].end_inside_string;
142 }
143 if let Some(last) = bytes.last() {
144 n += (*last != eol_char) as usize;
145 }
146 Ok(n)
147 })
148}
149
150pub fn count_rows_from_slice_raw(
153 bytes: &[u8],
154 quote_char: Option<u8>,
155 comment_prefix: Option<&CommentPrefix>,
156 eol_char: u8,
157) -> PolarsResult<usize> {
158 Ok(
159 CountLines::new(quote_char, eol_char, comment_prefix.cloned())
160 .count(bytes)
161 .0,
162 )
163}
164
165pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
168 if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
169 &input[3..]
170 } else {
171 input
172 }
173}
174
175#[inline]
179pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
180 match comment_prefix {
181 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
182 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
183 None => false,
184 }
185}
186
187pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
190 let pos = memchr::memchr(eol_char, input)? + 1;
191 if input.len() - pos == 0 {
192 return None;
193 }
194 Some(pos)
195}
196
197pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
198 for _ in 0..skip {
199 if let Some(pos) = next_line_position_naive(input, eol_char) {
200 input = &input[pos..];
201 } else {
202 return input;
203 }
204 }
205 input
206}
207
208pub(super) fn next_line_position(
210 mut input: &[u8],
211 mut expected_fields: Option<usize>,
212 separator: u8,
213 quote_char: Option<u8>,
214 eol_char: u8,
215) -> Option<usize> {
216 fn accept_line(
217 line: &[u8],
218 expected_fields: usize,
219 separator: u8,
220 eol_char: u8,
221 quote_char: Option<u8>,
222 ) -> bool {
223 let mut count = 0usize;
224 for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
225 if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
226 return false;
227 }
228 count += 1;
229 }
230
231 expected_fields.wrapping_sub(count) <= 1
241 }
242
243 let mut rejected_line_groups = 0u8;
246
247 let mut total_pos = 0;
248 if input.is_empty() {
249 return None;
250 }
251 let mut lines_checked = 0u8;
252 loop {
253 if rejected_line_groups >= 3 {
254 return None;
255 }
256 lines_checked = lines_checked.wrapping_add(1);
257 if lines_checked == u8::MAX {
261 if let Some(ef) = expected_fields {
262 expected_fields = Some(ef.saturating_sub(1))
263 }
264 };
265 let pos = memchr::memchr(eol_char, input)? + 1;
266 if input.len() - pos == 0 {
267 return None;
268 }
269 debug_assert!(pos <= input.len());
270 let new_input = unsafe { input.get_unchecked(pos..) };
271 let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
272 let line = lines.next();
273
274 match (line, expected_fields) {
275 (Some(line), Some(expected_fields)) => {
277 if accept_line(line, expected_fields, separator, eol_char, quote_char) {
278 let mut valid = true;
279 for line in lines.take(2) {
280 if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
281 valid = false;
282 break;
283 }
284 }
285 if valid {
286 return Some(total_pos + pos);
287 } else {
288 rejected_line_groups += 1;
289 }
290 } else {
291 debug_assert!(pos < input.len());
292 unsafe {
293 input = input.get_unchecked(pos + 1..);
294 }
295 total_pos += pos + 1;
296 }
297 },
298 (Some(_), None) => return Some(total_pos + pos),
300 _ => return None,
302 }
303 }
304}
305
306#[inline(always)]
307pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
308 b == eol_char || b == b'\r'
309}
310
311#[inline(always)]
312pub(super) fn is_whitespace(b: u8) -> bool {
313 b == b' ' || b == b'\t'
314}
315
316#[inline(always)]
318pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
319 b <= 32
324}
325
/// Return the suffix of `input` that starts at the first byte for which `f` is `false`.
///
/// If every byte satisfies `f`, the returned slice is empty and positioned at the end of
/// `input`.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let first_kept = input.iter().position(|&b| !f(b)).unwrap_or(input.len());
    &input[first_kept..]
}
338
339#[inline]
345pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
346 skip_condition(input, is_whitespace)
347}
348
349#[inline]
350pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
351 skip_condition(input, |b| is_line_ending(b, eol_char))
352}
353
354pub(super) struct SplitLines<'a> {
364 v: &'a [u8],
365 quote_char: u8,
366 eol_char: u8,
367 #[cfg(feature = "simd")]
368 simd_eol_char: SimdVec,
369 #[cfg(feature = "simd")]
370 simd_quote_char: SimdVec,
371 #[cfg(feature = "simd")]
372 previous_valid_eols: u64,
373 total_index: usize,
374 quoting: bool,
375 comment_prefix: Option<&'a CommentPrefix>,
376}
377
/// Number of bytes processed per SIMD block by the line splitters/counters.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
// Portable SIMD types and comparison traits (`simd_eq`, `to_bitmask`, ...).
#[cfg(feature = "simd")]
use std::simd::prelude::*;

// Prefix XOR over a 64-bit mask; in this file it turns a bitmask of quote
// characters into a bitmask of positions inside/outside quoted fields.
#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// 64 lanes of `u8`, i.e. one `SIMD_SIZE`-byte block.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
388
389impl<'a> SplitLines<'a> {
390 pub(super) fn new(
391 slice: &'a [u8],
392 quote_char: Option<u8>,
393 eol_char: u8,
394 comment_prefix: Option<&'a CommentPrefix>,
395 ) -> Self {
396 let quoting = quote_char.is_some();
397 let quote_char = quote_char.unwrap_or(b'\"');
398 #[cfg(feature = "simd")]
399 let simd_eol_char = SimdVec::splat(eol_char);
400 #[cfg(feature = "simd")]
401 let simd_quote_char = SimdVec::splat(quote_char);
402 Self {
403 v: slice,
404 quote_char,
405 eol_char,
406 #[cfg(feature = "simd")]
407 simd_eol_char,
408 #[cfg(feature = "simd")]
409 simd_quote_char,
410 #[cfg(feature = "simd")]
411 previous_valid_eols: 0,
412 total_index: 0,
413 quoting,
414 comment_prefix,
415 }
416 }
417}
418
419impl<'a> SplitLines<'a> {
420 fn next_scalar(&mut self) -> Option<&'a [u8]> {
422 if self.v.is_empty() {
423 return None;
424 }
425 if is_comment_line(self.v, self.comment_prefix) {
426 return self.next_comment_line();
427 }
428 {
429 let mut pos = 0u32;
430 let mut iter = self.v.iter();
431 let mut in_field = false;
432 loop {
433 match iter.next() {
434 Some(&c) => {
435 pos += 1;
436
437 if self.quoting && c == self.quote_char {
438 in_field = !in_field;
442 }
443 else if c == self.eol_char && !in_field {
445 break;
446 }
447 },
448 None => {
449 let remainder = self.v;
450 self.v = &[];
451 return Some(remainder);
452 },
453 }
454 }
455
456 unsafe {
457 debug_assert!((pos as usize) <= self.v.len());
458
459 let ret = Some(
461 self.v
462 .get_unchecked(..(self.total_index + pos as usize - 1)),
463 );
464 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
466 ret
467 }
468 }
469 }
470 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
471 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
472 unsafe {
473 let ret = Some(self.v.get_unchecked(..(pos - 1)));
475 self.v = self.v.get_unchecked(pos..);
477 ret
478 }
479 } else {
480 let remainder = self.v;
481 self.v = &[];
482 Some(remainder)
483 }
484 }
485}
486
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Scalar implementation; see `SplitLines::next_scalar`.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD implementation.
    ///
    /// Scans `SIMD_SIZE`-byte blocks for `eol_char`s that are not inside a quoted field.
    /// The extra line ends found in a block are cached in `previous_valid_eols`, so later
    /// calls can yield those lines without rescanning. The final (at most `SIMD_SIZE`)
    /// bytes are scanned byte by byte. With a comment prefix set, the scalar path is
    /// used instead. Yielded lines exclude their terminating `eol_char`.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Fast path: a line end cached by the previous block scan. The bits are relative
        // to the start of `self.v`, which begins right after the last yielded line end.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Re-base the remaining bits onto the slice after this line end. `pos` is at
            // most 62 here because the cached mask was already shifted by at least 1.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // Yield the line without its terminator and skip the terminator.
                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        // Comment handling is only implemented in the scalar path.
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quote state carried from block to block; a new line starts outside quotes.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit i is set when byte i of the block equals `eol_char`.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Presumably bit i = XOR of quote bits 0..=i (odd number of quotes
                    // so far); it is inverted when the block started outside quotes,
                    // so that set bits mean "outside a quoted field".
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The top bit tells whether the block ends outside quotes.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Cache the remaining line ends of this block, re-based to start
                    // right after `pos`. Shifting a u64 by 64 would overflow, hence the
                    // special case for the last bit.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        // Yield the line without its terminator and skip the terminator.
                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block: continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Scalar tail for the last (at most `SIMD_SIZE`) bytes, continuing the
                // quote state of the preceding blocks.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            } else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No line end left: yield everything that remains.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` counts the eol itself, hence the `- 1` for the line end.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
615
616pub struct CountLines {
617 quote_char: u8,
618 eol_char: u8,
619 #[cfg(feature = "simd")]
620 simd_eol_char: SimdVec,
621 #[cfg(feature = "simd")]
622 simd_quote_char: SimdVec,
623 quoting: bool,
624 comment_prefix: Option<CommentPrefix>,
625}
626
/// Result of scanning one chunk under a single assumption about the quote state at
/// its start (see `CountLines::analyze_chunk`).
#[derive(Clone, Copy, Debug, Default)]
pub struct LineStats {
    /// Number of counted line endings (those outside quoted fields).
    newline_count: usize,
    /// Offset within the chunk of the last counted line ending (0 if there is none).
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted field.
    end_inside_string: bool,
}
633
634impl CountLines {
635 pub fn new(
636 quote_char: Option<u8>,
637 eol_char: u8,
638 comment_prefix: Option<CommentPrefix>,
639 ) -> Self {
640 let quoting = quote_char.is_some();
641 let quote_char = quote_char.unwrap_or(b'\"');
642 #[cfg(feature = "simd")]
643 let simd_eol_char = SimdVec::splat(eol_char);
644 #[cfg(feature = "simd")]
645 let simd_quote_char = SimdVec::splat(quote_char);
646 Self {
647 quote_char,
648 eol_char,
649 #[cfg(feature = "simd")]
650 simd_eol_char,
651 #[cfg(feature = "simd")]
652 simd_quote_char,
653 quoting,
654 comment_prefix,
655 }
656 }
657
658 pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
667 let mut states = [
668 LineStats {
669 newline_count: 0,
670 last_newline_offset: 0,
671 end_inside_string: false,
672 },
673 LineStats {
674 newline_count: 0,
675 last_newline_offset: 0,
676 end_inside_string: false,
677 },
678 ];
679
680 if self.comment_prefix.is_some() {
682 states[0] = self.analyze_chunk_with_comment(bytes, false);
683 states[1] = self.analyze_chunk_with_comment(bytes, true);
684 return states;
685 }
686
687 #[allow(unused_assignments)]
689 let mut global_quote_parity = false;
690 let mut scan_offset = 0;
691
692 #[cfg(feature = "simd")]
693 {
694 let mut global_quote_parity_mask = 0;
696 while scan_offset + 64 <= bytes.len() {
697 let block: [u8; 64] = unsafe {
698 bytes
699 .get_unchecked(scan_offset..scan_offset + 64)
700 .try_into()
701 .unwrap_unchecked()
702 };
703 let simd_bytes = SimdVec::from(block);
704 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
705 if self.quoting {
706 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
707 let quote_parity =
708 prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
709 global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
710
711 let start_outside_string_eol_mask = eol_mask & !quote_parity;
712 states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
713 states[0].last_newline_offset = select_unpredictable(
714 start_outside_string_eol_mask != 0,
715 (scan_offset + 63)
716 .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
717 states[0].last_newline_offset,
718 );
719
720 let start_inside_string_eol_mask = eol_mask & quote_parity;
721 states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
722 states[1].last_newline_offset = select_unpredictable(
723 start_inside_string_eol_mask != 0,
724 (scan_offset + 63)
725 .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
726 states[1].last_newline_offset,
727 );
728 } else {
729 states[0].newline_count += eol_mask.count_ones() as usize;
730 states[0].last_newline_offset = select_unpredictable(
731 eol_mask != 0,
732 (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
733 states[0].last_newline_offset,
734 );
735 }
736
737 scan_offset += 64;
738 }
739
740 global_quote_parity = global_quote_parity_mask > 0;
741 }
742
743 while scan_offset < bytes.len() {
744 let c = unsafe { *bytes.get_unchecked(scan_offset) };
745 global_quote_parity ^= (c == self.quote_char) & self.quoting;
746
747 let state = &mut states[global_quote_parity as usize];
748 state.newline_count += (c == self.eol_char) as usize;
749 state.last_newline_offset =
750 select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
751
752 scan_offset += 1;
753 }
754
755 states[0].end_inside_string = global_quote_parity;
756 states[1].end_inside_string = !global_quote_parity;
757 states
758 }
759
760 fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
762 let pre_s = match self.comment_prefix.as_ref().unwrap() {
763 CommentPrefix::Single(pc) => core::slice::from_ref(pc),
764 CommentPrefix::Multi(ps) => ps.as_bytes(),
765 };
766
767 let mut state = LineStats::default();
768 let mut scan_offset = 0;
769 while scan_offset < bytes.len() {
770 while bytes[scan_offset..].starts_with(pre_s) {
772 scan_offset += pre_s.len();
773 let Some(nl_off) = bytes[scan_offset..].iter().position(|c| *c == b'\n') else {
774 break;
775 };
776 scan_offset += nl_off + 1;
777 }
778
779 while scan_offset < bytes.len() {
780 let c = unsafe { *bytes.get_unchecked(scan_offset) };
781 in_string ^= (c == self.quote_char) & self.quoting;
782
783 if c == self.eol_char && !in_string {
784 state.newline_count += 1;
785 state.last_newline_offset = scan_offset;
786 scan_offset += 1;
787 break;
788 } else {
789 scan_offset += 1;
790 }
791 }
792 }
793
794 state.end_inside_string = in_string;
795 state
796 }
797
798 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
799 loop {
800 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
801
802 let (count, offset) = self.count(b);
803
804 if count > 0 || b.len() == bytes.len() {
805 return (count, offset);
806 }
807
808 *chunk_size *= 2;
809 }
810 }
811
812 #[cfg(feature = "simd")]
814 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
815 let mut total_idx = 0;
816 let original_bytes = bytes;
817 let mut count = 0;
818 let mut position = 0;
819 let mut not_in_field_previous_iter = true;
820
821 loop {
822 let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
823
824 if bytes.len() > SIMD_SIZE {
825 let lane: [u8; SIMD_SIZE] = unsafe {
826 bytes
827 .get_unchecked(0..SIMD_SIZE)
828 .try_into()
829 .unwrap_unchecked()
830 };
831 let simd_bytes = SimdVec::from(lane);
832 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
833
834 let valid_eols = if self.quoting {
835 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
836 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
837
838 if not_in_field_previous_iter {
839 not_in_quote_field = !not_in_quote_field;
840 }
841 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
842 eol_mask & not_in_quote_field
843 } else {
844 eol_mask
845 };
846
847 if valid_eols != 0 {
848 count += valid_eols.count_ones() as usize;
849 position = total_idx + 63 - valid_eols.leading_zeros() as usize;
850 debug_assert_eq!(original_bytes[position], self.eol_char)
851 }
852 total_idx += SIMD_SIZE;
853 } else if bytes.is_empty() {
854 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
855 return (count, position);
856 } else {
857 let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
858
859 let (count, position) = if c > 0 {
860 (count + c, total_idx + o)
861 } else {
862 (count, position)
863 };
864 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
865
866 return (count, position);
867 }
868 }
869 }
870
871 #[cfg(not(feature = "simd"))]
872 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
873 self.count_no_simd(bytes, false)
874 }
875
876 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
877 let iter = bytes.iter();
878 let mut in_field = in_field;
879 let mut count = 0;
880 let mut position = 0;
881
882 for b in iter {
883 let c = *b;
884 if self.quoting && c == self.quote_char {
885 in_field = !in_field;
889 }
890 else if c == self.eol_char && !in_field {
892 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
893 count += 1;
894 }
895 }
896 debug_assert!(count == 0 || bytes[position] == self.eol_char);
897
898 (count, position)
899 }
900}
901
/// Return the index of the first `needle` in `bytes` that is not inside a quoted field
/// delimited by `quote_char`, or `None` if there is none.
///
/// The quote state is toggled before `needle` is compared. As a result, a `needle`
/// equal to `quote_char` is only found where it closes a quoted field.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;
    // `position` yields `usize` indices; the previous `u32` counter would wrap for
    // inputs longer than `u32::MAX` bytes.
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
924
925#[inline]
926pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
927 let pos = match quote {
928 Some(quote) => find_quoted(bytes, quote, eol_char),
929 None => bytes.iter().position(|x| *x == eol_char),
930 };
931 match pos {
932 None => &[],
933 Some(pos) => &bytes[pos + 1..],
934 }
935}
936
937#[inline]
938pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
939 if let Some(pos) = next_line_position_naive(input, eol_char) {
940 unsafe { input.get_unchecked(pos..) }
941 } else {
942 &[]
943 }
944}
945
946#[allow(clippy::too_many_arguments)]
958pub(super) fn parse_lines(
959 mut bytes: &[u8],
960 parse_options: &CsvParseOptions,
961 offset: usize,
962 ignore_errors: bool,
963 null_values: Option<&NullValuesCompiled>,
964 projection: &[usize],
965 buffers: &mut [Buffer],
966 n_lines: usize,
967 schema_len: usize,
969 schema: &Schema,
970) -> PolarsResult<usize> {
971 assert!(
972 !projection.is_empty(),
973 "at least one column should be projected"
974 );
975 let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
976 if projection.len() != schema_len {
980 truncate_ragged_lines = true
981 }
982
983 let start = bytes.as_ptr() as usize;
985 let original_bytes_len = bytes.len();
986 let n_lines = n_lines as u32;
987
988 let mut line_count = 0u32;
989 loop {
990 if line_count > n_lines {
991 let end = bytes.as_ptr() as usize;
992 return Ok(end - start);
993 }
994
995 if bytes.is_empty() {
996 return Ok(original_bytes_len);
997 } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
998 let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1000 bytes = bytes_rem;
1001 continue;
1002 }
1003
1004 let mut projection_iter = projection.iter().copied();
1008 let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1009 let mut processed_fields = 0;
1010
1011 let mut iter = SplitFields::new(
1012 bytes,
1013 parse_options.separator,
1014 parse_options.quote_char,
1015 parse_options.eol_char,
1016 );
1017 let mut idx = 0u32;
1018 let mut read_sol = 0;
1019 loop {
1020 match iter.next() {
1021 None => {
1023 bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1024 break;
1025 },
1026 Some((mut field, needs_escaping)) => {
1027 let field_len = field.len();
1028
1029 read_sol += field_len + 1;
1031
1032 if idx == next_projected as u32 {
1033 unsafe {
1036 if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1037 field = field.get_unchecked(..field_len - 1);
1038 }
1039 }
1040
1041 debug_assert!(processed_fields < buffers.len());
1042 let buf = unsafe {
1043 buffers.get_unchecked_mut(processed_fields)
1045 };
1046 let mut add_null = false;
1047
1048 if let Some(null_values) = null_values {
1050 let field = if needs_escaping && !field.is_empty() {
1051 unsafe { field.get_unchecked(1..field.len() - 1) }
1052 } else {
1053 field
1054 };
1055
1056 add_null = unsafe { null_values.is_null(field, idx as usize) }
1059 }
1060 if add_null {
1061 buf.add_null(!parse_options.missing_is_null && field.is_empty())
1062 } else {
1063 buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1064 .map_err(|e| {
1065 let bytes_offset = offset + field.as_ptr() as usize - start;
1066 let unparsable = String::from_utf8_lossy(field);
1067 let column_name = schema.get_at_index(idx as usize).unwrap().0;
1068 polars_err!(
1069 ComputeError:
1070 "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1071 The current offset in the file is {} bytes.\n\
1072 \n\
1073 You might want to try:\n\
1074 - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1075 - specifying correct dtype with the `schema_overrides` argument\n\
1076 - setting `ignore_errors` to `True`,\n\
1077 - adding `{}` to the `null_values` list.\n\n\
1078 Original error: ```{}```",
1079 &unparsable,
1080 buf.dtype(),
1081 column_name,
1082 idx + 1,
1083 bytes_offset,
1084 &unparsable,
1085 e
1086 )
1087 })?;
1088 }
1089 processed_fields += 1;
1090
1091 match projection_iter.next() {
1093 Some(p) => next_projected = p,
1094 None => {
1095 if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1096 bytes = unsafe { bytes.get_unchecked(read_sol..) };
1097 } else {
1098 if !truncate_ragged_lines && read_sol < bytes.len() {
1099 polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1100
1101Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1102 }
1103 let bytes_rem = skip_this_line(
1104 unsafe { bytes.get_unchecked(read_sol - 1..) },
1105 parse_options.quote_char,
1106 parse_options.eol_char,
1107 );
1108 bytes = bytes_rem;
1109 }
1110 break;
1111 },
1112 }
1113 }
1114 idx += 1;
1115 },
1116 }
1117 }
1118
1119 while processed_fields < projection.len() {
1123 debug_assert!(processed_fields < buffers.len());
1124 let buf = unsafe {
1125 buffers.get_unchecked_mut(processed_fields)
1127 };
1128 buf.add_null(!parse_options.missing_is_null);
1129 processed_fields += 1;
1130 }
1131 line_count += 1;
1132 }
1133}
1134
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// Split `input` on `\n`, honouring `quote`, and collect every yielded line.
    fn collect_lines(input: &str, quote: u8) -> Vec<&[u8]> {
        SplitLines::new(input.as_bytes(), Some(quote), b'\n', None).collect()
    }

    #[test]
    fn test_splitlines() {
        // A `\n` inside a quoted field must not end the line, for either quote char.
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];
        for (input, quote, expected) in cases {
            let expected: Vec<&[u8]> = expected.iter().map(|s| s.as_bytes()).collect();
            assert_eq!(collect_lines(input, quote), expected);
        }
    }
}