polars_lazy/scan/ipc.rs

1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::cloud::CloudOptions;
4use polars_io::ipc::IpcScanOptions;
5use polars_io::{HiveOptions, RowIndex};
6use polars_utils::plpath::PlPath;
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
/// Options for [`LazyFrame::scan_ipc`], [`LazyFrame::scan_ipc_files`] and
/// [`LazyFrame::scan_ipc_sources`].
///
/// Every field is forwarded into the `UnifiedScanArgs` that the IPC reader builds when the
/// scan is finished. The `Default` values are: `cache` enabled, `rechunk` disabled, and all
/// optional settings unset.
#[derive(Clone)]
pub struct ScanArgsIpc {
    /// Read at most this many rows. It is turned into a positive pre-slice starting at
    /// offset 0; `None` applies no pre-slice.
    pub n_rows: Option<usize>,
    /// Forwarded as `UnifiedScanArgs::cache`.
    pub cache: bool,
    /// Forwarded as `UnifiedScanArgs::rechunk`.
    pub rechunk: bool,
    /// Row-index settings, forwarded as `UnifiedScanArgs::row_index`.
    pub row_index: Option<RowIndex>,
    /// Cloud options, forwarded as `UnifiedScanArgs::cloud_options`. The reader also exposes
    /// them through `cloud_options()`, where they are documented as being used to list files.
    pub cloud_options: Option<CloudOptions>,
    /// Forwarded as `UnifiedScanArgs::hive_options`.
    pub hive_options: HiveOptions,
    /// Forwarded as `UnifiedScanArgs::include_file_paths`. Presumably this is the name of a
    /// column that receives each row's source path (NOTE(review): confirm).
    pub include_file_paths: Option<PlSmallStr>,
}
21
22impl Default for ScanArgsIpc {
23    fn default() -> Self {
24        Self {
25            n_rows: None,
26            cache: true,
27            rechunk: false,
28            row_index: None,
29            cloud_options: Default::default(),
30            hive_options: Default::default(),
31            include_file_paths: None,
32        }
33    }
34}
35
/// [`LazyFileListReader`] implementation that backs the `LazyFrame::scan_ipc*` constructors.
#[derive(Clone)]
struct LazyIpcReader {
    /// Caller-supplied scan options.
    args: ScanArgsIpc,
    /// Sources to scan. This starts as `ScanSources::default()` and is replaced via
    /// `with_sources`.
    sources: ScanSources,
}
41
42impl LazyIpcReader {
43    fn new(args: ScanArgsIpc) -> Self {
44        Self {
45            args,
46            sources: ScanSources::default(),
47        }
48    }
49}
50
51impl LazyFileListReader for LazyIpcReader {
52    fn finish(self) -> PolarsResult<LazyFrame> {
53        let args = self.args;
54
55        let options = IpcScanOptions {};
56        let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
57
58        let cloud_options = args.cloud_options;
59        let hive_options = args.hive_options;
60        let rechunk = args.rechunk;
61        let cache = args.cache;
62        let row_index = args.row_index;
63        let include_file_paths = args.include_file_paths;
64
65        let lf: LazyFrame = DslBuilder::scan_ipc(
66            self.sources,
67            options,
68            UnifiedScanArgs {
69                schema: None,
70                cloud_options,
71                hive_options,
72                rechunk,
73                cache,
74                glob: true,
75                hidden_file_prefix: None,
76                projection: None,
77                column_mapping: None,
78                default_values: None,
79                row_index,
80                pre_slice,
81                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
82                missing_columns_policy: MissingColumnsPolicy::Raise,
83                extra_columns_policy: ExtraColumnsPolicy::Raise,
84                include_file_paths,
85                deletion_files: None,
86                table_statistics: None,
87            },
88        )?
89        .build()
90        .into();
91
92        Ok(lf)
93    }
94
95    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
96        unreachable!()
97    }
98
99    fn sources(&self) -> &ScanSources {
100        &self.sources
101    }
102
103    fn with_sources(mut self, sources: ScanSources) -> Self {
104        self.sources = sources;
105        self
106    }
107
108    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
109        self.args.n_rows = n_rows.into();
110        self
111    }
112
113    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
114        self.args.row_index = row_index.into();
115        self
116    }
117
118    fn rechunk(&self) -> bool {
119        self.args.rechunk
120    }
121
122    fn with_rechunk(mut self, toggle: bool) -> Self {
123        self.args.rechunk = toggle;
124        self
125    }
126
127    fn n_rows(&self) -> Option<usize> {
128        self.args.n_rows
129    }
130
131    fn row_index(&self) -> Option<&RowIndex> {
132        self.args.row_index.as_ref()
133    }
134
135    /// [CloudOptions] used to list files.
136    fn cloud_options(&self) -> Option<&CloudOptions> {
137        self.args.cloud_options.as_ref()
138    }
139}
140
141impl LazyFrame {
142    /// Create a LazyFrame directly from a ipc scan.
143    pub fn scan_ipc(path: PlPath, args: ScanArgsIpc) -> PolarsResult<Self> {
144        Self::scan_ipc_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
145    }
146
147    pub fn scan_ipc_files(paths: Buffer<PlPath>, args: ScanArgsIpc) -> PolarsResult<Self> {
148        Self::scan_ipc_sources(ScanSources::Paths(paths), args)
149    }
150
151    pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
152        LazyIpcReader::new(args).with_sources(sources).finish()
153    }
154}