Skip to main content

polars_io/cloud/
object_store_setup.rs

1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{ALLOWED_EXT_SCHEMES, CloudScheme, PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16/// Object stores must be cached. Every object-store will do DNS lookups and
17/// get rate limited when querying the DNS (can take up to 5s).
18/// Other reasons are connection pools that must be shared between as much as possible.
19#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21    LazyLock::new(Default::default);
22
/// Trait for external ObjectStore builder (e.g., for HDFS). Unstable.
///
/// Implementations are registered per URL scheme through
/// [`register_object_store_builder`] and are looked up by scheme when a path
/// with an external cloud type is built.
pub trait ExtObjectStoreBuilder {
    /// Build new object_store.
    ///
    /// `url` is the path the store is being built for and `options` are the
    /// cloud options supplied by the caller, if any.
    fn build(
        &self,
        url: &PlRefPath,
        options: Option<&CloudOptions>,
    ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>>;

    /// Return a stable cache key for this store.
    /// Defaults to `None`, which uses the default key (URL authority + serialised CloudOptions).
    ///
    /// Returning `Some(key)` makes the object store cache use `key` as-is, so
    /// two calls that return equal keys share a single cached store.
    fn stable_cache_key(
        &self,
        _url: &PlRefPath,
        _options: Option<&CloudOptions>,
    ) -> Option<Vec<u8>> {
        None
    }
}
42
43static EXT_OBJECT_STORE_BUILDER_REGISTRY: LazyLock<
44    std::sync::RwLock<PlHashMap<PlSmallStr, Arc<dyn ExtObjectStoreBuilder + Send + Sync>>>,
45> = LazyLock::new(Default::default);
46
47/// Register custom object_store builder for a given cloud scheme.
48/// Example: for 'hdfs://', the scheme is "hdfs".
49/// Rejects native cloud schemes (e.g. "s3").
50pub fn register_object_store_builder(
51    scheme: &str,
52    builder: Arc<dyn ExtObjectStoreBuilder + Send + Sync>,
53) -> PolarsResult<()> {
54    // Reject schemes already handled natively.
55    // TODO: allow shadowing of existing schemes.
56    if CloudScheme::is_native_str(scheme) {
57        polars_bail!(
58            InvalidOperation:
59            "cannot register object_store_builder for scheme '{}': \
60             this scheme is handled natively",
61            scheme
62        );
63    }
64
65    if !polars_utils::pl_path::ext_scheme_allowed(scheme) {
66        polars_bail!(
67            InvalidOperation:
68            "cannot register object_store_builder for scheme '{}': \
69             allowed external schemes are: {:?}",
70            scheme,
71            ALLOWED_EXT_SCHEMES
72        );
73    }
74
75    if polars_config::config().verbose() {
76        eprintln!(
77            "[ObjectStoreBuilderRegistry]: register object_store_builder for scheme '{scheme}'"
78        )
79    }
80
81    EXT_OBJECT_STORE_BUILDER_REGISTRY
82        .write()
83        .unwrap()
84        .insert(scheme.into(), builder);
85    Ok(())
86}
87
88pub fn deregister_object_store_builder(scheme: &str) {
89    if polars_config::config().verbose() {
90        eprintln!(
91            "[ObjectStoreBuilderRegistry]: deregister object_store_builder for scheme '{scheme}'"
92        )
93    }
94
95    EXT_OBJECT_STORE_BUILDER_REGISTRY
96        .write()
97        .unwrap()
98        .remove(scheme);
99}
100
101#[allow(dead_code)]
102fn err_missing_feature(
103    feature: &str,
104    cloud_type: &CloudType,
105) -> PolarsResult<Arc<dyn ObjectStore>> {
106    polars_bail!(
107        ComputeError:
108        "feature '{}' must be enabled in order to use '{:?}' cloud urls",
109        feature,
110        cloud_type,
111    );
112}
113
114/// Get the key of a url for object store registration.
115fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> PolarsResult<Vec<u8>> {
116    // We include credentials as they can expire, so users will send new credentials for the same url.
117
118    #[cfg(feature = "cloud")]
119    let credential_cache_key = CacheKeyBytes(
120        options
121            .and_then(|o| o.credential_provider.as_ref())
122            .map(|x| x.stable_cache_key())
123            .transpose()?
124            .unwrap_or_default(),
125    );
126
127    let cloud_options = options
128        .map(
129            |CloudOptions {
130                 // Destructure to ensure this breaks if anything changes.
131                 #[cfg(feature = "file_cache")]
132                 file_cache_ttl,
133                 config,
134                 retry_config,
135                 #[cfg(feature = "cloud")]
136                     credential_provider: _,
137             }|
138             -> PolarsResult<CloudOptionsKey> {
139                Ok(CloudOptionsKey {
140                    #[cfg(feature = "file_cache")]
141                    file_cache_ttl: *file_cache_ttl,
142                    config: config.clone(),
143                    retry_config: *retry_config,
144                    #[cfg(feature = "cloud")]
145                    credential_provider: credential_cache_key,
146                })
147            },
148        )
149        .transpose()?;
150
151    let cache_key = CacheKey {
152        url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
153        cloud_options,
154    };
155
156    verbose_print_sensitive(|| {
157        format!(
158            "object store cache key for path at '{}': {:?}",
159            path, &cache_key
160        )
161    });
162
163    return pl_serialize::serialize_to_bytes::<_, false>(&cache_key);
164
165    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
166    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
167    struct CacheKey {
168        url_base: PlSmallStr,
169        cloud_options: Option<CloudOptionsKey>,
170    }
171
172    #[derive(Clone, PartialEq, Hash, Eq)]
173    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
174    struct CacheKeyBytes(Vec<u8>);
175
176    impl std::fmt::Debug for CacheKeyBytes {
177        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178            if self.0.is_empty() {
179                write!(f, "None")
180            } else {
181                for b in &self.0 {
182                    write!(f, "{:02x}", b)?;
183                }
184                Ok(())
185            }
186        }
187    }
188
189    /// Variant of CloudOptions for serializing to a cache key. The credential
190    /// provider is replaced by the function address.
191    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
192    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
193    struct CloudOptionsKey {
194        #[cfg(feature = "file_cache")]
195        file_cache_ttl: u64,
196        config: Option<CloudConfig>,
197        retry_config: CloudRetryConfig,
198        #[cfg(feature = "cloud")]
199        credential_provider: CacheKeyBytes,
200    }
201}
202
203/// Construct an object_store `Path` from a string without any encoding/decoding.
204pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
205    object_store::path::Path::parse(path).map_err(to_compute_err)
206}
207
/// Everything needed to build (or rebuild) the object store for a path.
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    /// Path the store is built for.
    path: PlRefPath,
    /// Cloud type derived from the path; selects which store implementation is built.
    cloud_type: CloudType,
    /// Cloud options supplied by the caller, if any.
    options: Option<CloudOptions>,
}
214
215impl PolarsObjectStoreBuilder {
216    pub(super) fn path(&self) -> &PlRefPath {
217        &self.path
218    }
219
220    pub(super) async fn build_impl(
221        &self,
222        // Whether to clear cached credentials for Python credential providers.
223        clear_cached_credentials: bool,
224    ) -> PolarsResult<Arc<dyn ObjectStore>> {
225        let options = self
226            .options
227            .as_ref()
228            .unwrap_or_else(|| CloudOptions::default_static_ref());
229
230        if let Some(options) = &self.options
231            && verbose()
232        {
233            eprintln!(
234                "build object-store: file_cache_ttl: {}",
235                options.file_cache_ttl
236            )
237        }
238
239        let store = match self.cloud_type {
240            CloudType::Aws => {
241                #[cfg(feature = "aws")]
242                {
243                    let store = options
244                        .build_aws(self.path.clone(), clear_cached_credentials)
245                        .await?;
246                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
247                }
248                #[cfg(not(feature = "aws"))]
249                return err_missing_feature("aws", &self.cloud_type);
250            },
251            CloudType::Gcp => {
252                #[cfg(feature = "gcp")]
253                {
254                    let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
255
256                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
257                }
258                #[cfg(not(feature = "gcp"))]
259                return err_missing_feature("gcp", &self.cloud_type);
260            },
261            CloudType::Azure => {
262                {
263                    #[cfg(feature = "azure")]
264                    {
265                        let store =
266                            options.build_azure(self.path.clone(), clear_cached_credentials)?;
267                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
268                    }
269                }
270                #[cfg(not(feature = "azure"))]
271                return err_missing_feature("azure", &self.cloud_type);
272            },
273            CloudType::File => {
274                let local = LocalFileSystem::new();
275                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
276            },
277            CloudType::Http => {
278                {
279                    #[cfg(feature = "http")]
280                    {
281                        let store = options.build_http(self.path.clone())?;
282                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
283                    }
284                }
285                #[cfg(not(feature = "http"))]
286                return err_missing_feature("http", &cloud_location.scheme);
287            },
288            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
289            CloudType::Ext(scheme) => {
290                let prefix = &self.path.as_str()[..self.path.authority_end_position()];
291
292                verbose_print_sensitive(|| {
293                    format!(
294                        "build external object_store: scheme='{}', prefix='{}', options={:?}",
295                        scheme, prefix, self.options
296                    )
297                });
298
299                let store = EXT_OBJECT_STORE_BUILDER_REGISTRY
300                    .read()
301                    .unwrap()
302                    .get(scheme)
303                    .ok_or_else(|| {
304                        polars_err!(
305                            ComputeError:
306                            "no object_store_builder registered for prefix: {}; \
307                             call register_object_store_builder() before executing queries \
308                             against the scheme: {}",
309                            prefix, scheme
310                        )
311                    })?
312                    .build(&self.path, self.options.as_ref())?;
313
314                return Ok(store);
315            },
316        }?;
317
318        Ok(store)
319    }
320
321    /// Note: Use `build_impl` for a non-caching version.
322    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
323        let opt_cache_key = match self.cloud_type {
324            CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
325                Some(path_and_creds_to_key(&self.path, self.options.as_ref())?)
326            },
327            CloudType::File | CloudType::Http | CloudType::Hf => None,
328            CloudType::Ext(scheme) => {
329                let registry = EXT_OBJECT_STORE_BUILDER_REGISTRY.read().unwrap();
330                let builder = registry.get(scheme).ok_or_else(|| {
331                    polars_err!(
332                        ComputeError:
333                        "no object_store_builder registered for scheme '{}'; \
334                         call register_object_store_builder() before executing queries \
335                         against this scheme",
336                        scheme
337                    )
338                })?;
339
340                let key = match builder.stable_cache_key(&self.path, self.options.as_ref()) {
341                    Some(key) => key,
342                    None => path_and_creds_to_key(&self.path, self.options.as_ref())?,
343                };
344
345                Some(key)
346            },
347        };
348
349        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
350            let cache = OBJECT_STORE_CACHE.read().await;
351
352            if let Some(store) = cache.get(cache_key) {
353                return Ok(store.clone());
354            }
355
356            drop(cache);
357
358            let cache = OBJECT_STORE_CACHE.write().await;
359
360            if let Some(store) = cache.get(cache_key) {
361                return Ok(store.clone());
362            }
363
364            Some(cache)
365        } else {
366            None
367        };
368
369        let store = self.build_impl(false).await?;
370        let store = PolarsObjectStore::new_from_inner(store, self);
371
372        if let Some(mut cache) = opt_cache_write_guard {
373            // Clear the cache if we surpass a certain amount of buckets.
374            if cache.len() >= 8 {
375                if config::verbose() {
376                    eprintln!(
377                        "build_object_store: clearing store cache (cache.len(): {})",
378                        cache.len()
379                    );
380                }
381                cache.clear()
382            }
383
384            cache.insert(opt_cache_key.unwrap(), store.clone());
385        }
386
387        Ok(store)
388    }
389
390    pub(crate) fn is_azure(&self) -> bool {
391        matches!(&self.cloud_type, CloudType::Azure)
392    }
393}
394
395/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
396pub async fn build_object_store(
397    path: PlRefPath,
398    #[cfg_attr(
399        not(any(feature = "aws", feature = "gcp", feature = "azure")),
400        allow(unused_variables)
401    )]
402    options: Option<&CloudOptions>,
403    glob: bool,
404) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
405    let path = path.to_absolute_path()?.into_owned();
406
407    let cloud_type = path
408        .scheme()
409        .map_or(CloudType::File, CloudType::from_cloud_scheme);
410    let cloud_location = CloudLocation::new(path.clone(), glob)?;
411
412    let store = PolarsObjectStoreBuilder {
413        path,
414        cloud_type,
415        options: options.cloned(),
416    }
417    .build()
418    .await?;
419
420    Ok((cloud_location, store))
421}
422
// Test-only module: gate it so it is not compiled into non-test builds.
#[cfg(test)]
mod test {
    /// A percent-encoded sequence must pass through `object_path_from_str`
    /// unchanged, i.e. without being encoded or decoded again.
    #[test]
    fn test_object_path_from_str() {
        use super::object_path_from_str;

        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}
434
// Tests for externally registered object-store builders. Each test uses its
// own scheme ("pl-test1" .. "pl-test4") because the builder registry and the
// object store cache are process-wide statics.
#[cfg(all(test, feature = "cloud"))]
mod ext_store_tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use polars_utils::pl_path::PlRefPath;
    use polars_utils::relaxed_cell::RelaxedCell;

