polars_io/cloud/
options.rs

1#[cfg(feature = "aws")]
2use std::io::Read;
3#[cfg(feature = "aws")]
4use std::path::Path;
5use std::str::FromStr;
6use std::sync::LazyLock;
7
8#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
9use object_store::ClientOptions;
10#[cfg(feature = "aws")]
11use object_store::aws::AmazonS3Builder;
12#[cfg(feature = "aws")]
13pub use object_store::aws::AmazonS3ConfigKey;
14#[cfg(feature = "azure")]
15pub use object_store::azure::AzureConfigKey;
16#[cfg(feature = "azure")]
17use object_store::azure::MicrosoftAzureBuilder;
18#[cfg(feature = "gcp")]
19use object_store::gcp::GoogleCloudStorageBuilder;
20#[cfg(feature = "gcp")]
21pub use object_store::gcp::GoogleConfigKey;
22use polars_error::*;
23#[cfg(feature = "aws")]
24use polars_utils::cache::LruCache;
25use polars_utils::pl_path::{CloudScheme, PlRefPath};
26use polars_utils::total_ord::TotalOrdWrap;
27#[cfg(feature = "http")]
28use reqwest::header::HeaderMap;
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31
32#[cfg(feature = "cloud")]
33use super::credential_provider::PlCredentialProvider;
34#[cfg(feature = "file_cache")]
35use crate::file_cache::get_env_file_cache_ttl;
36#[cfg(feature = "aws")]
37use crate::pl_async::with_concurrency_budget;
38
/// Process-wide, mutex-guarded LRU cache with capacity 32.
///
/// `CloudOptions::build_aws` looks entries up by bucket name and stores the region reported for
/// that bucket, so the region lookup request is not repeated while the entry stays cached.
#[cfg(feature = "aws")]
static BUCKET_REGION: LazyLock<
    std::sync::Mutex<LruCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>>,
