1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::utils::compression::maybe_decompress_bytes;
19
20pub fn count_rows(
23 path: &Path,
24 separator: u8,
25 quote_char: Option<u8>,
26 comment_prefix: Option<&CommentPrefix>,
27 eol_char: u8,
28 has_header: bool,
29) -> PolarsResult<usize> {
30 let file = if is_cloud_url(path) || config::force_async() {
31 feature_gated!("cloud", {
32 crate::file_cache::FILE_CACHE
33 .get_entry(path.to_str().unwrap())
34 .unwrap()
36 .try_open_assume_latest()?
37 })
38 } else {
39 polars_utils::open_file(path)?
40 };
41
42 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
43 let owned = &mut vec![];
44 let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
45
46 count_rows_from_slice_par(
47 reader_bytes,
48 separator,
49 quote_char,
50 comment_prefix,
51 eol_char,
52 has_header,
53 )
54}
55
56pub fn count_rows_from_slice_par(
59 mut bytes: &[u8],
60 separator: u8,
61 quote_char: Option<u8>,
62 comment_prefix: Option<&CommentPrefix>,
63 eol_char: u8,
64 has_header: bool,
65) -> PolarsResult<usize> {
66 for _ in 0..bytes.len() {
67 if bytes[0] != eol_char {
68 break;
69 }
70
71 bytes = &bytes[1..];
72 }
73
74 const MIN_ROWS_PER_THREAD: usize = 1024;
75 let max_threads = POOL.current_num_threads();
76
77 let n_threads = get_line_stats(
79 bytes,
80 MIN_ROWS_PER_THREAD,
81 eol_char,
82 None,
83 separator,
84 quote_char,
85 )
86 .map(|(mean, std)| {
87 let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
88 (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
89 })
90 .unwrap_or(1);
91
92 if n_threads == 1 {
93 return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
94 }
95
96 let file_chunks: Vec<(usize, usize)> =
97 get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
98
99 let iter = file_chunks.into_par_iter().map(|(start, stop)| {
100 let bytes = &bytes[start..stop];
101
102 if comment_prefix.is_some() {
103 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
104 .filter(|line| !is_comment_line(line, comment_prefix))
105 .count()
106 } else {
107 CountLines::new(quote_char, eol_char).count(bytes).0
108 + bytes.last().is_some_and(|x| *x != b'\n') as usize
109 }
110 });
111
112 let n: usize = POOL.install(|| iter.sum());
113
114 Ok(n - (has_header as usize))
115}
116
117pub fn count_rows_from_slice(
119 mut bytes: &[u8],
120 quote_char: Option<u8>,
121 comment_prefix: Option<&CommentPrefix>,
122 eol_char: u8,
123 has_header: bool,
124) -> PolarsResult<usize> {
125 for _ in 0..bytes.len() {
126 if bytes[0] != eol_char {
127 break;
128 }
129
130 bytes = &bytes[1..];
131 }
132
133 let n = if comment_prefix.is_some() {
134 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
135 .filter(|line| !is_comment_line(line, comment_prefix))
136 .count()
137 } else {
138 CountLines::new(quote_char, eol_char).count(bytes).0
139 + bytes.last().is_some_and(|x| *x != b'\n') as usize
140 };
141
142 Ok(n - (has_header as usize))
143}
144
145pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
148 if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
149 &input[3..]
150 } else {
151 input
152 }
153}
154
155#[inline]
159pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
160 match comment_prefix {
161 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
162 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
163 None => false,
164 }
165}
166
167pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
170 let pos = memchr::memchr(eol_char, input)? + 1;
171 if input.len() - pos == 0 {
172 return None;
173 }
174 Some(pos)
175}
176
177pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
178 for _ in 0..skip {
179 if let Some(pos) = next_line_position_naive(input, eol_char) {
180 input = &input[pos..];
181 } else {
182 return input;
183 }
184 }
185 input
186}
187
/// Searches `input` for the start of a line that looks like a valid record.
///
/// Returns the offset, relative to the start of the original `input`, of the
/// byte following an `eol_char` at which an accepted line begins.
///
/// * With `expected_fields == None` the first position after an `eol_char`
///   that is followed by more data is returned without inspecting fields.
/// * With `expected_fields == Some(n)` the candidate line and up to two lines
///   after it must all pass `accept_line`.
///
/// Returns `None` when `input` is empty, when no `eol_char` is found, when
/// the `eol_char` is the last byte, or once three candidate groups have been
/// rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Returns `true` when `line` splits into `expected_fields` fields, or
    /// one fewer, and no single field contains `expected_fields` or more
    /// separator/eol bytes.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A field holding this many separators/eols cannot belong to a
            // well-formed record of `expected_fields` columns.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // Accepts `count == expected_fields` and `count == expected_fields - 1`;
        // when `count` is larger the wrapping subtraction yields a huge value.
        // NOTE(review): the "one fewer" case presumably covers a missing
        // trailing field - confirm against `SplitFields`.
        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of candidate groups whose follow-up lines were rejected.
    let mut rejected_line_groups = 0u8;

    // Bytes of the original input that have been skipped so far.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // Every time the wrapping counter reaches `u8::MAX`, relax the
        // expectation by one field (never below zero).
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // Position just past the next terminator.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos <= input.len()` (it is strictly smaller, see the check above).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            // Validate the candidate line and the two lines that follow it.
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // Move past the terminator plus one more byte and search again.
                    // SAFETY: `pos < input.len()`, so `pos + 1 <= input.len()`.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // No field count to validate against: accept the first candidate.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
