Skip to main content

polars_io/file_cache/
cache_lock.rs

1use std::sync::atomic::AtomicBool;
2use std::sync::{Arc, LazyLock};
3use std::time::Duration;
4
5use fs4::FileExt;
6use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
7use polars_core::runtime::ASYNC;
8
9use super::utils::FILE_CACHE_PREFIX;
10
11pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
12    let path = FILE_CACHE_PREFIX.join(".process-lock");
13
14    let file = std::fs::OpenOptions::new()
15        .write(true)
16        .create(true)
17        .truncate(false)
18        .open(path)
19        .map_err(|err| {
20            panic!("failed to open/create global file cache lockfile: {err}");
21        })
22        .unwrap();
23
24    let at_bool = Arc::new(AtomicBool::new(false));
25    // Holding this access tracker prevents the background task from
26    // unlocking the lock.
27    let access_tracker = AccessTracker(at_bool.clone());
28    let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
29    let notify_lock_acquired_2 = notify_lock_acquired.clone();
30
31    ASYNC.spawn(async move {
32        let at_bool = std::mem::ManuallyDrop::new(at_bool);
33        let access_tracker = at_bool.as_ref();
34        let notify_lock_acquired = notify_lock_acquired_2;
35        let verbose = false;
36
37        loop {
38            if verbose {
39                eprintln!("file cache background unlock: waiting for acquisition notification");
40            }
41
42            notify_lock_acquired.notified().await;
43
44            if verbose {
45                eprintln!("file cache background unlock: got acquisition notification");
46            }
47
48            loop {
49                if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
50                    if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
51                        if unlocked_by_this_call && verbose {
52                            eprintln!(
53                                "file cache background unlock: unlocked global file cache lockfile"
54                            );
55                        }
56                        break;
57                    }
58                }
59                tokio::time::sleep(Duration::from_secs(3)).await;
60            }
61        }
62    });
63
64    GlobalLock {
65        inner: RwLock::new(GlobalLockData { file, state: None }),
66        access_tracker,
67        notify_lock_acquired,
68    }
69});
70
71pub(super) enum LockedState {
72    /// Shared between threads and other processes.
73    Shared,
74    #[allow(dead_code)]
75    /// Locked exclusively by the eviction task of this process.
76    Eviction,
77}
78
79#[allow(dead_code)]
80pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
81pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
82
83pub(super) struct GlobalLockData {
84    file: std::fs::File,
85    state: Option<LockedState>,
86}
87
88pub(super) struct GlobalLock {
89    inner: RwLock<GlobalLockData>,
90    access_tracker: AccessTracker,
91    notify_lock_acquired: Arc<tokio::sync::Notify>,
92}
93
/// Usage tracker for the global lock, serving two purposes:
/// * The wrapped flag is set on every access and whenever a tracker is dropped.
///   The background unlock task clears it on each 3-second poll and only
///   unlocks when it was already clear, so the lock stays held for at least
///   one poll interval after the last access.
/// * The `Arc` strong count acts as a semaphore: any clone beyond the baseline
///   blocks `try_unlock` and `try_lock_eviction` even while the `RwLock`
///   itself is temporarily released.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);

