pub(super) mod batched;

use std::fmt;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use rayon::prelude::*;

use super::CsvParseOptions;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
    skip_lines_naive, skip_this_line,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(feature = "decompress")]
use super::utils::decompress;
use crate::RowIndex;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;

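/// Casts the columns of `df` to the dtypes given in `to_cast`, parsing string
/// columns into temporal types where the `temporal` feature is enabled.
/// When `ignore_errors` is false, a cast that introduces additional nulls
/// raises a casting-failure error.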
pub fn cast_columns(
    df: &mut DataFrame,
    to_cast: &[Field],
    parallel: bool,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let cast_fn = |c: &Column, fld: &Field| {
        let out = match (c.dtype(), fld.dtype()) {
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Date) => c
                .str()
                .unwrap()
                .as_date(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Time) => c
                .str()
                .unwrap()
                .as_time(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Datetime(tu, _)) => c
                .str()
                .unwrap()
                .as_datetime(
                    None,
                    *tu,
                    false,
                    false,
                    None,
                    &StringChunked::from_iter(std::iter::once("raise")),
                )
                .map(|ca| ca.into_column()),
            (_, dt) => c.cast(dt),
        }?;
        if !ignore_errors && c.null_count() != out.null_count() {
            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
        }
        Ok(out)
    };

    if parallel {
        let cols = POOL.install(|| {
            df.get_columns()
                .into_par_iter()
                .map(|s| {
                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
                        cast_fn(s, fld)
                    } else {
                        Ok(s.clone())
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
    } else {
        for fld in to_cast {
            // The field might not be projected.
            if let Some(idx) = df.get_column_index(fld.name()) {
                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
            }
        }

        df.clear_schema();
    }
    Ok(())
}

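/// CSV file reader that drives schema resolution, projection, and the
/// multi-threaded parsing of the file into a `DataFrame`.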
pub(crate) struct CoreReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    projection: Option<Vec<usize>>,
    current_line: usize,
    ignore_errors: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    has_header: bool,
    chunk_size: usize,
    null_values: Option<NullValuesCompiled>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
    #[cfg_attr(not(feature = "dtype-categorical"), allow(unused))]
    has_categorical: bool,
}

impl fmt::Debug for CoreReader<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}

impl<'a> CoreReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        chunk_size: usize,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        check_decimal_comma(parse_options.decimal_comma, separator)?;
        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        // Decompress if needed; the line limit accounts for the skipped rows
        // and the header so we do not decompress more than necessary.
        #[cfg(feature = "decompress")]
        {
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }

        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    &parse_options,
                    max_records,
                    has_header,
                    schema_overwrite.as_deref(),
                    skip_rows,
                    skip_lines,
                    skip_rows_after_header,
                    raise_if_empty,
                )?;
                Arc::new(inferred_schema)
            },
        };
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
            );
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        let has_categorical = prepare_csv_schema(&mut schema, &mut to_cast)?;

        // Compile the null values against the schema.
        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(reader_bytes),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            skip_lines,
            skip_rows_before_header: skip_rows,
            skip_rows_after_header,
            n_rows,
            n_threads,
            has_header,
            chunk_size,
            null_values,
            predicate,
            to_cast,
            row_index,
            has_categorical,
        })
    }

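    /// Skips the configured number of lines/rows, any leading comment lines,
    /// and the header, returning the remaining bytes together with the offset
    /// into the original buffer at which parsing should start.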
    fn find_starting_point<'b>(
        &self,
        bytes: &'b [u8],
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
        let i = find_starting_point(
            bytes,
            quote_char,
            eol_char,
            self.schema.len(),
            self.skip_lines,
            self.skip_rows_before_header,
            self.skip_rows_after_header,
            self.parse_options.comment_prefix.as_ref(),
            self.has_header,
        )?;

        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
    }

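    /// Returns the projection indices, sorted and validated against the
    /// schema length; defaults to all columns when no projection was set.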
    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }

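    /// Parses a single chunk of the file into a `DataFrame` and applies the
    /// requested dtype casts.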
    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }

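    /// Splits the file into line-aligned chunks, parses them in parallel, and
    /// stitches the partial `DataFrame`s back together in file order.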
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let (bytes, _) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let projection = self.get_projection()?;

        if bytes.is_empty() {
            // Return an empty DataFrame, preserving the (projected) schema.
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };
            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

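        // Aim for many more chunks than threads so work stays balanced, but
        // cap the chunk size at 16 MB so chunks stay cache-friendly.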
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
        let mut total_bytes_offset = 0;

        let results = Arc::new(Mutex::new(vec![]));
        // Lines processed so far across threads; used to stop early once `n_rows` is reached.
        let total_line_count = &AtomicUsize::new(0);

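        // Run on the global pool if the requested thread count matches it;
        // otherwise spin up a dedicated rayon pool.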
        #[cfg(not(target_family = "wasm"))]
        let pool;
        #[cfg(not(target_family = "wasm"))]
        let pool = if n_threads == POOL.current_num_threads() {
            &POOL
        } else {
            pool = rayon::ThreadPoolBuilder::new()
                .num_threads(n_threads)
                .build()
                .map_err(|_| polars_err!(ComputeError: "could not spawn threads"))?;
            &pool
        };
        #[cfg(target_family = "wasm")]
        let pool = &POOL;

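        // `CountLines` finds quote-aware line boundaries; UTF-8 is validated
        // only when the encoding is Utf8 and the schema contains string columns.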
        let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
        let mut total_offset = 0;
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());

        pool.scope(|s| {
            loop {
                let b = unsafe { bytes.get_unchecked(total_offset..) };
                if b.is_empty() {
                    break;
                }
                debug_assert!(
                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
                );
                let (count, position) = counter.find_next(b, &mut chunk_size);
                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

                let (b, count) = if count == 0
                    && unsafe {
                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
                    } {
                    // No more newlines; this is the last (possibly unterminated) line.
                    total_offset = bytes.len();
                    (b, 1)
                } else {
                    if count == 0 {
                        // The chunk did not contain a full line; grow it and retry.
                        chunk_size *= 2;
                        continue;
                    }

                    let end = total_offset + position + 1;
                    let b = unsafe { bytes.get_unchecked(total_offset..end) };

                    total_offset = end;
                    (b, count)
                };

                if !b.is_empty() {
                    let results = results.clone();
                    let projection = projection.as_ref();
                    let slf = &(*self);
                    s.spawn(move |_| {
                        if check_utf8 && !super::buffer::validate_utf8(b) {
                            let mut results = results.lock().unwrap();
                            results.push((
                                b.as_ptr() as usize,
                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
                            ));
                            return;
                        }

                        let result = slf
                            .read_chunk(b, projection, 0, count, Some(0), b.len())
                            .and_then(|mut df| {
                                debug_assert!(df.height() <= count);

                                if slf.n_rows.is_some() {
                                    total_line_count.fetch_add(df.height(), Ordering::Relaxed);
                                }

                                // Only the first chunk knows its global row offset;
                                // the offsets of the remaining chunks are fixed up
                                // later by `update_row_counts2`.
                                if let Some(rc) = &slf.row_index {
                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
                                        Some(rc.offset)
                                    } else {
                                        None
                                    };

                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
                                };

                                if let Some(predicate) = slf.predicate.as_ref() {
                                    let s = predicate.evaluate_io(&df)?;
                                    let mask = s.bool()?;
                                    df = df.filter(mask)?;
                                }
                                Ok(df)
                            });

                        results.lock().unwrap().push((b.as_ptr() as usize, result));
                    });

                    // Stop spawning once at least `n_rows` lines have been parsed.
                    if self.n_rows.is_some()
                        && total_line_count.load(Ordering::Relaxed) > self.n_rows.unwrap()
                    {
                        break;
                    }
                }
                total_bytes_offset += b.len();
            }
        });
        // Chunks finish out of order; sort them by their byte offset in the file.
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }

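    /// Reads the whole file into a `DataFrame`, holding the global string
    /// cache while parsing if categorical columns are present, and enforces
    /// the final `n_rows` limit.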
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        #[cfg(feature = "dtype-categorical")]
        let mut _cat_lock = if self.has_categorical {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_csv(&reader_bytes)?;

        // The multi-threaded reader may overshoot the row limit;
        // slice to the correct number of rows.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

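/// Parses the byte range `[bytes_offset_thread, stop_at_nbytes)` of `bytes`
/// into a `DataFrame`, pre-allocating buffers for `capacity` rows.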
#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
    bytes: &[u8],
    parse_options: &CsvParseOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: &[usize],
    bytes_offset_thread: usize,
    capacity: usize,
    null_values: Option<&NullValuesCompiled>,
    chunk_size: usize,
    stop_at_nbytes: usize,
    starting_point_offset: Option<usize>,
) -> PolarsResult<DataFrame> {
    let mut read = bytes_offset_thread;
    // Reserve one extra row so the last (possibly unterminated) line fits.
    let mut buffers = init_buffers(
        projection,
        capacity + 1,
        schema,
        parse_options.quote_char,
        parse_options.encoding,
        parse_options.decimal_comma,
    )?;

    debug_assert!(projection.is_sorted());

    let mut last_read = usize::MAX;
    loop {
        // Stop when the chunk is exhausted or when no progress was made.
        if read >= stop_at_nbytes || read == last_read {
            break;
        }
        let local_bytes = &bytes[read..stop_at_nbytes];

        last_read = read;
        let offset = read + starting_point_offset.unwrap();
        read += parse_lines(
            local_bytes,
            parse_options,
            offset,
            ignore_errors,
            null_values,
            projection,
            &mut buffers,
            chunk_size,
            schema.len(),
            schema,
        )?;
    }

    let columns = buffers
        .into_iter()
        .map(|buf| buf.into_series().map(Column::from))
        .collect::<PolarsResult<Vec<_>>>()?;
    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
}

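/// Computes the byte offset at which CSV parsing should begin, honoring a BOM,
/// `skip_lines`/`skip_rows_before_header`, leading comment lines, the header
/// row, and `skip_rows_after_header`.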
#[allow(clippy::too_many_arguments)]
pub fn find_starting_point(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
    schema_len: usize,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    comment_prefix: Option<&CommentPrefix>,
    has_header: bool,
) -> PolarsResult<usize> {
    let full_len = bytes.len();
    let starting_point_offset = bytes.as_ptr() as usize;

    bytes = if skip_lines > 0 {
        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
        skip_lines_naive(bytes, eol_char, skip_lines)
    } else {
        // Skip a UTF-8 byte-order mark if present.
        bytes = skip_bom(bytes);

        // A leading newline can be an empty string row of a single-column
        // file; in all other cases we skip it.
        if schema_len > 1 {
            bytes = skip_line_ending(bytes, eol_char)
        }
        bytes
    };

    if skip_rows_before_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_before_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    // Skip any leading comment lines.
    while is_comment_line(bytes, comment_prefix) {
        bytes = skip_this_line_naive(bytes, eol_char);
    }

    // Skip the header row.
    if has_header {
        bytes = skip_this_line(bytes, quote_char, eol_char);
    }
    if skip_rows_after_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_after_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    Ok(
        // Skipping may have consumed the entire buffer.
        if bytes.is_empty() {
            full_len
        } else {
            bytes.as_ptr() as usize - starting_point_offset
        },
    )
}