285
286pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
287 b == eol_char || b == b'\r'
288}
289
290pub(super) fn is_whitespace(b: u8) -> bool {
291 b == b' ' || b == b'\t'
292}
293
/// Returns the tail of `input` that starts at the first byte for which `f`
/// is false; the result is empty when `f` holds for every byte.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        // Every byte matched (or there were none): nothing is left.
        None => &input[input.len()..],
    }
}
306
307#[inline]
313pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
314 skip_condition(input, is_whitespace)
315}
316
317#[inline]
318pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
319 skip_condition(input, |b| is_line_ending(b, eol_char))
320}
321
322pub(super) fn get_line_stats(
324 bytes: &[u8],
325 n_lines: usize,
326 eol_char: u8,
327 expected_fields: Option<usize>,
328 separator: u8,
329 quote_char: Option<u8>,
330) -> Option<(f32, f32)> {
331 let mut lengths = Vec::with_capacity(n_lines);
332
333 let mut bytes_trunc;
334 let n_lines_per_iter = n_lines / 2;
335
336 let mut n_read = 0;
337
338 for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
340 bytes_trunc = &bytes[offset..];
341 let pos = next_line_position(
342 bytes_trunc,
343 expected_fields,
344 separator,
345 quote_char,
346 eol_char,
347 )?;
348 bytes_trunc = &bytes_trunc[pos + 1..];
349
350 for _ in offset..(offset + n_lines_per_iter) {
351 let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
352 n_read += pos;
353 lengths.push(pos);
354 bytes_trunc = &bytes_trunc[pos..];
355 }
356 }
357
358 let n_samples = lengths.len();
359
360 let mean = (n_read as f32) / (n_samples as f32);
361 let mut std = 0.0;
362 for &len in lengths.iter() {
363 std += (len as f32 - mean).pow(2.0)
364 }
365 std = (std / n_samples as f32).sqrt();
366 Some((mean, std))
367}
368
/// Iterator over the lines of a byte slice.
///
/// Line terminators that occur between a pair of quote bytes do not end a
/// line when quoting is enabled. Yielded lines do not include the terminator
/// that ended them.
pub(super) struct SplitLines<'a> {
    // Input that has not been yielded yet.
    v: &'a [u8],
    // Quote byte; falls back to `"` when no quote byte was configured.
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Bitmask of terminators already found by the last SIMD step but not yet
    // yielded; bit positions are relative to the start of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    // Offset into `v` up to which the current SIMD scan has advanced.
    total_index: usize,
    // Whether a quote byte was configured at all.
    quoting: bool,
    // Optional prefix that marks a line as a comment.
    comment_prefix: Option<&'a CommentPrefix>,
}
392
// Number of input bytes examined per SIMD step; equals the lane count of `SimdVec`.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

