1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{ALLOWED_EXT_SCHEMES, CloudScheme, PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
/// Factory for [`ObjectStore`]s serving an external (non-native) URL scheme.
///
/// Implementations are registered per scheme with `register_object_store_builder`
/// and looked up by scheme when an object store is needed for a path.
pub trait ExtObjectStoreBuilder {
    /// Builds an object store able to serve `url`.
    ///
    /// `options` are the cloud options supplied by the caller, if any; for
    /// external schemes these typically carry `CloudConfig::Ext` key/value pairs.
    fn build(
        &self,
        url: &PlRefPath,
        options: Option<&CloudOptions>,
    ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>>;

    /// Optional key deciding which requests may share an already-built store.
    ///
    /// When `Some`, the returned bytes are used to derive the object-store cache
    /// key instead of the default key (URL authority plus cloud options), so
    /// requests with differing options can reuse one store. Returning `None`
    /// (the default) keeps the default key.
    fn stable_cache_key(
        &self,
        _url: &PlRefPath,
        _options: Option<&CloudOptions>,
    ) -> Option<Vec<u8>> {
        None
    }
}
42
43static EXT_OBJECT_STORE_BUILDER_REGISTRY: LazyLock<
44 std::sync::RwLock<PlHashMap<PlSmallStr, Arc<dyn ExtObjectStoreBuilder + Send + Sync>>>,
45> = LazyLock::new(Default::default);
46
47pub fn register_object_store_builder(
51 scheme: &str,
52 builder: Arc<dyn ExtObjectStoreBuilder + Send + Sync>,
53) -> PolarsResult<()> {
54 if CloudScheme::is_native_str(scheme) {
57 polars_bail!(
58 InvalidOperation:
59 "cannot register object_store_builder for scheme '{}': \
60 this scheme is handled natively",
61 scheme
62 );
63 }
64
65 if !polars_utils::pl_path::ext_scheme_allowed(scheme) {
66 polars_bail!(
67 InvalidOperation:
68 "cannot register object_store_builder for scheme '{}': \
69 allowed external schemes are: {:?}",
70 scheme,
71 ALLOWED_EXT_SCHEMES
72 );
73 }
74
75 if polars_config::config().verbose() {
76 eprintln!(
77 "[ObjectStoreBuilderRegistry]: register object_store_builder for scheme '{scheme}'"
78 )
79 }
80
81 EXT_OBJECT_STORE_BUILDER_REGISTRY
82 .write()
83 .unwrap()
84 .insert(scheme.into(), builder);
85 Ok(())
86}
87
88pub fn deregister_object_store_builder(scheme: &str) {
89 if polars_config::config().verbose() {
90 eprintln!(
91 "[ObjectStoreBuilderRegistry]: deregister object_store_builder for scheme '{scheme}'"
92 )
93 }
94
95 EXT_OBJECT_STORE_BUILDER_REGISTRY
96 .write()
97 .unwrap()
98 .remove(scheme);
99}
100
101#[allow(dead_code)]
102fn err_missing_feature(
103 feature: &str,
104 cloud_type: &CloudType,
105) -> PolarsResult<Arc<dyn ObjectStore>> {
106 polars_bail!(
107 ComputeError:
108 "feature '{}' must be enabled in order to use '{:?}' cloud urls",
109 feature,
110 cloud_type,
111 );
112}
113
114fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> PolarsResult<Vec<u8>> {
116 #[cfg(feature = "cloud")]
119 let credential_cache_key = CacheKeyBytes(
120 options
121 .and_then(|o| o.credential_provider.as_ref())
122 .map(|x| x.stable_cache_key())
123 .transpose()?
124 .unwrap_or_default(),
125 );
126
127 let cloud_options = options
128 .map(
129 |CloudOptions {
130 #[cfg(feature = "file_cache")]
132 file_cache_ttl,
133 config,
134 retry_config,
135 #[cfg(feature = "cloud")]
136 credential_provider: _,
137 }|
138 -> PolarsResult<CloudOptionsKey> {
139 Ok(CloudOptionsKey {
140 #[cfg(feature = "file_cache")]
141 file_cache_ttl: *file_cache_ttl,
142 config: config.clone(),
143 retry_config: *retry_config,
144 #[cfg(feature = "cloud")]
145 credential_provider: credential_cache_key,
146 })
147 },
148 )
149 .transpose()?;
150
151 let cache_key = CacheKey {
152 url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
153 cloud_options,
154 };
155
156 verbose_print_sensitive(|| {
157 format!(
158 "object store cache key for path at '{}': {:?}",
159 path, &cache_key
160 )
161 });
162
163 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key);
164
165 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
166 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
167 struct CacheKey {
168 url_base: PlSmallStr,
169 cloud_options: Option<CloudOptionsKey>,
170 }
171
172 #[derive(Clone, PartialEq, Hash, Eq)]
173 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
174 struct CacheKeyBytes(Vec<u8>);
175
176 impl std::fmt::Debug for CacheKeyBytes {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 if self.0.is_empty() {
179 write!(f, "None")
180 } else {
181 for b in &self.0 {
182 write!(f, "{:02x}", b)?;
183 }
184 Ok(())
185 }
186 }
187 }
188
189 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
192 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
193 struct CloudOptionsKey {
194 #[cfg(feature = "file_cache")]
195 file_cache_ttl: u64,
196 config: Option<CloudConfig>,
197 retry_config: CloudRetryConfig,
198 #[cfg(feature = "cloud")]
199 credential_provider: CacheKeyBytes,
200 }
201}
202
203pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
205 object_store::path::Path::parse(path).map_err(to_compute_err)
206}
207
/// Everything needed to build the object store for a path. After a successful
/// build, the builder is handed to `PolarsObjectStore::new_from_inner` together
/// with the store.
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    /// Path the store is built for; its authority also feeds the default cache key.
    path: PlRefPath,
    /// Backend chosen from the path's scheme.
    cloud_type: CloudType,
    /// Caller-supplied options; `None` makes native backends fall back to the
    /// static default `CloudOptions`.
    options: Option<CloudOptions>,
}
214
215impl PolarsObjectStoreBuilder {
216 pub(super) fn path(&self) -> &PlRefPath {
217 &self.path
218 }
219
220 pub(super) async fn build_impl(
221 &self,
222 clear_cached_credentials: bool,
224 ) -> PolarsResult<Arc<dyn ObjectStore>> {
225 let options = self
226 .options
227 .as_ref()
228 .unwrap_or_else(|| CloudOptions::default_static_ref());
229
230 if let Some(options) = &self.options
231 && verbose()
232 {
233 eprintln!(
234 "build object-store: file_cache_ttl: {}",
235 options.file_cache_ttl
236 )
237 }
238
239 let store = match self.cloud_type {
240 CloudType::Aws => {
241 #[cfg(feature = "aws")]
242 {
243 let store = options
244 .build_aws(self.path.clone(), clear_cached_credentials)
245 .await?;
246 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
247 }
248 #[cfg(not(feature = "aws"))]
249 return err_missing_feature("aws", &self.cloud_type);
250 },
251 CloudType::Gcp => {
252 #[cfg(feature = "gcp")]
253 {
254 let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
255
256 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
257 }
258 #[cfg(not(feature = "gcp"))]
259 return err_missing_feature("gcp", &self.cloud_type);
260 },
261 CloudType::Azure => {
262 {
263 #[cfg(feature = "azure")]
264 {
265 let store =
266 options.build_azure(self.path.clone(), clear_cached_credentials)?;
267 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
268 }
269 }
270 #[cfg(not(feature = "azure"))]
271 return err_missing_feature("azure", &self.cloud_type);
272 },
273 CloudType::File => {
274 let local = LocalFileSystem::new();
275 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
276 },
277 CloudType::Http => {
278 {
279 #[cfg(feature = "http")]
280 {
281 let store = options.build_http(self.path.clone())?;
282 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
283 }
284 }
285 #[cfg(not(feature = "http"))]
286 return err_missing_feature("http", &cloud_location.scheme);
287 },
288 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
289 CloudType::Ext(scheme) => {
290 let prefix = &self.path.as_str()[..self.path.authority_end_position()];
291
292 verbose_print_sensitive(|| {
293 format!(
294 "build external object_store: scheme='{}', prefix='{}', options={:?}",
295 scheme, prefix, self.options
296 )
297 });
298
299 let store = EXT_OBJECT_STORE_BUILDER_REGISTRY
300 .read()
301 .unwrap()
302 .get(scheme)
303 .ok_or_else(|| {
304 polars_err!(
305 ComputeError:
306 "no object_store_builder registered for prefix: {}; \
307 call register_object_store_builder() before executing queries \
308 against the scheme: {}",
309 prefix, scheme
310 )
311 })?
312 .build(&self.path, self.options.as_ref())?;
313
314 return Ok(store);
315 },
316 }?;
317
318 Ok(store)
319 }
320
321 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
323 let opt_cache_key = match self.cloud_type {
324 CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
325 Some(path_and_creds_to_key(&self.path, self.options.as_ref())?)
326 },
327 CloudType::File | CloudType::Http | CloudType::Hf => None,
328 CloudType::Ext(scheme) => {
329 let registry = EXT_OBJECT_STORE_BUILDER_REGISTRY.read().unwrap();
330 let builder = registry.get(scheme).ok_or_else(|| {
331 polars_err!(
332 ComputeError:
333 "no object_store_builder registered for scheme '{}'; \
334 call register_object_store_builder() before executing queries \
335 against this scheme",
336 scheme
337 )
338 })?;
339
340 let key = match builder.stable_cache_key(&self.path, self.options.as_ref()) {
341 Some(key) => key,
342 None => path_and_creds_to_key(&self.path, self.options.as_ref())?,
343 };
344
345 Some(key)
346 },
347 };
348
349 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
350 let cache = OBJECT_STORE_CACHE.read().await;
351
352 if let Some(store) = cache.get(cache_key) {
353 return Ok(store.clone());
354 }
355
356 drop(cache);
357
358 let cache = OBJECT_STORE_CACHE.write().await;
359
360 if let Some(store) = cache.get(cache_key) {
361 return Ok(store.clone());
362 }
363
364 Some(cache)
365 } else {
366 None
367 };
368
369 let store = self.build_impl(false).await?;
370 let store = PolarsObjectStore::new_from_inner(store, self);
371
372 if let Some(mut cache) = opt_cache_write_guard {
373 if cache.len() >= 8 {
375 if config::verbose() {
376 eprintln!(
377 "build_object_store: clearing store cache (cache.len(): {})",
378 cache.len()
379 );
380 }
381 cache.clear()
382 }
383
384 cache.insert(opt_cache_key.unwrap(), store.clone());
385 }
386
387 Ok(store)
388 }
389
390 pub(crate) fn is_azure(&self) -> bool {
391 matches!(&self.cloud_type, CloudType::Azure)
392 }
393}
394
395pub async fn build_object_store(
397 path: PlRefPath,
398 #[cfg_attr(
399 not(any(feature = "aws", feature = "gcp", feature = "azure")),
400 allow(unused_variables)
401 )]
402 options: Option<&CloudOptions>,
403 glob: bool,
404) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
405 let path = path.to_absolute_path()?.into_owned();
406
407 let cloud_type = path
408 .scheme()
409 .map_or(CloudType::File, CloudType::from_cloud_scheme);
410 let cloud_location = CloudLocation::new(path.clone(), glob)?;
411
412 let store = PolarsObjectStoreBuilder {
413 path,
414 cloud_type,
415 options: options.cloned(),
416 }
417 .build()
418 .await?;
419
420 Ok((cloud_location, store))
421}
422
// Gate the module so it is not compiled into non-test builds.
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// `%25` must survive parsing verbatim rather than being percent-decoded.
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}
434
#[cfg(all(test, feature = "cloud"))]
mod ext_store_tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use polars_utils::pl_path::PlRefPath;
    use polars_utils::relaxed_cell::RelaxedCell;

    use super::*;

    /// Allows an external scheme and registers a builder for it for the lifetime
    /// of the guard. Both are undone on drop, so cleanup also happens when an
    /// assertion fails and the test panics.
    struct SchemeRegistration {
        scheme: &'static str,
    }

    impl SchemeRegistration {
        fn new(
            scheme: &'static str,
            builder: Arc<dyn ExtObjectStoreBuilder + Send + Sync>,
        ) -> PolarsResult<Self> {
            polars_utils::pl_path::_allow_ext_scheme(scheme)?;
            // Create the guard before registering so the scheme is disallowed
            // again even if registration fails.
            let guard = Self { scheme };
            register_object_store_builder(scheme, builder)?;
            Ok(guard)
        }
    }

    impl Drop for SchemeRegistration {
        fn drop(&mut self) {
            deregister_object_store_builder(self.scheme);
            polars_utils::pl_path::_disallow_ext_scheme(self.scheme);
        }
    }

    /// Builder handing out a shared in-memory store and counting `build` calls.
    struct TestBuilder {
        store: Arc<dyn ObjectStore + Send + Sync>,
        build_count: RelaxedCell<usize>,
    }

    impl TestBuilder {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                store: Arc::new(InMemory::new()),
                build_count: RelaxedCell::new_usize(0),
            })
        }

        fn build_count(&self) -> usize {
            self.build_count.load()
        }

        fn inc_build_count(&self) {
            self.build_count.fetch_add(1);
        }
    }

    impl ExtObjectStoreBuilder for TestBuilder {
        fn build(
            &self,
            _url: &PlRefPath,
            _options: Option<&CloudOptions>,
        ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
            self.inc_build_count();
            Ok(self.store.clone())
        }
    }

    #[tokio::test]
    async fn test_register_and_resolve() -> PolarsResult<()> {
        let builder = TestBuilder::new();
        let _registration = SchemeRegistration::new("pl-test1", builder.clone())?;

        let path = PlRefPath::new("pl-test1://host:1234/data/file.parquet");
        let result = build_object_store(path, None, false).await;
        assert!(result.is_ok());
        assert_eq!(builder.build_count(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_cache_hit_after_first_build() -> PolarsResult<()> {
        let builder = TestBuilder::new();
        let _registration = SchemeRegistration::new("pl-test2", builder.clone())?;

        let path = PlRefPath::new("pl-test2://host:1234/data/file.parquet");

        // NOTE(review): the store cache is process-global and cleared once it
        // holds 8 entries, so concurrently running tests that insert stores could
        // in principle evict this entry between the two calls.
        build_object_store(path.clone(), None, false).await.unwrap();
        assert_eq!(builder.build_count(), 1);

        build_object_store(path.clone(), None, false).await.unwrap();
        assert_eq!(builder.build_count(), 1);

        Ok(())
    }

    #[test]
    fn test_native_scheme_rejected() {
        let builder = TestBuilder::new();
        let result = register_object_store_builder("s3", builder);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("handled natively"));
    }

    #[tokio::test]
    async fn test_stable_cache_key_override() -> PolarsResult<()> {
        /// Builder whose cache key is only the URL authority, ignoring options.
        #[derive(Clone)]
        struct AuthorityOnlyBuilder {
            store: Arc<dyn ObjectStore + Send + Sync>,
            build_count: Arc<RelaxedCell<usize>>,
        }

        impl AuthorityOnlyBuilder {
            fn new() -> Self {
                Self {
                    store: Arc::new(InMemory::new()),
                    build_count: Arc::new(RelaxedCell::new_usize(0)),
                }
            }

            fn build_count(&self) -> usize {
                self.build_count.load()
            }

            fn inc_build_count(&self) -> usize {
                self.build_count.fetch_add(1)
            }
        }

        impl ExtObjectStoreBuilder for AuthorityOnlyBuilder {
            fn build(
                &self,
                _url: &PlRefPath,
                _options: Option<&CloudOptions>,
            ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
                self.inc_build_count();
                Ok(self.store.clone())
            }

            fn stable_cache_key(
                &self,
                url: &PlRefPath,
                _options: Option<&CloudOptions>,
            ) -> Option<Vec<u8>> {
                let authority = &url.as_str()[..url.authority_end_position()];
                Some(authority.as_bytes().to_vec())
            }
        }

        let builder = AuthorityOnlyBuilder::new();
        let _registration = SchemeRegistration::new("pl-test3", Arc::new(builder.clone()))?;

        let options_a = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![("user".to_string(), "alice".to_string())],
            }),
            ..CloudOptions::default()
        };

        let options_b = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![("user".to_string(), "bob".to_string())],
            }),
            ..CloudOptions::default()
        };

        let path = PlRefPath::new("pl-test3://host:1234/data/file.parquet");

        // Different options, same authority: the custom key must make the second
        // call a cache hit.
        build_object_store(path.clone(), Some(&options_a), false)
            .await
            .unwrap();
        build_object_store(path.clone(), Some(&options_b), false)
            .await
            .unwrap();

        assert_eq!(builder.build_count(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_storage_options_passed_to_builder() -> PolarsResult<()> {
        /// Builder recording the `CloudConfig::Ext` options it was called with.
        #[allow(clippy::type_complexity)]
        struct CapturingBuilder {
            received_options: Arc<std::sync::Mutex<Option<Vec<(String, String)>>>>,
            store: Arc<dyn ObjectStore + Send + Sync>,
        }

        impl ExtObjectStoreBuilder for CapturingBuilder {
            fn build(
                &self,
                _url: &PlRefPath,
                options: Option<&CloudOptions>,
            ) -> PolarsResult<Arc<dyn ObjectStore + Send + Sync>> {
                let captured = match options {
                    Some(CloudOptions {
                        config: Some(CloudConfig::Ext { options }),
                        ..
                    }) => Some(options.clone()),
                    _ => None,
                };
                *self.received_options.lock().unwrap() = captured;
                Ok(self.store.clone())
            }
        }

        let received = Arc::new(std::sync::Mutex::new(None));

        let builder = Arc::new(CapturingBuilder {
            received_options: received.clone(),
            store: Arc::new(InMemory::new()),
        });

        let _registration = SchemeRegistration::new("pl-test4", builder)?;

        let options = CloudOptions {
            config: Some(CloudConfig::Ext {
                options: vec![
                    ("user".to_string(), "hadoop".to_string()),
                    ("token".to_string(), "abc123".to_string()),
                ],
            }),
            ..CloudOptions::default()
        };

        let path = PlRefPath::new("pl-test4://host:1234/data/file.parquet");
        build_object_store(path, Some(&options), false)
            .await
            .unwrap();

        let captured = received.lock().unwrap().clone().unwrap();
        assert_eq!(captured.len(), 2);
        assert!(captured.iter().any(|(k, v)| k == "user" && v == "hadoop"));
        assert!(captured.iter().any(|(k, v)| k == "token" && v == "abc123"));

        Ok(())
    }
}
667}