1#[cfg(feature = "aws")]
2use std::io::Read;
3#[cfg(feature = "aws")]
4use std::path::Path;
5use std::str::FromStr;
6use std::sync::LazyLock;
7
8#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
9use object_store::ClientOptions;
10#[cfg(feature = "aws")]
11use object_store::aws::AmazonS3Builder;
12#[cfg(feature = "aws")]
13pub use object_store::aws::AmazonS3ConfigKey;
14#[cfg(feature = "azure")]
15pub use object_store::azure::AzureConfigKey;
16#[cfg(feature = "azure")]
17use object_store::azure::MicrosoftAzureBuilder;
18#[cfg(feature = "gcp")]
19use object_store::gcp::GoogleCloudStorageBuilder;
20#[cfg(feature = "gcp")]
21pub use object_store::gcp::GoogleConfigKey;
22#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
23use object_store::{BackoffConfig, RetryConfig};
24use polars_error::*;
25#[cfg(feature = "aws")]
26use polars_utils::cache::LruCache;
27use polars_utils::plpath::{CloudScheme, PlPathRef};
28#[cfg(feature = "http")]
29use reqwest::header::HeaderMap;
30#[cfg(feature = "serde")]
31use serde::{Deserialize, Serialize};
32
33#[cfg(feature = "cloud")]
34use super::credential_provider::PlCredentialProvider;
35#[cfg(feature = "file_cache")]
36use crate::file_cache::get_env_file_cache_ttl;
37#[cfg(feature = "aws")]
38use crate::pl_async::with_concurrency_budget;
39
/// Process-wide LRU cache (capacity 32) mapping an S3 bucket name to a region.
///
/// `CloudOptions::build_aws` consults it before issuing a region-discovery request and
/// stores the discovered region in it afterwards, so a bucket whose entry is still
/// cached does not trigger another request.
#[cfg(feature = "aws")]
static BUCKET_REGION: LazyLock<
    std::sync::Mutex<LruCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>>,
> = LazyLock::new(|| std::sync::Mutex::new(LruCache::with_capacity(32)));
44
/// Ordered list of `(typed config key, value)` pairs.
///
/// `T` is the provider-specific key type stored in the [`CloudConfig`] variants
/// (e.g. `AmazonS3ConfigKey`); the value is always kept as a `String`.
// NOTE(review): `dead_code` is presumably allowed because every user of this alias is
// feature-gated, so it can be unused in some feature combinations — confirm.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
53
/// Provider-specific connection settings held in [`CloudOptions::config`].
///
/// Each variant only exists when the cargo feature of the same name is enabled.
/// For the typed-key variants the JSON schema (feature `dsl-schema`) describes the
/// payload as `Vec<(String, String)>`.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub(crate) enum CloudConfig {
    /// Amazon S3 settings, applied to the builder in `CloudOptions::build_aws`.
    #[cfg(feature = "aws")]
    Aws(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AmazonS3ConfigKey>,
    ),
    /// Azure settings, applied to the builder in `CloudOptions::build_azure`.
    #[cfg(feature = "azure")]
    Azure(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AzureConfigKey>,
    ),
    /// Google Cloud Storage settings, applied to the builder in `CloudOptions::build_gcp`.
    #[cfg(feature = "gcp")]
    Gcp(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<GoogleConfigKey>,
    ),
    /// `(header name, header value)` pairs that `CloudOptions::build_http` installs as
    /// default headers on the HTTP client.
    #[cfg(feature = "http")]
    Http { headers: Vec<(String, String)> },
}
76
/// Options used to build an object store for a cloud (or HTTP) location.
///
/// Construct via [`Default`] or [`CloudOptions::from_untyped_config`] and refine with
/// the `with_*` builder methods.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudOptions {
    /// Maximum number of retries handed to the object-store retry configuration
    /// by the `build_aws` / `build_azure` / `build_gcp` methods.
    pub max_retries: usize,
    /// File-cache time-to-live; the default comes from `get_env_file_cache_ttl()`.
    // NOTE(review): the unit of this value is not visible in this file — confirm.
    #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
    /// Provider-specific settings; `None` means no explicit configuration was given.
    pub(crate) config: Option<CloudConfig>,
    /// Optional credential provider. The `build_*` methods do not read this field
    /// directly; they go through `initialized_credential_provider`.
    #[cfg(feature = "cloud")]
    pub(crate) credential_provider: Option<PlCredentialProvider>,
}
91
92impl Default for CloudOptions {
93 fn default() -> Self {
94 Self::default_static_ref().clone()
95 }
96}
97
impl CloudOptions {
    /// Returns a reference to the lazily-initialized, process-wide default options.
    ///
    /// The defaults are: `max_retries = 2`, no provider configuration, no credential
    /// provider (feature `cloud`), and — with the `file_cache` feature —
    /// `file_cache_ttl` taken from `get_env_file_cache_ttl()`.
    pub fn default_static_ref() -> &'static Self {
        // Built once on first access; `get_env_file_cache_ttl()` is therefore only
        // evaluated at that point and not on later calls.
        static DEFAULT: LazyLock<CloudOptions> = LazyLock::new(|| CloudOptions {
            max_retries: 2,
            #[cfg(feature = "file_cache")]
            file_cache_ttl: get_env_file_cache_ttl(),
            config: None,
            #[cfg(feature = "cloud")]
            credential_provider: None,
        });

