polars_io/file_cache/
entry.rs

1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
10use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
11use super::file_fetcher::{FileFetcher, RemoteMetadata};
12use super::file_lock::{FileLock, FileLockAnyGuard};
13use super::metadata::{EntryMetadata, FileVersion};
14use super::utils::update_last_accessed;
15
16pub(super) const DATA_PREFIX: u8 = b'd';
17pub(super) const METADATA_PREFIX: u8 = b'm';
18
/// In-memory copy of an entry's metadata file, together with the data file path
/// derived from it.
struct CachedData {
    /// Modification time of the metadata file (via `last_modified_u64`) at the time
    /// `metadata` was read; compared against the current value to detect on-disk changes.
    last_modified: u64,
    metadata: Arc<EntryMetadata>,
    /// Data file path computed from `metadata.remote_version`.
    data_file_path: PathBuf,
}
24
/// Mutable per-entry state, kept behind the `Mutex` in `EntryData::inner`.
struct Inner {
    uri: Arc<str>,
    /// Hash of `uri`; used as the metadata file name and as the data file name prefix.
    uri_hash: String,
    /// Root of the file cache (the `[prefix]` in the `d/` and `m/` paths).
    path_prefix: Arc<Path>,
    /// Lockable handle to this entry's metadata file at `[prefix]/m/[uri hash]`.
    metadata: FileLock<PathBuf>,
    /// Metadata last read from disk; reused while the metadata file is unchanged.
    cached_data: Option<CachedData>,
    /// TTL shared with `EntryData::ttl` (written by `FileCacheEntry::update_ttl`).
    ttl: Arc<AtomicU64>,
    /// Fetches remote metadata and file contents for `uri`.
    file_fetcher: Arc<dyn FileFetcher>,
}
34
/// State behind a [`FileCacheEntry`].
struct EntryData {
    /// Same `Arc` as `Inner::uri`, readable without locking `inner`.
    uri: Arc<str>,
    /// Mutable state; each open operation holds this lock for its whole duration.
    inner: Mutex<Inner>,
    /// Same atomic as `Inner::ttl`, so the TTL can be changed without locking `inner`.
    ttl: Arc<AtomicU64>,
}

