Skip to main content

polars_io/cloud/options.rs

1#[cfg(feature = "aws")]
2use std::io::Read;
3#[cfg(feature = "aws")]
4use std::path::Path;
5use std::str::FromStr;
6#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
7use std::sync::Arc;
8use std::sync::LazyLock;
9
10#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
11use object_store::ClientOptions;
12#[cfg(feature = "aws")]
13use object_store::aws::AmazonS3Builder;
14#[cfg(feature = "aws")]
15pub use object_store::aws::AmazonS3ConfigKey;
16#[cfg(feature = "azure")]
17pub use object_store::azure::AzureConfigKey;
18#[cfg(feature = "azure")]
19use object_store::azure::MicrosoftAzureBuilder;
20#[cfg(feature = "gcp")]
21use object_store::gcp::GoogleCloudStorageBuilder;
22#[cfg(feature = "gcp")]
23pub use object_store::gcp::GoogleConfigKey;
24use polars_error::*;
25#[cfg(feature = "aws")]
26use polars_utils::cache::LruCache;
27use polars_utils::pl_path::{CloudScheme, PlRefPath};
28use polars_utils::total_ord::TotalOrdWrap;
29#[cfg(feature = "http")]
30use reqwest::header::HeaderMap;
31#[cfg(feature = "serde")]
32use serde::{Deserialize, Serialize};
33
34#[cfg(feature = "cloud")]
35use super::credential_provider::PlCredentialProvider;
36#[cfg(feature = "cloud")]
37use super::dns::get_dns_cache_ttl;
38#[cfg(feature = "cloud")]
39use crate::cloud::ObjectStoreErrorContext;
40#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
41use crate::cloud::dns::CachingResolver;
42#[cfg(feature = "file_cache")]
43use crate::file_cache::get_env_file_cache_ttl;
44#[cfg(feature = "aws")]
45use crate::pl_async::with_concurrency_budget;
46
/// Process-wide cache from S3 bucket name to the bucket's region.
///
/// Filled by `CloudOptions::build_aws` after a region lookup; bounded to 32 entries.
#[cfg(feature = "aws")]
static BUCKET_REGION: LazyLock<
    std::sync::Mutex<LruCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>>,
