polars_lazy/scan/
parquet.rs1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::cloud::CloudOptions;
4use polars_io::parquet::read::ParallelStrategy;
5use polars_io::prelude::ParquetOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_utils::plpath::PlPath;
8use polars_utils::slice_enum::Slice;
9
10use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14 pub n_rows: Option<usize>,
15 pub parallel: ParallelStrategy,
16 pub row_index: Option<RowIndex>,
17 pub cloud_options: Option<CloudOptions>,
18 pub hive_options: HiveOptions,
19 pub use_statistics: bool,
20 pub schema: Option<SchemaRef>,
21 pub low_memory: bool,
22 pub rechunk: bool,
23 pub cache: bool,
24 pub glob: bool,
26 pub include_file_paths: Option<PlSmallStr>,
27 pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31 fn default() -> Self {
32 Self {
33 n_rows: None,
34 parallel: Default::default(),
35 row_index: None,
36 cloud_options: None,
37 hive_options: Default::default(),
38 use_statistics: true,
39 schema: None,
40 rechunk: false,
41 low_memory: false,
42 cache: true,
43 glob: true,
44 include_file_paths: None,
45 allow_missing_columns: false,
46 }
47 }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52 args: ScanArgsParquet,
53 sources: ScanSources,
54}
55
56impl LazyParquetReader {
57 fn new(args: ScanArgsParquet) -> Self {
58 Self {
59 args,
60 sources: ScanSources::default(),
61 }
62 }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66 fn finish(self) -> PolarsResult<LazyFrame> {
68 let parquet_options = ParquetOptions {
69 schema: self.args.schema,
70 parallel: self.args.parallel,
71 low_memory: self.args.low_memory,
72 use_statistics: self.args.use_statistics,
73 };
74
75 let unified_scan_args = UnifiedScanArgs {
76 schema: None,
77 cloud_options: self.args.cloud_options,
78 hive_options: self.args.hive_options,
79 rechunk: self.args.rechunk,
80 cache: self.args.cache,
81 glob: self.args.glob,
82 hidden_file_prefix: None,
83 projection: None,
84 column_mapping: None,
85 default_values: None,
86 row_index: None,
88 pre_slice: self
89 .args
90 .n_rows
91 .map(|len| Slice::Positive { offset: 0, len }),
92 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
93 missing_columns_policy: if self.args.allow_missing_columns {
94 MissingColumnsPolicy::Insert
95 } else {
96 MissingColumnsPolicy::Raise
97 },
98 extra_columns_policy: ExtraColumnsPolicy::Raise,
99 include_file_paths: self.args.include_file_paths,
100 deletion_files: None,
101 table_statistics: None,
102 };
103
104 let mut lf: LazyFrame =
105 DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
106 .build()
107 .into();
108
109 if let Some(row_index) = self.args.row_index {
111 lf = lf.with_row_index(row_index.name, Some(row_index.offset))
112 }
113
114 Ok(lf)
115 }
116
117 fn glob(&self) -> bool {
118 self.args.glob
119 }
120
121 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
122 unreachable!();
123 }
124
125 fn sources(&self) -> &ScanSources {
126 &self.sources
127 }
128
129 fn with_sources(mut self, sources: ScanSources) -> Self {
130 self.sources = sources;
131 self
132 }
133
134 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
135 self.args.n_rows = n_rows.into();
136 self
137 }
138
139 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
140 self.args.row_index = row_index.into();
141 self
142 }
143
144 fn rechunk(&self) -> bool {
145 self.args.rechunk
146 }
147
148 fn with_rechunk(mut self, toggle: bool) -> Self {
149 self.args.rechunk = toggle;
150 self
151 }
152
153 fn cloud_options(&self) -> Option<&CloudOptions> {
154 self.args.cloud_options.as_ref()
155 }
156
157 fn n_rows(&self) -> Option<usize> {
158 self.args.n_rows
159 }
160
161 fn row_index(&self) -> Option<&RowIndex> {
162 self.args.row_index.as_ref()
163 }
164}
165
166impl LazyFrame {
167 pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
169 Self::scan_parquet_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
170 }
171
172 pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
174 LazyParquetReader::new(args).with_sources(sources).finish()
175 }
176
177 pub fn scan_parquet_files(paths: Buffer<PlPath>, args: ScanArgsParquet) -> PolarsResult<Self> {
179 Self::scan_parquet_sources(ScanSources::Paths(paths), args)
180 }
181}