// polars_lazy/scan/parquet.rs

1use std::path::{Path, PathBuf};
2
3use polars_core::prelude::*;
4use polars_io::cloud::CloudOptions;
5use polars_io::parquet::read::ParallelStrategy;
6use polars_io::{HiveOptions, RowIndex};
7
8use crate::prelude::*;
9
10#[derive(Clone)]
11pub struct ScanArgsParquet {
12    pub n_rows: Option<usize>,
13    pub parallel: ParallelStrategy,
14    pub row_index: Option<RowIndex>,
15    pub cloud_options: Option<CloudOptions>,
16    pub hive_options: HiveOptions,
17    pub use_statistics: bool,
18    pub schema: Option<SchemaRef>,
19    pub low_memory: bool,
20    pub rechunk: bool,
21    pub cache: bool,
22    /// Expand path given via globbing rules.
23    pub glob: bool,
24    pub include_file_paths: Option<PlSmallStr>,
25    pub allow_missing_columns: bool,
26}
27
28impl Default for ScanArgsParquet {
29    fn default() -> Self {
30        Self {
31            n_rows: None,
32            parallel: Default::default(),
33            row_index: None,
34            cloud_options: None,
35            hive_options: Default::default(),
36            use_statistics: true,
37            schema: None,
38            rechunk: false,
39            low_memory: false,
40            cache: true,
41            glob: true,
42            include_file_paths: None,
43            allow_missing_columns: false,
44        }
45    }
46}
47
48#[derive(Clone)]
49struct LazyParquetReader {
50    args: ScanArgsParquet,
51    sources: ScanSources,
52}
53
54impl LazyParquetReader {
55    fn new(args: ScanArgsParquet) -> Self {
56        Self {
57            args,
58            sources: ScanSources::default(),
59        }
60    }
61}
62
63impl LazyFileListReader for LazyParquetReader {
64    /// Get the final [LazyFrame].
65    fn finish(self) -> PolarsResult<LazyFrame> {
66        let row_index = self.args.row_index;
67
68        let mut lf: LazyFrame = DslBuilder::scan_parquet(
69            self.sources,
70            self.args.n_rows,
71            self.args.cache,
72            self.args.parallel,
73            None,
74            self.args.rechunk,
75            self.args.low_memory,
76            self.args.cloud_options,
77            self.args.use_statistics,
78            self.args.schema,
79            self.args.hive_options,
80            self.args.glob,
81            self.args.include_file_paths,
82            self.args.allow_missing_columns,
83        )?
84        .build()
85        .into();
86
87        // It's a bit hacky, but this row_index function updates the schema.
88        if let Some(row_index) = row_index {
89            lf = lf.with_row_index(row_index.name.clone(), Some(row_index.offset))
90        }
91
92        Ok(lf)
93    }
94
95    fn glob(&self) -> bool {
96        self.args.glob
97    }
98
99    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
100        unreachable!();
101    }
102
103    fn sources(&self) -> &ScanSources {
104        &self.sources
105    }
106
107    fn with_sources(mut self, sources: ScanSources) -> Self {
108        self.sources = sources;
109        self
110    }
111
112    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
113        self.args.n_rows = n_rows.into();
114        self
115    }
116
117    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
118        self.args.row_index = row_index.into();
119        self
120    }
121
122    fn rechunk(&self) -> bool {
123        self.args.rechunk
124    }
125
126    fn with_rechunk(mut self, toggle: bool) -> Self {
127        self.args.rechunk = toggle;
128        self
129    }
130
131    fn cloud_options(&self) -> Option<&CloudOptions> {
132        self.args.cloud_options.as_ref()
133    }
134
135    fn n_rows(&self) -> Option<usize> {
136        self.args.n_rows
137    }
138
139    fn row_index(&self) -> Option<&RowIndex> {
140        self.args.row_index.as_ref()
141    }
142}
143
144impl LazyFrame {
145    /// Create a LazyFrame directly from a parquet scan.
146    pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> PolarsResult<Self> {
147        Self::scan_parquet_sources(
148            ScanSources::Paths([path.as_ref().to_path_buf()].into()),
149            args,
150        )
151    }
152
153    /// Create a LazyFrame directly from a parquet scan.
154    pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
155        LazyParquetReader::new(args).with_sources(sources).finish()
156    }
157
158    /// Create a LazyFrame directly from a parquet scan.
159    pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<Self> {
160        Self::scan_parquet_sources(ScanSources::Paths(paths), args)
161    }
162}