polars_io/file_cache/
utils.rs

1use std::sync::{Arc, LazyLock};
2use std::time::UNIX_EPOCH;
3
4use polars_error::{PolarsError, PolarsResult};
5use polars_utils::pl_path::{CloudScheme, PlRefPath};
6
7use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
8use super::entry::FileCacheEntry;
9use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
10use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
11use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
12
13pub static FILE_CACHE_PREFIX: LazyLock<PlRefPath> = LazyLock::new(|| {
14    let path = PlRefPath::try_from_path(&POLARS_TEMP_DIR_BASE_PATH.join("file-cache/")).unwrap();
15
16    if let Err(err) = ensure_directory_init(path.as_ref()) {
17        panic!(
18            "failed to create file cache directory: path = {}, err = {}",
19            path, err
20        );
21    }
22
23    path
24});
25
26pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
27    u64::try_from(
28        metadata
29            .modified()
30            .unwrap()
31            .duration_since(UNIX_EPOCH)
32            .unwrap()
33            .as_millis(),
34    )
35    .unwrap()
36}
37
38pub(super) fn update_last_accessed(file: &std::fs::File) {
39    let file_metadata = file.metadata().unwrap();
40
41    if let Err(e) = file.set_times(
42        std::fs::FileTimes::new()
43            .set_modified(file_metadata.modified().unwrap())
44            .set_accessed(std::time::SystemTime::now()),
45    ) {
46        panic!("failed to update file last accessed time: {e}");
47    }
48}
49
50pub async fn init_entries_from_uri_list(
51    uri_list: impl ExactSizeIterator<Item = PlRefPath> + Send + 'static,
52    cloud_options: Option<&CloudOptions>,
53) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
54    init_entries_from_uri_list_impl(Box::new(uri_list), cloud_options).await
55}
56
57async fn init_entries_from_uri_list_impl(
58    uri_list: Box<dyn ExactSizeIterator<Item = PlRefPath> + Send + 'static>,
59    cloud_options: Option<&CloudOptions>,
60) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
61    #[allow(clippy::len_zero)]
62    if uri_list.len() == 0 {
63        return Ok(Default::default());
64    }
65
66    let mut uri_list = uri_list.peekable();
67
68    let first_uri = uri_list.peek().unwrap().clone();
69
70    let file_cache_ttl = cloud_options
71        .map(|x| x.file_cache_ttl)
72        .unwrap_or_else(get_env_file_cache_ttl);
73
74    if first_uri.has_scheme() {
75        let shared_object_store = if !matches!(
76            first_uri.scheme(),
77            Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.
78        ) {
79            let (_, object_store) = build_object_store(first_uri, cloud_options, false).await?;
80            Some(object_store)
81        } else {
82            None
83        };
84
85        futures::future::try_join_all(uri_list.map(|uri| {
86            let shared_object_store = shared_object_store.clone();
87
88            async move {
89                let object_store = if let Some(shared_object_store) = shared_object_store.clone() {
90                    shared_object_store
91                } else {
92                    let (_, object_store) =
93                        build_object_store(uri.clone(), cloud_options, false).await?;
94                    object_store
95                };
96
97                FILE_CACHE.init_entry(
98                    uri.clone(),
99                    &|| {
100                        let CloudLocation { prefix, .. } =
101                            CloudLocation::new(uri.clone(), false).unwrap();
102                        let cloud_path = object_path_from_str(&prefix)?;
103                        let object_store = object_store.clone();
104
105                        Ok(Arc::new(CloudFileFetcher {
106                            uri: uri.clone(),
107                            object_store,
108                            cloud_path,
109                        }))
110                    },
111                    file_cache_ttl,
112                )
113            }
114        }))
115        .await
116    } else {
117        let mut out = Vec::with_capacity(uri_list.len());
118        for uri in uri_list {
119            let uri = tokio::fs::canonicalize(uri.as_str()).await.map_err(|err| {
120                let msg = Some(format!("{}: {}", err, uri).into());
121                PolarsError::IO {
122                    error: err.into(),
123                    msg,
124                }
125            })?;
126            let uri = PlRefPath::try_from_pathbuf(uri)?;
127
128            out.push(FILE_CACHE.init_entry(
129                uri.clone(),
130                &|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
131                file_cache_ttl,
132            )?)
133        }
134        Ok(out)
135    }
136}