polars_io/file_cache/
utils.rs
1use std::path::Path;
2use std::sync::{Arc, LazyLock};
3use std::time::UNIX_EPOCH;
4
5use polars_error::{PolarsError, PolarsResult};
6
7use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
8use super::entry::FileCacheEntry;
9use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
10use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
11use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init, is_cloud_url};
12use crate::pl_async;
13
14pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {
15 let path = POLARS_TEMP_DIR_BASE_PATH
16 .join("file-cache/")
17 .into_boxed_path();
18
19 if let Err(err) = ensure_directory_init(path.as_ref()) {
20 panic!(
21 "failed to create file cache directory: path = {}, err = {}",
22 path.to_str().unwrap(),
23 err
24 );
25 }
26
27 path
28});
29
30pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
31 metadata
32 .modified()
33 .unwrap()
34 .duration_since(UNIX_EPOCH)
35 .unwrap()
36 .as_millis() as u64
37}
38
39pub(super) fn update_last_accessed(file: &std::fs::File) {
40 let file_metadata = file.metadata().unwrap();
41
42 if let Err(e) = file.set_times(
43 std::fs::FileTimes::new()
44 .set_modified(file_metadata.modified().unwrap())
45 .set_accessed(std::time::SystemTime::now()),
46 ) {
47 panic!("failed to update file last accessed time: {}", e);
48 }
49}
50
51pub fn init_entries_from_uri_list(
52 uri_list: &[Arc<str>],
53 cloud_options: Option<&CloudOptions>,
54) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
55 if uri_list.is_empty() {
56 return Ok(Default::default());
57 }
58
59 let first_uri = uri_list.first().unwrap().as_ref();
60
61 let file_cache_ttl = cloud_options
62 .map(|x| x.file_cache_ttl)
63 .unwrap_or_else(get_env_file_cache_ttl);
64
65 if is_cloud_url(first_uri) {
66 let object_stores = pl_async::get_runtime().block_in_place_on(async {
67 futures::future::try_join_all(
68 (0..if first_uri.starts_with("http") {
69 uri_list.len()
71 } else {
72 1
73 })
74 .map(|i| async move {
75 let (_, object_store) =
76 build_object_store(&uri_list[i], cloud_options, false).await?;
77 PolarsResult::Ok(object_store)
78 }),
79 )
80 .await
81 })?;
82
83 uri_list
84 .iter()
85 .enumerate()
86 .map(|(i, uri)| {
87 FILE_CACHE.init_entry(
88 uri.clone(),
89 || {
90 let CloudLocation { prefix, .. } =
91 CloudLocation::new(uri.as_ref(), false).unwrap();
92 let cloud_path = object_path_from_str(&prefix)?;
93
94 let object_store =
95 object_stores[std::cmp::min(i, object_stores.len() - 1)].clone();
96 let uri = uri.clone();
97
98 Ok(Arc::new(CloudFileFetcher {
99 uri,
100 object_store,
101 cloud_path,
102 }))
103 },
104 file_cache_ttl,
105 )
106 })
107 .collect::<PolarsResult<Vec<_>>>()
108 } else {
109 uri_list
110 .iter()
111 .map(|uri| {
112 let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
113 let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
114 PolarsError::IO {
115 error: err.into(),
116 msg,
117 }
118 })?;
119 let uri = Arc::<str>::from(uri.to_str().unwrap());
120
121 FILE_CACHE.init_entry(
122 uri.clone(),
123 || Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
124 file_cache_ttl,
125 )
126 })
127 .collect::<PolarsResult<Vec<_>>>()
128 }
129}