// polars_io/csv/read/read_impl/batched.rs
use std::collections::VecDeque;
2use std::ops::Deref;
3
4use polars_core::POOL;
5use polars_core::datatypes::Field;
6use polars_core::frame::DataFrame;
7use polars_core::schema::SchemaRef;
8use polars_error::PolarsResult;
9use polars_utils::IdxSize;
10use rayon::iter::{IntoParallelIterator, ParallelIterator};
11
12use super::{CoreReader, CountLines, cast_columns, read_chunk};
13use crate::RowIndex;
14use crate::csv::read::CsvReader;
15use crate::csv::read::options::NullValuesCompiled;
16use crate::mmap::{MmapBytesReader, ReaderBytes};
17use crate::prelude::{CsvParseOptions, update_row_counts2};
18
/// Extend `offsets` with up to `n_chunks` byte ranges `(start, end)` into `bytes`,
/// each ending on a line boundary so every chunk can be parsed independently.
///
/// `last_pos` is the absolute offset where scanning resumes and is advanced past
/// every chunk that is pushed. `chunk_size` is the probe-window size; it is
/// doubled (and the larger value kept across calls via the `&mut`) whenever no
/// complete line fits inside the window. Stops early once `bytes` is exhausted.
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
) {
    // Quote-aware line counter: an `eol_char` inside a quoted field must not
    // be treated as a line boundary.
    let cl = CountLines::new(quote_char, eol_char);

    for _ in 0..n_chunks {
        // Shadow `bytes` with the unscanned remainder.
        let bytes = &bytes[*last_pos..];

        if bytes.is_empty() {
            break;
        }

        let position;

        loop {
            // Probe at most `chunk_size` bytes for complete lines.
            let b = &bytes[..(*chunk_size).min(bytes.len())];
            let (count, position_) = cl.count(b);

            let (count, position_) = if b.len() == bytes.len() {
                // The probe reaches end-of-input: take everything that is left,
                // and force `count` to at least 1 so a trailing line without a
                // final `eol_char` still yields a chunk instead of looping.
                (if count != 0 { count } else { 1 }, b.len())
            } else {
                (
                    count,
                    // `position_` appears to be the offset of the last line
                    // terminator found in `b`; step one past it so the chunk
                    // includes that terminator byte.
                    // NOTE(review): depends on CountLines::count's position
                    // semantics — confirm against its definition.
                    if position_ < b.len() {
                        1 + position_
                    } else {
                        position_
                    },
                )
            };

            if count == 0 {
                // No complete line fits in the window: grow it and retry. The
                // doubled size persists for subsequent calls.
                *chunk_size *= 2;
                continue;
            }

            position = position_;
            break;
        }

        // Record the chunk as absolute offsets and advance the scan position.
        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}
71
/// Lazy iterator over `(start, end)` byte ranges of `bytes`, produced in
/// batches of `n_chunks` by `get_file_chunks_iterator` and buffered in
/// `offsets` until handed out.
struct ChunkOffsetIter<'a> {
    // Full CSV payload (already advanced past any header/skipped rows).
    bytes: &'a [u8],
    // Buffered chunk ranges not yet yielded.
    offsets: VecDeque<(usize, usize)>,
    // Absolute offset where the next refill resumes scanning.
    last_offset: usize,
    // Number of chunk ranges computed per refill.
    n_chunks: usize,
    // Current probe-window size; grown by the chunker when lines don't fit.
    chunk_size: usize,
    #[allow(unused)]
    rows_per_batch: usize,
    quote_char: Option<u8>,
    eol_char: u8,
}
84
85impl Iterator for ChunkOffsetIter<'_> {
86 type Item = (usize, usize);
87
88 fn next(&mut self) -> Option<Self::Item> {
89 match self.offsets.pop_front() {
90 Some(offsets) => Some(offsets),
91 None => {
92 if self.last_offset == self.bytes.len() {
93 return None;
94 }
95 get_file_chunks_iterator(
96 &mut self.offsets,
97 &mut self.last_offset,
98 self.n_chunks,
99 &mut self.chunk_size,
100 self.bytes,
101 self.quote_char,
102 self.eol_char,
103 );
104 match self.offsets.pop_front() {
105 Some(offsets) => Some(offsets),
106 None => {
108 let out = Some((self.last_offset, self.bytes.len()));
109 self.last_offset = self.bytes.len();
110 out
111 },
112 }
113 },
114 }
115 }
116}
117
impl<'a> CoreReader<'a> {
    /// Consume this reader and return a [`BatchedCsvReader`] that parses the
    /// file incrementally via `next_batches` instead of all at once.
    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
        // Take ownership of the backing bytes; `batched` is only called while
        // `reader_bytes` is still populated, hence the unwrap.
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        // Skip past header/leading rows; remember how much was skipped so
        // chunk offsets can be related back to the original buffer.
        let (bytes, starting_point_offset) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // Aim for many more chunks than threads so work stays balanced, but
        // cap the probe window at 16 MiB.
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Tiny floor in debug builds so tests exercise the window-growing
        // path; a larger floor in release builds.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

