polars_io/file_cache/
cache.rs
1use std::path::Path;
2use std::sync::atomic::AtomicU64;
3use std::sync::{Arc, LazyLock, RwLock};
4
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::aliases::PlHashMap;
8
9use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
10use super::eviction::EvictionManager;
11use super::file_fetcher::FileFetcher;
12use super::utils::FILE_CACHE_PREFIX;
13use crate::path_utils::{ensure_directory_init, is_cloud_url};
14
15pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
16 let prefix = FILE_CACHE_PREFIX.as_ref();
17 let prefix = Arc::<Path>::from(prefix);
18
19 if config::verbose() {
20 eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
21 }
22
23 let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
24 let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
25
26 let metadata_dir = prefix
27 .as_ref()
28 .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())
29 .into_boxed_path();
30 if let Err(err) = ensure_directory_init(&metadata_dir) {
31 panic!(
32 "failed to create file cache metadata directory: path = {}, err = {}",
33 metadata_dir.to_str().unwrap(),
34 err
35 )
36 }
37
38 let data_dir = prefix
39 .as_ref()
40 .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())
41 .into_boxed_path();
42
43 if let Err(err) = ensure_directory_init(&data_dir) {
44 panic!(
45 "failed to create file cache data directory: path = {}, err = {}",
46 data_dir.to_str().unwrap(),
47 err
48 )
49 }
50
51 EvictionManager {
52 data_dir,
53 metadata_dir,
54 files_to_remove: None,
55 min_ttl: min_ttl.clone(),
56 notify_ttl_updated: notify_ttl_updated.clone(),
57 }
58 .run_in_background();
59
60 unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
62});
63
64pub struct FileCache {
65 prefix: Arc<Path>,
66 entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
67 min_ttl: Arc<AtomicU64>,
68 notify_ttl_updated: Arc<tokio::sync::Notify>,
69}
70
71impl FileCache {
72 unsafe fn new_unchecked(
77 prefix: Arc<Path>,
78 min_ttl: Arc<AtomicU64>,
79 notify_ttl_updated: Arc<tokio::sync::Notify>,
80 ) -> Self {
81 Self {
82 prefix,
83 entries: Default::default(),
84 min_ttl,
85 notify_ttl_updated,
86 }
87 }
88
89 pub(super) fn init_entry<F: Fn() -> PolarsResult<Arc<dyn FileFetcher>>>(
92 &self,
93 uri: Arc<str>,
94 get_file_fetcher: F,
95 ttl: u64,
96 ) -> PolarsResult<Arc<FileCacheEntry>> {
97 let verbose = config::verbose();
98
99 #[cfg(debug_assertions)]
100 {
101 if !crate::path_utils::is_cloud_url(uri.as_ref()) {
103 let path = Path::new(uri.as_ref());
104 assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
105 }
106 }
107
108 if self
109 .min_ttl
110 .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
111 < ttl
112 {
113 self.notify_ttl_updated.notify_one();
114 }
115
116 {
117 let entries = self.entries.read().unwrap();
118
119 if let Some(entry) = entries.get(uri.as_ref()) {
120 if verbose {
121 eprintln!(
122 "[file_cache] init_entry: return existing entry for uri = {}",
123 uri.clone()
124 );
125 }
126 entry.update_ttl(ttl);
127 return Ok(entry.clone());
128 }
129 }
130
131 let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
132
133 {
134 let mut entries = self.entries.write().unwrap();
135
136 if let Some(entry) = entries.get(uri.as_ref()) {
138 if verbose {
139 eprintln!(
140 "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
141 uri.clone()
142 );
143 }
144 entry.update_ttl(ttl);
145 return Ok(entry.clone());
146 }
147
148 if verbose {
149 eprintln!(
150 "[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
151 uri.clone(),
152 uri_hash.clone()
153 );
154 }
155
156 let entry = Arc::new(FileCacheEntry::new(
157 uri.clone(),
158 uri_hash,
159 self.prefix.clone(),
160 get_file_fetcher()?,
161 ttl,
162 ));
163 entries.insert(uri, entry.clone());
164 Ok(entry.clone())
165 }
166 }
167
168 pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
170 if is_cloud_url(uri) {
171 self.entries.read().unwrap().get(uri).map(Arc::clone)
172 } else {
173 let uri = std::fs::canonicalize(uri).unwrap();
174 self.entries
175 .read()
176 .unwrap()
177 .get(uri.to_str().unwrap())
178 .map(Arc::clone)
179 }
180 }
181}
182
/// Reads the file cache TTL from the `POLARS_FILE_CACHE_TTL` environment variable.
///
/// Falls back to `60 * 60` (one hour, assuming the TTL is in seconds) when the variable
/// is unset or not valid Unicode.
///
/// # Panics
/// Panics if the variable is set but does not parse as a `u64`. The message names the
/// variable and the offending value.
pub fn get_env_file_cache_ttl() -> u64 {
    match std::env::var("POLARS_FILE_CACHE_TTL") {
        Ok(value) => value.parse::<u64>().unwrap_or_else(|err| {
            panic!("POLARS_FILE_CACHE_TTL must be a non-negative integer, got {value:?}: {err}")
        }),
        Err(_) => 60 * 60,
    }
}