polars_io/file_cache/
cache.rs

1use std::sync::atomic::AtomicU64;
2use std::sync::{Arc, LazyLock, RwLock};
3
4use polars_core::config;
5use polars_error::PolarsResult;
6use polars_utils::aliases::PlHashMap;
7use polars_utils::pl_path::PlRefPath;
8
9use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
10use super::eviction::EvictionManager;
11use super::file_fetcher::FileFetcher;
12use super::utils::FILE_CACHE_PREFIX;
13use crate::path_utils::ensure_directory_init;
14
15pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
16    let prefix = FILE_CACHE_PREFIX.clone();
17
18    if config::verbose() {
19        eprintln!("file cache prefix: {}", prefix);
20    }
21
22    let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
23    let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
24
25    let metadata_dir = prefix.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
26    if let Err(err) = ensure_directory_init(metadata_dir.as_std_path()) {
27        panic!(
28            "failed to create file cache metadata directory: path = {}, err = {}",
29            metadata_dir, err
30        )
31    }
32
33    let data_dir = prefix.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
34
35    if let Err(err) = ensure_directory_init(data_dir.as_std_path()) {
36        panic!(
37            "failed to create file cache data directory: path = {}, err = {}",
38            data_dir, err
39        )
40    }
41
42    EvictionManager {
43        data_dir,
44        metadata_dir,
45        files_to_remove: None,
46        min_ttl: min_ttl.clone(),
47        notify_ttl_updated: notify_ttl_updated.clone(),
48    }
49    .run_in_background();
50
51    // Safety: We have created the data and metadata directories.
52    unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
53});
54
55pub struct FileCache {
56    prefix: PlRefPath,
57    entries: Arc<RwLock<PlHashMap<PlRefPath, Arc<FileCacheEntry>>>>,
58    min_ttl: Arc<AtomicU64>,
59    notify_ttl_updated: Arc<tokio::sync::Notify>,
60}
61
62impl FileCache {
63    /// # Safety
64    /// The following directories exist:
65    /// * `{prefix}/{METADATA_PREFIX}/`
66    /// * `{prefix}/{DATA_PREFIX}/`
67    unsafe fn new_unchecked(
68        prefix: PlRefPath,
69        min_ttl: Arc<AtomicU64>,
70        notify_ttl_updated: Arc<tokio::sync::Notify>,
71    ) -> Self {
72        Self {
73            prefix,
74            entries: Default::default(),
75            min_ttl,
76            notify_ttl_updated,
77        }
78    }
79
80    /// If `uri` is a local path, it must be an absolute path. This is not exposed
81    /// for now - initialize entries using `init_entries_from_uri_list` instead.
82    pub(super) fn init_entry(
83        &self,
84        uri: PlRefPath,
85        get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
86        ttl: u64,
87    ) -> PolarsResult<Arc<FileCacheEntry>> {
88        let verbose = config::verbose();
89
90        // Local paths must be absolute or else the cache would be wrong.
91        if !uri.has_scheme() {
92            debug_assert_eq!(
93                std::fs::canonicalize(uri.as_str())
94                    .ok()
95                    .and_then(|x| PlRefPath::try_from_pathbuf(x).ok())
96                    .as_ref(),
97                Some(&uri)
98            )
99        }
100
101        if self
102            .min_ttl
103            .fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
104            < ttl
105        {
106            self.notify_ttl_updated.notify_one();
107        }
108
109        {
110            let entries = self.entries.read().unwrap();
111
112            if let Some(entry) = entries.get(&uri) {
113                if verbose {
114                    eprintln!(
115                        "[file_cache] init_entry: return existing entry for uri = {}",
116                        uri.clone()
117                    );
118                }
119                entry.update_ttl(ttl);
120                return Ok(entry.clone());
121            }
122        }
123
124        let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
125
126        {
127            let mut entries = self.entries.write().unwrap();
128
129            // May have been raced
130            if let Some(entry) = entries.get(&uri) {
131                if verbose {
132                    eprintln!(
133                        "[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
134                        uri.clone()
135                    );
136                }
137                entry.update_ttl(ttl);
138                return Ok(entry.clone());
139            }
140
141            if verbose {
142                eprintln!(
143                    "[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
144                );
145            }
146
147            let entry = Arc::new(FileCacheEntry::new(
148                uri.clone(),
149                uri_hash,
150                self.prefix.clone(),
151                get_file_fetcher()?,
152                ttl,
153            ));
154            entries.insert(uri.clone(), entry.clone());
155            Ok(entry)
156        }
157    }
158
159    /// This function can accept relative local paths.
160    pub fn get_entry(&self, path: PlRefPath) -> Option<Arc<FileCacheEntry>> {
161        if path.has_scheme() {
162            self.entries.read().unwrap().get(&path).cloned()
163        } else {
164            let p =
165                PlRefPath::try_from_pathbuf(std::fs::canonicalize(path.as_str()).unwrap()).unwrap();
166            self.entries.read().unwrap().get(&p).cloned()
167        }
168    }
169}
170
/// Returns the file cache TTL from the `POLARS_FILE_CACHE_TTL` environment variable.
///
/// Falls back to `60 * 60` (3600) when the variable is unset or not valid Unicode.
/// Leading/trailing whitespace in the value is ignored.
///
/// # Panics
/// Panics if the variable is set but its (trimmed) value does not parse as a `u64`;
/// the panic message names the variable and the offending value.
pub fn get_env_file_cache_ttl() -> u64 {
    const DEFAULT_TTL: u64 = 60 * 60;

    match std::env::var("POLARS_FILE_CACHE_TTL") {
        Ok(value) => value.trim().parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "invalid POLARS_FILE_CACHE_TTL value {value:?}: expected a non-negative integer ({err})"
            )
        }),
        Err(_) => DEFAULT_TTL,
    }
}