// polars_lazy/scan/ipc.rs

1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::ipc::IpcScanOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
11#[derive(Clone)]
12pub struct ScanArgsIpc {
13    pub n_rows: Option<usize>,
14    pub cache: bool,
15    pub rechunk: bool,
16    pub row_index: Option<RowIndex>,
17    pub cloud_options: Option<CloudOptions>,
18    pub hive_options: HiveOptions,
19    pub include_file_paths: Option<PlSmallStr>,
20}
21
22impl Default for ScanArgsIpc {
23    fn default() -> Self {
24        Self {
25            n_rows: None,
26            cache: true,
27            rechunk: false,
28            row_index: None,
29            cloud_options: Default::default(),
30            hive_options: Default::default(),
31            include_file_paths: None,
32        }
33    }
34}
35
36#[derive(Clone)]
37struct LazyIpcReader {
38    args: ScanArgsIpc,
39    sources: ScanSources,
40}
41
42impl LazyIpcReader {
43    fn new(args: ScanArgsIpc) -> Self {
44        Self {
45            args,
46            sources: ScanSources::default(),
47        }
48    }
49}
50
51impl LazyFileListReader for LazyIpcReader {
52    fn finish(self) -> PolarsResult<LazyFrame> {
53        let args = self.args;
54
55        let options = IpcScanOptions {};
56        let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
57
58        let cloud_options = args.cloud_options;
59        let hive_options = args.hive_options;
60        let rechunk = args.rechunk;
61        let cache = args.cache;
62        let row_index = args.row_index;
63        let include_file_paths = args.include_file_paths;
64
65        let lf: LazyFrame = DslBuilder::scan_ipc(
66            self.sources,
67            options,
68            UnifiedScanArgs {
69                schema: None,
70                cloud_options,
71                hive_options,
72                rechunk,
73                cache,
74                glob: true,
75                projection: None,
76                row_index,
77                pre_slice,
78                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
79                missing_columns_policy: MissingColumnsPolicy::Raise,
80                extra_columns_policy: ExtraColumnsPolicy::Raise,
81                include_file_paths,
82                deletion_files: Default::default(),
83            },
84        )?
85        .build()
86        .into();
87
88        Ok(lf)
89    }
90
91    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
92        unreachable!()
93    }
94
95    fn sources(&self) -> &ScanSources {
96        &self.sources
97    }
98
99    fn with_sources(mut self, sources: ScanSources) -> Self {
100        self.sources = sources;
101        self
102    }
103
104    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
105        self.args.n_rows = n_rows.into();
106        self
107    }
108
109    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
110        self.args.row_index = row_index.into();
111        self
112    }
113
114    fn rechunk(&self) -> bool {
115        self.args.rechunk
116    }
117
118    fn with_rechunk(mut self, toggle: bool) -> Self {
119        self.args.rechunk = toggle;
120        self
121    }
122
123    fn n_rows(&self) -> Option<usize> {
124        self.args.n_rows
125    }
126
127    fn row_index(&self) -> Option<&RowIndex> {
128        self.args.row_index.as_ref()
129    }
130
131    /// [CloudOptions] used to list files.
132    fn cloud_options(&self) -> Option<&CloudOptions> {
133        self.args.cloud_options.as_ref()
134    }
135}
136
137impl LazyFrame {
138    /// Create a LazyFrame directly from a ipc scan.
139    pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
140        Self::scan_ipc_sources(
141            ScanSources::Paths([path.as_ref().to_path_buf()].into()),
142            args,
143        )
144    }
145
146    pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult<Self> {
147        Self::scan_ipc_sources(ScanSources::Paths(paths), args)
148    }
149
150    pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
151        LazyIpcReader::new(args).with_sources(sources).finish()
152    }
153}