polars_io/ipc/mmap.rs

use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::mmap::MMapSemaphore;

use super::ipc_file::IpcReader;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::shared::{finish_reader, ArrowReader};
use crate::utils::{apply_projection, columns_to_projection};

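// Memory-mapped reading of Arrow IPC files.
//
// A minimal usage sketch (assumption: the reader is constructed through the
// `SerReader` trait re-exported from `polars_io::prelude`; whether `finish`
// actually takes the memory-mapped path implemented below depends on how the
// `IpcReader` is configured, which is not shown in this file):
//
//     use polars_io::prelude::*;
//
//     let file = std::fs::File::open("data.arrow")?;
//     let df = IpcReader::new(file).finish()?;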
impl<R: MmapBytesReader> IpcReader<R> {
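    /// Read the IPC file through a memory map: record batch blocks are mapped
    /// on demand rather than copied into memory up front. Returns an error if
    /// the underlying reader is not backed by a file.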
    pub(super) fn finish_memmapped(
        &mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
    ) -> PolarsResult<DataFrame> {
        match self.reader.to_file() {
            Some(file) => {
                let semaphore = MMapSemaphore::new_from_file(file)?;
                let metadata =
                    read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;

                if let Some(columns) = &self.columns {
                    let schema = &metadata.schema;
                    let prj = columns_to_projection(columns, schema)?;
                    self.projection = Some(prj);
                }

                let schema = if let Some(projection) = &self.projection {
                    Arc::new(apply_projection(&metadata.schema, projection))
                } else {
                    metadata.schema.clone()
                };

                let reader = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;

                finish_reader(
                    reader,
                    // don't rechunk, that would trigger a read.
                    false,
                    self.n_rows,
                    predicate,
                    &schema,
                    self.row_index.clone(),
                )
            },
            None => polars_bail!(ComputeError: "cannot memory-map, you must provide a file"),
        }
    }
}

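/// Iterator over the record batches of a memory-mapped IPC file.
///
/// Dictionaries are mapped once up front; each data block is mapped on demand
/// in `next_record_batch`.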
struct MMapChunkIter<'a> {
    dictionaries: Dictionaries,
    metadata: FileMetadata,
    mmap: Arc<MMapSemaphore>,
    idx: usize,
    end: usize,
    projection: &'a Option<Vec<usize>>,
}

impl<'a> MMapChunkIter<'a> {
    fn new(
        mmap: Arc<MMapSemaphore>,
        metadata: FileMetadata,
        projection: &'a Option<Vec<usize>>,
    ) -> PolarsResult<Self> {
        let end = metadata.blocks.len();
        // mmap the dictionaries
        let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };

        Ok(Self {
            dictionaries,
            metadata,
            mmap,
            idx: 0,
            end,
            projection,
        })
    }
}

impl ArrowReader for MMapChunkIter<'_> {
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        if self.idx < self.end {
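            // SAFETY: `mmap_unchecked` performs no validation; the mapped
            // bytes are trusted to contain a valid Arrow IPC block.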
            let chunk = unsafe {
                mmap_unchecked(
                    &self.metadata,
                    &self.dictionaries,
                    self.mmap.clone(),
                    self.idx,
                )
            }?;
            self.idx += 1;
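            // Apply the projection, if any, by selecting the projected schema
            // fields and arrays from the freshly mapped chunk.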
            let chunk = match &self.projection {
                None => chunk,
                Some(proj) => {
                    let length = chunk.len();
                    let (schema, cols) = chunk.into_schema_and_arrays();
                    let schema = schema.try_project_indices(proj).unwrap();
                    let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
                    RecordBatch::new(length, Arc::new(schema), arrays)
                },
            };
            Ok(Some(chunk))
        } else {
            Ok(None)
        }
    }
}