polars_io/file_cache/
utils.rs

1use std::path::Path;
2use std::sync::{Arc, LazyLock};
3use std::time::UNIX_EPOCH;
4
5use polars_error::{PolarsError, PolarsResult};
6use polars_utils::plpath::{CloudScheme, PlPathRef};
7
8use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
9use super::entry::FileCacheEntry;
10use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
11use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
12use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
13use crate::pl_async;
14
15pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {
16    let path = POLARS_TEMP_DIR_BASE_PATH
17        .join("file-cache/")
18        .into_boxed_path();
19
20    if let Err(err) = ensure_directory_init(path.as_ref()) {
21        panic!(
22            "failed to create file cache directory: path = {}, err = {}",
23            path.to_str().unwrap(),
24            err
25        );
26    }
27
28    path
29});
30
31pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
32    u64::try_from(
33        metadata
34            .modified()
35            .unwrap()
36            .duration_since(UNIX_EPOCH)
37            .unwrap()
38            .as_millis(),
39    )
40    .unwrap()
41}
42
43pub(super) fn update_last_accessed(file: &std::fs::File) {
44    let file_metadata = file.metadata().unwrap();
45
46    if let Err(e) = file.set_times(
47        std::fs::FileTimes::new()
48            .set_modified(file_metadata.modified().unwrap())
49            .set_accessed(std::time::SystemTime::now()),
50    ) {
51        panic!("failed to update file last accessed time: {e}");
52    }
53}
54
55pub fn init_entries_from_uri_list(
56    mut uri_list: impl ExactSizeIterator<Item = Arc<str>>,
57    cloud_options: Option<&CloudOptions>,
58) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
59    init_entries_from_uri_list_impl(&mut uri_list, cloud_options)
60}
61
62fn init_entries_from_uri_list_impl(
63    uri_list: &mut dyn ExactSizeIterator<Item = Arc<str>>,
64    cloud_options: Option<&CloudOptions>,
65) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
66    #[expect(clippy::len_zero)]
67    if uri_list.len() == 0 {
68        return Ok(Default::default());
69    }
70
71    let mut uri_list = uri_list.peekable();
72
73    let first_uri = PlPathRef::new(uri_list.peek().unwrap().as_ref());
74
75    let file_cache_ttl = cloud_options
76        .map(|x| x.file_cache_ttl)
77        .unwrap_or_else(get_env_file_cache_ttl);
78
79    if first_uri.is_cloud_url() {
80        let shared_object_store = (!matches!(
81            first_uri.scheme(),
82            Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.
83        ))
84        .then(|| {
85            pl_async::get_runtime().block_in_place_on(async {
86                let (_, object_store) =
87                    build_object_store(first_uri.to_str(), cloud_options, false).await?;
88
89                PolarsResult::Ok(object_store)
90            })
91        })
92        .transpose()?;
93
94        pl_async::get_runtime().block_in_place_on(async {
95            futures::future::try_join_all(uri_list.map(|uri| {
96                let shared_object_store = shared_object_store.clone();
97
98                async move {
99                    let object_store =
100                        if let Some(shared_object_store) = shared_object_store.clone() {
101                            shared_object_store
102                        } else {
103                            let (_, object_store) =
104                                build_object_store(&uri, cloud_options, false).await?;
105                            object_store
106                        };
107
108                    FILE_CACHE.init_entry(
109                        uri.clone(),
110                        &|| {
111                            let CloudLocation { prefix, .. } =
112                                CloudLocation::new(uri.as_ref(), false).unwrap();
113                            let cloud_path = object_path_from_str(&prefix)?;
114
115                            let uri = uri.clone();
116                            let object_store = object_store.clone();
117
118                            Ok(Arc::new(CloudFileFetcher {
119                                uri,
120                                object_store,
121                                cloud_path,
122                            }))
123                        },
124                        file_cache_ttl,
125                    )
126                }
127            }))
128            .await
129        })
130    } else {
131        uri_list
132            .map(|uri| {
133                let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
134                    let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
135                    PolarsError::IO {
136                        error: err.into(),
137                        msg,
138                    }
139                })?;
140                let uri = Arc::<str>::from(uri.to_str().unwrap());
141
142                FILE_CACHE.init_entry(
143                    uri.clone(),
144                    &|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
145                    file_cache_ttl,
146                )
147            })
148            .collect::<PolarsResult<Vec<_>>>()
149    }
150}