polars_io/file_cache/
eviction.rs
1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::atomic::AtomicU64;
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use fs4::fs_std::FileExt;
7use polars_error::{PolarsError, PolarsResult};
8
9use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};
10use super::metadata::EntryMetadata;
11use crate::pl_async;
12
13#[derive(Debug, Clone)]
14pub(super) struct EvictionCandidate {
15 path: PathBuf,
16 metadata_path: PathBuf,
17 metadata_last_modified: SystemTime,
18 ttl: u64,
19}
20
21pub(super) struct EvictionManager {
22 pub(super) data_dir: Box<Path>,
23 pub(super) metadata_dir: Box<Path>,
24 pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,
25 pub(super) min_ttl: Arc<AtomicU64>,
26 pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,
27}
28
29impl EvictionCandidate {
30 fn update_ttl(&mut self) {
31 let Ok(metadata_last_modified) =
32 std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())
33 else {
34 self.ttl = 0;
35 return;
36 };
37
38 if self.metadata_last_modified == metadata_last_modified {
39 return;
40 }
41
42 let Ok(ref mut file) = std::fs::OpenOptions::new()
43 .read(true)
44 .open(&self.metadata_path)
45 else {
46 self.ttl = 0;
47 return;
48 };
49
50 let ttl = EntryMetadata::try_from_reader(file)
51 .map(|x| x.ttl)
52 .unwrap_or(0);
53
54 self.metadata_last_modified = metadata_last_modified;
55 self.ttl = ttl;
56 }
57
58 fn should_remove(&self, now: &SystemTime) -> bool {
59 let Ok(metadata) = std::fs::metadata(&self.path) else {
60 return false;
61 };
62
63 if let Ok(duration) = now.duration_since(
64 metadata
65 .accessed()
66 .unwrap_or_else(|_| metadata.modified().unwrap()),
67 ) {
68 duration.as_secs() >= self.ttl
69 } else {
70 false
71 }
72 }
73
74 fn try_evict(
75 &mut self,
76 now: &SystemTime,
77 verbose: bool,
78 _guard: &GlobalFileCacheGuardExclusive,
79 ) {
80 self.update_ttl();
81 let path = &self.path;
82
83 if !path.exists() {
84 if verbose {
85 eprintln!(
86 "[EvictionManager] evict_files: skipping {} (path no longer exists)",
87 path.to_str().unwrap()
88 );
89 }
90 return;
91 }
92
93 let metadata = std::fs::metadata(path).unwrap();
94
95 let since_last_accessed = match now.duration_since(
96 metadata
97 .accessed()
98 .unwrap_or_else(|_| metadata.modified().unwrap()),
99 ) {
100 Ok(v) => v.as_secs(),
101 Err(_) => {
102 if verbose {
103 eprintln!(
104 "[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
105 path.to_str().unwrap()
106 );
107 }
108 return;
109 },
110 };
111
112 if since_last_accessed < self.ttl {
113 if verbose {
114 eprintln!(
115 "[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
116 path.to_str().unwrap()
117 );
118 }
119 return;
120 }
121
122 {
123 let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();
124
125 if file.try_lock_exclusive().is_err() {
126 if verbose {
127 eprintln!(
128 "[EvictionManager] evict_files: skipping {} (file is locked)",
129 self.path.to_str().unwrap()
130 );
131 }
132 return;
133 }
134 }
135
136 if let Err(err) = std::fs::remove_file(path) {
137 if verbose {
138 eprintln!(
139 "[EvictionManager] evict_files: error removing file: {} ({})",
140 path.to_str().unwrap(),
141 err
142 );
143 }
144 } else if verbose {
145 eprintln!(
146 "[EvictionManager] evict_files: removed file at {}",
147 path.to_str().unwrap()
148 );
149 }
150 }
151}
152
153impl EvictionManager {
154 pub(super) fn run_in_background(mut self) {
159 let verbose = false;
160
161 if verbose {
162 eprintln!(
163 "[EvictionManager] creating cache eviction background task, self.min_ttl = {}",
164 self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)
165 );
166 }
167
168 pl_async::get_runtime().spawn(async move {
169 tokio::time::sleep(Duration::from_secs(3)).await;
171 let mut last_eviction_time;
172
173 loop {
174 let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };
175
176 let result = tokio::task::spawn_blocking(|| this.update_file_list())
177 .await
178 .unwrap();
179
180 last_eviction_time = Instant::now();
181
182 match result {
183 Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},
184 Ok(_) => loop {
185 if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {
186 if verbose {
187 eprintln!(
188 "[EvictionManager] got exclusive cache lock, evicting {} files",
189 self.files_to_remove.as_ref().unwrap().len()
190 );
191 }
192
193 tokio::task::block_in_place(|| self.evict_files(&guard));
194 break;
195 }
196 tokio::time::sleep(Duration::from_secs(7)).await;
197 },
198 Err(err) => {
199 if verbose {
200 eprintln!("[EvictionManager] error updating file list: {}", err);
201 }
202 },
203 }
204
205 loop {
206 let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);
207 let sleep_interval = std::cmp::max(min_ttl / 4, {
208 #[cfg(debug_assertions)]
209 {
210 3
211 }
212 #[cfg(not(debug_assertions))]
213 {
214 60
215 }
216 });
217
218 let since_last_eviction =
219 Instant::now().duration_since(last_eviction_time).as_secs();
220 let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);
221 let sleep_interval = Duration::from_secs(sleep_interval);
222
223 tokio::select! {
224 _ = self.notify_ttl_updated.notified() => {
225 continue;
226 }
227 _ = tokio::time::sleep(sleep_interval) => {
228 break;
229 }
230 }
231 }
232 }
233 });
234 }
235
236 fn update_file_list(&mut self) -> PolarsResult<()> {
237 let data_files_iter = match std::fs::read_dir(self.data_dir.as_ref()) {
238 Ok(v) => v,
239 Err(e) => {
240 let msg = format!("failed to read data directory: {}", e);
241
242 return Err(PolarsError::IO {
243 error: e.into(),
244 msg: Some(msg.into()),
245 });
246 },
247 };
248
249 let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_ref()) {
250 Ok(v) => v,
251 Err(e) => {
252 let msg = format!("failed to read metadata directory: {}", e);
253
254 return Err(PolarsError::IO {
255 error: e.into(),
256 msg: Some(msg.into()),
257 });
258 },
259 };
260
261 let mut files_to_remove = Vec::with_capacity(
262 data_files_iter
263 .size_hint()
264 .1
265 .unwrap_or(data_files_iter.size_hint().0)
266 + metadata_files_iter
267 .size_hint()
268 .1
269 .unwrap_or(metadata_files_iter.size_hint().0),
270 );
271
272 let now = SystemTime::now();
273
274 for file in data_files_iter {
275 let file = file?;
276 let path = file.path();
277
278 let hash = path
279 .file_name()
280 .unwrap()
281 .to_str()
282 .unwrap()
283 .get(..32)
284 .unwrap();
285 let metadata_path = self.metadata_dir.join(hash);
286
287 let mut eviction_candidate = EvictionCandidate {
288 path,
289 metadata_path,
290 metadata_last_modified: UNIX_EPOCH,
291 ttl: 0,
292 };
293 eviction_candidate.update_ttl();
294
295 if eviction_candidate.should_remove(&now) {
296 files_to_remove.push(eviction_candidate);
297 }
298 }
299
300 for file in metadata_files_iter {
301 let file = file?;
302 let path = file.path();
303 let metadata_path = path.clone();
304
305 let mut eviction_candidate = EvictionCandidate {
306 path,
307 metadata_path,
308 metadata_last_modified: UNIX_EPOCH,
309 ttl: 0,
310 };
311
312 eviction_candidate.update_ttl();
313
314 if eviction_candidate.should_remove(&now) {
315 files_to_remove.push(eviction_candidate);
316 }
317 }
318
319 self.files_to_remove = Some(files_to_remove);
320
321 Ok(())
322 }
323
324 fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {
327 let verbose = false;
328 let mut files_to_remove = self.files_to_remove.take().unwrap();
329 let now = &SystemTime::now();
330
331 for eviction_candidate in files_to_remove.iter_mut() {
332 eviction_candidate.try_evict(now, verbose, _guard);
333 }
334 }
335}