polars_lazy/scan/
parquet.rs1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::parquet::read::ParallelStrategy;
6use polars_io::prelude::ParquetOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_utils::slice_enum::Slice;
9
10use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14 pub n_rows: Option<usize>,
15 pub parallel: ParallelStrategy,
16 pub row_index: Option<RowIndex>,
17 pub cloud_options: Option<CloudOptions>,
18 pub hive_options: HiveOptions,
19 pub use_statistics: bool,
20 pub schema: Option<SchemaRef>,
21 pub low_memory: bool,
22 pub rechunk: bool,
23 pub cache: bool,
24 pub glob: bool,
26 pub include_file_paths: Option<PlSmallStr>,
27 pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31 fn default() -> Self {
32 Self {
33 n_rows: None,
34 parallel: Default::default(),
35 row_index: None,
36 cloud_options: None,
37 hive_options: Default::default(),
38 use_statistics: true,
39 schema: None,
40 rechunk: false,
41 low_memory: false,
42 cache: true,
43 glob: true,
44 include_file_paths: None,
45 allow_missing_columns: false,
46 }
47 }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52 args: ScanArgsParquet,
53 sources: ScanSources,
54}
55
56impl LazyParquetReader {
57 fn new(args: ScanArgsParquet) -> Self {
58 Self {
59 args,
60 sources: ScanSources::default(),
61 }
62 }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66 fn finish(self) -> PolarsResult<LazyFrame> {
68 let parquet_options = ParquetOptions {
69 schema: self.args.schema,
70 parallel: self.args.parallel,
71 low_memory: self.args.low_memory,
72 use_statistics: self.args.use_statistics,
73 };
74
75 let unified_scan_args = UnifiedScanArgs {
76 schema: None,
77 cloud_options: self.args.cloud_options,
78 hive_options: self.args.hive_options,
79 rechunk: self.args.rechunk,
80 cache: self.args.cache,
81 glob: self.args.glob,
82 projection: None,
83 row_index: None,
85 pre_slice: self
86 .args
87 .n_rows
88 .map(|len| Slice::Positive { offset: 0, len }),
89 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
90 missing_columns_policy: if self.args.allow_missing_columns {
91 MissingColumnsPolicy::Insert
92 } else {
93 MissingColumnsPolicy::Raise
94 },
95 extra_columns_policy: ExtraColumnsPolicy::Raise,
96 include_file_paths: self.args.include_file_paths,
97 deletion_files: Default::default(),
98 };
99
100 let mut lf: LazyFrame =
101 DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
102 .build()
103 .into();
104
105 if let Some(row_index) = self.args.row_index {
107 lf = lf.with_row_index(row_index.name, Some(row_index.offset))
108 }
109
110 Ok(lf)
111 }
112
113 fn glob(&self) -> bool {
114 self.args.glob
115 }
116
117 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
118 unreachable!();
119 }
120
121 fn sources(&self) -> &ScanSources {
122 &self.sources
123 }
124
125 fn with_sources(mut self, sources: ScanSources) -> Self {
126 self.sources = sources;
127 self
128 }
129
130 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
131 self.args.n_rows = n_rows.into();
132 self
133 }
134
135 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
136 self.args.row_index = row_index.into();
137 self
138 }
139
140 fn rechunk(&self) -> bool {
141 self.args.rechunk
142 }
143
144 fn with_rechunk(mut self, toggle: bool) -> Self {
145 self.args.rechunk = toggle;
146 self
147 }
148
149 fn cloud_options(&self) -> Option<&CloudOptions> {
150 self.args.cloud_options.as_ref()
151 }
152
153 fn n_rows(&self) -> Option<usize> {
154 self.args.n_rows
155 }
156
157 fn row_index(&self) -> Option<&RowIndex> {
158 self.args.row_index.as_ref()
159 }
160}
161
162impl LazyFrame {
163 pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> PolarsResult<Self> {
165 Self::scan_parquet_sources(
166 ScanSources::Paths([path.as_ref().to_path_buf()].into()),
167 args,
168 )
169 }
170
171 pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
173 LazyParquetReader::new(args).with_sources(sources).finish()
174 }
175
176 pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<Self> {
178 Self::scan_parquet_sources(ScanSources::Paths(paths), args)
179 }
180}