polars_lazy/scan/ipc.rs

1use polars_core::prelude::*;
2use polars_io::cloud::CloudOptions;
3use polars_io::ipc::IpcScanOptions;
4use polars_io::{HiveOptions, RowIndex};
5use polars_utils::plpath::PlPath;
6use polars_utils::slice_enum::Slice;
7
8use crate::prelude::*;
9
10#[derive(Clone)]
11pub struct ScanArgsIpc {
12    pub n_rows: Option<usize>,
13    pub cache: bool,
14    pub rechunk: bool,
15    pub row_index: Option<RowIndex>,
16    pub cloud_options: Option<CloudOptions>,
17    pub hive_options: HiveOptions,
18    pub include_file_paths: Option<PlSmallStr>,
19}
20
21impl Default for ScanArgsIpc {
22    fn default() -> Self {
23        Self {
24            n_rows: None,
25            cache: true,
26            rechunk: false,
27            row_index: None,
28            cloud_options: Default::default(),
29            hive_options: Default::default(),
30            include_file_paths: None,
31        }
32    }
33}
34
35#[derive(Clone)]
36struct LazyIpcReader {
37    args: ScanArgsIpc,
38    sources: ScanSources,
39}
40
41impl LazyIpcReader {
42    fn new(args: ScanArgsIpc) -> Self {
43        Self {
44            args,
45            sources: ScanSources::default(),
46        }
47    }
48}
49
50impl LazyFileListReader for LazyIpcReader {
51    fn finish(self) -> PolarsResult<LazyFrame> {
52        let args = self.args;
53
54        let options = IpcScanOptions {};
55        let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
56
57        let cloud_options = args.cloud_options;
58        let hive_options = args.hive_options;
59        let rechunk = args.rechunk;
60        let cache = args.cache;
61        let row_index = args.row_index;
62        let include_file_paths = args.include_file_paths;
63
64        let lf: LazyFrame = DslBuilder::scan_ipc(
65            self.sources,
66            options,
67            UnifiedScanArgs {
68                schema: None,
69                cloud_options,
70                hive_options,
71                rechunk,
72                cache,
73                glob: true,
74                projection: None,
75                column_mapping: None,
76                default_values: None,
77                row_index,
78                pre_slice,
79                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
80                missing_columns_policy: MissingColumnsPolicy::Raise,
81                extra_columns_policy: ExtraColumnsPolicy::Raise,
82                include_file_paths,
83                deletion_files: None,
84            },
85        )?
86        .build()
87        .into();
88
89        Ok(lf)
90    }
91
92    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
93        unreachable!()
94    }
95
96    fn sources(&self) -> &ScanSources {
97        &self.sources
98    }
99
100    fn with_sources(mut self, sources: ScanSources) -> Self {
101        self.sources = sources;
102        self
103    }
104
105    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
106        self.args.n_rows = n_rows.into();
107        self
108    }
109
110    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
111        self.args.row_index = row_index.into();
112        self
113    }
114
115    fn rechunk(&self) -> bool {
116        self.args.rechunk
117    }
118
119    fn with_rechunk(mut self, toggle: bool) -> Self {
120        self.args.rechunk = toggle;
121        self
122    }
123
124    fn n_rows(&self) -> Option<usize> {
125        self.args.n_rows
126    }
127
128    fn row_index(&self) -> Option<&RowIndex> {
129        self.args.row_index.as_ref()
130    }
131
132    /// [CloudOptions] used to list files.
133    fn cloud_options(&self) -> Option<&CloudOptions> {
134        self.args.cloud_options.as_ref()
135    }
136}
137
138impl LazyFrame {
139    /// Create a LazyFrame directly from a ipc scan.
140    pub fn scan_ipc(path: PlPath, args: ScanArgsIpc) -> PolarsResult<Self> {
141        Self::scan_ipc_sources(ScanSources::Paths([path].into()), args)
142    }
143
144    pub fn scan_ipc_files(paths: Arc<[PlPath]>, args: ScanArgsIpc) -> PolarsResult<Self> {
145        Self::scan_ipc_sources(ScanSources::Paths(paths), args)
146    }
147
148    pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
149        LazyIpcReader::new(args).with_sources(sources).finish()
150    }
151}