polars_io/file_cache/
cache_lock.rs
1use std::sync::atomic::AtomicBool;
2use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard, RwLockWriteGuard};
3use std::time::Duration;
4
5use fs4::fs_std::FileExt;
6
7use super::utils::FILE_CACHE_PREFIX;
8use crate::pl_async;
9
10pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
11 let path = FILE_CACHE_PREFIX.join(".process-lock");
12
13 let file = std::fs::OpenOptions::new()
14 .write(true)
15 .create(true)
16 .truncate(false)
17 .open(path)
18 .map_err(|err| {
19 panic!("failed to open/create global file cache lockfile: {}", err);
20 })
21 .unwrap();
22
23 let at_bool = Arc::new(AtomicBool::new(false));
24 let access_tracker = AccessTracker(at_bool.clone());
27 let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
28 let notify_lock_acquired_2 = notify_lock_acquired.clone();
29
30 pl_async::get_runtime().spawn(async move {
31 let access_tracker = at_bool;
32 let notify_lock_acquired = notify_lock_acquired_2;
33 let verbose = false;
34
35 loop {
36 if verbose {
37 eprintln!("file cache background unlock: waiting for acquisition notification");
38 }
39
40 notify_lock_acquired.notified().await;
41
42 if verbose {
43 eprintln!("file cache background unlock: got acquisition notification");
44 }
45
46 loop {
47 if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
48 if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
49 if unlocked_by_this_call && verbose {
50 eprintln!(
51 "file cache background unlock: unlocked global file cache lockfile"
52 );
53 }
54 break;
55 }
56 }
57 tokio::time::sleep(Duration::from_secs(3)).await;
58 }
59 }
60 });
61
62 GlobalLock {
63 inner: RwLock::new(GlobalLockData { file, state: None }),
64 access_tracker,
65 notify_lock_acquired,
66 }
67});
68
69pub(super) enum LockedState {
70 Shared,
72 #[allow(dead_code)]
73 Eviction,
75}
76
77#[allow(dead_code)]
78pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
79pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
80
81pub(super) struct GlobalLockData {
82 file: std::fs::File,
83 state: Option<LockedState>,
84}
85
86pub(super) struct GlobalLock {
87 inner: RwLock<GlobalLockData>,
88 access_tracker: AccessTracker,
89 notify_lock_acquired: Arc<tokio::sync::Notify>,
90}
91
92#[derive(Clone)]
98struct AccessTracker(Arc<AtomicBool>);
99
100impl Drop for AccessTracker {
101 fn drop(&mut self) {
102 self.0.store(true, std::sync::atomic::Ordering::Relaxed);
103 }
104}
105
106struct NotifyOnDrop(Arc<tokio::sync::Notify>);
107
108impl Drop for NotifyOnDrop {
109 fn drop(&mut self) {
110 self.0.notify_one();
111 }
112}
113
114impl GlobalLock {
115 fn get_access_tracker(&self) -> AccessTracker {
116 let at = self.access_tracker.clone();
117 at.0.store(true, std::sync::atomic::Ordering::Relaxed);
118 at
119 }
120
121 fn try_unlock(&self) -> Option<bool> {
126 if let Ok(mut this) = self.inner.try_write() {
127 if Arc::strong_count(&self.access_tracker.0) <= 2 {
128 return if this.state.take().is_some() {
129 FileExt::unlock(&this.file).unwrap();
130 Some(true)
131 } else {
132 Some(false)
133 };
134 }
135 }
136 None
137 }
138
139 pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny {
141 let access_tracker = self.get_access_tracker();
142 let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
143
144 {
145 let this = self.inner.read().unwrap();
146
147 if let Some(LockedState::Shared) = this.state {
148 return this;
149 }
150 }
151
152 {
153 let mut this = self.inner.write().unwrap();
154
155 if let Some(LockedState::Eviction) = this.state {
156 FileExt::unlock(&this.file).unwrap();
157 this.state = None;
158 }
159
160 if this.state.is_none() {
161 FileExt::lock_shared(&this.file).unwrap();
162 this.state = Some(LockedState::Shared);
163 }
164 }
165
166 debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
169
170 {
171 let this = self.inner.read().unwrap();
172
173 if let Some(LockedState::Eviction) = this.state {
174 drop(this);
176 return self.lock_shared();
177 }
178
179 assert!(
180 this.state.is_some(),
181 "impl error: global file cache lock was unlocked"
182 );
183 this
184 }
185 }
186
187 #[allow(dead_code)]
190 pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive> {
191 let access_tracker = self.get_access_tracker();
192
193 if let Ok(mut this) = self.inner.try_write() {
194 if
195 Arc::strong_count(&access_tracker.0) > 3 {
200 return None;
201 }
202
203 let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
204
205 if let Some(ref state) = this.state {
206 if matches!(state, LockedState::Eviction) {
207 return Some(this);
208 }
209 }
210
211 if this.state.take().is_some() {
212 FileExt::unlock(&this.file).unwrap();
213 }
214
215 if this.file.try_lock_exclusive().is_ok() {
216 this.state = Some(LockedState::Eviction);
217 return Some(this);
218 }
219 }
220 None
221 }
222}