        &DEFAULT
    }
}
112
/// Converts `(name, value)` string pairs into a `reqwest` [`HeaderMap`].
///
/// Entries are inserted in slice order with `HeaderMap::insert`, so a later entry
/// with the same header name replaces an earlier one.
///
/// # Errors
/// Returns a compute error as soon as a name or a value is not a valid HTTP
/// header name / value.
#[cfg(feature = "http")]
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
    headers: &[(S, S)],
) -> PolarsResult<HeaderMap> {
    use reqwest::header::{HeaderName, HeaderValue};

    let mut header_map = HeaderMap::with_capacity(headers.len());

    for (raw_name, raw_value) in headers.iter() {
        // The name is validated before the value, so an invalid name is reported first.
        let name = HeaderName::from_str(raw_name.as_ref()).map_err(to_compute_err)?;
        let value = HeaderValue::from_str(raw_value.as_ref()).map_err(to_compute_err)?;
        header_map.insert(name, value);
    }

    Ok(header_map)
}
130
131#[allow(dead_code)]
132fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
134 config: I,
135) -> PolarsResult<Configs<T>>
136where
137 T: FromStr + Eq + std::hash::Hash,
138{
139 Ok(config
140 .into_iter()
141 .filter_map(|(key, val)| {
143 T::from_str(key.as_ref().to_ascii_lowercase().as_str())
144 .ok()
145 .map(|typed_key| (typed_key, val.into()))
146 })
147 .collect::<Configs<T>>())
148}
149
/// Backend family a location belongs to, derived from its URL scheme by
/// [`CloudType::from_cloud_scheme`].
#[derive(Debug, Clone, PartialEq)]
pub enum CloudType {
    /// `s3` / `s3a` schemes.
    Aws,
    /// `abfs` / `abfss` / `adl` / `az` / `azure` schemes.
    Azure,
    /// `file` schemes; also what `CloudOptions::from_untyped_config` assumes when no
    /// scheme is given.
    File,
    /// `gcs` / `gs` schemes.
    Gcp,
    /// `http` / `https` schemes.
    Http,
    /// `hf` scheme.
    Hf,
}
161
162impl CloudType {
163 pub fn from_cloud_scheme(scheme: &CloudScheme) -> Self {
164 match scheme {
165 CloudScheme::Abfs
166 | CloudScheme::Abfss
167 | CloudScheme::Adl
168 | CloudScheme::Az
169 | CloudScheme::Azure => Self::Azure,
170
171 CloudScheme::File | CloudScheme::FileNoHostname => Self::File,
172
173 CloudScheme::Gcs | CloudScheme::Gs => Self::Gcp,
174
175 CloudScheme::Hf => Self::Hf,
176
177 CloudScheme::Http | CloudScheme::Https => Self::Http,
178
179 CloudScheme::S3 | CloudScheme::S3a => Self::Aws,
180 }
181 }
182}
183
/// Builds the object-store retry policy: default backoff, at most `max_retries`
/// retries, and a 10 second retry timeout.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
fn get_retry_config(max_retries: usize) -> RetryConfig {
    let backoff = BackoffConfig::default();
    let retry_timeout = std::time::Duration::from_secs(10);

    RetryConfig {
        backoff,
        max_retries,
        retry_timeout,
    }
}
192
/// User-agent string sent with requests: `polars/<version>`, where the version is the
/// `CARGO_PKG_VERSION` of this crate at compile time.
pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
194
/// Client options shared by the object-store builders in this file.
///
/// Both the request timeout and the connect timeout are disabled, the user agent is
/// set to [`USER_AGENT`], and plain (non-TLS) HTTP is allowed.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
    use reqwest::header::HeaderValue;

    ClientOptions::new()
        // No overall request timeout.
        .with_timeout_disabled()
        // No connect timeout either.
        .with_connect_timeout_disabled()
        .with_user_agent(HeaderValue::from_static(USER_AGENT))
        .with_allow_http(true)
}
209
/// Best-effort fill-in of unset builder keys from local configuration files.
///
/// `items` pairs a file path (resolved through `resolve_homedir`) with a list of
/// `(regex pattern, config key)` entries. For every key the builder does not yet
/// have a value for, the pattern is matched against the file content and capture
/// group 1 is stored under that key. A file whose keys are all already set is not
/// opened at all.
///
/// Returns `None` — leaving any remaining keys and files untouched — as soon as a
/// file cannot be opened or read as UTF-8, a pattern does not match, or the match
/// has no capture group 1. Returns `Some(())` otherwise.
///
/// # Panics
/// Panics if a pattern is not a valid regular expression.
#[cfg(feature = "aws")]
fn read_config(
    builder: &mut AmazonS3Builder,
    items: &[(&Path, &[(&str, AmazonS3ConfigKey)])],
) -> Option<()> {
    use crate::path_utils::resolve_homedir;

    for (path, keys) in items {
        // Only touch the file system when at least one key still lacks a value.
        let has_unset_key = keys
            .iter()
            .any(|(_, key)| builder.get_config_value(key).is_none());
        if !has_unset_key {
            continue;
        }

        let mut file = std::fs::File::open(resolve_homedir(path)).ok()?;
        let mut text = String::new();
        file.read_to_string(&mut text).ok()?;

        for (pattern, key) in keys.iter() {
            // Values that are already set take precedence over the file.
            if builder.get_config_value(key).is_some() {
                continue;
            }

            let regex = polars_utils::regex_cache::compile_regex(pattern).unwrap();
            let captures = regex.captures(text.as_str())?;
            let value = captures.get(1)?.as_str();
            // `with_config` consumes the builder, so move it out and put the result back.
            *builder = std::mem::take(builder).with_config(*key, value);
        }
    }

    Some(())
}
242
243impl CloudOptions {
244 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
246 self.max_retries = max_retries;
247 self
248 }
249
250 #[cfg(feature = "cloud")]
251 pub fn with_credential_provider(
252 mut self,
253 credential_provider: Option<PlCredentialProvider>,
254 ) -> Self {
255 self.credential_provider = credential_provider;
256 self
257 }
258
259 #[cfg(feature = "aws")]
261 pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
262 mut self,
263 configs: I,
264 ) -> Self {
265 self.config = Some(CloudConfig::Aws(
266 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
267 ));
268 self
269 }
270
    /// Builds an Amazon S3 object store for `url`.
    ///
    /// Configuration is layered in this order, later layers overriding earlier ones:
    /// 1. environment (`AmazonS3Builder::from_env`) plus the shared client options,
    /// 2. storage options reported by the initialized credential provider (if any),
    /// 3. `~/.aws/config` (region) and `~/.aws/credentials` (key id, secret, session
    ///    token) — only for keys that are still unset,
    /// 4. the explicit `CloudConfig::Aws` entries stored in `self.config`.
    ///
    /// If neither a region nor a default region is set afterwards, the region is taken
    /// from the `BUCKET_REGION` cache, set to `us-east-1` when a custom endpoint is
    /// configured, or otherwise discovered with a `HEAD` request to
    /// `https://{bucket}.s3.amazonaws.com` (and then cached).
    ///
    /// `clear_cached_credentials` is forwarded to `initialized_credential_provider`.
    ///
    /// # Errors
    /// Propagates failures from credential-provider initialization, from parsing
    /// `url`, from the region-discovery request, and from the final builder.
    ///
    /// # Panics
    /// Panics if `self.config` holds a non-AWS configuration, if a lock on the region
    /// cache is poisoned, if the `reqwest` client cannot be built, or (feature
    /// `python`) if `_can_use_as_provider` does not return a `bool`.
    #[cfg(feature = "aws")]
    pub async fn build_aws(
        &self,
        url: &str,
        clear_cached_credentials: bool,
    ) -> PolarsResult<impl object_store::ObjectStore> {
        use super::credential_provider::IntoCredentialProvider;

        let opt_credential_provider =
            self.initialized_credential_provider(clear_cached_credentials)?;

        let mut builder = AmazonS3Builder::from_env()
            .with_client_options(get_client_options())
            .with_url(url);

        // Storage options supplied by the credential provider; keys that are not
        // recognized as `AmazonS3ConfigKey` are dropped by `parse_untyped_config`.
        if let Some(credential_provider) = &opt_credential_provider {
            let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
                credential_provider
                    .storage_update_options()?
                    .into_iter()
                    .map(|(k, v)| (k, v.to_string())),
            )?;

            for (key, value) in storage_update_options {
                builder = builder.with_config(key, value);
            }
        }

        // Best effort: the `Option` result is intentionally ignored, a missing or
        // unparsable file simply leaves the builder unchanged.
        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/config"),
                &[("region\\s*=\\s*([^\r\n]*)", AmazonS3ConfigKey::Region)],
            )],
        );

        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/credentials"),
                &[
                    (
                        "aws_access_key_id\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::AccessKeyId,
                    ),
                    (
                        "aws_secret_access_key\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::SecretAccessKey,
                    ),
                    (
                        "aws_session_token\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::Token,
                    ),
                ],
            )],
        );

        // Explicit options are applied last, so they win over everything above.
        if let Some(options) = &self.config {
            let CloudConfig::Aws(options) = options else {
                panic!("impl error: cloud type mismatch")
            };
            for (key, value) in options {
                builder = builder.with_config(*key, value);
            }
        }

        // Region resolution, only when nothing above provided one.
        if builder
            .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
            .is_none()
            && builder
                .get_config_value(&AmazonS3ConfigKey::Region)
                .is_none()
        {
            let bucket = crate::cloud::CloudLocation::new(PlPathRef::new(url), false)?.bucket;
            // The guard lives only inside this block, so the lock is released before
            // the `.await` further down.
            let region = {
                let mut bucket_region = BUCKET_REGION.lock().unwrap();
                bucket_region.get(bucket.as_str()).cloned()
            };

            match region {
                Some(region) => {
                    builder = builder.with_config(AmazonS3ConfigKey::Region, region.as_str())
                },
                None => {
                    if builder
                        .get_config_value(&AmazonS3ConfigKey::Endpoint)
                        .is_some()
                    {
                        // A custom endpoint is configured: use a fixed region instead
                        // of querying amazonaws.com.
                        builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
                    } else {
                        polars_warn!(
                            "'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."
                        );
                        let result = with_concurrency_budget(1, || async {
                            reqwest::Client::builder()
                                .user_agent(USER_AGENT)
                                .build()
                                .unwrap()
                                .head(format!("https://{bucket}.s3.amazonaws.com"))
                                .send()
                                .await
                                .map_err(to_compute_err)
                        })
                        .await?;
                        // If the response carries no region header, the region is
                        // left unset and nothing is cached.
                        if let Some(region) = result.headers().get("x-amz-bucket-region") {
                            let region =
                                std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
                            let mut bucket_region = BUCKET_REGION.lock().unwrap();
                            bucket_region.insert(bucket, region.into());
                            builder = builder.with_config(AmazonS3ConfigKey::Region, region)
                        }
                    }
                },
            };
        };

        let builder = builder.with_retry(get_retry_config(self.max_retries));

        // A Python provider may opt out of being used as the credential source: when it
        // exposes `_can_use_as_provider` and that call returns false, it is discarded.
        // If the attribute lookup fails, the provider is kept.
        let opt_credential_provider = match opt_credential_provider {
            #[cfg(feature = "python")]
            Some(PlCredentialProvider::Python(object)) => {
                if pyo3::Python::attach(|py| {
                    let Ok(func_object) = object
                        .unwrap_as_provider_ref()
                        .getattr(py, "_can_use_as_provider")
                    else {
                        return PolarsResult::Ok(true);
                    };

                    Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
                })? {
                    Some(PlCredentialProvider::Python(object))
                } else {
                    None
                }
            },

            v => v,
        };

        let builder = if let Some(credential_provider) = opt_credential_provider {
            builder.with_credentials(credential_provider.into_aws_provider())
        } else {
            builder
        };

        let out = builder.build()?;

        Ok(out)
    }
