1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::cloud::CloudOptions;
4use polars_io::ipc::IpcScanOptions;
5use polars_io::{HiveOptions, RowIndex};
6use polars_utils::plpath::PlPath;
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
/// Arguments for lazily scanning Arrow IPC sources through `LazyFrame::scan_ipc`,
/// `LazyFrame::scan_ipc_files` and `LazyFrame::scan_ipc_sources`.
///
/// See the `Default` impl for the default values (notably `cache` is `true`).
#[derive(Clone)]
pub struct ScanArgsIpc {
    /// Limit the scan to the first `n_rows` rows; applied as a pre-slice with offset 0.
    pub n_rows: Option<usize>,
    /// Forwarded unchanged as `UnifiedScanArgs::cache`.
    pub cache: bool,
    /// Forwarded unchanged as `UnifiedScanArgs::rechunk`.
    pub rechunk: bool,
    /// Optional row index; forwarded unchanged as `UnifiedScanArgs::row_index`.
    pub row_index: Option<RowIndex>,
    /// Cloud storage options, if any; forwarded as `UnifiedScanArgs::cloud_options`.
    pub cloud_options: Option<CloudOptions>,
    /// Hive options; forwarded as `UnifiedScanArgs::hive_options`.
    pub hive_options: HiveOptions,
    /// Forwarded as `UnifiedScanArgs::include_file_paths`.
    /// NOTE(review): presumably the name of a column holding each row's source path — confirm.
    pub include_file_paths: Option<PlSmallStr>,
}
21
22impl Default for ScanArgsIpc {
23 fn default() -> Self {
24 Self {
25 n_rows: None,
26 cache: true,
27 rechunk: false,
28 row_index: None,
29 cloud_options: Default::default(),
30 hive_options: Default::default(),
31 include_file_paths: None,
32 }
33 }
34}
35
/// Internal `LazyFileListReader` implementation used by the `LazyFrame::scan_ipc*` constructors.
#[derive(Clone)]
struct LazyIpcReader {
    /// User-supplied scan arguments.
    args: ScanArgsIpc,
    /// Sources to scan; `ScanSources::default()` until replaced via `with_sources`.
    sources: ScanSources,
}
41
42impl LazyIpcReader {
43 fn new(args: ScanArgsIpc) -> Self {
44 Self {
45 args,
46 sources: ScanSources::default(),
47 }
48 }
49}
50
51impl LazyFileListReader for LazyIpcReader {
52 fn finish(self) -> PolarsResult<LazyFrame> {
53 let args = self.args;
54
55 let options = IpcScanOptions {};
56 let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
57
58 let cloud_options = args.cloud_options;
59 let hive_options = args.hive_options;
60 let rechunk = args.rechunk;
61 let cache = args.cache;
62 let row_index = args.row_index;
63 let include_file_paths = args.include_file_paths;
64
65 let lf: LazyFrame = DslBuilder::scan_ipc(
66 self.sources,
67 options,
68 UnifiedScanArgs {
69 schema: None,
70 cloud_options,
71 hive_options,
72 rechunk,
73 cache,
74 glob: true,
75 hidden_file_prefix: None,
76 projection: None,
77 column_mapping: None,
78 default_values: None,
79 row_index,
80 pre_slice,
81 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
82 missing_columns_policy: MissingColumnsPolicy::Raise,
83 extra_columns_policy: ExtraColumnsPolicy::Raise,
84 include_file_paths,
85 deletion_files: None,
86 table_statistics: None,
87 },
88 )?
89 .build()
90 .into();
91
92 Ok(lf)
93 }
94
95 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
96 unreachable!()
97 }
98
99 fn sources(&self) -> &ScanSources {
100 &self.sources
101 }
102
103 fn with_sources(mut self, sources: ScanSources) -> Self {
104 self.sources = sources;
105 self
106 }
107
108 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
109 self.args.n_rows = n_rows.into();
110 self
111 }
112
113 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
114 self.args.row_index = row_index.into();
115 self
116 }
117
118 fn rechunk(&self) -> bool {
119 self.args.rechunk
120 }
121
122 fn with_rechunk(mut self, toggle: bool) -> Self {
123 self.args.rechunk = toggle;
124 self
125 }
126
127 fn n_rows(&self) -> Option<usize> {
128 self.args.n_rows
129 }
130
131 fn row_index(&self) -> Option<&RowIndex> {
132 self.args.row_index.as_ref()
133 }
134
135 fn cloud_options(&self) -> Option<&CloudOptions> {
137 self.args.cloud_options.as_ref()
138 }
139}
140
141impl LazyFrame {
142 pub fn scan_ipc(path: PlPath, args: ScanArgsIpc) -> PolarsResult<Self> {
144 Self::scan_ipc_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
145 }
146
147 pub fn scan_ipc_files(paths: Buffer<PlPath>, args: ScanArgsIpc) -> PolarsResult<Self> {
148 Self::scan_ipc_sources(ScanSources::Paths(paths), args)
149 }
150
151 pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
152 LazyIpcReader::new(args).with_sources(sources).finish()
153 }
154}