polars_lazy/scan/
parquet.rs1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::parquet::read::ParallelStrategy;
6use polars_io::{HiveOptions, RowIndex};
7
8use crate::prelude::*;
9
/// Options controlling a lazy Parquet scan, consumed by [`LazyFrame::scan_parquet`],
/// [`LazyFrame::scan_parquet_sources`] and [`LazyFrame::scan_parquet_files`].
///
/// Every field is forwarded to the Parquet scan built in this module. See the `Default`
/// impl for the value each option takes when not set explicitly.
#[derive(Clone)]
pub struct ScanArgsParquet {
    /// Optional row limit forwarded to the scan (presumably `None` means no limit).
    pub n_rows: Option<usize>,
    /// Parallelization strategy forwarded to the Parquet reader.
    pub parallel: ParallelStrategy,
    /// When set, a row-index column with this name and offset is added to the result.
    /// It is added via `LazyFrame::with_row_index` after the scan is built.
    pub row_index: Option<RowIndex>,
    /// Options for accessing cloud-hosted sources; `None` by default.
    pub cloud_options: Option<CloudOptions>,
    /// Hive-partitioning options forwarded to the scan.
    pub hive_options: HiveOptions,
    /// Whether the scan may use Parquet statistics; `true` by default.
    pub use_statistics: bool,
    /// Optional schema forwarded to the scan; `None` by default.
    /// NOTE(review): presumably overrides schema inference. Confirm against `DslBuilder`.
    pub schema: Option<SchemaRef>,
    /// Low-memory toggle forwarded to the scan; `false` by default.
    pub low_memory: bool,
    /// Whether the scanned result should be rechunked; `false` by default.
    pub rechunk: bool,
    /// Cache toggle forwarded to the scan; `true` by default.
    pub cache: bool,
    /// Whether path arguments are expanded as glob patterns; `true` by default.
    pub glob: bool,
    /// Optional column name forwarded to the scan; `None` by default.
    /// NOTE(review): presumably the column that records each row's source file path. Confirm.
    pub include_file_paths: Option<PlSmallStr>,
    /// Toggle forwarded to the scan; `false` by default.
    /// NOTE(review): presumably tolerates files missing some schema columns. Confirm.
    pub allow_missing_columns: bool,
}
27
28impl Default for ScanArgsParquet {
29 fn default() -> Self {
30 Self {
31 n_rows: None,
32 parallel: Default::default(),
33 row_index: None,
34 cloud_options: None,
35 hive_options: Default::default(),
36 use_statistics: true,
37 schema: None,
38 rechunk: false,
39 low_memory: false,
40 cache: true,
41 glob: true,
42 include_file_paths: None,
43 allow_missing_columns: false,
44 }
45 }
46}
47
/// Module-private reader that pairs the caller's [`ScanArgsParquet`] with the sources to scan.
/// It implements `LazyFileListReader` so the generic reader machinery can drive it.
#[derive(Clone)]
struct LazyParquetReader {
    /// Scan options; their fields are moved out when `finish` builds the plan.
    args: ScanArgsParquet,
    /// Sources to scan. Starts as `ScanSources::default()` in `new` and is replaced via
    /// `with_sources`.
    sources: ScanSources,
}
53
54impl LazyParquetReader {
55 fn new(args: ScanArgsParquet) -> Self {
56 Self {
57 args,
58 sources: ScanSources::default(),
59 }
60 }
61}
62
63impl LazyFileListReader for LazyParquetReader {
64 fn finish(self) -> PolarsResult<LazyFrame> {
66 let row_index = self.args.row_index;
67
68 let mut lf: LazyFrame = DslBuilder::scan_parquet(
69 self.sources,
70 self.args.n_rows,
71 self.args.cache,
72 self.args.parallel,
73 None,
74 self.args.rechunk,
75 self.args.low_memory,
76 self.args.cloud_options,
77 self.args.use_statistics,
78 self.args.schema,
79 self.args.hive_options,
80 self.args.glob,
81 self.args.include_file_paths,
82 self.args.allow_missing_columns,
83 )?
84 .build()
85 .into();
86
87 if let Some(row_index) = row_index {
89 lf = lf.with_row_index(row_index.name.clone(), Some(row_index.offset))
90 }
91
92 Ok(lf)
93 }
94
95 fn glob(&self) -> bool {
96 self.args.glob
97 }
98
99 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
100 unreachable!();
101 }
102
103 fn sources(&self) -> &ScanSources {
104 &self.sources
105 }
106
107 fn with_sources(mut self, sources: ScanSources) -> Self {
108 self.sources = sources;
109 self
110 }
111
112 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
113 self.args.n_rows = n_rows.into();
114 self
115 }
116
117 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
118 self.args.row_index = row_index.into();
119 self
120 }
121
122 fn rechunk(&self) -> bool {
123 self.args.rechunk
124 }
125
126 fn with_rechunk(mut self, toggle: bool) -> Self {
127 self.args.rechunk = toggle;
128 self
129 }
130
131 fn cloud_options(&self) -> Option<&CloudOptions> {
132 self.args.cloud_options.as_ref()
133 }
134
135 fn n_rows(&self) -> Option<usize> {
136 self.args.n_rows
137 }
138
139 fn row_index(&self) -> Option<&RowIndex> {
140 self.args.row_index.as_ref()
141 }
142}
143
144impl LazyFrame {
145 pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> PolarsResult<Self> {
147 Self::scan_parquet_sources(
148 ScanSources::Paths([path.as_ref().to_path_buf()].into()),
149 args,
150 )
151 }
152
153 pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
155 LazyParquetReader::new(args).with_sources(sources).finish()
156 }
157
158 pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<Self> {
160 Self::scan_parquet_sources(ScanSources::Paths(paths), args)
161 }
162}