    use super::*;

    /// Builder that hands out one shared in-memory store and counts how many
    /// times `build` was called.
    struct TestBuilder {
        store: Arc<dyn ObjectStore + Send + Sync>,
        build_count: RelaxedCell<usize>,
    }

    impl TestBuilder {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                store: Arc::new(InMemory::new()),
                build_count: RelaxedCell::new_usize(0),
            })
        }

        fn build_count(&self) -> usize {
            self.build_count.load()
        }

        fn inc_build_count(&self) {
            self.build_count.fetch_add(1);
        }
    }

    // Uses the default `stable_cache_key` (returns `None`).
    impl ExtObjectStoreBuilder for TestBuilder {
        fn build(
            &self,
            _url: &PlRefPath,
            _options: Option<&CloudOptions>,
        ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
            self.inc_build_count();
            Ok(self.store.clone())
        }
    }

    /// A registered builder is used to build the store for its scheme.
    #[tokio::test]
    async fn test_register_and_resolve() -> PolarsResult<()> {
        let builder = TestBuilder::new();
        polars_utils::pl_path::_allow_ext_scheme("pl-test1")?;
        register_object_store_builder("pl-test1", builder.clone()).unwrap();

        let path = PlRefPath::new("pl-test1://host:1234/data/file.parquet");
        let result = build_object_store(path, None, false).await;
        assert!(result.is_ok());
        assert_eq!(builder.build_count(), 1);

        // Clean up the global registry state.
        deregister_object_store_builder("pl-test1");
        polars_utils::pl_path::_disallow_ext_scheme("pl-test1");
        Ok(())
    }

    /// Building the same path twice invokes the external builder only once.
    #[tokio::test]
    async fn test_cache_hit_after_first_build() -> PolarsResult<()> {
        let builder = TestBuilder::new();
        polars_utils::pl_path::_allow_ext_scheme("pl-test2")?;
        register_object_store_builder("pl-test2", builder.clone()).unwrap();

        let path = PlRefPath::new("pl-test2://host:1234/data/file.parquet");

        // First call — cache miss, build_impl called
        build_object_store(path.clone(), None, false).await.unwrap();
        assert_eq!(builder.build_count(), 1);

        // Second call — cache hit, build_impl not called
        build_object_store(path.clone(), None, false).await.unwrap();
        assert_eq!(builder.build_count(), 1);

        // Clean up the global registry state.
        deregister_object_store_builder("pl-test2");
        polars_utils::pl_path::_disallow_ext_scheme("pl-test2");
        Ok(())
    }

    /// Registering a builder for a natively handled scheme is rejected.
    #[test]
    fn test_native_scheme_rejected() {
        let builder = TestBuilder::new();
        let result = register_object_store_builder("s3", builder);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("handled natively"));
    }

    /// A builder-provided cache key replaces the default key: two calls with
    /// different options but the same authority share one built store.
    #[tokio::test]
    async fn test_stable_cache_key_override() -> PolarsResult<()> {
        // The build counter lives behind an `Arc` so the clone passed to the
        // registry and the local `builder` observe the same count.
        #[derive(Clone)]
        struct AuthorityOnlyBuilder {
            store: Arc<dyn ObjectStore + Send + Sync>,
            build_count: Arc<RelaxedCell<usize>>,
        }

        impl AuthorityOnlyBuilder {
            fn new() -> Self {
                Self {
                    store: Arc::new(InMemory::new()),
                    build_count: Arc::new(RelaxedCell::new_usize(0)),
                }
            }

            fn build_count(&self) -> usize {
                self.build_count.load()
            }

            fn inc_build_count(&self) -> usize {
                self.build_count.fetch_add(1)
            }
        }

        impl ExtObjectStoreBuilder for AuthorityOnlyBuilder {
            fn build(
                &self,
                _url: &PlRefPath,
                _options: Option<&CloudOptions>,
            ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
                self.inc_build_count();
                Ok(self.store.clone())
            }

            // Key on the url up to the end of the authority only, ignoring the options.
            fn stable_cache_key(
                &self,
                url: &PlRefPath,
                _options: Option<&CloudOptions>,
            ) -> Option<Vec<u8>> {
                let authority = &url.as_str()[..url.authority_end_position()];
                Some(authority.as_bytes().to_vec())
            }
        }

        let builder = AuthorityOnlyBuilder::new();
        polars_utils::pl_path::_allow_ext_scheme("pl-test3")?;
        register_object_store_builder("pl-test3", Arc::new(builder.clone())).unwrap();

        use crate::cloud::{CloudConfig, CloudOptions};

        let options_a = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![("user".to_string(), "alice".to_string())],
            }),
            ..CloudOptions::default()
        };

        let options_b = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![("user".to_string(), "bob".to_string())],
            }),
            ..CloudOptions::default()
        };

        let path = PlRefPath::new("pl-test3://host:1234/data/file.parquet");

        build_object_store(path.clone(), Some(&options_a), false)
            .await
            .unwrap();
        build_object_store(path.clone(), Some(&options_b), false)
            .await
            .unwrap();

        // The second call differs only in options, so it must hit the cache.
        assert_eq!(builder.build_count(), 1);

        // Clean up the global registry state.
        deregister_object_store_builder("pl-test3");
        polars_utils::pl_path::_disallow_ext_scheme("pl-test3");
        Ok(())
    }

    /// The `CloudConfig::Ext` options given by the caller reach the external builder.
    #[tokio::test]
    async fn test_storage_options_passed_to_builder() -> PolarsResult<()> {
        use crate::cloud::{CloudConfig, CloudOptions};

        /// Builder that records the `Ext` options it was called with.
        #[allow(clippy::type_complexity)]
        struct CapturingBuilder {
            received_options: Arc<std::sync::Mutex<Option<Vec<(String, String)>>>>,
            store: Arc<dyn ObjectStore + Send + Sync>,
        }

        impl ExtObjectStoreBuilder for CapturingBuilder {
            fn build(
                &self,
                _url: &PlRefPath,
                options: Option<&CloudOptions>,
            ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
                // Anything other than an `Ext` config is recorded as `None`.
                let captured = match options {
                    Some(CloudOptions {
                        config: Some(CloudConfig::Ext { options }),
                        ..
                    }) => Some(options.clone()),
                    _ => None,
                };
                *self.received_options.lock().unwrap() = captured;
                Ok(self.store.clone())
            }
        }

        let received = Arc::new(std::sync::Mutex::new(None));

        let builder = Arc::new(CapturingBuilder {
            received_options: received.clone(),
            store: Arc::new(InMemory::new()),
        });

        polars_utils::pl_path::_allow_ext_scheme("pl-test4")?;
        register_object_store_builder("pl-test4", builder).unwrap();

        let options = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![
                    ("user".to_string(), "hadoop".to_string()),
                    ("token".to_string(), "abc123".to_string()),
                ],
            }),
            ..CloudOptions::default()
        };

        let path = PlRefPath::new("pl-test4://host:1234/data/file.parquet");
        build_object_store(path, Some(&options), false)
            .await
            .unwrap();

        let captured = received.lock().unwrap().clone().unwrap();
        assert_eq!(captured.len(), 2);
        assert!(captured.iter().any(|(k, v)| k == "user" && v == "hadoop"));
        assert!(captured.iter().any(|(k, v)| k == "token" && v == "abc123"));

        // Clean up the global registry state.
        deregister_object_store_builder("pl-test4");
        polars_utils::pl_path::_disallow_ext_scheme("pl-test4");
        Ok(())
    }
}