// polars_io/file_cache/file_fetcher.rs

use std::sync::Arc;

use polars_error::{PolarsError, PolarsResult};

use super::metadata::FileVersion;
use super::utils::last_modified_u64;
use crate::cloud::PolarsObjectStore;
use crate::pl_async;

/// Interface for retrieving a single object, identified by a URI, to a path on local disk.
pub trait FileFetcher: Send + Sync {
    /// Returns the URI of the object this fetcher retrieves.
    fn get_uri(&self) -> &Arc<str>;
    /// Looks up the object's current size and version.
    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;
    /// Fetches the object to a `local_path`.
    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;
    /// Returns `true` if `fetch` places a symlink at `local_path` rather than a copy of the
    /// object's data.
    fn fetches_as_symlink(&self) -> bool;
}

/// Size and version of an object, as returned by `FileFetcher::fetch_metadata`.
pub struct RemoteMetadata {
    /// Size of the object in bytes.
    pub size: u64,
    /// Version marker of the object; the fetchers in this module fill it with either a
    /// timestamp or an ETag-derived string.
    pub(super) version: FileVersion,
}

/// A struct that fetches data from local disk and stores it into the `cache`.
/// Mostly used for debugging, it only ever gets called if `POLARS_FORCE_ASYNC` is set.
pub(super) struct LocalFileFetcher {
    /// The URI this fetcher was created from; returned by `get_uri`.
    uri: Arc<str>,
    /// Filesystem path obtained by interpreting `uri` as a path.
    path: Box<std::path::Path>,
}

impl LocalFileFetcher {
    /// Builds a fetcher for the local file that `uri` refers to.
    ///
    /// The URI string is used verbatim as a filesystem path.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics if the path cannot be canonicalized
    /// or if it differs from its canonical form.
    pub(super) fn from_uri(uri: Arc<str>) -> Self {
        let path: Box<std::path::Path> = std::path::Path::new(&*uri).into();

        // Callers are expected to hand in an already-canonical path; only verified in
        // debug builds, since canonicalization touches the filesystem.
        debug_assert_eq!(
            &*path,
            std::fs::canonicalize(&path).unwrap().as_path()
        );

        Self { uri, path }
    }
}

impl FileFetcher for LocalFileFetcher {
    /// Returns the URI this fetcher was created from.
    fn get_uri(&self) -> &Arc<str> {
        &self.uri
    }

    /// On Unix-family targets `fetch` creates a symlink; everywhere else it copies the file.
    fn fetches_as_symlink(&self) -> bool {
        cfg!(target_family = "unix")
    }

    /// Reads the source file's size and last-modified time from the local filesystem.
    ///
    /// # Errors
    ///
    /// Returns the I/O error, converted to a `PolarsError`, if the file's metadata cannot
    /// be read.
    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
        let file_meta = std::fs::metadata(&self.path)?;
        let size = file_meta.len();
        let version = FileVersion::Timestamp(last_modified_u64(&file_meta));

        Ok(RemoteMetadata { size, version })
    }

    /// Makes the source file available at `local_path`.
    ///
    /// On Unix-family targets this creates a symlink pointing at the source path; on all
    /// other targets the file contents are copied.
    ///
    /// # Errors
    ///
    /// Returns the I/O error, converted to a `PolarsError`, if the symlink or copy fails.
    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
        #[cfg(target_family = "unix")]
        let outcome = std::os::unix::fs::symlink(&self.path, local_path);

        // `copy` reports the number of bytes written, which is not needed here.
        #[cfg(not(target_family = "unix"))]
        let outcome = std::fs::copy(&self.path, local_path).map(drop);

        outcome.map_err(PolarsError::from)
    }
}

/// A struct that fetches an object from a cloud object store via a `PolarsObjectStore`.
pub(super) struct CloudFileFetcher {
    /// The URI of the object; returned by `get_uri`.
    pub(super) uri: Arc<str>,
    /// Path of the object inside `object_store`, used for the `head` and `download` calls.
    pub(super) cloud_path: object_store::path::Path,
    /// Store handle used to query metadata and download the object.
    pub(super) object_store: PolarsObjectStore,
}

impl FileFetcher for CloudFileFetcher {
    /// Returns the URI of the cloud object.
    fn get_uri(&self) -> &Arc<str> {
        &self.uri
    }

    /// Cloud objects are always downloaded, never symlinked.
    fn fetches_as_symlink(&self) -> bool {
        false
    }

    /// Queries the object store's `head` for the object and derives its size and version.
    ///
    /// When the store reports an ETag, the version is the first 32 hex characters of the
    /// BLAKE3 hash of that ETag; otherwise it is the last-modified time in milliseconds.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the `head` call.
    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
        let head = self.object_store.head(&self.cloud_path);
        let object_meta = pl_async::get_runtime().block_on_potential_spawn(head)?;

        let size = object_meta.size as u64;
        let version = match object_meta.e_tag {
            Some(tag) => {
                let digest_hex = blake3::hash(tag.as_bytes()).to_hex();
                FileVersion::ETag(digest_hex[..32].to_string())
            },
            None => FileVersion::Timestamp(object_meta.last_modified.timestamp_millis() as u64),
        };

        Ok(RemoteMetadata { size, version })
    }

    /// Downloads the object into the file at `local_path`.
    ///
    /// The file is opened for writing and truncated, but not created, so it must already
    /// exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or if the download fails.
    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
        let download = async {
            let mut target = tokio::fs::OpenOptions::new()
                .write(true)
                .truncate(true)
                .open(local_path)
                .await
                .map_err(PolarsError::from)?;

            self.object_store
                .download(&self.cloud_path, &mut target)
                .await
        };

        pl_async::get_runtime().block_on_potential_spawn(download)
    }
}