1use std::cmp;
2use std::iter::Iterator;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use polars_buffer::Buffer;
7use polars_core::prelude::Schema;
8use polars_core::schema::SchemaRef;
9use polars_error::{PolarsResult, polars_bail, polars_ensure};
10
11use crate::csv::read::schema_inference::infer_file_schema_impl;
12use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
13use crate::prelude::{CsvParseOptions, CsvReadOptions};
14use crate::utils::compression::{ByteSourceReader, CompressedReader};
15use crate::utils::stream_buf_reader::ReaderSource;
16
/// Callback handed the raw bytes of the first content row encountered while
/// scanning up to the start of the CSV data (see the `read_until_*` functions).
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
18
/// Reads from the start of a compressed CSV stream, skips the configured
/// leading lines/rows plus the header, and infers a schema from up to
/// `infer_schema_length` content lines.
///
/// On success returns the inferred [`Schema`] together with the leftover bytes
/// of the stream; the leftover begins at the first retained content line so the
/// subsequent full parse can re-read the rows used for inference.
///
/// If `inspect_first_content_row_fn` is provided it is invoked once with the
/// bytes of the first content row (after all skipping).
///
/// # Errors
/// Fails if both `skip_rows` and `skip_lines` are set, if the input is empty
/// while `raise_if_empty` is set, or on any read/inference error.
#[inline(never)]
pub fn read_until_start_and_infer_schema_from_compressed_reader(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Heuristic row width used to size the content-line buffer and reads.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    // Per-line state machine; states only ever advance downwards.
    #[derive(Copy, Clone)]
    enum State {
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // `skip_lines` is applied first with a raw newline scan (no quote /
    // comment handling); the unconsumed remainder seeds the line loop below.
    let prev_leftover = skip_lines_naive_from_compressed_reader(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // Lines were already skipped naively above; with no header,
        // `SkipHeader(false)` falls straight through to the post-header state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    // Without an explicit inference limit, guess a capacity from the
    // estimated total decompressed size.
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        reader
            .total_len_estimate()
            .saturating_div(ESTIMATED_BYTES_PER_ROW)
    }));

    // Try to fetch enough bytes for the whole inference sample in one read.
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader_from_compressed_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            // Inner loop so a state transition can re-examine the same line.
            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        // Drop blank lines preceding the header.
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        // Comment lines are discarded but do not count
                        // towards `skip_rows`.
                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        // Capture the header bytes for schema inference.
                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        // First content row: hand it to the caller, then fall
                        // through so it is also collected for inference.
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        // Keep content (and interleaved comment lines) in the
                        // leftover so the real parse sees them again.
                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    // `infer_schema_length == 0` requests string-typed columns throughout.
    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}
192
/// Reads from the start of an uncompressed byte-source CSV stream, skips the
/// configured leading lines/rows plus the header, and infers a schema from up
/// to `infer_schema_length` content lines.
///
/// Uncompressed counterpart of
/// `read_until_start_and_infer_schema_from_compressed_reader`; see that
/// function for the overall contract. `decompressed_file_size_hint`, when
/// available, is forwarded to the reader and used to size the content buffer.
///
/// # Errors
/// Fails if both `skip_rows` and `skip_lines` are set, if the input is empty
/// while `raise_if_empty` is set, or on any read/inference error.
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    decompressed_file_size_hint: Option<usize>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut ByteSourceReader<ReaderSource>,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Heuristic row width used to size the content-line buffer and reads.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    // Per-line state machine; states only ever advance downwards.
    #[derive(Copy, Clone)]
    enum State {
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // `skip_lines` is applied first with a raw newline scan (no quote /
    // comment handling); the unconsumed remainder seeds the line loop below.
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        decompressed_file_size_hint,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // Lines were already skipped naively above; with no header,
        // `SkipHeader(false)` falls straight through to the post-header state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    // Without an explicit inference limit, guess a capacity from the size
    // hint, falling back to a modest default when none is available.
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        decompressed_file_size_hint
            .map(|size| size.saturating_div(ESTIMATED_BYTES_PER_ROW))
            .unwrap_or(100)
    }));

    // Try to fetch enough bytes for the whole inference sample in one read.
    // NOTE(review): the floor comes from `CompressedReader::initial_read_size()`
    // even though this path reads a `ByteSourceReader` — presumably the tuning
    // constant is shared; confirm intended.
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        decompressed_file_size_hint,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            // Inner loop so a state transition can re-examine the same line.
            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        // Drop blank lines preceding the header.
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        // Comment lines are discarded but do not count
                        // towards `skip_rows`.
                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        // Capture the header bytes for schema inference.
                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        // First content row: hand it to the caller, then fall
                        // through so it is also collected for inference.
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        // Keep content (and interleaved comment lines) in the
                        // leftover so the real parse sees them again.
                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    // `infer_schema_length == 0` requests string-typed columns throughout.
    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}
