polars_io/file_cache/
entry.rs

1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
10use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
11use super::file_fetcher::{FileFetcher, RemoteMetadata};
12use super::file_lock::{FileLock, FileLockAnyGuard};
13use super::metadata::{EntryMetadata, FileVersion};
14use super::utils::update_last_accessed;
15
16pub(super) const DATA_PREFIX: u8 = b'd';
17pub(super) const METADATA_PREFIX: u8 = b'm';
18
19struct CachedData {
20    last_modified: u64,
21    metadata: Arc<EntryMetadata>,
22    data_file_path: PathBuf,
23}
24
25struct Inner {
26    uri: Arc<str>,
27    uri_hash: String,
28    path_prefix: Arc<Path>,
29    metadata: FileLock<PathBuf>,
30    cached_data: Option<CachedData>,
31    ttl: Arc<AtomicU64>,
32    file_fetcher: Arc<dyn FileFetcher>,
33}
34
35struct EntryData {
36    uri: Arc<str>,
37    inner: Mutex<Inner>,
38    ttl: Arc<AtomicU64>,
39}
40
41pub struct FileCacheEntry(EntryData);
42
43impl EntryMetadata {
44    fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
45        self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
46    }
47}
48
49impl Inner {
50    fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
51        let verbose = config::verbose();
52
53        {
54            let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
55            // We want to use an exclusive lock here to avoid an API call in the case where only the
56            // local TTL was updated.
57            let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
58            update_last_accessed(metadata_file);
59
60            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
61                let data_file_path = self.get_cached_data_file_path();
62
63                if metadata.compare_local_state(data_file_path).is_ok() {
64                    if verbose {
65                        eprintln!(
66                            "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
67                            self.uri.clone()
68                        );
69                    }
70                    return Ok(finish_open(data_file_path, metadata_file));
71                }
72            }
73        }
74
75        if verbose {
76            eprintln!(
77                "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
78                self.uri.clone()
79            );
80        }
81
82        self.try_open_check_latest()
83    }
84
85    fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
86        let verbose = config::verbose();
87        let remote_metadata = &self.file_fetcher.fetch_metadata()?;
88        let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
89
90        {
91            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
92            update_last_accessed(metadata_file);
93
94            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
95                if metadata.matches_remote_metadata(remote_metadata) {
96                    let data_file_path = self.get_cached_data_file_path();
97
98                    if metadata.compare_local_state(data_file_path).is_ok() {
99                        if verbose {
100                            eprintln!(
101                                "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
102                                self.uri.clone()
103                            );
104                        }
105                        return Ok(finish_open(data_file_path, metadata_file));
106                    }
107                }
108            }
109        }
110
111        let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
112        let metadata = self
113            .try_get_metadata(metadata_file, &cache_guard)
114            // Safety: `metadata_file` is an exclusive guard.
115            .unwrap_or_else(|_| {
116                Arc::new(EntryMetadata::new(
117                    self.uri.clone(),
118                    self.ttl.load(std::sync::atomic::Ordering::Relaxed),
119                ))
120            });
121
122        if metadata.matches_remote_metadata(remote_metadata) {
123            let data_file_path = self.get_cached_data_file_path();
124
125            if metadata.compare_local_state(data_file_path).is_ok() {
126                if verbose {
127                    eprintln!(
128                        "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
129                        self.uri.clone()
130                    );
131                }
132                return Ok(finish_open(data_file_path, metadata_file));
133            }
134        }
135
136        if verbose {
137            eprintln!(
138                "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
139                self.uri.clone(),
140                remote_metadata.version,
141                remote_metadata.size
142            );
143        }
144
145        let data_file_path = &get_data_file_path(
146            self.path_prefix.to_str().unwrap().as_bytes(),
147            self.uri_hash.as_bytes(),
148            &remote_metadata.version,
149        );
150        // Remove the file if it exists, since it doesn't match the metadata.
151        // This could be left from an aborted process.
152        let _ = std::fs::remove_file(data_file_path);
153        if !self.file_fetcher.fetches_as_symlink() {
154            let file = std::fs::OpenOptions::new()
155                .write(true)
156                .create(true)
157                .truncate(true)
158                .open(data_file_path)
159                .map_err(PolarsError::from)?;
160
161            // * Some(true)   => always raise
162            // * Some(false)  => never raise
163            // * None         => do not raise if fallocate() is not permitted, otherwise raise.
164            static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
165                let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
166                    Ok("1") => Some(false),
167                    Ok("0") => Some(true),
168                    Err(_) => None,
169                    Ok(v) => panic!(
170                        "invalid value {} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR",
171                        v
172                    ),
173                };
174                if config::verbose() {
175                    eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {:?}", v);
176                }
177                v
178            });
179
180            // Initialize it to get the verbose print
181            let raise_alloc_err = *RAISE_ALLOC_ERROR;
182
183            file.lock_exclusive().unwrap();
184            if let Err(e) = file.allocate(remote_metadata.size) {
185                let msg = format!(
186                    "failed to reserve {} bytes on disk to download uri = {}: {:?}",
187                    remote_metadata.size,
188                    self.uri.as_ref(),
189                    e
190                );
191
192                if raise_alloc_err == Some(true)
193                    || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
194                {
195                    polars_bail!(ComputeError: msg)
196                } else if config::verbose() {
197                    eprintln!("[file_cache]: warning: {}", msg)
198                }
199            }
200        }
201        self.file_fetcher.fetch(data_file_path)?;
202
203        // Don't do this on windows as it will break setting last accessed times.
204        #[cfg(target_family = "unix")]
205        if !self.file_fetcher.fetches_as_symlink() {
206            let mut perms = std::fs::metadata(data_file_path.clone())
207                .unwrap()
208                .permissions();
209            perms.set_readonly(true);
210            std::fs::set_permissions(data_file_path, perms).unwrap();
211        }
212
213        let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
214        let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
215        let local_size = data_file_metadata.len();
216
217        if local_size != remote_metadata.size {
218            polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
219        }
220
221        let mut metadata = metadata;
222        let metadata = Arc::make_mut(&mut metadata);
223        metadata.local_last_modified = local_last_modified;
224        metadata.local_size = local_size;
225        metadata.remote_version = remote_metadata.version.clone();
226
227        if let Err(e) = metadata.compare_local_state(data_file_path) {
228            panic!("metadata mismatch after file fetch: {}", e);
229        }
230
231        let data_file = finish_open(data_file_path, metadata_file);
232
233        metadata_file.set_len(0).unwrap();
234        metadata_file.seek(SeekFrom::Start(0)).unwrap();
235        metadata
236            .try_write(&mut **metadata_file)
237            .map_err(to_compute_err)?;
238
239        Ok(data_file)
240    }
241
242    /// Try to read the metadata from disk. If `F` is an exclusive guard, this
243    /// will update the TTL stored in the metadata file if it does not match.
244    fn try_get_metadata<F: FileLockAnyGuard>(
245        &mut self,
246        metadata_file: &mut F,
247        _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
248    ) -> PolarsResult<Arc<EntryMetadata>> {
249        let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
250        let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
251
252        for _ in 0..2 {
253            if let Some(ref cached) = self.cached_data {
254                if cached.last_modified == last_modified {
255                    if cached.metadata.ttl != ttl {
256                        polars_bail!(ComputeError: "TTL mismatch");
257                    }
258
259                    if cached.metadata.uri != self.uri {
260                        unimplemented!(
261                            "hash collision: uri1 = {}, uri2 = {}, hash = {}",
262                            cached.metadata.uri,
263                            self.uri,
264                            self.uri_hash,
265                        );
266                    }
267
268                    return Ok(cached.metadata.clone());
269                }
270            }
271
272            // Ensure cache is unset if read fails
273            self.cached_data = None;
274
275            let mut metadata =
276                EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
277
278            // Note this means if multiple processes on the same system set a
279            // different TTL for the same path, the metadata file will constantly
280            // get overwritten.
281            if metadata.ttl != ttl {
282                if F::IS_EXCLUSIVE {
283                    metadata.ttl = ttl;
284                    metadata_file.set_len(0).unwrap();
285                    metadata_file.seek(SeekFrom::Start(0)).unwrap();
286                    metadata
287                        .try_write(&mut **metadata_file)
288                        .map_err(to_compute_err)?;
289                } else {
290                    polars_bail!(ComputeError: "TTL mismatch");
291                }
292            }
293
294            let metadata = Arc::new(metadata);
295            let data_file_path = get_data_file_path(
296                self.path_prefix.to_str().unwrap().as_bytes(),
297                self.uri_hash.as_bytes(),
298                &metadata.remote_version,
299            );
300            self.cached_data = Some(CachedData {
301                last_modified,
302                metadata,
303                data_file_path,
304            });
305        }
306
307        unreachable!();
308    }
309
310    /// # Panics
311    /// Panics if `self.cached_data` is `None`.
312    fn get_cached_data_file_path(&self) -> &Path {
313        &self.cached_data.as_ref().unwrap().data_file_path
314    }
315}
316
317impl FileCacheEntry {
318    pub(crate) fn new(
319        uri: Arc<str>,
320        uri_hash: String,
321        path_prefix: Arc<Path>,
322        file_fetcher: Arc<dyn FileFetcher>,
323        file_cache_ttl: u64,
324    ) -> Self {
325        let metadata = FileLock::from(get_metadata_file_path(
326            path_prefix.to_str().unwrap().as_bytes(),
327            uri_hash.as_bytes(),
328        ));
329
330        debug_assert!(
331            Arc::ptr_eq(&uri, file_fetcher.get_uri()),
332            "impl error: entry uri != file_fetcher uri"
333        );
334
335        let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
336
337        Self(EntryData {
338            uri: uri.clone(),
339            inner: Mutex::new(Inner {
340                uri,
341                uri_hash,
342                path_prefix,
343                metadata,
344                cached_data: None,
345                ttl: ttl.clone(),
346                file_fetcher,
347            }),
348            ttl,
349        })
350    }
351
352    pub fn uri(&self) -> &Arc<str> {
353        &self.0.uri
354    }
355
356    /// Directly returns the cached file if it finds one without checking if
357    /// there is a newer version on the remote. This does not make any API calls
358    /// if it finds a cached file, otherwise it simply downloads the file.
359    pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
360        self.0.inner.lock().unwrap().try_open_assume_latest()
361    }
362
363    /// Returns the cached file after ensuring it is up to date against the remote
364    /// This will always perform at least 1 API call for fetching metadata.
365    pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
366        self.0.inner.lock().unwrap().try_open_check_latest()
367    }
368
369    pub fn update_ttl(&self, ttl: u64) {
370        self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
371    }
372}
373
374fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
375    let file = {
376        #[cfg(not(target_family = "windows"))]
377        {
378            std::fs::OpenOptions::new()
379                .read(true)
380                .open(data_file_path)
381                .unwrap()
382        }
383        // windows requires write access to update the last accessed time
384        #[cfg(target_family = "windows")]
385        {
386            std::fs::OpenOptions::new()
387                .read(true)
388                .write(true)
389                .open(data_file_path)
390                .unwrap()
391        }
392    };
393    update_last_accessed(&file);
394    if FileExt::try_lock_shared(&file).is_err() {
395        panic!(
396            "finish_open: could not acquire shared lock on data file at {}",
397            data_file_path.to_str().unwrap()
398        );
399    }
400    file
401}
402
403/// `[prefix]/d/[uri hash][last modified]`
404fn get_data_file_path(
405    path_prefix: &[u8],
406    uri_hash: &[u8],
407    remote_version: &FileVersion,
408) -> PathBuf {
409    let owned;
410    let path = [
411        path_prefix,
412        &[b'/', DATA_PREFIX, b'/'],
413        uri_hash,
414        match remote_version {
415            FileVersion::Timestamp(v) => {
416                owned = Some(format!("{:013x}", v));
417                owned.as_deref().unwrap()
418            },
419            FileVersion::ETag(v) => v.as_str(),
420            FileVersion::Uninitialized => panic!("impl error: version not initialized"),
421        }
422        .as_bytes(),
423    ]
424    .concat();
425    PathBuf::from(String::from_utf8(path).unwrap())
426}
427
428/// `[prefix]/m/[uri hash]`
429fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
430    let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
431    PathBuf::from(String::from_utf8(bytes).unwrap())
432}