polars_io/file_cache/cache.rs

1use std::path::Path;
2use std::sync::atomic::AtomicU64;
3use std::sync::{Arc, LazyLock, RwLock};
4
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::aliases::PlHashMap;
8
9use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
10use super::eviction::EvictionManager;
11use super::file_fetcher::FileFetcher;
12use super::utils::FILE_CACHE_PREFIX;
13use crate::path_utils::{ensure_directory_init, is_cloud_url};
14
15pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
16    let prefix = FILE_CACHE_PREFIX.as_ref();
17    let prefix = Arc::<Path>::from(prefix);
18
19    if config::verbose() {
20        eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
21    }
22
23    let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
24    let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
25
26    let metadata_dir = prefix
27        .as_ref()
28        .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())
29        .into_boxed_path();
30    if let Err(err) = ensure_directory_init(&metadata_dir) {
31        panic!(
32            "failed to create file cache metadata directory: path = {}, err = {}",
33            metadata_dir.to_str().unwrap(),
34            err
35        )
36    }
37
38    let data_dir = prefix
39        .as_ref()
40        .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())
41        .into_boxed_path();
42
43    if let Err(err) = ensure_directory_init(&data_dir) {
44        panic!(
45            "failed to create file cache data directory: path = {}, err = {}",
46            data_dir.to_str().unwrap(),
47            err
48        )
49    }
50
51    EvictionManager {
52        data_dir,
53        metadata_dir,
54        files_to_remove: None,
55        min_ttl: min_ttl.clone(),
56        notify_ttl_updated: notify_ttl_updated.clone(),
57    }
58    .run_in_background();
59
60    // Safety: We have created the data and metadata directories.
61    unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
62});
63
64pub struct FileCache {
65    prefix: Arc<Path>,
66    entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
67    min_ttl: Arc<AtomicU64>,
68    notify_ttl_updated: Arc<tokio::sync::Notify>,
69}
70
71impl FileCache {
72    /// # Safety
73    /// The following directories exist:
74    /// * `{prefix}/{METADATA_PREFIX}/`
75    /// * `{prefix}/{DATA_PREFIX}/`
76    unsafe fn new_unchecked(
77        prefix: Arc<Path>,
78        min_ttl: Arc<AtomicU64>,
79        notify_ttl_updated: Arc<tokio::sync::Notify>,
80    ) -> Self {
81        Self {
82            prefix,
83            entries: Default::default(),
84            min_ttl,
85            notify_ttl_updated,
86        }
87    }
88
89    /// If `uri` is a local path, it must be an absolute path. This is not exposed
90    /// for now - initialize entries using `init_entries_from_uri_list` instead.
91    pub(super) fn init_entry<F: Fn() -> PolarsResult<Arc<dyn FileFetcher>>>(
92        &self,
93        uri: Arc<str>,
94        get_file_fetcher: F,
95        ttl: u64,
96    ) -> PolarsResult<Arc<FileCacheEntry>> {
97        let verbose = config::verbose();
98
99        #[cfg(debug_assertions)]
100        {
101            // Local paths must be absolute or else the cache would be wrong.
102            if !crate::path_utils::is_cloud_url(uri.as_ref()) {
103                let path = Path::new(uri.as_ref());
104                assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
105            }
106        }
107
108        if self
109            .min_ttl
110            .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
111            < ttl
112        {
113            self.notify_ttl_updated.notify_one();
114        }
115
116        {
117            let entries = self.entries.read().unwrap();
118
119            if let Some(entry) = entries.get(uri.as_ref()) {
120                if verbose {
121                    eprintln!(
122                        "[file_cache] init_entry: return existing entry for uri = {}",
123                        uri.clone()
124                    );
125                }
126                entry.update_ttl(ttl);
127                return Ok(entry.clone());
128            }
129        }
130
131        let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
132
133        {
134            let mut entries = self.entries.write().unwrap();
135
136            // May have been raced
137            if let Some(entry) = entries.get(uri.as_ref()) {
138                if verbose {
139                    eprintln!(
140                        "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
141                        uri.clone()
142                    );
143                }
144                entry.update_ttl(ttl);
145                return Ok(entry.clone());
146            }
147
148            if verbose {
149                eprintln!(
150                    "[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
151                    uri.clone(),
152                    uri_hash.clone()
153                );
154            }
155
156            let entry = Arc::new(FileCacheEntry::new(
157                uri.clone(),
158                uri_hash,
159                self.prefix.clone(),
160                get_file_fetcher()?,
161                ttl,
162            ));
163            entries.insert(uri, entry.clone());
164            Ok(entry.clone())
165        }
166    }
167
168    /// This function can accept relative local paths.
169    pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
170        if is_cloud_url(uri) {
171            self.entries.read().unwrap().get(uri).map(Arc::clone)
172        } else {
173            let uri = std::fs::canonicalize(uri).unwrap();
174            self.entries
175                .read()
176                .unwrap()
177                .get(uri.to_str().unwrap())
178                .map(Arc::clone)
179        }
180    }
181}
182
/// Returns the file-cache TTL configured via the `POLARS_FILE_CACHE_TTL` environment
/// variable.
///
/// Falls back to `60 * 60` (presumably one hour in seconds) when the variable is unset or
/// not valid unicode.
///
/// # Panics
/// If `POLARS_FILE_CACHE_TTL` is set but does not parse as a `u64`. The panic message
/// names the variable and the rejected value.
pub fn get_env_file_cache_ttl() -> u64 {
    const ENV_VAR: &str = "POLARS_FILE_CACHE_TTL";
    const DEFAULT_TTL: u64 = 60 * 60;

    match std::env::var(ENV_VAR) {
        Ok(value) => value.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "{} must be a non-negative integer, got {:?}: {}",
                ENV_VAR, value, err
            )
        }),
        Err(_) => DEFAULT_TTL,
    }
}