polars_io/parquet/read/mmap.rs

use arrow::array::Array;
use arrow::datatypes::Field;
#[cfg(feature = "async")]
use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::read::{
column_iter_to_arrays, BasicDecompressor, ColumnChunkMetadata, Filter, PageReader,
};
use polars_utils::mmap::{MemReader, MemSlice};
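
/// Where the raw bytes of the column chunks live:
/// 1. `Local`: a (memory-mapped) slice of a local file.
/// 2. `Fetched`: data fetched from cloud storage ahead of time, keyed by the
///    column chunk's starting byte offset in the file.
///
/// For the fetched case the reader first identifies the needed columns,
/// fetches them asynchronously in parallel, and stores them here, so that
/// deserialization never has to block on I/O.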
pub enum ColumnStore {
Local(MemSlice),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Bytes>),
}
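
/// Resolves the byte ranges of all column chunks that make up one parquet
/// field. For local files this slices straight out of the memory-mapped file;
/// for fetched (cloud) files the ranges must already have been prefetched
/// into the store.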
pub(super) fn mmap_columns<'a>(
store: &'a ColumnStore,
field_columns: &'a [&ColumnChunkMetadata],
) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
field_columns
.iter()
.map(|meta| _mmap_single_column(store, meta))
.collect()
}
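
/// Looks up the byte range of a single column chunk in the store.
/// Panics if a fetched column was not prefetched.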
fn _mmap_single_column<'a>(
store: &'a ColumnStore,
meta: &'a ColumnChunkMetadata,
) -> (&'a ColumnChunkMetadata, MemSlice) {
let byte_range = meta.byte_range();
let chunk = match store {
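        // Slice the column chunk directly out of the memory-mapped file.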
ColumnStore::Local(mem_slice) => {
mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
},
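        // The chunk was prefetched (e.g. from cloud storage); look it up by
        // its starting byte offset.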
#[cfg(all(feature = "async", feature = "parquet"))]
ColumnStore::Fetched(fetched) => {
let entry = fetched.get(&byte_range.start).unwrap_or_else(|| {
panic!(
"mmap_columns: column with start {} must be prefetched in ColumnStore.\n",
byte_range.start
)
});
MemSlice::from_bytes(entry.clone())
},
};
(meta, chunk)
}
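
/// Converts the mapped column chunks of a single field into a deserialized
/// arrow [`Array`]. Accepting slices (rather than owned buffers) is what
/// allows the pages to be decoded straight out of the memory map.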
pub fn to_deserializer(
columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
field: Field,
filter: Option<Filter>,
) -> PolarsResult<Box<dyn Array>> {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
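            // Advise the OS to prefetch this memory region before we start
            // decompressing pages from it.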
chunk.prefetch();
let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
(
BasicDecompressor::new(pages, vec![]),
&column_meta.descriptor().descriptor.primitive_type,
)
})
.unzip();
column_iter_to_arrays(columns, types, field, filter)
}
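
// A minimal sketch of how these pieces compose inside the reader. This is a
// hypothetical caller, not part of this module: `mem_slice` (the whole file)
// and `field_columns` (the `ColumnChunkMetadata` entries for `field`, taken
// from the row-group metadata) are assumptions.
//
//     let store = ColumnStore::Local(mem_slice);
//     let columns = mmap_columns(&store, &field_columns);
//     let array = to_deserializer(columns, field, None)?;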