// polars_lazy/scan/parquet.rs

1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::parquet::read::ParallelStrategy;
6use polars_io::prelude::ParquetOptions;
7use polars_io::{HiveOptions, RowIndex};
8use polars_utils::slice_enum::Slice;
9
10use crate::prelude::*;
11
12#[derive(Clone)]
13pub struct ScanArgsParquet {
14    pub n_rows: Option<usize>,
15    pub parallel: ParallelStrategy,
16    pub row_index: Option<RowIndex>,
17    pub cloud_options: Option<CloudOptions>,
18    pub hive_options: HiveOptions,
19    pub use_statistics: bool,
20    pub schema: Option<SchemaRef>,
21    pub low_memory: bool,
22    pub rechunk: bool,
23    pub cache: bool,
24    /// Expand path given via globbing rules.
25    pub glob: bool,
26    pub include_file_paths: Option<PlSmallStr>,
27    pub allow_missing_columns: bool,
28}
29
30impl Default for ScanArgsParquet {
31    fn default() -> Self {
32        Self {
33            n_rows: None,
34            parallel: Default::default(),
35            row_index: None,
36            cloud_options: None,
37            hive_options: Default::default(),
38            use_statistics: true,
39            schema: None,
40            rechunk: false,
41            low_memory: false,
42            cache: true,
43            glob: true,
44            include_file_paths: None,
45            allow_missing_columns: false,
46        }
47    }
48}
49
50#[derive(Clone)]
51struct LazyParquetReader {
52    args: ScanArgsParquet,
53    sources: ScanSources,
54}
55
56impl LazyParquetReader {
57    fn new(args: ScanArgsParquet) -> Self {
58        Self {
59            args,
60            sources: ScanSources::default(),
61        }
62    }
63}
64
65impl LazyFileListReader for LazyParquetReader {
66    /// Get the final [LazyFrame].
67    fn finish(self) -> PolarsResult<LazyFrame> {
68        let parquet_options = ParquetOptions {
69            schema: self.args.schema,
70            parallel: self.args.parallel,
71            low_memory: self.args.low_memory,
72            use_statistics: self.args.use_statistics,
73        };
74
75        let unified_scan_args = UnifiedScanArgs {
76            schema: None,
77            cloud_options: self.args.cloud_options,
78            hive_options: self.args.hive_options,
79            rechunk: self.args.rechunk,
80            cache: self.args.cache,
81            glob: self.args.glob,
82            projection: None,
83            // Note: We call `with_row_index()` on the LazyFrame below
84            row_index: None,
85            pre_slice: self
86                .args
87                .n_rows
88                .map(|len| Slice::Positive { offset: 0, len }),
89            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
90            missing_columns_policy: if self.args.allow_missing_columns {
91                MissingColumnsPolicy::Insert
92            } else {
93                MissingColumnsPolicy::Raise
94            },
95            extra_columns_policy: ExtraColumnsPolicy::Raise,
96            include_file_paths: self.args.include_file_paths,
97            deletion_files: Default::default(),
98        };
99
100        let mut lf: LazyFrame =
101            DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
102                .build()
103                .into();
104
105        // It's a bit hacky, but this row_index function updates the schema.
106        if let Some(row_index) = self.args.row_index {
107            lf = lf.with_row_index(row_index.name, Some(row_index.offset))
108        }
109
110        Ok(lf)
111    }
112
113    fn glob(&self) -> bool {
114        self.args.glob
115    }
116
117    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
118        unreachable!();
119    }
120
121    fn sources(&self) -> &ScanSources {
122        &self.sources
123    }
124
125    fn with_sources(mut self, sources: ScanSources) -> Self {
126        self.sources = sources;
127        self
128    }
129
130    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
131        self.args.n_rows = n_rows.into();
132        self
133    }
134
135    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
136        self.args.row_index = row_index.into();
137        self
138    }
139
140    fn rechunk(&self) -> bool {
141        self.args.rechunk
142    }
143
144    fn with_rechunk(mut self, toggle: bool) -> Self {
145        self.args.rechunk = toggle;
146        self
147    }
148
149    fn cloud_options(&self) -> Option<&CloudOptions> {
150        self.args.cloud_options.as_ref()
151    }
152
153    fn n_rows(&self) -> Option<usize> {
154        self.args.n_rows
155    }
156
157    fn row_index(&self) -> Option<&RowIndex> {
158        self.args.row_index.as_ref()
159    }
160}
161
162impl LazyFrame {
163    /// Create a LazyFrame directly from a parquet scan.
164    pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> PolarsResult<Self> {
165        Self::scan_parquet_sources(
166            ScanSources::Paths([path.as_ref().to_path_buf()].into()),
167            args,
168        )
169    }
170
171    /// Create a LazyFrame directly from a parquet scan.
172    pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
173        LazyParquetReader::new(args).with_sources(sources).finish()
174    }
175
176    /// Create a LazyFrame directly from a parquet scan.
177    pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<Self> {
178        Self::scan_parquet_sources(ScanSources::Paths(paths), args)
179    }
180}