// polars_io/cloud/object_store_setup.rs

1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::plpath::{PlPath, PlPathRef};
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::CloudConfig;
15
16/// Object stores must be cached. Every object-store will do DNS lookups and
17/// get rate limited when querying the DNS (can take up to 5s).
18/// Other reasons are connection pools that must be shared between as much as possible.
19#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21    LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25    feature: &str,
26    cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28    polars_bail!(
29        ComputeError:
30        "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31        feature,
32        cloud_type,
33    );
34}
35
36/// Get the key of a url for object store registration.
37fn path_and_creds_to_key(path: PlPathRef<'_>, options: Option<&CloudOptions>) -> Vec<u8> {
38    // We include credentials as they can expire, so users will send new credentials for the same url.
39    let cloud_options = options.map(
40        |CloudOptions {
41             // Destructure to ensure this breaks if anything changes.
42             max_retries,
43             #[cfg(feature = "file_cache")]
44             file_cache_ttl,
45             config,
46             #[cfg(feature = "cloud")]
47             credential_provider,
48         }| {
49            CloudOptions2 {
50                max_retries: *max_retries,
51                #[cfg(feature = "file_cache")]
52                file_cache_ttl: *file_cache_ttl,
53                config: config.clone(),
54                #[cfg(feature = "cloud")]
55                credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56            }
57        },
58    );
59
60    let cache_key = CacheKey {
61        url_base: format_pl_smallstr!("{}", &path.to_str()[..path.authority_end_position()]),
62        cloud_options,
63    };
64
65    verbose_print_sensitive(|| {
66        format!(
67            "object store cache key for path at '{}': {:?}",
68            path.to_str(),
69            &cache_key
70        )
71    });
72
73    return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
74
75    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
76    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
77    struct CacheKey {
78        url_base: PlSmallStr,
79        cloud_options: Option<CloudOptions2>,
80    }
81
82    /// Variant of CloudOptions for serializing to a cache key. The credential
83    /// provider is replaced by the function address.
84    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
85    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
86    struct CloudOptions2 {
87        max_retries: usize,
88        #[cfg(feature = "file_cache")]
89        file_cache_ttl: u64,
90        config: Option<CloudConfig>,
91        #[cfg(feature = "cloud")]
92        credential_provider: usize,
93    }
94}
95
96/// Construct an object_store `Path` from a string without any encoding/decoding.
97pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
98    object_store::path::Path::parse(path).map_err(to_compute_err)
99}
100
101#[derive(Debug, Clone)]
102pub(crate) struct PolarsObjectStoreBuilder {
103    path: PlPath,
104    cloud_type: CloudType,
105    options: Option<CloudOptions>,
106}
107
108impl PolarsObjectStoreBuilder {
109    pub(super) async fn build_impl(
110        &self,
111        // Whether to clear cached credentials for Python credential providers.
112        clear_cached_credentials: bool,
113    ) -> PolarsResult<Arc<dyn ObjectStore>> {
114        let options = self
115            .options
116            .as_ref()
117            .unwrap_or_else(|| CloudOptions::default_static_ref());
118
119        let store = match self.cloud_type {
120            CloudType::Aws => {
121                #[cfg(feature = "aws")]
122                {
123                    let store = options
124                        .build_aws(self.path.to_str(), clear_cached_credentials)
125                        .await?;
126                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
127                }
128                #[cfg(not(feature = "aws"))]
129                return err_missing_feature("aws", &self.cloud_type);
130            },
131            CloudType::Gcp => {
132                #[cfg(feature = "gcp")]
133                {
134                    let store = options.build_gcp(self.path.to_str(), clear_cached_credentials)?;
135
136                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
137                }
138                #[cfg(not(feature = "gcp"))]
139                return err_missing_feature("gcp", &self.cloud_type);
140            },
141            CloudType::Azure => {
142                {
143                    #[cfg(feature = "azure")]
144                    {
145                        let store =
146                            options.build_azure(self.path.to_str(), clear_cached_credentials)?;
147                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
148                    }
149                }
150                #[cfg(not(feature = "azure"))]
151                return err_missing_feature("azure", &self.cloud_type);
152            },
153            CloudType::File => {
154                let local = LocalFileSystem::new();
155                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
156            },
157            CloudType::Http => {
158                {
159                    #[cfg(feature = "http")]
160                    {
161                        let store = options.build_http(self.path.to_str())?;
162                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
163                    }
164                }
165                #[cfg(not(feature = "http"))]
166                return err_missing_feature("http", &cloud_location.scheme);
167            },
168            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
169        }?;
170
171        Ok(store)
172    }
173
174    /// Note: Use `build_impl` for a non-caching version.
175    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
176        let opt_cache_key = match &self.cloud_type {
177            CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(path_and_creds_to_key(
178                self.path.as_ref(),
179                self.options.as_ref(),
180            )),
181            CloudType::File | CloudType::Http | CloudType::Hf => None,
182        };
183
184        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
185            let cache = OBJECT_STORE_CACHE.read().await;
186
187            if let Some(store) = cache.get(cache_key) {
188                return Ok(store.clone());
189            }
190
191            drop(cache);
192
193            let cache = OBJECT_STORE_CACHE.write().await;
194
195            if let Some(store) = cache.get(cache_key) {
196                return Ok(store.clone());
197            }
198
199            Some(cache)
200        } else {
201            None
202        };
203
204        let store = self.build_impl(false).await?;
205        let store = PolarsObjectStore::new_from_inner(store, self);
206
207        if let Some(mut cache) = opt_cache_write_guard {
208            // Clear the cache if we surpass a certain amount of buckets.
209            if cache.len() >= 8 {
210                if config::verbose() {
211                    eprintln!(
212                        "build_object_store: clearing store cache (cache.len(): {})",
213                        cache.len()
214                    );
215                }
216                cache.clear()
217            }
218
219            cache.insert(opt_cache_key.unwrap(), store.clone());
220        }
221
222        Ok(store)
223    }
224
225    pub(crate) fn is_azure(&self) -> bool {
226        matches!(&self.cloud_type, CloudType::Azure)
227    }
228}
229
230/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
231pub async fn build_object_store(
232    path: PlPathRef<'_>,
233    #[cfg_attr(
234        not(any(feature = "aws", feature = "gcp", feature = "azure")),
235        allow(unused_variables)
236    )]
237    options: Option<&CloudOptions>,
238    glob: bool,
239) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
240    let path = path.to_absolute_path().unwrap_or_else(|| path.into_owned());
241
242    let cloud_location = CloudLocation::new(path.as_ref(), glob)?;
243    let cloud_type = path.as_ref().scheme().map_or(CloudType::File, |scheme| {
244        CloudType::from_cloud_scheme(&scheme)
245    });
246
247    let store = PolarsObjectStoreBuilder {
248        path,
249        cloud_type,
250        options: options.cloned(),
251    }
252    .build()
253    .await?;
254
255    Ok((cloud_location, store))
256}
257
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// `object_path_from_str` must not percent-decode its input: "%25" must
    /// come back unchanged rather than as "%".
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}