polars_io/csv/read/read_impl/
batched.rs1use std::collections::VecDeque;
2use std::ops::Deref;
3
4use polars_core::POOL;
5use polars_core::datatypes::Field;
6use polars_core::frame::DataFrame;
7use polars_core::schema::SchemaRef;
8use polars_error::PolarsResult;
9use polars_utils::IdxSize;
10use rayon::iter::{IntoParallelIterator, ParallelIterator};
11
12use super::{CoreReader, CountLines, cast_columns, read_chunk};
13use crate::RowIndex;
14use crate::csv::read::CsvReader;
15use crate::csv::read::options::NullValuesCompiled;
16use crate::mmap::{MmapBytesReader, ReaderBytes};
17use crate::prelude::{CsvParseOptions, update_row_counts2};
18
19#[allow(clippy::too_many_arguments)]
20pub(crate) fn get_file_chunks_iterator(
21 offsets: &mut VecDeque<(usize, usize)>,
22 last_pos: &mut usize,
23 n_chunks: usize,
24 chunk_size: &mut usize,
25 bytes: &[u8],
26 quote_char: Option<u8>,
27 eol_char: u8,
28) {
29 let cl = CountLines::new(quote_char, eol_char);
30
31 for _ in 0..n_chunks {
32 let bytes = &bytes[*last_pos..];
33
34 if bytes.is_empty() {
35 break;
36 }
37
38 let position;
39
40 loop {
41 let b = &bytes[..(*chunk_size).min(bytes.len())];
42 let (count, position_) = cl.count(b);
43
44 let (count, position_) = if b.len() == bytes.len() {
45 (if count != 0 { count } else { 1 }, b.len())
46 } else {
47 (
48 count,
49 if position_ < b.len() {
50 1 + position_
52 } else {
53 position_
54 },
55 )
56 };
57
58 if count == 0 {
59 *chunk_size *= 2;
60 continue;
61 }
62
63 position = position_;
64 break;
65 }
66
67 offsets.push_back((*last_pos, *last_pos + position));
68 *last_pos += position;
69 }
70}
71
/// Iterator state for producing `(start, end)` byte ranges over `bytes` in batches.
struct ChunkOffsetIter<'a> {
    /// Input that the produced ranges index into.
    bytes: &'a [u8],
    /// Offset into `bytes` up to which ranges have already been computed.
    last_offset: usize,
    /// Ranges that were computed but not yet handed out.
    offsets: VecDeque<(usize, usize)>,
    /// Number of ranges requested each time `offsets` is refilled.
    n_chunks: usize,
    /// Size in bytes of the window scanned per range; may be enlarged while scanning.
    chunk_size: usize,
    /// Quote byte forwarded to the line scanner, if any.
    quote_char: Option<u8>,
    /// End-of-line byte forwarded to the line scanner.
    eol_char: u8,
    // Stored at construction but not read anywhere in this file, hence the lint allowance.
    #[allow(unused)]
    rows_per_batch: usize,
}
84
85impl Iterator for ChunkOffsetIter<'_> {
86 type Item = (usize, usize);
87
88 fn next(&mut self) -> Option<Self::Item> {
89 match self.offsets.pop_front() {
90 Some(offsets) => Some(offsets),
91 None => {
92 if self.last_offset == self.bytes.len() {
93 return None;
94 }
95 get_file_chunks_iterator(
96 &mut self.offsets,
97 &mut self.last_offset,
98 self.n_chunks,
99 &mut self.chunk_size,
100 self.bytes,
101 self.quote_char,
102 self.eol_char,
103 );
104 match self.offsets.pop_front() {
105 Some(offsets) => Some(offsets),
106 None => {
108 let out = Some((self.last_offset, self.bytes.len()));
109 self.last_offset = self.bytes.len();
110 out
111 },
112 }
113 },
114 }
115 }
116}
117
118impl<'a> CoreReader<'a> {
119 pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
121 let reader_bytes = self.reader_bytes.take().unwrap();
122 let bytes = reader_bytes.as_ref();
123 let (bytes, starting_point_offset) = self.find_starting_point(
124 bytes,
125 self.parse_options.quote_char,
126 self.parse_options.eol_char,
127 )?;
128
129 let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
130
131 let n_parts_hint = n_threads * 16;
133 let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);
134
135 #[cfg(debug_assertions)]
137 let min_chunk_size = 64;
138 #[cfg(not(debug_assertions))]
139 let min_chunk_size = 1024 * 4;
140
141 let chunk_size = std::cmp::max(chunk_size, min_chunk_size);
142
143 let offset_batch_size = 16;
147 let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
150 let file_chunks = ChunkOffsetIter {
151 bytes,
152 offsets: VecDeque::with_capacity(offset_batch_size),
153 last_offset: 0,
154 n_chunks: offset_batch_size,
155 chunk_size,
156 rows_per_batch: self.chunk_size,
157 quote_char: self.parse_options.quote_char,
158 eol_char: self.parse_options.eol_char,
159 };
160
161 let projection = self.get_projection()?;
162
163 Ok(BatchedCsvReader {
164 reader_bytes,
165 parse_options: self.parse_options,
166 chunk_size: self.chunk_size,
167 file_chunks_iter: file_chunks,
168 file_chunks: vec![],
169 projection,
170 starting_point_offset,
171 row_index: self.row_index,
172 null_values: self.null_values,
173 to_cast: self.to_cast,
174 ignore_errors: self.ignore_errors,
175 remaining: self.n_rows.unwrap_or(usize::MAX),
176 schema: self.schema,
177 rows_read: 0,
178 })
179 }
180}
181
182pub struct BatchedCsvReader<'a> {
183 reader_bytes: ReaderBytes<'a>,
184 parse_options: CsvParseOptions,
185 chunk_size: usize,
186 file_chunks_iter: ChunkOffsetIter<'a>,
187 file_chunks: Vec<(usize, usize)>,
188 projection: Vec<usize>,
189 starting_point_offset: Option<usize>,
190 row_index: Option<RowIndex>,
191 null_values: Option<NullValuesCompiled>,
192 to_cast: Vec<Field>,
193 ignore_errors: bool,
194 remaining: usize,
195 schema: SchemaRef,
196 rows_read: IdxSize,
197}
198
199impl BatchedCsvReader<'_> {
200 pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
201 if n == 0 || self.remaining == 0 {
202 return Ok(None);
203 }
204
205 let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
207 self.file_chunks.extend(file_chunks_iter);
208 if self.file_chunks.is_empty() {
210 return Ok(None);
211 }
212 let chunks = &self.file_chunks;
213
214 let mut bytes = self.reader_bytes.deref();
215 if let Some(pos) = self.starting_point_offset {
216 bytes = &bytes[pos..];
217 }
218
219 let mut chunks = POOL.install(|| {
220 chunks
221 .into_par_iter()
222 .copied()
223 .map(|(bytes_offset_thread, stop_at_nbytes)| {
224 let mut df = read_chunk(
225 bytes,
226 &self.parse_options,
227 self.schema.as_ref(),
228 self.ignore_errors,
229 &self.projection,
230 bytes_offset_thread,
231 self.chunk_size,
232 self.null_values.as_ref(),
233 usize::MAX,
234 stop_at_nbytes,
235 self.starting_point_offset,
236 )?;
237
238 cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
239
240 if let Some(rc) = &self.row_index {
241 unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
242 }
243 Ok(df)
244 })
245 .collect::<PolarsResult<Vec<_>>>()
246 })?;
247 self.file_chunks.clear();
248
249 if self.row_index.is_some() {
250 update_row_counts2(&mut chunks, self.rows_read)
251 }
252 for df in &mut chunks {
253 let h = df.height();
254
255 if self.remaining < h {
256 *df = df.slice(0, self.remaining)
257 };
258 self.remaining = self.remaining.saturating_sub(h);
259
260 self.rows_read += h as IdxSize;
261 }
262 Ok(Some(chunks))
263 }
264}
265
266pub struct OwnedBatchedCsvReader {
267 #[allow(dead_code)]
268 schema: SchemaRef,
270 batched_reader: BatchedCsvReader<'static>,
271 _reader: CsvReader<Box<dyn MmapBytesReader>>,
273}
274
275impl OwnedBatchedCsvReader {
276 pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
277 self.batched_reader.next_batches(n)
278 }
279}
280
281pub fn to_batched_owned(
282 mut reader: CsvReader<Box<dyn MmapBytesReader>>,
283) -> PolarsResult<OwnedBatchedCsvReader> {
284 let batched_reader = reader.batched_borrowed()?;
285 let schema = batched_reader.schema.clone();
286 let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };
290
291 Ok(OwnedBatchedCsvReader {
292 schema,
293 batched_reader,
294 _reader: reader,
295 })
296}