polars_io/file_cache/cache_lock.rs

1use std::sync::atomic::AtomicBool;
2use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard, RwLockWriteGuard};
3use std::time::Duration;
4
5use fs4::fs_std::FileExt;
6
7use super::utils::FILE_CACHE_PREFIX;
8use crate::pl_async;
9
10pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
11    let path = FILE_CACHE_PREFIX.join(".process-lock");
12
13    let file = std::fs::OpenOptions::new()
14        .write(true)
15        .create(true)
16        .truncate(false)
17        .open(path)
18        .map_err(|err| {
19            panic!("failed to open/create global file cache lockfile: {}", err);
20        })
21        .unwrap();
22
23    let at_bool = Arc::new(AtomicBool::new(false));
24    // Holding this access tracker prevents the background task from
25    // unlocking the lock.
26    let access_tracker = AccessTracker(at_bool.clone());
27    let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
28    let notify_lock_acquired_2 = notify_lock_acquired.clone();
29
30    pl_async::get_runtime().spawn(async move {
31        let access_tracker = at_bool;
32        let notify_lock_acquired = notify_lock_acquired_2;
33        let verbose = false;
34
35        loop {
36            if verbose {
37                eprintln!("file cache background unlock: waiting for acquisition notification");
38            }
39
40            notify_lock_acquired.notified().await;
41
42            if verbose {
43                eprintln!("file cache background unlock: got acquisition notification");
44            }
45
46            loop {
47                if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
48                    if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
49                        if unlocked_by_this_call && verbose {
50                            eprintln!(
51                                "file cache background unlock: unlocked global file cache lockfile"
52                            );
53                        }
54                        break;
55                    }
56                }
57                tokio::time::sleep(Duration::from_secs(3)).await;
58            }
59        }
60    });
61
62    GlobalLock {
63        inner: RwLock::new(GlobalLockData { file, state: None }),
64        access_tracker,
65        notify_lock_acquired,
66    }
67});
68
69pub(super) enum LockedState {
70    /// Shared between threads and other processes.
71    Shared,
72    #[allow(dead_code)]
73    /// Locked exclusively by the eviction task of this process.
74    Eviction,
75}
76
77#[allow(dead_code)]
78pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
79pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
80
81pub(super) struct GlobalLockData {
82    file: std::fs::File,
83    state: Option<LockedState>,
84}
85
86pub(super) struct GlobalLock {
87    inner: RwLock<GlobalLockData>,
88    access_tracker: AccessTracker,
89    notify_lock_acquired: Arc<tokio::sync::Notify>,
90}
91
/// Handle recording use of the global lock. It serves two purposes:
/// * The shared `bool` is set on every acquisition and release. The background
///   unlock task clears it on each poll, so it only unlocks after a full
///   3-second interval without any lock attempt.
/// * The `Arc` strong count acts as a semaphore. While any extra clone is alive,
///   exclusive lock attempts (and automatic unlocking) are refused, even though
///   the `RwLock` may be temporarily released.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);

impl Drop for AccessTracker {
    /// Flags the lock as recently accessed when the tracker is released.
    fn drop(&mut self) {
        let AccessTracker(accessed) = self;
        accessed.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
105
106struct NotifyOnDrop(Arc<tokio::sync::Notify>);
107
108impl Drop for NotifyOnDrop {
109    fn drop(&mut self) {
110        self.0.notify_one();
111    }
112}
113
114impl GlobalLock {
115    fn get_access_tracker(&self) -> AccessTracker {
116        let at = self.access_tracker.clone();
117        at.0.store(true, std::sync::atomic::Ordering::Relaxed);
118        at
119    }
120
121    /// Returns
122    /// * `None` - Could be locked (ambiguous)
123    /// * `Some(true)` - Unlocked (by this function call)
124    /// * `Some(false)` - Unlocked (was not locked)
125    fn try_unlock(&self) -> Option<bool> {
126        if let Ok(mut this) = self.inner.try_write() {
127            if Arc::strong_count(&self.access_tracker.0) <= 2 {
128                return if this.state.take().is_some() {
129                    FileExt::unlock(&this.file).unwrap();
130                    Some(true)
131                } else {
132                    Some(false)
133                };
134            }
135        }
136        None
137    }
138
139    /// Acquire a shared lock.
140    pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny {
141        let access_tracker = self.get_access_tracker();
142        let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
143
144        {
145            let this = self.inner.read().unwrap();
146
147            if let Some(LockedState::Shared) = this.state {
148                return this;
149            }
150        }
151
152        {
153            let mut this = self.inner.write().unwrap();
154
155            if let Some(LockedState::Eviction) = this.state {
156                FileExt::unlock(&this.file).unwrap();
157                this.state = None;
158            }
159
160            if this.state.is_none() {
161                FileExt::lock_shared(&this.file).unwrap();
162                this.state = Some(LockedState::Shared);
163            }
164        }
165
166        // Safety: Holding the access tracker guard maintains an Arc refcount
167        // > 2, which prevents automatic unlock.
168        debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
169
170        {
171            let this = self.inner.read().unwrap();
172
173            if let Some(LockedState::Eviction) = this.state {
174                // Try again
175                drop(this);
176                return self.lock_shared();
177            }
178
179            assert!(
180                this.state.is_some(),
181                "impl error: global file cache lock was unlocked"
182            );
183            this
184        }
185    }
186
187    /// Acquire an exclusive lock on the cache directory. Holding this lock freezes
188    /// all cache operations except for reading from already-opened data files.
189    #[allow(dead_code)]
190    pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive> {
191        let access_tracker = self.get_access_tracker();
192
193        if let Ok(mut this) = self.inner.try_write() {
194            if
195            // 3:
196            // * the Lazy<GlobalLock>
197            // * the global unlock background task
198            // * this function
199            Arc::strong_count(&access_tracker.0) > 3 {
200                return None;
201            }
202
203            let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
204
205            if let Some(ref state) = this.state {
206                if matches!(state, LockedState::Eviction) {
207                    return Some(this);
208                }
209            }
210
211            if this.state.take().is_some() {
212                FileExt::unlock(&this.file).unwrap();
213            }
214
215            if this.file.try_lock_exclusive().is_ok() {
216                this.state = Some(LockedState::Eviction);
217                return Some(this);
218            }
219        }
220        None
221    }
222}