polars_io/file_cache/
entry.rs

1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::PlRefPath;
10
11use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
12use super::file_fetcher::{FileFetcher, RemoteMetadata};
13use super::file_lock::{FileLock, FileLockAnyGuard};
14use super::metadata::{EntryMetadata, FileVersion};
15use super::utils::update_last_accessed;
16
/// Single-byte directory name placed between the cache path prefix and the
/// file name when building data file paths (`[prefix]/d/...`).
pub(super) const DATA_PREFIX: u8 = b'd';
/// Single-byte directory name placed between the cache path prefix and the
/// file name when building metadata file paths (`[prefix]/m/...`).
pub(super) const METADATA_PREFIX: u8 = b'm';
19
/// In-memory copy of the metadata most recently read from the metadata file,
/// kept so the file does not have to be parsed again while it is unchanged.
struct CachedData {
    /// Last-modified value of the metadata file observed when this copy was
    /// read; the copy is only reused while the file still reports this value.
    last_modified: u64,
    /// The parsed entry metadata.
    metadata: Arc<EntryMetadata>,
    /// Path of the data file derived from the path prefix, the uri hash and
    /// `metadata.remote_version`.
    data_file_path: PathBuf,
}
25
/// Mutable state of a cache entry; `EntryData` keeps it behind a `Mutex`.
struct Inner {
    /// URI this entry caches; compared against the URI stored in the metadata
    /// file and printed in verbose output.
    uri: PlRefPath,
    /// Hash of the URI, used as the file name component of the metadata and
    /// data file paths.
    uri_hash: String,
    /// Prefix under which the metadata and data file paths are built.
    path_prefix: PlRefPath,
    /// Lock wrapper around the metadata file path (`[prefix]/m/[uri hash]`).
    metadata: FileLock<PathBuf>,
    /// Metadata most recently read from disk, if any.
    cached_data: Option<CachedData>,
    /// Current TTL; the same `Arc` is stored in `EntryData::ttl`, so a value
    /// stored there is observed here.
    ttl: Arc<AtomicU64>,
    /// Fetches the remote metadata and the data file itself.
    file_fetcher: Arc<dyn FileFetcher>,
}
35
/// Backing data of a [`FileCacheEntry`].
struct EntryData {
    /// Copy of the entry URI kept outside the mutex; returned by
    /// `FileCacheEntry::uri` without locking `inner`.
    uri: PlRefPath,
    /// Lock-protected state used by the open operations.
    inner: Mutex<Inner>,
    /// TTL handle shared with `Inner::ttl`; written by
    /// `FileCacheEntry::update_ttl` without locking `inner`.
    ttl: Arc<AtomicU64>,
}
41
/// A single file cache entry for one URI. Opening it yields a
/// `std::fs::File` for the locally cached copy, downloading it through the
/// entry's file fetcher when no usable copy exists.
pub struct FileCacheEntry(EntryData);
43
44impl EntryMetadata {
45    fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
46        self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
47    }
48}
49
50impl Inner {
51    fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
52        let verbose = config::verbose();
53
54        {
55            let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
56            // We want to use an exclusive lock here to avoid an API call in the case where only the
57            // local TTL was updated.
58            let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
59            update_last_accessed(metadata_file);
60
61            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
62                let data_file_path = self.get_cached_data_file_path();
63
64                if metadata.compare_local_state(data_file_path).is_ok() {
65                    if verbose {
66                        eprintln!(
67                            "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
68                            self.uri.clone()
69                        );
70                    }
71                    return Ok(finish_open(data_file_path, metadata_file));
72                }
73            }
74        }
75
76        if verbose {
77            eprintln!(
78                "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
79                self.uri.clone()
80            );
81        }
82
83        self.try_open_check_latest()
84    }
85
86    fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
87        let verbose = config::verbose();
88        let remote_metadata = &self.file_fetcher.fetch_metadata()?;
89        let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
90
91        {
92            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
93            update_last_accessed(metadata_file);
94
95            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
96                if metadata.matches_remote_metadata(remote_metadata) {
97                    let data_file_path = self.get_cached_data_file_path();
98
99                    if metadata.compare_local_state(data_file_path).is_ok() {
100                        if verbose {
101                            eprintln!(
102                                "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
103                                self.uri.clone()
104                            );
105                        }
106                        return Ok(finish_open(data_file_path, metadata_file));
107                    }
108                }
109            }
110        }
111
112        let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
113        let metadata = self
114            .try_get_metadata(metadata_file, &cache_guard)
115            // Safety: `metadata_file` is an exclusive guard.
116            .unwrap_or_else(|_| {
117                Arc::new(EntryMetadata::new(
118                    self.uri.clone(),
119                    self.ttl.load(std::sync::atomic::Ordering::Relaxed),
120                ))
121            });
122
123        if metadata.matches_remote_metadata(remote_metadata) {
124            let data_file_path = self.get_cached_data_file_path();
125
126            if metadata.compare_local_state(data_file_path).is_ok() {
127                if verbose {
128                    eprintln!(
129                        "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
130                        self.uri.clone()
131                    );
132                }
133                return Ok(finish_open(data_file_path, metadata_file));
134            }
135        }
136
137        if verbose {
138            eprintln!(
139                "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
140                self.uri.clone(),
141                remote_metadata.version,
142                remote_metadata.size
143            );
144        }
145
146        let data_file_path = &get_data_file_path(
147            self.path_prefix.as_bytes(),
148            self.uri_hash.as_bytes(),
149            &remote_metadata.version,
150        );
151        // Remove the file if it exists, since it doesn't match the metadata.
152        // This could be left from an aborted process.
153        let _ = std::fs::remove_file(data_file_path);
154        if !self.file_fetcher.fetches_as_symlink() {
155            let file = std::fs::OpenOptions::new()
156                .write(true)
157                .create(true)
158                .truncate(true)
159                .open(data_file_path)
160                .map_err(PolarsError::from)?;
161
162            // * Some(true)   => always raise
163            // * Some(false)  => never raise
164            // * None         => do not raise if fallocate() is not permitted, otherwise raise.
165            static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
166                let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
167                    Ok("1") => Some(false),
168                    Ok("0") => Some(true),
169                    Err(_) => None,
170                    Ok(v) => {
171                        panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
172                    },
173                };
174                if config::verbose() {
175                    eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
176                }
177                v
178            });
179
180            // Initialize it to get the verbose print
181            let raise_alloc_err = *RAISE_ALLOC_ERROR;
182
183            file.lock_exclusive().unwrap();
184            if let Err(e) = file.allocate(remote_metadata.size) {
185                let msg = format!(
186                    "failed to reserve {} bytes on disk to download uri = {}: {:?}",
187                    remote_metadata.size, &self.uri, e
188                );
189
190                if raise_alloc_err == Some(true)
191                    || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
192                {
193                    polars_bail!(ComputeError: msg)
194                } else if config::verbose() {
195                    eprintln!("[file_cache]: warning: {msg}")
196                }
197            }
198        }
199        self.file_fetcher.fetch(data_file_path)?;
200
201        // Don't do this on windows as it will break setting last accessed times.
202        #[cfg(target_family = "unix")]
203        if !self.file_fetcher.fetches_as_symlink() {
204            let mut perms = std::fs::metadata(data_file_path.clone())
205                .unwrap()
206                .permissions();
207            perms.set_readonly(true);
208            std::fs::set_permissions(data_file_path, perms).unwrap();
209        }
210
211        let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
212        let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
213        let local_size = data_file_metadata.len();
214
215        if local_size != remote_metadata.size {
216            polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
217        }
218
219        let mut metadata = metadata;
220        let metadata = Arc::make_mut(&mut metadata);
221        metadata.local_last_modified = local_last_modified;
222        metadata.local_size = local_size;
223        metadata.remote_version = remote_metadata.version.clone();
224
225        if let Err(e) = metadata.compare_local_state(data_file_path) {
226            panic!("metadata mismatch after file fetch: {e}");
227        }
228
229        let data_file = finish_open(data_file_path, metadata_file);
230
231        metadata_file.set_len(0).unwrap();
232        metadata_file.seek(SeekFrom::Start(0)).unwrap();
233        metadata
234            .try_write(&mut **metadata_file)
235            .map_err(to_compute_err)?;
236
237        Ok(data_file)
238    }
239
240    /// Try to read the metadata from disk. If `F` is an exclusive guard, this
241    /// will update the TTL stored in the metadata file if it does not match.
242    fn try_get_metadata<F: FileLockAnyGuard>(
243        &mut self,
244        metadata_file: &mut F,
245        _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
246    ) -> PolarsResult<Arc<EntryMetadata>> {
247        let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
248        let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
249
250        for _ in 0..2 {
251            if let Some(ref cached) = self.cached_data {
252                if cached.last_modified == last_modified {
253                    if cached.metadata.ttl != ttl {
254                        polars_bail!(ComputeError: "TTL mismatch");
255                    }
256
257                    if cached.metadata.uri != self.uri {
258                        unimplemented!(
259                            "hash collision: uri1 = {}, uri2 = {}, hash = {}",
260                            cached.metadata.uri,
261                            self.uri,
262                            self.uri_hash,
263                        );
264                    }
265
266                    return Ok(cached.metadata.clone());
267                }
268            }
269
270            // Ensure cache is unset if read fails
271            self.cached_data = None;
272
273            let mut metadata =
274                EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
275
276            // Note this means if multiple processes on the same system set a
277            // different TTL for the same path, the metadata file will constantly
278            // get overwritten.
279            if metadata.ttl != ttl {
280                if F::IS_EXCLUSIVE {
281                    metadata.ttl = ttl;
282                    metadata_file.set_len(0).unwrap();
283                    metadata_file.seek(SeekFrom::Start(0)).unwrap();
284                    metadata
285                        .try_write(&mut **metadata_file)
286                        .map_err(to_compute_err)?;
287                } else {
288                    polars_bail!(ComputeError: "TTL mismatch");
289                }
290            }
291
292            let metadata = Arc::new(metadata);
293            let data_file_path = get_data_file_path(
294                self.path_prefix.as_bytes(),
295                self.uri_hash.as_bytes(),
296                &metadata.remote_version,
297            );
298            self.cached_data = Some(CachedData {
299                last_modified,
300                metadata,
301                data_file_path,
302            });
303        }
304
305        unreachable!();
306    }
307
308    /// # Panics
309    /// Panics if `self.cached_data` is `None`.
310    fn get_cached_data_file_path(&self) -> &Path {
311        &self.cached_data.as_ref().unwrap().data_file_path
312    }
313}
314
315impl FileCacheEntry {
316    pub(crate) fn new(
317        uri: PlRefPath,
318        uri_hash: String,
319        path_prefix: PlRefPath,
320        file_fetcher: Arc<dyn FileFetcher>,
321        file_cache_ttl: u64,
322    ) -> Self {
323        let metadata = FileLock::from(get_metadata_file_path(
324            path_prefix.as_bytes(),
325            uri_hash.as_bytes(),
326        ));
327
328        debug_assert!(
329            PlRefPath::ptr_eq(&uri, file_fetcher.get_uri()),
330            "impl error: entry uri != file_fetcher uri"
331        );
332
333        let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
334
335        Self(EntryData {
336            uri: uri.clone(),
337            inner: Mutex::new(Inner {
338                uri,
339                uri_hash,
340                path_prefix,
341                metadata,
342                cached_data: None,
343                ttl: ttl.clone(),
344                file_fetcher,
345            }),
346            ttl,
347        })
348    }
349
350    pub fn uri(&self) -> &PlRefPath {
351        &self.0.uri
352    }
353
354    /// Directly returns the cached file if it finds one without checking if
355    /// there is a newer version on the remote. This does not make any API calls
356    /// if it finds a cached file, otherwise it simply downloads the file.
357    pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
358        self.0.inner.lock().unwrap().try_open_assume_latest()
359    }
360
361    /// Returns the cached file after ensuring it is up to date against the remote
362    /// This will always perform at least 1 API call for fetching metadata.
363    pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
364        self.0.inner.lock().unwrap().try_open_check_latest()
365    }
366
367    pub fn update_ttl(&self, ttl: u64) {
368        self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
369    }
370}
371
372fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
373    let file = {
374        #[cfg(not(target_family = "windows"))]
375        {
376            std::fs::OpenOptions::new()
377                .read(true)
378                .open(data_file_path)
379                .unwrap()
380        }
381        // windows requires write access to update the last accessed time
382        #[cfg(target_family = "windows")]
383        {
384            std::fs::OpenOptions::new()
385                .read(true)
386                .write(true)
387                .open(data_file_path)
388                .unwrap()
389        }
390    };
391    update_last_accessed(&file);
392    if FileExt::try_lock_shared(&file).is_err() {
393        panic!(
394            "finish_open: could not acquire shared lock on data file at {:?}",
395            data_file_path
396        );
397    }
398    file
399}
400
401/// `[prefix]/d/[uri hash][last modified]`
402fn get_data_file_path(
403    path_prefix: &[u8],
404    uri_hash: &[u8],
405    remote_version: &FileVersion,
406) -> PathBuf {
407    let owned;
408    let path = [
409        path_prefix,
410        &[b'/', DATA_PREFIX, b'/'],
411        uri_hash,
412        match remote_version {
413            FileVersion::Timestamp(v) => {
414                owned = Some(format!("{v:013x}"));
415                owned.as_deref().unwrap()
416            },
417            FileVersion::ETag(v) => v.as_str(),
418            FileVersion::Uninitialized => panic!("impl error: version not initialized"),
419        }
420        .as_bytes(),
421    ]
422    .concat();
423    PathBuf::from(String::from_utf8(path).unwrap())
424}
425
426/// `[prefix]/m/[uri hash]`
427fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
428    let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
429    PathBuf::from(String::from_utf8(bytes).unwrap())
430}