1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::ipc::IpcScanOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
11#[derive(Clone)]
12pub struct ScanArgsIpc {
13 pub n_rows: Option<usize>,
14 pub cache: bool,
15 pub rechunk: bool,
16 pub row_index: Option<RowIndex>,
17 pub cloud_options: Option<CloudOptions>,
18 pub hive_options: HiveOptions,
19 pub include_file_paths: Option<PlSmallStr>,
20}
21
22impl Default for ScanArgsIpc {
23 fn default() -> Self {
24 Self {
25 n_rows: None,
26 cache: true,
27 rechunk: false,
28 row_index: None,
29 cloud_options: Default::default(),
30 hive_options: Default::default(),
31 include_file_paths: None,
32 }
33 }
34}
35
36#[derive(Clone)]
37struct LazyIpcReader {
38 args: ScanArgsIpc,
39 sources: ScanSources,
40}
41
42impl LazyIpcReader {
43 fn new(args: ScanArgsIpc) -> Self {
44 Self {
45 args,
46 sources: ScanSources::default(),
47 }
48 }
49}
50
51impl LazyFileListReader for LazyIpcReader {
52 fn finish(self) -> PolarsResult<LazyFrame> {
53 let args = self.args;
54
55 let options = IpcScanOptions {};
56 let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
57
58 let cloud_options = args.cloud_options;
59 let hive_options = args.hive_options;
60 let rechunk = args.rechunk;
61 let cache = args.cache;
62 let row_index = args.row_index;
63 let include_file_paths = args.include_file_paths;
64
65 let lf: LazyFrame = DslBuilder::scan_ipc(
66 self.sources,
67 options,
68 UnifiedScanArgs {
69 schema: None,
70 cloud_options,
71 hive_options,
72 rechunk,
73 cache,
74 glob: true,
75 projection: None,
76 row_index,
77 pre_slice,
78 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
79 missing_columns_policy: MissingColumnsPolicy::Raise,
80 extra_columns_policy: ExtraColumnsPolicy::Raise,
81 include_file_paths,
82 deletion_files: Default::default(),
83 },
84 )?
85 .build()
86 .into();
87
88 Ok(lf)
89 }
90
91 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
92 unreachable!()
93 }
94
95 fn sources(&self) -> &ScanSources {
96 &self.sources
97 }
98
99 fn with_sources(mut self, sources: ScanSources) -> Self {
100 self.sources = sources;
101 self
102 }
103
104 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
105 self.args.n_rows = n_rows.into();
106 self
107 }
108
109 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
110 self.args.row_index = row_index.into();
111 self
112 }
113
114 fn rechunk(&self) -> bool {
115 self.args.rechunk
116 }
117
118 fn with_rechunk(mut self, toggle: bool) -> Self {
119 self.args.rechunk = toggle;
120 self
121 }
122
123 fn n_rows(&self) -> Option<usize> {
124 self.args.n_rows
125 }
126
127 fn row_index(&self) -> Option<&RowIndex> {
128 self.args.row_index.as_ref()
129 }
130
131 fn cloud_options(&self) -> Option<&CloudOptions> {
133 self.args.cloud_options.as_ref()
134 }
135}
136
137impl LazyFrame {
138 pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
140 Self::scan_ipc_sources(
141 ScanSources::Paths([path.as_ref().to_path_buf()].into()),
142 args,
143 )
144 }
145
146 pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult<Self> {
147 Self::scan_ipc_sources(ScanSources::Paths(paths), args)
148 }
149
150 pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
151 LazyIpcReader::new(args).with_sources(sources).finish()
152 }
153}