> = LazyLock::new(|| std::sync::Mutex::new(LruCache::with_capacity(32)));
43
/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the `object_store` API.
/// 2. be serializable, required when the serde-lazy feature is defined.
/// 3. not actually use HashMap since that type is disallowed in Polars for performance reasons.
///
/// Currently this type is a vector of (config key, config value) pairs.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
52
/// Provider-specific configuration held by [`CloudOptions`].
///
/// Each variant only exists when the cargo feature of the same provider is enabled. The
/// `build_*` methods on [`CloudOptions`] panic when the stored variant does not belong to the
/// provider being built.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub(crate) enum CloudConfig {
    /// Typed (key, value) pairs applied to the `AmazonS3Builder`.
    #[cfg(feature = "aws")]
    Aws(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AmazonS3ConfigKey>,
    ),
    /// Typed (key, value) pairs applied to the `MicrosoftAzureBuilder`.
    #[cfg(feature = "azure")]
    Azure(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AzureConfigKey>,
    ),
    /// Typed (key, value) pairs applied to the `GoogleCloudStorageBuilder`.
    #[cfg(feature = "gcp")]
    Gcp(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<GoogleConfigKey>,
    ),
    /// (name, value) header pairs that `build_http` installs as default request headers.
    #[cfg(feature = "http")]
    Http { headers: Vec<(String, String)> },
}
75
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Options to connect to various cloud providers.
///
/// See [`CloudOptions::from_untyped_config`] and the `with_*` builder methods for ways to
/// populate a value; the `build_*` methods turn it into an `object_store::ObjectStore`.
pub struct CloudOptions {
    /// Legacy retry count. [`CloudOptions::with_retry_config`] keeps it equal to
    /// `retry_config.max_retries`.
    pub max_retries: usize, // TODO: Remove in a breaking DSL change
    /// File-cache TTL; the default value comes from `get_env_file_cache_ttl()`.
    /// NOTE(review): the unit is not visible in this file — confirm against `file_cache`.
    #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
    /// Provider-specific configuration, or `None` when nothing was supplied.
    pub(crate) config: Option<CloudConfig>,
    /// Retry settings handed to the object store builders. Falls back to
    /// `CloudRetryConfig::default()` when absent during deserialization.
    #[cfg_attr(feature = "serde", serde(default))]
    pub retry_config: CloudRetryConfig,
    #[cfg(feature = "cloud")]
    /// Note: In most cases you will want to access this via [`CloudOptions::initialized_credential_provider`]
    /// rather than directly.
    pub(crate) credential_provider: Option<PlCredentialProvider>,
}
92
93impl Default for CloudOptions {
94    fn default() -> Self {
95        Self::default_static_ref().clone()
96    }
97}
98
99impl CloudOptions {
100    pub fn default_static_ref() -> &'static Self {
101        static DEFAULT: LazyLock<CloudOptions> = LazyLock::new(|| CloudOptions {
102            max_retries: 2,
103            #[cfg(feature = "file_cache")]
104            file_cache_ttl: get_env_file_cache_ttl(),
105            config: None,
106            retry_config: CloudRetryConfig::default(),
107            #[cfg(feature = "cloud")]
108            credential_provider: None,
109        });
110
111        &DEFAULT
112    }
113}
114
/// Retry settings for cloud requests.
///
/// The `Default` impl reads every field from a `POLARS_CLOUD_*` environment variable and falls
/// back to a built-in value when the variable is unset. With the `aws`, `gcp` or `azure` feature
/// enabled, a value converts into `object_store::RetryConfig` through the `From` impl in this file.
#[derive(Clone, Copy, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudRetryConfig {
    /// Forwarded as `object_store::RetryConfig::max_retries`.
    pub max_retries: usize,
    /// Forwarded as `object_store::RetryConfig::retry_timeout`.
    pub retry_timeout: std::time::Duration,
    /// Forwarded as `object_store::BackoffConfig::init_backoff`.
    pub retry_init_backoff: std::time::Duration,
    /// Forwarded as `object_store::BackoffConfig::max_backoff`.
    pub retry_max_backoff: std::time::Duration,
    /// Forwarded as `object_store::BackoffConfig::base`. Wrapped in `TotalOrdWrap` so the struct
    /// can derive `Eq` and `Hash` despite holding an `f64`.
    pub retry_base_multiplier: TotalOrdWrap<f64>,
}
125
126impl Default for CloudRetryConfig {
127    fn default() -> Self {
128        use std::time::Duration;
129
130        return Self {
131            max_retries: parse_env_var(2, "POLARS_CLOUD_MAX_RETRIES"),
132            retry_timeout: Duration::from_millis(parse_env_var(
133                10 * 1000,
134                "POLARS_CLOUD_RETRY_TIMEOUT_MS",
135            )),
136            retry_init_backoff: Duration::from_millis(parse_env_var(
137                100,
138                "POLARS_CLOUD_RETRY_INIT_BACKOFF_MS",
139            )),
140            retry_max_backoff: Duration::from_millis(parse_env_var(
141                15 * 1000,
142                "POLARS_CLOUD_RETRY_MAX_BACKOFF_MS",
143            )),
144            retry_base_multiplier: TotalOrdWrap(parse_env_var(
145                2.,
146                "POLARS_CLOUD_RETRY_BASE_MULTIPLIER",
147            )),
148        };
149
150        fn parse_env_var<T: FromStr>(default: T, name: &'static str) -> T {
151            std::env::var(name).map_or(default, |x| {
152                x.parse::<T>()
153                    .ok()
154                    .unwrap_or_else(|| panic!("invalid value for {name}: {x}"))
155            })
156        }
157    }
158}
159
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
impl From<CloudRetryConfig> for object_store::RetryConfig {
    /// Maps every field of [`CloudRetryConfig`] onto its `object_store` counterpart.
    fn from(value: CloudRetryConfig) -> Self {
        // Destructure exhaustively so a newly added field cannot be forgotten here.
        let CloudRetryConfig {
            max_retries,
            retry_timeout,
            retry_init_backoff,
            retry_max_backoff,
            retry_base_multiplier: TotalOrdWrap(base),
        } = value;

        let backoff = object_store::BackoffConfig {
            init_backoff: retry_init_backoff,
            max_backoff: retry_max_backoff,
            base,
        };

        Self {
            backoff,
            max_retries,
            retry_timeout,
        }
    }
}
174
/// Converts (name, value) string pairs into a [`HeaderMap`].
///
/// # Errors
///
/// Returns a compute error for the first pair whose name or value is not a valid HTTP header
/// name / value. A later pair with the same name replaces the earlier entry.
#[cfg(feature = "http")]
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
    headers: &[(S, S)],
) -> PolarsResult<HeaderMap> {
    use reqwest::header::{HeaderName, HeaderValue};

    let mut header_map = HeaderMap::with_capacity(headers.len());

    for (raw_name, raw_value) in headers.iter() {
        // The name is validated before the value, so a pair that is invalid in both reports
        // the name error.
        let name = HeaderName::from_str(raw_name.as_ref()).map_err(to_compute_err)?;
        let value = HeaderValue::from_str(raw_value.as_ref()).map_err(to_compute_err)?;
        header_map.insert(name, value);
    }

    Ok(header_map)
}
192
193#[allow(dead_code)]
194/// Parse an untype configuration hashmap to a typed configuration for the given configuration key type.
195fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
196    config: I,
197) -> PolarsResult<Configs<T>>
198where
199    T: FromStr + Eq + std::hash::Hash,
200{
201    Ok(config
202        .into_iter()
203        // Silently ignores custom upstream storage_options
204        .filter_map(|(key, val)| {
205            T::from_str(key.as_ref().to_ascii_lowercase().as_str())
206                .ok()
207                .map(|typed_key| (typed_key, val.into()))
208        })
209        .collect::<Configs<T>>())
210}
211
/// Provider family a URL scheme belongs to; see [`CloudType::from_cloud_scheme`] for the mapping.
#[derive(Debug, Clone, PartialEq)]
pub enum CloudType {
    /// Amazon S3 (`S3` and `S3a` schemes)
    Aws,
    /// Microsoft Azure (`Abfs`, `Abfss`, `Adl`, `Az` and `Azure` schemes)
    Azure,
    /// URI with 'file:' scheme
    File,
    /// Google cloud platform
    Gcp,
    /// Plain `Http` / `Https` schemes
    Http,
    /// HuggingFace
    Hf,
}
224
225impl CloudType {
226    pub fn from_cloud_scheme(scheme: CloudScheme) -> Self {
227        match scheme {
228            CloudScheme::Abfs
229            | CloudScheme::Abfss
230            | CloudScheme::Adl
231            | CloudScheme::Az
232            | CloudScheme::Azure => Self::Azure,
233
234            CloudScheme::File | CloudScheme::FileNoHostname => Self::File,
235
236            CloudScheme::Gcs | CloudScheme::Gs => Self::Gcp,
237
238            CloudScheme::Hf => Self::Hf,
239
240            CloudScheme::Http | CloudScheme::Https => Self::Http,
241
242            CloudScheme::S3 | CloudScheme::S3a => Self::Aws,
243        }
244    }
245}
246
/// User agent string of the form `polars/<crate version>`, built at compile time. It is used by
/// `get_client_options` and by the bucket-region lookup request in `CloudOptions::build_aws`.
pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
248
/// Builds the `ClientOptions` shared by all object store builders in this module.
///
/// The connect timeout is taken from `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` (a non-zero number of
/// seconds) and is 5 minutes when the variable is not set.
///
/// # Panics
///
/// Panics when `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` is set to something that is not a non-zero
/// unsigned integer.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
    use std::num::NonZeroU64;
    use std::time::Duration;

    use reqwest::header::HeaderValue;

    let connect_timeout_secs = match std::env::var("POLARS_HTTP_CONNECT_TIMEOUT_SECONDS") {
        Ok(x) => match x.parse::<NonZeroU64>() {
            Ok(secs) => secs.get(),
            Err(_) => panic!("invalid value for POLARS_HTTP_CONNECT_TIMEOUT_SECONDS: {x}"),
        },
        Err(_) => 5 * 60,
    };

    ClientOptions::new()
        // Disables the time limit for downloading the response body.
        .with_timeout_disabled()
        // Set the time limit for establishing the connection.
        .with_connect_timeout(Duration::from_secs(connect_timeout_secs))
        .with_user_agent(HeaderValue::from_static(USER_AGENT))
        .with_allow_http(true)
}
274
/// Fills configuration keys that are still unset on `builder` from local config files.
///
/// `items` pairs a file path with a list of `(regex, key)` entries. For every entry whose key
/// has no value yet, the first capture group of the regex matched against the file content
/// becomes the value. Files whose keys are all set already are not opened.
///
/// Returns `None` as soon as a file cannot be opened or read as UTF-8, or a pattern does not
/// match; anything applied to `builder` before that point is kept.
///
/// # Panics
///
/// Panics if a pattern is not a valid regex.
#[cfg(feature = "aws")]
fn read_config(
    builder: &mut AmazonS3Builder,
    items: &[(&Path, &[(&str, AmazonS3ConfigKey)])],
) -> Option<()> {
    use crate::path_utils::resolve_homedir;

    for (path, keys) in items {
        let any_key_missing = keys
            .iter()
            .any(|(_, key)| builder.get_config_value(key).is_none());
        if !any_key_missing {
            continue;
        }

        let mut file = std::fs::File::open(resolve_homedir(path)).ok()?;
        let mut content = String::new();
        // Fails on I/O errors as well as on content that is not valid UTF-8.
        file.read_to_string(&mut content).ok()?;

        for (pattern, key) in keys.iter() {
            if builder.get_config_value(key).is_some() {
                continue;
            }

            let reg = polars_utils::regex_cache::compile_regex(pattern).unwrap();
            let value = reg.captures(content.as_str())?.get(1)?.as_str();
            // `with_config` consumes the builder, so swap it out and back in.
            *builder = std::mem::take(builder).with_config(*key, value);
        }
    }

    Some(())
}
307
308impl CloudOptions {
309    pub fn with_retry_config(mut self, retry_config: CloudRetryConfig) -> Self {
310        self.max_retries = retry_config.max_retries;
311        self.retry_config = retry_config;
312        self
313    }
314
315    #[cfg(feature = "cloud")]
316    pub fn with_credential_provider(
317        mut self,
318        credential_provider: Option<PlCredentialProvider>,
319    ) -> Self {
320        self.credential_provider = credential_provider;
321        self
322    }
323
324    /// Set the configuration for AWS connections. This is the preferred API from rust.
325    #[cfg(feature = "aws")]
326    pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
327        mut self,
328        configs: I,
329    ) -> Self {
330        self.config = Some(CloudConfig::Aws(
331            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
332        ));
333        self
334    }
335
    /// Build the [`object_store::ObjectStore`] implementation for AWS.
    ///
    /// Configuration is layered in this order:
    /// 1. `AmazonS3Builder::from_env()` plus the shared client options and `url`.
    /// 2. Storage options reported by the credential provider, if one is set.
    /// 3. Values from `~/.aws/config` and `~/.aws/credentials`, only for keys still unset.
    /// 4. The explicit [`CloudConfig::Aws`] options stored in `self`, applied last.
    ///
    /// If no region is configured after that, the region is taken from the `BUCKET_REGION` cache,
    /// set to `us-east-1` when a custom endpoint is configured, or looked up with a `HEAD` request
    /// against the bucket (the result is cached).
    ///
    /// # Errors
    ///
    /// Returns an error if the credential provider cannot be initialized, the bucket cannot be
    /// derived from `url`, the region lookup request fails, or the builder fails to build.
    ///
    /// # Panics
    ///
    /// Panics if `self.config` holds a variant other than [`CloudConfig::Aws`], if the
    /// `BUCKET_REGION` mutex is poisoned, or if the lookup HTTP client cannot be constructed.
    #[cfg(feature = "aws")]
    pub async fn build_aws(
        &self,
        url: PlRefPath,
        clear_cached_credentials: bool,
    ) -> PolarsResult<impl object_store::ObjectStore> {
        use super::credential_provider::IntoCredentialProvider;

        let opt_credential_provider =
            self.initialized_credential_provider(clear_cached_credentials)?;

        let mut builder = AmazonS3Builder::from_env()
            .with_client_options(get_client_options())
            .with_url(url.to_string());

        // Options reported by the credential provider; keys that are not valid
        // `AmazonS3ConfigKey`s are dropped by `parse_untyped_config`.
        if let Some(credential_provider) = &opt_credential_provider {
            let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
                credential_provider
                    .storage_update_options()?
                    .into_iter()
                    .map(|(k, v)| (k, v.to_string())),
            )?;

            for (key, value) in storage_update_options {
                builder = builder.with_config(key, value);
            }
        }

        // Best effort: the result is ignored, so a missing or unparsable file is not an error.
        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/config"),
                &[("region\\s*=\\s*([^\r\n]*)", AmazonS3ConfigKey::Region)],
            )],
        );

        // Best effort as well; `read_config` stops at the first pattern that does not match.
        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/credentials"),
                &[
                    (
                        "aws_access_key_id\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::AccessKeyId,
                    ),
                    (
                        "aws_secret_access_key\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::SecretAccessKey,
                    ),
                    (
                        "aws_session_token\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::Token,
                    ),
                ],
            )],
        );

        // Explicit options are applied after everything gathered above.
        if let Some(options) = &self.config {
            let CloudConfig::Aws(options) = options else {
                panic!("impl error: cloud type mismatch")
            };
            for (key, value) in options {
                builder = builder.with_config(*key, value);
            }
        }

        // Region discovery, only when neither region key has a value yet.
        if builder
            .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
            .is_none()
            && builder
                .get_config_value(&AmazonS3ConfigKey::Region)
                .is_none()
        {
            let bucket = crate::cloud::CloudLocation::new(url, false)?.bucket;
            // The lock guard lives only inside this block, so it is released before the
            // `.await` further down.
            let region = {
                let mut bucket_region = BUCKET_REGION.lock().unwrap();
                bucket_region.get(bucket.as_str()).cloned()
            };

            match region {
                Some(region) => {
                    builder = builder.with_config(AmazonS3ConfigKey::Region, region.as_str())
                },
                None => {
                    if builder
                        .get_config_value(&AmazonS3ConfigKey::Endpoint)
                        .is_some()
                    {
                        // Set a default value if the endpoint is not aws.
                        // See: #13042
                        builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
                    } else {
                        polars_warn!(
                            "'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."
                        );
                        // HEAD request against the bucket; the region is read from the
                        // `x-amz-bucket-region` response header.
                        let result = with_concurrency_budget(1, || async {
                            reqwest::Client::builder()
                                .user_agent(USER_AGENT)
                                .build()
                                .unwrap()
                                .head(format!("https://{bucket}.s3.amazonaws.com"))
                                .send()
                                .await
                                .map_err(to_compute_err)
                        })
                        .await?;
                        // When the header is absent the region simply stays unset.
                        if let Some(region) = result.headers().get("x-amz-bucket-region") {
                            let region =
                                std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
                            let mut bucket_region = BUCKET_REGION.lock().unwrap();
                            bucket_region.insert(bucket, region.into());
                            builder = builder.with_config(AmazonS3ConfigKey::Region, region)
                        }
                    }
                },
            };
        };

        let builder = builder.with_retry(self.retry_config.into());

        // A Python provider may expose `_can_use_as_provider`. If the attribute is missing the
        // provider is used as is; if calling it yields `false` the provider is dropped.
        let opt_credential_provider = match opt_credential_provider {
            #[cfg(feature = "python")]
            Some(PlCredentialProvider::Python(object)) => {
                if pyo3::Python::attach(|py| {
                    let Ok(func_object) = object
                        .unwrap_as_provider_ref()
                        .getattr(py, "_can_use_as_provider")
                    else {
                        return PolarsResult::Ok(true);
                    };

                    Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
                })? {
                    Some(PlCredentialProvider::Python(object))
                } else {
                    None
                }
            },

            v => v,
        };

        let builder = if let Some(credential_provider) = opt_credential_provider {
            builder.with_credentials(credential_provider.into_aws_provider())
        } else {
            builder
        };

        let out = builder.build()?;

        Ok(out)
    }
