polars_io/file_cache/
cache_lock.rs1use std::sync::atomic::AtomicBool;
2use std::sync::{Arc, LazyLock};
3use std::time::Duration;
4
5use fs4::FileExt;
6use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
7use polars_core::runtime::ASYNC;
8
9use super::utils::FILE_CACHE_PREFIX;
10
11pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
12 let path = FILE_CACHE_PREFIX.join(".process-lock");
13
14 let file = std::fs::OpenOptions::new()
15 .write(true)
16 .create(true)
17 .truncate(false)
18 .open(path)
19 .map_err(|err| {
20 panic!("failed to open/create global file cache lockfile: {err}");
21 })
22 .unwrap();
23
24 let at_bool = Arc::new(AtomicBool::new(false));
25 let access_tracker = AccessTracker(at_bool.clone());
28 let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
29 let notify_lock_acquired_2 = notify_lock_acquired.clone();
30
31 ASYNC.spawn(async move {
32 let at_bool = std::mem::ManuallyDrop::new(at_bool);
33 let access_tracker = at_bool.as_ref();
34 let notify_lock_acquired = notify_lock_acquired_2;
35 let verbose = false;
36
37 loop {
38 if verbose {
39 eprintln!("file cache background unlock: waiting for acquisition notification");
40 }
41
42 notify_lock_acquired.notified().await;
43
44 if verbose {
45 eprintln!("file cache background unlock: got acquisition notification");
46 }
47
48 loop {
49 if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
50 if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
51 if unlocked_by_this_call && verbose {
52 eprintln!(
53 "file cache background unlock: unlocked global file cache lockfile"
54 );
55 }
56 break;
57 }
58 }
59 tokio::time::sleep(Duration::from_secs(3)).await;
60 }
61 }
62 });
63
64 GlobalLock {
65 inner: RwLock::new(GlobalLockData { file, state: None }),
66 access_tracker,
67 notify_lock_acquired,
68 }
69});
70
/// Which kind of file lock this process currently holds on the lock file.
///
/// `GlobalLockData::state` is `None` when no file lock is held.
pub(super) enum LockedState {
    /// Held via `FileExt::lock_shared`; set by `GlobalLock::lock_shared`.
    Shared,
    /// Held via `File::try_lock`; set by `GlobalLock::try_lock_eviction`.
    // NOTE(review): only constructed from `try_lock_eviction`, which is itself
    // marked `dead_code` -- hence the allow.
    #[allow(dead_code)]
    Eviction,
}
78
/// Read guard over the lock data, as returned by `GlobalLock::lock_shared`.
#[allow(dead_code)]
pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
/// Write guard over the lock data, as returned by `GlobalLock::try_lock_eviction`.
pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
82
/// Data protected by the in-process `RwLock` inside `GlobalLock`.
pub(super) struct GlobalLockData {
    /// Handle to the lock file on which the file locks are taken.
    file: std::fs::File,
    /// File lock currently held on `file`, or `None` when it is unlocked.
    state: Option<LockedState>,
}
87
/// In-process wrapper around the file cache lock file.
pub(super) struct GlobalLock {
    /// Lock file handle together with the currently held lock state.
    inner: RwLock<GlobalLockData>,
    /// Tracker that is cloned for every lock operation; its `Arc` strong count
    /// is inspected by `try_unlock` and `try_lock_eviction`.
    access_tracker: AccessTracker,
    /// Notified (through `NotifyOnDrop`) when a lock operation finishes.
    notify_lock_acquired: Arc<tokio::sync::Notify>,
}
93
/// Shared "recently accessed" marker for the global lock.
///
/// * The inner flag is set to `true` whenever a tracker is dropped.
/// * Every clone adds one strong reference to the inner `Arc`, so the strong
///   count reflects how many trackers are alive.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);

impl Drop for AccessTracker {
    /// Marks the lock as accessed at the moment the tracker goes away.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering::Relaxed;

