1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::{format_pl_smallstr, pl_serialize};
10use tokio::sync::RwLock;
11use url::Url;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore, parse_url};
14use crate::cloud::CloudConfig;
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
25 polars_bail!(
26 ComputeError:
27 "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
28 );
29}
30
31fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
33 let cloud_options = options.map(
35 |CloudOptions {
36 max_retries,
38 #[cfg(feature = "file_cache")]
39 file_cache_ttl,
40 config,
41 #[cfg(feature = "cloud")]
42 credential_provider,
43 }| {
44 CloudOptions2 {
45 max_retries: *max_retries,
46 #[cfg(feature = "file_cache")]
47 file_cache_ttl: *file_cache_ttl,
48 config: config.clone(),
49 #[cfg(feature = "cloud")]
50 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
51 }
52 },
53 );
54
55 let cache_key = CacheKey {
56 url_base: format_pl_smallstr!(
57 "{}",
58 &url[url::Position::BeforeScheme..url::Position::AfterPort]
59 ),
60 cloud_options,
61 };
62
63 verbose_print_sensitive(|| format!("object store cache key: {} {:?}", url, &cache_key));
64
65 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
66
67 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
68 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
69 struct CacheKey {
70 url_base: PlSmallStr,
71 cloud_options: Option<CloudOptions2>,
72 }
73
74 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
77 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
78 struct CloudOptions2 {
79 max_retries: usize,
80 #[cfg(feature = "file_cache")]
81 file_cache_ttl: u64,
82 config: Option<CloudConfig>,
83 #[cfg(feature = "cloud")]
84 credential_provider: usize,
85 }
86}
87
88pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
90 object_store::path::Path::parse(path).map_err(to_compute_err)
91}
92
/// Holds everything needed to construct (or rebuild) an object store for one URL.
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    /// The URL string handed to the backend-specific `build_*` functions.
    url: PlSmallStr,
    /// Parsed form of the URL; used to derive the cache key.
    parsed_url: Url,
    /// URL scheme, reported in the "missing feature" error message.
    // Only read in `#[cfg(not(feature = ...))]` branches, hence the `allow`.
    #[allow(unused)]
    scheme: PlSmallStr,
    /// Selects which backend is built.
    cloud_type: CloudType,
    /// Options supplied by the caller; `None` falls back to the static default options.
    options: Option<CloudOptions>,
}
102
103impl PolarsObjectStoreBuilder {
104 pub(super) async fn build_impl(&self) -> PolarsResult<Arc<dyn ObjectStore>> {
105 let options = self
106 .options
107 .as_ref()
108 .unwrap_or_else(|| CloudOptions::default_static_ref());
109
110 let store = match self.cloud_type {
111 CloudType::Aws => {
112 #[cfg(feature = "aws")]
113 {
114 let store = options.build_aws(&self.url).await?;
115 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
116 }
117 #[cfg(not(feature = "aws"))]
118 return err_missing_feature("aws", &self.scheme);
119 },
120 CloudType::Gcp => {
121 #[cfg(feature = "gcp")]
122 {
123 let store = options.build_gcp(&self.url)?;
124 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
125 }
126 #[cfg(not(feature = "gcp"))]
127 return err_missing_feature("gcp", &self.scheme);
128 },
129 CloudType::Azure => {
130 {
131 #[cfg(feature = "azure")]
132 {
133 let store = options.build_azure(&self.url)?;
134 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
135 }
136 }
137 #[cfg(not(feature = "azure"))]
138 return err_missing_feature("azure", &self.scheme);
139 },
140 CloudType::File => {
141 let local = LocalFileSystem::new();
142 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
143 },
144 CloudType::Http => {
145 {
146 #[cfg(feature = "http")]
147 {
148 let store = options.build_http(&self.url)?;
149 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
150 }
151 }
152 #[cfg(not(feature = "http"))]
153 return err_missing_feature("http", &cloud_location.scheme);
154 },
155 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
156 }?;
157
158 Ok(store)
159 }
160
161 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
163 let opt_cache_key = match &self.cloud_type {
164 CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(
165 &self.parsed_url,
166 self.options.as_ref(),
167 )),
168 CloudType::File | CloudType::Http | CloudType::Hf => None,
169 };
170
171 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
172 let cache = OBJECT_STORE_CACHE.read().await;
173
174 if let Some(store) = cache.get(cache_key) {
175 return Ok(store.clone());
176 }
177
178 drop(cache);
179
180 let cache = OBJECT_STORE_CACHE.write().await;
181
182 if let Some(store) = cache.get(cache_key) {
183 return Ok(store.clone());
184 }
185
186 Some(cache)
187 } else {
188 None
189 };
190
191 let store = self.build_impl().await?;
192 let store = PolarsObjectStore::new_from_inner(store, self);
193
194 if let Some(mut cache) = opt_cache_write_guard {
195 if cache.len() >= 8 {
197 if config::verbose() {
198 eprintln!(
199 "build_object_store: clearing store cache (cache.len(): {})",
200 cache.len()
201 );
202 }
203 cache.clear()
204 }
205
206 cache.insert(opt_cache_key.unwrap(), store.clone());
207 }
208
209 Ok(store)
210 }
211
212 pub(crate) fn is_azure(&self) -> bool {
213 matches!(&self.cloud_type, CloudType::Azure)
214 }
215}
216
217pub async fn build_object_store(
219 url: &str,
220 #[cfg_attr(
221 not(any(feature = "aws", feature = "gcp", feature = "azure")),
222 allow(unused_variables)
223 )]
224 options: Option<&CloudOptions>,
225 glob: bool,
226) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
227 let parsed = parse_url(url).map_err(to_compute_err)?;
228 let cloud_location = CloudLocation::from_url(&parsed, glob)?;
229 let cloud_type = CloudType::from_url(&parsed)?;
230
231 let store = PolarsObjectStoreBuilder {
232 url: url.into(),
233 parsed_url: parsed,
234 scheme: cloud_location.scheme.as_str().into(),
235 cloud_type,
236 options: options.cloned(),
237 }
238 .build()
239 .await?;
240
241 Ok((cloud_location, store))
242}
243
// Gate the module so it is only compiled for test builds.
#[cfg(test)]
mod test {
    /// The string `"%25"` must come back unchanged from `object_path_from_str`.
    #[test]
    fn test_object_path_from_str() {
        use super::object_path_from_str;

        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}