1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, LazyLock, Mutex};
5
6use fs4::fs_std::FileExt;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
10use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
11use super::file_fetcher::{FileFetcher, RemoteMetadata};
12use super::file_lock::{FileLock, FileLockAnyGuard};
13use super::metadata::{EntryMetadata, FileVersion};
14use super::utils::update_last_accessed;
15
16pub(super) const DATA_PREFIX: u8 = b'd';
17pub(super) const METADATA_PREFIX: u8 = b'm';
18
19struct CachedData {
20 last_modified: u64,
21 metadata: Arc<EntryMetadata>,
22 data_file_path: PathBuf,
23}
24
25struct Inner {
26 uri: Arc<str>,
27 uri_hash: String,
28 path_prefix: Arc<Path>,
29 metadata: FileLock<PathBuf>,
30 cached_data: Option<CachedData>,
31 ttl: Arc<AtomicU64>,
32 file_fetcher: Arc<dyn FileFetcher>,
33}
34
35struct EntryData {
36 uri: Arc<str>,
37 inner: Mutex<Inner>,
38 ttl: Arc<AtomicU64>,
39}
40
41pub struct FileCacheEntry(EntryData);
42
43impl EntryMetadata {
44 fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
45 self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
46 }
47}
48
49impl Inner {
50 fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
51 let verbose = config::verbose();
52
53 {
54 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
55 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
58 update_last_accessed(metadata_file);
59
60 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
61 let data_file_path = self.get_cached_data_file_path();
62
63 if metadata.compare_local_state(data_file_path).is_ok() {
64 if verbose {
65 eprintln!(
66 "[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
67 self.uri.clone()
68 );
69 }
70 return Ok(finish_open(data_file_path, metadata_file));
71 }
72 }
73 }
74
75 if verbose {
76 eprintln!(
77 "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
78 self.uri.clone()
79 );
80 }
81
82 self.try_open_check_latest()
83 }
84
85 fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
86 let verbose = config::verbose();
87 let remote_metadata = &self.file_fetcher.fetch_metadata()?;
88 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
89
90 {
91 let metadata_file = &mut self.metadata.acquire_shared().unwrap();
92 update_last_accessed(metadata_file);
93
94 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
95 if metadata.matches_remote_metadata(remote_metadata) {
96 let data_file_path = self.get_cached_data_file_path();
97
98 if metadata.compare_local_state(data_file_path).is_ok() {
99 if verbose {
100 eprintln!(
101 "[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
102 self.uri.clone()
103 );
104 }
105 return Ok(finish_open(data_file_path, metadata_file));
106 }
107 }
108 }
109 }
110
111 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
112 let metadata = self
113 .try_get_metadata(metadata_file, &cache_guard)
114 .unwrap_or_else(|_| {
116 Arc::new(EntryMetadata::new(
117 self.uri.clone(),
118 self.ttl.load(std::sync::atomic::Ordering::Relaxed),
119 ))
120 });
121
122 if metadata.matches_remote_metadata(remote_metadata) {
123 let data_file_path = self.get_cached_data_file_path();
124
125 if metadata.compare_local_state(data_file_path).is_ok() {
126 if verbose {
127 eprintln!(
128 "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
129 self.uri.clone()
130 );
131 }
132 return Ok(finish_open(data_file_path, metadata_file));
133 }
134 }
135
136 if verbose {
137 eprintln!(
138 "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
139 self.uri.clone(),
140 remote_metadata.version,
141 remote_metadata.size
142 );
143 }
144
145 let data_file_path = &get_data_file_path(
146 self.path_prefix.to_str().unwrap().as_bytes(),
147 self.uri_hash.as_bytes(),
148 &remote_metadata.version,
149 );
150 let _ = std::fs::remove_file(data_file_path);
153 if !self.file_fetcher.fetches_as_symlink() {
154 let file = std::fs::OpenOptions::new()
155 .write(true)
156 .create(true)
157 .truncate(true)
158 .open(data_file_path)
159 .map_err(PolarsError::from)?;
160
161 static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
165 let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
166 Ok("1") => Some(false),
167 Ok("0") => Some(true),
168 Err(_) => None,
169 Ok(v) => panic!(
170 "invalid value {} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR",
171 v
172 ),
173 };
174 if config::verbose() {
175 eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {:?}", v);
176 }
177 v
178 });
179
180 let raise_alloc_err = *RAISE_ALLOC_ERROR;
182
183 file.lock_exclusive().unwrap();
184 if let Err(e) = file.allocate(remote_metadata.size) {
185 let msg = format!(
186 "failed to reserve {} bytes on disk to download uri = {}: {:?}",
187 remote_metadata.size,
188 self.uri.as_ref(),
189 e
190 );
191
192 if raise_alloc_err == Some(true)
193 || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
194 {
195 polars_bail!(ComputeError: msg)
196 } else if config::verbose() {
197 eprintln!("[file_cache]: warning: {}", msg)
198 }
199 }
200 }
201 self.file_fetcher.fetch(data_file_path)?;
202
203 #[cfg(target_family = "unix")]
205 if !self.file_fetcher.fetches_as_symlink() {
206 let mut perms = std::fs::metadata(data_file_path.clone())
207 .unwrap()
208 .permissions();
209 perms.set_readonly(true);
210 std::fs::set_permissions(data_file_path, perms).unwrap();
211 }
212
213 let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
214 let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
215 let local_size = data_file_metadata.len();
216
217 if local_size != remote_metadata.size {
218 polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
219 }
220
221 let mut metadata = metadata;
222 let metadata = Arc::make_mut(&mut metadata);
223 metadata.local_last_modified = local_last_modified;
224 metadata.local_size = local_size;
225 metadata.remote_version = remote_metadata.version.clone();
226
227 if let Err(e) = metadata.compare_local_state(data_file_path) {
228 panic!("metadata mismatch after file fetch: {}", e);
229 }
230
231 let data_file = finish_open(data_file_path, metadata_file);
232
233 metadata_file.set_len(0).unwrap();
234 metadata_file.seek(SeekFrom::Start(0)).unwrap();
235 metadata
236 .try_write(&mut **metadata_file)
237 .map_err(to_compute_err)?;
238
239 Ok(data_file)
240 }
241
242 fn try_get_metadata<F: FileLockAnyGuard>(
245 &mut self,
246 metadata_file: &mut F,
247 _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
248 ) -> PolarsResult<Arc<EntryMetadata>> {
249 let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
250 let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
251
252 for _ in 0..2 {
253 if let Some(ref cached) = self.cached_data {
254 if cached.last_modified == last_modified {
255 if cached.metadata.ttl != ttl {
256 polars_bail!(ComputeError: "TTL mismatch");
257 }
258
259 if cached.metadata.uri != self.uri {
260 unimplemented!(
261 "hash collision: uri1 = {}, uri2 = {}, hash = {}",
262 cached.metadata.uri,
263 self.uri,
264 self.uri_hash,
265 );
266 }
267
268 return Ok(cached.metadata.clone());
269 }
270 }
271
272 self.cached_data = None;
274
275 let mut metadata =
276 EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
277
278 if metadata.ttl != ttl {
282 if F::IS_EXCLUSIVE {
283 metadata.ttl = ttl;
284 metadata_file.set_len(0).unwrap();
285 metadata_file.seek(SeekFrom::Start(0)).unwrap();
286 metadata
287 .try_write(&mut **metadata_file)
288 .map_err(to_compute_err)?;
289 } else {
290 polars_bail!(ComputeError: "TTL mismatch");
291 }
292 }
293
294 let metadata = Arc::new(metadata);
295 let data_file_path = get_data_file_path(
296 self.path_prefix.to_str().unwrap().as_bytes(),
297 self.uri_hash.as_bytes(),
298 &metadata.remote_version,
299 );
300 self.cached_data = Some(CachedData {
301 last_modified,
302 metadata,
303 data_file_path,
304 });
305 }
306
307 unreachable!();
308 }
309
310 fn get_cached_data_file_path(&self) -> &Path {
313 &self.cached_data.as_ref().unwrap().data_file_path
314 }
315}
316
317impl FileCacheEntry {
318 pub(crate) fn new(
319 uri: Arc<str>,
320 uri_hash: String,
321 path_prefix: Arc<Path>,
322 file_fetcher: Arc<dyn FileFetcher>,
323 file_cache_ttl: u64,
324 ) -> Self {
325 let metadata = FileLock::from(get_metadata_file_path(
326 path_prefix.to_str().unwrap().as_bytes(),
327 uri_hash.as_bytes(),
328 ));
329
330 debug_assert!(
331 Arc::ptr_eq(&uri, file_fetcher.get_uri()),
332 "impl error: entry uri != file_fetcher uri"
333 );
334
335 let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
336
337 Self(EntryData {
338 uri: uri.clone(),
339 inner: Mutex::new(Inner {
340 uri,
341 uri_hash,
342 path_prefix,
343 metadata,
344 cached_data: None,
345 ttl: ttl.clone(),
346 file_fetcher,
347 }),
348 ttl,
349 })
350 }
351
352 pub fn uri(&self) -> &Arc<str> {
353 &self.0.uri
354 }
355
356 pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
360 self.0.inner.lock().unwrap().try_open_assume_latest()
361 }
362
363 pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
366 self.0.inner.lock().unwrap().try_open_check_latest()
367 }
368
369 pub fn update_ttl(&self, ttl: u64) {
370 self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
371 }
372}
373
374fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
375 let file = {
376 #[cfg(not(target_family = "windows"))]
377 {
378 std::fs::OpenOptions::new()
379 .read(true)
380 .open(data_file_path)
381 .unwrap()
382 }
383 #[cfg(target_family = "windows")]
385 {
386 std::fs::OpenOptions::new()
387 .read(true)
388 .write(true)
389 .open(data_file_path)
390 .unwrap()
391 }
392 };
393 update_last_accessed(&file);
394 if FileExt::try_lock_shared(&file).is_err() {
395 panic!(
396 "finish_open: could not acquire shared lock on data file at {}",
397 data_file_path.to_str().unwrap()
398 );
399 }
400 file
401}
402
403fn get_data_file_path(
405 path_prefix: &[u8],
406 uri_hash: &[u8],
407 remote_version: &FileVersion,
408) -> PathBuf {
409 let owned;
410 let path = [
411 path_prefix,
412 &[b'/', DATA_PREFIX, b'/'],
413 uri_hash,
414 match remote_version {
415 FileVersion::Timestamp(v) => {
416 owned = Some(format!("{:013x}", v));
417 owned.as_deref().unwrap()
418 },
419 FileVersion::ETag(v) => v.as_str(),
420 FileVersion::Uninitialized => panic!("impl error: version not initialized"),
421 }
422 .as_bytes(),
423 ]
424 .concat();
425 PathBuf::from(String::from_utf8(path).unwrap())
426}
427
428fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
430 let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
431 PathBuf::from(String::from_utf8(bytes).unwrap())
432}