polars_io/file_cache/
utils.rs1use std::path::Path;
2use std::sync::{Arc, LazyLock};
3use std::time::UNIX_EPOCH;
4
5use polars_error::{PolarsError, PolarsResult};
6use polars_utils::plpath::{CloudScheme, PlPathRef};
7
8use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
9use super::entry::FileCacheEntry;
10use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
11use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
12use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
13use crate::pl_async;
14
15pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {
16 let path = POLARS_TEMP_DIR_BASE_PATH
17 .join("file-cache/")
18 .into_boxed_path();
19
20 if let Err(err) = ensure_directory_init(path.as_ref()) {
21 panic!(
22 "failed to create file cache directory: path = {}, err = {}",
23 path.to_str().unwrap(),
24 err
25 );
26 }
27
28 path
29});
30
31pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
32 u64::try_from(
33 metadata
34 .modified()
35 .unwrap()
36 .duration_since(UNIX_EPOCH)
37 .unwrap()
38 .as_millis(),
39 )
40 .unwrap()
41}
42
43pub(super) fn update_last_accessed(file: &std::fs::File) {
44 let file_metadata = file.metadata().unwrap();
45
46 if let Err(e) = file.set_times(
47 std::fs::FileTimes::new()
48 .set_modified(file_metadata.modified().unwrap())
49 .set_accessed(std::time::SystemTime::now()),
50 ) {
51 panic!("failed to update file last accessed time: {e}");
52 }
53}
54
55pub fn init_entries_from_uri_list(
56 mut uri_list: impl ExactSizeIterator<Item = Arc<str>>,
57 cloud_options: Option<&CloudOptions>,
58) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
59 init_entries_from_uri_list_impl(&mut uri_list, cloud_options)
60}
61
62fn init_entries_from_uri_list_impl(
63 uri_list: &mut dyn ExactSizeIterator<Item = Arc<str>>,
64 cloud_options: Option<&CloudOptions>,
65) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
66 #[expect(clippy::len_zero)]
67 if uri_list.len() == 0 {
68 return Ok(Default::default());
69 }
70
71 let mut uri_list = uri_list.peekable();
72
73 let first_uri = PlPathRef::new(uri_list.peek().unwrap().as_ref());
74
75 let file_cache_ttl = cloud_options
76 .map(|x| x.file_cache_ttl)
77 .unwrap_or_else(get_env_file_cache_ttl);
78
79 if first_uri.is_cloud_url() {
80 let shared_object_store = (!matches!(
81 first_uri.scheme(),
82 Some(CloudScheme::Http | CloudScheme::Https) ))
84 .then(|| {
85 pl_async::get_runtime().block_in_place_on(async {
86 let (_, object_store) =
87 build_object_store(first_uri.to_str(), cloud_options, false).await?;
88
89 PolarsResult::Ok(object_store)
90 })
91 })
92 .transpose()?;
93
94 pl_async::get_runtime().block_in_place_on(async {
95 futures::future::try_join_all(uri_list.map(|uri| {
96 let shared_object_store = shared_object_store.clone();
97
98 async move {
99 let object_store =
100 if let Some(shared_object_store) = shared_object_store.clone() {
101 shared_object_store
102 } else {
103 let (_, object_store) =
104 build_object_store(&uri, cloud_options, false).await?;
105 object_store
106 };
107
108 FILE_CACHE.init_entry(
109 uri.clone(),
110 &|| {
111 let CloudLocation { prefix, .. } =
112 CloudLocation::new(uri.as_ref(), false).unwrap();
113 let cloud_path = object_path_from_str(&prefix)?;
114
115 let uri = uri.clone();
116 let object_store = object_store.clone();
117
118 Ok(Arc::new(CloudFileFetcher {
119 uri,
120 object_store,
121 cloud_path,
122 }))
123 },
124 file_cache_ttl,
125 )
126 }
127 }))
128 .await
129 })
130 } else {
131 uri_list
132 .map(|uri| {
133 let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
134 let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
135 PolarsError::IO {
136 error: err.into(),
137 msg,
138 }
139 })?;
140 let uri = Arc::<str>::from(uri.to_str().unwrap());
141
142 FILE_CACHE.init_entry(
143 uri.clone(),
144 &|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
145 file_cache_ttl,
146 )
147 })
148 .collect::<PolarsResult<Vec<_>>>()
149 }
150}