1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::path::PathBuf;
use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::RowIndex;
use polars_plan::prelude::UnionArgs;

use crate::prelude::*;

/// Reads [LazyFrame] from a filesystem or a cloud storage.
/// Supports glob patterns.
///
/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
pub trait LazyFileListReader: Clone {
    /// Get the final [LazyFrame].
    fn finish(self) -> PolarsResult<LazyFrame> {
        if !self.glob() {
            return self.finish_no_glob();
        }

        let ScanSources::Paths(paths) = self.sources() else {
            unreachable!("opened-files or in-memory buffers should never be globbed");
        };

        let lfs = paths
            .iter()
            .map(|path| {
                self.clone()
                    // Each individual reader should not apply a row limit.
                    .with_n_rows(None)
                    // Each individual reader should not apply a row index.
                    .with_row_index(None)
                    .with_paths([path.clone()].into())
                    .with_rechunk(false)
                    .finish_no_glob()
                    .map_err(|e| {
                        polars_err!(
                            ComputeError: "error while reading {}: {}", path.display(), e
                        )
                    })
            })
            .collect::<PolarsResult<Vec<_>>>()?;

        polars_ensure!(
            !lfs.is_empty(),
            ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str().unwrap()).collect::<Vec<_>>()
        );

        let mut lf = self.concat_impl(lfs)?;
        if let Some(n_rows) = self.n_rows() {
            lf = lf.slice(0, n_rows as IdxSize)
        };
        if let Some(rc) = self.row_index() {
            lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
        };

        Ok(lf)
    }

    /// Recommended concatenation of [LazyFrame]s from many input files.
    ///
    /// This method should not take into consideration [LazyFileListReader::n_rows]
    /// nor [LazyFileListReader::row_index].
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        let args = UnionArgs {
            rechunk: self.rechunk(),
            parallel: true,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        concat_impl(&lfs, args)
    }

    /// Get the final [LazyFrame].
    /// This method assumes, that path is *not* a glob.
    ///
    /// It is recommended to always use [LazyFileListReader::finish] method.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame>;

    fn glob(&self) -> bool {
        true
    }

    /// Get the sources for this reader.
    fn sources(&self) -> &ScanSources;

    /// Set sources of the scanned files.
    #[must_use]
    fn with_sources(self, source: ScanSources) -> Self;

    /// Set paths of the scanned files.
    #[must_use]
    fn with_paths(self, paths: Arc<[PathBuf]>) -> Self {
        self.with_sources(ScanSources::Paths(paths))
    }

    /// Configure the row limit.
    fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;

    /// Configure the row index.
    fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn rechunk(&self) -> bool;

    /// Rechunk the memory to contiguous chunks when parsing is done.
    #[must_use]
    fn with_rechunk(self, toggle: bool) -> Self;

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    fn n_rows(&self) -> Option<usize>;

    /// Add a row index column.
    fn row_index(&self) -> Option<&RowIndex>;

    /// [CloudOptions] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        None
    }
}