1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_core::prelude::*;
5use polars_core::{POOL, config};
6use polars_error::feature_gated;
7use polars_utils::mmap::{MMapSemaphore, MemSlice};
8use polars_utils::plpath::PlPathRef;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use crate::csv::read::read_until_start_and_infer_schema;
17use crate::prelude::CsvReadOptions;
18use crate::utils::compression::CompressedReader;
19
20#[allow(clippy::too_many_arguments)]
23pub fn count_rows(
24 addr: PlPathRef<'_>,
25 quote_char: Option<u8>,
26 comment_prefix: Option<&CommentPrefix>,
27 eol_char: u8,
28 has_header: bool,
29 skip_lines: usize,
30 skip_rows_before_header: usize,
31 skip_rows_after_header: usize,
32) -> PolarsResult<usize> {
33 let file = match addr
34 .as_local_path()
35 .and_then(|v| (!config::force_async()).then_some(v))
36 {
37 None => feature_gated!("cloud", {
38 crate::file_cache::FILE_CACHE
39 .get_entry(addr)
40 .unwrap()
42 .try_open_assume_latest()?
43 }),
44 Some(path) => polars_utils::open_file(path)?,
45 };
46
47 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
48
49 count_rows_from_slice_par(
50 MemSlice::from_mmap(Arc::new(mmap)),
51 quote_char,
52 comment_prefix,
53 eol_char,
54 has_header,
55 skip_lines,
56 skip_rows_before_header,
57 skip_rows_after_header,
58 )
59}
60
61#[allow(clippy::too_many_arguments)]
64pub fn count_rows_from_slice_par(
65 mem_slice: MemSlice,
66 quote_char: Option<u8>,
67 comment_prefix: Option<&CommentPrefix>,
68 eol_char: u8,
69 has_header: bool,
70 skip_lines: usize,
71 skip_rows_before_header: usize,
72 skip_rows_after_header: usize,
73) -> PolarsResult<usize> {
74 let mut reader = CompressedReader::try_new(mem_slice)?;
76
77 let reader_options = CsvReadOptions {
78 parse_options: Arc::new(CsvParseOptions {
79 quote_char,
80 comment_prefix: comment_prefix.cloned(),
81 eol_char,
82 ..Default::default()
83 }),
84 has_header,
85 skip_lines,
86 skip_rows: skip_rows_before_header,
87 skip_rows_after_header,
88 ..Default::default()
89 };
90
91 let (_, leftover) =
92 read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?;
93
94 let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?;
96
97 #[cfg(debug_assertions)]
98 const BYTES_PER_CHUNK: usize = 128;
99 #[cfg(not(debug_assertions))]
100 const BYTES_PER_CHUNK: usize = 1 << 16;
101
102 let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
103 POOL.install(|| {
104 let mut states = Vec::new();
105 if comment_prefix.is_none() {
106 bytes
107 .par_chunks(BYTES_PER_CHUNK)
108 .map(|chunk| count.analyze_chunk(chunk))
109 .collect_into_vec(&mut states);
110 } else {
111 let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
112 (0..num_chunks)
113 .into_par_iter()
114 .map(|chunk_idx| {
115 let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
116 let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
117
118 if start_offset != 0 {
119 if let Some(nl_off) = bytes[start_offset..next_start_offset]
121 .iter()
122 .position(|b| *b == eol_char)
123 {
124 start_offset += nl_off + 1;
125 } else {
126 return count.analyze_chunk(&[]);
127 }
128 }
129
130 let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
131 .iter()
132 .position(|b| *b == eol_char)
133 {
134 next_start_offset + nl_off + 1
135 } else {
136 bytes.len()
137 };
138
139 count.analyze_chunk(&bytes[start_offset..stop_offset])
140 })
141 .collect_into_vec(&mut states);
142 }
143
144 let mut n = 0;
145 let mut in_string = false;
146 for pair in states {
147 n += pair[in_string as usize].newline_count;
148 in_string = pair[in_string as usize].end_inside_string;
149 }
150 if let Some(last) = bytes.last()
151 && *last != eol_char
152 && (comment_prefix.is_none()
153 || !is_comment_line(
154 bytes.rsplit(|c| *c == eol_char).next().unwrap(),
155 comment_prefix,
156 ))
157 {
158 n += 1
159 }
160
161 Ok(n)
162 })
163}
164
165#[inline]
169pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
170 match comment_prefix {
171 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
172 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
173 None => false,
174 }
175}
176
177pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
180 let pos = memchr::memchr(eol_char, input)? + 1;
181 if input.len() - pos == 0 {
182 return None;
183 }
184 Some(pos)
185}
186
/// Searches `input` for an offset at which a new, plausible CSV line starts.
///
/// Starting from the first `eol_char`, the line that follows is split into fields and checked
/// against `expected_fields`. A candidate is accepted when it and up to two following lines
/// all pass the check; the returned offset is relative to the `input` originally passed in.
///
/// Returns `None` when `input` is empty, when no further `eol_char` is found, when the
/// `eol_char` found is the last byte, or after three candidate groups have been rejected.
/// When `expected_fields` is `None` the first candidate is returned without validation.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Decides whether `line` has a field layout compatible with `expected_fields`.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field holding this many separator/eol bytes means the line is rejected.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // Accept `count == expected_fields` and `count == expected_fields - 1`; if `count` is
        // larger the wrapping subtraction yields a huge value and the line is rejected.
        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of candidates whose first line passed but whose follow-up lines did not.
    let mut rejected_line_groups = 0u8;

    // Bytes already dropped from the front of `input`.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // Each time the (wrapping) counter reaches `u8::MAX`, relax the expectation by one field.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the offset just past the next eol, i.e. the start of the candidate line.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // `pos` is at most `input.len()` (index of a found byte plus one).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The first line looks right; require up to two more lines to agree.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the next
                        // iteration re-examines the same candidate — confirm this is intended.
                        rejected_line_groups += 1;
                    }
                } else {
                    // The early return above guarantees `pos < input.len()` here.
                    debug_assert!(pos < input.len());
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // Nothing to validate against: take the first candidate.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
284
285#[inline(always)]
286pub(super) fn is_whitespace(b: u8) -> bool {
287 b == b' ' || b == b'\t'
288}
289
290#[inline(always)]
292pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
293 b <= 32
298}
299
/// Drops the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns the remaining suffix, which is empty when every byte satisfies `f` or when `input`
/// itself is empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&b| !f(b)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
312
313#[inline]
319pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
320 skip_condition(input, is_whitespace)
321}
322
/// Iterator that yields the lines of a byte buffer.
///
/// Line terminators that occur inside a quoted field do not end a line (when quoting is
/// enabled). Lines are yielded without their terminating `eol_char`.
pub struct SplitLines<'a> {
    // Bytes that have not been yielded yet.
    v: &'a [u8],
    // Quote byte; only honoured when `quoting` is `true`.
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Bitmask of line endings already located by the last SIMD step but not yet yielded;
    // bit positions are relative to the current start of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    // Number of bytes at the start of `v` already scanned without finding a line ending.
    total_index: usize,
    // Whether quote handling is enabled.
    quoting: bool,
    // Optional comment prefix; lines starting with it are split without quote handling.
    comment_prefix: Option<&'a CommentPrefix>,
}
346
// Number of bytes inspected per SIMD step; matches the lane count of `SimdVec`.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

