1use std::cmp;
2use std::iter::Iterator;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use polars_buffer::Buffer;
7use polars_core::prelude::Schema;
8use polars_core::schema::SchemaRef;
9use polars_error::{PolarsResult, polars_bail, polars_ensure};
10
11use crate::csv::read::schema_inference::infer_file_schema_impl;
12use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
13use crate::prelude::{CsvParseOptions, CsvReadOptions};
14use crate::utils::compression::{ByteSourceReader, CompressedReader};
15use crate::utils::stream_buf_reader::ReaderSource;
16
/// Callback that receives the raw bytes of the first content row (the first row after the header
/// and any skipped rows) before schema inference starts.
pub type InspectContentFn<'a> = Box<dyn for<'row> FnMut(&'row [u8]) + 'a>;
18
19#[inline(never)]
32pub fn read_until_start_and_infer_schema_from_compressed_reader(
33 options: &CsvReadOptions,
34 projected_schema: Option<SchemaRef>,
35 mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
36 reader: &mut CompressedReader,
37) -> PolarsResult<(Schema, Buffer<u8>)> {
38 const ESTIMATED_BYTES_PER_ROW: usize = 200;
40
41 #[derive(Copy, Clone)]
42 enum State {
43 SkipEmpty,
45 SkipRowsBeforeHeader(usize),
46 SkipHeader(bool),
47 SkipRowsAfterHeader(usize),
48 ContentInspect,
49 InferCollect,
50 Done,
51 }
52
53 polars_ensure!(
54 !(options.skip_lines != 0 && options.skip_rows != 0),
55 InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
56 );
57
58 let prev_leftover = skip_lines_naive_from_compressed_reader(
61 options.parse_options.eol_char,
62 options.skip_lines,
63 options.raise_if_empty,
64 reader,
65 )?;
66
67 let mut state = if options.has_header {
68 State::SkipEmpty
69 } else if options.skip_lines != 0 {
70 State::SkipHeader(false)
73 } else {
74 State::SkipRowsBeforeHeader(options.skip_rows)
75 };
76
77 let comment_prefix = options.parse_options.comment_prefix.as_ref();
78 let infer_schema_length = if options.schema.is_some() {
79 Some(0)
81 } else {
82 options.infer_schema_length
83 };
84
85 let mut header_line = None;
86 let mut content_lines = Vec::with_capacity(infer_schema_length.unwrap_or_else(|| {
87 reader
88 .total_len_estimate()
89 .saturating_div(ESTIMATED_BYTES_PER_ROW)
90 }));
91
92 let initial_read_size = infer_schema_length
98 .map(|isl| {
99 cmp::max(
100 CompressedReader::initial_read_size(),
101 isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
102 )
103 })
104 .unwrap_or(usize::MAX);
105
106 let leftover = for_each_line_from_reader_from_compressed_reader(
107 &options.parse_options,
108 true,
109 prev_leftover,
110 initial_read_size,
111 reader,
112 |mem_slice_line| {
113 let line = &*mem_slice_line;
114
115 let done = loop {
116 match &mut state {
117 State::SkipEmpty => {
118 if line.is_empty() || line == b"\r" {
119 break LineUse::ConsumeDiscard;
120 }
121
122 state = State::SkipRowsBeforeHeader(options.skip_rows);
123 },
124 State::SkipRowsBeforeHeader(remaining) => {
125 let is_comment = is_comment_line(line, comment_prefix);
126
127 if *remaining == 0 && !is_comment {
128 state = State::SkipHeader(false);
129 continue;
130 }
131
132 *remaining -= !is_comment as usize;
133 break LineUse::ConsumeDiscard;
134 },
135 State::SkipHeader(did_skip) => {
136 if !options.has_header || *did_skip {
137 state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
138 continue;
139 }
140
141 header_line = Some(mem_slice_line.clone());
142 *did_skip = true;
143 break LineUse::ConsumeDiscard;
144 },
145 State::SkipRowsAfterHeader(remaining) => {
146 let is_comment = is_comment_line(line, comment_prefix);
147
148 if *remaining == 0 && !is_comment {
149 state = State::ContentInspect;
150 continue;
151 }
152
153 *remaining -= !is_comment as usize;
154 break LineUse::ConsumeDiscard;
155 },
156 State::ContentInspect => {
157 if let Some(func) = &mut inspect_first_content_row_fn {
158 func(line);
159 }
160
161 state = State::InferCollect;
162 },
163 State::InferCollect => {
164 if !is_comment_line(line, comment_prefix) {
165 content_lines.push(mem_slice_line.clone());
166 if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
167 state = State::Done;
168 continue;
169 }
170 }
171
172 break LineUse::ConsumeKeep;
173 },
174 State::Done => {
175 break LineUse::Done;
176 },
177 }
178 };
179
180 Ok(done)
181 },
182 )?;
183
184 let infer_all_as_str = infer_schema_length == Some(0);
185
186 let inferred_schema = infer_schema(
187 &header_line,
188 &content_lines,
189 infer_all_as_str,
190 options,
191 projected_schema,
192 )?;
193
194 Ok((inferred_schema, leftover))
195}
196
197#[inline(never)]
210pub fn read_until_start_and_infer_schema(
211 options: &CsvReadOptions,
212 projected_schema: Option<SchemaRef>,
213 decompressed_file_size_hint: Option<usize>,
214 mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
215 reader: &mut ByteSourceReader<ReaderSource>,
216) -> PolarsResult<(Schema, Buffer<u8>)> {
217 const ESTIMATED_BYTES_PER_ROW: usize = 200;
219
220 #[derive(Copy, Clone)]
221 enum State {
222 SkipEmpty,
224 SkipRowsBeforeHeader(usize),
225 SkipHeader(bool),
226 SkipRowsAfterHeader(usize),
227 ContentInspect,
228 InferCollect,
229 Done,
230 }
231
232 polars_ensure!(
233 !(options.skip_lines != 0 && options.skip_rows != 0),
234 InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
235 );
236
237 let prev_leftover = skip_lines_naive(
240 options.parse_options.eol_char,
241 options.skip_lines,
242 options.raise_if_empty,
243 decompressed_file_size_hint,
244 reader,
245 )?;
246
247 let mut state = if options.has_header {
248 State::SkipEmpty
249 } else if options.skip_lines != 0 {
250 State::SkipHeader(false)
253 } else {
254 State::SkipRowsBeforeHeader(options.skip_rows)
255 };
256
257 let comment_prefix = options.parse_options.comment_prefix.as_ref();
258 let infer_schema_length = if options.schema.is_some() {
259 Some(0)
261 } else {
262 options.infer_schema_length
263 };
264
265 let mut header_line = None;
266 let mut content_lines = Vec::with_capacity(infer_schema_length.unwrap_or_else(|| {
267 decompressed_file_size_hint
268 .map(|size| size.saturating_div(ESTIMATED_BYTES_PER_ROW))
269 .unwrap_or(100)
270 }));
271
272 let initial_read_size = infer_schema_length
278 .map(|isl| {
279 cmp::max(
280 CompressedReader::initial_read_size(),
281 isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
282 )
283 })
284 .unwrap_or(usize::MAX);
285
286 let leftover = for_each_line_from_reader(
287 &options.parse_options,
288 true,
289 prev_leftover,
290 initial_read_size,
291 decompressed_file_size_hint,
292 reader,
293 |mem_slice_line| {
294 let line = &*mem_slice_line;
295
296 let done = loop {
297 match &mut state {
298 State::SkipEmpty => {
299 if line.is_empty() || line == b"\r" {
300 break LineUse::ConsumeDiscard;
301 }
302
303 state = State::SkipRowsBeforeHeader(options.skip_rows);
304 },
305 State::SkipRowsBeforeHeader(remaining) => {
306 let is_comment = is_comment_line(line, comment_prefix);
307
308 if *remaining == 0 && !is_comment {
309 state = State::SkipHeader(false);
310 continue;
311 }
312
313 *remaining -= !is_comment as usize;
314 break LineUse::ConsumeDiscard;
315 },
316 State::SkipHeader(did_skip) => {
317 if !options.has_header || *did_skip {
318 state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
319 continue;
320 }
321
322 header_line = Some(mem_slice_line.clone());
323 *did_skip = true;
324 break LineUse::ConsumeDiscard;
325 },
326 State::SkipRowsAfterHeader(remaining) => {
327 let is_comment = is_comment_line(line, comment_prefix);
328
329 if *remaining == 0 && !is_comment {
330 state = State::ContentInspect;
331 continue;
332 }
333
334 *remaining -= !is_comment as usize;
335 break LineUse::ConsumeDiscard;
336 },
337 State::ContentInspect => {
338 if let Some(func) = &mut inspect_first_content_row_fn {
339 func(line);
340 }
341
342 state = State::InferCollect;
343 },
344 State::InferCollect => {
345 if !is_comment_line(line, comment_prefix) {
346 content_lines.push(mem_slice_line.clone());
347 if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
348 state = State::Done;
349 continue;
350 }
351 }
352
353 break LineUse::ConsumeKeep;
354 },
355 State::Done => {
356 break LineUse::Done;
357 },
358 }
359 };
360
361 Ok(done)
362 },
363 )?;
364
365 let infer_all_as_str = infer_schema_length == Some(0);
366
367 let inferred_schema = infer_schema(
368 &header_line,
369 &content_lines,
370 infer_all_as_str,
371 options,
372 projected_schema,
373 )?;
374
375 Ok((inferred_schema, leftover))
376}
377
/// Verdict of a per-line callback, telling the line iterator what to do with the line it was given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineUse {
    /// The line is consumed and dropped from the returned remainder.
    ConsumeDiscard,
    /// The line is consumed but must stay in the returned remainder.
    ConsumeKeep,
    /// Stop iterating; this line is not consumed and remains in the returned remainder.
    Done,
}
383
384fn for_each_line_from_reader_from_compressed_reader(
389 parse_options: &CsvParseOptions,
390 is_file_start: bool,
391 mut prev_leftover: Buffer<u8>,
392 initial_read_size: usize,
393 reader: &mut CompressedReader,
394 mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
395) -> PolarsResult<Buffer<u8>> {
396 let mut is_first_line = is_file_start;
397
398 let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
399 .map(|x| {
400 x.parse::<NonZeroUsize>()
401 .unwrap_or_else(|_| {
402 panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
403 })
404 .get()
405 })
406 .ok();
407
408 let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
409 let mut retain_offset = None;
410
411 loop {
412 let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
413 if slice.is_empty() {
414 return Ok(Buffer::new());
415 }
416
417 if is_first_line {
418 is_first_line = false;
419 const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
420 if slice.get(0..3) == UTF8_BOM_MARKER {
421 slice = slice.sliced(3..);
422 }
423 }
424
425 let line_to_sub_slice = |line: &[u8]| {
426 let start = line.as_ptr() as usize - slice.as_ptr() as usize;
427 slice.clone().sliced(start..(start + line.len()))
428 };
429
430 let effective_slice = if let Some(offset) = retain_offset {
432 slice.clone().sliced(offset..)
433 } else {
434 slice.clone()
435 };
436
437 let mut lines = SplitLines::new(
438 &effective_slice,
439 parse_options.quote_char,
440 parse_options.eol_char,
441 parse_options.comment_prefix.as_ref(),
442 );
443 let Some(mut prev_line) = lines.next() else {
444 read_size = read_size.saturating_mul(2);
445 prev_leftover = slice;
446 continue;
447 };
448
449 let mut should_ret = false;
450
451 for next_line in lines {
454 match line_fn(line_to_sub_slice(prev_line))? {
455 LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
456 LineUse::ConsumeKeep => {
457 if retain_offset.is_none() {
458 let retain_start_offset =
459 prev_line.as_ptr() as usize - slice.as_ptr() as usize;
460 prev_leftover = slice.clone().sliced(retain_start_offset..);
461 retain_offset = Some(0);
462 }
463 },
464 LineUse::Done => {
465 should_ret = true;
466 break;
467 },
468 }
469 prev_line = next_line;
470 }
471
472 let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
473
474 if bytes_read < read_size {
477 match line_fn(line_to_sub_slice(prev_line))? {
478 LineUse::ConsumeDiscard => {
479 debug_assert!(retain_offset.is_none());
480 unconsumed_offset += prev_line.len();
481 if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
482 unconsumed_offset += 1;
483 }
484 },
485 LineUse::ConsumeKeep | LineUse::Done => (),
486 }
487 should_ret = true;
488 }
489
490 if let Some(offset) = &mut retain_offset {
491 if *offset == 0 {
492 *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
495 } else {
496 prev_leftover = slice;
497 *offset += unconsumed_offset;
498 }
499 } else {
500 prev_leftover = slice.sliced(unconsumed_offset..);
503 }
504
505 if should_ret {
506 return Ok(prev_leftover);
507 }
508
509 if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
510 read_size *= 4;
511 }
512 }
513}
514
515fn for_each_line_from_reader(
520 parse_options: &CsvParseOptions,
521 is_file_start: bool,
522 mut prev_leftover: Buffer<u8>,
523 initial_read_size: usize,
524 decompressed_file_size_hint: Option<usize>,
525 reader: &mut ByteSourceReader<ReaderSource>,
526 mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
527) -> PolarsResult<Buffer<u8>> {
528 let mut is_first_line = is_file_start;
529
530 let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
531 .map(|x| {
532 x.parse::<NonZeroUsize>()
533 .unwrap_or_else(|_| {
534 panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
535 })
536 .get()
537 })
538 .ok();
539
540 let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
541 let mut retain_offset = None;
542
543 loop {
544 let (mut slice, bytes_read) =
545 reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
546 if slice.is_empty() {
547 return Ok(Buffer::new());
548 }
549
550 if is_first_line {
551 is_first_line = false;
552 const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
553 if slice.get(0..3) == UTF8_BOM_MARKER {
554 slice = slice.sliced(3..);
555 }
556 }
557
558 let line_to_sub_slice = |line: &[u8]| {
559 let start = line.as_ptr() as usize - slice.as_ptr() as usize;
560 slice.clone().sliced(start..(start + line.len()))
561 };
562
563 let effective_slice = if let Some(offset) = retain_offset {
565 slice.clone().sliced(offset..)
566 } else {
567 slice.clone()
568 };
569
570 let mut lines = SplitLines::new(
571 &effective_slice,
572 parse_options.quote_char,
573 parse_options.eol_char,
574 parse_options.comment_prefix.as_ref(),
575 );
576 let Some(mut prev_line) = lines.next() else {
577 read_size = read_size.saturating_mul(2);
578 prev_leftover = slice;
579 continue;
580 };
581
582 let mut should_ret = false;
583
584 for next_line in lines {
587 match line_fn(line_to_sub_slice(prev_line))? {
588 LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
589 LineUse::ConsumeKeep => {
590 if retain_offset.is_none() {
591 let retain_start_offset =
592 prev_line.as_ptr() as usize - slice.as_ptr() as usize;
593 prev_leftover = slice.clone().sliced(retain_start_offset..);
594 retain_offset = Some(0);
595 }
596 },
597 LineUse::Done => {
598 should_ret = true;
599 break;
600 },
601 }
602 prev_line = next_line;
603 }
604
605 let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
606
607 if bytes_read < read_size {
610 match line_fn(line_to_sub_slice(prev_line))? {
611 LineUse::ConsumeDiscard => {
612 debug_assert!(retain_offset.is_none());
613 unconsumed_offset += prev_line.len();
614 if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
615 unconsumed_offset += 1;
616 }
617 },
618 LineUse::ConsumeKeep | LineUse::Done => (),
619 }
620 should_ret = true;
621 }
622
623 if let Some(offset) = &mut retain_offset {
624 if *offset == 0 {
625 *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
628 } else {
629 prev_leftover = slice;
630 *offset += unconsumed_offset;
631 }
632 } else {
633 prev_leftover = slice.sliced(unconsumed_offset..);
636 }
637
638 if should_ret {
639 return Ok(prev_leftover);
640 }
641
642 if read_size < ByteSourceReader::<ReaderSource>::ideal_read_size()
643 && fixed_read_size.is_none()
644 {
645 read_size *= 4;
646 }
647 }
648}
649
650fn skip_lines_naive_from_compressed_reader(
651 eol_char: u8,
652 skip_lines: usize,
653 raise_if_empty: bool,
654 reader: &mut CompressedReader,
655) -> PolarsResult<Buffer<u8>> {
656 let mut prev_leftover = Buffer::new();
657
658 if skip_lines == 0 {
659 return Ok(prev_leftover);
660 }
661
662 let mut remaining = skip_lines;
663 let mut read_size = CompressedReader::initial_read_size();
664
665 loop {
666 let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
667 let mut bytes: &[u8] = &slice;
668
669 'inner: loop {
670 let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
671 read_size = read_size.saturating_mul(2);
672 break 'inner;
673 };
674 pos = cmp::min(pos + 1, bytes.len());
675
676 bytes = &bytes[pos..];
677 remaining -= 1;
678
679 if remaining == 0 {
680 let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
681 prev_leftover = slice.sliced(unconsumed_offset..);
682 return Ok(prev_leftover);
683 }
684 }
685
686 if bytes_read == 0 {
687 if raise_if_empty {
688 polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
689 } else {
690 return Ok(Buffer::new());
691 }
692 }
693
694 prev_leftover = Buffer::new();
696
697 if read_size < CompressedReader::ideal_read_size() {
698 read_size *= 4;
699 }
700 }
701}
702
703fn skip_lines_naive(
704 eol_char: u8,
705 skip_lines: usize,
706 raise_if_empty: bool,
707 decompressed_file_size_hint: Option<usize>,
708 reader: &mut ByteSourceReader<ReaderSource>,
709) -> PolarsResult<Buffer<u8>> {
710 let mut prev_leftover = Buffer::new();
711
712 if skip_lines == 0 {
713 return Ok(prev_leftover);
714 }
715
716 let mut remaining = skip_lines;
717 let mut read_size = CompressedReader::initial_read_size();
718
719 loop {
720 let (slice, bytes_read) =
721 reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
722 let mut bytes: &[u8] = &slice;
723
724 'inner: loop {
725 let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
726 read_size = read_size.saturating_mul(2);
727 break 'inner;
728 };
729 pos = cmp::min(pos + 1, bytes.len());
730
731 bytes = &bytes[pos..];
732 remaining -= 1;
733
734 if remaining == 0 {
735 let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
736 prev_leftover = slice.sliced(unconsumed_offset..);
737 return Ok(prev_leftover);
738 }
739 }
740
741 if bytes_read == 0 {
742 if raise_if_empty {
743 polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
744 } else {
745 return Ok(Buffer::new());
746 }
747 }
748
749 prev_leftover = Buffer::new();
751
752 if read_size < CompressedReader::ideal_read_size() {
753 read_size *= 4;
754 }
755 }
756}
757
758fn infer_schema(
759 header_line: &Option<Buffer<u8>>,
760 content_lines: &[Buffer<u8>],
761 infer_all_as_str: bool,
762 options: &CsvReadOptions,
763 projected_schema: Option<SchemaRef>,
764) -> PolarsResult<Schema> {
765 let has_no_inference_data = if options.has_header {
766 header_line.is_none()
767 } else {
768 content_lines.is_empty()
769 };
770
771 if options.raise_if_empty && has_no_inference_data {
772 polars_bail!(NoData: "empty CSV");
773 }
774
775 let mut inferred_schema = if has_no_inference_data {
776 Schema::default()
777 } else {
778 infer_file_schema_impl(
779 header_line,
780 content_lines,
781 infer_all_as_str,
782 &options.parse_options,
783 options.schema_overwrite.as_deref(),
784 )
785 };
786
787 if let Some(schema) = &options.schema {
788 if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
792 polars_bail!(
793 SchemaMismatch:
794 "provided schema does not match number of columns in file ({} != {} in file)",
795 schema.len(),
796 inferred_schema.len(),
797 );
798 }
799
800 if options.parse_options.truncate_ragged_lines {
801 inferred_schema = Arc::unwrap_or_clone(schema.clone());
802 } else {
803 inferred_schema = schema
804 .iter_names()
805 .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
806 .map(|(name, dtype)| (name.clone(), dtype))
807 .collect();
808 }
809 }
810
811 if let Some(dtypes) = options.dtype_overwrite.as_deref() {
812 for (i, dtype) in dtypes.iter().enumerate() {
813 inferred_schema.set_dtype_at_index(i, dtype.clone());
814 }
815 }
816
817 if let Some(projected_schema) = projected_schema {
820 for (name, inferred_dtype) in inferred_schema.iter_mut() {
821 if let Some(projected_dtype) = projected_schema.get(name) {
822 *inferred_dtype = projected_dtype.clone();
823 }
824 }
825 }
826
827 Ok(inferred_schema)
828}