polars_lazy/scan/
file_list_reader.rs1use arrow::buffer::Buffer;
2use polars_core::prelude::*;
3use polars_io::RowIndex;
4use polars_io::cloud::CloudOptions;
5use polars_plan::prelude::UnionArgs;
6use polars_utils::plpath::PlPath;
7
8use crate::prelude::*;
9
10pub trait LazyFileListReader: Clone {
15 fn finish(self) -> PolarsResult<LazyFrame> {
17 if !self.glob() {
18 return self.finish_no_glob();
19 }
20
21 let ScanSources::Paths(paths) = self.sources() else {
22 unreachable!("opened-files or in-memory buffers should never be globbed");
23 };
24
25 let lfs = paths
26 .iter()
27 .map(|path| {
28 self.clone()
29 .with_n_rows(None)
31 .with_row_index(None)
33 .with_paths(Buffer::from_iter([path.clone()]))
34 .with_rechunk(false)
35 .finish_no_glob()
36 .map_err(|e| {
37 polars_err!(
38 ComputeError: "error while reading {}: {}", path.display(), e
39 )
40 })
41 })
42 .collect::<PolarsResult<Vec<_>>>()?;
43
44 polars_ensure!(
45 !lfs.is_empty(),
46 ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str()).collect::<Vec<_>>()
47 );
48
49 let mut lf = self.concat_impl(lfs)?;
50 if let Some(n_rows) = self.n_rows() {
51 lf = lf.slice(0, n_rows as IdxSize)
52 };
53 if let Some(rc) = self.row_index() {
54 lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
55 };
56
57 Ok(lf)
58 }
59
60 fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
65 let args = UnionArgs {
66 rechunk: self.rechunk(),
67 parallel: true,
68 to_supertypes: false,
69 from_partitioned_ds: true,
70 ..Default::default()
71 };
72 concat_impl(&lfs, args)
73 }
74
75 fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
80
81 fn glob(&self) -> bool {
82 true
83 }
84
85 fn sources(&self) -> &ScanSources;
87
88 #[must_use]
90 fn with_sources(self, source: ScanSources) -> Self;
91
92 #[must_use]
94 fn with_paths(self, paths: Buffer<PlPath>) -> Self {
95 self.with_sources(ScanSources::Paths(paths))
96 }
97
98 fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
100
101 fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
103
104 fn rechunk(&self) -> bool;
106
107 #[must_use]
109 fn with_rechunk(self, toggle: bool) -> Self;
110
111 fn n_rows(&self) -> Option<usize>;
114
115 fn row_index(&self) -> Option<&RowIndex>;
117
118 fn cloud_options(&self) -> Option<&CloudOptions> {
120 None
121 }
122}