polars_lazy/scan/file_list_reader.rs

1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::RowIndex;
4use polars_io::cloud::CloudOptions;
5use polars_plan::prelude::UnionArgs;
6use polars_utils::plpath::PlPath;
7
8use crate::prelude::*;
9
10/// Reads [LazyFrame] from a filesystem or a cloud storage.
11/// Supports glob patterns.
12///
13/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
14pub trait LazyFileListReader: Clone {
15    /// Get the final [LazyFrame].
16    fn finish(self) -> PolarsResult<LazyFrame> {
17        if !self.glob() {
18            return self.finish_no_glob();
19        }
20
21        let ScanSources::Paths(paths) = self.sources() else {
22            unreachable!("opened-files or in-memory buffers should never be globbed");
23        };
24
25        let lfs = paths
26            .iter()
27            .map(|path| {
28                self.clone()
29                    // Each individual reader should not apply a row limit.
30                    .with_n_rows(None)
31                    // Each individual reader should not apply a row index.
32                    .with_row_index(None)
33                    .with_paths(Buffer::from_iter([path.clone()]))
34                    .with_rechunk(false)
35                    .finish_no_glob()
36                    .map_err(|e| {
37                        polars_err!(
38                            ComputeError: "error while reading {}: {}", path.display(), e
39                        )
40                    })
41            })
42            .collect::<PolarsResult<Vec<_>>>()?;
43
44        polars_ensure!(
45            !lfs.is_empty(),
46            ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str()).collect::<Vec<_>>()
47        );
48
49        let mut lf = self.concat_impl(lfs)?;
50        if let Some(n_rows) = self.n_rows() {
51            lf = lf.slice(0, n_rows as IdxSize)
52        };
53        if let Some(rc) = self.row_index() {
54            lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
55        };
56
57        Ok(lf)
58    }
59
60    /// Recommended concatenation of [LazyFrame]s from many input files.
61    ///
62    /// This method should not take into consideration [LazyFileListReader::n_rows]
63    /// nor [LazyFileListReader::row_index].
64    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
65        let args = UnionArgs {
66            rechunk: self.rechunk(),
67            parallel: true,
68            to_supertypes: false,
69            from_partitioned_ds: true,
70            ..Default::default()
71        };
72        concat_impl(&lfs, args)
73    }
74
75    /// Get the final [LazyFrame].
76    /// This method assumes, that path is *not* a glob.
77    ///
78    /// It is recommended to always use [LazyFileListReader::finish] method.
79    fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
80
81    fn glob(&self) -> bool {
82        true
83    }
84
85    /// Get the sources for this reader.
86    fn sources(&self) -> &ScanSources;
87
88    /// Set sources of the scanned files.
89    #[must_use]
90    fn with_sources(self, source: ScanSources) -> Self;
91
92    /// Set paths of the scanned files.
93    #[must_use]
94    fn with_paths(self, paths: Buffer<PlPath>) -> Self {
95        self.with_sources(ScanSources::Paths(paths))
96    }
97
98    /// Configure the row limit.
99    fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
100
101    /// Configure the row index.
102    fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
103
104    /// Rechunk the memory to contiguous chunks when parsing is done.
105    fn rechunk(&self) -> bool;
106
107    /// Rechunk the memory to contiguous chunks when parsing is done.
108    #[must_use]
109    fn with_rechunk(self, toggle: bool) -> Self;
110
111    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
112    /// be guaranteed.
113    fn n_rows(&self) -> Option<usize>;
114
115    /// Add a row index column.
116    fn row_index(&self) -> Option<&RowIndex>;
117
118    /// [CloudOptions] used to list files.
119    fn cloud_options(&self) -> Option<&CloudOptions> {
120        None
121    }
122}