impl Drop for AccessTracker {
    fn drop(&mut self) {
        // Releasing a tracker counts as a fresh access.
        let Self(recently_accessed) = self;
        recently_accessed.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
107
108struct NotifyOnDrop(Arc<tokio::sync::Notify>);
109
110impl Drop for NotifyOnDrop {
111    fn drop(&mut self) {
112        self.0.notify_one();
113    }
114}
115
116impl GlobalLock {
117    fn get_access_tracker(&self) -> AccessTracker {
118        let at = self.access_tracker.clone();
119        at.0.store(true, std::sync::atomic::Ordering::Relaxed);
120        at
121    }
122
123    /// Returns
124    /// * `None` - Could be locked (ambiguous)
125    /// * `Some(true)` - Unlocked (by this function call)
126    /// * `Some(false)` - Unlocked (was not locked)
127    fn try_unlock(&self) -> Option<bool> {
128        if let Some(mut this) = self.inner.try_write() {
129            if Arc::strong_count(&self.access_tracker.0) <= 2 {
130                return if this.state.take().is_some() {
131                    FileExt::unlock(&this.file).unwrap();
132                    Some(true)
133                } else {
134                    Some(false)
135                };
136            }
137        }
138        None
139    }
140
141    /// Acquire a shared lock.
142    pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny<'_> {
143        let access_tracker = self.get_access_tracker();
144        let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
145
146        {
147            let this = self.inner.read();
148
149            if let Some(LockedState::Shared) = this.state {
150                return this;
151            }
152        }
153
154        {
155            let mut this = self.inner.write();
156
157            if let Some(LockedState::Eviction) = this.state {
158                FileExt::unlock(&this.file).unwrap();
159                this.state = None;
160            }
161
162            if this.state.is_none() {
163                FileExt::lock_shared(&this.file).unwrap();
164                this.state = Some(LockedState::Shared);
165            }
166        }
167
168        // Safety: Holding the access tracker guard maintains an Arc refcount
169        // > 2, which prevents automatic unlock.
170        debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
171
172        {
173            let this = self.inner.read();
174
175            if let Some(LockedState::Eviction) = this.state {
176                // Try again
177                drop(this);
178                return self.lock_shared();
179            }
180
181            assert!(
182                this.state.is_some(),
183                "impl error: global file cache lock was unlocked"
184            );
185            this
186        }
187    }
188
189    /// Acquire an exclusive lock on the cache directory. Holding this lock freezes
190    /// all cache operations except for reading from already-opened data files.
191    #[allow(dead_code)]
192    pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive<'_>> {
193        let access_tracker = self.get_access_tracker();
194
195        if let Some(mut this) = self.inner.try_write() {
196            if
197            // 3:
198            // * the Lazy<GlobalLock>
199            // * the global unlock background task
200            // * this function
201            Arc::strong_count(&access_tracker.0) > 3 {
202                return None;
203            }
204
205            let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
206
207            if let Some(ref state) = this.state {
208                if matches!(state, LockedState::Eviction) {
209                    return Some(this);
210                }
211            }
212
213            if this.state.take().is_some() {
214                FileExt::unlock(&this.file).unwrap();
215            }
216
217            if this.file.try_lock().is_ok() {
218                this.state = Some(LockedState::Eviction);
219                return Some(this);
220            }
221        }
222        None
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a standalone `GlobalLock` backed by a temporary file, without
    /// spawning the background unlock task.
    fn make_test_lock() -> Arc<GlobalLock> {
        let at_bool = Arc::new(AtomicBool::new(false));
        // Leak one ref to match the background unlock task's ref in production,
        // keeping Arc strong count consistent for debug_assert in lock_shared.
        std::mem::forget(at_bool.clone());
        Arc::new(GlobalLock {
            inner: RwLock::new(GlobalLockData {
                file: tempfile::tempfile().unwrap(),
                state: None,
            }),
            access_tracker: AccessTracker(at_bool),
            notify_lock_acquired: Arc::new(tokio::sync::Notify::new()),
        })
    }

    #[test]
    fn try_unlock_when_not_locked() {
        let lock = make_test_lock();
        assert_eq!(lock.try_unlock(), Some(false));
    }

    #[test]
    fn lock_shared_lifecycle() {
        let lock = make_test_lock();

        // Acquires the file lock and sets state = Shared.
        let guard = lock.lock_shared();
        assert!(matches!(guard.state, Some(LockedState::Shared)));
        drop(guard);

        // Second call sees existing Shared state (reentrant).
        let guard = lock.lock_shared();
        assert!(matches!(guard.state, Some(LockedState::Shared)));
        drop(guard);

        // `try_unlock` releases the file lock.
        assert_eq!(lock.try_unlock(), Some(true));
        assert_eq!(lock.try_unlock(), Some(false));
    }

    #[test]
    fn eviction_lifecycle() {
        let lock = make_test_lock();

        // Start from a shared lock so eviction has to release it first.
        drop(lock.lock_shared());

        // With no other users, eviction upgrades to an exclusive file lock.
        {
            let guard = lock
                .try_lock_eviction()
                .expect("no concurrent users, eviction lock must succeed");
            assert!(matches!(guard.state, Some(LockedState::Eviction)));
        }

        // The Eviction state persists after the guard is dropped, and a second
        // attempt returns it directly.
        {
            let guard = lock
                .try_lock_eviction()
                .expect("re-entrant eviction lock must succeed");
            assert!(matches!(guard.state, Some(LockedState::Eviction)));
        }

        // A shared request releases the eviction lock and goes back to Shared.
        let guard = lock.lock_shared();
        assert!(matches!(guard.state, Some(LockedState::Shared)));
        drop(guard);

        assert_eq!(lock.try_unlock(), Some(true));
        assert_eq!(lock.try_unlock(), Some(false));
    }

    #[test]
    fn concurrent_mixed_operations_stress() {
        let lock = make_test_lock();
        let mut handles = Vec::new();

        // Shared-lock threads.
        for _ in 0..6 {
            let lock = Arc::clone(&lock);
            handles.push(std::thread::spawn(move || {
                for _ in 0..200 {
                    let _guard = lock.lock_shared();
                    std::thread::yield_now();
                }
            }));
        }

        // `try_unlock` threads.
        for _ in 0..2 {
            let lock = Arc::clone(&lock);
            handles.push(std::thread::spawn(move || {
                for _ in 0..200 {
                    let _ = lock.try_unlock();
                    std::thread::yield_now();
                }
            }));
        }

        // `try_lock_eviction` threads.
        for _ in 0..2 {
            let lock = Arc::clone(&lock);
            handles.push(std::thread::spawn(move || {
                for _ in 0..200 {
                    drop(lock.try_lock_eviction());
                    std::thread::yield_now();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }
    }
}