        let flag: &AtomicBool = &self.0;
        flag.store(true, Relaxed);
    }
}
107
108struct NotifyOnDrop(Arc<tokio::sync::Notify>);
109
110impl Drop for NotifyOnDrop {
111 fn drop(&mut self) {
112 self.0.notify_one();
113 }
114}
115
116impl GlobalLock {
117 fn get_access_tracker(&self) -> AccessTracker {
118 let at = self.access_tracker.clone();
119 at.0.store(true, std::sync::atomic::Ordering::Relaxed);
120 at
121 }
122
123 fn try_unlock(&self) -> Option<bool> {
128 if let Some(mut this) = self.inner.try_write() {
129 if Arc::strong_count(&self.access_tracker.0) <= 2 {
130 return if this.state.take().is_some() {
131 FileExt::unlock(&this.file).unwrap();
132 Some(true)
133 } else {
134 Some(false)
135 };
136 }
137 }
138 None
139 }
140
141 pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny<'_> {
143 let access_tracker = self.get_access_tracker();
144 let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
145
146 {
147 let this = self.inner.read();
148
149 if let Some(LockedState::Shared) = this.state {
150 return this;
151 }
152 }
153
154 {
155 let mut this = self.inner.write();
156
157 if let Some(LockedState::Eviction) = this.state {
158 FileExt::unlock(&this.file).unwrap();
159 this.state = None;
160 }
161
162 if this.state.is_none() {
163 FileExt::lock_shared(&this.file).unwrap();
164 this.state = Some(LockedState::Shared);
165 }
166 }
167
168 debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
171
172 {
173 let this = self.inner.read();
174
175 if let Some(LockedState::Eviction) = this.state {
176 drop(this);
178 return self.lock_shared();
179 }
180
181 assert!(
182 this.state.is_some(),
183 "impl error: global file cache lock was unlocked"
184 );
185 this
186 }
187 }
188
189 #[allow(dead_code)]
192 pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive<'_>> {
193 let access_tracker = self.get_access_tracker();
194
195 if let Some(mut this) = self.inner.try_write() {
196 if
197 Arc::strong_count(&access_tracker.0) > 3 {
202 return None;
203 }
204
205 let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
206
207 if let Some(ref state) = this.state {
208 if matches!(state, LockedState::Eviction) {
209 return Some(this);
210 }
211 }
212
213 if this.state.take().is_some() {
214 FileExt::unlock(&this.file).unwrap();
215 }
216
217 if this.file.try_lock().is_ok() {
218 this.state = Some(LockedState::Eviction);
219 return Some(this);
220 }
221 }
222 None
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a standalone `GlobalLock` backed by an anonymous temporary file.
    fn make_test_lock() -> Arc<GlobalLock> {
        let flag = Arc::new(AtomicBool::new(false));
        // Leak one extra strong reference so the baseline count is 2, as
        // `lock_shared`'s `debug_assert!(count > 2)` expects once it holds its
        // own tracker.
        std::mem::forget(Arc::clone(&flag));

        let data = GlobalLockData {
            file: tempfile::tempfile().unwrap(),
            state: None,
        };

        Arc::new(GlobalLock {
            inner: RwLock::new(data),
            access_tracker: AccessTracker(flag),
            notify_lock_acquired: Arc::new(tokio::sync::Notify::new()),
        })
    }

    /// Unlocking a lock that was never taken reports "was not locked".
    #[test]
    fn try_unlock_when_not_locked() {
        let lock = make_test_lock();
        assert_eq!(lock.try_unlock(), Some(false));
    }

    /// Shared locking can be repeated, and only the first unlock afterwards
    /// actually releases anything.
    #[test]
    fn lock_shared_lifecycle() {
        let lock = make_test_lock();

        for _ in 0..2 {
            let guard = lock.lock_shared();
            assert!(matches!(guard.state, Some(LockedState::Shared)));
            drop(guard);
        }

        assert_eq!(lock.try_unlock(), Some(true));
        assert_eq!(lock.try_unlock(), Some(false));
    }

    /// Runs shared locking, unlocking and eviction locking concurrently; the
    /// test passes if no thread panics.
    #[test]
    fn concurrent_mixed_operations_stress() {
        let lock = make_test_lock();
        let mut handles = Vec::new();

        // 6 threads repeatedly taking the shared lock.
        handles.extend((0..6).map(|_| {
            let lock = Arc::clone(&lock);
            std::thread::spawn(move || {
                for _ in 0..200 {
                    let _guard = lock.lock_shared();
                    std::thread::yield_now();
                }
            })
        }));

        // 2 threads repeatedly trying to unlock.
        handles.extend((0..2).map(|_| {
            let lock = Arc::clone(&lock);
            std::thread::spawn(move || {
                for _ in 0..200 {
                    let _ = lock.try_unlock();
                    std::thread::yield_now();
                }
            })
        }));

        // 2 threads repeatedly trying to take the eviction lock.
        handles.extend((0..2).map(|_| {
            let lock = Arc::clone(&lock);
            std::thread::spawn(move || {
                for _ in 0..200 {
                    drop(lock.try_lock_eviction());
                    std::thread::yield_now();
                }
            })
        }));

        handles.into_iter().for_each(|h| h.join().unwrap());
    }
}