489
490    /// Set the configuration for Azure connections. This is the preferred API from rust.
491    #[cfg(feature = "azure")]
492    pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
493        mut self,
494        configs: I,
495    ) -> Self {
496        self.config = Some(CloudConfig::Azure(
497            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
498        ));
499        self
500    }
501
502    /// Build the [`object_store::ObjectStore`] implementation for Azure.
503    #[cfg(feature = "azure")]
504    pub fn build_azure(
505        &self,
506        url: PlRefPath,
507        clear_cached_credentials: bool,
508    ) -> PolarsResult<impl object_store::ObjectStore> {
509        use super::credential_provider::IntoCredentialProvider;
510
511        let verbose = polars_core::config::verbose();
512
513        // The credential provider `self.credentials` is prioritized if it is set. We also need
514        // `from_env()` as it may source environment configured storage account name.
515        let mut builder =
516            MicrosoftAzureBuilder::from_env().with_client_options(get_client_options());
517
518        if let Some(options) = &self.config {
519            let CloudConfig::Azure(options) = options else {
520                panic!("impl error: cloud type mismatch")
521            };
522            for (key, value) in options.iter() {
523                builder = builder.with_config(*key, value);
524            }
525        }
526
527        let builder = builder
528            .with_url(url.to_string())
529            .with_retry(self.retry_config.into());
530
531        let builder =
532            if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
533                if verbose {
534                    eprintln!(
535                        "[CloudOptions::build_azure]: Using credential provider {:?}",
536                        &v
537                    );
538                }
539                builder.with_credentials(v.into_azure_provider())
540            } else {
541                builder
542            };
543
544        let out = builder.build()?;
545
546        Ok(out)
547    }
548
549    /// Set the configuration for GCP connections. This is the preferred API from rust.
550    #[cfg(feature = "gcp")]
551    pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
552        mut self,
553        configs: I,
554    ) -> Self {
555        self.config = Some(CloudConfig::Gcp(
556            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
557        ));
558        self
559    }
560
561    /// Build the [`object_store::ObjectStore`] implementation for GCP.
562    #[cfg(feature = "gcp")]
563    pub fn build_gcp(
564        &self,
565        url: PlRefPath,
566        clear_cached_credentials: bool,
567    ) -> PolarsResult<impl object_store::ObjectStore> {
568        use super::credential_provider::IntoCredentialProvider;
569
570        let credential_provider = self.initialized_credential_provider(clear_cached_credentials)?;
571
572        let builder = if credential_provider.is_none() {
573            GoogleCloudStorageBuilder::from_env()
574        } else {
575            GoogleCloudStorageBuilder::new()
576        };
577
578        let mut builder = builder.with_client_options(get_client_options());
579
580        if let Some(options) = &self.config {
581            let CloudConfig::Gcp(options) = options else {
582                panic!("impl error: cloud type mismatch")
583            };
584            for (key, value) in options.iter() {
585                builder = builder.with_config(*key, value);
586            }
587        }
588
589        let builder = builder
590            .with_url(url.to_string())
591            .with_retry(self.retry_config.into());
592
593        let builder = if let Some(v) = credential_provider {
594            builder.with_credentials(v.into_gcp_provider())
595        } else {
596            builder
597        };
598
599        let out = builder.build()?;
600
601        Ok(out)
602    }
603
604    #[cfg(feature = "http")]
605    pub fn build_http(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
606        let out = object_store::http::HttpBuilder::new()
607            .with_url(url.to_string())
608            .with_client_options({
609                let mut opts = super::get_client_options();
610                if let Some(CloudConfig::Http { headers }) = &self.config {
611                    opts = opts.with_default_headers(try_build_http_header_map_from_items_slice(
612                        headers.as_slice(),
613                    )?);
614                }
615                opts
616            })
617            .build()?;
618
619        Ok(out)
620    }
621
622    /// Parse a configuration from a Hashmap. This is the interface from Python.
623    #[allow(unused_variables)]
624    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
625        scheme: Option<CloudScheme>,
626        config: I,
627    ) -> PolarsResult<Self> {
628        match scheme.map_or(CloudType::File, CloudType::from_cloud_scheme) {
629            CloudType::Aws => {
630                #[cfg(feature = "aws")]
631                {
632                    parse_untyped_config::<AmazonS3ConfigKey, _>(config)
633                        .map(|aws| Self::default().with_aws(aws))
634                }
635                #[cfg(not(feature = "aws"))]
636                {
637                    polars_bail!(ComputeError: "'aws' feature is not enabled");
638                }
639            },
640            CloudType::Azure => {
641                #[cfg(feature = "azure")]
642                {
643                    parse_untyped_config::<AzureConfigKey, _>(config)
644                        .map(|azure| Self::default().with_azure(azure))
645                }
646                #[cfg(not(feature = "azure"))]
647                {
648                    polars_bail!(ComputeError: "'azure' feature is not enabled");
649                }
650            },
651            CloudType::File => Ok(Self::default()),
652            CloudType::Http => Ok(Self::default()),
653            CloudType::Gcp => {
654                #[cfg(feature = "gcp")]
655                {
656                    parse_untyped_config::<GoogleConfigKey, _>(config)
657                        .map(|gcp| Self::default().with_gcp(gcp))
658                }
659                #[cfg(not(feature = "gcp"))]
660                {
661                    polars_bail!(ComputeError: "'gcp' feature is not enabled");
662                }
663            },
664            CloudType::Hf => {
665                #[cfg(feature = "http")]
666                {
667                    use polars_core::config;
668
669                    use crate::path_utils::resolve_homedir;
670
671                    let mut this = Self::default();
672                    let mut token = None;
673                    let verbose = config::verbose();
674
675                    for (i, (k, v)) in config.into_iter().enumerate() {
676                        let (k, v) = (k.as_ref(), v.into());
677
678                        if i == 0 && k == "token" {
679                            if verbose {
680                                eprintln!("HF token sourced from storage_options");
681                            }
682                            token = Some(v);
683                        } else {
684                            polars_bail!(ComputeError: "unknown configuration key for HF: {}", k)
685                        }
686                    }
687
688                    token = token
689                        .or_else(|| {
690                            let v = std::env::var("HF_TOKEN").ok();
691                            if v.is_some() && verbose {
692                                eprintln!("HF token sourced from HF_TOKEN env var");
693                            }
694                            v
695                        })
696                        .or_else(|| {
697                            let hf_home = std::env::var("HF_HOME");
698                            let hf_home = hf_home.as_deref();
699                            let hf_home = hf_home.unwrap_or("~/.cache/huggingface");
700                            let hf_home = resolve_homedir(hf_home);
701                            let cached_token_path = hf_home.join("token");
702
703                            let v = std::string::String::from_utf8(
704                                std::fs::read(&cached_token_path).ok()?,
705                            )
706                            .ok()
707                            .filter(|x| !x.is_empty());
708
709                            if v.is_some() && verbose {
710                                eprintln!("HF token sourced from {:?}", cached_token_path);
711                            }
712
713                            v
714                        });
715
716                    if let Some(v) = token {
717                        this.config = Some(CloudConfig::Http {
718                            headers: vec![("Authorization".into(), format!("Bearer {v}"))],
719                        })
720                    }
721
722                    Ok(this)
723                }
724                #[cfg(not(feature = "http"))]
725                {
726                    polars_bail!(ComputeError: "'http' feature is not enabled");
727                }
728            },
729        }
730    }
731
732    /// Python passes a credential provider builder that needs to be called to get the actual credential
733    /// provider.
734    #[cfg(feature = "cloud")]
735    fn initialized_credential_provider(
736        &self,
737        clear_cached_credentials: bool,
738    ) -> PolarsResult<Option<PlCredentialProvider>> {
739        if let Some(v) = self.credential_provider.clone() {
740            v.try_into_initialized(clear_cached_credentials)
741        } else {
742            Ok(None)
743        }
744    }
745}
746
#[cfg(feature = "cloud")]
#[cfg(test)]
mod tests {
    use hashbrown::HashMap;

    use super::parse_untyped_config;

    /// Keys are matched case-insensitively and keys that are not valid `AmazonS3ConfigKey`s
    /// are dropped rather than reported as errors.
    #[cfg(feature = "aws")]
    #[test]
    fn test_parse_untyped_config() {
        use object_store::aws::AmazonS3ConfigKey;

        // Same key in lower and upper case; both must resolve to `SecretAccessKey`.
        for secret_key_name in ["aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"] {
            let aws_config: HashMap<_, _> = [
                (secret_key_name, "a_key"),
                ("aws_s3_allow_unsafe_rename", "true"),
            ]
            .into_iter()
            .collect();

            let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
                .expect("Parsing keys shouldn't have thrown an error");

            assert_eq!(
                aws_keys.first().unwrap().0,
                AmazonS3ConfigKey::SecretAccessKey
            );
            assert_eq!(aws_keys.len(), 1);
        }
    }
}