// Byte vector type used by the SIMD line scanners.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
357
358impl<'a> SplitLines<'a> {
359 pub fn new(
360 slice: &'a [u8],
361 quote_char: Option<u8>,
362 eol_char: u8,
363 comment_prefix: Option<&'a CommentPrefix>,
364 ) -> Self {
365 let quoting = quote_char.is_some();
366 let quote_char = quote_char.unwrap_or(b'\"');
367 #[cfg(feature = "simd")]
368 let simd_eol_char = SimdVec::splat(eol_char);
369 #[cfg(feature = "simd")]
370 let simd_quote_char = SimdVec::splat(quote_char);
371 Self {
372 v: slice,
373 quote_char,
374 eol_char,
375 #[cfg(feature = "simd")]
376 simd_eol_char,
377 #[cfg(feature = "simd")]
378 simd_quote_char,
379 #[cfg(feature = "simd")]
380 previous_valid_eols: 0,
381 total_index: 0,
382 quoting,
383 comment_prefix,
384 }
385 }
386}
387
388impl<'a> SplitLines<'a> {
389 fn next_scalar(&mut self) -> Option<&'a [u8]> {
391 if self.v.is_empty() {
392 return None;
393 }
394 if is_comment_line(self.v, self.comment_prefix) {
395 return self.next_comment_line();
396 }
397 {
398 let mut pos = 0u32;
399 let mut iter = self.v.iter();
400 let mut in_field = false;
401 loop {
402 match iter.next() {
403 Some(&c) => {
404 pos += 1;
405
406 if self.quoting && c == self.quote_char {
407 in_field = !in_field;
411 }
412 else if c == self.eol_char && !in_field {
414 break;
415 }
416 },
417 None => {
418 let remainder = self.v;
419 self.v = &[];
420 return Some(remainder);
421 },
422 }
423 }
424
425 unsafe {
426 debug_assert!((pos as usize) <= self.v.len());
427
428 let ret = Some(
430 self.v
431 .get_unchecked(..(self.total_index + pos as usize - 1)),
432 );
433 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
435 ret
436 }
437 }
438 }
439 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
440 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
441 unsafe {
442 let ret = Some(self.v.get_unchecked(..(pos - 1)));
444 self.v = self.v.get_unchecked(pos..);
446 ret
447 }
448 } else {
449 let remainder = self.v;
450 self.v = &[];
451 Some(remainder)
452 }
453 }
454}
455
456impl<'a> Iterator for SplitLines<'a> {
457 type Item = &'a [u8];
458
459 #[inline]
460 #[cfg(not(feature = "simd"))]
461 fn next(&mut self) -> Option<&'a [u8]> {
462 self.next_scalar()
463 }
464
465 #[inline]
466 #[cfg(feature = "simd")]
467 fn next(&mut self) -> Option<&'a [u8]> {
468 if self.previous_valid_eols != 0 {
470 let pos = self.previous_valid_eols.trailing_zeros() as usize;
471 self.previous_valid_eols >>= (pos + 1) as u64;
472
473 unsafe {
474 debug_assert!((pos) <= self.v.len());
475
476 let ret = Some(self.v.get_unchecked(..pos));
478 self.v = self.v.get_unchecked(pos + 1..);
480 return ret;
481 }
482 }
483 if self.v.is_empty() {
484 return None;
485 }
486 if self.comment_prefix.is_some() {
487 return self.next_scalar();
488 }
489
490 self.total_index = 0;
491 let mut not_in_field_previous_iter = true;
492
493 loop {
494 let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
495 if bytes.len() > SIMD_SIZE {
496 let lane: [u8; SIMD_SIZE] = unsafe {
497 bytes
498 .get_unchecked(0..SIMD_SIZE)
499 .try_into()
500 .unwrap_unchecked()
501 };
502 let simd_bytes = SimdVec::from(lane);
503 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
504
505 let valid_eols = if self.quoting {
506 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
507 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
508
509 if not_in_field_previous_iter {
510 not_in_quote_field = !not_in_quote_field;
511 }
512 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
513 eol_mask & not_in_quote_field
514 } else {
515 eol_mask
516 };
517
518 if valid_eols != 0 {
519 let pos = valid_eols.trailing_zeros() as usize;
520 if pos == SIMD_SIZE - 1 {
521 self.previous_valid_eols = 0;
522 } else {
523 self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
524 }
525
526 unsafe {
527 let pos = self.total_index + pos;
528 debug_assert!((pos) <= self.v.len());
529
530 let ret = Some(self.v.get_unchecked(..pos));
532 self.v = self.v.get_unchecked(pos + 1..);
534 return ret;
535 }
536 } else {
537 self.total_index += SIMD_SIZE;
538 }
539 } else {
540 let mut in_field = !not_in_field_previous_iter;
542 let mut pos = 0u32;
543 let mut iter = bytes.iter();
544 loop {
545 match iter.next() {
546 Some(&c) => {
547 pos += 1;
548
549 if self.quoting && c == self.quote_char {
550 in_field = !in_field;
554 }
555 else if c == self.eol_char && !in_field {
557 break;
558 }
559 },
560 None => {
561 let remainder = self.v;
562 self.v = &[];
563 return Some(remainder);
564 },
565 }
566 }
567
568 unsafe {
569 debug_assert!((pos as usize) <= self.v.len());
570
571 let ret = Some(
573 self.v
574 .get_unchecked(..(self.total_index + pos as usize - 1)),
575 );
576 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
578 return ret;
579 }
580 }
581 }
582 }
583}
584
/// Counts line endings in byte buffers, optionally honouring quoted fields and comment lines.
pub struct CountLines {
    // Quote byte; only honoured when `quoting` is `true`.
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Whether quote handling is enabled.
    quoting: bool,
    // Optional prefix that marks comment lines, which are not counted.
    comment_prefix: Option<CommentPrefix>,
}
595
/// Result of analysing one chunk of bytes under a given starting quote state.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line endings counted in the chunk.
    pub newline_count: usize,
    /// Offset within the chunk of the last counted line ending (`0` if none was counted).
    pub last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted field.
    pub end_inside_string: bool,
}
602
603impl CountLines {
604 pub fn new(
605 quote_char: Option<u8>,
606 eol_char: u8,
607 comment_prefix: Option<CommentPrefix>,
608 ) -> Self {
609 let quoting = quote_char.is_some();
610 let quote_char = quote_char.unwrap_or(b'\"');
611 #[cfg(feature = "simd")]
612 let simd_eol_char = SimdVec::splat(eol_char);
613 #[cfg(feature = "simd")]
614 let simd_quote_char = SimdVec::splat(quote_char);
615 Self {
616 quote_char,
617 eol_char,
618 #[cfg(feature = "simd")]
619 simd_eol_char,
620 #[cfg(feature = "simd")]
621 simd_quote_char,
622 quoting,
623 comment_prefix,
624 }
625 }
626
    /// Analyses `bytes` for both possible starting quote states at once.
    ///
    /// Index `0` of the result holds the statistics under the assumption that the chunk starts
    /// outside a quoted field, index `1` under the assumption that it starts inside one. This
    /// lets chunks be analysed independently and combined afterwards.
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        // Comment handling needs a sequential scan; run it once per starting state.
        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // Quote parity relative to the start of the chunk: `true` after an odd number of
        // quotes. With the `simd` feature the initial value is overwritten before it is read.
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // All-ones when the parity carried into the current block is odd, else zero.
            let mut global_quote_parity_mask = 0;
            while scan_offset + 64 <= bytes.len() {
                // The loop condition guarantees that 64 bytes are available at `scan_offset`.
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Presumably bit `i` of the prefix xor-sum is the parity of the quotes in
                    // bits `0..=i`; xor-ing with the carried mask makes it chunk-relative.
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // The arithmetic shift broadcasts the top bit to all 64 bits.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Line endings that count if the chunk started outside a quoted field.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    // Offset of the highest set bit. For an empty mask the subtraction may
                    // wrap, but the condition then keeps the previous offset (presumably
                    // `select_unpredictable(c, a, b)` yields `a` if `c`, else `b`).
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Line endings that count if the chunk started inside a quoted field.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting every line ending counts, and only state 0 is updated.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Scalar scan of the remaining bytes (all bytes without the `simd` feature).
        while scan_offset < bytes.len() {
            // `scan_offset` is below `bytes.len()` per the loop condition.
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // An eol seen at even parity belongs to the "started outside" state, one seen at
            // odd parity to the "started inside" state.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }
728
729 fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
731 let pre_s = match self.comment_prefix.as_ref().unwrap() {
732 CommentPrefix::Single(pc) => core::slice::from_ref(pc),
733 CommentPrefix::Multi(ps) => ps.as_bytes(),
734 };
735
736 let mut state = LineStats::default();
737 let mut scan_offset = 0;
738 while scan_offset < bytes.len() {
739 while bytes[scan_offset..].starts_with(pre_s) {
741 scan_offset += pre_s.len();
742 let Some(nl_off) = bytes[scan_offset..]
743 .iter()
744 .position(|c| *c == self.eol_char)
745 else {
746 break;
747 };
748 scan_offset += nl_off + 1;
749 }
750
751 while scan_offset < bytes.len() {
752 let c = unsafe { *bytes.get_unchecked(scan_offset) };
753 in_string ^= (c == self.quote_char) & self.quoting;
754
755 if c == self.eol_char && !in_string {
756 state.newline_count += 1;
757 state.last_newline_offset = scan_offset;
758 scan_offset += 1;
759 break;
760 } else {
761 scan_offset += 1;
762 }
763 }
764 }
765
766 state.end_inside_string = in_string;
767 state
768 }
769
770 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
771 loop {
772 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
773
774 let (count, offset) = if self.comment_prefix.is_some() {
775 let stats = self.analyze_chunk_with_comment(b, false);
776 (stats.newline_count, stats.last_newline_offset)
777 } else {
778 self.count(b)
779 };
780
781 if count > 0 || b.len() == bytes.len() {
782 return (count, offset);
783 }
784
785 *chunk_size = chunk_size.saturating_mul(2);
786 }
787 }
788
789 pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
790 let stats = if self.comment_prefix.is_some() {
791 self.analyze_chunk_with_comment(bytes, false)
792 } else {
793 self.analyze_chunk(bytes)[0]
794 };
795
796 let mut count = stats.newline_count;
797 let mut offset = stats.last_newline_offset;
798
799 if count > 0 {
800 offset = cmp::min(offset + 1, bytes.len());
801 } else {
802 debug_assert!(offset == 0);
803 }
804
805 if is_eof {
806 if !bytes.is_empty() && bytes.last().copied().unwrap() != self.eol_char {
808 let last_new_line_post = memchr::memrchr(self.eol_char, bytes).unwrap_or(0);
811 let last_line_is_comment_line = bytes
812 .get(last_new_line_post + 1..)
813 .map(|line| is_comment_line(line, self.comment_prefix.as_ref()))
814 .unwrap_or(false);
815
816 count += !last_line_is_comment_line as usize;
817 }
818 offset = bytes.len();
819 }
820
821 (count, offset)
822 }
823
    /// Counts the line endings in `bytes` that lie outside quoted fields (SIMD version).
    ///
    /// The scan assumes `bytes` starts outside a quoted field. Returns `(count, position)`
    /// where `position` is the offset of the last counted line ending (`0` if none).
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        let mut total_idx = 0;
        let original_bytes = bytes;
        let mut count = 0;
        let mut position = 0;
        let mut not_in_field_previous_iter = true;

        loop {
            // `total_idx` only grows by `SIMD_SIZE` after more than `SIMD_SIZE` bytes were
            // confirmed to remain, so it stays within bounds.
            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };

            if bytes.len() > SIMD_SIZE {
                // More than `SIMD_SIZE` bytes remain, so a full block can be read.
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Presumably bit `i` is the parity of the quotes seen in bits `0..=i`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the mask when the block starts outside a quoted field, so that a
                    // set bit always means "outside of quotes".
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The top bit is the state in which the next block starts.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    count += valid_eols.count_ones() as usize;
                    // Offset of the highest set bit, i.e. the last line ending in this block.
                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
                    debug_assert_eq!(original_bytes[position], self.eol_char)
                }
                total_idx += SIMD_SIZE;
            } else if bytes.is_empty() {
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
                return (count, position);
            } else {
                // Scalar tail; its offset `o` is relative to `bytes`, hence `total_idx + o`.
                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);

                let (count, position) = if c > 0 {
                    (count + c, total_idx + o)
                } else {
                    (count, position)
                };
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

                return (count, position);
            }
        }
    }
