1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
10use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
11use super::file_fetcher::{FileFetcher, RemoteMetadata};
12use super::file_lock::{FileLock, FileLockAnyGuard};
13use super::metadata::{EntryMetadata, FileVersion};
14use super::utils::update_last_accessed;
15
/// Name of the directory, directly under the cache path prefix, in which data
/// files are placed (`<prefix>/d/<uri hash><version>`).
pub(super) const DATA_PREFIX: u8 = b'd';
/// Name of the directory, directly under the cache path prefix, in which
/// metadata files are placed (`<prefix>/m/<uri hash>`).
pub(super) const METADATA_PREFIX: u8 = b'm';
18
/// In-memory copy of an entry's metadata, kept so the metadata file does not
/// have to be re-parsed while it is unchanged on disk.
struct CachedData {
    /// Last-modified value of the metadata file observed when this copy was
    /// stored; the copy is only reused while the file still reports this value.
    last_modified: u64,
    /// The metadata that was read from the metadata file.
    metadata: Arc<EntryMetadata>,
    /// Data file path derived from the path prefix, the URI hash and
    /// `metadata.remote_version`.
    data_file_path: PathBuf,
}
24
/// Mutable state of a cache entry; accessed through the `Mutex` in `EntryData`.
struct Inner {
    /// URI this entry caches.
    uri: Arc<str>,
    /// Hash of `uri`, used as the file name component of the metadata and data
    /// file paths.
    uri_hash: String,
    /// Directory under which the metadata and data files are placed.
    path_prefix: Arc<Path>,
    /// Lock handle for the entry's metadata file.
    metadata: FileLock<PathBuf>,
    /// In-memory copy of the metadata; `None` until the first successful read.
    cached_data: Option<CachedData>,
    /// Current TTL; the same atomic is also held by `EntryData` so it can be
    /// changed without locking `Inner`.
    ttl: Arc<AtomicU64>,
    /// Fetches the remote metadata and the remote file contents.
    file_fetcher: Arc<dyn FileFetcher>,
}
34
/// Backing data of a `FileCacheEntry`.
struct EntryData {
    /// URI of the entry, kept outside the mutex so it can be returned by
    /// reference without locking.
    uri: Arc<str>,
    /// State that is only touched while opening the entry.
    inner: Mutex<Inner>,
    /// Same atomic as `Inner::ttl`; lets `update_ttl` store a new value without
    /// taking the mutex.
    ttl: Arc<AtomicU64>,
}
40
/// Handle to the on-disk cache entry of a single URI.
pub struct FileCacheEntry(EntryData);
42
43impl EntryMetadata {
44 fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
45 self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
46 }
47}
48
49impl Inner {
50 fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
51 let verbose = config::verbose();
52
53 {
54 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
55 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
58 update_last_accessed(metadata_file);
59
60 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
61 let data_file_path = self.get_cached_data_file_path();
62
63 if metadata.compare_local_state(data_file_path).is_ok() {
64 if verbose {
65 eprintln!(
66 "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
67 self.uri.clone()
68 );
69 }
70 return Ok(finish_open(data_file_path, metadata_file));
71 }
72 }
73 }
74
75 if verbose {
76 eprintln!(
77 "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
78 self.uri.clone()
79 );
80 }
81
82 self.try_open_check_latest()
83 }
84
85 fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
86 let verbose = config::verbose();
87 let remote_metadata = &self.file_fetcher.fetch_metadata()?;
88 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
89
90 {
91 let metadata_file = &mut self.metadata.acquire_shared().unwrap();
92 update_last_accessed(metadata_file);
93
94 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
95 if metadata.matches_remote_metadata(remote_metadata) {
96 let data_file_path = self.get_cached_data_file_path();
97
98 if metadata.compare_local_state(data_file_path).is_ok() {
99 if verbose {
100 eprintln!(
101 "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
102 self.uri.clone()
103 );
104 }
105 return Ok(finish_open(data_file_path, metadata_file));
106 }
107 }
108 }
109 }
110
111 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
112 let metadata = self
113 .try_get_metadata(metadata_file, &cache_guard)
114 .unwrap_or_else(|_| {
116 Arc::new(EntryMetadata::new(
117 self.uri.clone(),
118 self.ttl.load(std::sync::atomic::Ordering::Relaxed),
119 ))
120 });
121
122 if metadata.matches_remote_metadata(remote_metadata) {
123 let data_file_path = self.get_cached_data_file_path();
124
125 if metadata.compare_local_state(data_file_path).is_ok() {
126 if verbose {
127 eprintln!(
128 "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
129 self.uri.clone()
130 );
131 }
132 return Ok(finish_open(data_file_path, metadata_file));
133 }
134 }
135
136 if verbose {
137 eprintln!(
138 "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
139 self.uri.clone(),
140 remote_metadata.version,
141 remote_metadata.size
142 );
143 }
144
145 let data_file_path = &get_data_file_path(
146 self.path_prefix.to_str().unwrap().as_bytes(),
147 self.uri_hash.as_bytes(),
148 &remote_metadata.version,
149 );
150 let _ = std::fs::remove_file(data_file_path);
153 if !self.file_fetcher.fetches_as_symlink() {
154 let file = std::fs::OpenOptions::new()
155 .write(true)
156 .create(true)
157 .truncate(true)
158 .open(data_file_path)
159 .map_err(PolarsError::from)?;
160
161 static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
165 let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
166 Ok("1") => Some(false),
167 Ok("0") => Some(true),
168 Err(_) => None,
169 Ok(v) => {
170 panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
171 },
172 };
173 if config::verbose() {
174 eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
175 }
176 v
177 });
178
179 let raise_alloc_err = *RAISE_ALLOC_ERROR;
181
182 file.lock_exclusive().unwrap();
183 if let Err(e) = file.allocate(remote_metadata.size) {
184 let msg = format!(
185 "failed to reserve {} bytes on disk to download uri = {}: {:?}",
186 remote_metadata.size,
187 self.uri.as_ref(),
188 e
189 );
190
191 if raise_alloc_err == Some(true)
192 || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
193 {
194 polars_bail!(ComputeError: msg)
195 } else if config::verbose() {
196 eprintln!("[file_cache]: warning: {msg}")
197 }
198 }
199 }
200 self.file_fetcher.fetch(data_file_path)?;
201
202 #[cfg(target_family = "unix")]
204 if !self.file_fetcher.fetches_as_symlink() {
205 let mut perms = std::fs::metadata(data_file_path.clone())
206 .unwrap()
207 .permissions();
208 perms.set_readonly(true);
209 std::fs::set_permissions(data_file_path, perms).unwrap();
210 }
211
212 let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
213 let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
214 let local_size = data_file_metadata.len();
215
216 if local_size != remote_metadata.size {
217 polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
218 }
219
220 let mut metadata = metadata;
221 let metadata = Arc::make_mut(&mut metadata);
222 metadata.local_last_modified = local_last_modified;
223 metadata.local_size = local_size;
224 metadata.remote_version = remote_metadata.version.clone();
225
226 if let Err(e) = metadata.compare_local_state(data_file_path) {
227 panic!("metadata mismatch after file fetch: {e}");
228 }
229
230 let data_file = finish_open(data_file_path, metadata_file);
231
232 metadata_file.set_len(0).unwrap();
233 metadata_file.seek(SeekFrom::Start(0)).unwrap();
234 metadata
235 .try_write(&mut **metadata_file)
236 .map_err(to_compute_err)?;
237
238 Ok(data_file)
239 }
240
241 fn try_get_metadata<F: FileLockAnyGuard>(
244 &mut self,
245 metadata_file: &mut F,
246 _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
247 ) -> PolarsResult<Arc<EntryMetadata>> {
248 let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
249 let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
250
251 for _ in 0..2 {
252 if let Some(ref cached) = self.cached_data {
253 if cached.last_modified == last_modified {
254 if cached.metadata.ttl != ttl {
255 polars_bail!(ComputeError: "TTL mismatch");
256 }
257
258 if cached.metadata.uri != self.uri {
259 unimplemented!(
260 "hash collision: uri1 = {}, uri2 = {}, hash = {}",
261 cached.metadata.uri,
262 self.uri,
263 self.uri_hash,
264 );
265 }
266
267 return Ok(cached.metadata.clone());
268 }
269 }
270
271 self.cached_data = None;
273
274 let mut metadata =
275 EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
276
277 if metadata.ttl != ttl {
281 if F::IS_EXCLUSIVE {
282 metadata.ttl = ttl;
283 metadata_file.set_len(0).unwrap();
284 metadata_file.seek(SeekFrom::Start(0)).unwrap();
285 metadata
286 .try_write(&mut **metadata_file)
287 .map_err(to_compute_err)?;
288 } else {
289 polars_bail!(ComputeError: "TTL mismatch");
290 }
291 }
292
293 let metadata = Arc::new(metadata);
294 let data_file_path = get_data_file_path(
295 self.path_prefix.to_str().unwrap().as_bytes(),
296 self.uri_hash.as_bytes(),
297 &metadata.remote_version,
298 );
299 self.cached_data = Some(CachedData {
300 last_modified,
301 metadata,
302 data_file_path,
303 });
304 }
305
306 unreachable!();
307 }
308
309 fn get_cached_data_file_path(&self) -> &Path {
312 &self.cached_data.as_ref().unwrap().data_file_path
313 }
314}
315
316impl FileCacheEntry {
317 pub(crate) fn new(
318 uri: Arc<str>,
319 uri_hash: String,
320 path_prefix: Arc<Path>,
321 file_fetcher: Arc<dyn FileFetcher>,
322 file_cache_ttl: u64,
323 ) -> Self {
324 let metadata = FileLock::from(get_metadata_file_path(
325 path_prefix.to_str().unwrap().as_bytes(),
326 uri_hash.as_bytes(),
327 ));
328
329 debug_assert!(
330 Arc::ptr_eq(&uri, file_fetcher.get_uri()),
331 "impl error: entry uri != file_fetcher uri"
332 );
333
334 let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
335
336 Self(EntryData {
337 uri: uri.clone(),
338 inner: Mutex::new(Inner {
339 uri,
340 uri_hash,
341 path_prefix,
342 metadata,
343 cached_data: None,
344 ttl: ttl.clone(),
345 file_fetcher,
346 }),
347 ttl,
348 })
349 }
350
351 pub fn uri(&self) -> &Arc<str> {
352 &self.0.uri
353 }
354
355 pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
359 self.0.inner.lock().unwrap().try_open_assume_latest()
360 }
361
362 pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
365 self.0.inner.lock().unwrap().try_open_check_latest()
366 }
367
368 pub fn update_ttl(&self, ttl: u64) {
369 self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
370 }
371}
372
373fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
374 let file = {
375 #[cfg(not(target_family = "windows"))]
376 {
377 std::fs::OpenOptions::new()
378 .read(true)
379 .open(data_file_path)
380 .unwrap()
381 }
382 #[cfg(target_family = "windows")]
384 {
385 std::fs::OpenOptions::new()
386 .read(true)
387 .write(true)
388 .open(data_file_path)
389 .unwrap()
390 }
391 };
392 update_last_accessed(&file);
393 if FileExt::try_lock_shared(&file).is_err() {
394 panic!(
395 "finish_open: could not acquire shared lock on data file at {}",
396 data_file_path.to_str().unwrap()
397 );
398 }
399 file
400}
401
402fn get_data_file_path(
404 path_prefix: &[u8],
405 uri_hash: &[u8],
406 remote_version: &FileVersion,
407) -> PathBuf {
408 let owned;
409 let path = [
410 path_prefix,
411 &[b'/', DATA_PREFIX, b'/'],
412 uri_hash,
413 match remote_version {
414 FileVersion::Timestamp(v) => {
415 owned = Some(format!("{v:013x}"));
416 owned.as_deref().unwrap()
417 },
418 FileVersion::ETag(v) => v.as_str(),
419 FileVersion::Uninitialized => panic!("impl error: version not initialized"),
420 }
421 .as_bytes(),
422 ]
423 .concat();
424 PathBuf::from(String::from_utf8(path).unwrap())
425}
426
427fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
429 let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
430 PathBuf::from(String::from_utf8(bytes).unwrap())
431}