1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::PlRefPath;
10
11use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
12use super::file_fetcher::{FileFetcher, RemoteMetadata};
13use super::file_lock::{FileLock, FileLockAnyGuard};
14use super::metadata::{EntryMetadata, FileVersion};
15use super::utils::update_last_accessed;
16
/// Single-byte directory name for cached data files; `get_data_file_path`
/// splices it into the path as `/d/`.
pub(super) const DATA_PREFIX: u8 = b'd';
/// Single-byte directory name for metadata files; `get_metadata_file_path`
/// splices it into the path as `/m/`.
pub(super) const METADATA_PREFIX: u8 = b'm';
19
/// In-memory copy of the metadata most recently read from the metadata file.
struct CachedData {
    /// Modification time of the metadata file, sampled right before it was read.
    /// The cached copy is only reused while the file still reports this value.
    last_modified: u64,
    /// Parsed contents of the metadata file.
    metadata: Arc<EntryMetadata>,
    /// Data file path derived from the path prefix, the URI hash and
    /// `metadata.remote_version`.
    data_file_path: PathBuf,
}
25
/// Mutable state of a cache entry; stored behind the mutex in `EntryData`.
struct Inner {
    /// URI of the file this entry caches.
    uri: PlRefPath,
    /// Hash string for `uri` supplied by the caller; used as the metadata file
    /// name and as the leading part of the data file name.
    uri_hash: String,
    /// Directory prefix under which the `/d/` and `/m/` paths are built.
    path_prefix: PlRefPath,
    /// Lock handle for this entry's on-disk metadata file.
    metadata: FileLock<PathBuf>,
    /// Metadata most recently read from disk; `None` until a read succeeds.
    cached_data: Option<CachedData>,
    /// TTL value; the same `Arc` is stored in `EntryData::ttl`, so updates made
    /// through `FileCacheEntry::update_ttl` are observed here.
    ttl: Arc<AtomicU64>,
    /// Used to query remote metadata and to download the file contents.
    file_fetcher: Arc<dyn FileFetcher>,
}
35
/// Backing data of a `FileCacheEntry`.
struct EntryData {
    /// URI of the cached file, kept outside the mutex so it can be borrowed
    /// without locking.
    uri: PlRefPath,
    /// Lock-protected state used by the open operations.
    inner: Mutex<Inner>,
    /// TTL value shared with `Inner::ttl`; written by `FileCacheEntry::update_ttl`.
    ttl: Arc<AtomicU64>,
}
41
/// Handle to a single cached file, identified by its URI.
pub struct FileCacheEntry(EntryData);
43
44impl EntryMetadata {
45 fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
46 self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
47 }
48}
49
50impl Inner {
51 fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
52 let verbose = config::verbose();
53
54 {
55 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
56 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
59 update_last_accessed(metadata_file);
60
61 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
62 let data_file_path = self.get_cached_data_file_path();
63
64 if metadata.compare_local_state(data_file_path).is_ok() {
65 if verbose {
66 eprintln!(
67 "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
68 self.uri.clone()
69 );
70 }
71 return Ok(finish_open(data_file_path, metadata_file));
72 }
73 }
74 }
75
76 if verbose {
77 eprintln!(
78 "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
79 self.uri.clone()
80 );
81 }
82
83 self.try_open_check_latest()
84 }
85
86 fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
87 let verbose = config::verbose();
88 let remote_metadata = &self.file_fetcher.fetch_metadata()?;
89 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
90
91 {
92 let metadata_file = &mut self.metadata.acquire_shared().unwrap();
93 update_last_accessed(metadata_file);
94
95 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
96 if metadata.matches_remote_metadata(remote_metadata) {
97 let data_file_path = self.get_cached_data_file_path();
98
99 if metadata.compare_local_state(data_file_path).is_ok() {
100 if verbose {
101 eprintln!(
102 "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
103 self.uri.clone()
104 );
105 }
106 return Ok(finish_open(data_file_path, metadata_file));
107 }
108 }
109 }
110 }
111
112 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
113 let metadata = self
114 .try_get_metadata(metadata_file, &cache_guard)
115 .unwrap_or_else(|_| {
117 Arc::new(EntryMetadata::new(
118 self.uri.clone(),
119 self.ttl.load(std::sync::atomic::Ordering::Relaxed),
120 ))
121 });
122
123 if metadata.matches_remote_metadata(remote_metadata) {
124 let data_file_path = self.get_cached_data_file_path();
125
126 if metadata.compare_local_state(data_file_path).is_ok() {
127 if verbose {
128 eprintln!(
129 "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
130 self.uri.clone()
131 );
132 }
133 return Ok(finish_open(data_file_path, metadata_file));
134 }
135 }
136
137 if verbose {
138 eprintln!(
139 "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
140 self.uri.clone(),
141 remote_metadata.version,
142 remote_metadata.size
143 );
144 }
145
146 let data_file_path = &get_data_file_path(
147 self.path_prefix.as_bytes(),
148 self.uri_hash.as_bytes(),
149 &remote_metadata.version,
150 );
151 let _ = std::fs::remove_file(data_file_path);
154 if !self.file_fetcher.fetches_as_symlink() {
155 let file = std::fs::OpenOptions::new()
156 .write(true)
157 .create(true)
158 .truncate(true)
159 .open(data_file_path)
160 .map_err(PolarsError::from)?;
161
162 static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
166 let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
167 Ok("1") => Some(false),
168 Ok("0") => Some(true),
169 Err(_) => None,
170 Ok(v) => {
171 panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
172 },
173 };
174 if config::verbose() {
175 eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
176 }
177 v
178 });
179
180 let raise_alloc_err = *RAISE_ALLOC_ERROR;
182
183 file.lock_exclusive().unwrap();
184 if let Err(e) = file.allocate(remote_metadata.size) {
185 let msg = format!(
186 "failed to reserve {} bytes on disk to download uri = {}: {:?}",
187 remote_metadata.size, &self.uri, e
188 );
189
190 if raise_alloc_err == Some(true)
191 || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
192 {
193 polars_bail!(ComputeError: msg)
194 } else if config::verbose() {
195 eprintln!("[file_cache]: warning: {msg}")
196 }
197 }
198 }
199 self.file_fetcher.fetch(data_file_path)?;
200
201 #[cfg(target_family = "unix")]
203 if !self.file_fetcher.fetches_as_symlink() {
204 let mut perms = std::fs::metadata(data_file_path.clone())
205 .unwrap()
206 .permissions();
207 perms.set_readonly(true);
208 std::fs::set_permissions(data_file_path, perms).unwrap();
209 }
210
211 let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
212 let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
213 let local_size = data_file_metadata.len();
214
215 if local_size != remote_metadata.size {
216 polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
217 }
218
219 let mut metadata = metadata;
220 let metadata = Arc::make_mut(&mut metadata);
221 metadata.local_last_modified = local_last_modified;
222 metadata.local_size = local_size;
223 metadata.remote_version = remote_metadata.version.clone();
224
225 if let Err(e) = metadata.compare_local_state(data_file_path) {
226 panic!("metadata mismatch after file fetch: {e}");
227 }
228
229 let data_file = finish_open(data_file_path, metadata_file);
230
231 metadata_file.set_len(0).unwrap();
232 metadata_file.seek(SeekFrom::Start(0)).unwrap();
233 metadata
234 .try_write(&mut **metadata_file)
235 .map_err(to_compute_err)?;
236
237 Ok(data_file)
238 }
239
240 fn try_get_metadata<F: FileLockAnyGuard>(
243 &mut self,
244 metadata_file: &mut F,
245 _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
246 ) -> PolarsResult<Arc<EntryMetadata>> {
247 let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
248 let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
249
250 for _ in 0..2 {
251 if let Some(ref cached) = self.cached_data {
252 if cached.last_modified == last_modified {
253 if cached.metadata.ttl != ttl {
254 polars_bail!(ComputeError: "TTL mismatch");
255 }
256
257 if cached.metadata.uri != self.uri {
258 unimplemented!(
259 "hash collision: uri1 = {}, uri2 = {}, hash = {}",
260 cached.metadata.uri,
261 self.uri,
262 self.uri_hash,
263 );
264 }
265
266 return Ok(cached.metadata.clone());
267 }
268 }
269
270 self.cached_data = None;
272
273 let mut metadata =
274 EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
275
276 if metadata.ttl != ttl {
280 if F::IS_EXCLUSIVE {
281 metadata.ttl = ttl;
282 metadata_file.set_len(0).unwrap();
283 metadata_file.seek(SeekFrom::Start(0)).unwrap();
284 metadata
285 .try_write(&mut **metadata_file)
286 .map_err(to_compute_err)?;
287 } else {
288 polars_bail!(ComputeError: "TTL mismatch");
289 }
290 }
291
292 let metadata = Arc::new(metadata);
293 let data_file_path = get_data_file_path(
294 self.path_prefix.as_bytes(),
295 self.uri_hash.as_bytes(),
296 &metadata.remote_version,
297 );
298 self.cached_data = Some(CachedData {
299 last_modified,
300 metadata,
301 data_file_path,
302 });
303 }
304
305 unreachable!();
306 }
307
308 fn get_cached_data_file_path(&self) -> &Path {
311 &self.cached_data.as_ref().unwrap().data_file_path
312 }
313}
314
315impl FileCacheEntry {
316 pub(crate) fn new(
317 uri: PlRefPath,
318 uri_hash: String,
319 path_prefix: PlRefPath,
320 file_fetcher: Arc<dyn FileFetcher>,
321 file_cache_ttl: u64,
322 ) -> Self {
323 let metadata = FileLock::from(get_metadata_file_path(
324 path_prefix.as_bytes(),
325 uri_hash.as_bytes(),
326 ));
327
328 debug_assert!(
329 PlRefPath::ptr_eq(&uri, file_fetcher.get_uri()),
330 "impl error: entry uri != file_fetcher uri"
331 );
332
333 let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
334
335 Self(EntryData {
336 uri: uri.clone(),
337 inner: Mutex::new(Inner {
338 uri,
339 uri_hash,
340 path_prefix,
341 metadata,
342 cached_data: None,
343 ttl: ttl.clone(),
344 file_fetcher,
345 }),
346 ttl,
347 })
348 }
349
350 pub fn uri(&self) -> &PlRefPath {
351 &self.0.uri
352 }
353
354 pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
358 self.0.inner.lock().unwrap().try_open_assume_latest()
359 }
360
361 pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
364 self.0.inner.lock().unwrap().try_open_check_latest()
365 }
366
367 pub fn update_ttl(&self, ttl: u64) {
368 self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
369 }
370}
371
372fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
373 let file = {
374 #[cfg(not(target_family = "windows"))]
375 {
376 std::fs::OpenOptions::new()
377 .read(true)
378 .open(data_file_path)
379 .unwrap()
380 }
381 #[cfg(target_family = "windows")]
383 {
384 std::fs::OpenOptions::new()
385 .read(true)
386 .write(true)
387 .open(data_file_path)
388 .unwrap()
389 }
390 };
391 update_last_accessed(&file);
392 if FileExt::try_lock_shared(&file).is_err() {
393 panic!(
394 "finish_open: could not acquire shared lock on data file at {:?}",
395 data_file_path
396 );
397 }
398 file
399}
400
401fn get_data_file_path(
403 path_prefix: &[u8],
404 uri_hash: &[u8],
405 remote_version: &FileVersion,
406) -> PathBuf {
407 let owned;
408 let path = [
409 path_prefix,
410 &[b'/', DATA_PREFIX, b'/'],
411 uri_hash,
412 match remote_version {
413 FileVersion::Timestamp(v) => {
414 owned = Some(format!("{v:013x}"));
415 owned.as_deref().unwrap()
416 },
417 FileVersion::ETag(v) => v.as_str(),
418 FileVersion::Uninitialized => panic!("impl error: version not initialized"),
419 }
420 .as_bytes(),
421 ]
422 .concat();
423 PathBuf::from(String::from_utf8(path).unwrap())
424}
425
426fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
428 let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
429 PathBuf::from(String::from_utf8(bytes).unwrap())
430}