// polars_lazy/scan/parquet.rs

1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::cloud::CloudOptions;
4use polars_io::parquet::read::ParallelStrategy;
5use polars_io::prelude::ParquetOptions;
6use polars_io::{HiveOptions, RowIndex};
7use polars_utils::plpath::PlPath;
8use polars_utils::slice_enum::Slice;
9
10use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14    pub n_rows: Option<usize>,
15    pub parallel: ParallelStrategy,
16    pub row_index: Option<RowIndex>,
17    pub cloud_options: Option<CloudOptions>,
18    pub hive_options: HiveOptions,
19    pub use_statistics: bool,
20    pub schema: Option<SchemaRef>,
21    pub low_memory: bool,
22    pub rechunk: bool,
23    pub cache: bool,
24    /// Expand path given via globbing rules.
25    pub glob: bool,
26    pub include_file_paths: Option<PlSmallStr>,
27    pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31    fn default() -> Self {
32        Self {
33            n_rows: None,
34            parallel: Default::default(),
35            row_index: None,
36            cloud_options: None,
37            hive_options: Default::default(),
38            use_statistics: true,
39            schema: None,
40            rechunk: false,
41            low_memory: false,
42            cache: true,
43            glob: true,
44            include_file_paths: None,
45            allow_missing_columns: false,
46        }
47    }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52    args: ScanArgsParquet,
53    sources: ScanSources,
54}
55
56impl LazyParquetReader {
57    fn new(args: ScanArgsParquet) -> Self {
58        Self {
59            args,
60            sources: ScanSources::default(),
61        }
62    }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66    /// Get the final [LazyFrame].
67    fn finish(self) -> PolarsResult<LazyFrame> {
68        let parquet_options = ParquetOptions {
69            schema: self.args.schema,
70            parallel: self.args.parallel,
71            low_memory: self.args.low_memory,
72            use_statistics: self.args.use_statistics,
73        };
74
75        let unified_scan_args = UnifiedScanArgs {
76            schema: None,
77            cloud_options: self.args.cloud_options,
78            hive_options: self.args.hive_options,
79            rechunk: self.args.rechunk,
80            cache: self.args.cache,
81            glob: self.args.glob,
82            hidden_file_prefix: None,
83            projection: None,
84            column_mapping: None,
85            default_values: None,
86            // Note: We call `with_row_index()` on the LazyFrame below
87            row_index: None,
88            pre_slice: self
89                .args
90                .n_rows
91                .map(|len| Slice::Positive { offset: 0, len }),
92            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
93            missing_columns_policy: if self.args.allow_missing_columns {
94                MissingColumnsPolicy::Insert
95            } else {
96                MissingColumnsPolicy::Raise
97            },
98            extra_columns_policy: ExtraColumnsPolicy::Raise,
99            include_file_paths: self.args.include_file_paths,
100            deletion_files: None,
101            table_statistics: None,
102        };
103
104        let mut lf: LazyFrame =
105            DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
106                .build()
107                .into();
108
109        // It's a bit hacky, but this row_index function updates the schema.
110        if let Some(row_index) = self.args.row_index {
111            lf = lf.with_row_index(row_index.name, Some(row_index.offset))
112        }
113
114        Ok(lf)
115    }
116
117    fn glob(&self) -> bool {
118        self.args.glob
119    }
120
121    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
122        unreachable!();
123    }
124
125    fn sources(&self) -> &ScanSources {
126        &self.sources
127    }
128
129    fn with_sources(mut self, sources: ScanSources) -> Self {
130        self.sources = sources;
131        self
132    }
133
134    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
135        self.args.n_rows = n_rows.into();
136        self
137    }
138
139    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
140        self.args.row_index = row_index.into();
141        self
142    }
143
144    fn rechunk(&self) -> bool {
145        self.args.rechunk
146    }
147
148    fn with_rechunk(mut self, toggle: bool) -> Self {
149        self.args.rechunk = toggle;
150        self
151    }
152
153    fn cloud_options(&self) -> Option<&CloudOptions> {
154        self.args.cloud_options.as_ref()
155    }
156
157    fn n_rows(&self) -> Option<usize> {
158        self.args.n_rows
159    }
160
161    fn row_index(&self) -> Option<&RowIndex> {
162        self.args.row_index.as_ref()
163    }
164}
165
166impl LazyFrame {
167    /// Create a LazyFrame directly from a parquet scan.
168    pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
169        Self::scan_parquet_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
170    }
171
172    /// Create a LazyFrame directly from a parquet scan.
173    pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
174        LazyParquetReader::new(args).with_sources(sources).finish()
175    }
176
177    /// Create a LazyFrame directly from a parquet scan.
178    pub fn scan_parquet_files(paths: Buffer<PlPath>, args: ScanArgsParquet) -> PolarsResult<Self> {
179        Self::scan_parquet_sources(ScanSources::Paths(paths), args)
180    }
181}