polars_io/file_cache/
file_fetcher.rs1use polars_core::runtime::ASYNC;
2use polars_error::{PolarsError, PolarsResult};
3use polars_utils::pl_path::PlRefPath;
4
5use super::metadata::FileVersion;
6use super::utils::last_modified_u64;
7use crate::cloud::PolarsObjectStore;
8
9pub trait FileFetcher: Send + Sync {
10 fn get_uri(&self) -> &PlRefPath;
11 fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;
12 fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;
14 fn fetches_as_symlink(&self) -> bool;
15}
16
17pub struct RemoteMetadata {
18 pub size: u64,
19 pub(super) version: FileVersion,
20}
21
22pub(super) struct LocalFileFetcher {
25 path: PlRefPath,
26}
27
28impl LocalFileFetcher {
29 pub(super) fn from_uri(uri: PlRefPath) -> Self {
30 debug_assert_eq!(
31 std::fs::canonicalize(uri.as_str())
32 .ok()
33 .and_then(|x| PlRefPath::try_from_pathbuf(x).ok())
34 .as_ref(),
35 Some(&uri),
36 );
37
38 Self { path: uri }
39 }
40}
41
42impl FileFetcher for LocalFileFetcher {
43 fn get_uri(&self) -> &PlRefPath {
44 &self.path
45 }
46
47 fn fetches_as_symlink(&self) -> bool {
48 #[cfg(target_family = "unix")]
49 {
50 true
51 }
52 #[cfg(not(target_family = "unix"))]
53 {
54 false
55 }
56 }
57
58 fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
59 let metadata = std::fs::metadata(&self.path).map_err(PolarsError::from)?;
60
61 Ok(RemoteMetadata {
62 size: metadata.len(),
63 version: FileVersion::Timestamp(last_modified_u64(&metadata)),
64 })
65 }
66
67 fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
68 #[cfg(target_family = "unix")]
69 {
70 std::os::unix::fs::symlink(&self.path, local_path).map_err(PolarsError::from)
71 }
72 #[cfg(not(target_family = "unix"))]
73 {
74 std::fs::copy(&self.path, local_path).map_err(PolarsError::from)?;
75 Ok(())
76 }
77 }
78}
79
80pub(super) struct CloudFileFetcher {
81 pub(super) uri: PlRefPath,
82 pub(super) cloud_path: object_store::path::Path,
83 pub(super) object_store: PolarsObjectStore,
84}
85
86impl FileFetcher for CloudFileFetcher {
87 fn get_uri(&self) -> &PlRefPath {
88 &self.uri
89 }
90
91 fn fetches_as_symlink(&self) -> bool {
92 false
93 }
94
95 fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
96 let metadata = ASYNC.block_in_place_on(self.object_store.head(&self.cloud_path))?;
97
98 Ok(RemoteMetadata {
99 size: metadata.size,
100 version: metadata
101 .e_tag
102 .map(|x| FileVersion::ETag(blake3::hash(x.as_bytes()).to_hex()[..32].to_string()))
103 .unwrap_or_else(|| {
104 FileVersion::Timestamp(metadata.last_modified.timestamp_millis() as u64)
105 }),
106 })
107 }
108
109 fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
110 ASYNC.block_in_place_on(async {
111 let file = &mut tokio::fs::OpenOptions::new()
112 .write(true)
113 .truncate(true)
114 .open(local_path)
115 .await
116 .map_err(PolarsError::from)?;
117
118 self.object_store.download(&self.cloud_path, file).await
119 })
120 }
121}