882
883 #[cfg(not(feature = "simd"))]
884 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
885 self.count_no_simd(bytes, false)
886 }
887
888 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
889 let iter = bytes.iter();
890 let mut in_field = in_field;
891 let mut count = 0;
892 let mut position = 0;
893
894 for b in iter {
895 let c = *b;
896 if self.quoting && c == self.quote_char {
897 in_field = !in_field;
901 }
902 else if c == self.eol_char && !in_field {
904 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
905 count += 1;
906 }
907 }
908 debug_assert!(count == 0 || bytes[position] == self.eol_char);
909
910 (count, position)
911 }
912}
913
/// Finds the first occurrence of `needle` in `bytes` that is not enclosed in quotes.
///
/// Every `quote_char` toggles the quoted state before the byte is compared with `needle`.
/// Returns the offset of the match, or `None` if there is none.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` yields a `usize` offset, so no separate (and overflow-prone) counter is
    // needed.
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
936
937#[inline]
938pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
939 let pos = match quote {
940 Some(quote) => find_quoted(bytes, quote, eol_char),
941 None => bytes.iter().position(|x| *x == eol_char),
942 };
943 match pos {
944 None => &[],
945 Some(pos) => &bytes[pos + 1..],
946 }
947}
948
949#[inline]
950pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
951 if let Some(pos) = next_line_position_naive(input, eol_char) {
952 unsafe { input.get_unchecked(pos..) }
953 } else {
954 &[]
955 }
956}
957
/// Parses lines from `bytes` and appends the projected fields to `buffers`.
///
/// * `bytes` - the input to parse.
/// * `parse_options` - separator, quote, eol, comment and ragged-line settings.
/// * `offset` - added to the in-buffer position when reporting the byte offset of a field
///   that failed to parse.
/// * `ignore_errors` - forwarded to `Buffer::add`.
/// * `null_values` - optional matcher deciding whether a field is to be read as null.
/// * `projection` - indices of the columns to read; must not be empty. The loop looks for
///   them in iteration order while the field index only increases.
/// * `buffers` - one buffer per projected column, in projection order.
/// * `n_lines` - line limit (see the note on the stop condition below).
/// * `schema_len` - total number of columns in the schema.
/// * `schema` - used to look up the column name for error messages.
///
/// Returns the number of bytes consumed from `bytes`.
///
/// # Errors
///
/// Returns an error if a field cannot be parsed into its buffer, or if a line holds more
/// fields than the schema while ragged lines are not truncated.
///
/// # Panics
///
/// Panics if `projection` is empty.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
    mut bytes: &[u8],
    parse_options: &CsvParseOptions,
    offset: usize,
    ignore_errors: bool,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Buffer],
    n_lines: usize,
    schema_len: usize,
    schema: &Schema,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );
    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
    // When only a subset of the columns is read, trailing fields are always skipped.
    if projection.len() != schema_len {
        truncate_ragged_lines = true
    }

    // Address of the first input byte; used to derive consumed-byte counts and error offsets.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();
    // NOTE(review): this cast truncates values above `u32::MAX` — confirm callers stay below.
    let n_lines = n_lines as u32;

    let mut line_count = 0u32;
    loop {
        // NOTE(review): the comparison is `>`, so up to `n_lines + 1` lines are parsed before
        // stopping — confirm this is intended.
        if line_count > n_lines {
            let end = bytes.as_ptr() as usize;
            return Ok(end - start);
        }

        if bytes.is_empty() {
            return Ok(original_bytes_len);
        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
            // Comment lines are skipped and do not count towards `line_count`.
            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
            bytes = bytes_rem;
            continue;
        }

        let mut projection_iter = projection.iter().copied();
        // `projection` is non-empty (asserted above), so the first element exists.
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(
            bytes,
            parse_options.separator,
            parse_options.quote_char,
            parse_options.eol_char,
        );
        // Index of the current field within the line.
        let mut idx = 0u32;
        // Bytes consumed from the start of the current line.
        let mut read_sol = 0;
        loop {
            match iter.next() {
                // End of the line: advance past everything read, clamped to the input length.
                None => {
                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    let field_len = field.len();

                    // The extra byte accounts for the separator or eol following the field.
                    read_sol += field_len + 1;

                    if idx == next_projected as u32 {
                        // Strip a trailing carriage return from the field.
                        unsafe {
                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
                                field = field.get_unchecked(..field_len - 1);
                            }
                        }

                        debug_assert!(processed_fields < buffers.len());
                        let buf = unsafe {
                            buffers.get_unchecked_mut(processed_fields)
                        };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Compare without the surrounding quotes when the field is quoted.
                            let field = if needs_escaping && !field.is_empty() {
                                unsafe { field.get_unchecked(1..field.len() - 1) }
                            } else {
                                field
                            };

                            add_null = unsafe { null_values.is_null(field, idx as usize) }
                        }
                        if add_null {
                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
                        } else {
                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
                                .map_err(|e| {
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
                                    polars_err!(
                                        ComputeError:
                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
                                        The current offset in the file is {} bytes.\n\
                                        \n\
                                        You might want to try:\n\
                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
                                        - specifying correct dtype with the `schema_overrides` argument\n\
                                        - setting `ignore_errors` to `True`,\n\
                                        - adding `{}` to the `null_values` list.\n\n\
                                        Original error: ```{}```",
                                        &unparsable,
                                        buf.dtype(),
                                        column_name,
                                        idx + 1,
                                        bytes_offset,
                                        &unparsable,
                                        e
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            // All projected columns were read; move on to the next line.
                            None => {
                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
                                    // The last projected field was also the last of the line.
                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
                                } else {
                                    // The line has further bytes: either reject the ragged
                                    // line or skip what is left of it.
                                    if !truncate_ragged_lines && read_sol < bytes.len() {
                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
                                    }
                                    let bytes_rem = skip_this_line(
                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
                                        parse_options.quote_char,
                                        parse_options.eol_char,
                                    );
                                    bytes = bytes_rem;
                                }
                                break;
                            },
                        }
                    }
                    idx += 1;
                },
            }
        }

        // The line ended before all projected columns were seen: pad the rest with nulls.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            let buf = unsafe {
                buffers.get_unchecked_mut(processed_fields)
            };
            buf.add_null(!parse_options.missing_is_null);
            processed_fields += 1;
        }
        line_count += 1;
    }
}
1146
#[cfg(test)]
mod test {
    use super::SplitLines;

    // Line endings inside quoted fields must not split a line, for either quote character.
    #[test]
    fn test_splitlines() {
        // Double quotes as the quote character.
        let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
        let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
        assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), None);

        // Single quotes as the quote character.
        let input2 = "1,'foo\n'\n2,'foo\n'\n";
        let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
        assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), None);
    }
}