/// A single file-cache entry, identified by its URI.
pub struct FileCacheEntry(EntryData);
42
43impl EntryMetadata {
44    fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
45        self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
46    }
47}
48
49impl Inner {
50    fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
51        let verbose = config::verbose();
52
53        {
54            let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
55            // We want to use an exclusive lock here to avoid an API call in the case where only the
56            // local TTL was updated.
57            let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
58            update_last_accessed(metadata_file);
59
60            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
61                let data_file_path = self.get_cached_data_file_path();
62
63                if metadata.compare_local_state(data_file_path).is_ok() {
64                    if verbose {
65                        eprintln!(
66                            "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
67                            self.uri.clone()
68                        );
69                    }
70                    return Ok(finish_open(data_file_path, metadata_file));
71                }
72            }
73        }
74
75        if verbose {
76            eprintln!(
77                "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
78                self.uri.clone()
79            );
80        }
81
82        self.try_open_check_latest()
83    }
84
85    fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
86        let verbose = config::verbose();
87        let remote_metadata = &self.file_fetcher.fetch_metadata()?;
88        let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
89
90        {
91            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
92            update_last_accessed(metadata_file);
93
94            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
95                if metadata.matches_remote_metadata(remote_metadata) {
96                    let data_file_path = self.get_cached_data_file_path();
97
98                    if metadata.compare_local_state(data_file_path).is_ok() {
99                        if verbose {
100                            eprintln!(
101                                "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
102                                self.uri.clone()
103                            );
104                        }
105                        return Ok(finish_open(data_file_path, metadata_file));
106                    }
107                }
108            }
109        }
110
111        let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
112        let metadata = self
113            .try_get_metadata(metadata_file, &cache_guard)
114            // Safety: `metadata_file` is an exclusive guard.
115            .unwrap_or_else(|_| {
116                Arc::new(EntryMetadata::new(
117                    self.uri.clone(),
118                    self.ttl.load(std::sync::atomic::Ordering::Relaxed),
119                ))
120            });
121
122        if metadata.matches_remote_metadata(remote_metadata) {
123            let data_file_path = self.get_cached_data_file_path();
124
125            if metadata.compare_local_state(data_file_path).is_ok() {
126                if verbose {
127                    eprintln!(
128                        "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
129                        self.uri.clone()
130                    );
131                }
132                return Ok(finish_open(data_file_path, metadata_file));
133            }
134        }
135
136        if verbose {
137            eprintln!(
138                "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
139                self.uri.clone(),
140                remote_metadata.version,
141                remote_metadata.size
142            );
143        }
144
145        let data_file_path = &get_data_file_path(
146            self.path_prefix.to_str().unwrap().as_bytes(),
147            self.uri_hash.as_bytes(),
148            &remote_metadata.version,
149        );
150        // Remove the file if it exists, since it doesn't match the metadata.
151        // This could be left from an aborted process.
152        let _ = std::fs::remove_file(data_file_path);
153        if !self.file_fetcher.fetches_as_symlink() {
154            let file = std::fs::OpenOptions::new()
155                .write(true)
156                .create(true)
157                .truncate(true)
158                .open(data_file_path)
159                .map_err(PolarsError::from)?;
160
161            // * Some(true)   => always raise
162            // * Some(false)  => never raise
163            // * None         => do not raise if fallocate() is not permitted, otherwise raise.
164            static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
165                let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
166                    Ok("1") => Some(false),
167                    Ok("0") => Some(true),
168                    Err(_) => None,
169                    Ok(v) => {
170                        panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
171                    },
172                };
173                if config::verbose() {
174                    eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
175                }
176                v
177            });
178
179            // Initialize it to get the verbose print
180            let raise_alloc_err = *RAISE_ALLOC_ERROR;
181
182            file.lock_exclusive().unwrap();
183            if let Err(e) = file.allocate(remote_metadata.size) {
184                let msg = format!(
185                    "failed to reserve {} bytes on disk to download uri = {}: {:?}",
186                    remote_metadata.size,
187                    self.uri.as_ref(),
188                    e
189                );
190
191                if raise_alloc_err == Some(true)
192                    || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
193                {
194                    polars_bail!(ComputeError: msg)
195                } else if config::verbose() {
196                    eprintln!("[file_cache]: warning: {msg}")
197                }
198            }
199        }
200        self.file_fetcher.fetch(data_file_path)?;
201
202        // Don't do this on windows as it will break setting last accessed times.
203        #[cfg(target_family = "unix")]
204        if !self.file_fetcher.fetches_as_symlink() {
205            let mut perms = std::fs::metadata(data_file_path.clone())
206                .unwrap()
207                .permissions();
208            perms.set_readonly(true);
209            std::fs::set_permissions(data_file_path, perms).unwrap();
210        }
211
212        let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
213        let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
214        let local_size = data_file_metadata.len();
215
216        if local_size != remote_metadata.size {
217            polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
218        }
219
220        let mut metadata = metadata;
221        let metadata = Arc::make_mut(&mut metadata);
222        metadata.local_last_modified = local_last_modified;
223        metadata.local_size = local_size;
224        metadata.remote_version = remote_metadata.version.clone();
225
226        if let Err(e) = metadata.compare_local_state(data_file_path) {
227            panic!("metadata mismatch after file fetch: {e}");
228        }
229
230        let data_file = finish_open(data_file_path, metadata_file);
231
232        metadata_file.set_len(0).unwrap();
233        metadata_file.seek(SeekFrom::Start(0)).unwrap();
234        metadata
235            .try_write(&mut **metadata_file)
236            .map_err(to_compute_err)?;
237
238        Ok(data_file)
239    }
240
241    /// Try to read the metadata from disk. If `F` is an exclusive guard, this
242    /// will update the TTL stored in the metadata file if it does not match.
243    fn try_get_metadata<F: FileLockAnyGuard>(
244        &mut self,
245        metadata_file: &mut F,
246        _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
247    ) -> PolarsResult<Arc<EntryMetadata>> {
248        let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
249        let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
250
251        for _ in 0..2 {
252            if let Some(ref cached) = self.cached_data {
253                if cached.last_modified == last_modified {
254                    if cached.metadata.ttl != ttl {
255                        polars_bail!(ComputeError: "TTL mismatch");
256                    }
257
258                    if cached.metadata.uri != self.uri {
259                        unimplemented!(
260                            "hash collision: uri1 = {}, uri2 = {}, hash = {}",
261                            cached.metadata.uri,
262                            self.uri,
263                            self.uri_hash,
264                        );
265                    }
266
267                    return Ok(cached.metadata.clone());
268                }
269            }
270
271            // Ensure cache is unset if read fails
272            self.cached_data = None;
273
274            let mut metadata =
275                EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
276
277            // Note this means if multiple processes on the same system set a
278            // different TTL for the same path, the metadata file will constantly
279            // get overwritten.
280            if metadata.ttl != ttl {
281                if F::IS_EXCLUSIVE {
282                    metadata.ttl = ttl;
283                    metadata_file.set_len(0).unwrap();
284                    metadata_file.seek(SeekFrom::Start(0)).unwrap();
285                    metadata
286                        .try_write(&mut **metadata_file)
287                        .map_err(to_compute_err)?;
288                } else {
289                    polars_bail!(ComputeError: "TTL mismatch");
290                }
291            }
292
293            let metadata = Arc::new(metadata);
294            let data_file_path = get_data_file_path(
295                self.path_prefix.to_str().unwrap().as_bytes(),
296                self.uri_hash.as_bytes(),
297                &metadata.remote_version,
298            );
299            self.cached_data = Some(CachedData {
300                last_modified,
301                metadata,
302                data_file_path,
303            });
304        }
305
306        unreachable!();
307    }
308
309    /// # Panics
310    /// Panics if `self.cached_data` is `None`.
311    fn get_cached_data_file_path(&self) -> &Path {
312        &self.cached_data.as_ref().unwrap().data_file_path
313    }
314}
315
316impl FileCacheEntry {
317    pub(crate) fn new(
318        uri: Arc<str>,
319        uri_hash: String,
320        path_prefix: Arc<Path>,
321        file_fetcher: Arc<dyn FileFetcher>,
322        file_cache_ttl: u64,
323    ) -> Self {
324        let metadata = FileLock::from(get_metadata_file_path(
325            path_prefix.to_str().unwrap().as_bytes(),
326            uri_hash.as_bytes(),
327        ));
328
329        debug_assert!(
330            Arc::ptr_eq(&uri, file_fetcher.get_uri()),
331            "impl error: entry uri != file_fetcher uri"
332        );
333
334        let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
335
336        Self(EntryData {
337            uri: uri.clone(),
338            inner: Mutex::new(Inner {
339                uri,
340                uri_hash,
341                path_prefix,
342                metadata,
343                cached_data: None,
344                ttl: ttl.clone(),
345                file_fetcher,
346            }),
347            ttl,
348        })
349    }
350
351    pub fn uri(&self) -> &Arc<str> {
352        &self.0.uri
353    }
354
355    /// Directly returns the cached file if it finds one without checking if
356    /// there is a newer version on the remote. This does not make any API calls
357    /// if it finds a cached file, otherwise it simply downloads the file.
358    pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
359        self.0.inner.lock().unwrap().try_open_assume_latest()
360    }
361
362    /// Returns the cached file after ensuring it is up to date against the remote
363    /// This will always perform at least 1 API call for fetching metadata.
364    pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
365        self.0.inner.lock().unwrap().try_open_check_latest()
366    }
367
368    pub fn update_ttl(&self, ttl: u64) {
369        self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
370    }
371}
372
373fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
374    let file = {
375        #[cfg(not(target_family = "windows"))]
376        {
377            std::fs::OpenOptions::new()
378                .read(true)
379                .open(data_file_path)
380                .unwrap()
381        }
382        // windows requires write access to update the last accessed time
383        #[cfg(target_family = "windows")]
384        {
385            std::fs::OpenOptions::new()
386                .read(true)
387                .write(true)
388                .open(data_file_path)
389                .unwrap()
390        }
391    };
392    update_last_accessed(&file);
393    if FileExt::try_lock_shared(&file).is_err() {
394        panic!(
395            "finish_open: could not acquire shared lock on data file at {}",
396            data_file_path.to_str().unwrap()
397        );
398    }
399    file
400}
401
402/// `[prefix]/d/[uri hash][last modified]`
403fn get_data_file_path(
404    path_prefix: &[u8],
405    uri_hash: &[u8],
406    remote_version: &FileVersion,
407) -> PathBuf {
408    let owned;
409    let path = [
410        path_prefix,
411        &[b'/', DATA_PREFIX, b'/'],
412        uri_hash,
413        match remote_version {
414            FileVersion::Timestamp(v) => {
415                owned = Some(format!("{v:013x}"));
416                owned.as_deref().unwrap()
417            },
418            FileVersion::ETag(v) => v.as_str(),
419            FileVersion::Uninitialized => panic!("impl error: version not initialized"),
420        }
421        .as_bytes(),
422    ]
423    .concat();
424    PathBuf::from(String::from_utf8(path).unwrap())
425}
426
427/// `[prefix]/m/[uri hash]`
428fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
429    let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
430    PathBuf::from(String::from_utf8(bytes).unwrap())
431}