polars_io/parquet/read/mmap.rs

use arrow::array::Array;
use arrow::bitmap::Bitmap;
use arrow::datatypes::Field;
#[cfg(feature = "async")]
use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::read::{
    BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
};
use polars_utils::mmap::{MemReader, MemSlice};

/// Stores column data in two scenarios:
/// 1. a local memory-mapped file
/// 2. data fetched from cloud storage on demand, in which case
///    a. the key in the hashmap is the start of the column chunk in the file
///    b. the value in the hashmap is the actual data.
///
/// For the fetched case we use a two-phase approach (fetch, then deserialize):
///   a. identify all the needed columns
///   b. asynchronously fetch them in parallel, for example using object_store
///   c. store the data in this data structure
///   d. when all the data is available, deserialize on multiple threads, for example using rayon
pub enum ColumnStore {
    Local(MemSlice),
    #[cfg(feature = "async")]
    Fetched(PlHashMap<u64, Bytes>),
}
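
// A minimal sketch (not part of the original module) of step (c) above:
// assembling a `ColumnStore::Fetched` once the needed byte ranges have been
// downloaded. The `(offset, bytes)` pairs are assumed to come from a parallel
// fetch, e.g. with object_store; the fetch itself is out of scope here.
#[cfg(feature = "async")]
#[allow(dead_code)]
fn store_from_fetched(fetched: impl IntoIterator<Item = (u64, Bytes)>) -> ColumnStore {
    // Key each downloaded chunk by its starting offset in the file, since
    // `_mmap_single_column` below looks columns up by `byte_range().start`.
    ColumnStore::Fetched(fetched.into_iter().collect())
}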

/// For local files, memory-maps all column chunks that belong to the given parquet field.
/// For cloud files, the relevant memory regions should have been prefetched.
pub(super) fn mmap_columns<'a>(
    store: &'a ColumnStore,
    field_columns: &'a [&ColumnChunkMetadata],
) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
    field_columns
        .iter()
        .map(|meta| _mmap_single_column(store, meta))
        .collect()
}

fn _mmap_single_column<'a>(
    store: &'a ColumnStore,
    meta: &'a ColumnChunkMetadata,
) -> (&'a ColumnChunkMetadata, MemSlice) {
    let byte_range = meta.byte_range();
    let chunk = match store {
        ColumnStore::Local(mem_slice) => {
            mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
        },
        #[cfg(feature = "async")]
        ColumnStore::Fetched(fetched) => {
            let entry = fetched.get(&byte_range.start).unwrap_or_else(|| {
                panic!(
                    "mmap_columns: column with start {} must be prefetched in ColumnStore",
                    byte_range.start
                )
            });
            MemSlice::from_bytes(entry.clone())
        },
    };
    (meta, chunk)
}
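
// A sketch of step (a) from the `ColumnStore` docs: collecting the byte
// ranges that must be downloaded so the lookup by `byte_range().start` above
// cannot panic. This helper is illustrative and assumes `byte_range()`
// returns a `Range<u64>`, as the offset arithmetic above suggests.
#[cfg(feature = "async")]
#[allow(dead_code)]
fn ranges_to_prefetch(field_columns: &[&ColumnChunkMetadata]) -> Vec<std::ops::Range<u64>> {
    field_columns.iter().map(|meta| meta.byte_range()).collect()
}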

// Similar to the arrow2 deserializer, except this accepts a slice instead of a vec,
// which allows us to memory-map the data.
pub fn to_deserializer(
    columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
    field: Field,
    filter: Option<Filter>,
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
    let (columns, types): (Vec<_>, Vec<_>) = columns
        .into_iter()
        .map(|(column_meta, chunk)| {
            // Advise the OS to fetch the data for this column chunk.
            chunk.prefetch();

            let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
            (
                BasicDecompressor::new(pages, vec![]),
                &column_meta.descriptor().descriptor.primitive_type,
            )
        })
        .unzip();

    column_iter_to_arrays(columns, types, field, filter)
}
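
// An end-to-end sketch (not in the original file) of the local, memory-mapped
// path: wrap the mapped file bytes in a `ColumnStore`, slice out the column
// chunks of one field, and decode them into a single arrow array. The caller
// is assumed to have already read the parquet metadata; `field_columns` and
// `arrow_field` are placeholders for values obtained from it.
#[allow(dead_code)]
fn deserialize_local_field(
    file_bytes: MemSlice,
    field_columns: &[&ColumnChunkMetadata],
    arrow_field: Field,
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
    let store = ColumnStore::Local(file_bytes);
    // Zero-copy slices of the mapped file, one per column chunk...
    let columns = mmap_columns(&store, field_columns);
    // ...decoded into a single array with no row filter applied.
    to_deserializer(columns, arrow_field, None)
}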