polars_io/parquet/read/
mmap.rs

1use arrow::array::Array;
2use arrow::bitmap::Bitmap;
3use arrow::datatypes::Field;
4use polars_error::PolarsResult;
5use polars_parquet::read::{
6    BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
7};
8use polars_utils::mmap::{MemReader, MemSlice};
9
10/// Store columns data in two scenarios:
11/// 1. a local memory mapped file
12/// 2. data fetched from cloud storage on demand, in this case
13///    a. the key in the hashmap is the start in the file
14///    b. the value in the hashmap is the actual data.
15///
16/// For the fetched case we use a two phase approach:
17///   a. identify all the needed columns
18///   b. asynchronously fetch them in parallel, for example using object_store
19///   c. store the data in this data structure
20///   d. when all the data is available deserialize on multiple threads, for example using rayon
21pub enum ColumnStore {
22    Local(MemSlice),
23}
24
25/// For local files memory maps all columns that are part of the parquet field `field_name`.
26/// For cloud files the relevant memory regions should have been prefetched.
27pub(super) fn mmap_columns<'a>(
28    store: &'a ColumnStore,
29    field_columns: &'a [&ColumnChunkMetadata],
30) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
31    field_columns
32        .iter()
33        .map(|meta| _mmap_single_column(store, meta))
34        .collect()
35}
36
37fn _mmap_single_column<'a>(
38    store: &'a ColumnStore,
39    meta: &'a ColumnChunkMetadata,
40) -> (&'a ColumnChunkMetadata, MemSlice) {
41    let byte_range = meta.byte_range();
42    let chunk = match store {
43        ColumnStore::Local(mem_slice) => {
44            mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
45        },
46    };
47    (meta, chunk)
48}
49
50// similar to arrow2 serializer, except this accepts a slice instead of a vec.
51// this allows us to memory map
52pub fn to_deserializer(
53    columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
54    field: Field,
55    filter: Option<Filter>,
56) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {
57    let (columns, types): (Vec<_>, Vec<_>) = columns
58        .into_iter()
59        .map(|(column_meta, chunk)| {
60            // Advise fetching the data for the column chunk
61            chunk.prefetch();
62
63            let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
64            (
65                BasicDecompressor::new(pages, vec![]),
66                &column_meta.descriptor().descriptor.primitive_type,
67            )
68        })
69        .unzip();
70
71    column_iter_to_arrays(columns, types, field, filter)
72}