polars_io/parquet/read/
mmap.rs

1use std::io::Cursor;
2
3use arrow::array::Array;
4use arrow::bitmap::Bitmap;
5use arrow::datatypes::Field;
6use polars_buffer::Buffer;
7use polars_error::PolarsResult;
8use polars_parquet::read::{
9    BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
10};
11use polars_utils::mem::prefetch::prefetch_l2;
12
13/// Store columns data in two scenarios:
14/// 1. a local memory mapped file
15/// 2. data fetched from cloud storage on demand, in this case
16///    a. the key in the hashmap is the start in the file
17///    b. the value in the hashmap is the actual data.
18///
19/// For the fetched case we use a two phase approach:
20///   a. identify all the needed columns
21///   b. asynchronously fetch them in parallel, for example using object_store
22///   c. store the data in this data structure
23///   d. when all the data is available deserialize on multiple threads, for example using rayon
24pub enum ColumnStore {
25    Local(Buffer<u8>),
26}
27
28/// For local files memory maps all columns that are part of the parquet field `field_name`.
29/// For cloud files the relevant memory regions should have been prefetched.
30pub(super) fn mmap_columns<'a>(
31    store: &'a ColumnStore,
32    field_columns: &'a [&ColumnChunkMetadata],
33) -> Vec<(&'a ColumnChunkMetadata, Buffer<u8>)> {
34    field_columns
35        .iter()
36        .map(|meta| _mmap_single_column(store, meta))
37        .collect()
38}
39
40fn _mmap_single_column<'a>(
41    store: &'a ColumnStore,
42    meta: &'a ColumnChunkMetadata,
43) -> (&'a ColumnChunkMetadata, Buffer<u8>) {
44    let byte_range = meta.byte_range();
45    let chunk = match store {
46        ColumnStore::Local(mem_slice) => mem_slice
47            .clone()
48            .sliced(byte_range.start as usize..byte_range.end as usize),
49    };
50    (meta, chunk)
51}
52
53// similar to arrow2 serializer, except this accepts a slice instead of a vec.
54// this allows us to memory map
55pub fn to_deserializer(
56    columns: Vec<(&ColumnChunkMetadata, Buffer<u8>)>,
57    field: Field,
58    filter: Option<Filter>,
59) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {
60    let (columns, types): (Vec<_>, Vec<_>) = columns
61        .into_iter()
62        .map(|(column_meta, chunk)| {
63            // Advise fetching the data for the column chunk
64            prefetch_l2(&chunk);
65
66            let pages = PageReader::new(Cursor::new(chunk), column_meta, vec![], usize::MAX);
67            (
68                BasicDecompressor::new(pages, vec![]),
69                &column_meta.descriptor().descriptor.primitive_type,
70            )
71        })
72        .unzip();
73
74    column_iter_to_arrays(columns, types, field, filter)
75}