1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::path::Path;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use once_cell::sync::Lazy;
use polars_error::{to_compute_err, PolarsError, PolarsResult};

use super::cache::FILE_CACHE;
use super::entry::FileCacheEntry;
use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
use crate::cloud::{build_object_store, CloudLocation, CloudOptions, PolarsObjectStore};
use crate::pl_async;
use crate::prelude::{is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};

pub(super) static FILE_CACHE_PREFIX: Lazy<Box<Path>> = Lazy::new(|| {
    let path = POLARS_TEMP_DIR_BASE_PATH
        .join("file-cache/")
        .into_boxed_path();

    if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
        if !path.is_dir() {
            panic!("failed to create file cache directory: {}", err);
        }
    }

    path
});

pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
    metadata
        .modified()
        .unwrap()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

pub(super) fn update_last_accessed(file: &std::fs::File) {
    let file_metadata = file.metadata().unwrap();

    if let Err(e) = file.set_times(
        std::fs::FileTimes::new()
            .set_modified(file_metadata.modified().unwrap())
            .set_accessed(std::time::SystemTime::now()),
    ) {
        panic!("failed to update file last accessed time: {}", e);
    }
}

pub fn init_entries_from_uri_list<A: AsRef<[Arc<str>]>>(
    uri_list: A,
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
    let uri_list = uri_list.as_ref();

    if uri_list.is_empty() {
        return Ok(Default::default());
    }

    let first_uri = uri_list.first().unwrap().as_ref();

    if is_cloud_url(first_uri) {
        let (_, object_store) = pl_async::get_runtime()
            .block_on_potential_spawn(build_object_store(first_uri, cloud_options))?;
        let object_store = PolarsObjectStore::new(object_store);

        uri_list
            .iter()
            .map(|uri| {
                FILE_CACHE.init_entry(uri.clone(), || {
                    let CloudLocation {
                        prefix, expansion, ..
                    } = CloudLocation::new(uri.as_ref()).unwrap();

                    let cloud_path = {
                        assert!(expansion.is_none(), "path should not contain wildcards");
                        object_store::path::Path::from_url_path(prefix).map_err(to_compute_err)?
                    };

                    let object_store = object_store.clone();
                    let uri = uri.clone();

                    Ok(Arc::new(CloudFileFetcher {
                        uri,
                        object_store,
                        cloud_path,
                    }))
                })
            })
            .collect::<PolarsResult<Vec<_>>>()
    } else {
        uri_list
            .iter()
            .map(|uri| {
                let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
                    let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
                    PolarsError::IO {
                        error: err.into(),
                        msg,
                    }
                })?;
                let uri = Arc::<str>::from(uri.to_str().unwrap());

                FILE_CACHE.init_entry(uri.clone(), || {
                    Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone())))
                })
            })
            .collect::<PolarsResult<Vec<_>>>()
    }
}