// Vector type used for the byte comparisons: 64 lanes of `u8`.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
403
404impl<'a> SplitLines<'a> {
405 pub(super) fn new(
406 slice: &'a [u8],
407 quote_char: Option<u8>,
408 eol_char: u8,
409 comment_prefix: Option<&'a CommentPrefix>,
410 ) -> Self {
411 let quoting = quote_char.is_some();
412 let quote_char = quote_char.unwrap_or(b'\"');
413 #[cfg(feature = "simd")]
414 let simd_eol_char = SimdVec::splat(eol_char);
415 #[cfg(feature = "simd")]
416 let simd_quote_char = SimdVec::splat(quote_char);
417 Self {
418 v: slice,
419 quote_char,
420 eol_char,
421 #[cfg(feature = "simd")]
422 simd_eol_char,
423 #[cfg(feature = "simd")]
424 simd_quote_char,
425 #[cfg(feature = "simd")]
426 previous_valid_eols: 0,
427 total_index: 0,
428 quoting,
429 comment_prefix,
430 }
431 }
432}
433
434impl<'a> SplitLines<'a> {
435 fn next_scalar(&mut self) -> Option<&'a [u8]> {
437 if self.v.is_empty() {
438 return None;
439 }
440 if is_comment_line(self.v, self.comment_prefix) {
441 return self.next_comment_line();
442 }
443 {
444 let mut pos = 0u32;
445 let mut iter = self.v.iter();
446 let mut in_field = false;
447 loop {
448 match iter.next() {
449 Some(&c) => {
450 pos += 1;
451
452 if self.quoting && c == self.quote_char {
453 in_field = !in_field;
457 }
458 else if c == self.eol_char && !in_field {
460 break;
461 }
462 },
463 None => {
464 let remainder = self.v;
465 self.v = &[];
466 return Some(remainder);
467 },
468 }
469 }
470
471 unsafe {
472 debug_assert!((pos as usize) <= self.v.len());
473
474 let ret = Some(
476 self.v
477 .get_unchecked(..(self.total_index + pos as usize - 1)),
478 );
479 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
481 ret
482 }
483 }
484 }
485 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
486 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
487 unsafe {
488 let ret = Some(self.v.get_unchecked(..(pos - 1)));
490 self.v = self.v.get_unchecked(pos..);
492 ret
493 }
494 } else {
495 let remainder = self.v;
496 self.v = &[];
497 Some(remainder)
498 }
499 }
500}
501
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without SIMD support every line is produced by the scalar scanner.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// Yields the next line, scanning `SIMD_SIZE` bytes per step.
    ///
    /// Terminators found in one block are remembered in
    /// `previous_valid_eols`, so following calls can yield lines without
    /// rescanning. Inputs with a comment prefix fall back to the scalar scanner.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Fast path: a terminator from the previously scanned block is pending.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Re-base the remaining bits on the new start of `self.v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // A new line starts outside of any quoted field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                // SAFETY: more than `SIMD_SIZE` bytes remain, so the sub-slice
                // exists and has exactly `SIMD_SIZE` elements.
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit i is set when byte i of the block equals `eol_char`.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit i holds the parity of the
                    // quote bits at positions 0..=i - confirm against
                    // `prefix_xorsum_inclusive`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Invert when the block starts outside a quoted field so
                    // that set bits mark bytes outside of quotes.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // Carry the state of the last byte over to the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Shifting a u64 by 64 would overflow, so the last lane is
                    // handled separately.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No terminator in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Scalar tail: at most `SIMD_SIZE` bytes are left to scan.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            }
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No terminator left: yield everything that remains.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` is one past the terminator, relative to
                    // `total_index`; the line excludes the terminator.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
630
/// Counts line terminators in byte slices, optionally ignoring terminators
/// that occur between a pair of quote bytes.
pub struct CountLines {
    // Quote byte; falls back to `"` when no quote byte was configured.
    quote_char: u8,
    // Byte that terminates a line.
    eol_char: u8,
    // `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    // `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    // Whether a quote byte was configured at all.
    quoting: bool,
}
640
/// Terminator statistics of one chunk, as produced by
/// `CountLines::analyze_chunk` for one assumed starting quote state.
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    // Number of line terminators attributed to this starting state.
    newline_count: usize,
    // Offset within the chunk of the last such terminator; stays 0 when none was seen.
    last_newline_offset: usize,
    // Whether the chunk ends inside a quoted string under this starting state.
    end_inside_string: bool,
}
647
648impl CountLines {
649 pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
650 let quoting = quote_char.is_some();
651 let quote_char = quote_char.unwrap_or(b'\"');
652 #[cfg(feature = "simd")]
653 let simd_eol_char = SimdVec::splat(eol_char);
654 #[cfg(feature = "simd")]
655 let simd_quote_char = SimdVec::splat(quote_char);
656 Self {
657 quote_char,
658 eol_char,
659 #[cfg(feature = "simd")]
660 simd_eol_char,
661 #[cfg(feature = "simd")]
662 simd_quote_char,
663 quoting,
664 }
665 }
666
667 pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
673 let mut scan_offset = 0;
674 let mut states = [
675 LineStats {
676 newline_count: 0,
677 last_newline_offset: 0,
678 end_inside_string: false,
679 },
680 LineStats {
681 newline_count: 0,
682 last_newline_offset: 0,
683 end_inside_string: false,
684 },
685 ];
686
687 #[allow(unused_assignments)]
689 let mut global_quote_parity = false;
690
691 #[cfg(feature = "simd")]
692 {
693 let mut global_quote_parity_mask = 0;
695 while scan_offset + 64 <= bytes.len() {
696 let block: [u8; 64] = unsafe {
697 bytes
698 .get_unchecked(scan_offset..scan_offset + 64)
699 .try_into()
700 .unwrap_unchecked()
701 };
702 let simd_bytes = SimdVec::from(block);
703 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
704 if self.quoting {
705 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
706 let quote_parity =
707 prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
708 global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
709
710 let start_outside_string_eol_mask = eol_mask & !quote_parity;
711 states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
712 states[0].last_newline_offset = select_unpredictable(
713 start_outside_string_eol_mask != 0,
714 (scan_offset + 63)
715 .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
716 states[0].last_newline_offset,
717 );
718
719 let start_inside_string_eol_mask = eol_mask & quote_parity;
720 states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
721 states[1].last_newline_offset = select_unpredictable(
722 start_inside_string_eol_mask != 0,
723 (scan_offset + 63)
724 .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
725 states[1].last_newline_offset,
726 );
727 } else {
728 states[0].newline_count += eol_mask.count_ones() as usize;
729 states[0].last_newline_offset = select_unpredictable(
730 eol_mask != 0,
731 (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
732 states[0].last_newline_offset,
733 );
734 }
735
736 scan_offset += 64;
737 }
738
739 global_quote_parity = global_quote_parity_mask > 0;
740 }
741
742 while scan_offset < bytes.len() {
743 let c = unsafe { *bytes.get_unchecked(scan_offset) };
744 global_quote_parity ^= (c == self.quote_char) & self.quoting;
745
746 let state = &mut states[global_quote_parity as usize];
747 state.newline_count += (c == self.eol_char) as usize;
748 state.last_newline_offset =
749 select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
750
751 scan_offset += 1;
752 }
753
754 states[0].end_inside_string = global_quote_parity;
755 states[1].end_inside_string = !global_quote_parity;
756 states
757 }
758
759 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
760 loop {
761 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
762
763 let (count, offset) = self.count(b);
764
765 if count > 0 || b.len() == bytes.len() {
766 return (count, offset);
767 }
768
769 *chunk_size *= 2;
770 }
771 }
772
773 #[cfg(feature = "simd")]
775 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
776 let mut total_idx = 0;
777 let original_bytes = bytes;
778 let mut count = 0;
779 let mut position = 0;
780 let mut not_in_field_previous_iter = true;
781
782 loop {
783 let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
784
785 if bytes.len() > SIMD_SIZE {
786 let lane: [u8; SIMD_SIZE] = unsafe {
787 bytes
788 .get_unchecked(0..SIMD_SIZE)
789 .try_into()
790 .unwrap_unchecked()
791 };
792 let simd_bytes = SimdVec::from(lane);
793 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
794
795 let valid_eols = if self.quoting {
796 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
797 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
798
799 if not_in_field_previous_iter {
800 not_in_quote_field = !not_in_quote_field;
801 }
802 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
803 eol_mask & not_in_quote_field
804 } else {
805 eol_mask
806 };
807
808 if valid_eols != 0 {
809 count += valid_eols.count_ones() as usize;
810 position = total_idx + 63 - valid_eols.leading_zeros() as usize;
811 debug_assert_eq!(original_bytes[position], self.eol_char)
812 }
813 total_idx += SIMD_SIZE;
814 } else if bytes.is_empty() {
815 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
816 return (count, position);
817 } else {
818 let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
819
820 let (count, position) = if c > 0 {
821 (count + c, total_idx + o)
822 } else {
823 (count, position)
824 };
825 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
826
827 return (count, position);
828 }
829 }
830 }
831
832 #[cfg(not(feature = "simd"))]
833 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
834 self.count_no_simd(bytes, false)
835 }
836
837 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
838 let iter = bytes.iter();
839 let mut in_field = in_field;
840 let mut count = 0;
841 let mut position = 0;
842
843 for b in iter {
844 let c = *b;
845 if self.quoting && c == self.quote_char {
846 in_field = !in_field;
850 }
851 else if c == self.eol_char && !in_field {
853 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
854 count += 1;
855 }
856 }
857 debug_assert!(count == 0 || bytes[position] == self.eol_char);
858
859 (count, position)
860 }
861}
862
/// Returns the index of the first `needle` byte in `bytes` that is not
/// enclosed in a pair of `quote_char` bytes, or `None` if there is none.
///
/// Every `quote_char` toggles the quoted state before the byte itself is
/// compared with `needle`.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` tracks the index as `usize`, so inputs of any length are
    // handled without overflowing a narrower counter.
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
885
886#[inline]
887pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
888 let pos = match quote {
889 Some(quote) => find_quoted(bytes, quote, eol_char),
890 None => bytes.iter().position(|x| *x == eol_char),
891 };
892 match pos {
893 None => &[],
894 Some(pos) => &bytes[pos + 1..],
895 }
896}
897
898#[inline]
899pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
900 if let Some(pos) = next_line_position_naive(input, eol_char) {
901 unsafe { input.get_unchecked(pos..) }
902 } else {
903 &[]
904 }
905}
906
/// Parses lines from `bytes` into the per-column `buffers`.
///
/// Only the columns listed in `projection` are parsed; `buffers[i]` receives
/// the values of column `projection[i]`. Comment lines are skipped and do not
/// count as lines. Projected columns that are missing from a line are filled
/// with nulls.
///
/// Returns the number of bytes consumed: the distance from the start of
/// `bytes` to the current position once more than `n_lines` lines have been
/// read, or the full length of `bytes` when the input is exhausted first.
///
/// # Errors
///
/// Returns a `ComputeError` when a field cannot be added to its buffer, or
/// when a line holds more data after the last projected field while ragged
/// lines are not truncated.
///
/// # Panics
///
/// Panics when `projection` is empty.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
    mut bytes: &[u8],
    parse_options: &CsvParseOptions,
    offset: usize,
    ignore_errors: bool,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Buffer],
    n_lines: usize,
    schema_len: usize,
    schema: &Schema,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );
    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
    // When only a subset of the columns is projected, trailing fields are
    // skipped anyway, so they must not be reported as an error.
    if projection.len() != schema_len {
        truncate_ragged_lines = true
    }

    // Address of the first byte; used to turn pointers back into offsets.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();
    let n_lines = n_lines as u32;

    let mut line_count = 0u32;
    loop {
        if line_count > n_lines {
            let end = bytes.as_ptr() as usize;
            return Ok(end - start);
        }

        if bytes.is_empty() {
            return Ok(original_bytes_len);
        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
            // Comment lines are dropped without touching `line_count`.
            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
            bytes = bytes_rem;
            continue;
        }

        let mut projection_iter = projection.iter().copied();
        // SAFETY: `projection` is asserted to be non-empty above.
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(
            bytes,
            parse_options.separator,
            parse_options.quote_char,
            parse_options.eol_char,
        );
        // Index of the current field within the line.
        let mut idx = 0u32;
        // Bytes read since the start of the line, counting one delimiter per field.
        let mut read_sol = 0;
        loop {
            match iter.next() {
                None => {
                    // Line exhausted: move to the end of what was read,
                    // clamped to the length of the input.
                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    let field_len = field.len();

                    // +1 accounts for the separator or terminator after the field.
                    read_sol += field_len + 1;

                    if idx == next_projected as u32 {
                        // Strip a trailing carriage return from the field.
                        // SAFETY: `field_len > 0` is checked before indexing.
                        unsafe {
                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
                                field = field.get_unchecked(..field_len - 1);
                            }
                        }

                        debug_assert!(processed_fields < buffers.len());
                        // NOTE(review): relies on `buffers` holding one entry
                        // per projected column - confirm against callers.
                        let buf = unsafe {
                            buffers.get_unchecked_mut(processed_fields)
                        };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Compare against the null values without the
                            // first and last byte of an escaped field.
                            // NOTE(review): assumes an escaped, non-empty
                            // field is at least two bytes long - confirm
                            // against `SplitFields`.
                            let field = if needs_escaping && !field.is_empty() {
                                unsafe { field.get_unchecked(1..field.len() - 1) }
                            } else {
                                field
                            };

                            add_null = unsafe { null_values.is_null(field, idx as usize) }
                        }
                        if add_null {
                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
                        } else {
                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
                                .map_err(|e| {
                                    // Offset of the offending field, relative
                                    // to the caller-supplied `offset`.
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
                                    polars_err!(
                                        ComputeError:
                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
                                        The current offset in the file is {} bytes.\n\
                                        \n\
                                        You might want to try:\n\
                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
                                        - specifying correct dtype with the `schema_overrides` argument\n\
                                        - setting `ignore_errors` to `True`,\n\
                                        - adding `{}` to the `null_values` list.\n\n\
                                        Original error: ```{}```",
                                        &unparsable,
                                        buf.dtype(),
                                        column_name,
                                        idx + 1,
                                        bytes_offset,
                                        &unparsable,
                                        e
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            None => {
                                // All projected columns are parsed. If the
                                // byte after this field is the terminator,
                                // the line is done; otherwise the rest of the
                                // line is skipped or reported as an error.
                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
                                    bytes = &bytes[read_sol..];
                                } else {
                                    if !truncate_ragged_lines && read_sol < bytes.len() {
                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
                                    }
                                    let bytes_rem = skip_this_line(
                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
                                        parse_options.quote_char,
                                        parse_options.eol_char,
                                    );
                                    bytes = bytes_rem;
                                }
                                break;
                            },
                        }
                    }
                    idx += 1;
                },
            }
        }

        // The line had fewer fields than projected: pad the remaining
        // buffers with nulls so all columns keep the same length.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            let buf = unsafe {
                buffers.get_unchecked_mut(processed_fields)
            };
            buf.add_null(!parse_options.missing_is_null);
            processed_fields += 1;
        }
        line_count += 1;
    }
}
1093
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// A terminator inside a quoted field must not split the record, for
    /// both double and single quote bytes.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}
1112}