polars_io/ipc/
mmap.rs

1use arrow::io::ipc::read;
2use arrow::io::ipc::read::{Dictionaries, FileMetadata};
3use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
4use arrow::record_batch::RecordBatch;
5use polars_core::prelude::*;
6use polars_utils::mmap::MMapSemaphore;
7
8use super::ipc_file::IpcReader;
9use crate::mmap::MmapBytesReader;
10use crate::predicates::PhysicalIoExpr;
11use crate::shared::{ArrowReader, finish_reader};
12use crate::utils::{apply_projection, columns_to_projection};
13
14impl<R: MmapBytesReader> IpcReader<R> {
15    pub(super) fn finish_memmapped(
16        &mut self,
17        predicate: Option<Arc<dyn PhysicalIoExpr>>,
18    ) -> PolarsResult<DataFrame> {
19        match self.reader.to_file() {
20            Some(file) => {
21                let semaphore = MMapSemaphore::new_from_file(file)?;
22                let metadata =
23                    read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;
24
25                if let Some(columns) = &self.columns {
26                    let schema = &metadata.schema;
27                    let prj = columns_to_projection(columns, schema)?;
28                    self.projection = Some(prj);
29                }
30
31                let schema = if let Some(projection) = &self.projection {
32                    Arc::new(apply_projection(&metadata.schema, projection))
33                } else {
34                    metadata.schema.clone()
35                };
36
37                let reader = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;
38
39                finish_reader(
40                    reader,
41                    // don't rechunk, that would trigger a read.
42                    false,
43                    self.n_rows,
44                    predicate,
45                    &schema,
46                    self.row_index.clone(),
47                )
48            },
49            None => polars_bail!(ComputeError: "cannot memory-map, you must provide a file"),
50        }
51    }
52}
53
/// Iterator state for reading the record batches of a memory-mapped IPC file one at a time.
struct MMapChunkIter<'a> {
    // Dictionaries memory-mapped once when the iterator is built; passed to every batch read.
    dictionaries: Dictionaries,
    // File metadata; `blocks.len()` is used as the number of record batches to yield.
    metadata: FileMetadata,
    // Shared handle to the mapped file; a clone is handed to each mmap read.
    mmap: Arc<MMapSemaphore>,
    // Index of the next record batch to read.
    idx: usize,
    // One past the last record batch index (exclusive upper bound for `idx`).
    end: usize,
    // Optional column indices to keep (in this order) from every batch; borrowed from the reader.
    projection: &'a Option<Vec<usize>>,
}
62
63impl<'a> MMapChunkIter<'a> {
64    fn new(
65        mmap: Arc<MMapSemaphore>,
66        metadata: FileMetadata,
67        projection: &'a Option<Vec<usize>>,
68    ) -> PolarsResult<Self> {
69        let end = metadata.blocks.len();
70        // mmap the dictionaries
71        let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
72
73        Ok(Self {
74            dictionaries,
75            metadata,
76            mmap,
77            idx: 0,
78            end,
79            projection,
80        })
81    }
82}
83
84impl ArrowReader for MMapChunkIter<'_> {
85    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
86        if self.idx < self.end {
87            let chunk = unsafe {
88                mmap_unchecked(
89                    &self.metadata,
90                    &self.dictionaries,
91                    self.mmap.clone(),
92                    self.idx,
93                )
94            }?;
95            self.idx += 1;
96            let chunk = match &self.projection {
97                None => chunk,
98                Some(proj) => {
99                    let length = chunk.len();
100                    let (schema, cols) = chunk.into_schema_and_arrays();
101                    let schema = schema.try_project_indices(proj).unwrap();
102                    let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
103                    RecordBatch::new(length, Arc::new(schema), arrays)
104                },
105            };
106            Ok(Some(chunk))
107        } else {
108            Ok(None)
109        }
110    }
111}