polars_io/file_cache/
cache.rs1use std::path::Path;
2use std::sync::atomic::AtomicU64;
3use std::sync::{Arc, LazyLock, RwLock};
4
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::aliases::PlHashMap;
8use polars_utils::plpath::PlPathRef;
9
10use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
11use super::eviction::EvictionManager;
12use super::file_fetcher::FileFetcher;
13use super::utils::FILE_CACHE_PREFIX;
14use crate::path_utils::ensure_directory_init;
15
16pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
17 let prefix = FILE_CACHE_PREFIX.as_ref();
18 let prefix = Arc::<Path>::from(prefix);
19
20 if config::verbose() {
21 eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
22 }
23
24 let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
25 let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
26
27 let metadata_dir = prefix
28 .as_ref()
29 .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())
30 .into_boxed_path();
31 if let Err(err) = ensure_directory_init(&metadata_dir) {
32 panic!(
33 "failed to create file cache metadata directory: path = {}, err = {}",
34 metadata_dir.to_str().unwrap(),
35 err
36 )
37 }
38
39 let data_dir = prefix
40 .as_ref()
41 .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())
42 .into_boxed_path();
43
44 if let Err(err) = ensure_directory_init(&data_dir) {
45 panic!(
46 "failed to create file cache data directory: path = {}, err = {}",
47 data_dir.to_str().unwrap(),
48 err
49 )
50 }
51
52 EvictionManager {
53 data_dir,
54 metadata_dir,
55 files_to_remove: None,
56 min_ttl: min_ttl.clone(),
57 notify_ttl_updated: notify_ttl_updated.clone(),
58 }
59 .run_in_background();
60
61 unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
63});
64
65pub struct FileCache {
66 prefix: Arc<Path>,
67 entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
68 min_ttl: Arc<AtomicU64>,
69 notify_ttl_updated: Arc<tokio::sync::Notify>,
70}
71
72impl FileCache {
73 unsafe fn new_unchecked(
78 prefix: Arc<Path>,
79 min_ttl: Arc<AtomicU64>,
80 notify_ttl_updated: Arc<tokio::sync::Notify>,
81 ) -> Self {
82 Self {
83 prefix,
84 entries: Default::default(),
85 min_ttl,
86 notify_ttl_updated,
87 }
88 }
89
90 pub(super) fn init_entry(
93 &self,
94 uri: Arc<str>,
95 get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
96 ttl: u64,
97 ) -> PolarsResult<Arc<FileCacheEntry>> {
98 let verbose = config::verbose();
99
100 #[cfg(debug_assertions)]
101 {
102 if !PlPathRef::new(uri.as_ref()).is_cloud_url() {
104 let path = Path::new(uri.as_ref());
105 assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
106 }
107 }
108
109 if self
110 .min_ttl
111 .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
112 < ttl
113 {
114 self.notify_ttl_updated.notify_one();
115 }
116
117 {
118 let entries = self.entries.read().unwrap();
119
120 if let Some(entry) = entries.get(uri.as_ref()) {
121 if verbose {
122 eprintln!(
123 "[file_cache] init_entry: return existing entry for uri = {}",
124 uri.clone()
125 );
126 }
127 entry.update_ttl(ttl);
128 return Ok(entry.clone());
129 }
130 }
131
132 let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
133
134 {
135 let mut entries = self.entries.write().unwrap();
136
137 if let Some(entry) = entries.get(uri.as_ref()) {
139 if verbose {
140 eprintln!(
141 "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
142 uri.clone()
143 );
144 }
145 entry.update_ttl(ttl);
146 return Ok(entry.clone());
147 }
148
149 if verbose {
150 eprintln!(
151 "[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
152 );
153 }
154
155 let entry = Arc::new(FileCacheEntry::new(
156 uri.clone(),
157 uri_hash,
158 self.prefix.clone(),
159 get_file_fetcher()?,
160 ttl,
161 ));
162 entries.insert(uri, entry.clone());
163 Ok(entry)
164 }
165 }
166
167 pub fn get_entry(&self, addr: PlPathRef<'_>) -> Option<Arc<FileCacheEntry>> {
169 match addr {
170 PlPathRef::Local(p) => {
171 let p = std::fs::canonicalize(p).unwrap();
172 self.entries
173 .read()
174 .unwrap()
175 .get(p.to_str().unwrap())
176 .map(Arc::clone)
177 },
178 PlPathRef::Cloud(p) => self.entries.read().unwrap().get(p.uri()).map(Arc::clone),
179 }
180 }
181}
182
/// Reads the file cache TTL from the `POLARS_FILE_CACHE_TTL` environment variable.
///
/// Falls back to `60 * 60` when the variable cannot be read (unset or not valid Unicode).
/// NOTE(review): the unit is presumably seconds (the fallback would then be one hour) — confirm.
///
/// # Panics
/// Panics (via `expect("integer")`) if the variable is set but does not parse as a `u64`.
pub fn get_env_file_cache_ttl() -> u64 {
    const FALLBACK_TTL: u64 = 60 * 60;

    match std::env::var("POLARS_FILE_CACHE_TTL") {
        Ok(raw) => raw.parse::<u64>().expect("integer"),
        Err(_) => FALLBACK_TTL,
    }
}