polars_io/file_cache/
cache.rs1use std::sync::atomic::AtomicU64;
2use std::sync::{Arc, LazyLock, RwLock};
3
4use polars_core::config;
5use polars_error::PolarsResult;
6use polars_utils::aliases::PlHashMap;
7use polars_utils::pl_path::PlRefPath;
8
9use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
10use super::eviction::EvictionManager;
11use super::file_fetcher::FileFetcher;
12use super::utils::FILE_CACHE_PREFIX;
13use crate::path_utils::ensure_directory_init;
14
15pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
16 let prefix = FILE_CACHE_PREFIX.clone();
17
18 if config::verbose() {
19 eprintln!("file cache prefix: {}", prefix);
20 }
21
22 let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
23 let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
24
25 let metadata_dir = prefix.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
26 if let Err(err) = ensure_directory_init(metadata_dir.as_std_path()) {
27 panic!(
28 "failed to create file cache metadata directory: path = {}, err = {}",
29 metadata_dir, err
30 )
31 }
32
33 let data_dir = prefix.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
34
35 if let Err(err) = ensure_directory_init(data_dir.as_std_path()) {
36 panic!(
37 "failed to create file cache data directory: path = {}, err = {}",
38 data_dir, err
39 )
40 }
41
42 EvictionManager {
43 data_dir,
44 metadata_dir,
45 files_to_remove: None,
46 min_ttl: min_ttl.clone(),
47 notify_ttl_updated: notify_ttl_updated.clone(),
48 }
49 .run_in_background();
50
51 unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
53});
54
/// Registry of file cache entries, keyed by the URI (or local path) they were
/// initialized with.
pub struct FileCache {
    /// Root directory of the cache; handed to every newly created
    /// `FileCacheEntry`.
    prefix: PlRefPath,
    /// Entries created so far. Guarded by an `RwLock` so lookups can share the
    /// lock while insertion takes it exclusively.
    entries: Arc<RwLock<PlHashMap<PlRefPath, Arc<FileCacheEntry>>>>,
    /// Running minimum of the TTLs passed to `init_entry`, seeded from
    /// `get_env_file_cache_ttl()`. Shared with the `EvictionManager`.
    min_ttl: Arc<AtomicU64>,
    /// Signalled from `init_entry` in connection with updates to `min_ttl`.
    /// Shared with the `EvictionManager`.
    notify_ttl_updated: Arc<tokio::sync::Notify>,
}
61
62impl FileCache {
63 unsafe fn new_unchecked(
68 prefix: PlRefPath,
69 min_ttl: Arc<AtomicU64>,
70 notify_ttl_updated: Arc<tokio::sync::Notify>,
71 ) -> Self {
72 Self {
73 prefix,
74 entries: Default::default(),
75 min_ttl,
76 notify_ttl_updated,
77 }
78 }
79
80 pub(super) fn init_entry(
83 &self,
84 uri: PlRefPath,
85 get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
86 ttl: u64,
87 ) -> PolarsResult<Arc<FileCacheEntry>> {
88 let verbose = config::verbose();
89
90 if !uri.has_scheme() {
92 debug_assert_eq!(
93 std::fs::canonicalize(uri.as_str())
94 .ok()
95 .and_then(|x| PlRefPath::try_from_pathbuf(x).ok())
96 .as_ref(),
97 Some(&uri)
98 )
99 }
100
101 if self
102 .min_ttl
103 .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
104 < ttl
105 {
106 self.notify_ttl_updated.notify_one();
107 }
108
109 {
110 let entries = self.entries.read().unwrap();
111
112 if let Some(entry) = entries.get(&uri) {
113 if verbose {
114 eprintln!(
115 "[file_cache] init_entry: return existing entry for uri = {}",
116 uri.clone()
117 );
118 }
119 entry.update_ttl(ttl);
120 return Ok(entry.clone());
121 }
122 }
123
124 let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
125
126 {
127 let mut entries = self.entries.write().unwrap();
128
129 if let Some(entry) = entries.get(&uri) {
131 if verbose {
132 eprintln!(
133 "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
134 uri.clone()
135 );
136 }
137 entry.update_ttl(ttl);
138 return Ok(entry.clone());
139 }
140
141 if verbose {
142 eprintln!(
143 "[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
144 );
145 }
146
147 let entry = Arc::new(FileCacheEntry::new(
148 uri.clone(),
149 uri_hash,
150 self.prefix.clone(),
151 get_file_fetcher()?,
152 ttl,
153 ));
154 entries.insert(uri.clone(), entry.clone());
155 Ok(entry)
156 }
157 }
158
159 pub fn get_entry(&self, path: PlRefPath) -> Option<Arc<FileCacheEntry>> {
161 if path.has_scheme() {
162 self.entries.read().unwrap().get(&path).cloned()
163 } else {
164 let p =
165 PlRefPath::try_from_pathbuf(std::fs::canonicalize(path.as_str()).unwrap()).unwrap();
166 self.entries.read().unwrap().get(&p).cloned()
167 }
168 }
169}
170
/// Reads the default file cache TTL from the `POLARS_FILE_CACHE_TTL`
/// environment variable.
///
/// Returns the parsed value when the variable holds a valid `u64`, and
/// `60 * 60` when the variable is unset or not valid Unicode.
/// NOTE(review): the unit is presumably seconds (making the default one hour);
/// it is not established in this file.
///
/// # Panics
///
/// Panics if the variable is set but cannot be parsed as a `u64`. The message
/// names the variable and the offending value.
pub fn get_env_file_cache_ttl() -> u64 {
    const ENV_VAR: &str = "POLARS_FILE_CACHE_TTL";
    const DEFAULT_TTL: u64 = 60 * 60;

    match std::env::var(ENV_VAR) {
        // Explicit format arguments keep the message formatted on every edition.
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "invalid value for {}: expected an unsigned integer, got {:?} ({})",
                ENV_VAR, raw, err
            )
        }),
        Err(_) => DEFAULT_TTL,
    }
}