// polars_lazy/scan/parquet.rs

1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::cloud::CloudOptions;
4use polars_io::parquet::read::ParallelStrategy;
5use polars_io::prelude::ParquetOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_utils::plpath::PlPath;
8use polars_utils::slice_enum::Slice;
9
10use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14    pub n_rows: Option<usize>,
15    pub parallel: ParallelStrategy,
16    pub row_index: Option<RowIndex>,
17    pub cloud_options: Option<CloudOptions>,
18    pub hive_options: HiveOptions,
19    pub use_statistics: bool,
20    pub schema: Option<SchemaRef>,
21    pub low_memory: bool,
22    pub rechunk: bool,
23    pub cache: bool,
24    /// Expand path given via globbing rules.
25    pub glob: bool,
26    pub include_file_paths: Option<PlSmallStr>,
27    pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31    fn default() -> Self {
32        Self {
33            n_rows: None,
34            parallel: Default::default(),
35            row_index: None,
36            cloud_options: None,
37            hive_options: Default::default(),
38            use_statistics: true,
39            schema: None,
40            rechunk: false,
41            low_memory: false,
42            cache: true,
43            glob: true,
44            include_file_paths: None,
45            allow_missing_columns: false,
46        }
47    }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52    args: ScanArgsParquet,
53    sources: ScanSources,
54}
55
56impl LazyParquetReader {
57    fn new(args: ScanArgsParquet) -> Self {
58        Self {
59            args,
60            sources: ScanSources::default(),
61        }
62    }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66    /// Get the final [LazyFrame].
67    fn finish(self) -> PolarsResult<LazyFrame> {
68        let parquet_options = ParquetOptions {
69            schema: self.args.schema,
70            parallel: self.args.parallel,
71            low_memory: self.args.low_memory,
72            use_statistics: self.args.use_statistics,
73        };
74
75        let unified_scan_args = UnifiedScanArgs {
76            schema: None,
77            cloud_options: self.args.cloud_options,
78            hive_options: self.args.hive_options,
79            rechunk: self.args.rechunk,
80            cache: self.args.cache,
81            glob: self.args.glob,
82            hidden_file_prefix: None,
83            projection: None,
84            column_mapping: None,
85            default_values: None,
86            // Note: We call `with_row_index()` on the LazyFrame below
87            row_index: None,
88            pre_slice: self
89                .args
90                .n_rows
91                .map(|len| Slice::Positive { offset: 0, len }),
92            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
93            missing_columns_policy: if self.args.allow_missing_columns {
94                MissingColumnsPolicy::Insert
95            } else {
96                MissingColumnsPolicy::Raise
97            },
98            extra_columns_policy: ExtraColumnsPolicy::Raise,
99            include_file_paths: self.args.include_file_paths,
100            deletion_files: None,
101            table_statistics: None,
102            row_count: None,
103        };
104
105        let mut lf: LazyFrame =
106            DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
107                .build()
108                .into();
109
110        // It's a bit hacky, but this row_index function updates the schema.
111        if let Some(row_index) = self.args.row_index {
112            lf = lf.with_row_index(row_index.name, Some(row_index.offset))
113        }
114
115        Ok(lf)
116    }
117
118    fn glob(&self) -> bool {
119        self.args.glob
120    }
121
122    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
123        unreachable!();
124    }
125
126    fn sources(&self) -> &ScanSources {
127        &self.sources
128    }
129
130    fn with_sources(mut self, sources: ScanSources) -> Self {
131        self.sources = sources;
132        self
133    }
134
135    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
136        self.args.n_rows = n_rows.into();
137        self
138    }
139
140    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
141        self.args.row_index = row_index.into();
142        self
143    }
144
145    fn rechunk(&self) -> bool {
146        self.args.rechunk
147    }
148
149    fn with_rechunk(mut self, toggle: bool) -> Self {
150        self.args.rechunk = toggle;
151        self
152    }
153
154    fn cloud_options(&self) -> Option<&CloudOptions> {
155        self.args.cloud_options.as_ref()
156    }
157
158    fn n_rows(&self) -> Option<usize> {
159        self.args.n_rows
160    }
161
162    fn row_index(&self) -> Option<&RowIndex> {
163        self.args.row_index.as_ref()
164    }
165}
166
167impl LazyFrame {
168    /// Create a LazyFrame directly from a parquet scan.
169    pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
170        Self::scan_parquet_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
171    }
172
173    /// Create a LazyFrame directly from a parquet scan.
174    pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
175        LazyParquetReader::new(args).with_sources(sources).finish()
176    }
177
178    /// Create a LazyFrame directly from a parquet scan.
179    pub fn scan_parquet_files(paths: Buffer<PlPath>, args: ScanArgsParquet) -> PolarsResult<Self> {
180        Self::scan_parquet_sources(ScanSources::Paths(paths), args)
181    }
182}