424
425 #[cfg(feature = "azure")]
427 pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
428 mut self,
429 configs: I,
430 ) -> Self {
431 self.config = Some(CloudConfig::Azure(
432 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
433 ));
434 self
435 }
436
437 #[cfg(feature = "azure")]
439 pub fn build_azure(
440 &self,
441 url: &str,
442 clear_cached_credentials: bool,
443 ) -> PolarsResult<impl object_store::ObjectStore> {
444 use super::credential_provider::IntoCredentialProvider;
445
446 let verbose = polars_core::config::verbose();
447
448 let mut builder =
451 MicrosoftAzureBuilder::from_env().with_client_options(get_client_options());
452
453 if let Some(options) = &self.config {
454 let CloudConfig::Azure(options) = options else {
455 panic!("impl error: cloud type mismatch")
456 };
457 for (key, value) in options.iter() {
458 builder = builder.with_config(*key, value);
459 }
460 }
461
462 let builder = builder
463 .with_url(url)
464 .with_retry(get_retry_config(self.max_retries));
465
466 let builder =
467 if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
468 if verbose {
469 eprintln!(
470 "[CloudOptions::build_azure]: Using credential provider {:?}",
471 &v
472 );
473 }
474 builder.with_credentials(v.into_azure_provider())
475 } else {
476 builder
477 };
478
479 let out = builder.build()?;
480
481 Ok(out)
482 }
483
484 #[cfg(feature = "gcp")]
486 pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
487 mut self,
488 configs: I,
489 ) -> Self {
490 self.config = Some(CloudConfig::Gcp(
491 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
492 ));
493 self
494 }
495
496 #[cfg(feature = "gcp")]
498 pub fn build_gcp(
499 &self,
500 url: &str,
501 clear_cached_credentials: bool,
502 ) -> PolarsResult<impl object_store::ObjectStore> {
503 use super::credential_provider::IntoCredentialProvider;
504
505 let credential_provider = self.initialized_credential_provider(clear_cached_credentials)?;
506
507 let builder = if credential_provider.is_none() {
508 GoogleCloudStorageBuilder::from_env()
509 } else {
510 GoogleCloudStorageBuilder::new()
511 };
512
513 let mut builder = builder.with_client_options(get_client_options());
514
515 if let Some(options) = &self.config {
516 let CloudConfig::Gcp(options) = options else {
517 panic!("impl error: cloud type mismatch")
518 };
519 for (key, value) in options.iter() {
520 builder = builder.with_config(*key, value);
521 }
522 }
523
524 let builder = builder
525 .with_url(url)
526 .with_retry(get_retry_config(self.max_retries));
527
528 let builder = if let Some(v) = credential_provider {
529 builder.with_credentials(v.into_gcp_provider())
530 } else {
531 builder
532 };
533
534 let out = builder.build()?;
535
536 Ok(out)
537 }
538
539 #[cfg(feature = "http")]
540 pub fn build_http(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
541 let out = object_store::http::HttpBuilder::new()
542 .with_url(url)
543 .with_client_options({
544 let mut opts = super::get_client_options();
545 if let Some(CloudConfig::Http { headers }) = &self.config {
546 opts = opts.with_default_headers(try_build_http_header_map_from_items_slice(
547 headers.as_slice(),
548 )?);
549 }
550 opts
551 })
552 .build()?;
553
554 Ok(out)
555 }
556
    /// Builds [`CloudOptions`] from string key/value pairs for the backend implied by
    /// `scheme`.
    ///
    /// * No scheme is treated like a `file` scheme.
    /// * AWS / Azure / GCP: keys are parsed with `parse_untyped_config`, so keys the
    ///   provider does not recognize are silently dropped.
    /// * File / HTTP: `config` is ignored and the defaults are returned.
    /// * HF: the only accepted entry is a single leading `token` key. Without it the
    ///   token is looked up in the `HF_TOKEN` environment variable and then in the
    ///   `token` file under `HF_HOME` (falling back to `~/.cache/huggingface`). A token,
    ///   when found, is stored as an `Authorization: Bearer …` HTTP header.
    ///
    /// # Errors
    /// Returns a compute error when the cargo feature required by the backend is not
    /// enabled, or when the HF configuration contains anything other than a single
    /// leading `token` entry.
    // `config` is unused in feature combinations where every consuming branch is
    // compiled out.
    #[allow(unused_variables)]
    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
        scheme: Option<&CloudScheme>,
        config: I,
    ) -> PolarsResult<Self> {
        match scheme.map_or(CloudType::File, CloudType::from_cloud_scheme) {
            CloudType::Aws => {
                #[cfg(feature = "aws")]
                {
                    parse_untyped_config::<AmazonS3ConfigKey, _>(config)
                        .map(|aws| Self::default().with_aws(aws))
                }
                #[cfg(not(feature = "aws"))]
                {
                    polars_bail!(ComputeError: "'aws' feature is not enabled");
                }
            },
            CloudType::Azure => {
                #[cfg(feature = "azure")]
                {
                    parse_untyped_config::<AzureConfigKey, _>(config)
                        .map(|azure| Self::default().with_azure(azure))
                }
                #[cfg(not(feature = "azure"))]
                {
                    polars_bail!(ComputeError: "'azure' feature is not enabled");
                }
            },
            // Local files and plain HTTP take no configuration from `config`.
            CloudType::File => Ok(Self::default()),
            CloudType::Http => Ok(Self::default()),
            CloudType::Gcp => {
                #[cfg(feature = "gcp")]
                {
                    parse_untyped_config::<GoogleConfigKey, _>(config)
                        .map(|gcp| Self::default().with_gcp(gcp))
                }
                #[cfg(not(feature = "gcp"))]
                {
                    polars_bail!(ComputeError: "'gcp' feature is not enabled");
                }
            },
            CloudType::Hf => {
                #[cfg(feature = "http")]
                {
                    use polars_core::config;

                    use crate::path_utils::resolve_homedir;

                    let mut this = Self::default();
                    let mut token = None;
                    let verbose = config::verbose();

                    // Only a `token` key in first position is accepted; any other
                    // entry (including a second `token`) is rejected.
                    for (i, (k, v)) in config.into_iter().enumerate() {
                        let (k, v) = (k.as_ref(), v.into());

                        if i == 0 && k == "token" {
                            if verbose {
                                eprintln!("HF token sourced from storage_options");
                            }
                            token = Some(v);
                        } else {
                            polars_bail!(ComputeError: "unknown configuration key for HF: {}", k)
                        }
                    }

                    // Fallback chain: explicit token, then `HF_TOKEN`, then the cached
                    // token file.
                    token = token
                        .or_else(|| {
                            let v = std::env::var("HF_TOKEN").ok();
                            if v.is_some() && verbose {
                                eprintln!("HF token sourced from HF_TOKEN env var");
                            }
                            v
                        })
                        .or_else(|| {
                            let hf_home = std::env::var("HF_HOME");
                            let hf_home = hf_home.as_deref();
                            let hf_home = hf_home.unwrap_or("~/.cache/huggingface");
                            let hf_home = resolve_homedir(&hf_home);
                            let cached_token_path = hf_home.join("token");

                            // An unreadable, non-UTF-8 or empty file counts as "no token".
                            let v = std::string::String::from_utf8(
                                std::fs::read(&cached_token_path).ok()?,
                            )
                            .ok()
                            .filter(|x| !x.is_empty());

                            if v.is_some() && verbose {
                                eprintln!(
                                    "HF token sourced from {}",
                                    cached_token_path.to_str().unwrap()
                                );
                            }

                            v
                        });

                    // Without a token the options are returned with no HTTP headers.
                    if let Some(v) = token {
                        this.config = Some(CloudConfig::Http {
                            headers: vec![("Authorization".into(), format!("Bearer {v}"))],
                        })
                    }

                    Ok(this)
                }
                #[cfg(not(feature = "http"))]
                {
                    polars_bail!(ComputeError: "'http' feature is not enabled");
                }
            },
        }
    }