        // How many chunk offsets the iterator computes per refill.
        let offset_batch_size = 16;
        // SAFETY(review): the 'static lifetime is a lie — `bytes` borrows from
        // `reader_bytes`, which is moved into the returned `BatchedCsvReader`
        // alongside the iterator, so the data presumably outlives every use of
        // this slice. Confirm no code path drops `reader_bytes` first.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            chunk_size,
            rows_per_batch: self.chunk_size,
            quote_char: self.parse_options.quote_char,
            eol_char: self.parse_options.eol_char,
        };

        let projection = self.get_projection()?;

        // Hold the global string cache while categorical columns are being
        // built so categories unify across chunks.
        #[cfg(feature = "dtype-categorical")]
        let _cat_lock = if self.has_categorical {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };

        #[cfg(not(feature = "dtype-categorical"))]
        let _cat_lock = None;

        Ok(BatchedCsvReader {
            reader_bytes,
            parse_options: self.parse_options,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            null_values: self.null_values,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            // `usize::MAX` acts as "no row limit".
            remaining: self.n_rows.unwrap_or(usize::MAX),
            schema: self.schema,
            rows_read: 0,
            _cat_lock,
        })
    }
}
193
/// CSV reader that yields `DataFrame` chunks on demand via `next_batches`.
pub struct BatchedCsvReader<'a> {
    // Owns the raw file bytes; the chunk iterator's 'static slice borrows
    // from this field.
    reader_bytes: ReaderBytes<'a>,
    // Target number of rows parsed per chunk.
    chunk_size: usize,
    parse_options: CsvParseOptions,
    // Produces `(start, end)` byte ranges lazily.
    file_chunks_iter: ChunkOffsetIter<'a>,
    // Scratch buffer of ranges drained from the iterator for one call.
    file_chunks: Vec<(usize, usize)>,
    // Column indices to materialize.
    projection: Vec<usize>,
    // Bytes skipped before the first data row, if any.
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    null_values: Option<NullValuesCompiled>,
    // Columns to cast after parsing (e.g. to requested dtypes).
    to_cast: Vec<Field>,
    ignore_errors: bool,
    // Rows still allowed to be returned (row-limit budget).
    remaining: usize,
    schema: SchemaRef,
    // Total rows parsed so far; used to offset row-index columns.
    rows_read: IdxSize,
    // Keeps the global string cache alive for categorical parsing.
    #[cfg(feature = "dtype-categorical")]
    _cat_lock: Option<polars_core::StringCacheHolder>,
    #[cfg(not(feature = "dtype-categorical"))]
    _cat_lock: Option<u8>,
}
214
impl BatchedCsvReader<'_> {
    /// Parse up to `n` chunks in parallel and return them as `DataFrame`s.
    ///
    /// Returns `Ok(None)` when `n == 0`, when the row budget is exhausted, or
    /// when the input has no chunks left; otherwise `Ok(Some(dfs))`.
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }

        // Pull at most `n` byte ranges from the lazy chunk iterator.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;

        // Re-derive the data slice the chunk offsets are relative to.
        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

        // Parse all chunks in parallel on the global thread pool; the first
        // error short-circuits the collect.
        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        &self.parse_options,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.chunk_size,
                        self.null_values.as_ref(),
                        usize::MAX,
                        stop_at_nbytes,
                        self.starting_point_offset,
                    )?;

                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

                    if let Some(rc) = &self.row_index {
                        // Per-chunk index starts at `rc.offset`; made globally
                        // consistent by `update_row_counts2` below.
                        unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        // Ranges consumed; reuse the buffer's capacity next call.
        self.file_chunks.clear();

        if self.row_index.is_some() {
            // Shift each chunk's row-index column by the rows read so far.
            update_row_counts2(&mut chunks, self.rows_read)
        }
        // Enforce the row limit across chunks.
        for df in &mut chunks {
            let h = df.height();

            if self.remaining < h {
                *df = df.slice(0, self.remaining)
            };
            self.remaining = self.remaining.saturating_sub(h);

            // Note: counts rows parsed (pre-slice height), not rows returned.
            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}
281
/// Self-contained batched CSV reader: owns the underlying `CsvReader` so the
/// `'static` `BatchedCsvReader` it carries keeps its backing bytes alive.
pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    schema: SchemaRef,
    // Borrow lifetime erased to 'static; actually borrows from `_reader`.
    batched_reader: BatchedCsvReader<'static>,
    // Declared after `batched_reader` so the reader (and its bytes) is
    // dropped last — required for the transmuted lifetime to stay sound.
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}
290
impl OwnedBatchedCsvReader {
    /// Read up to `n` chunks; delegates to [`BatchedCsvReader::next_batches`].
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}
296
/// Bundle a `CsvReader` together with its borrowed batched reader into one
/// owned value, erasing the borrow with a lifetime transmute.
pub fn to_batched_owned(
    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
) -> PolarsResult<OwnedBatchedCsvReader> {
    let batched_reader = reader.batched_borrowed()?;
    let schema = batched_reader.schema.clone();
    // SAFETY(review): extending the borrow to 'static is presumably sound
    // because `reader` — the owner of the bytes `batched_reader` borrows — is
    // moved into the same struct below and, by field order, outlives
    // `batched_reader` on drop. Confirm `OwnedBatchedCsvReader` never exposes
    // or swaps `_reader` independently.
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };

    Ok(OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    })
}