polars_io/csv/read/read_impl/
batched.rs

1use std::collections::VecDeque;
2use std::ops::Deref;
3
4use polars_core::POOL;
5use polars_core::datatypes::Field;
6use polars_core::frame::DataFrame;
7use polars_core::schema::SchemaRef;
8use polars_error::PolarsResult;
9use polars_utils::IdxSize;
10use rayon::iter::{IntoParallelIterator, ParallelIterator};
11
12use super::{CoreReader, CountLines, cast_columns, read_chunk};
13use crate::RowIndex;
14use crate::csv::read::CsvReader;
15use crate::csv::read::options::NullValuesCompiled;
16use crate::mmap::{MmapBytesReader, ReaderBytes};
17use crate::prelude::{CsvParseOptions, update_row_counts2};
18
19#[allow(clippy::too_many_arguments)]
20pub(crate) fn get_file_chunks_iterator(
21    offsets: &mut VecDeque<(usize, usize)>,
22    last_pos: &mut usize,
23    n_chunks: usize,
24    chunk_size: &mut usize,
25    bytes: &[u8],
26    quote_char: Option<u8>,
27    eol_char: u8,
28) {
29    let cl = CountLines::new(quote_char, eol_char);
30
31    for _ in 0..n_chunks {
32        let bytes = &bytes[*last_pos..];
33
34        if bytes.is_empty() {
35            break;
36        }
37
38        let position;
39
40        loop {
41            let b = &bytes[..(*chunk_size).min(bytes.len())];
42            let (count, position_) = cl.count(b);
43
44            let (count, position_) = if b.len() == bytes.len() {
45                (if count != 0 { count } else { 1 }, b.len())
46            } else {
47                (
48                    count,
49                    if position_ < b.len() {
50                        // 1+ for the '\n'
51                        1 + position_
52                    } else {
53                        position_
54                    },
55                )
56            };
57
58            if count == 0 {
59                *chunk_size *= 2;
60                continue;
61            }
62
63            position = position_;
64            break;
65        }
66
67        offsets.push_back((*last_pos, *last_pos + position));
68        *last_pos += position;
69    }
70}
71
72struct ChunkOffsetIter<'a> {
73    bytes: &'a [u8],
74    offsets: VecDeque<(usize, usize)>,
75    last_offset: usize,
76    n_chunks: usize,
77    chunk_size: usize,
78    // not a promise, but something we want
79    #[allow(unused)]
80    rows_per_batch: usize,
81    quote_char: Option<u8>,
82    eol_char: u8,
83}
84
85impl Iterator for ChunkOffsetIter<'_> {
86    type Item = (usize, usize);
87
88    fn next(&mut self) -> Option<Self::Item> {
89        match self.offsets.pop_front() {
90            Some(offsets) => Some(offsets),
91            None => {
92                if self.last_offset == self.bytes.len() {
93                    return None;
94                }
95                get_file_chunks_iterator(
96                    &mut self.offsets,
97                    &mut self.last_offset,
98                    self.n_chunks,
99                    &mut self.chunk_size,
100                    self.bytes,
101                    self.quote_char,
102                    self.eol_char,
103                );
104                match self.offsets.pop_front() {
105                    Some(offsets) => Some(offsets),
106                    // We depleted the iterator. Ensure we deplete the slice as well
107                    None => {
108                        let out = Some((self.last_offset, self.bytes.len()));
109                        self.last_offset = self.bytes.len();
110                        out
111                    },
112                }
113            },
114        }
115    }
116}
117
118impl<'a> CoreReader<'a> {
119    /// Create a batched csv reader that uses mmap to load data.
120    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
121        let reader_bytes = self.reader_bytes.take().unwrap();
122        let bytes = reader_bytes.as_ref();
123        let (bytes, starting_point_offset) = self.find_starting_point(
124            bytes,
125            self.parse_options.quote_char,
126            self.parse_options.eol_char,
127        )?;
128
129        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
130
131        // Copied from [`Self::parse_csv`]
132        let n_parts_hint = n_threads * 16;
133        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);
134
135        // Use a small min chunk size to catch failures in tests.
136        #[cfg(debug_assertions)]
137        let min_chunk_size = 64;
138        #[cfg(not(debug_assertions))]
139        let min_chunk_size = 1024 * 4;
140
141        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);
142
143        // this is arbitrarily chosen.
144        // we don't want this to depend on the thread pool size
145        // otherwise the chunks are not deterministic
146        let offset_batch_size = 16;
147        // extend lifetime. It is bound to `readerbytes` and we keep track of that
148        // lifetime so this is sound.
149        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
150        let file_chunks = ChunkOffsetIter {
151            bytes,
152            offsets: VecDeque::with_capacity(offset_batch_size),
153            last_offset: 0,
154            n_chunks: offset_batch_size,
155            chunk_size,
156            rows_per_batch: self.chunk_size,
157            quote_char: self.parse_options.quote_char,
158            eol_char: self.parse_options.eol_char,
159        };
160
161        let projection = self.get_projection()?;
162
163        Ok(BatchedCsvReader {
164            reader_bytes,
165            parse_options: self.parse_options,
166            chunk_size: self.chunk_size,
167            file_chunks_iter: file_chunks,
168            file_chunks: vec![],
169            projection,
170            starting_point_offset,
171            row_index: self.row_index,
172            null_values: self.null_values,
173            to_cast: self.to_cast,
174            ignore_errors: self.ignore_errors,
175            remaining: self.n_rows.unwrap_or(usize::MAX),
176            schema: self.schema,
177            rows_read: 0,
178        })
179    }
180}
181
182pub struct BatchedCsvReader<'a> {
183    reader_bytes: ReaderBytes<'a>,
184    parse_options: CsvParseOptions,
185    chunk_size: usize,
186    file_chunks_iter: ChunkOffsetIter<'a>,
187    file_chunks: Vec<(usize, usize)>,
188    projection: Vec<usize>,
189    starting_point_offset: Option<usize>,
190    row_index: Option<RowIndex>,
191    null_values: Option<NullValuesCompiled>,
192    to_cast: Vec<Field>,
193    ignore_errors: bool,
194    remaining: usize,
195    schema: SchemaRef,
196    rows_read: IdxSize,
197}
198
199impl BatchedCsvReader<'_> {
200    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
201        if n == 0 || self.remaining == 0 {
202            return Ok(None);
203        }
204
205        // get next `n` offset positions.
206        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
207        self.file_chunks.extend(file_chunks_iter);
208        // depleted the offsets iterator, we are done as well.
209        if self.file_chunks.is_empty() {
210            return Ok(None);
211        }
212        let chunks = &self.file_chunks;
213
214        let mut bytes = self.reader_bytes.deref();
215        if let Some(pos) = self.starting_point_offset {
216            bytes = &bytes[pos..];
217        }
218
219        let mut chunks = POOL.install(|| {
220            chunks
221                .into_par_iter()
222                .copied()
223                .map(|(bytes_offset_thread, stop_at_nbytes)| {
224                    let mut df = read_chunk(
225                        bytes,
226                        &self.parse_options,
227                        self.schema.as_ref(),
228                        self.ignore_errors,
229                        &self.projection,
230                        bytes_offset_thread,
231                        self.chunk_size,
232                        self.null_values.as_ref(),
233                        usize::MAX,
234                        stop_at_nbytes,
235                        self.starting_point_offset,
236                    )?;
237
238                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
239
240                    if let Some(rc) = &self.row_index {
241                        unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
242                    }
243                    Ok(df)
244                })
245                .collect::<PolarsResult<Vec<_>>>()
246        })?;
247        self.file_chunks.clear();
248
249        if self.row_index.is_some() {
250            update_row_counts2(&mut chunks, self.rows_read)
251        }
252        for df in &mut chunks {
253            let h = df.height();
254
255            if self.remaining < h {
256                *df = df.slice(0, self.remaining)
257            };
258            self.remaining = self.remaining.saturating_sub(h);
259
260            self.rows_read += h as IdxSize;
261        }
262        Ok(Some(chunks))
263    }
264}
265
266pub struct OwnedBatchedCsvReader {
267    #[allow(dead_code)]
268    // this exist because we need to keep ownership
269    schema: SchemaRef,
270    batched_reader: BatchedCsvReader<'static>,
271    // keep ownership
272    _reader: CsvReader<Box<dyn MmapBytesReader>>,
273}
274
275impl OwnedBatchedCsvReader {
276    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
277        self.batched_reader.next_batches(n)
278    }
279}
280
281pub fn to_batched_owned(
282    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
283) -> PolarsResult<OwnedBatchedCsvReader> {
284    let batched_reader = reader.batched_borrowed()?;
285    let schema = batched_reader.schema.clone();
286    // If you put a drop(reader) here, rust will complain that reader is borrowed,
287    // so we presumably have to keep ownership of it to maintain the safety of the
288    // 'static transmute.
289    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };
290
291    Ok(OwnedBatchedCsvReader {
292        schema,
293        batched_reader,
294        _reader: reader,
295    })
296}