1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::utils::compression::maybe_decompress_bytes;
19
20pub fn count_rows(
23 path: &Path,
24 separator: u8,
25 quote_char: Option<u8>,
26 comment_prefix: Option<&CommentPrefix>,
27 eol_char: u8,
28 has_header: bool,
29) -> PolarsResult<usize> {
30 let file = if is_cloud_url(path) || config::force_async() {
31 feature_gated!("cloud", {
32 crate::file_cache::FILE_CACHE
33 .get_entry(path.to_str().unwrap())
34 .unwrap()
36 .try_open_assume_latest()?
37 })
38 } else {
39 polars_utils::open_file(path)?
40 };
41
42 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
43 let owned = &mut vec![];
44 let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
45
46 count_rows_from_slice_par(
47 reader_bytes,
48 separator,
49 quote_char,
50 comment_prefix,
51 eol_char,
52 has_header,
53 )
54}
55
56pub fn count_rows_from_slice_par(
59 mut bytes: &[u8],
60 separator: u8,
61 quote_char: Option<u8>,
62 comment_prefix: Option<&CommentPrefix>,
63 eol_char: u8,
64 has_header: bool,
65) -> PolarsResult<usize> {
66 for _ in 0..bytes.len() {
67 if bytes[0] != eol_char {
68 break;
69 }
70
71 bytes = &bytes[1..];
72 }
73
74 const MIN_ROWS_PER_THREAD: usize = 1024;
75 let max_threads = POOL.current_num_threads();
76
77 let n_threads = get_line_stats(
79 bytes,
80 MIN_ROWS_PER_THREAD,
81 eol_char,
82 None,
83 separator,
84 quote_char,
85 )
86 .map(|(mean, std)| {
87 let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
88 (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
89 })
90 .unwrap_or(1);
91
92 if n_threads == 1 {
93 return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
94 }
95
96 let file_chunks: Vec<(usize, usize)> =
97 get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
98
99 let iter = file_chunks.into_par_iter().map(|(start, stop)| {
100 let bytes = &bytes[start..stop];
101
102 if comment_prefix.is_some() {
103 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
104 .filter(|line| !is_comment_line(line, comment_prefix))
105 .count()
106 } else {
107 CountLines::new(quote_char, eol_char).count(bytes).0
108 }
109 });
110
111 let n: usize = POOL.install(|| iter.sum());
112
113 Ok(n - (has_header as usize))
114}
115
116pub fn count_rows_from_slice(
118 mut bytes: &[u8],
119 quote_char: Option<u8>,
120 comment_prefix: Option<&CommentPrefix>,
121 eol_char: u8,
122 has_header: bool,
123) -> PolarsResult<usize> {
124 for _ in 0..bytes.len() {
125 if bytes[0] != eol_char {
126 break;
127 }
128
129 bytes = &bytes[1..];
130 }
131
132 let n = if comment_prefix.is_some() {
133 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
134 .filter(|line| !is_comment_line(line, comment_prefix))
135 .count()
136 } else {
137 CountLines::new(quote_char, eol_char).count(bytes).0
138 };
139
140 Ok(n - (has_header as usize))
141}
142
143pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
146 if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
147 &input[3..]
148 } else {
149 input
150 }
151}
152
153#[inline]
157pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
158 match comment_prefix {
159 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
160 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
161 None => false,
162 }
163}
164
165pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
168 let pos = memchr::memchr(eol_char, input)? + 1;
169 if input.len() - pos == 0 {
170 return None;
171 }
172 Some(pos)
173}
174
175pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
176 for _ in 0..skip {
177 if let Some(pos) = next_line_position_naive(input, eol_char) {
178 input = &input[pos..];
179 } else {
180 return input;
181 }
182 }
183 input
184}
185
186pub(super) fn next_line_position(
188 mut input: &[u8],
189 mut expected_fields: Option<usize>,
190 separator: u8,
191 quote_char: Option<u8>,
192 eol_char: u8,
193) -> Option<usize> {
194 fn accept_line(
195 line: &[u8],
196 expected_fields: usize,
197 separator: u8,
198 eol_char: u8,
199 quote_char: Option<u8>,
200 ) -> bool {
201 let mut count = 0usize;
202 for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
203 if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
204 return false;
205 }
206 count += 1;
207 }
208
209 expected_fields.wrapping_sub(count) <= 1
219 }
220
221 let mut rejected_line_groups = 0u8;
224
225 let mut total_pos = 0;
226 if input.is_empty() {
227 return None;
228 }
229 let mut lines_checked = 0u8;
230 loop {
231 if rejected_line_groups >= 3 {
232 return None;
233 }
234 lines_checked = lines_checked.wrapping_add(1);
235 if lines_checked == u8::MAX {
239 if let Some(ef) = expected_fields {
240 expected_fields = Some(ef.saturating_sub(1))
241 }
242 };
243 let pos = memchr::memchr(eol_char, input)? + 1;
244 if input.len() - pos == 0 {
245 return None;
246 }
247 debug_assert!(pos <= input.len());
248 let new_input = unsafe { input.get_unchecked(pos..) };
249 let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
250 let line = lines.next();
251
252 match (line, expected_fields) {
253 (Some(line), Some(expected_fields)) => {
255 if accept_line(line, expected_fields, separator, eol_char, quote_char) {
256 let mut valid = true;
257 for line in lines.take(2) {
258 if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
259 valid = false;
260 break;
261 }
262 }
263 if valid {
264 return Some(total_pos + pos);
265 } else {
266 rejected_line_groups += 1;
267 }
268 } else {
269 debug_assert!(pos < input.len());
270 unsafe {
271 input = input.get_unchecked(pos + 1..);
272 }
273 total_pos += pos + 1;
274 }
275 },
276 (Some(_), None) => return Some(total_pos + pos),
278 _ => return None,
280 }
281 }
282}
283
284pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
285 b == eol_char || b == b'\r'
286}
287
288pub(super) fn is_whitespace(b: u8) -> bool {
289 b == b' ' || b == b'\t'
290}
291
/// Drop the longest prefix of `input` whose bytes all satisfy `f`.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let n_skipped = input.iter().take_while(|&&b| f(b)).count();
    &input[n_skipped..]
}
304
305#[inline]
311pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
312 skip_condition(input, is_whitespace)
313}
314
315#[inline]
316pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
317 skip_condition(input, |b| is_line_ending(b, eol_char))
318}
319
320pub(super) fn get_line_stats(
322 bytes: &[u8],
323 n_lines: usize,
324 eol_char: u8,
325 expected_fields: Option<usize>,
326 separator: u8,
327 quote_char: Option<u8>,
328) -> Option<(f32, f32)> {
329 let mut lengths = Vec::with_capacity(n_lines);
330
331 let mut bytes_trunc;
332 let n_lines_per_iter = n_lines / 2;
333
334 let mut n_read = 0;
335
336 for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
338 bytes_trunc = &bytes[offset..];
339 let pos = next_line_position(
340 bytes_trunc,
341 expected_fields,
342 separator,
343 quote_char,
344 eol_char,
345 )?;
346 bytes_trunc = &bytes_trunc[pos + 1..];
347
348 for _ in offset..(offset + n_lines_per_iter) {
349 let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
350 n_read += pos;
351 lengths.push(pos);
352 bytes_trunc = &bytes_trunc[pos..];
353 }
354 }
355
356 let n_samples = lengths.len();
357
358 let mean = (n_read as f32) / (n_samples as f32);
359 let mut std = 0.0;
360 for &len in lengths.iter() {
361 std += (len as f32 - mean).pow(2.0)
362 }
363 std = (std / n_samples as f32).sqrt();
364 Some((mean, std))
365}
366
367pub(super) struct SplitLines<'a> {
377 v: &'a [u8],
378 quote_char: u8,
379 eol_char: u8,
380 #[cfg(feature = "simd")]
381 simd_eol_char: SimdVec,
382 #[cfg(feature = "simd")]
383 simd_quote_char: SimdVec,
384 #[cfg(feature = "simd")]
385 previous_valid_eols: u64,
386 total_index: usize,
387 quoting: bool,
388 comment_prefix: Option<&'a CommentPrefix>,
389}
390
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Number of bytes processed per SIMD block.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;

