polars_lazy/scan/
parquet.rs1use polars_core::prelude::*;
2use polars_io::cloud::CloudOptions;
3use polars_io::parquet::read::ParallelStrategy;
4use polars_io::prelude::ParquetOptions;
5use polars_io::{HiveOptions, RowIndex};
6use polars_utils::plpath::PlPath;
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
11#[derive(Clone)]
12pub struct ScanArgsParquet {
13 pub n_rows: Option<usize>,
14 pub parallel: ParallelStrategy,
15 pub row_index: Option<RowIndex>,
16 pub cloud_options: Option<CloudOptions>,
17 pub hive_options: HiveOptions,
18 pub use_statistics: bool,
19 pub schema: Option<SchemaRef>,
20 pub low_memory: bool,
21 pub rechunk: bool,
22 pub cache: bool,
23 pub glob: bool,
25 pub include_file_paths: Option<PlSmallStr>,
26 pub allow_missing_columns: bool,
27}
28
29impl Default for ScanArgsParquet {
30 fn default() -> Self {
31 Self {
32 n_rows: None,
33 parallel: Default::default(),
34 row_index: None,
35 cloud_options: None,
36 hive_options: Default::default(),
37 use_statistics: true,
38 schema: None,
39 rechunk: false,
40 low_memory: false,
41 cache: true,
42 glob: true,
43 include_file_paths: None,
44 allow_missing_columns: false,
45 }
46 }
47}
48
49#[derive(Clone)]
50struct LazyParquetReader {
51 args: ScanArgsParquet,
52 sources: ScanSources,
53}
54
55impl LazyParquetReader {
56 fn new(args: ScanArgsParquet) -> Self {
57 Self {
58 args,
59 sources: ScanSources::default(),
60 }
61 }
62}
63
64impl LazyFileListReader for LazyParquetReader {
65 fn finish(self) -> PolarsResult<LazyFrame> {
67 let parquet_options = ParquetOptions {
68 schema: self.args.schema,
69 parallel: self.args.parallel,
70 low_memory: self.args.low_memory,
71 use_statistics: self.args.use_statistics,
72 };
73
74 let unified_scan_args = UnifiedScanArgs {
75 schema: None,
76 cloud_options: self.args.cloud_options,
77 hive_options: self.args.hive_options,
78 rechunk: self.args.rechunk,
79 cache: self.args.cache,
80 glob: self.args.glob,
81 projection: None,
82 column_mapping: None,
83 default_values: None,
84 row_index: None,
86 pre_slice: self
87 .args
88 .n_rows
89 .map(|len| Slice::Positive { offset: 0, len }),
90 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
91 missing_columns_policy: if self.args.allow_missing_columns {
92 MissingColumnsPolicy::Insert
93 } else {
94 MissingColumnsPolicy::Raise
95 },
96 extra_columns_policy: ExtraColumnsPolicy::Raise,
97 include_file_paths: self.args.include_file_paths,
98 deletion_files: None,
99 };
100
101 let mut lf: LazyFrame =
102 DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
103 .build()
104 .into();
105
106 if let Some(row_index) = self.args.row_index {
108 lf = lf.with_row_index(row_index.name, Some(row_index.offset))
109 }
110
111 Ok(lf)
112 }
113
114 fn glob(&self) -> bool {
115 self.args.glob
116 }
117
118 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
119 unreachable!();
120 }
121
122 fn sources(&self) -> &ScanSources {
123 &self.sources
124 }
125
126 fn with_sources(mut self, sources: ScanSources) -> Self {
127 self.sources = sources;
128 self
129 }
130
131 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
132 self.args.n_rows = n_rows.into();
133 self
134 }
135
136 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
137 self.args.row_index = row_index.into();
138 self
139 }
140
141 fn rechunk(&self) -> bool {
142 self.args.rechunk
143 }
144
145 fn with_rechunk(mut self, toggle: bool) -> Self {
146 self.args.rechunk = toggle;
147 self
148 }
149
150 fn cloud_options(&self) -> Option<&CloudOptions> {
151 self.args.cloud_options.as_ref()
152 }
153
154 fn n_rows(&self) -> Option<usize> {
155 self.args.n_rows
156 }
157
158 fn row_index(&self) -> Option<&RowIndex> {
159 self.args.row_index.as_ref()
160 }
161}
162
163impl LazyFrame {
164 pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
166 Self::scan_parquet_sources(ScanSources::Paths([path].into()), args)
167 }
168
169 pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
171 LazyParquetReader::new(args).with_sources(sources).finish()
172 }
173
174 pub fn scan_parquet_files(paths: Arc<[PlPath]>, args: ScanArgsParquet) -> PolarsResult<Self> {
176 Self::scan_parquet_sources(ScanSources::Paths(paths), args)
177 }
178}