669
670 #[cfg(feature = "cloud")]
673 fn initialized_credential_provider(
674 &self,
675 clear_cached_credentials: bool,
676 ) -> PolarsResult<Option<PlCredentialProvider>> {
677 if let Some(v) = self.credential_provider.clone() {
678 v.try_into_initialized(clear_cached_credentials)
679 } else {
680 Ok(None)
681 }
682 }
683}
684
#[cfg(feature = "cloud")]
#[cfg(test)]
mod tests {
    use hashbrown::HashMap;

    use super::parse_untyped_config;

    /// `parse_untyped_config` keeps keys that parse as `AmazonS3ConfigKey`, drops the
    /// ones that do not, and treats key names case-insensitively.
    #[cfg(feature = "aws")]
    #[test]
    fn test_parse_untyped_config() {
        use object_store::aws::AmazonS3ConfigKey;

        // Lower-case key: one entry survives, `aws_s3_allow_unsafe_rename` is dropped.
        let aws_config = [
            ("aws_secret_access_key", "a_key"),
            ("aws_s3_allow_unsafe_rename", "true"),
        ]
        .into_iter()
        .collect::<HashMap<_, _>>();
        let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
            .expect("Parsing keys shouldn't have thrown an error");

        assert_eq!(
            aws_keys.first().unwrap().0,
            AmazonS3ConfigKey::SecretAccessKey
        );
        assert_eq!(aws_keys.len(), 1);

        // Upper-case key: must resolve to the same typed key.
        let aws_config = [
            ("AWS_SECRET_ACCESS_KEY", "a_key"),
            ("aws_s3_allow_unsafe_rename", "true"),
        ]
        .into_iter()
        .collect::<HashMap<_, _>>();
        let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
            .expect("Parsing keys shouldn't have thrown an error");

        assert_eq!(
            aws_keys.first().unwrap().0,
            AmazonS3ConfigKey::SecretAccessKey
        );
        assert_eq!(aws_keys.len(), 1);
    }
}
727}