polars_io/csv/read/read_impl/batched.rs

use std::collections::VecDeque;
use std::ops::Deref;

use polars_core::POOL;
use polars_core::datatypes::Field;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{CoreReader, CountLines, cast_columns, read_chunk};
use crate::RowIndex;
use crate::csv::read::CsvReader;
use crate::csv::read::options::NullValuesCompiled;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::{CsvParseOptions, update_row_counts2};

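/// Pushes up to `n_chunks` half-open `(start, stop)` byte ranges onto
/// `offsets`, each ending on a line boundary (or at EOF for the final chunk).
/// `chunk_size` is doubled whenever a probe window contains no complete line,
/// so it adapts to long rows.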
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
) {
    let cl = CountLines::new(quote_char, eol_char);

    for _ in 0..n_chunks {
        let bytes = &bytes[*last_pos..];

        if bytes.is_empty() {
            break;
        }

        let position;

        loop {
            // Probe a window of at most `chunk_size` bytes.
            let b = &bytes[..(*chunk_size).min(bytes.len())];
            let (count, position_) = cl.count(b);

            let (count, position_) = if b.len() == bytes.len() {
                // The window reaches EOF: take everything that is left, even
                // if no line terminator was found.
                (if count != 0 { count } else { 1 }, b.len())
            } else {
                (
                    count,
                    if position_ < b.len() {
                        // 1+ for the '\n'
                        1 + position_
                    } else {
                        position_
                    },
                )
            };

            if count == 0 {
                // No complete line fit in the window; double it and retry.
                *chunk_size *= 2;
                continue;
            }

            position = position_;
            break;
        }

        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}
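
// Illustrative driver for `get_file_chunks_iterator` (hypothetical values; the
// exact ranges produced depend on `CountLines`' counting semantics):
//
//     let bytes = b"a,b\n1,2\n3,4\n";
//     let mut offsets = VecDeque::new();
//     let mut last_pos = 0;
//     let mut chunk_size = 6;
//     get_file_chunks_iterator(
//         &mut offsets, &mut last_pos, 4, &mut chunk_size, bytes, None, b'\n',
//     );
//     // `offsets` now holds newline-aligned (start, stop) ranges over `bytes`.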

/// Lazily yields `(start, stop)` byte ranges over `bytes`, refilling its
/// internal queue in batches of `n_chunks` via [`get_file_chunks_iterator`].
struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    chunk_size: usize,
    // Not a guarantee, just a row count we aim for per batch.
    #[allow(unused)]
    rows_per_batch: usize,
    quote_char: Option<u8>,
    eol_char: u8,
}

impl Iterator for ChunkOffsetIter<'_> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
                if self.last_offset == self.bytes.len() {
                    return None;
                }
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    &mut self.chunk_size,
                    self.bytes,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    // We depleted the iterator. Ensure we deplete the slice as well.
                    None => {
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    },
                }
            },
        }
    }
}

impl<'a> CoreReader<'a> {
    /// Create a batched CSV reader that uses mmap to load data.
    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // Copied from [`Self::parse_csv`].
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

        // This is arbitrarily chosen; we don't want it to depend on the thread
        // pool size, otherwise the chunks are not deterministic.
        let offset_batch_size = 16;
        // Extend the lifetime. It is bound to `reader_bytes` and we keep track
        // of that lifetime, so this is sound.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            chunk_size,
            rows_per_batch: self.chunk_size,
            quote_char: self.parse_options.quote_char,
            eol_char: self.parse_options.eol_char,
        };

        let projection = self.get_projection()?;

        // RAII structure that ensures we maintain a global string cache.
        #[cfg(feature = "dtype-categorical")]
        let _cat_lock = if self.has_categorical {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };

        #[cfg(not(feature = "dtype-categorical"))]
        let _cat_lock = None;

        Ok(BatchedCsvReader {
            reader_bytes,
            parse_options: self.parse_options,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            null_values: self.null_values,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            remaining: self.n_rows.unwrap_or(usize::MAX),
            schema: self.schema,
            rows_read: 0,
            _cat_lock,
        })
    }
}

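/// Reads a CSV file in batches of chunks, materializing a `Vec<DataFrame>` per
/// call to [`BatchedCsvReader::next_batches`].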
pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    parse_options: CsvParseOptions,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    projection: Vec<usize>,
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    null_values: Option<NullValuesCompiled>,
    to_cast: Vec<Field>,
    ignore_errors: bool,
    remaining: usize,
    schema: SchemaRef,
    rows_read: IdxSize,
    #[cfg(feature = "dtype-categorical")]
    _cat_lock: Option<polars_core::StringCacheHolder>,
    #[cfg(not(feature = "dtype-categorical"))]
    _cat_lock: Option<u8>,
}

impl BatchedCsvReader<'_> {
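    /// Parses up to `n` chunks in parallel and returns them as `DataFrame`s.
    /// Returns `Ok(None)` once the input bytes are exhausted or the requested
    /// row limit has been reached.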
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }

        // Get the next `n` offset positions.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        // If we depleted the offsets iterator, we are done as well.
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;

        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        &self.parse_options,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.chunk_size,
                        self.null_values.as_ref(),
                        usize::MAX,
                        stop_at_nbytes,
                        self.starting_point_offset,
                    )?;

                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

                    if let Some(rc) = &self.row_index {
                        unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        self.file_chunks.clear();

        if self.row_index.is_some() {
            update_row_counts2(&mut chunks, self.rows_read)
        }
        for df in &mut chunks {
            let h = df.height();

            if self.remaining < h {
                *df = df.slice(0, self.remaining)
            };
            self.remaining = self.remaining.saturating_sub(h);

            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}

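/// A batched CSV reader that owns its underlying reader, so callers do not
/// need to track the lifetime of the source bytes.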
pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    // This exists because we need to keep ownership.
    schema: SchemaRef,
    batched_reader: BatchedCsvReader<'static>,
    // Keep ownership.
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}

impl OwnedBatchedCsvReader {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}

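/// Converts a `CsvReader` into an [`OwnedBatchedCsvReader`], transmuting the
/// borrowed batched reader to `'static`. The source reader is stored in the
/// returned struct so the borrowed bytes stay alive, keeping the transmute sound.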
pub fn to_batched_owned(
    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
) -> PolarsResult<OwnedBatchedCsvReader> {
    let batched_reader = reader.batched_borrowed()?;
    let schema = batched_reader.schema.clone();
    // If you put a `drop(reader)` here, Rust will complain that `reader` is
    // borrowed, so we presumably have to keep ownership of it to maintain the
    // safety of the 'static transmute.
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };

    Ok(OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    })
}
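
// Minimal usage sketch (hypothetical setup; how the `CsvReader` over a boxed
// `MmapBytesReader` is constructed varies between polars versions):
//
//     let mut batched = to_batched_owned(reader)?;
//     while let Some(dfs) = batched.next_batches(8)? {
//         for df in dfs {
//             // ... process each DataFrame chunk ...
//         }
//     }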