polars_io/file_cache/
utils.rs1use std::sync::{Arc, LazyLock};
2use std::time::UNIX_EPOCH;
3
4use polars_error::{PolarsError, PolarsResult};
5use polars_utils::pl_path::{CloudScheme, PlRefPath};
6
7use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
8use super::entry::FileCacheEntry;
9use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
10use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
11use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
12
13pub static FILE_CACHE_PREFIX: LazyLock<PlRefPath> = LazyLock::new(|| {
14 let path = PlRefPath::try_from_path(&POLARS_TEMP_DIR_BASE_PATH.join("file-cache/")).unwrap();
15
16 if let Err(err) = ensure_directory_init(path.as_ref()) {
17 panic!(
18 "failed to create file cache directory: path = {}, err = {}",
19 path, err
20 );
21 }
22
23 path
24});
25
26pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
27 u64::try_from(
28 metadata
29 .modified()
30 .unwrap()
31 .duration_since(UNIX_EPOCH)
32 .unwrap()
33 .as_millis(),
34 )
35 .unwrap()
36}
37
38pub(super) fn update_last_accessed(file: &std::fs::File) {
39 let file_metadata = file.metadata().unwrap();
40
41 if let Err(e) = file.set_times(
42 std::fs::FileTimes::new()
43 .set_modified(file_metadata.modified().unwrap())
44 .set_accessed(std::time::SystemTime::now()),
45 ) {
46 panic!("failed to update file last accessed time: {e}");
47 }
48}
49
50pub async fn init_entries_from_uri_list(
51 uri_list: impl ExactSizeIterator<Item = PlRefPath> + Send + 'static,
52 cloud_options: Option<&CloudOptions>,
53) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
54 init_entries_from_uri_list_impl(Box::new(uri_list), cloud_options).await
55}
56
57async fn init_entries_from_uri_list_impl(
58 uri_list: Box<dyn ExactSizeIterator<Item = PlRefPath> + Send + 'static>,
59 cloud_options: Option<&CloudOptions>,
60) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
61 #[allow(clippy::len_zero)]
62 if uri_list.len() == 0 {
63 return Ok(Default::default());
64 }
65
66 let mut uri_list = uri_list.peekable();
67
68 let first_uri = uri_list.peek().unwrap().clone();
69
70 let file_cache_ttl = cloud_options
71 .map(|x| x.file_cache_ttl)
72 .unwrap_or_else(get_env_file_cache_ttl);
73
74 if first_uri.has_scheme() {
75 let shared_object_store = if !matches!(
76 first_uri.scheme(),
77 Some(CloudScheme::Http | CloudScheme::Https) ) {
79 let (_, object_store) = build_object_store(first_uri, cloud_options, false).await?;
80 Some(object_store)
81 } else {
82 None
83 };
84
85 futures::future::try_join_all(uri_list.map(|uri| {
86 let shared_object_store = shared_object_store.clone();
87
88 async move {
89 let object_store = if let Some(shared_object_store) = shared_object_store.clone() {
90 shared_object_store
91 } else {
92 let (_, object_store) =
93 build_object_store(uri.clone(), cloud_options, false).await?;
94 object_store
95 };
96
97 FILE_CACHE.init_entry(
98 uri.clone(),
99 &|| {
100 let CloudLocation { prefix, .. } =
101 CloudLocation::new(uri.clone(), false).unwrap();
102 let cloud_path = object_path_from_str(&prefix)?;
103 let object_store = object_store.clone();
104
105 Ok(Arc::new(CloudFileFetcher {
106 uri: uri.clone(),
107 object_store,
108 cloud_path,
109 }))
110 },
111 file_cache_ttl,
112 )
113 }
114 }))
115 .await
116 } else {
117 let mut out = Vec::with_capacity(uri_list.len());
118 for uri in uri_list {
119 let uri = tokio::fs::canonicalize(uri.as_str()).await.map_err(|err| {
120 let msg = Some(format!("{}: {}", err, uri).into());
121 PolarsError::IO {
122 error: err.into(),
123 msg,
124 }
125 })?;
126 let uri = PlRefPath::try_from_pathbuf(uri)?;
127
128 out.push(FILE_CACHE.init_entry(
129 uri.clone(),
130 &|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
131 file_cache_ttl,
132 )?)
133 }
134 Ok(out)
135 }
136}