1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25 feature: &str,
26 cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28 polars_bail!(
29 ComputeError:
30 "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31 feature,
32 cloud_type,
33 );
34}
35
36fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u8> {
38 let cloud_options = options.map(
40 |CloudOptions {
41 #[cfg(feature = "file_cache")]
43 file_cache_ttl,
44 config,
45 retry_config,
46 #[cfg(feature = "cloud")]
47 credential_provider,
48 }| {
49 CloudOptionsKey {
50 #[cfg(feature = "file_cache")]
51 file_cache_ttl: *file_cache_ttl,
52 config: config.clone(),
53 retry_config: *retry_config,
54 #[cfg(feature = "cloud")]
55 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56 }
57 },
58 );
59
60 let cache_key = CacheKey {
61 url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
62 cloud_options,
63 };
64
65 verbose_print_sensitive(|| {
66 format!(
67 "object store cache key for path at '{}': {:?}",
68 path, &cache_key
69 )
70 });
71
72 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
73
74 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
75 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
76 struct CacheKey {
77 url_base: PlSmallStr,
78 cloud_options: Option<CloudOptionsKey>,
79 }
80
81 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
84 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
85 struct CloudOptionsKey {
86 #[cfg(feature = "file_cache")]
87 file_cache_ttl: u64,
88 config: Option<CloudConfig>,
89 retry_config: CloudRetryConfig,
90 #[cfg(feature = "cloud")]
91 credential_provider: usize,
92 }
93}
94
95pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
97 object_store::path::Path::parse(path).map_err(to_compute_err)
98}
99
/// Inputs needed to construct (or fetch from the process-wide cache) a [`PolarsObjectStore`]
/// for a single path.
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    /// Path the store is built for; `build` also derives the AWS/GCP/Azure cache key from it.
    path: PlRefPath,
    /// Storage backend to build; `build_object_store` derives it from the path's scheme.
    cloud_type: CloudType,
    /// Options to build with; `None` falls back to `CloudOptions::default_static_ref()`.
    options: Option<CloudOptions>,
}
106
107impl PolarsObjectStoreBuilder {
108 pub(super) fn path(&self) -> &PlRefPath {
109 &self.path
110 }
111
112 pub(super) async fn build_impl(
113 &self,
114 clear_cached_credentials: bool,
116 ) -> PolarsResult<Arc<dyn ObjectStore>> {
117 let options = self
118 .options
119 .as_ref()
120 .unwrap_or_else(|| CloudOptions::default_static_ref());
121
122 if let Some(options) = &self.options
123 && verbose()
124 {
125 eprintln!(
126 "build object-store: file_cache_ttl: {}",
127 options.file_cache_ttl
128 )
129 }
130
131 let store = match self.cloud_type {
132 CloudType::Aws => {
133 #[cfg(feature = "aws")]
134 {
135 let store = options
136 .build_aws(self.path.clone(), clear_cached_credentials)
137 .await?;
138 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
139 }
140 #[cfg(not(feature = "aws"))]
141 return err_missing_feature("aws", &self.cloud_type);
142 },
143 CloudType::Gcp => {
144 #[cfg(feature = "gcp")]
145 {
146 let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
147
148 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
149 }
150 #[cfg(not(feature = "gcp"))]
151 return err_missing_feature("gcp", &self.cloud_type);
152 },
153 CloudType::Azure => {
154 {
155 #[cfg(feature = "azure")]
156 {
157 let store =
158 options.build_azure(self.path.clone(), clear_cached_credentials)?;
159 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
160 }
161 }
162 #[cfg(not(feature = "azure"))]
163 return err_missing_feature("azure", &self.cloud_type);
164 },
165 CloudType::File => {
166 let local = LocalFileSystem::new();
167 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
168 },
169 CloudType::Http => {
170 {
171 #[cfg(feature = "http")]
172 {
173 let store = options.build_http(self.path.clone())?;
174 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
175 }
176 }
177 #[cfg(not(feature = "http"))]
178 return err_missing_feature("http", &cloud_location.scheme);
179 },
180 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
181 }?;
182
183 Ok(store)
184 }
185
186 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
188 let opt_cache_key = match &self.cloud_type {
189 CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
190 Some(path_and_creds_to_key(&self.path, self.options.as_ref()))
191 },
192 CloudType::File | CloudType::Http | CloudType::Hf => None,
193 };
194
195 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
196 let cache = OBJECT_STORE_CACHE.read().await;
197
198 if let Some(store) = cache.get(cache_key) {
199 return Ok(store.clone());
200 }
201
202 drop(cache);
203
204 let cache = OBJECT_STORE_CACHE.write().await;
205
206 if let Some(store) = cache.get(cache_key) {
207 return Ok(store.clone());
208 }
209
210 Some(cache)
211 } else {
212 None
213 };
214
215 let store = self.build_impl(false).await?;
216 let store = PolarsObjectStore::new_from_inner(store, self);
217
218 if let Some(mut cache) = opt_cache_write_guard {
219 if cache.len() >= 8 {
221 if config::verbose() {
222 eprintln!(
223 "build_object_store: clearing store cache (cache.len(): {})",
224 cache.len()
225 );
226 }
227 cache.clear()
228 }
229
230 cache.insert(opt_cache_key.unwrap(), store.clone());
231 }
232
233 Ok(store)
234 }
235
236 pub(crate) fn is_azure(&self) -> bool {
237 matches!(&self.cloud_type, CloudType::Azure)
238 }
239}
240
241pub async fn build_object_store(
243 path: PlRefPath,
244 #[cfg_attr(
245 not(any(feature = "aws", feature = "gcp", feature = "azure")),
246 allow(unused_variables)
247 )]
248 options: Option<&CloudOptions>,
249 glob: bool,
250) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
251 let path = path.to_absolute_path()?.into_owned();
252
253 let cloud_type = path
254 .scheme()
255 .map_or(CloudType::File, CloudType::from_cloud_scheme);
256 let cloud_location = CloudLocation::new(path.clone(), glob)?;
257
258 let store = PolarsObjectStoreBuilder {
259 path,
260 cloud_type,
261 options: options.cloned(),
262 }
263 .build()
264 .await?;
265
266 Ok((cloud_location, store))
267}
268
/// Unit tests for path parsing helpers; compiled only for test builds.
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// A literal percent sequence must survive parsing verbatim (no percent-decoding).
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }

    /// A multi-segment path round-trips unchanged.
    #[test]
    fn test_object_path_from_str_nested() {
        let path = "a/b/c";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}
279}