pub(super) mod batched;

use std::fmt;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use rayon::prelude::*;

use super::CsvParseOptions;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
    skip_lines_naive, skip_this_line,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(feature = "decompress")]
use super::utils::decompress;
use crate::RowIndex;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;

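/// Cast the columns of `df` in place to the dtypes listed in `to_cast`.
///
/// String columns headed for temporal dtypes are parsed with the dedicated
/// `as_date`/`as_time`/`as_datetime` kernels; everything else goes through a
/// plain `cast`. Unless `ignore_errors` is set, a cast that introduces new
/// nulls is reported via `handle_casting_failures`.
///
/// A minimal usage sketch (illustrative only, not compiled as a doctest; the
/// column name and data are made up):
///
/// ```ignore
/// let mut df = df!("d" => ["2024-01-01", "2024-01-02"])?;
/// let to_cast = vec![Field::new("d".into(), DataType::Date)];
/// // Parses the string column into a Date column in place.
/// cast_columns(&mut df, &to_cast, false, false)?;
/// assert_eq!(df.column("d")?.dtype(), &DataType::Date);
/// ```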
pub fn cast_columns(
    df: &mut DataFrame,
    to_cast: &[Field],
    parallel: bool,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let cast_fn = |c: &Column, fld: &Field| {
        let out = match (c.dtype(), fld.dtype()) {
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Date) => c
                .str()
                .unwrap()
                .as_date(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Time) => c
                .str()
                .unwrap()
                .as_time(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Datetime(tu, _)) => c
                .str()
                .unwrap()
                .as_datetime(
                    None,
                    *tu,
                    false,
                    false,
                    None,
                    &StringChunked::from_iter(std::iter::once("raise")),
                )
                .map(|ca| ca.into_column()),
            (_, dt) => c.cast(dt),
        }?;
        if !ignore_errors && c.null_count() != out.null_count() {
            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
        }
        Ok(out)
    };

    if parallel {
        let cols = POOL.install(|| {
            df.get_columns()
                .into_par_iter()
                .map(|s| {
                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
                        cast_fn(s, fld)
                    } else {
                        Ok(s.clone())
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
    } else {
        for fld in to_cast {
            if let Some(idx) = df.get_column_index(fld.name()) {
                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
            }
        }
    }
    Ok(())
}

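/// Eager CSV-parsing pipeline: owns (or borrows) the raw bytes together with
/// the resolved schema and parse options, and turns them into a single
/// `DataFrame` via [`CoreReader::finish`].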
pub(crate) struct CoreReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    projection: Option<Vec<usize>>,
    current_line: usize,
    ignore_errors: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    has_header: bool,
    chunk_size: usize,
    null_values: Option<NullValuesCompiled>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
    #[cfg_attr(not(feature = "dtype-categorical"), allow(unused))]
    has_categorical: bool,
}

impl fmt::Debug for CoreReader<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}

impl<'a> CoreReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        mut n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        chunk_size: usize,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        check_decimal_comma(parse_options.decimal_comma, separator)?;
        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        #[cfg(feature = "decompress")]
        {
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }

        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    &parse_options,
                    max_records,
                    has_header,
                    schema_overwrite.as_deref(),
                    skip_rows,
                    skip_lines,
                    skip_rows_after_header,
                    raise_if_empty,
                    &mut n_threads,
                )?;
                Arc::new(inferred_schema)
            },
        };
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "the number of schema overrides must be less than or equal to the number of fields"
            );
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        let has_categorical = prepare_csv_schema(&mut schema, &mut to_cast)?;

        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(reader_bytes),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            skip_lines,
            skip_rows_before_header: skip_rows,
            skip_rows_after_header,
            n_rows,
            n_threads,
            has_header,
            chunk_size,
            null_values,
            predicate,
            to_cast,
            row_index,
            has_categorical,
        })
    }

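    /// Trim `bytes` to the first data line, honouring the BOM, skipped
    /// lines/rows, comment lines and the header. Returns the remaining bytes
    /// plus the offset of the starting point within the original slice.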
    fn find_starting_point<'b>(
        &self,
        bytes: &'b [u8],
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
        let i = find_starting_point(
            bytes,
            quote_char,
            eol_char,
            self.schema.len(),
            self.skip_lines,
            self.skip_rows_before_header,
            self.skip_rows_after_header,
            self.parse_options.comment_prefix.as_ref(),
            self.has_header,
        )?;

        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
    }

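    /// Take the user-provided projection (sorted and bounds-checked against
    /// the schema) or fall back to projecting every column.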
    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }

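    /// Parse a single chunk of `bytes` into a `DataFrame` and apply the
    /// requested dtype casts; parallelism happens a level up, so the casts
    /// here run serially.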
    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }

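    /// Parse the full byte slice into a `DataFrame`. The bytes are cut on
    /// record boundaries into roughly `n_threads * 16` chunks, each chunk is
    /// parsed and filtered on the thread pool, and the per-chunk frames are
    /// sorted back into file order (keyed by each chunk's start pointer) and
    /// concatenated vertically.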
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let (bytes, _) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let projection = self.get_projection()?;

        if bytes.is_empty() {
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };
            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

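        // Partitioning heuristic: target roughly 16 chunks per thread so work
        // can be balanced, cap a chunk at 16 MiB, and enforce a small floor so
        // tiny inputs aren't split into degenerate slivers.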
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
        let mut total_bytes_offset = 0;

        let results = Arc::new(Mutex::new(vec![]));
        let total_line_count = &AtomicUsize::new(0);

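        // Reuse the global rayon pool when possible; only build a dedicated
        // pool when the caller asked for a different thread count. On wasm,
        // where threads cannot be spawned, the global pool is always used.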
        #[cfg(not(target_family = "wasm"))]
        let pool;
        #[cfg(not(target_family = "wasm"))]
        let pool = if n_threads == POOL.current_num_threads() {
            &POOL
        } else {
            pool = rayon::ThreadPoolBuilder::new()
                .num_threads(n_threads)
                .build()
                .map_err(|_| polars_err!(ComputeError: "could not spawn threads"))?;
            &pool
        };
        #[cfg(target_family = "wasm")]
        let pool = &POOL;

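        // `CountLines` counts record boundaries while respecting quoting, so a
        // chunk never ends in the middle of a quoted field. UTF-8 validation
        // is only needed if a string column can actually reach the output.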
        let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
        let mut total_offset = 0;
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());

        pool.scope(|s| {
            loop {
                let b = unsafe { bytes.get_unchecked(total_offset..) };
                if b.is_empty() {
                    break;
                }
                debug_assert!(
                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
                );
                let (count, position) = counter.find_next(b, &mut chunk_size);
                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

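                // Three cases: this is the final chunk and it has no trailing
                // newline (take it whole), no complete record fit within
                // `chunk_size` (double the chunk size and retry), or the
                // common case: cut the slice just past the last newline found.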
                let (b, count) = if count == 0
                    && unsafe { b.as_ptr().add(b.len()) == bytes.as_ptr().add(bytes.len()) }
                {
                    total_offset = bytes.len();
                    (b, 1)
                } else {
                    if count == 0 {
                        chunk_size *= 2;
                        continue;
                    }

                    let end = total_offset + position + 1;
                    let b = unsafe { bytes.get_unchecked(total_offset..end) };

                    total_offset = end;
                    (b, count)
                };

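                // Hand the chunk to the pool. Results are keyed by the chunk's
                // start pointer so they can be restored to file order after
                // all tasks finish.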
                if !b.is_empty() {
                    let results = results.clone();
                    let projection = projection.as_ref();
                    let slf = &(*self);
                    s.spawn(move |_| {
                        if check_utf8 && !super::buffer::validate_utf8(b) {
                            let mut results = results.lock().unwrap();
                            results.push((
                                b.as_ptr() as usize,
                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
                            ));
                            return;
                        }

                        let result = slf
                            .read_chunk(b, projection, 0, count, Some(0), b.len())
                            .and_then(|mut df| {
                                debug_assert!(df.height() <= count);

                                if slf.n_rows.is_some() {
                                    total_line_count.fetch_add(df.height(), Ordering::Relaxed);
                                }

                                if let Some(rc) = &slf.row_index {
                                    let offset = if b.as_ptr() == bytes.as_ptr() {
                                        Some(rc.offset)
                                    } else {
                                        None
                                    };

                                    df.with_row_index_mut(rc.name.clone(), offset);
                                };

                                if let Some(predicate) = slf.predicate.as_ref() {
                                    let s = predicate.evaluate_io(&df)?;
                                    let mask = s.bool()?;
                                    df = df.filter(mask)?;
                                }
                                Ok(df)
                            });

                        results.lock().unwrap().push((b.as_ptr() as usize, result));
                    });

                    if self.n_rows.is_some()
                        && total_line_count.load(Ordering::Relaxed) > self.n_rows.unwrap()
                    {
                        break;
                    }
                }
                total_bytes_offset += b.len();
            }
        });
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }

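    /// Consume the reader and produce the final `DataFrame`, holding the
    /// global string cache while parsing if categorical columns are present
    /// and applying the `n_rows` limit at the end.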
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        #[cfg(feature = "dtype-categorical")]
        let mut _cat_lock = if self.has_categorical {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_csv(&reader_bytes)?;

        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

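/// Parse one contiguous chunk of CSV `bytes` (already aligned to record
/// boundaries) into a `DataFrame`, materializing only the columns named in
/// `projection`.
///
/// A hedged sketch of a single-chunk call (illustrative, not run as a
/// doctest; `bytes`, `parse_options` and `schema` are assumed to exist):
///
/// ```ignore
/// let df = read_chunk(
///     bytes,
///     &parse_options,
///     &schema,
///     false,       // ignore_errors
///     &[0, 1],     // project the first two columns
///     0,           // bytes_offset_thread: this chunk starts at byte 0
///     1024,        // capacity hint for the row buffers
///     None,        // no custom null values
///     usize::MAX,  // chunk_size: parse until the end
///     bytes.len(), // stop_at_nbytes
///     Some(0),     // starting_point_offset
/// )?;
/// ```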
#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
    bytes: &[u8],
    parse_options: &CsvParseOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: &[usize],
    bytes_offset_thread: usize,
    capacity: usize,
    null_values: Option<&NullValuesCompiled>,
    chunk_size: usize,
    stop_at_nbytes: usize,
    starting_point_offset: Option<usize>,
) -> PolarsResult<DataFrame> {
    let mut read = bytes_offset_thread;
    let mut buffers = init_buffers(
        projection,
        capacity + 1,
        schema,
        parse_options.quote_char,
        parse_options.encoding,
        parse_options.decimal_comma,
    )?;

    debug_assert!(projection.is_sorted());

    let mut last_read = usize::MAX;
    loop {
        if read >= stop_at_nbytes || read == last_read {
            break;
        }
        let local_bytes = &bytes[read..stop_at_nbytes];

        last_read = read;
        let offset = read + starting_point_offset.unwrap();
        read += parse_lines(
            local_bytes,
            parse_options,
            offset,
            ignore_errors,
            null_values,
            projection,
            &mut buffers,
            chunk_size,
            schema.len(),
            schema,
        )?;
    }

    let columns = buffers
        .into_iter()
        .map(|buf| buf.into_series().map(Column::from))
        .collect::<PolarsResult<Vec<_>>>()?;
    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
}

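/// Compute the byte offset of the first data line, applying the skip options
/// in order: BOM, `skip_lines`/`skip_rows_before_header`, leading comment
/// lines, the header line itself, then `skip_rows_after_header`.
///
/// A small sketch (illustrative, not run as a doctest; assumes the
/// `CommentPrefix::Single` variant for `#`-style comments):
///
/// ```ignore
/// let csv: &[u8] = b"# generated\nname,age\nalice,30\n";
/// // One comment line plus a header: the offset lands on "alice,30\n".
/// let offset = find_starting_point(
///     csv, Some(b'"'), b'\n', 2, 0, 0, 0,
///     Some(&CommentPrefix::Single(b'#')), true,
/// )?;
/// assert_eq!(&csv[offset..], b"alice,30\n".as_slice());
/// ```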
#[allow(clippy::too_many_arguments)]
pub fn find_starting_point(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
    schema_len: usize,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    comment_prefix: Option<&CommentPrefix>,
    has_header: bool,
) -> PolarsResult<usize> {
    let full_len = bytes.len();
    let starting_point_offset = bytes.as_ptr() as usize;

    bytes = if skip_lines > 0 {
        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
        skip_lines_naive(bytes, eol_char, skip_lines)
    } else {
        bytes = skip_bom(bytes);

        if schema_len > 1 {
            bytes = skip_line_ending(bytes, eol_char)
        }
        bytes
    };

    if skip_rows_before_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_before_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    while is_comment_line(bytes, comment_prefix) {
        bytes = skip_this_line_naive(bytes, eol_char);
    }

    if has_header {
        bytes = skip_this_line(bytes, quote_char, eol_char);
    }
    if skip_rows_after_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_after_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    Ok(if bytes.is_empty() {
        full_len
    } else {
        bytes.as_ptr() as usize - starting_point_offset
    })
}