polars_lazy/scan/
parquet.rs

1use polars_core::prelude::*;
2use polars_io::cloud::CloudOptions;
3use polars_io::parquet::read::ParallelStrategy;
4use polars_io::prelude::ParquetOptions;
5use polars_io::{HiveOptions, RowIndex};
6use polars_utils::plpath::PlPath;
7use polars_utils::slice_enum::Slice;
8
9use crate::prelude::*;
10
11#[derive(Clone)]
12pub struct ScanArgsParquet {
13    pub n_rows: Option<usize>,
14    pub parallel: ParallelStrategy,
15    pub row_index: Option<RowIndex>,
16    pub cloud_options: Option<CloudOptions>,
17    pub hive_options: HiveOptions,
18    pub use_statistics: bool,
19    pub schema: Option<SchemaRef>,
20    pub low_memory: bool,
21    pub rechunk: bool,
22    pub cache: bool,
23    /// Expand path given via globbing rules.
24    pub glob: bool,
25    pub include_file_paths: Option<PlSmallStr>,
26    pub allow_missing_columns: bool,
27}
28
29impl Default for ScanArgsParquet {
30    fn default() -> Self {
31        Self {
32            n_rows: None,
33            parallel: Default::default(),
34            row_index: None,
35            cloud_options: None,
36            hive_options: Default::default(),
37            use_statistics: true,
38            schema: None,
39            rechunk: false,
40            low_memory: false,
41            cache: true,
42            glob: true,
43            include_file_paths: None,
44            allow_missing_columns: false,
45        }
46    }
47}
48
49#[derive(Clone)]
50struct LazyParquetReader {
51    args: ScanArgsParquet,
52    sources: ScanSources,
53}
54
55impl LazyParquetReader {
56    fn new(args: ScanArgsParquet) -> Self {
57        Self {
58            args,
59            sources: ScanSources::default(),
60        }
61    }
62}
63
64impl LazyFileListReader for LazyParquetReader {
65    /// Get the final [LazyFrame].
66    fn finish(self) -> PolarsResult<LazyFrame> {
67        let parquet_options = ParquetOptions {
68            schema: self.args.schema,
69            parallel: self.args.parallel,
70            low_memory: self.args.low_memory,
71            use_statistics: self.args.use_statistics,
72        };
73
74        let unified_scan_args = UnifiedScanArgs {
75            schema: None,
76            cloud_options: self.args.cloud_options,
77            hive_options: self.args.hive_options,
78            rechunk: self.args.rechunk,
79            cache: self.args.cache,
80            glob: self.args.glob,
81            projection: None,
82            column_mapping: None,
83            default_values: None,
84            // Note: We call `with_row_index()` on the LazyFrame below
85            row_index: None,
86            pre_slice: self
87                .args
88                .n_rows
89                .map(|len| Slice::Positive { offset: 0, len }),
90            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
91            missing_columns_policy: if self.args.allow_missing_columns {
92                MissingColumnsPolicy::Insert
93            } else {
94                MissingColumnsPolicy::Raise
95            },
96            extra_columns_policy: ExtraColumnsPolicy::Raise,
97            include_file_paths: self.args.include_file_paths,
98            deletion_files: None,
99        };
100
101        let mut lf: LazyFrame =
102            DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
103                .build()
104                .into();
105
106        // It's a bit hacky, but this row_index function updates the schema.
107        if let Some(row_index) = self.args.row_index {
108            lf = lf.with_row_index(row_index.name, Some(row_index.offset))
109        }
110
111        Ok(lf)
112    }
113
114    fn glob(&self) -> bool {
115        self.args.glob
116    }
117
118    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
119        unreachable!();
120    }
121
122    fn sources(&self) -> &ScanSources {
123        &self.sources
124    }
125
126    fn with_sources(mut self, sources: ScanSources) -> Self {
127        self.sources = sources;
128        self
129    }
130
131    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
132        self.args.n_rows = n_rows.into();
133        self
134    }
135
136    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
137        self.args.row_index = row_index.into();
138        self
139    }
140
141    fn rechunk(&self) -> bool {
142        self.args.rechunk
143    }
144
145    fn with_rechunk(mut self, toggle: bool) -> Self {
146        self.args.rechunk = toggle;
147        self
148    }
149
150    fn cloud_options(&self) -> Option<&CloudOptions> {
151        self.args.cloud_options.as_ref()
152    }
153
154    fn n_rows(&self) -> Option<usize> {
155        self.args.n_rows
156    }
157
158    fn row_index(&self) -> Option<&RowIndex> {
159        self.args.row_index.as_ref()
160    }
161}
162
163impl LazyFrame {
164    /// Create a LazyFrame directly from a parquet scan.
165    pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
166        Self::scan_parquet_sources(ScanSources::Paths([path].into()), args)
167    }
168
169    /// Create a LazyFrame directly from a parquet scan.
170    pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
171        LazyParquetReader::new(args).with_sources(sources).finish()
172    }
173
174    /// Create a LazyFrame directly from a parquet scan.
175    pub fn scan_parquet_files(paths: Arc<[PlPath]>, args: ScanArgsParquet) -> PolarsResult<Self> {
176        Self::scan_parquet_sources(ScanSources::Paths(paths), args)
177    }
178}