1use arrow::array::Array;
2use arrow::bitmap::Bitmap;
3use arrow::datatypes::Field;
4#[cfg(feature = "async")]
5use bytes::Bytes;
6#[cfg(feature = "async")]
7use polars_core::datatypes::PlHashMap;
8use polars_error::PolarsResult;
9use polars_parquet::read::{
10 BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
11};
12use polars_utils::mmap::{MemReader, MemSlice};
/// Store columns data in two scenarios:
/// 1. a local memory mapped file
/// 2. data fetched from cloud storage on demand, in this case
///    a. the key in the hashmap is the start in the file
///    b. the value in the hashmap is the actual data.
///
/// For the fetched case we use a two phase approach:
/// a. identify all the needed columns
/// b. asynchronously fetch them in parallel, for example using object_store
/// c. store the data in this data structure
/// d. when all the data is available deserialize on multiple threads, for example using rayon
pub enum ColumnStore {
    /// Column data backed by a (memory-mapped) local file.
    Local(MemSlice),
    /// Column data fetched from cloud storage, keyed by the chunk's
    /// starting byte offset within the file.
    #[cfg(feature = "async")]
    Fetched(PlHashMap<u64, Bytes>),
}
3031/// For local files memory maps all columns that are part of the parquet field `field_name`.
32/// For cloud files the relevant memory regions should have been prefetched.
33pub(super) fn mmap_columns<'a>(
34 store: &'a ColumnStore,
35 field_columns: &'a [&ColumnChunkMetadata],
36) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
37 field_columns
38 .iter()
39 .map(|meta| _mmap_single_column(store, meta))
40 .collect()
41}
4243fn _mmap_single_column<'a>(
44 store: &'a ColumnStore,
45 meta: &'a ColumnChunkMetadata,
46) -> (&'a ColumnChunkMetadata, MemSlice) {
47let byte_range = meta.byte_range();
48let chunk = match store {
49 ColumnStore::Local(mem_slice) => {
50 mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
51 },
52#[cfg(all(feature = "async", feature = "parquet"))]
53ColumnStore::Fetched(fetched) => {
54let entry = fetched.get(&byte_range.start).unwrap_or_else(|| {
55panic!(
56"mmap_columns: column with start {} must be prefetched in ColumnStore.\n",
57 byte_range.start
58 )
59 });
60 MemSlice::from_bytes(entry.clone())
61 },
62 };
63 (meta, chunk)
64}
6566// similar to arrow2 serializer, except this accepts a slice instead of a vec.
67// this allows us to memory map
68pub fn to_deserializer(
69 columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
70 field: Field,
71 filter: Option<Filter>,
72) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
73let (columns, types): (Vec<_>, Vec<_>) = columns
74 .into_iter()
75 .map(|(column_meta, chunk)| {
76// Advise fetching the data for the column chunk
77chunk.prefetch();
7879let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
80 (
81 BasicDecompressor::new(pages, vec![]),
82&column_meta.descriptor().descriptor.primitive_type,
83 )
84 })
85 .unzip();
8687 column_iter_to_arrays(columns, types, field, filter)
88}