// polars_io/ipc/mmap.rs
1use arrow::io::ipc::read;
2use arrow::io::ipc::read::{Dictionaries, FileMetadata};
3use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
4use arrow::record_batch::RecordBatch;
5use polars_core::prelude::*;
6use polars_utils::mmap::MMapSemaphore;
7
8use super::ipc_file::IpcReader;
9use crate::mmap::MmapBytesReader;
10use crate::predicates::PhysicalIoExpr;
11use crate::shared::{ArrowReader, finish_reader};
12use crate::utils::{apply_projection, columns_to_projection};
13
14impl<R: MmapBytesReader> IpcReader<R> {
15 pub(super) fn finish_memmapped(
16 &mut self,
17 predicate: Option<Arc<dyn PhysicalIoExpr>>,
18 ) -> PolarsResult<DataFrame> {
19 match self.reader.to_file() {
20 Some(file) => {
21 let semaphore = MMapSemaphore::new_from_file(file)?;
22 let metadata =
23 read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;
24
25 if let Some(columns) = &self.columns {
26 let schema = &metadata.schema;
27 let prj = columns_to_projection(columns, schema)?;
28 self.projection = Some(prj);
29 }
30
31 let schema = if let Some(projection) = &self.projection {
32 Arc::new(apply_projection(&metadata.schema, projection))
33 } else {
34 metadata.schema.clone()
35 };
36
37 let reader = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;
38
39 finish_reader(
40 reader,
41 false,
43 self.n_rows,
44 predicate,
45 &schema,
46 self.row_index.clone(),
47 )
48 },
49 None => polars_bail!(ComputeError: "cannot memory-map, you must provide a file"),
50 }
51 }
52}
53
54struct MMapChunkIter<'a> {
55 dictionaries: Dictionaries,
56 metadata: FileMetadata,
57 mmap: Arc<MMapSemaphore>,
58 idx: usize,
59 end: usize,
60 projection: &'a Option<Vec<usize>>,
61}
62
63impl<'a> MMapChunkIter<'a> {
64 fn new(
65 mmap: Arc<MMapSemaphore>,
66 metadata: FileMetadata,
67 projection: &'a Option<Vec<usize>>,
68 ) -> PolarsResult<Self> {
69 let end = metadata.blocks.len();
70 let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
72
73 Ok(Self {
74 dictionaries,
75 metadata,
76 mmap,
77 idx: 0,
78 end,
79 projection,
80 })
81 }
82}
83
84impl ArrowReader for MMapChunkIter<'_> {
85 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
86 if self.idx < self.end {
87 let chunk = unsafe {
88 mmap_unchecked(
89 &self.metadata,
90 &self.dictionaries,
91 self.mmap.clone(),
92 self.idx,
93 )
94 }?;
95 self.idx += 1;
96 let chunk = match &self.projection {
97 None => chunk,
98 Some(proj) => {
99 let length = chunk.len();
100 let (schema, cols) = chunk.into_schema_and_arrays();
101 let schema = schema.try_project_indices(proj).unwrap();
102 let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
103 RecordBatch::new(length, Arc::new(schema), arrays)
104 },
105 };
106 Ok(Some(chunk))
107 } else {
108 Ok(None)
109 }
110 }
111}