polars_io/file_cache/
cache.rs

1use std::path::Path;
2use std::sync::atomic::AtomicU64;
3use std::sync::{Arc, LazyLock, RwLock};
4
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::aliases::PlHashMap;
8use polars_utils::plpath::PlPathRef;
9
10use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
11use super::eviction::EvictionManager;
12use super::file_fetcher::FileFetcher;
13use super::utils::FILE_CACHE_PREFIX;
14use crate::path_utils::ensure_directory_init;
15
16pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
17    let prefix = FILE_CACHE_PREFIX.as_ref();
18    let prefix = Arc::<Path>::from(prefix);
19
20    if config::verbose() {
21        eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
22    }
23
24    let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
25    let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
26
27    let metadata_dir = prefix
28        .as_ref()
29        .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())
30        .into_boxed_path();
31    if let Err(err) = ensure_directory_init(&metadata_dir) {
32        panic!(
33            "failed to create file cache metadata directory: path = {}, err = {}",
34            metadata_dir.to_str().unwrap(),
35            err
36        )
37    }
38
39    let data_dir = prefix
40        .as_ref()
41        .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())
42        .into_boxed_path();
43
44    if let Err(err) = ensure_directory_init(&data_dir) {
45        panic!(
46            "failed to create file cache data directory: path = {}, err = {}",
47            data_dir.to_str().unwrap(),
48            err
49        )
50    }
51
52    EvictionManager {
53        data_dir,
54        metadata_dir,
55        files_to_remove: None,
56        min_ttl: min_ttl.clone(),
57        notify_ttl_updated: notify_ttl_updated.clone(),
58    }
59    .run_in_background();
60
61    // Safety: We have created the data and metadata directories.
62    unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
63});
64
65pub struct FileCache {
66    prefix: Arc<Path>,
67    entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
68    min_ttl: Arc<AtomicU64>,
69    notify_ttl_updated: Arc<tokio::sync::Notify>,
70}
71
72impl FileCache {
73    /// # Safety
74    /// The following directories exist:
75    /// * `{prefix}/{METADATA_PREFIX}/`
76    /// * `{prefix}/{DATA_PREFIX}/`
77    unsafe fn new_unchecked(
78        prefix: Arc<Path>,
79        min_ttl: Arc<AtomicU64>,
80        notify_ttl_updated: Arc<tokio::sync::Notify>,
81    ) -> Self {
82        Self {
83            prefix,
84            entries: Default::default(),
85            min_ttl,
86            notify_ttl_updated,
87        }
88    }
89
90    /// If `uri` is a local path, it must be an absolute path. This is not exposed
91    /// for now - initialize entries using `init_entries_from_uri_list` instead.
92    pub(super) fn init_entry(
93        &self,
94        uri: Arc<str>,
95        get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
96        ttl: u64,
97    ) -> PolarsResult<Arc<FileCacheEntry>> {
98        let verbose = config::verbose();
99
100        #[cfg(debug_assertions)]
101        {
102            // Local paths must be absolute or else the cache would be wrong.
103            if !PlPathRef::new(uri.as_ref()).is_cloud_url() {
104                let path = Path::new(uri.as_ref());
105                assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
106            }
107        }
108
109        if self
110            .min_ttl
111            .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
112            < ttl
113        {
114            self.notify_ttl_updated.notify_one();
115        }
116
117        {
118            let entries = self.entries.read().unwrap();
119
120            if let Some(entry) = entries.get(uri.as_ref()) {
121                if verbose {
122                    eprintln!(
123                        "[file_cache] init_entry: return existing entry for uri = {}",
124                        uri.clone()
125                    );
126                }
127                entry.update_ttl(ttl);
128                return Ok(entry.clone());
129            }
130        }
131
132        let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
133
134        {
135            let mut entries = self.entries.write().unwrap();
136
137            // May have been raced
138            if let Some(entry) = entries.get(uri.as_ref()) {
139                if verbose {
140                    eprintln!(
141                        "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
142                        uri.clone()
143                    );
144                }
145                entry.update_ttl(ttl);
146                return Ok(entry.clone());
147            }
148
149            if verbose {
150                eprintln!(
151                    "[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
152                );
153            }
154
155            let entry = Arc::new(FileCacheEntry::new(
156                uri.clone(),
157                uri_hash,
158                self.prefix.clone(),
159                get_file_fetcher()?,
160                ttl,
161            ));
162            entries.insert(uri, entry.clone());
163            Ok(entry)
164        }
165    }
166
167    /// This function can accept relative local paths.
168    pub fn get_entry(&self, addr: PlPathRef<'_>) -> Option<Arc<FileCacheEntry>> {
169        match addr {
170            PlPathRef::Local(p) => {
171                let p = std::fs::canonicalize(p).unwrap();
172                self.entries
173                    .read()
174                    .unwrap()
175                    .get(p.to_str().unwrap())
176                    .map(Arc::clone)
177            },
178            PlPathRef::Cloud(p) => self.entries.read().unwrap().get(p.uri()).map(Arc::clone),
179        }
180    }
181}
182
/// Returns the file cache TTL configured through the `POLARS_FILE_CACHE_TTL` environment
/// variable.
///
/// Falls back to `60 * 60` when the variable cannot be read (presumably seconds, i.e. one
/// hour - confirm against the consumers of this value).
///
/// # Panics
/// Panics if the variable is set but does not parse as a `u64`. The panic message names
/// the variable and the offending value so the misconfiguration is easy to locate.
pub fn get_env_file_cache_ttl() -> u64 {
    const ENV_VAR: &str = "POLARS_FILE_CACHE_TTL";
    const DEFAULT_TTL: u64 = 60 * 60;

    match std::env::var(ENV_VAR) {
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "environment variable {} must be a non-negative integer, got {:?}: {}",
                ENV_VAR, raw, err
            )
        }),
        Err(_) => DEFAULT_TTL,
    }
}