1use polars_core::prelude::*;
2use polars_io::cloud::CloudOptions;
3use polars_io::ipc::IpcScanOptions;
4use polars_io::{HiveOptions, RowIndex};
5use polars_utils::plpath::PlPath;
6use polars_utils::slice_enum::Slice;
7
8use crate::prelude::*;
9
/// Options for lazily scanning IPC sources through
/// `LazyFrame::scan_ipc`, `LazyFrame::scan_ipc_files` and
/// `LazyFrame::scan_ipc_sources`.
///
/// Every field ends up in the `UnifiedScanArgs` the IPC scan is built with;
/// the `Default` impl in this file gives the starting values.
#[derive(Clone)]
pub struct ScanArgsIpc {
    /// Read at most this many rows, counted from the first row (converted into
    /// a `Slice::Positive { offset: 0, len }` pre-slice). `None` = no limit.
    pub n_rows: Option<usize>,
    /// Forwarded unchanged as `UnifiedScanArgs::cache`.
    pub cache: bool,
    /// Forwarded unchanged as `UnifiedScanArgs::rechunk`.
    pub rechunk: bool,
    /// Optional row-index settings, forwarded as `UnifiedScanArgs::row_index`.
    pub row_index: Option<RowIndex>,
    /// Optional cloud options, forwarded as `UnifiedScanArgs::cloud_options`.
    pub cloud_options: Option<CloudOptions>,
    /// Hive options, forwarded as `UnifiedScanArgs::hive_options`.
    pub hive_options: HiveOptions,
    /// Forwarded as `UnifiedScanArgs::include_file_paths`.
    /// NOTE(review): presumably the name of a column that receives each row's
    /// source path — confirm against `UnifiedScanArgs`.
    pub include_file_paths: Option<PlSmallStr>,
}
20
21impl Default for ScanArgsIpc {
22 fn default() -> Self {
23 Self {
24 n_rows: None,
25 cache: true,
26 rechunk: false,
27 row_index: None,
28 cloud_options: Default::default(),
29 hive_options: Default::default(),
30 include_file_paths: None,
31 }
32 }
33}
34
/// Internal `LazyFileListReader` for IPC scans: pairs the caller's
/// `ScanArgsIpc` with the `ScanSources` to read, and builds the scan plan
/// in `finish`.
#[derive(Clone)]
struct LazyIpcReader {
    /// Scan options supplied by the caller.
    args: ScanArgsIpc,
    /// Sources to scan; replaced via `with_sources`.
    sources: ScanSources,
}
40
41impl LazyIpcReader {
42 fn new(args: ScanArgsIpc) -> Self {
43 Self {
44 args,
45 sources: ScanSources::default(),
46 }
47 }
48}
49
50impl LazyFileListReader for LazyIpcReader {
51 fn finish(self) -> PolarsResult<LazyFrame> {
52 let args = self.args;
53
54 let options = IpcScanOptions {};
55 let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
56
57 let cloud_options = args.cloud_options;
58 let hive_options = args.hive_options;
59 let rechunk = args.rechunk;
60 let cache = args.cache;
61 let row_index = args.row_index;
62 let include_file_paths = args.include_file_paths;
63
64 let lf: LazyFrame = DslBuilder::scan_ipc(
65 self.sources,
66 options,
67 UnifiedScanArgs {
68 schema: None,
69 cloud_options,
70 hive_options,
71 rechunk,
72 cache,
73 glob: true,
74 projection: None,
75 column_mapping: None,
76 default_values: None,
77 row_index,
78 pre_slice,
79 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
80 missing_columns_policy: MissingColumnsPolicy::Raise,
81 extra_columns_policy: ExtraColumnsPolicy::Raise,
82 include_file_paths,
83 deletion_files: None,
84 },
85 )?
86 .build()
87 .into();
88
89 Ok(lf)
90 }
91
92 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
93 unreachable!()
94 }
95
96 fn sources(&self) -> &ScanSources {
97 &self.sources
98 }
99
100 fn with_sources(mut self, sources: ScanSources) -> Self {
101 self.sources = sources;
102 self
103 }
104
105 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
106 self.args.n_rows = n_rows.into();
107 self
108 }
109
110 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
111 self.args.row_index = row_index.into();
112 self
113 }
114
115 fn rechunk(&self) -> bool {
116 self.args.rechunk
117 }
118
119 fn with_rechunk(mut self, toggle: bool) -> Self {
120 self.args.rechunk = toggle;
121 self
122 }
123
124 fn n_rows(&self) -> Option<usize> {
125 self.args.n_rows
126 }
127
128 fn row_index(&self) -> Option<&RowIndex> {
129 self.args.row_index.as_ref()
130 }
131
132 fn cloud_options(&self) -> Option<&CloudOptions> {
134 self.args.cloud_options.as_ref()
135 }
136}
137
138impl LazyFrame {
139 pub fn scan_ipc(path: PlPath, args: ScanArgsIpc) -> PolarsResult<Self> {
141 Self::scan_ipc_sources(ScanSources::Paths([path].into()), args)
142 }
143
144 pub fn scan_ipc_files(paths: Arc<[PlPath]>, args: ScanArgsIpc) -> PolarsResult<Self> {
145 Self::scan_ipc_sources(ScanSources::Paths(paths), args)
146 }
147
148 pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
149 LazyIpcReader::new(args).with_sources(sources).finish()
150 }
151}