polars_io/cloud/
object_store_setup.rs

1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16/// Object stores must be cached. Every object-store will do DNS lookups and
17/// get rate limited when querying the DNS (can take up to 5s).
18/// Other reasons are connection pools that must be shared between as much as possible.
19#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21    LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25    feature: &str,
26    cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28    polars_bail!(
29        ComputeError:
30        "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31        feature,
32        cloud_type,
33    );
34}
35
36/// Get the key of a url for object store registration.
37fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u8> {
38    // We include credentials as they can expire, so users will send new credentials for the same url.
39    let cloud_options = options.map(
40        |CloudOptions {
41             // Destructure to ensure this breaks if anything changes.
42             #[cfg(feature = "file_cache")]
43             file_cache_ttl,
44             config,
45             retry_config,
46             #[cfg(feature = "cloud")]
47             credential_provider,
48         }| {
49            CloudOptionsKey {
50                #[cfg(feature = "file_cache")]
51                file_cache_ttl: *file_cache_ttl,
52                config: config.clone(),
53                retry_config: *retry_config,
54                #[cfg(feature = "cloud")]
55                credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56            }
57        },
58    );
59
60    let cache_key = CacheKey {
61        url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
62        cloud_options,
63    };
64
65    verbose_print_sensitive(|| {
66        format!(
67            "object store cache key for path at '{}': {:?}",
68            path, &cache_key
69        )
70    });
71
72    return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
73
74    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
75    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
76    struct CacheKey {
77        url_base: PlSmallStr,
78        cloud_options: Option<CloudOptionsKey>,
79    }
80
81    /// Variant of CloudOptions for serializing to a cache key. The credential
82    /// provider is replaced by the function address.
83    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
84    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
85    struct CloudOptionsKey {
86        #[cfg(feature = "file_cache")]
87        file_cache_ttl: u64,
88        config: Option<CloudConfig>,
89        retry_config: CloudRetryConfig,
90        #[cfg(feature = "cloud")]
91        credential_provider: usize,
92    }
93}
94
95/// Construct an object_store `Path` from a string without any encoding/decoding.
96pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
97    object_store::path::Path::parse(path).map_err(to_compute_err)
98}
99
100#[derive(Debug, Clone)]
101pub(crate) struct PolarsObjectStoreBuilder {
102    path: PlRefPath,
103    cloud_type: CloudType,
104    options: Option<CloudOptions>,
105}
106
107impl PolarsObjectStoreBuilder {
108    pub(super) fn path(&self) -> &PlRefPath {
109        &self.path
110    }
111
112    pub(super) async fn build_impl(
113        &self,
114        // Whether to clear cached credentials for Python credential providers.
115        clear_cached_credentials: bool,
116    ) -> PolarsResult<Arc<dyn ObjectStore>> {
117        let options = self
118            .options
119            .as_ref()
120            .unwrap_or_else(|| CloudOptions::default_static_ref());
121
122        if let Some(options) = &self.options
123            && verbose()
124        {
125            eprintln!(
126                "build object-store: file_cache_ttl: {}",
127                options.file_cache_ttl
128            )
129        }
130
131        let store = match self.cloud_type {
132            CloudType::Aws => {
133                #[cfg(feature = "aws")]
134                {
135                    let store = options
136                        .build_aws(self.path.clone(), clear_cached_credentials)
137                        .await?;
138                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
139                }
140                #[cfg(not(feature = "aws"))]
141                return err_missing_feature("aws", &self.cloud_type);
142            },
143            CloudType::Gcp => {
144                #[cfg(feature = "gcp")]
145                {
146                    let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
147
148                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
149                }
150                #[cfg(not(feature = "gcp"))]
151                return err_missing_feature("gcp", &self.cloud_type);
152            },
153            CloudType::Azure => {
154                {
155                    #[cfg(feature = "azure")]
156                    {
157                        let store =
158                            options.build_azure(self.path.clone(), clear_cached_credentials)?;
159                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
160                    }
161                }
162                #[cfg(not(feature = "azure"))]
163                return err_missing_feature("azure", &self.cloud_type);
164            },
165            CloudType::File => {
166                let local = LocalFileSystem::new();
167                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
168            },
169            CloudType::Http => {
170                {
171                    #[cfg(feature = "http")]
172                    {
173                        let store = options.build_http(self.path.clone())?;
174                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
175                    }
176                }
177                #[cfg(not(feature = "http"))]
178                return err_missing_feature("http", &cloud_location.scheme);
179            },
180            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
181        }?;
182
183        Ok(store)
184    }
185
186    /// Note: Use `build_impl` for a non-caching version.
187    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
188        let opt_cache_key = match &self.cloud_type {
189            CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
190                Some(path_and_creds_to_key(&self.path, self.options.as_ref()))
191            },
192            CloudType::File | CloudType::Http | CloudType::Hf => None,
193        };
194
195        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
196            let cache = OBJECT_STORE_CACHE.read().await;
197
198            if let Some(store) = cache.get(cache_key) {
199                return Ok(store.clone());
200            }
201
202            drop(cache);
203
204            let cache = OBJECT_STORE_CACHE.write().await;
205
206            if let Some(store) = cache.get(cache_key) {
207                return Ok(store.clone());
208            }
209
210            Some(cache)
211        } else {
212            None
213        };
214
215        let store = self.build_impl(false).await?;
216        let store = PolarsObjectStore::new_from_inner(store, self);
217
218        if let Some(mut cache) = opt_cache_write_guard {
219            // Clear the cache if we surpass a certain amount of buckets.
220            if cache.len() >= 8 {
221                if config::verbose() {
222                    eprintln!(
223                        "build_object_store: clearing store cache (cache.len(): {})",
224                        cache.len()
225                    );
226                }
227                cache.clear()
228            }
229
230            cache.insert(opt_cache_key.unwrap(), store.clone());
231        }
232
233        Ok(store)
234    }
235
236    pub(crate) fn is_azure(&self) -> bool {
237        matches!(&self.cloud_type, CloudType::Azure)
238    }
239}
240
241/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
242pub async fn build_object_store(
243    path: PlRefPath,
244    #[cfg_attr(
245        not(any(feature = "aws", feature = "gcp", feature = "azure")),
246        allow(unused_variables)
247    )]
248    options: Option<&CloudOptions>,
249    glob: bool,
250) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
251    let path = path.to_absolute_path()?.into_owned();
252
253    let cloud_type = path
254        .scheme()
255        .map_or(CloudType::File, CloudType::from_cloud_scheme);
256    let cloud_location = CloudLocation::new(path.clone(), glob)?;
257
258    let store = PolarsObjectStoreBuilder {
259        path,
260        cloud_type,
261        options: options.cloned(),
262    }
263    .build()
264    .await?;
265
266    Ok((cloud_location, store))
267}
268
mod test {
    /// A percent-encoded sequence must pass through `object_path_from_str`
    /// unchanged, i.e. without being decoded or encoded again.
    #[test]
    fn test_object_path_from_str() {
        use super::object_path_from_str;

        let raw = "%25";
        let parsed = object_path_from_str(raw).unwrap();

        assert_eq!(parsed.as_ref(), raw);
    }
}