use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::mmap::MMapSemaphore;
use super::ipc_file::IpcReader;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::shared::{finish_reader, ArrowReader};
use crate::utils::{apply_projection, columns_to_projection};
impl<R: MmapBytesReader> IpcReader<R> {
    /// Reads the IPC file into a `DataFrame` by memory-mapping the backing file.
    ///
    /// When `self.columns` is set, the names are resolved against the file schema and the
    /// resulting indices are stored in `self.projection` before any batch is read.
    ///
    /// # Errors
    /// Fails with a `ComputeError` when the reader is not backed by a file, and propagates
    /// errors from mapping the file, reading its metadata, resolving the projection, and
    /// reading the batches.
    pub(super) fn finish_memmapped(
        &mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
    ) -> PolarsResult<DataFrame> {
        // Memory-mapping needs a real file handle; bail out for any other reader.
        let Some(file) = self.reader.to_file() else {
            polars_bail!(ComputeError: "cannot memory-map, you must provide a file")
        };

        let semaphore = MMapSemaphore::new_from_file(file)?;
        let metadata = read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;

        // Column names take precedence: they overwrite any previously set projection.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }

        // Schema of the frame that is handed back: projected when a projection is active.
        let schema = match &self.projection {
            Some(projection) => Arc::new(apply_projection(&metadata.schema, projection)),
            None => metadata.schema.clone(),
        };

        let batches = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;
        let row_index = self.row_index.clone();
        finish_reader(batches, false, self.n_rows, predicate, &schema, row_index)
    }
}
/// Reader over the record batches of a memory-mapped Arrow IPC file.
///
/// Batches are produced one block at a time, in the order the blocks appear in the file
/// metadata, optionally narrowed to a set of projected columns.
struct MMapChunkIter<'a> {
/// Dictionaries loaded from the mapped file when the reader is constructed.
dictionaries: Dictionaries,
/// Parsed file metadata (schema and block list).
metadata: FileMetadata,
/// Shared handle to the memory-mapped file.
mmap: Arc<MMapSemaphore>,
/// Index of the next block to read.
idx: usize,
/// Total number of blocks; reading stops once `idx` reaches this value.
end: usize,
/// Optional column indices to keep in every produced batch.
projection: &'a Option<Vec<usize>>,
}
impl<'a> MMapChunkIter<'a> {
    /// Builds a reader positioned at the first block of the mapped file.
    ///
    /// The dictionaries are loaded eagerly here so that every later batch can refer to them.
    ///
    /// # Errors
    /// Propagates any error raised while loading the dictionaries from the mapping.
    fn new(
        mmap: Arc<MMapSemaphore>,
        metadata: FileMetadata,
        projection: &'a Option<Vec<usize>>,
    ) -> PolarsResult<Self> {
        // SAFETY: NOTE(review): the `_unchecked` variant presumably trusts that the mapped
        // bytes form a well-formed IPC file matching `metadata` — confirm against the
        // function's safety contract.
        let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, Arc::clone(&mmap)) }?;
        let end = metadata.blocks.len();

        Ok(Self {
            idx: 0,
            end,
            dictionaries,
            metadata,
            mmap,
            projection,
        })
    }
}
impl ArrowReader for MMapChunkIter<'_> {
    /// Returns the next record batch of the mapped file, or `Ok(None)` once every block
    /// has been read.
    ///
    /// When a projection is set, the batch is narrowed to the projected columns, in the
    /// order given by the projection indices.
    ///
    /// # Errors
    /// Propagates errors from mapping the block and from projecting the batch schema
    /// (for example a projection index that is out of bounds), instead of panicking.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        if self.idx >= self.end {
            return Ok(None);
        }

        // SAFETY: NOTE(review): the `_unchecked` variant presumably trusts that the mapped
        // bytes form a well-formed IPC file matching `metadata` — confirm against the
        // function's safety contract.
        let chunk = unsafe {
            mmap_unchecked(
                &self.metadata,
                &self.dictionaries,
                self.mmap.clone(),
                self.idx,
            )
        }?;
        self.idx += 1;

        let Some(proj) = self.projection else {
            return Ok(Some(chunk));
        };

        let length = chunk.len();
        let (schema, cols) = chunk.into_schema_and_arrays();
        // Surface an invalid projection as an error rather than unwrapping; the indices
        // are validated here before they are used to index `cols` below.
        let schema = schema.try_project_indices(proj)?;
        let arrays = proj.iter().map(|&i| cols[i].clone()).collect();
        Ok(Some(RecordBatch::new(length, Arc::new(schema), arrays)))
    }
}