1use std::cmp;
2use std::iter::Iterator;
3use std::sync::Arc;
4
5use polars_core::prelude::Schema;
6use polars_core::schema::SchemaRef;
7use polars_error::{PolarsResult, polars_bail, polars_ensure};
8use polars_utils::mmap::MemSlice;
9
10use crate::csv::read::schema_inference::infer_file_schema_impl;
11use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
12use crate::prelude::{CsvParseOptions, CsvReadOptions};
13use crate::utils::compression::CompressedReader;
14
/// Callback that [`read_until_start_and_infer_schema`] invokes once with the raw bytes of the
/// first content row, i.e. the first non-comment line after any skipped rows and the header.
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
16
17#[inline(never)]
30pub fn read_until_start_and_infer_schema(
31 options: &CsvReadOptions,
32 projected_schema: Option<SchemaRef>,
33 mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
34 reader: &mut CompressedReader,
35) -> PolarsResult<(Schema, MemSlice)> {
36 #[derive(Copy, Clone)]
37 enum State {
38 SkipEmpty,
40 SkipRowsBeforeHeader(usize),
41 SkipHeader(bool),
42 SkipRowsAfterHeader(usize),
43 ContentInspect,
44 InferCollect,
45 Done,
46 }
47
48 polars_ensure!(
49 !(options.skip_lines != 0 && options.skip_rows != 0),
50 InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
51 );
52
53 let prev_leftover = skip_lines_naive(
56 options.parse_options.eol_char,
57 options.skip_lines,
58 options.raise_if_empty,
59 reader,
60 )?;
61
62 let mut state = if options.has_header {
63 State::SkipEmpty
64 } else if options.skip_lines != 0 {
65 State::SkipHeader(false)
68 } else {
69 State::SkipRowsBeforeHeader(options.skip_rows)
70 };
71
72 let comment_prefix = options.parse_options.comment_prefix.as_ref();
73 let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);
74
75 let mut header_line = None;
76 let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or(256));
77
78 let leftover = for_each_line_from_reader(
79 &options.parse_options,
80 true,
81 prev_leftover,
82 reader,
83 |mem_slice_line| {
84 let line = &*mem_slice_line;
85
86 let done = loop {
87 match &mut state {
88 State::SkipEmpty => {
89 if line.is_empty() || line == b"\r" {
90 break LineUse::ConsumeDiscard;
91 }
92
93 state = State::SkipRowsBeforeHeader(options.skip_rows);
94 },
95 State::SkipRowsBeforeHeader(remaining) => {
96 let is_comment = is_comment_line(line, comment_prefix);
97
98 if *remaining == 0 && !is_comment {
99 state = State::SkipHeader(false);
100 continue;
101 }
102
103 *remaining -= !is_comment as usize;
104 break LineUse::ConsumeDiscard;
105 },
106 State::SkipHeader(did_skip) => {
107 if !options.has_header || *did_skip {
108 state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
109 continue;
110 }
111
112 header_line = Some(mem_slice_line.clone());
113 *did_skip = true;
114 break LineUse::ConsumeDiscard;
115 },
116 State::SkipRowsAfterHeader(remaining) => {
117 let is_comment = is_comment_line(line, comment_prefix);
118
119 if *remaining == 0 && !is_comment {
120 state = State::ContentInspect;
121 continue;
122 }
123
124 *remaining -= !is_comment as usize;
125 break LineUse::ConsumeDiscard;
126 },
127 State::ContentInspect => {
128 if let Some(func) = &mut inspect_first_content_row_fn {
129 func(line);
130 }
131
132 state = State::InferCollect;
133 },
134 State::InferCollect => {
135 if !is_comment_line(line, comment_prefix) {
136 content_lines.push(mem_slice_line.clone());
137 if content_lines.len() >= infer_schema_length {
138 state = State::Done;
139 continue;
140 }
141 }
142
143 break LineUse::ConsumeKeep;
144 },
145 State::Done => {
146 break LineUse::Done;
147 },
148 }
149 };
150
151 Ok(done)
152 },
153 )?;
154
155 let infer_all_as_str = infer_schema_length == 0;
156
157 let inferred_schema = infer_schema(
158 &header_line,
159 &content_lines,
160 infer_all_as_str,
161 options,
162 projected_schema,
163 )?;
164
165 Ok((inferred_schema, leftover))
166}
167
/// What [`for_each_line_from_reader`] should do with the line it just handed to its callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineUse {
    /// The line is consumed and not part of the returned leftover. Must not be returned after
    /// any `ConsumeKeep` (this is debug-asserted).
    ConsumeDiscard,
    /// The line is consumed, but it and everything after it is retained so it can be returned as
    /// the leftover.
    ConsumeKeep,
    /// Stop iterating. This line is *not* consumed; it remains part of the returned leftover.
    Done,
}
173
174fn for_each_line_from_reader(
179 parse_options: &CsvParseOptions,
180 is_file_start: bool,
181 mut prev_leftover: MemSlice,
182 reader: &mut CompressedReader,
183 mut line_fn: impl FnMut(MemSlice) -> PolarsResult<LineUse>,
184) -> PolarsResult<MemSlice> {
185 let mut is_first_line = is_file_start;
186
187 let mut read_size = 128 * 1024;
189 let mut retain_offset = None;
190
191 loop {
192 let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
193 if slice.is_empty() {
194 return Ok(MemSlice::EMPTY);
195 }
196
197 if is_first_line {
198 is_first_line = false;
199 const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
200 if slice.get(0..3) == UTF8_BOM_MARKER {
201 slice = slice.slice(3..slice.len());
202 }
203 }
204
205 let line_to_sub_slice = |line: &[u8]| {
206 let start = line.as_ptr() as usize - slice.as_ptr() as usize;
207 slice.slice(start..(start + line.len()))
208 };
209
210 let effective_slice = if let Some(offset) = retain_offset {
212 slice.slice(offset..slice.len())
213 } else {
214 slice.clone()
215 };
216
217 let mut lines = SplitLines::new(
218 &effective_slice,
219 parse_options.quote_char,
220 parse_options.eol_char,
221 parse_options.comment_prefix.as_ref(),
222 );
223 let Some(mut prev_line) = lines.next() else {
224 read_size *= 2;
225 prev_leftover = slice;
226 continue;
227 };
228
229 let mut should_ret = false;
230
231 for next_line in lines {
234 match line_fn(line_to_sub_slice(prev_line))? {
235 LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
236 LineUse::ConsumeKeep => {
237 retain_offset
238 .get_or_insert(prev_line.as_ptr() as usize - slice.as_ptr() as usize);
239 },
240 LineUse::Done => {
241 should_ret = true;
242 break;
243 },
244 }
245 prev_line = next_line;
246 }
247
248 let mut unconsumed_offset = prev_line.as_ptr() as usize - slice.as_ptr() as usize;
249
250 if bytes_read == 0 {
253 match line_fn(line_to_sub_slice(prev_line))? {
254 LineUse::ConsumeDiscard => {
255 unconsumed_offset += prev_line.len();
256 if slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
257 unconsumed_offset += 1;
258 }
259 },
260 LineUse::ConsumeKeep | LineUse::Done => (),
261 }
262 should_ret = true;
263 }
264
265 if retain_offset.is_some() {
266 prev_leftover = slice;
267 } else {
268 prev_leftover = slice.slice(unconsumed_offset..slice.len());
271 }
272
273 if should_ret {
274 let leftover = prev_leftover.slice(retain_offset.unwrap_or(0)..prev_leftover.len());
275 return Ok(leftover);
276 }
277 }
278}
279
280fn skip_lines_naive(
281 eol_char: u8,
282 skip_lines: usize,
283 raise_if_empty: bool,
284 reader: &mut CompressedReader,
285) -> PolarsResult<MemSlice> {
286 let mut prev_leftover = MemSlice::EMPTY;
287
288 if skip_lines == 0 {
289 return Ok(prev_leftover);
290 }
291
292 let mut remaining = skip_lines;
293 let mut read_size = 128 * 1024;
295
296 loop {
297 let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
298 let mut bytes: &[u8] = &slice;
299
300 'inner: loop {
301 let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
302 read_size *= 2;
303 break 'inner;
304 };
305 pos = cmp::min(pos + 1, bytes.len());
306
307 bytes = &bytes[pos..];
308 remaining -= 1;
309
310 if remaining == 0 {
311 let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
312 prev_leftover = slice.slice(unconsumed_offset..slice.len());
313 return Ok(prev_leftover);
314 }
315 }
316
317 if bytes_read == 0 {
318 if raise_if_empty {
319 polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
320 } else {
321 return Ok(MemSlice::EMPTY);
322 }
323 }
324
325 prev_leftover = MemSlice::EMPTY;
327 }
328}
329
330fn infer_schema(
331 header_line: &Option<MemSlice>,
332 content_lines: &[MemSlice],
333 infer_all_as_str: bool,
334 options: &CsvReadOptions,
335 projected_schema: Option<SchemaRef>,
336) -> PolarsResult<Schema> {
337 let has_no_inference_data = if options.has_header {
338 header_line.is_none()
339 } else {
340 content_lines.is_empty()
341 };
342
343 if options.raise_if_empty && has_no_inference_data {
344 polars_bail!(NoData: "empty CSV");
345 }
346
347 let mut inferred_schema = if has_no_inference_data {
348 Schema::default()
349 } else {
350 infer_file_schema_impl(
351 header_line,
352 content_lines,
353 infer_all_as_str,
354 &options.parse_options,
355 options.schema_overwrite.as_deref(),
356 )?
357 };
358
359 if let Some(schema) = &options.schema {
360 if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
364 polars_bail!(
365 SchemaMismatch:
366 "provided schema does not match number of columns in file ({} != {} in file)",
367 schema.len(),
368 inferred_schema.len(),
369 );
370 }
371
372 if options.parse_options.truncate_ragged_lines {
373 inferred_schema = Arc::unwrap_or_clone(schema.clone());
374 } else {
375 inferred_schema = schema
376 .iter_names()
377 .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
378 .map(|(name, dtype)| (name.clone(), dtype))
379 .collect();
380 }
381 }
382
383 if let Some(dtypes) = options.dtype_overwrite.as_deref() {
384 for (i, dtype) in dtypes.iter().enumerate() {
385 inferred_schema.set_dtype_at_index(i, dtype.clone());
386 }
387 }
388
389 if let Some(projected_schema) = projected_schema {
392 for (name, inferred_dtype) in inferred_schema.iter_mut() {
393 if let Some(projected_dtype) = projected_schema.get(name) {
394 *inferred_dtype = projected_dtype.clone();
395 }
396 }
397 }
398
399 Ok(inferred_schema)
400}