369
/// Verdict returned by a line callback to the `for_each_line_*` driver loops.
enum LineUse {
    /// Line was handled and does not need to appear in the returned leftover.
    ConsumeDiscard,
    /// Line was handled but must be retained in the returned leftover so a
    /// subsequent full parse re-reads it.
    ConsumeKeep,
    /// Stop iterating and return the current leftover.
    Done,
}
375
/// Feeds complete lines from a compressed reader to `line_fn`, growing the
/// read size until at least one full line fits per chunk.
///
/// Lines are passed as zero-copy sub-buffers of the underlying read slice.
/// Once `line_fn` first returns [`LineUse::ConsumeKeep`], all bytes from that
/// line onward are retained and eventually returned as the leftover buffer;
/// [`LineUse::Done`] (or end of input) stops iteration.
///
/// `is_file_start` enables stripping of a UTF-8 BOM from the first line.
fn for_each_line_from_reader_from_compressed_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    reader: &mut CompressedReader,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    // Debug/test override for the read chunk size.
    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
    // `None` until the first `ConsumeKeep`; afterwards `Some(off)` where the
    // retained region starts at the beginning of `prev_leftover` and `off` is
    // the parse-resume offset within it.
    let mut retain_offset = None;

    loop {
        // `read_next_slice` yields `prev_leftover` plus freshly read bytes.
        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }

        // Re-wraps a borrowed line as an owned sub-buffer of `slice`
        // (relies on `line` pointing into `slice`'s allocation).
        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

        // Resume parsing after the already-consumed part of the retained
        // region, if retention has started.
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            // Not even one complete line in this chunk: read more next round.
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

        // One line of lookahead: the final (possibly partial) line is left
        // unconsumed here and handled after the loop.
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        // Start retaining from this line onward.
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

        // A short read is treated as end of input, so the trailing line is
        // complete even without a terminating eol: deliver it now.
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                // Retention began this round: `slice.len() - prev_leftover.len()`
                // is the retained region's start within `slice`.
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
                // Retention continues: keep everything, advance resume offset.
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            // Nothing retained yet: carry over only the unconsumed tail.
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
            read_size *= 4;
        }
    }
}
506
/// Feeds complete lines from an uncompressed byte-source reader to `line_fn`,
/// growing the read size until at least one full line fits per chunk.
///
/// Uncompressed counterpart of
/// `for_each_line_from_reader_from_compressed_reader`; see that function for
/// the retention/leftover contract. `decompressed_file_size_hint` is forwarded
/// to the reader.
fn for_each_line_from_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    decompressed_file_size_hint: Option<usize>,
    reader: &mut ByteSourceReader<ReaderSource>,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    // Debug/test override for the read chunk size.
    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
    // `None` until the first `ConsumeKeep`; afterwards `Some(off)` where the
    // retained region starts at the beginning of `prev_leftover` and `off` is
    // the parse-resume offset within it.
    let mut retain_offset = None;

    loop {
        // `read_next_slice` yields `prev_leftover` plus freshly read bytes.
        let (mut slice, bytes_read) =
            reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }

        // Re-wraps a borrowed line as an owned sub-buffer of `slice`
        // (relies on `line` pointing into `slice`'s allocation).
        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

        // Resume parsing after the already-consumed part of the retained
        // region, if retention has started.
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            // Not even one complete line in this chunk: read more next round.
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

        // One line of lookahead: the final (possibly partial) line is left
        // unconsumed here and handled after the loop.
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        // Start retaining from this line onward.
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

        // A short read is treated as end of input, so the trailing line is
        // complete even without a terminating eol: deliver it now.
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                // Retention began this round: `slice.len() - prev_leftover.len()`
                // is the retained region's start within `slice`.
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
                // Retention continues: keep everything, advance resume offset.
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            // Nothing retained yet: carry over only the unconsumed tail.
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

        if read_size < ByteSourceReader::<ReaderSource>::ideal_read_size()
            && fixed_read_size.is_none()
        {
            read_size *= 4;
        }
    }
}
641
642fn skip_lines_naive_from_compressed_reader(
643 eol_char: u8,
644 skip_lines: usize,
645 raise_if_empty: bool,
646 reader: &mut CompressedReader,
647) -> PolarsResult<Buffer<u8>> {
648 let mut prev_leftover = Buffer::new();
649
650 if skip_lines == 0 {
651 return Ok(prev_leftover);
652 }
653
654 let mut remaining = skip_lines;
655 let mut read_size = CompressedReader::initial_read_size();
656
657 loop {
658 let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
659 let mut bytes: &[u8] = &slice;
660
661 'inner: loop {
662 let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
663 read_size = read_size.saturating_mul(2);
664 break 'inner;
665 };
666 pos = cmp::min(pos + 1, bytes.len());
667
668 bytes = &bytes[pos..];
669 remaining -= 1;
670
671 if remaining == 0 {
672 let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
673 prev_leftover = slice.sliced(unconsumed_offset..);
674 return Ok(prev_leftover);
675 }
676 }
677
678 if bytes_read == 0 {
679 if raise_if_empty {
680 polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
681 } else {
682 return Ok(Buffer::new());
683 }
684 }
685
686 prev_leftover = Buffer::new();
688
689 if read_size < CompressedReader::ideal_read_size() {
690 read_size *= 4;
691 }
692 }
693}
694
695fn skip_lines_naive(
696 eol_char: u8,
697 skip_lines: usize,
698 raise_if_empty: bool,
699 decompressed_file_size_hint: Option<usize>,
700 reader: &mut ByteSourceReader<ReaderSource>,
701) -> PolarsResult<Buffer<u8>> {
702 let mut prev_leftover = Buffer::new();
703
704 if skip_lines == 0 {
705 return Ok(prev_leftover);
706 }
707
708 let mut remaining = skip_lines;
709 let mut read_size = CompressedReader::initial_read_size();
710
711 loop {
712 let (slice, bytes_read) =
713 reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
714 let mut bytes: &[u8] = &slice;
715
716 'inner: loop {
717 let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
718 read_size = read_size.saturating_mul(2);
719 break 'inner;
720 };
721 pos = cmp::min(pos + 1, bytes.len());
722
723 bytes = &bytes[pos..];
724 remaining -= 1;
725
726 if remaining == 0 {
727 let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
728 prev_leftover = slice.sliced(unconsumed_offset..);
729 return Ok(prev_leftover);
730 }
731 }
732
733 if bytes_read == 0 {
734 if raise_if_empty {
735 polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
736 } else {
737 return Ok(Buffer::new());
738 }
739 }
740
741 prev_leftover = Buffer::new();
743
744 if read_size < CompressedReader::ideal_read_size() {
745 read_size *= 4;
746 }
747 }
748}
749
750fn infer_schema(
751 header_line: &Option<Buffer<u8>>,
752 content_lines: &[Buffer<u8>],
753 infer_all_as_str: bool,
754 options: &CsvReadOptions,
755 projected_schema: Option<SchemaRef>,
756) -> PolarsResult<Schema> {
757 let has_no_inference_data = if options.has_header {
758 header_line.is_none()
759 } else {
760 content_lines.is_empty()
761 };
762
763 if options.raise_if_empty && has_no_inference_data {
764 polars_bail!(NoData: "empty CSV");
765 }
766
767 let mut inferred_schema = if has_no_inference_data {
768 Schema::default()
769 } else {
770 infer_file_schema_impl(
771 header_line,
772 content_lines,
773 infer_all_as_str,
774 &options.parse_options,
775 options.schema_overwrite.as_deref(),
776 )
777 };
778
779 if let Some(schema) = &options.schema {
780 if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
784 polars_bail!(
785 SchemaMismatch:
786 "provided schema does not match number of columns in file ({} != {} in file)",
787 schema.len(),
788 inferred_schema.len(),
789 );
790 }
791
792 if options.parse_options.truncate_ragged_lines {
793 inferred_schema = Arc::unwrap_or_clone(schema.clone());
794 } else {
795 inferred_schema = schema
796 .iter_names()
797 .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
798 .map(|(name, dtype)| (name.clone(), dtype))
799 .collect();
800 }
801 }
802
803 if let Some(dtypes) = options.dtype_overwrite.as_deref() {
804 for (i, dtype) in dtypes.iter().enumerate() {
805 inferred_schema.set_dtype_at_index(i, dtype.clone());
806 }
807 }
808
809 if let Some(projected_schema) = projected_schema {
812 for (name, inferred_dtype) in inferred_schema.iter_mut() {
813 if let Some(projected_dtype) = projected_schema.get(name) {
814 *inferred_dtype = projected_dtype.clone();
815 }
816 }
817 }
818
819 Ok(inferred_schema)
820}