// polars_lazy/scan/parquet.rs
use arrow::buffer::Buffer;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
use polars_io::prelude::ParquetOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_utils::plpath::PlPath;
use polars_utils::slice_enum::Slice;

use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14 pub n_rows: Option<usize>,
15 pub parallel: ParallelStrategy,
16 pub row_index: Option<RowIndex>,
17 pub cloud_options: Option<CloudOptions>,
18 pub hive_options: HiveOptions,
19 pub use_statistics: bool,
20 pub schema: Option<SchemaRef>,
21 pub low_memory: bool,
22 pub rechunk: bool,
23 pub cache: bool,
24 pub glob: bool,
26 pub include_file_paths: Option<PlSmallStr>,
27 pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31 fn default() -> Self {
32 Self {
33 n_rows: None,
34 parallel: Default::default(),
35 row_index: None,
36 cloud_options: None,
37 hive_options: Default::default(),
38 use_statistics: true,
39 schema: None,
40 rechunk: false,
41 low_memory: false,
42 cache: true,
43 glob: true,
44 include_file_paths: None,
45 allow_missing_columns: false,
46 }
47 }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52 args: ScanArgsParquet,
53 sources: ScanSources,
54}
55
56impl LazyParquetReader {
57 fn new(args: ScanArgsParquet) -> Self {
58 Self {
59 args,
60 sources: ScanSources::default(),
61 }
62 }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66 fn finish(self) -> PolarsResult<LazyFrame> {
68 let parquet_options = ParquetOptions {
69 schema: self.args.schema,
70 parallel: self.args.parallel,
71 low_memory: self.args.low_memory,
72 use_statistics: self.args.use_statistics,
73 };
74
75 let unified_scan_args = UnifiedScanArgs {
76 schema: None,
77 cloud_options: self.args.cloud_options,
78 hive_options: self.args.hive_options,
79 rechunk: self.args.rechunk,
80 cache: self.args.cache,
81 glob: self.args.glob,
82 hidden_file_prefix: None,
83 projection: None,
84 column_mapping: None,
85 default_values: None,
86 row_index: None,
88 pre_slice: self
89 .args
90 .n_rows
91 .map(|len| Slice::Positive { offset: 0, len }),
92 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
93 missing_columns_policy: if self.args.allow_missing_columns {
94 MissingColumnsPolicy::Insert
95 } else {
96 MissingColumnsPolicy::Raise
97 },
98 extra_columns_policy: ExtraColumnsPolicy::Raise,
99 include_file_paths: self.args.include_file_paths,
100 deletion_files: None,
101 table_statistics: None,
102 row_count: None,
103 };
104
105 let mut lf: LazyFrame =
106 DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
107 .build()
108 .into();
109
110 if let Some(row_index) = self.args.row_index {
112 lf = lf.with_row_index(row_index.name, Some(row_index.offset))
113 }
114
115 Ok(lf)
116 }
117
118 fn glob(&self) -> bool {
119 self.args.glob
120 }
121
122 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
123 unreachable!();
124 }
125
126 fn sources(&self) -> &ScanSources {
127 &self.sources
128 }
129
130 fn with_sources(mut self, sources: ScanSources) -> Self {
131 self.sources = sources;
132 self
133 }
134
135 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
136 self.args.n_rows = n_rows.into();
137 self
138 }
139
140 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
141 self.args.row_index = row_index.into();
142 self
143 }
144
145 fn rechunk(&self) -> bool {
146 self.args.rechunk
147 }
148
149 fn with_rechunk(mut self, toggle: bool) -> Self {
150 self.args.rechunk = toggle;
151 self
152 }
153
154 fn cloud_options(&self) -> Option<&CloudOptions> {
155 self.args.cloud_options.as_ref()
156 }
157
158 fn n_rows(&self) -> Option<usize> {
159 self.args.n_rows
160 }
161
162 fn row_index(&self) -> Option<&RowIndex> {
163 self.args.row_index.as_ref()
164 }
165}
166
167impl LazyFrame {
168 pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
170 Self::scan_parquet_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
171 }
172
173 pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
175 LazyParquetReader::new(args).with_sources(sources).finish()
176 }
177
178 pub fn scan_parquet_files(paths: Buffer<PlPath>, args: ScanArgsParquet) -> PolarsResult<Self> {
180 Self::scan_parquet_sources(ScanSources::Paths(paths), args)
181 }
182}