polars_lazy/scan/
file_list_reader.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use polars_core::prelude::*;
5use polars_io::RowIndex;
6use polars_io::cloud::CloudOptions;
7use polars_plan::prelude::UnionArgs;
8
9use crate::prelude::*;
10
11/// Reads [LazyFrame] from a filesystem or a cloud storage.
12/// Supports glob patterns.
13///
14/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
15pub trait LazyFileListReader: Clone {
16    /// Get the final [LazyFrame].
17    fn finish(self) -> PolarsResult<LazyFrame> {
18        if !self.glob() {
19            return self.finish_no_glob();
20        }
21
22        let ScanSources::Paths(paths) = self.sources() else {
23            unreachable!("opened-files or in-memory buffers should never be globbed");
24        };
25
26        let lfs = paths
27            .iter()
28            .map(|path| {
29                self.clone()
30                    // Each individual reader should not apply a row limit.
31                    .with_n_rows(None)
32                    // Each individual reader should not apply a row index.
33                    .with_row_index(None)
34                    .with_paths([path.clone()].into())
35                    .with_rechunk(false)
36                    .finish_no_glob()
37                    .map_err(|e| {
38                        polars_err!(
39                            ComputeError: "error while reading {}: {}", path.display(), e
40                        )
41                    })
42            })
43            .collect::<PolarsResult<Vec<_>>>()?;
44
45        polars_ensure!(
46            !lfs.is_empty(),
47            ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str().unwrap()).collect::<Vec<_>>()
48        );
49
50        let mut lf = self.concat_impl(lfs)?;
51        if let Some(n_rows) = self.n_rows() {
52            lf = lf.slice(0, n_rows as IdxSize)
53        };
54        if let Some(rc) = self.row_index() {
55            lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
56        };
57
58        Ok(lf)
59    }
60
61    /// Recommended concatenation of [LazyFrame]s from many input files.
62    ///
63    /// This method should not take into consideration [LazyFileListReader::n_rows]
64    /// nor [LazyFileListReader::row_index].
65    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
66        let args = UnionArgs {
67            rechunk: self.rechunk(),
68            parallel: true,
69            to_supertypes: false,
70            from_partitioned_ds: true,
71            ..Default::default()
72        };
73        concat_impl(&lfs, args)
74    }
75
76    /// Get the final [LazyFrame].
77    /// This method assumes, that path is *not* a glob.
78    ///
79    /// It is recommended to always use [LazyFileListReader::finish] method.
80    fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
81
82    fn glob(&self) -> bool {
83        true
84    }
85
86    /// Get the sources for this reader.
87    fn sources(&self) -> &ScanSources;
88
89    /// Set sources of the scanned files.
90    #[must_use]
91    fn with_sources(self, source: ScanSources) -> Self;
92
93    /// Set paths of the scanned files.
94    #[must_use]
95    fn with_paths(self, paths: Arc<[PathBuf]>) -> Self {
96        self.with_sources(ScanSources::Paths(paths))
97    }
98
99    /// Configure the row limit.
100    fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
101
102    /// Configure the row index.
103    fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
104
105    /// Rechunk the memory to contiguous chunks when parsing is done.
106    fn rechunk(&self) -> bool;
107
108    /// Rechunk the memory to contiguous chunks when parsing is done.
109    #[must_use]
110    fn with_rechunk(self, toggle: bool) -> Self;
111
112    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
113    /// be guaranteed.
114    fn n_rows(&self) -> Option<usize>;
115
116    /// Add a row index column.
117    fn row_index(&self) -> Option<&RowIndex>;
118
119    /// [CloudOptions] used to list files.
120    fn cloud_options(&self) -> Option<&CloudOptions> {
121        None
122    }
123}