polars_io/file_cache/
utils.rs

1use std::path::Path;
2use std::sync::{Arc, LazyLock};
3use std::time::UNIX_EPOCH;
4
5use polars_error::{PolarsError, PolarsResult};
6
7use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
8use super::entry::FileCacheEntry;
9use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
10use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
11use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init, is_cloud_url};
12use crate::pl_async;
13
14pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {
15    let path = POLARS_TEMP_DIR_BASE_PATH
16        .join("file-cache/")
17        .into_boxed_path();
18
19    if let Err(err) = ensure_directory_init(path.as_ref()) {
20        panic!(
21            "failed to create file cache directory: path = {}, err = {}",
22            path.to_str().unwrap(),
23            err
24        );
25    }
26
27    path
28});
29
30pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
31    metadata
32        .modified()
33        .unwrap()
34        .duration_since(UNIX_EPOCH)
35        .unwrap()
36        .as_millis() as u64
37}
38
39pub(super) fn update_last_accessed(file: &std::fs::File) {
40    let file_metadata = file.metadata().unwrap();
41
42    if let Err(e) = file.set_times(
43        std::fs::FileTimes::new()
44            .set_modified(file_metadata.modified().unwrap())
45            .set_accessed(std::time::SystemTime::now()),
46    ) {
47        panic!("failed to update file last accessed time: {}", e);
48    }
49}
50
51pub fn init_entries_from_uri_list(
52    uri_list: &[Arc<str>],
53    cloud_options: Option<&CloudOptions>,
54) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
55    if uri_list.is_empty() {
56        return Ok(Default::default());
57    }
58
59    let first_uri = uri_list.first().unwrap().as_ref();
60
61    let file_cache_ttl = cloud_options
62        .map(|x| x.file_cache_ttl)
63        .unwrap_or_else(get_env_file_cache_ttl);
64
65    if is_cloud_url(first_uri) {
66        let object_stores = pl_async::get_runtime().block_in_place_on(async {
67            futures::future::try_join_all(
68                (0..if first_uri.starts_with("http") {
69                    // Object stores for http are tied to the path.
70                    uri_list.len()
71                } else {
72                    1
73                })
74                    .map(|i| async move {
75                        let (_, object_store) =
76                            build_object_store(&uri_list[i], cloud_options, false).await?;
77                        PolarsResult::Ok(object_store)
78                    }),
79            )
80            .await
81        })?;
82
83        uri_list
84            .iter()
85            .enumerate()
86            .map(|(i, uri)| {
87                FILE_CACHE.init_entry(
88                    uri.clone(),
89                    || {
90                        let CloudLocation { prefix, .. } =
91                            CloudLocation::new(uri.as_ref(), false).unwrap();
92                        let cloud_path = object_path_from_str(&prefix)?;
93
94                        let object_store =
95                            object_stores[std::cmp::min(i, object_stores.len() - 1)].clone();
96                        let uri = uri.clone();
97
98                        Ok(Arc::new(CloudFileFetcher {
99                            uri,
100                            object_store,
101                            cloud_path,
102                        }))
103                    },
104                    file_cache_ttl,
105                )
106            })
107            .collect::<PolarsResult<Vec<_>>>()
108    } else {
109        uri_list
110            .iter()
111            .map(|uri| {
112                let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
113                    let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
114                    PolarsError::IO {
115                        error: err.into(),
116                        msg,
117                    }
118                })?;
119                let uri = Arc::<str>::from(uri.to_str().unwrap());
120
121                FILE_CACHE.init_entry(
122                    uri.clone(),
123                    || Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
124                    file_cache_ttl,
125                )
126            })
127            .collect::<PolarsResult<Vec<_>>>()
128    }
129}