/// Vector type used for the SIMD scans: one byte per lane, `SIMD_SIZE` lanes.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
401
402impl<'a> SplitLines<'a> {
403 pub(super) fn new(
404 slice: &'a [u8],
405 quote_char: Option<u8>,
406 eol_char: u8,
407 comment_prefix: Option<&'a CommentPrefix>,
408 ) -> Self {
409 let quoting = quote_char.is_some();
410 let quote_char = quote_char.unwrap_or(b'\"');
411 #[cfg(feature = "simd")]
412 let simd_eol_char = SimdVec::splat(eol_char);
413 #[cfg(feature = "simd")]
414 let simd_quote_char = SimdVec::splat(quote_char);
415 Self {
416 v: slice,
417 quote_char,
418 eol_char,
419 #[cfg(feature = "simd")]
420 simd_eol_char,
421 #[cfg(feature = "simd")]
422 simd_quote_char,
423 #[cfg(feature = "simd")]
424 previous_valid_eols: 0,
425 total_index: 0,
426 quoting,
427 comment_prefix,
428 }
429 }
430}
431
432impl<'a> SplitLines<'a> {
433 fn next_scalar(&mut self) -> Option<&'a [u8]> {
435 if self.v.is_empty() {
436 return None;
437 }
438 if is_comment_line(self.v, self.comment_prefix) {
439 return self.next_comment_line();
440 }
441 {
442 let mut pos = 0u32;
443 let mut iter = self.v.iter();
444 let mut in_field = false;
445 loop {
446 match iter.next() {
447 Some(&c) => {
448 pos += 1;
449
450 if self.quoting && c == self.quote_char {
451 in_field = !in_field;
455 }
456 else if c == self.eol_char && !in_field {
458 break;
459 }
460 },
461 None => {
462 let remainder = self.v;
463 self.v = &[];
464 return Some(remainder);
465 },
466 }
467 }
468
469 unsafe {
470 debug_assert!((pos as usize) <= self.v.len());
471
472 let ret = Some(
474 self.v
475 .get_unchecked(..(self.total_index + pos as usize - 1)),
476 );
477 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
479 ret
480 }
481 }
482 }
483 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
484 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
485 unsafe {
486 let ret = Some(self.v.get_unchecked(..(pos - 1)));
488 self.v = self.v.get_unchecked(pos..);
490 ret
491 }
492 } else {
493 let remainder = self.v;
494 self.v = &[];
495 Some(remainder)
496 }
497 }
498}
499
500impl<'a> Iterator for SplitLines<'a> {
501 type Item = &'a [u8];
502
503 #[inline]
504 #[cfg(not(feature = "simd"))]
505 fn next(&mut self) -> Option<&'a [u8]> {
506 self.next_scalar()
507 }
508
509 #[inline]
510 #[cfg(feature = "simd")]
511 fn next(&mut self) -> Option<&'a [u8]> {
512 if self.previous_valid_eols != 0 {
514 let pos = self.previous_valid_eols.trailing_zeros() as usize;
515 self.previous_valid_eols >>= (pos + 1) as u64;
516
517 unsafe {
518 debug_assert!((pos) <= self.v.len());
519
520 let ret = Some(self.v.get_unchecked(..pos));
522 self.v = self.v.get_unchecked(pos + 1..);
524 return ret;
525 }
526 }
527 if self.v.is_empty() {
528 return None;
529 }
530 if self.comment_prefix.is_some() {
531 return self.next_scalar();
532 }
533
534 self.total_index = 0;
535 let mut not_in_field_previous_iter = true;
536
537 loop {
538 let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
539 if bytes.len() > SIMD_SIZE {
540 let lane: [u8; SIMD_SIZE] = unsafe {
541 bytes
542 .get_unchecked(0..SIMD_SIZE)
543 .try_into()
544 .unwrap_unchecked()
545 };
546 let simd_bytes = SimdVec::from(lane);
547 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
548
549 let valid_eols = if self.quoting {
550 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
551 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
552
553 if not_in_field_previous_iter {
554 not_in_quote_field = !not_in_quote_field;
555 }
556 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
557 eol_mask & not_in_quote_field
558 } else {
559 eol_mask
560 };
561
562 if valid_eols != 0 {
563 let pos = valid_eols.trailing_zeros() as usize;
564 if pos == SIMD_SIZE - 1 {
565 self.previous_valid_eols = 0;
566 } else {
567 self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
568 }
569
570 unsafe {
571 let pos = self.total_index + pos;
572 debug_assert!((pos) <= self.v.len());
573
574 let ret = Some(self.v.get_unchecked(..pos));
576 self.v = self.v.get_unchecked(pos + 1..);
578 return ret;
579 }
580 } else {
581 self.total_index += SIMD_SIZE;
582 }
583 } else {
584 let mut in_field = !not_in_field_previous_iter;
586 let mut pos = 0u32;
587 let mut iter = bytes.iter();
588 loop {
589 match iter.next() {
590 Some(&c) => {
591 pos += 1;
592
593 if self.quoting && c == self.quote_char {
594 in_field = !in_field;
598 }
599 else if c == self.eol_char && !in_field {
601 break;
602 }
603 },
604 None => {
605 let remainder = self.v;
606 self.v = &[];
607 return Some(remainder);
608 },
609 }
610 }
611
612 unsafe {
613 debug_assert!((pos as usize) <= self.v.len());
614
615 let ret = Some(
617 self.v
618 .get_unchecked(..(self.total_index + pos as usize - 1)),
619 );
620 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
622 return ret;
623 }
624 }
625 }
626 }
627}
628
/// Counts line endings in byte slices, optionally ignoring those inside quoted fields.
pub struct CountLines {
    /// Byte that toggles the "inside a quoted field" state (only used when `quoting`).
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote handling is enabled.
    quoting: bool,
}
638
/// Line-ending statistics of one chunk, as produced by `CountLines::analyze_chunk`.
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    /// Number of line endings attributed to this state.
    newline_count: usize,
    /// Offset (within the chunk) of the last line ending attributed to this state.
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string under this state's assumption.
    end_inside_string: bool,
}
645
646impl CountLines {
647 pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
648 let quoting = quote_char.is_some();
649 let quote_char = quote_char.unwrap_or(b'\"');
650 #[cfg(feature = "simd")]
651 let simd_eol_char = SimdVec::splat(eol_char);
652 #[cfg(feature = "simd")]
653 let simd_quote_char = SimdVec::splat(quote_char);
654 Self {
655 quote_char,
656 eol_char,
657 #[cfg(feature = "simd")]
658 simd_eol_char,
659 #[cfg(feature = "simd")]
660 simd_quote_char,
661 quoting,
662 }
663 }
664
665 pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
671 let mut scan_offset = 0;
672 let mut states = [
673 LineStats {
674 newline_count: 0,
675 last_newline_offset: 0,
676 end_inside_string: false,
677 },
678 LineStats {
679 newline_count: 0,
680 last_newline_offset: 0,
681 end_inside_string: false,
682 },
683 ];
684
685 #[allow(unused_assignments)]
687 let mut global_quote_parity = false;
688
689 #[cfg(feature = "simd")]
690 {
691 let mut global_quote_parity_mask = 0;
693 while scan_offset + 64 <= bytes.len() {
694 let block: [u8; 64] = unsafe {
695 bytes
696 .get_unchecked(scan_offset..scan_offset + 64)
697 .try_into()
698 .unwrap_unchecked()
699 };
700 let simd_bytes = SimdVec::from(block);
701 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
702 if self.quoting {
703 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
704 let quote_parity =
705 prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
706 global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
707
708 let start_outside_string_eol_mask = eol_mask & !quote_parity;
709 states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
710 states[0].last_newline_offset = select_unpredictable(
711 start_outside_string_eol_mask != 0,
712 (scan_offset + 63)
713 .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
714 states[0].last_newline_offset,
715 );
716
717 let start_inside_string_eol_mask = eol_mask & quote_parity;
718 states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
719 states[1].last_newline_offset = select_unpredictable(
720 start_inside_string_eol_mask != 0,
721 (scan_offset + 63)
722 .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
723 states[1].last_newline_offset,
724 );
725 } else {
726 states[0].newline_count += eol_mask.count_ones() as usize;
727 states[0].last_newline_offset = select_unpredictable(
728 eol_mask != 0,
729 (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
730 states[0].last_newline_offset,
731 );
732 }
733
734 scan_offset += 64;
735 }
736
737 global_quote_parity = global_quote_parity_mask > 0;
738 }
739
740 while scan_offset < bytes.len() {
741 let c = unsafe { *bytes.get_unchecked(scan_offset) };
742 global_quote_parity ^= (c == self.quote_char) & self.quoting;
743
744 let state = &mut states[global_quote_parity as usize];
745 state.newline_count += (c == self.eol_char) as usize;
746 state.last_newline_offset =
747 select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
748
749 scan_offset += 1;
750 }
751
752 states[0].end_inside_string = global_quote_parity;
753 states[1].end_inside_string = !global_quote_parity;
754 states
755 }
756
757 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
758 loop {
759 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
760
761 let (count, offset) = self.count(b);
762
763 if count > 0 || b.len() == bytes.len() {
764 return (count, offset);
765 }
766
767 *chunk_size *= 2;
768 }
769 }
770
771 #[cfg(feature = "simd")]
773 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
774 let mut total_idx = 0;
775 let original_bytes = bytes;
776 let mut count = 0;
777 let mut position = 0;
778 let mut not_in_field_previous_iter = true;
779
780 loop {
781 let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
782
783 if bytes.len() > SIMD_SIZE {
784 let lane: [u8; SIMD_SIZE] = unsafe {
785 bytes
786 .get_unchecked(0..SIMD_SIZE)
787 .try_into()
788 .unwrap_unchecked()
789 };
790 let simd_bytes = SimdVec::from(lane);
791 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
792
793 let valid_eols = if self.quoting {
794 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
795 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
796
797 if not_in_field_previous_iter {
798 not_in_quote_field = !not_in_quote_field;
799 }
800 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
801 eol_mask & not_in_quote_field
802 } else {
803 eol_mask
804 };
805
806 if valid_eols != 0 {
807 count += valid_eols.count_ones() as usize;
808 position = total_idx + 63 - valid_eols.leading_zeros() as usize;
809 debug_assert_eq!(original_bytes[position], self.eol_char)
810 }
811 total_idx += SIMD_SIZE;
812 } else if bytes.is_empty() {
813 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
814 return (count, position);
815 } else {
816 let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
817
818 let (count, position) = if c > 0 {
819 (count + c, total_idx + o)
820 } else {
821 (count, position)
822 };
823 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
824
825 return (count, position);
826 }
827 }
828 }
829
830 #[cfg(not(feature = "simd"))]
831 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
832 self.count_no_simd(bytes, false)
833 }
834
835 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
836 let iter = bytes.iter();
837 let mut in_field = in_field;
838 let mut count = 0;
839 let mut position = 0;
840
841 for b in iter {
842 let c = *b;
843 if self.quoting && c == self.quote_char {
844 in_field = !in_field;
848 }
849 else if c == self.eol_char && !in_field {
851 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
852 count += 1;
853 }
854 }
855 debug_assert!(count == 0 || bytes[position] == self.eol_char);
856
857 (count, position)
858 }
859}
860
/// Index of the first `needle` byte in `bytes` that is not inside a quoted section.
///
/// Every `quote_char` toggles the quoted state. Returns `None` when no unquoted `needle`
/// exists. The index is tracked as a `usize`, so scans of 4 GiB or more cannot overflow.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
883
884#[inline]
885pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
886 let pos = match quote {
887 Some(quote) => find_quoted(bytes, quote, eol_char),
888 None => bytes.iter().position(|x| *x == eol_char),
889 };
890 match pos {
891 None => &[],
892 Some(pos) => &bytes[pos + 1..],
893 }
894}
895
896#[inline]
897pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
898 if let Some(pos) = next_line_position_naive(input, eol_char) {
899 unsafe { input.get_unchecked(pos..) }
900 } else {
901 &[]
902 }
903}
904
905#[allow(clippy::too_many_arguments)]
915pub(super) fn parse_lines(
916 mut bytes: &[u8],
917 parse_options: &CsvParseOptions,
918 offset: usize,
919 ignore_errors: bool,
920 null_values: Option<&NullValuesCompiled>,
921 projection: &[usize],
922 buffers: &mut [Buffer],
923 n_lines: usize,
924 schema_len: usize,
926 schema: &Schema,
927) -> PolarsResult<usize> {
928 assert!(
929 !projection.is_empty(),
930 "at least one column should be projected"
931 );
932 let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
933 if projection.len() != schema_len {
937 truncate_ragged_lines = true
938 }
939
940 let start = bytes.as_ptr() as usize;
942 let original_bytes_len = bytes.len();
943 let n_lines = n_lines as u32;
944
945 let mut line_count = 0u32;
946 loop {
947 if line_count > n_lines {
948 let end = bytes.as_ptr() as usize;
949 return Ok(end - start);
950 }
951
952 if bytes.is_empty() {
953 return Ok(original_bytes_len);
954 } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
955 let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
957 bytes = bytes_rem;
958 continue;
959 }
960
961 let mut projection_iter = projection.iter().copied();
965 let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
966 let mut processed_fields = 0;
967
968 let mut iter = SplitFields::new(
969 bytes,
970 parse_options.separator,
971 parse_options.quote_char,
972 parse_options.eol_char,
973 );
974 let mut idx = 0u32;
975 let mut read_sol = 0;
976 loop {
977 match iter.next() {
978 None => {
980 bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
981 break;
982 },
983 Some((mut field, needs_escaping)) => {
984 let field_len = field.len();
985
986 read_sol += field_len + 1;
988
989 if idx == next_projected as u32 {
990 unsafe {
993 if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
994 field = field.get_unchecked(..field_len - 1);
995 }
996 }
997
998 debug_assert!(processed_fields < buffers.len());
999 let buf = unsafe {
1000 buffers.get_unchecked_mut(processed_fields)
1002 };
1003 let mut add_null = false;
1004
1005 if let Some(null_values) = null_values {
1007 let field = if needs_escaping && !field.is_empty() {
1008 unsafe { field.get_unchecked(1..field.len() - 1) }
1009 } else {
1010 field
1011 };
1012
1013 add_null = unsafe { null_values.is_null(field, idx as usize) }
1016 }
1017 if add_null {
1018 buf.add_null(!parse_options.missing_is_null && field.is_empty())
1019 } else {
1020 buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1021 .map_err(|e| {
1022 let bytes_offset = offset + field.as_ptr() as usize - start;
1023 let unparsable = String::from_utf8_lossy(field);
1024 let column_name = schema.get_at_index(idx as usize).unwrap().0;
1025 polars_err!(
1026 ComputeError:
1027 "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1028 The current offset in the file is {} bytes.\n\
1029 \n\
1030 You might want to try:\n\
1031 - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1032 - specifying correct dtype with the `schema_overrides` argument\n\
1033 - setting `ignore_errors` to `True`,\n\
1034 - adding `{}` to the `null_values` list.\n\n\
1035 Original error: ```{}```",
1036 &unparsable,
1037 buf.dtype(),
1038 column_name,
1039 idx + 1,
1040 bytes_offset,
1041 &unparsable,
1042 e
1043 )
1044 })?;
1045 }
1046 processed_fields += 1;
1047
1048 match projection_iter.next() {
1050 Some(p) => next_projected = p,
1051 None => {
1052 if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1053 bytes = &bytes[read_sol..];
1054 } else {
1055 if !truncate_ragged_lines && read_sol < bytes.len() {
1056 polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1057
1058Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1059 }
1060 let bytes_rem = skip_this_line(
1061 unsafe { bytes.get_unchecked(read_sol - 1..) },
1062 parse_options.quote_char,
1063 parse_options.eol_char,
1064 );
1065 bytes = bytes_rem;
1066 }
1067 break;
1068 },
1069 }
1070 }
1071 idx += 1;
1072 },
1073 }
1074 }
1075
1076 while processed_fields < projection.len() {
1080 debug_assert!(processed_fields < buffers.len());
1081 let buf = unsafe {
1082 buffers.get_unchecked_mut(processed_fields)
1084 };
1085 buf.add_null(!parse_options.missing_is_null);
1086 processed_fields += 1;
1087 }
1088 line_count += 1;
1089 }
1090}
1091
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// Line endings inside quoted fields must not split a line, for either quote byte.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for want in expected {
                assert_eq!(lines.next(), Some(want.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}