polars_lazy/scan/ipc.rs

1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::ipc::IpcScanOptions;
6use polars_io::{HiveOptions, RowIndex};
7
8use crate::prelude::*;
9
10#[derive(Clone)]
11pub struct ScanArgsIpc {
12    pub n_rows: Option<usize>,
13    pub cache: bool,
14    pub rechunk: bool,
15    pub row_index: Option<RowIndex>,
16    pub cloud_options: Option<CloudOptions>,
17    pub hive_options: HiveOptions,
18    pub include_file_paths: Option<PlSmallStr>,
19}
20
21impl Default for ScanArgsIpc {
22    fn default() -> Self {
23        Self {
24            n_rows: None,
25            cache: true,
26            rechunk: false,
27            row_index: None,
28            cloud_options: Default::default(),
29            hive_options: Default::default(),
30            include_file_paths: None,
31        }
32    }
33}
34
35#[derive(Clone)]
36struct LazyIpcReader {
37    args: ScanArgsIpc,
38    sources: ScanSources,
39}
40
41impl LazyIpcReader {
42    fn new(args: ScanArgsIpc) -> Self {
43        Self {
44            args,
45            sources: ScanSources::default(),
46        }
47    }
48}
49
50impl LazyFileListReader for LazyIpcReader {
51    fn finish(self) -> PolarsResult<LazyFrame> {
52        let args = self.args;
53
54        let options = IpcScanOptions {};
55
56        let lf: LazyFrame = DslBuilder::scan_ipc(
57            self.sources,
58            options,
59            args.n_rows,
60            args.cache,
61            args.row_index,
62            args.rechunk,
63            args.cloud_options,
64            args.hive_options,
65            args.include_file_paths,
66        )?
67        .build()
68        .into();
69
70        Ok(lf)
71    }
72
73    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
74        unreachable!()
75    }
76
77    fn sources(&self) -> &ScanSources {
78        &self.sources
79    }
80
81    fn with_sources(mut self, sources: ScanSources) -> Self {
82        self.sources = sources;
83        self
84    }
85
86    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
87        self.args.n_rows = n_rows.into();
88        self
89    }
90
91    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
92        self.args.row_index = row_index.into();
93        self
94    }
95
96    fn rechunk(&self) -> bool {
97        self.args.rechunk
98    }
99
100    fn with_rechunk(mut self, toggle: bool) -> Self {
101        self.args.rechunk = toggle;
102        self
103    }
104
105    fn n_rows(&self) -> Option<usize> {
106        self.args.n_rows
107    }
108
109    fn row_index(&self) -> Option<&RowIndex> {
110        self.args.row_index.as_ref()
111    }
112
113    /// [CloudOptions] used to list files.
114    fn cloud_options(&self) -> Option<&CloudOptions> {
115        self.args.cloud_options.as_ref()
116    }
117}
118
119impl LazyFrame {
120    /// Create a LazyFrame directly from a ipc scan.
121    pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
122        Self::scan_ipc_sources(
123            ScanSources::Paths([path.as_ref().to_path_buf()].into()),
124            args,
125        )
126    }
127
128    pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult<Self> {
129        Self::scan_ipc_sources(ScanSources::Paths(paths), args)
130    }
131
132    pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
133        LazyIpcReader::new(args).with_sources(sources).finish()
134    }
135}