> = LazyLock::new(|| {
    let bucket_to_region = LruCache::with_capacity(32);
    std::sync::Mutex::new(bucket_to_region)
});
51
/// Typed cloud configuration: a list of `(config key, config value)` pairs.
///
/// The type used for the config keys must satisfy the following requirements:
/// 1. it must be easily collected into a HashMap, the type required by the `object_store` crate API.
/// 2. it must be serializable, which is required when the `serde` feature is enabled.
/// 3. it must not actually use HashMap, since that type is disallowed in Polars for performance reasons.
///
/// Currently this type is a vector of `(config key, config value)` pairs.
// `dead_code` is allowed presumably because every user of this alias (the `CloudConfig`
// variants, `parse_untyped_config`) is feature-gated or itself allowed to be dead.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
60
/// Provider-specific configuration held in `CloudOptions::config`.
///
/// The variant must match the backend being built: `build_aws`, `build_azure` and `build_gcp`
/// panic when handed a different variant.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum CloudConfig {
    /// Typed Amazon S3 options.
    #[cfg(feature = "aws")]
    Aws(
        // The JSON schema describes the typed keys as plain string pairs.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AmazonS3ConfigKey>,
    ),
    /// Typed Azure storage options.
    #[cfg(feature = "azure")]
    Azure(
        // The JSON schema describes the typed keys as plain string pairs.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AzureConfigKey>,
    ),
    /// Typed Google Cloud Storage options.
    #[cfg(feature = "gcp")]
    Gcp(
        // The JSON schema describes the typed keys as plain string pairs.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<GoogleConfigKey>,
    ),
    /// Default headers sent with every HTTP request (also used for the Hugging Face
    /// `Authorization` header).
    #[cfg(feature = "http")]
    Http {
        headers: Vec<(String, String)>,
    },
    /// Raw, unparsed key/value options for an externally registered scheme.
    Ext {
        options: Vec<(String, String)>,
    },
}
88
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Options to connect to various cloud providers.
///
/// `CloudOptions::default()` clones a lazily built, process-wide default. The `with_*` methods
/// customise the options by value, and the `build_*` methods turn them into an object store.
pub struct CloudOptions {
    /// File-cache time-to-live; the defaults take it from `get_env_file_cache_ttl()`.
    // NOTE(review): the unit (presumably seconds) is defined by the `file_cache` module — confirm.
    #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
    /// Provider-specific options; `None` when no explicit configuration was given.
    pub config: Option<CloudConfig>,
    /// Retry settings. When the field is missing during deserialization, it falls back to
    /// `CloudRetryConfig::default()`.
    #[cfg_attr(feature = "serde", serde(default))]
    pub retry_config: CloudRetryConfig,
    #[cfg(feature = "cloud")]
    /// Note: In most cases you will want to access this via [`CloudOptions::initialized_credential_provider`]
    /// rather than directly.
    pub(crate) credential_provider: Option<PlCredentialProvider>,
}
104
105impl Default for CloudOptions {
106    fn default() -> Self {
107        Self::default_static_ref().clone()
108    }
109}
110
111impl CloudOptions {
112    pub fn default_static_ref() -> &'static Self {
113        static DEFAULT: LazyLock<CloudOptions> = LazyLock::new(|| CloudOptions {
114            #[cfg(feature = "file_cache")]
115            file_cache_ttl: get_env_file_cache_ttl(),
116            config: None,
117            retry_config: CloudRetryConfig::default(),
118            #[cfg(feature = "cloud")]
119            credential_provider: None,
120        });
121
122        &DEFAULT
123    }
124}
125
/// User-provided retry settings for cloud requests.
///
/// Every field is optional. When converted into an `object_store::RetryConfig`, a `None` field
/// falls back to a default that can be overridden through a `POLARS_CLOUD_*` environment
/// variable (listed per field below).
#[derive(Clone, Copy, Default, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudRetryConfig {
    /// Maximum number of retries (`POLARS_CLOUD_MAX_RETRIES`, otherwise 2).
    pub max_retries: Option<usize>,
    /// Maps to `object_store`'s `retry_timeout` (`POLARS_CLOUD_RETRY_TIMEOUT_MS`, otherwise 10 s).
    pub retry_timeout: Option<std::time::Duration>,
    /// Initial backoff delay (`POLARS_CLOUD_RETRY_INIT_BACKOFF_MS`, otherwise 100 ms).
    pub retry_init_backoff: Option<std::time::Duration>,
    /// Maximum backoff delay (`POLARS_CLOUD_RETRY_MAX_BACKOFF_MS`, otherwise 15 s).
    pub retry_max_backoff: Option<std::time::Duration>,
    /// Backoff base multiplier (`POLARS_CLOUD_RETRY_BASE_MULTIPLIER`, otherwise 2.0).
    // `f64` is not `Eq`/`Hash`; the `TotalOrdWrap` wrapper lets the struct derive both.
    pub retry_base_multiplier: Option<TotalOrdWrap<f64>>,
}
136
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
impl From<CloudRetryConfig> for object_store::RetryConfig {
    /// Resolves user retry settings into an `object_store` retry policy.
    ///
    /// Each `None` field is replaced by a process-wide default. The defaults are built lazily,
    /// only when some field is actually `None`, from the `POLARS_CLOUD_*` environment variables
    /// or hard-coded fallbacks. An unparsable environment value panics. With verbose logging
    /// enabled, the resolved config is printed to stderr.
    fn from(value: CloudRetryConfig) -> Self {
        use std::time::Duration;

        use polars_core::config::verbose;

        /// Parses environment variable `name` as `T`. Returns `default` when the variable is
        /// unavailable and panics when it is set to something unparsable.
        fn parse_env_var<T: FromStr>(default: T, name: &'static str) -> T {
            match std::env::var(name) {
                Ok(x) => match x.parse::<T>() {
                    Ok(parsed) => parsed,
                    Err(_) => panic!("invalid value for {name}: {x}"),
                },
                Err(_) => default,
            }
        }

        static DEFAULTS: LazyLock<object_store::RetryConfig> =
            LazyLock::new(|| object_store::RetryConfig {
                backoff: object_store::BackoffConfig {
                    init_backoff: Duration::from_millis(parse_env_var(
                        100,
                        "POLARS_CLOUD_RETRY_INIT_BACKOFF_MS",
                    )),
                    max_backoff: Duration::from_millis(parse_env_var(
                        15 * 1000,
                        "POLARS_CLOUD_RETRY_MAX_BACKOFF_MS",
                    )),
                    base: parse_env_var(2., "POLARS_CLOUD_RETRY_BASE_MULTIPLIER"),
                },
                max_retries: parse_env_var(2, "POLARS_CLOUD_MAX_RETRIES"),
                retry_timeout: Duration::from_millis(parse_env_var(
                    10 * 1000,
                    "POLARS_CLOUD_RETRY_TIMEOUT_MS",
                )),
            });

        let CloudRetryConfig {
            max_retries,
            retry_timeout,
            retry_init_backoff,
            retry_max_backoff,
            retry_base_multiplier,
        } = value;

        // The closures keep `DEFAULTS` (and its env parsing) untouched unless a field is `None`.
        let resolved = object_store::RetryConfig {
            backoff: object_store::BackoffConfig {
                init_backoff: retry_init_backoff.unwrap_or_else(|| DEFAULTS.backoff.init_backoff),
                max_backoff: retry_max_backoff.unwrap_or_else(|| DEFAULTS.backoff.max_backoff),
                base: match retry_base_multiplier {
                    Some(multiplier) => multiplier.0,
                    None => DEFAULTS.backoff.base,
                },
            },
            max_retries: max_retries.unwrap_or_else(|| DEFAULTS.max_retries),
            retry_timeout: retry_timeout.unwrap_or_else(|| DEFAULTS.retry_timeout),
        };

        if verbose() {
            eprintln!("object-store retry config: {:?}", &resolved)
        }

        resolved
    }
}
197
/// Builds a [`HeaderMap`] from `(name, value)` string pairs.
///
/// Pairs are inserted in order with `HeaderMap::insert`, so a repeated header name keeps only its
/// last value. Returns a compute error for the first name or value that is not a valid HTTP
/// header.
#[cfg(feature = "http")]
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
    headers: &[(S, S)],
) -> PolarsResult<HeaderMap> {
    use reqwest::header::{HeaderName, HeaderValue};

    headers.iter().try_fold(
        HeaderMap::with_capacity(headers.len()),
        |mut map, (name, value)| -> PolarsResult<HeaderMap> {
            let name = HeaderName::from_str(name.as_ref()).map_err(to_compute_err)?;
            let value = HeaderValue::from_str(value.as_ref()).map_err(to_compute_err)?;
            map.insert(name, value);
            Ok(map)
        },
    )
}
215
216#[allow(dead_code)]
217/// Parse an untype configuration hashmap to a typed configuration for the given configuration key type.
218fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
219    config: I,
220) -> PolarsResult<Configs<T>>
221where
222    T: FromStr + Eq + std::hash::Hash,
223{
224    Ok(config
225        .into_iter()
226        // Silently ignores custom upstream storage_options
227        .filter_map(|(key, val)| {
228            T::from_str(key.as_ref().to_ascii_lowercase().as_str())
229                .ok()
230                .map(|typed_key| (typed_key, val.into()))
231        })
232        .collect::<Configs<T>>())
233}
234
/// Storage backend addressed by a path, derived from its URL scheme via
/// [`CloudType::from_cloud_scheme`].
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum CloudType {
    /// Amazon S3 (`CloudScheme::S3` / `CloudScheme::S3a`)
    Aws,
    /// Azure storage (`CloudScheme::Abfs`, `Abfss`, `Adl`, `Az`, `Azure`)
    Azure,
    /// URI with 'file:' scheme
    File,
    /// Google cloud platform
    Gcp,
    /// Plain HTTP(S) (`CloudScheme::Http` / `CloudScheme::Https`)
    Http,
    /// HuggingFace
    Hf,
    /// Externally registered scheme (e.g. hdfs:// as "hdfs")
    Ext(&'static str),
}
249
250impl CloudType {
251    pub fn from_cloud_scheme(scheme: CloudScheme) -> Self {
252        match scheme {
253            CloudScheme::Abfs
254            | CloudScheme::Abfss
255            | CloudScheme::Adl
256            | CloudScheme::Az
257            | CloudScheme::Azure => Self::Azure,
258
259            CloudScheme::File | CloudScheme::FileNoHostname => Self::File,
260
261            CloudScheme::Gcs | CloudScheme::Gs => Self::Gcp,
262
263            CloudScheme::Hf => Self::Hf,
264
265            CloudScheme::Http | CloudScheme::Https => Self::Http,
266
267            CloudScheme::S3 | CloudScheme::S3a => Self::Aws,
268
269            CloudScheme::Ext(scheme) => Self::Ext(scheme),
270        }
271    }
272}
273
274pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
275
/// Base HTTP client options used by the AWS, Azure and GCP object-store builders.
///
/// The client options are configured as follows:
/// - no overall request timeout, so large response bodies can finish downloading;
/// - a connect timeout of `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS`, which must be a non-zero
///   integer and defaults to 5 minutes;
/// - the Polars user agent;
/// - plain HTTP allowed;
/// - a caching DNS resolver.
///
/// # Panics
/// Panics if `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` is set but is not a positive integer.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
    use std::num::NonZeroU64;

    use reqwest::header::HeaderValue;

    let connect_timeout_secs: u64 = match std::env::var("POLARS_HTTP_CONNECT_TIMEOUT_SECONDS") {
        Ok(x) => match x.parse::<NonZeroU64>() {
            Ok(secs) => secs.get(),
            Err(_) => panic!("invalid value for POLARS_HTTP_CONNECT_TIMEOUT_SECONDS: {x}"),
        },
        Err(_) => 5 * 60,
    };
    let connect_timeout = std::time::Duration::from_secs(connect_timeout_secs);

    ClientOptions::new()
        // No limit on the time spent downloading a response body.
        .with_timeout_disabled()
        // Only establishing the connection is time-limited.
        .with_connect_timeout(connect_timeout)
        .with_user_agent(HeaderValue::from_static(USER_AGENT))
        .with_allow_http(true)
        .with_dns_resolver(Arc::new(CachingResolver::new(get_dns_cache_ttl())))
}
302
/// Fills unset S3 configuration values by scraping AWS CLI-style files.
///
/// `items` is a list of `(file path, [(regex, config key)])` entries. Paths may start with `~`,
/// which is expanded by `resolve_homedir`. For each file, every key that is still unset on
/// `builder` is looked up with its regex. The first capture group of the first match, with
/// surrounding whitespace trimmed, becomes the value. Values already present on `builder` are
/// never overwritten.
///
/// This is best-effort. The following are skipped without affecting the rest:
/// - a file that cannot be read, or is not valid UTF-8;
/// - a key whose pattern does not match;
/// - a key whose captured value is empty.
///
/// NOTE(review): patterns are matched against the whole file, so `[profile]` sections are not
/// respected; the first match anywhere in the file wins.
///
/// Always returns `Some(())`; the `Option` return type is kept for signature compatibility.
#[cfg(feature = "aws")]
fn read_config(
    builder: &mut AmazonS3Builder,
    items: &[(&Path, &[(&str, AmazonS3ConfigKey)])],
) -> Option<()> {
    use crate::path_utils::resolve_homedir;

    for (path, keys) in items {
        // Avoid file I/O entirely when every key sourced from this file is already configured.
        if keys
            .iter()
            .all(|(_, key)| builder.get_config_value(key).is_some())
        {
            continue;
        }

        let mut content = String::new();
        let read_ok = std::fs::File::open(resolve_homedir(path))
            .and_then(|mut file| file.read_to_string(&mut content))
            .is_ok();
        if !read_ok {
            // A missing or unreadable file only means there is nothing to fill in from it.
            continue;
        }

        for (pattern, key) in keys.iter() {
            if builder.get_config_value(key).is_some() {
                continue;
            }

            // Patterns are constants supplied by this module; failing to compile one is a bug.
            let reg = polars_utils::regex_cache::compile_regex(pattern).unwrap();
            // A key that is absent from the file must not stop the remaining keys from being read.
            let Some(m) = reg.captures(&content).and_then(|cap| cap.get(1)) else {
                continue;
            };
            // `[^\r\n]*`-style captures also pick up trailing spaces/tabs, which never belong to
            // a region, key id, secret or token.
            let parsed = m.as_str().trim();
            if parsed.is_empty() {
                continue;
            }
            *builder = std::mem::take(builder).with_config(*key, parsed);
        }
    }
    Some(())
}
335
336impl CloudOptions {
337    pub fn with_retry_config(mut self, retry_config: CloudRetryConfig) -> Self {
338        self.retry_config = retry_config;
339        self
340    }
341
342    #[cfg(feature = "cloud")]
343    pub fn with_credential_provider(
344        mut self,
345        credential_provider: Option<PlCredentialProvider>,
346    ) -> Self {
347        self.credential_provider = credential_provider;
348        self
349    }
350
351    /// Set the configuration for AWS connections. This is the preferred API from rust.
352    #[cfg(feature = "aws")]
353    pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
354        mut self,
355        configs: I,
356    ) -> Self {
357        self.config = Some(CloudConfig::Aws(
358            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
359        ));
360        self
361    }
362
363    /// Build the [`object_store::ObjectStore`] implementation for AWS.
364    #[cfg(feature = "aws")]
365    pub async fn build_aws(
366        &self,
367        url: PlRefPath,
368        clear_cached_credentials: bool,
369    ) -> PolarsResult<impl object_store::ObjectStore> {
370        use super::credential_provider::IntoCredentialProvider;
371
372        let opt_credential_provider =
373            self.initialized_credential_provider(clear_cached_credentials)?;
374
375        let mut builder = AmazonS3Builder::from_env()
376            .with_client_options(get_client_options())
377            .with_url(url.clone().to_string());
378
379        if let Some(credential_provider) = &opt_credential_provider {
380            let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
381                credential_provider
382                    .storage_update_options()?
383                    .into_iter()
384                    .map(|(k, v)| (k, v.to_string())),
385            )?;
386
387            for (key, value) in storage_update_options {
388                builder = builder.with_config(key, value);
389            }
390        }
391
392        read_config(
393            &mut builder,
394            &[(
395                Path::new("~/.aws/config"),
396                &[("region\\s*=\\s*([^\r\n]*)", AmazonS3ConfigKey::Region)],
397            )],
398        );
399
400        read_config(
401            &mut builder,
402            &[(
403                Path::new("~/.aws/credentials"),
404                &[
405                    (
406                        "aws_access_key_id\\s*=\\s*([^\\r\\n]*)",
407                        AmazonS3ConfigKey::AccessKeyId,
408                    ),
409                    (
410                        "aws_secret_access_key\\s*=\\s*([^\\r\\n]*)",
411                        AmazonS3ConfigKey::SecretAccessKey,
412                    ),
413                    (
414                        "aws_session_token\\s*=\\s*([^\\r\\n]*)",
415                        AmazonS3ConfigKey::Token,
416                    ),
417                ],
418            )],
419        );
420
421        if let Some(options) = &self.config {
422            let CloudConfig::Aws(options) = options else {
423                panic!("impl error: cloud type mismatch")
424            };
425            for (key, value) in options {
426                builder = builder.with_config(*key, value);
427            }
428        }
429
430        if builder
431            .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
432            .is_none()
433            && builder
434                .get_config_value(&AmazonS3ConfigKey::Region)
435                .is_none()
436        {
437            let bucket = crate::cloud::CloudLocation::new(url.clone(), false)?.bucket;
438            let region = {
439                let mut bucket_region = BUCKET_REGION.lock().unwrap();
440                bucket_region.get(bucket.as_str()).cloned()
441            };
442
443            match region {
444                Some(region) => {
445                    builder = builder.with_config(AmazonS3ConfigKey::Region, region.as_str())
446                },
447                None => {
448                    if builder
449                        .get_config_value(&AmazonS3ConfigKey::Endpoint)
450                        .is_some()
451                    {
452                        // Set a default value if the endpoint is not aws.
453                        // See: #13042
454                        builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
455                    } else {
456                        polars_warn!(
457                            "'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."
458                        );
459                        let result = with_concurrency_budget(1, || async {
460                            reqwest::Client::builder()
461                                .user_agent(USER_AGENT)
462                                .build()
463                                .unwrap()
464                                .head(format!("https://{bucket}.s3.amazonaws.com"))
465                                .send()
466                                .await
467                                .map_err(to_compute_err)
468                        })
469                        .await?;
470                        if let Some(region) = result.headers().get("x-amz-bucket-region") {
471                            let region =
472                                std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
473                            let mut bucket_region = BUCKET_REGION.lock().unwrap();
474                            bucket_region.insert(bucket, region.into());
475                            builder = builder.with_config(AmazonS3ConfigKey::Region, region)
476                        }
477                    }
478                },
479            };
480        };
481
482        let builder = builder.with_retry(self.retry_config.into());
483
484        let opt_credential_provider = match opt_credential_provider {
485            #[cfg(feature = "python")]
486            Some(PlCredentialProvider::Python(object)) => {
487                if pyo3::Python::attach(|py| {
488                    let Ok(func_object) = object
489                        .unwrap_as_provider_ref()
490                        .getattr(py, "_can_use_as_provider")
491                    else {
492                        return PolarsResult::Ok(true);
493                    };
494
495                    Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
496                })? {
497                    Some(PlCredentialProvider::Python(object))
498                } else {
499                    None
500                }
501            },
502
503            v => v,
504        };
505
506        let builder = if let Some(credential_provider) = opt_credential_provider {
507            builder.with_credentials(credential_provider.into_aws_provider())
508        } else {
509            builder
510        };
511
512        let out = builder
513            .with_checksum_algorithm(object_store::aws::Checksum::CRC64NVME)
514            .with_unsigned_payload(true)
515            .build()
516            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
517
518        Ok(out)
519    }
520
521    /// Set the configuration for Azure connections. This is the preferred API from rust.
522    #[cfg(feature = "azure")]
523    pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
524        mut self,
525        configs: I,
526    ) -> Self {
527        self.config = Some(CloudConfig::Azure(
528            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
529        ));
530        self
531    }
532
533    /// Build the [`object_store::ObjectStore`] implementation for Azure.
534    #[cfg(feature = "azure")]
535    pub fn build_azure(
536        &self,
537        url: PlRefPath,
538        clear_cached_credentials: bool,
539    ) -> PolarsResult<impl object_store::ObjectStore> {
540        use super::credential_provider::IntoCredentialProvider;
541        use crate::cloud::ObjectStoreErrorContext;
542
543        let verbose = polars_core::config::verbose();
544
545        // The credential provider `self.credentials` is prioritized if it is set. We also need
546        // `from_env()` as it may source environment configured storage account name.
547        let mut builder =
548            MicrosoftAzureBuilder::from_env().with_client_options(get_client_options());
549
550        if let Some(options) = &self.config {
551            let CloudConfig::Azure(options) = options else {
552                panic!("impl error: cloud type mismatch")
553            };
554            for (key, value) in options.iter() {
555                builder = builder.with_config(*key, value);
556            }
557        }
558
559        let builder = builder
560            .with_url(url.to_string())
561            .with_retry(self.retry_config.into());
562
563        let builder =
564            if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
565                if verbose {
566                    eprintln!(
567                        "[CloudOptions::build_azure]: Using credential provider {:?}",
568                        &v
569                    );
570                }
571                builder.with_credentials(v.into_azure_provider())
572            } else {
573                builder
574            };
575
576        let out = builder
577            .build()
578            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
579
580        Ok(out)
581    }
582
583    /// Set the configuration for GCP connections. This is the preferred API from rust.
584    #[cfg(feature = "gcp")]
585    pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
586        mut self,
587        configs: I,
588    ) -> Self {
589        self.config = Some(CloudConfig::Gcp(
590            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
591        ));
592        self
593    }
594
595    /// Build the [`object_store::ObjectStore`] implementation for GCP.
596    #[cfg(feature = "gcp")]
597    pub fn build_gcp(
598        &self,
599        url: PlRefPath,
600        clear_cached_credentials: bool,
601    ) -> PolarsResult<impl object_store::ObjectStore> {
602        use super::credential_provider::IntoCredentialProvider;
603
604        let credential_provider = self.initialized_credential_provider(clear_cached_credentials)?;
605
606        let builder = if credential_provider.is_none() {
607            GoogleCloudStorageBuilder::from_env()
608        } else {
609            GoogleCloudStorageBuilder::new()
610        };
611
612        let mut builder = builder.with_client_options(get_client_options());
613
614        if let Some(options) = &self.config {
615            let CloudConfig::Gcp(options) = options else {
616                panic!("impl error: cloud type mismatch")
617            };
618            for (key, value) in options.iter() {
619                builder = builder.with_config(*key, value);
620            }
621        }
622
623        let builder = builder
624            .with_url(url.to_string())
625            .with_retry(self.retry_config.into());
626
627        let builder = if let Some(v) = credential_provider {
628            builder.with_credentials(v.into_gcp_provider())
629        } else {
630            builder
631        };
632
633        let out = builder
634            .build()
635            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
636
637        Ok(out)
638    }
639
640    #[cfg(feature = "http")]
641    pub fn build_http(&self, url: PlRefPath) -> PolarsResult<impl object_store::ObjectStore> {
642        let out = object_store::http::HttpBuilder::new()
643            .with_url(url.to_string())
644            .with_client_options({
645                let mut opts = super::get_client_options();
646                if let Some(CloudConfig::Http { headers }) = &self.config {
647                    opts = opts.with_default_headers(try_build_http_header_map_from_items_slice(
648                        headers.as_slice(),
649                    )?);
650                }
651                opts
652            })
653            .build()
654            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
655
656        Ok(out)
657    }
658
659    /// Parse a configuration from a Hashmap. This is the interface from Python.
660    #[allow(unused_variables)]
661    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
662        scheme: Option<CloudScheme>,
663        config: I,
664    ) -> PolarsResult<Self> {
665        match scheme.map_or(CloudType::File, CloudType::from_cloud_scheme) {
666            CloudType::Aws => {
667                #[cfg(feature = "aws")]
668                {
669                    parse_untyped_config::<AmazonS3ConfigKey, _>(config)
670                        .map(|aws| Self::default().with_aws(aws))
671                }
672                #[cfg(not(feature = "aws"))]
673                {
674                    polars_bail!(ComputeError: "'aws' feature is not enabled");
675                }
676            },
677            CloudType::Azure => {
678                #[cfg(feature = "azure")]
679                {
680                    parse_untyped_config::<AzureConfigKey, _>(config)
681                        .map(|azure| Self::default().with_azure(azure))
682                }
683                #[cfg(not(feature = "azure"))]
684                {
685                    polars_bail!(ComputeError: "'azure' feature is not enabled");
686                }
687            },
688            CloudType::File => Ok(Self::default()),
689            CloudType::Http => Ok(Self::default()),
690            CloudType::Gcp => {
691                #[cfg(feature = "gcp")]
692                {
693                    parse_untyped_config::<GoogleConfigKey, _>(config)
694                        .map(|gcp| Self::default().with_gcp(gcp))
695                }
696                #[cfg(not(feature = "gcp"))]
697                {
698                    polars_bail!(ComputeError: "'gcp' feature is not enabled");
699                }
700            },
701            CloudType::Hf => {
702                #[cfg(feature = "http")]
703                {
704                    use polars_core::config;
705
706                    use crate::path_utils::resolve_homedir;
707
708                    let mut this = Self::default();
709                    let mut token = None;
710                    let verbose = config::verbose();
711
712                    for (i, (k, v)) in config.into_iter().enumerate() {
713                        let (k, v) = (k.as_ref(), v.into());
714
715                        if i == 0 && k == "token" {
716                            if verbose {
717                                eprintln!("HF token sourced from storage_options");
718                            }
719                            token = Some(v);
720                        } else {
721                            polars_bail!(ComputeError: "unknown configuration key for HF: {}", k)
722                        }
723                    }
724
725                    token = token
726                        .or_else(|| {
727                            let v = std::env::var("HF_TOKEN").ok();
728                            if v.is_some() && verbose {
729                                eprintln!("HF token sourced from HF_TOKEN env var");
730                            }
731                            v
732                        })
733                        .or_else(|| {
734                            let hf_home = std::env::var("HF_HOME");
735                            let hf_home = hf_home.as_deref();
736                            let hf_home = hf_home.unwrap_or("~/.cache/huggingface");
737                            let hf_home = resolve_homedir(hf_home);
738                            let cached_token_path = hf_home.join("token");
739
740                            let v = std::string::String::from_utf8(
741                                std::fs::read(&cached_token_path).ok()?,
742                            )
743                            .ok()
744                            .filter(|x| !x.is_empty());
745
746                            if v.is_some() && verbose {
747                                eprintln!("HF token sourced from {:?}", cached_token_path);
748                            }
749
750                            v
751                        });
752
753                    if let Some(v) = token {
754                        this.config = Some(CloudConfig::Http {
755                            headers: vec![("Authorization".into(), format!("Bearer {v}"))],
756                        })
757                    }
758
759                    Ok(this)
760                }
761                #[cfg(not(feature = "http"))]
762                {
763                    polars_bail!(ComputeError: "'http' feature is not enabled");
764                }
765            },
766            CloudType::Ext(_) => {
767                let pairs: Vec<(String, String)> = config
768                    .into_iter()
769                    .map(|(k, v)| (k.as_ref().to_string(), v.into()))
770                    .collect();
771
772                Ok(Self {
773                    config: if pairs.is_empty() {
774                        None
775                    } else {
776                        Some(CloudConfig::Ext { options: pairs })
777                    },
778                    ..Self::default()
779                })
780            },
781        }
782    }
783
784    /// Python passes a credential provider builder that needs to be called to get the actual credential
785    /// provider.
786    #[cfg(feature = "cloud")]
787    fn initialized_credential_provider(
788        &self,
789        clear_cached_credentials: bool,
790    ) -> PolarsResult<Option<PlCredentialProvider>> {
791        if let Some(v) = self.credential_provider.clone() {
792            v.try_into_initialized(clear_cached_credentials)
793        } else {
794            Ok(None)
795        }
796    }
797}
798
#[cfg(feature = "cloud")]
#[cfg(test)]
mod tests {
    use hashbrown::HashMap;

    use super::parse_untyped_config;

    /// Config keys are matched case-insensitively and unknown keys are silently dropped.
    #[cfg(feature = "aws")]
    #[test]
    fn test_parse_untyped_config() {
        use object_store::aws::AmazonS3ConfigKey;

        for secret_key_name in ["aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"] {
            let aws_config: HashMap<_, _> = [
                (secret_key_name, "a_key"),
                ("aws_s3_allow_unsafe_rename", "true"),
            ]
            .into_iter()
            .collect();

            let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
                .expect("Parsing keys shouldn't have thrown an error");

            assert_eq!(aws_keys.len(), 1);
            assert_eq!(aws_keys[0].0, AmazonS3ConfigKey::SecretAccessKey);
        }
    }
}