Skip to main content

polars_io/file_cache/
file_fetcher.rs

1use polars_core::runtime::ASYNC;
2use polars_error::{PolarsError, PolarsResult};
3use polars_utils::pl_path::PlRefPath;
4
5use super::metadata::FileVersion;
6use super::utils::last_modified_u64;
7use crate::cloud::PolarsObjectStore;
8
9pub trait FileFetcher: Send + Sync {
10    fn get_uri(&self) -> &PlRefPath;
11    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;
12    /// Fetches the object to a `local_path`.
13    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;
14    fn fetches_as_symlink(&self) -> bool;
15}
16
17pub struct RemoteMetadata {
18    pub size: u64,
19    pub(super) version: FileVersion,
20}
21
22/// A struct that fetches data from local disk and stores it into the `cache`.
23/// Mostly used for debugging, it only ever gets called if `POLARS_FORCE_ASYNC` is set.
24pub(super) struct LocalFileFetcher {
25    path: PlRefPath,
26}
27
28impl LocalFileFetcher {
29    pub(super) fn from_uri(uri: PlRefPath) -> Self {
30        debug_assert_eq!(
31            std::fs::canonicalize(uri.as_str())
32                .ok()
33                .and_then(|x| PlRefPath::try_from_pathbuf(x).ok())
34                .as_ref(),
35            Some(&uri),
36        );
37
38        Self { path: uri }
39    }
40}
41
42impl FileFetcher for LocalFileFetcher {
43    fn get_uri(&self) -> &PlRefPath {
44        &self.path
45    }
46
47    fn fetches_as_symlink(&self) -> bool {
48        #[cfg(target_family = "unix")]
49        {
50            true
51        }
52        #[cfg(not(target_family = "unix"))]
53        {
54            false
55        }
56    }
57
58    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
59        let metadata = std::fs::metadata(&self.path).map_err(PolarsError::from)?;
60
61        Ok(RemoteMetadata {
62            size: metadata.len(),
63            version: FileVersion::Timestamp(last_modified_u64(&metadata)),
64        })
65    }
66
67    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
68        #[cfg(target_family = "unix")]
69        {
70            std::os::unix::fs::symlink(&self.path, local_path).map_err(PolarsError::from)
71        }
72        #[cfg(not(target_family = "unix"))]
73        {
74            std::fs::copy(&self.path, local_path).map_err(PolarsError::from)?;
75            Ok(())
76        }
77    }
78}
79
80pub(super) struct CloudFileFetcher {
81    pub(super) uri: PlRefPath,
82    pub(super) cloud_path: object_store::path::Path,
83    pub(super) object_store: PolarsObjectStore,
84}
85
86impl FileFetcher for CloudFileFetcher {
87    fn get_uri(&self) -> &PlRefPath {
88        &self.uri
89    }
90
91    fn fetches_as_symlink(&self) -> bool {
92        false
93    }
94
95    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
96        let metadata = ASYNC.block_in_place_on(self.object_store.head(&self.cloud_path))?;
97
98        Ok(RemoteMetadata {
99            size: metadata.size,
100            version: metadata
101                .e_tag
102                .map(|x| FileVersion::ETag(blake3::hash(x.as_bytes()).to_hex()[..32].to_string()))
103                .unwrap_or_else(|| {
104                    FileVersion::Timestamp(metadata.last_modified.timestamp_millis() as u64)
105                }),
106        })
107    }
108
109    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
110        ASYNC.block_in_place_on(async {
111            let file = &mut tokio::fs::OpenOptions::new()
112                .write(true)
113                .truncate(true)
114                .open(local_path)
115                .await
116                .map_err(PolarsError::from)?;
117
118            self.object_store.download(&self.cloud_path, file).await
119        })
120    }
121}