polars_io/file_cache/eviction.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::atomic::AtomicU64;
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use fs4::fs_std::FileExt;
7use polars_error::{PolarsError, PolarsResult};
8
9use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};
10use super::metadata::EntryMetadata;
11use crate::pl_async;
12
13#[derive(Debug, Clone)]
14pub(super) struct EvictionCandidate {
15    path: PathBuf,
16    metadata_path: PathBuf,
17    metadata_last_modified: SystemTime,
18    ttl: u64,
19}
20
21pub(super) struct EvictionManager {
22    pub(super) data_dir: Box<Path>,
23    pub(super) metadata_dir: Box<Path>,
24    pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,
25    pub(super) min_ttl: Arc<AtomicU64>,
26    pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,
27}
28
29impl EvictionCandidate {
30    fn update_ttl(&mut self) {
31        let Ok(metadata_last_modified) =
32            std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())
33        else {
34            self.ttl = 0;
35            return;
36        };
37
38        if self.metadata_last_modified == metadata_last_modified {
39            return;
40        }
41
42        let Ok(ref mut file) = std::fs::OpenOptions::new()
43            .read(true)
44            .open(&self.metadata_path)
45        else {
46            self.ttl = 0;
47            return;
48        };
49
50        let ttl = EntryMetadata::try_from_reader(file)
51            .map(|x| x.ttl)
52            .unwrap_or(0);
53
54        self.metadata_last_modified = metadata_last_modified;
55        self.ttl = ttl;
56    }
57
58    fn should_remove(&self, now: &SystemTime) -> bool {
59        let Ok(metadata) = std::fs::metadata(&self.path) else {
60            return false;
61        };
62
63        if let Ok(duration) = now.duration_since(
64            metadata
65                .accessed()
66                .unwrap_or_else(|_| metadata.modified().unwrap()),
67        ) {
68            duration.as_secs() >= self.ttl
69        } else {
70            false
71        }
72    }
73
74    fn try_evict(
75        &mut self,
76        now: &SystemTime,
77        verbose: bool,
78        _guard: &GlobalFileCacheGuardExclusive,
79    ) {
80        self.update_ttl();
81        let path = &self.path;
82
83        if !path.exists() {
84            if verbose {
85                eprintln!(
86                    "[EvictionManager] evict_files: skipping {} (path no longer exists)",
87                    path.to_str().unwrap()
88                );
89            }
90            return;
91        }
92
93        let metadata = std::fs::metadata(path).unwrap();
94
95        let since_last_accessed = match now.duration_since(
96            metadata
97                .accessed()
98                .unwrap_or_else(|_| metadata.modified().unwrap()),
99        ) {
100            Ok(v) => v.as_secs(),
101            Err(_) => {
102                if verbose {
103                    eprintln!(
104                        "[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
105                        path.to_str().unwrap()
106                    );
107                }
108                return;
109            },
110        };
111
112        if since_last_accessed < self.ttl {
113            if verbose {
114                eprintln!(
115                    "[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
116                    path.to_str().unwrap()
117                );
118            }
119            return;
120        }
121
122        {
123            let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();
124
125            if file.try_lock_exclusive().is_err() {
126                if verbose {
127                    eprintln!(
128                        "[EvictionManager] evict_files: skipping {} (file is locked)",
129                        self.path.to_str().unwrap()
130                    );
131                }
132                return;
133            }
134        }
135
136        if let Err(err) = std::fs::remove_file(path) {
137            if verbose {
138                eprintln!(
139                    "[EvictionManager] evict_files: error removing file: {} ({})",
140                    path.to_str().unwrap(),
141                    err
142                );
143            }
144        } else if verbose {
145            eprintln!(
146                "[EvictionManager] evict_files: removed file at {}",
147                path.to_str().unwrap()
148            );
149        }
150    }
151}
152
153impl EvictionManager {
154    /// # Safety
155    /// The following directories exist:
156    /// * `self.data_dir`
157    /// * `self.metadata_dir`
158    pub(super) fn run_in_background(mut self) {
159        let verbose = false;
160
161        if verbose {
162            eprintln!(
163                "[EvictionManager] creating cache eviction background task, self.min_ttl = {}",
164                self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)
165            );
166        }
167
168        pl_async::get_runtime().spawn(async move {
169            // Give some time at startup for other code to run.
170            tokio::time::sleep(Duration::from_secs(3)).await;
171            let mut last_eviction_time;
172
173            loop {
174                let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };
175
176                let result = tokio::task::spawn_blocking(|| this.update_file_list())
177                    .await
178                    .unwrap();
179
180                last_eviction_time = Instant::now();
181
182                match result {
183                    Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},
184                    Ok(_) => loop {
185                        if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {
186                            if verbose {
187                                eprintln!(
188                                    "[EvictionManager] got exclusive cache lock, evicting {} files",
189                                    self.files_to_remove.as_ref().unwrap().len()
190                                );
191                            }
192
193                            tokio::task::block_in_place(|| self.evict_files(&guard));
194                            break;
195                        }
196                        tokio::time::sleep(Duration::from_secs(7)).await;
197                    },
198                    Err(err) => {
199                        if verbose {
200                            eprintln!("[EvictionManager] error updating file list: {}", err);
201                        }
202                    },
203                }
204
205                loop {
206                    let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);
207                    let sleep_interval = std::cmp::max(min_ttl / 4, {
208                        #[cfg(debug_assertions)]
209                        {
210                            3
211                        }
212                        #[cfg(not(debug_assertions))]
213                        {
214                            60
215                        }
216                    });
217
218                    let since_last_eviction =
219                        Instant::now().duration_since(last_eviction_time).as_secs();
220                    let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);
221                    let sleep_interval = Duration::from_secs(sleep_interval);
222
223                    tokio::select! {
224                        _ = self.notify_ttl_updated.notified() => {
225                            continue;
226                        }
227                        _ = tokio::time::sleep(sleep_interval) => {
228                            break;
229                        }
230                    }
231                }
232            }
233        });
234    }
235
236    fn update_file_list(&mut self) -> PolarsResult<()> {
237        let data_files_iter = match std::fs::read_dir(self.data_dir.as_ref()) {
238            Ok(v) => v,
239            Err(e) => {
240                let msg = format!("failed to read data directory: {}", e);
241
242                return Err(PolarsError::IO {
243                    error: e.into(),
244                    msg: Some(msg.into()),
245                });
246            },
247        };
248
249        let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_ref()) {
250            Ok(v) => v,
251            Err(e) => {
252                let msg = format!("failed to read metadata directory: {}", e);
253
254                return Err(PolarsError::IO {
255                    error: e.into(),
256                    msg: Some(msg.into()),
257                });
258            },
259        };
260
261        let mut files_to_remove = Vec::with_capacity(
262            data_files_iter
263                .size_hint()
264                .1
265                .unwrap_or(data_files_iter.size_hint().0)
266                + metadata_files_iter
267                    .size_hint()
268                    .1
269                    .unwrap_or(metadata_files_iter.size_hint().0),
270        );
271
272        let now = SystemTime::now();
273
274        for file in data_files_iter {
275            let file = file?;
276            let path = file.path();
277
278            let hash = path
279                .file_name()
280                .unwrap()
281                .to_str()
282                .unwrap()
283                .get(..32)
284                .unwrap();
285            let metadata_path = self.metadata_dir.join(hash);
286
287            let mut eviction_candidate = EvictionCandidate {
288                path,
289                metadata_path,
290                metadata_last_modified: UNIX_EPOCH,
291                ttl: 0,
292            };
293            eviction_candidate.update_ttl();
294
295            if eviction_candidate.should_remove(&now) {
296                files_to_remove.push(eviction_candidate);
297            }
298        }
299
300        for file in metadata_files_iter {
301            let file = file?;
302            let path = file.path();
303            let metadata_path = path.clone();
304
305            let mut eviction_candidate = EvictionCandidate {
306                path,
307                metadata_path,
308                metadata_last_modified: UNIX_EPOCH,
309                ttl: 0,
310            };
311
312            eviction_candidate.update_ttl();
313
314            if eviction_candidate.should_remove(&now) {
315                files_to_remove.push(eviction_candidate);
316            }
317        }
318
319        self.files_to_remove = Some(files_to_remove);
320
321        Ok(())
322    }
323
324    /// # Panics
325    /// Panics if `self.files_to_remove` is `None`.
326    fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {
327        let verbose = false;
328        let mut files_to_remove = self.files_to_remove.take().unwrap();
329        let now = &SystemTime::now();
330
331        for eviction_candidate in files_to_remove.iter_mut() {
332            eviction_candidate.try_evict(now, verbose, _guard);
333        }
334    }
335}