// polars_io/csv/read/read_impl/batched.rs

use std::collections::VecDeque;
use std::ops::Deref;

use polars_core::datatypes::Field;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{cast_columns, read_chunk, CoreReader, CountLines};
use crate::csv::read::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use crate::csv::read::CsvReader;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::update_row_counts2;
use crate::RowIndex;
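
/// Extends `offsets` with up to `n_chunks` `(start, stop)` byte ranges of
/// `bytes`, each ending on a line boundary so that every chunk can be parsed
/// independently. `chunk_size` is doubled whenever the current window holds no
/// complete line; `last_pos` tracks how far into `bytes` we have advanced.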
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
) {
    let cl = CountLines::new(quote_char, eol_char);

    for _ in 0..n_chunks {
        let bytes = &bytes[*last_pos..];
        if bytes.is_empty() {
            break;
        }

        let position;
        loop {
            let b = &bytes[..(*chunk_size).min(bytes.len())];
            let (count, position_) = cl.count(b);

            let (count, position_) = if b.len() == bytes.len() {
                // We are looking at the tail of the file: take everything that
                // is left, even if no line terminator was counted.
                (if count != 0 { count } else { 1 }, b.len())
            } else {
                (
                    count,
                    // Include the line terminator itself in the chunk.
                    if position_ < b.len() {
                        1 + position_
                    } else {
                        position_
                    },
                )
            };

            if count == 0 {
                // No complete line fits in the current window; widen it and retry.
                *chunk_size *= 2;
                continue;
            }

            position = position_;
            break;
        }

        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}
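
/// An iterator over line-aligned `(start, stop)` byte ranges of the CSV data.
/// Ranges are computed lazily, `n_chunks` at a time, by
/// `get_file_chunks_iterator`.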
struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    chunk_size: usize,
    #[allow(unused)]
    rows_per_batch: usize,
    quote_char: Option<u8>,
    eol_char: u8,
}

impl Iterator for ChunkOffsetIter<'_> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
                if self.last_offset == self.bytes.len() {
                    return None;
                }
                // The queue is exhausted; refill it with the next batch of
                // chunk offsets.
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    &mut self.chunk_size,
                    self.bytes,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    // Final chunk: emit whatever remains of the file.
                    None => {
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    },
                }
            },
        }
    }
}

impl<'a> CoreReader<'a> {
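    /// Splits the file into line-aligned chunks and returns a
    /// [`BatchedCsvReader`] that parses them on demand.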
    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) =
            self.find_starting_point(bytes, self.quote_char, self.eol_char)?;

        // Aim for roughly 16 chunks per thread, clamped to a sane size range.
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;
        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

        let offset_batch_size = 16;
        // SAFETY: `bytes` is bound to `reader_bytes`, which is moved into the
        // returned `BatchedCsvReader`, so the extended lifetime never outlives
        // the underlying buffer.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };

        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            chunk_size,
            rows_per_batch: self.chunk_size,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
        };

        let projection = self.get_projection()?;

        // Keep the global string cache alive while categorical columns are built.
        #[cfg(feature = "dtype-categorical")]
        let _cat_lock = if self.has_categorical {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };
        #[cfg(not(feature = "dtype-categorical"))]
        let _cat_lock = None;
        Ok(BatchedCsvReader {
            reader_bytes,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            comment_prefix: self.comment_prefix,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
            null_values: self.null_values,
            missing_is_null: self.missing_is_null,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            truncate_ragged_lines: self.truncate_ragged_lines,
            remaining: self.n_rows.unwrap_or(usize::MAX),
            encoding: self.encoding,
            separator: self.separator,
            schema: self.schema,
            rows_read: 0,
            _cat_lock,
            decimal_comma: self.decimal_comma,
        })
    }
}
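
/// A CSV reader that parses the file in line-aligned chunks and returns the
/// resulting `DataFrame`s batch by batch.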
pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    projection: Vec<usize>,
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    comment_prefix: Option<CommentPrefix>,
    quote_char: Option<u8>,
    eol_char: u8,
    null_values: Option<NullValuesCompiled>,
    missing_is_null: bool,
    truncate_ragged_lines: bool,
    to_cast: Vec<Field>,
    ignore_errors: bool,
    remaining: usize,
    encoding: CsvEncoding,
    separator: u8,
    schema: SchemaRef,
    rows_read: IdxSize,
    #[cfg(feature = "dtype-categorical")]
    _cat_lock: Option<polars_core::StringCacheHolder>,
    #[cfg(not(feature = "dtype-categorical"))]
    _cat_lock: Option<u8>,
    decimal_comma: bool,
}

impl BatchedCsvReader<'_> {
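    /// Reads up to `n` batches. Returns `Ok(None)` once the configured row
    /// limit is exhausted or the end of the file is reached.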
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }

        // Pull the next `n` chunk ranges from the offset iterator.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);

        // Depleted the offsets iterator: we are done.
        if self.file_chunks.is_empty() {
            return Ok(None);
        }

        let chunks = &self.file_chunks;
        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

        // Parse every chunk into a DataFrame in parallel on the rayon pool.
        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        self.separator,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.quote_char,
                        self.eol_char,
                        self.comment_prefix.as_ref(),
                        self.chunk_size,
                        self.encoding,
                        self.null_values.as_ref(),
                        self.missing_is_null,
                        self.truncate_ragged_lines,
                        usize::MAX,
                        stop_at_nbytes,
                        self.starting_point_offset,
                        self.decimal_comma,
                    )?;

                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
                    if let Some(rc) = &self.row_index {
                        df.with_row_index_mut(rc.name.clone(), Some(rc.offset));
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;

        self.file_chunks.clear();

        // Shift the per-chunk row indices so they are globally monotonic.
        if self.row_index.is_some() {
            update_row_counts2(&mut chunks, self.rows_read);
        }

        // Enforce the row limit by truncating the batch that crosses it.
        for df in &mut chunks {
            let h = df.height();
            if self.remaining < h {
                *df = df.slice(0, self.remaining);
            }
            self.remaining = self.remaining.saturating_sub(h);
            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}
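
/// Owned wrapper around [`BatchedCsvReader`] that keeps the underlying
/// [`CsvReader`] alive for as long as the transmuted `'static` borrow is used.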
pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    schema: SchemaRef,
    batched_reader: BatchedCsvReader<'static>,
    // Keep the reader alive; the batched reader borrows from it.
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}

impl OwnedBatchedCsvReader {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}
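
/// Converts a [`CsvReader`] into an [`OwnedBatchedCsvReader`], moving the
/// reader into the returned struct so the borrowed bytes stay valid.
///
/// A minimal usage sketch (the construction of `reader` is assumed here, not
/// shown):
///
/// ```ignore
/// let mut batched = to_batched_owned(reader)?;
/// while let Some(dfs) = batched.next_batches(4)? {
///     for df in dfs {
///         // Process each parsed DataFrame batch.
///     }
/// }
/// ```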
pub fn to_batched_owned(
    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
) -> PolarsResult<OwnedBatchedCsvReader> {
    let batched_reader = reader.batched_borrowed()?;
    let schema = batched_reader.schema.clone();
    // SAFETY: extend the borrow to 'static. `reader` is moved into the struct
    // below, so the bytes the batched reader borrows cannot be dropped first.
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };
    Ok(OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    })
}
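
#[cfg(test)]
mod tests {
    use super::*;

    // A small sanity sketch (not from the original file): every chunk range
    // produced by `get_file_chunks_iterator` should end on a line boundary.
    // The input and parameters below are illustrative assumptions.
    #[test]
    fn chunk_offsets_end_on_line_boundaries() {
        let bytes: &[u8] = b"a,b\n1,2\n3,4\n5,6\n";
        let mut offsets = VecDeque::new();
        let mut last_pos = 0usize;
        let mut chunk_size = 4usize;
        get_file_chunks_iterator(
            &mut offsets,
            &mut last_pos,
            16,
            &mut chunk_size,
            bytes,
            None,
            b'\n',
        );
        // All input must be covered and each range must stop right after an eol.
        assert_eq!(last_pos, bytes.len());
        for (_, stop) in offsets {
            assert_eq!(bytes[stop - 1], b'\n');
        }
    }
}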