// polars_lazy/scan/file_list_reader.rs
use std::path::PathBuf;
use std::sync::Arc;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::RowIndex;
use polars_plan::prelude::UnionArgs;
use crate::prelude::*;
/// Reads [LazyFrame] from a filesystem or a cloud storage.
/// Supports glob patterns.
///
/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
pub trait LazyFileListReader: Clone {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
if !self.glob() {
return self.finish_no_glob();
}
let ScanSources::Paths(paths) = self.sources() else {
unreachable!("opened-files or in-memory buffers should never be globbed");
};
let lfs = paths
.iter()
.map(|path| {
self.clone()
// Each individual reader should not apply a row limit.
.with_n_rows(None)
// Each individual reader should not apply a row index.
.with_row_index(None)
.with_paths([path.clone()].into())
.with_rechunk(false)
.finish_no_glob()
.map_err(|e| {
polars_err!(
ComputeError: "error while reading {}: {}", path.display(), e
)
})
})
.collect::<PolarsResult<Vec<_>>>()?;
polars_ensure!(
!lfs.is_empty(),
ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str().unwrap()).collect::<Vec<_>>()
);
let mut lf = self.concat_impl(lfs)?;
if let Some(n_rows) = self.n_rows() {
lf = lf.slice(0, n_rows as IdxSize)
};
if let Some(rc) = self.row_index() {
lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
};
Ok(lf)
}
/// Recommended concatenation of [LazyFrame]s from many input files.
///
/// This method should not take into consideration [LazyFileListReader::n_rows]
/// nor [LazyFileListReader::row_index].
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
let args = UnionArgs {
rechunk: self.rechunk(),
parallel: true,
to_supertypes: false,
from_partitioned_ds: true,
..Default::default()
};
concat_impl(&lfs, args)
}
/// Get the final [LazyFrame].
/// This method assumes, that path is *not* a glob.
///
/// It is recommended to always use [LazyFileListReader::finish] method.
fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
fn glob(&self) -> bool {
true
}
/// Get the sources for this reader.
fn sources(&self) -> &ScanSources;
/// Set sources of the scanned files.
#[must_use]
fn with_sources(self, source: ScanSources) -> Self;
/// Set paths of the scanned files.
#[must_use]
fn with_paths(self, paths: Arc<[PathBuf]>) -> Self {
self.with_sources(ScanSources::Paths(paths))
}
/// Configure the row limit.
fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
/// Configure the row index.
fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
/// Rechunk the memory to contiguous chunks when parsing is done.
fn rechunk(&self) -> bool;
/// Rechunk the memory to contiguous chunks when parsing is done.
#[must_use]
fn with_rechunk(self, toggle: bool) -> Self;
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
/// be guaranteed.
fn n_rows(&self) -> Option<usize>;
/// Add a row index column.
fn row_index(&self) -> Option<&RowIndex>;
/// [CloudOptions] used to list files.
fn cloud_options(&self) -> Option<&CloudOptions> {
None
}
}