// polars_io/file_cache/cache_lock.rs
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::time::Duration;
use fs4::fs_std::FileExt;
use once_cell::sync::Lazy;
use super::utils::FILE_CACHE_PREFIX;
use crate::pl_async;
pub(super) static GLOBAL_FILE_CACHE_LOCK: Lazy<GlobalLock> = Lazy::new(|| {
let path = FILE_CACHE_PREFIX.join(".process-lock");
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(|err| {
panic!("failed to open/create global file cache lockfile: {}", err);
})
.unwrap();
let at_bool = Arc::new(AtomicBool::new(false));
let access_tracker = AccessTracker(at_bool.clone());
let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
let notify_lock_acquired_2 = notify_lock_acquired.clone();
pl_async::get_runtime().spawn(async move {
let access_tracker = at_bool;
let notify_lock_acquired = notify_lock_acquired_2;
let verbose = false;
loop {
if verbose {
eprintln!("file cache background unlock: waiting for acquisition notification");
}
notify_lock_acquired.notified().await;
if verbose {
eprintln!("file cache background unlock: got acquisition notification");
}
loop {
if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
if unlocked_by_this_call && verbose {
eprintln!(
"file cache background unlock: unlocked global file cache lockfile"
);
}
break;
}
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
});
GlobalLock {
inner: RwLock::new(GlobalLockData { file, state: None }),
access_tracker,
notify_lock_acquired,
}
});
/// Kind of file lock this process currently holds on the lockfile.
///
/// Stored in `GlobalLockData::state`; `None` there means no file lock is held.
pub(super) enum LockedState {
/// Set by `GlobalLock::lock_any` after `FileExt::lock_shared` succeeds.
Shared,
/// Set by `GlobalLock::try_lock_exclusive` after the exclusive file lock
/// was obtained.
#[allow(dead_code)]
Exclusive,
}
/// Guard returned by `GlobalLock::lock_any`: a read guard over the lock data,
/// handed out only while some file lock state is recorded.
#[allow(dead_code)]
pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
/// Guard returned by `GlobalLock::try_lock_exclusive`: a write guard over the
/// lock data, handed out only while the state is `LockedState::Exclusive`.
pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
/// Data protected by the in-process `RwLock` inside `GlobalLock`.
pub(super) struct GlobalLockData {
// Handle to the lockfile on which the file locks are taken and released.
file: std::fs::File,
// `Some(..)` while this process holds a file lock on `file`, `None` otherwise.
state: Option<LockedState>,
}
/// Combines the lockfile, its in-process `RwLock`, and the bookkeeping used by
/// the background task that releases the file lock when it is idle.
pub(super) struct GlobalLock {
// Lockfile handle and current lock state, behind an in-process RwLock.
inner: RwLock<GlobalLockData>,
// Shared "recently used" flag; its `Arc` strong count is also used to detect
// callers that are in the middle of acquiring the lock.
access_tracker: AccessTracker,
// Signalled (via `NotifyOnDrop`) when an acquisition attempt finishes, to
// wake the background unlock task.
notify_lock_acquired: Arc<tokio::sync::Notify>,
}
/// Handle to a shared boolean flag recording that the global lock was used.
///
/// Cloning shares the same flag (and bumps the `Arc` strong count, which
/// `GlobalLock` inspects). Dropping any handle sets the flag to `true`.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);
impl Drop for AccessTracker {
fn drop(&mut self) {
// Mark the lock as recently accessed when a handle goes away.
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
/// Drop guard that calls `notify_one` on the wrapped `Notify` when it goes out
/// of scope, whichever path leaves the scope.
struct NotifyOnDrop(Arc<tokio::sync::Notify>);
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
// Wake one waiter on the notifier.
self.0.notify_one();
}
}
impl GlobalLock {
    /// Returns a new handle to the shared access flag and marks the lock as
    /// recently used.
    ///
    /// While the returned handle is alive the flag's `Arc` strong count is
    /// raised, which makes [`Self::try_unlock`] refuse to release the file lock.
    fn get_access_tracker(&self) -> AccessTracker {
        let tracker = self.access_tracker.clone();
        tracker
            .0
            .store(true, std::sync::atomic::Ordering::Relaxed);
        tracker
    }

    /// Attempts to release the file lock without blocking.
    ///
    /// Returns:
    /// * `None` if the in-process write lock could not be taken, or if extra
    ///   access-tracker handles are alive (someone is acquiring the lock);
    /// * `Some(true)` if a file lock was held and has now been released;
    /// * `Some(false)` if no file lock was held.
    ///
    /// # Panics
    /// Panics if unlocking the lockfile fails.
    fn try_unlock(&self) -> Option<bool> {
        let mut data = match self.inner.try_write() {
            Ok(guard) => guard,
            Err(_) => return None,
        };

        // Baseline is 2 handles: the one stored in `self` and the one owned by
        // the background unlock task. Anything above that is an active caller.
        if Arc::strong_count(&self.access_tracker.0) > 2 {
            return None;
        }

        let was_locked = data.state.take().is_some();
        if was_locked {
            FileExt::unlock(&data.file).unwrap();
        }
        Some(was_locked)
    }

    /// Returns a read guard once the lockfile is locked in any mode, taking a
    /// shared file lock first if none is currently held.
    ///
    /// Notifies the background unlock task when the call finishes.
    ///
    /// # Panics
    /// Panics if the in-process lock is poisoned, if taking the shared file
    /// lock fails, or if the state is found unlocked after acquisition.
    pub(super) fn lock_any(&self) -> GlobalFileCacheGuardAny {
        // Declaration order matters: on exit the notifier fires first, then the
        // tracker handle is released.
        let tracker = self.get_access_tracker();
        let _wake_unlocker = NotifyOnDrop(self.notify_lock_acquired.clone());

        // Fast path: a file lock is already held.
        let current = self.inner.read().unwrap();
        if current.state.is_some() {
            return current;
        }
        drop(current);

        // Slow path: take the shared file lock under the write lock, re-checking
        // the state in case another thread got there first.
        let mut exclusive = self.inner.write().unwrap();
        if exclusive.state.is_none() {
            FileExt::lock_shared(&exclusive.file).unwrap();
            exclusive.state = Some(LockedState::Shared);
        }
        drop(exclusive);

        // Our tracker handle keeps the count above the baseline of 2, so the
        // background task cannot unlock between the write and the read below.
        debug_assert!(Arc::strong_count(&tracker.0) > 2);

        let guard = self.inner.read().unwrap();
        assert!(
            guard.state.is_some(),
            "impl error: global file cache lock was unlocked"
        );
        guard
    }

    /// Attempts, without blocking, to hold the lockfile exclusively.
    ///
    /// Returns `None` if the in-process write lock is contended, if another
    /// caller currently holds an access-tracker handle, or if the exclusive
    /// file lock could not be obtained. In that last case any file lock that
    /// was previously held has already been released.
    ///
    /// # Panics
    /// Panics if releasing a previously held file lock fails.
    #[allow(dead_code)]
    pub(super) fn try_lock_exclusive(&self) -> Option<GlobalFileCacheGuardExclusive> {
        let tracker = self.get_access_tracker();

        let mut data = match self.inner.try_write() {
            Ok(guard) => guard,
            Err(_) => return None,
        };

        // Expected handles: the one stored in `self`, the background task's,
        // and `tracker` above. More means another caller is active.
        if Arc::strong_count(&tracker.0) > 3 {
            return None;
        }

        let _wake_unlocker = NotifyOnDrop(self.notify_lock_acquired.clone());

        // Already exclusive: nothing to do.
        if matches!(data.state, Some(LockedState::Exclusive)) {
            return Some(data);
        }

        // Release whatever lock is currently held before asking for exclusive.
        if data.state.take().is_some() {
            FileExt::unlock(&data.file).unwrap();
        }

        if data.file.try_lock_exclusive().is_ok() {
            data.state = Some(LockedState::Exclusive);
            return Some(data);
        }

        None
    }
}