polars_io/cloud/object_store_setup.rs

1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::{format_pl_smallstr, pl_serialize};
10use tokio::sync::RwLock;
11use url::Url;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore, parse_url};
14use crate::cloud::CloudConfig;
15
16/// Object stores must be cached. Every object-store will do DNS lookups and
17/// get rate limited when querying the DNS (can take up to 5s).
18/// Other reasons are connection pools that must be shared between as much as possible.
19#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21    LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
25    polars_bail!(
26        ComputeError:
27        "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
28    );
29}
30
31/// Get the key of a url for object store registration.
32fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
33    // We include credentials as they can expire, so users will send new credentials for the same url.
34    let cloud_options = options.map(
35        |CloudOptions {
36             // Destructure to ensure this breaks if anything changes.
37             max_retries,
38             #[cfg(feature = "file_cache")]
39             file_cache_ttl,
40             config,
41             #[cfg(feature = "cloud")]
42             credential_provider,
43         }| {
44            CloudOptions2 {
45                max_retries: *max_retries,
46                #[cfg(feature = "file_cache")]
47                file_cache_ttl: *file_cache_ttl,
48                config: config.clone(),
49                #[cfg(feature = "cloud")]
50                credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
51            }
52        },
53    );
54
55    let cache_key = CacheKey {
56        url_base: format_pl_smallstr!(
57            "{}",
58            &url[url::Position::BeforeScheme..url::Position::AfterPort]
59        ),
60        cloud_options,
61    };
62
63    verbose_print_sensitive(|| format!("object store cache key: {} {:?}", url, &cache_key));
64
65    return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
66
67    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
68    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
69    struct CacheKey {
70        url_base: PlSmallStr,
71        cloud_options: Option<CloudOptions2>,
72    }
73
74    /// Variant of CloudOptions for serializing to a cache key. The credential
75    /// provider is replaced by the function address.
76    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
77    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
78    struct CloudOptions2 {
79        max_retries: usize,
80        #[cfg(feature = "file_cache")]
81        file_cache_ttl: u64,
82        config: Option<CloudConfig>,
83        #[cfg(feature = "cloud")]
84        credential_provider: usize,
85    }
86}
87
88/// Construct an object_store `Path` from a string without any encoding/decoding.
89pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
90    object_store::path::Path::parse(path).map_err(to_compute_err)
91}
92
93#[derive(Debug, Clone)]
94pub(crate) struct PolarsObjectStoreBuilder {
95    url: PlSmallStr,
96    parsed_url: Url,
97    #[allow(unused)]
98    scheme: PlSmallStr,
99    cloud_type: CloudType,
100    options: Option<CloudOptions>,
101}
102
103impl PolarsObjectStoreBuilder {
104    pub(super) async fn build_impl(
105        &self,
106        // Whether to clear cached credentials for Python credential providers.
107        clear_cached_credentials: bool,
108    ) -> PolarsResult<Arc<dyn ObjectStore>> {
109        let options = self
110            .options
111            .as_ref()
112            .unwrap_or_else(|| CloudOptions::default_static_ref());
113
114        let store = match self.cloud_type {
115            CloudType::Aws => {
116                #[cfg(feature = "aws")]
117                {
118                    let store = options
119                        .build_aws(&self.url, clear_cached_credentials)
120                        .await?;
121                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
122                }
123                #[cfg(not(feature = "aws"))]
124                return err_missing_feature("aws", &self.scheme);
125            },
126            CloudType::Gcp => {
127                #[cfg(feature = "gcp")]
128                {
129                    let store = options.build_gcp(&self.url, clear_cached_credentials)?;
130                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
131                }
132                #[cfg(not(feature = "gcp"))]
133                return err_missing_feature("gcp", &self.scheme);
134            },
135            CloudType::Azure => {
136                {
137                    #[cfg(feature = "azure")]
138                    {
139                        let store = options.build_azure(&self.url, clear_cached_credentials)?;
140                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
141                    }
142                }
143                #[cfg(not(feature = "azure"))]
144                return err_missing_feature("azure", &self.scheme);
145            },
146            CloudType::File => {
147                let local = LocalFileSystem::new();
148                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
149            },
150            CloudType::Http => {
151                {
152                    #[cfg(feature = "http")]
153                    {
154                        let store = options.build_http(&self.url)?;
155                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
156                    }
157                }
158                #[cfg(not(feature = "http"))]
159                return err_missing_feature("http", &cloud_location.scheme);
160            },
161            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
162        }?;
163
164        Ok(store)
165    }
166
167    /// Note: Use `build_impl` for a non-caching version.
168    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
169        let opt_cache_key = match &self.cloud_type {
170            CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(
171                &self.parsed_url,
172                self.options.as_ref(),
173            )),
174            CloudType::File | CloudType::Http | CloudType::Hf => None,
175        };
176
177        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
178            let cache = OBJECT_STORE_CACHE.read().await;
179
180            if let Some(store) = cache.get(cache_key) {
181                return Ok(store.clone());
182            }
183
184            drop(cache);
185
186            let cache = OBJECT_STORE_CACHE.write().await;
187
188            if let Some(store) = cache.get(cache_key) {
189                return Ok(store.clone());
190            }
191
192            Some(cache)
193        } else {
194            None
195        };
196
197        let store = self.build_impl(false).await?;
198        let store = PolarsObjectStore::new_from_inner(store, self);
199
200        if let Some(mut cache) = opt_cache_write_guard {
201            // Clear the cache if we surpass a certain amount of buckets.
202            if cache.len() >= 8 {
203                if config::verbose() {
204                    eprintln!(
205                        "build_object_store: clearing store cache (cache.len(): {})",
206                        cache.len()
207                    );
208                }
209                cache.clear()
210            }
211
212            cache.insert(opt_cache_key.unwrap(), store.clone());
213        }
214
215        Ok(store)
216    }
217
218    pub(crate) fn is_azure(&self) -> bool {
219        matches!(&self.cloud_type, CloudType::Azure)
220    }
221}
222
223/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
224pub async fn build_object_store(
225    url: &str,
226    #[cfg_attr(
227        not(any(feature = "aws", feature = "gcp", feature = "azure")),
228        allow(unused_variables)
229    )]
230    options: Option<&CloudOptions>,
231    glob: bool,
232) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
233    let parsed = parse_url(url).map_err(to_compute_err)?;
234    let cloud_location = CloudLocation::from_url(&parsed, glob)?;
235    let cloud_type = CloudType::from_url(&parsed)?;
236
237    let store = PolarsObjectStoreBuilder {
238        url: url.into(),
239        parsed_url: parsed,
240        scheme: cloud_location.scheme.as_str().into(),
241        cloud_type,
242        options: options.cloned(),
243    }
244    .build()
245    .await?;
246
247    Ok((cloud_location, store))
248}
249
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    #[test]
    fn test_object_path_from_str() {
        // Percent-escapes must be kept verbatim, not decoded.
        let path = "%25";
        let out = object_path_from_str(path).unwrap();
        assert_eq!(out.as_ref(), path);

        // Multi-segment paths containing escapes are kept verbatim as well.
        let path = "a/b%20c/d";
        let out = object_path_from_str(path).unwrap();
        assert_eq!(out.as_ref(), path);
    }
}