1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::plpath::{PlPath, PlPathRef};
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::CloudConfig;
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25 feature: &str,
26 cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28 polars_bail!(
29 ComputeError:
30 "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31 feature,
32 cloud_type,
33 );
34}
35
36fn path_and_creds_to_key(path: PlPathRef<'_>, options: Option<&CloudOptions>) -> Vec<u8> {
38 let cloud_options = options.map(
40 |CloudOptions {
41 max_retries,
43 #[cfg(feature = "file_cache")]
44 file_cache_ttl,
45 config,
46 #[cfg(feature = "cloud")]
47 credential_provider,
48 }| {
49 CloudOptions2 {
50 max_retries: *max_retries,
51 #[cfg(feature = "file_cache")]
52 file_cache_ttl: *file_cache_ttl,
53 config: config.clone(),
54 #[cfg(feature = "cloud")]
55 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56 }
57 },
58 );
59
60 let cache_key = CacheKey {
61 url_base: format_pl_smallstr!("{}", &path.to_str()[..path.authority_end_position()]),
62 cloud_options,
63 };
64
65 verbose_print_sensitive(|| {
66 format!(
67 "object store cache key for path at '{}': {:?}",
68 path.to_str(),
69 &cache_key
70 )
71 });
72
73 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
74
75 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
76 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
77 struct CacheKey {
78 url_base: PlSmallStr,
79 cloud_options: Option<CloudOptions2>,
80 }
81
82 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
85 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
86 struct CloudOptions2 {
87 max_retries: usize,
88 #[cfg(feature = "file_cache")]
89 file_cache_ttl: u64,
90 config: Option<CloudConfig>,
91 #[cfg(feature = "cloud")]
92 credential_provider: usize,
93 }
94}
95
96pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
98 object_store::path::Path::parse(path).map_err(to_compute_err)
99}
100
101#[derive(Debug, Clone)]
102pub(crate) struct PolarsObjectStoreBuilder {
103 path: PlPath,
104 cloud_type: CloudType,
105 options: Option<CloudOptions>,
106}
107
108impl PolarsObjectStoreBuilder {
109 pub(super) async fn build_impl(
110 &self,
111 clear_cached_credentials: bool,
113 ) -> PolarsResult<Arc<dyn ObjectStore>> {
114 let options = self
115 .options
116 .as_ref()
117 .unwrap_or_else(|| CloudOptions::default_static_ref());
118
119 let store = match self.cloud_type {
120 CloudType::Aws => {
121 #[cfg(feature = "aws")]
122 {
123 let store = options
124 .build_aws(self.path.to_str(), clear_cached_credentials)
125 .await?;
126 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
127 }
128 #[cfg(not(feature = "aws"))]
129 return err_missing_feature("aws", &self.cloud_type);
130 },
131 CloudType::Gcp => {
132 #[cfg(feature = "gcp")]
133 {
134 let store = options.build_gcp(self.path.to_str(), clear_cached_credentials)?;
135
136 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
137 }
138 #[cfg(not(feature = "gcp"))]
139 return err_missing_feature("gcp", &self.cloud_type);
140 },
141 CloudType::Azure => {
142 {
143 #[cfg(feature = "azure")]
144 {
145 let store =
146 options.build_azure(self.path.to_str(), clear_cached_credentials)?;
147 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
148 }
149 }
150 #[cfg(not(feature = "azure"))]
151 return err_missing_feature("azure", &self.cloud_type);
152 },
153 CloudType::File => {
154 let local = LocalFileSystem::new();
155 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
156 },
157 CloudType::Http => {
158 {
159 #[cfg(feature = "http")]
160 {
161 let store = options.build_http(self.path.to_str())?;
162 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
163 }
164 }
165 #[cfg(not(feature = "http"))]
166 return err_missing_feature("http", &cloud_location.scheme);
167 },
168 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
169 }?;
170
171 Ok(store)
172 }
173
174 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
176 let opt_cache_key = match &self.cloud_type {
177 CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(path_and_creds_to_key(
178 self.path.as_ref(),
179 self.options.as_ref(),
180 )),
181 CloudType::File | CloudType::Http | CloudType::Hf => None,
182 };
183
184 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
185 let cache = OBJECT_STORE_CACHE.read().await;
186
187 if let Some(store) = cache.get(cache_key) {
188 return Ok(store.clone());
189 }
190
191 drop(cache);
192
193 let cache = OBJECT_STORE_CACHE.write().await;
194
195 if let Some(store) = cache.get(cache_key) {
196 return Ok(store.clone());
197 }
198
199 Some(cache)
200 } else {
201 None
202 };
203
204 let store = self.build_impl(false).await?;
205 let store = PolarsObjectStore::new_from_inner(store, self);
206
207 if let Some(mut cache) = opt_cache_write_guard {
208 if cache.len() >= 8 {
210 if config::verbose() {
211 eprintln!(
212 "build_object_store: clearing store cache (cache.len(): {})",
213 cache.len()
214 );
215 }
216 cache.clear()
217 }
218
219 cache.insert(opt_cache_key.unwrap(), store.clone());
220 }
221
222 Ok(store)
223 }
224
225 pub(crate) fn is_azure(&self) -> bool {
226 matches!(&self.cloud_type, CloudType::Azure)
227 }
228}
229
230pub async fn build_object_store(
232 path: PlPathRef<'_>,
233 #[cfg_attr(
234 not(any(feature = "aws", feature = "gcp", feature = "azure")),
235 allow(unused_variables)
236 )]
237 options: Option<&CloudOptions>,
238 glob: bool,
239) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
240 let path = path.to_absolute_path().unwrap_or_else(|| path.into_owned());
241
242 let cloud_location = CloudLocation::new(path.as_ref(), glob)?;
243 let cloud_type = path.as_ref().scheme().map_or(CloudType::File, |scheme| {
244 CloudType::from_cloud_scheme(&scheme)
245 });
246
247 let store = PolarsObjectStoreBuilder {
248 path,
249 cloud_type,
250 options: options.cloned(),
251 }
252 .build()
253 .await?;
254
255 Ok((cloud_location, store))
256}
257
/// Unit tests for the helpers in this module.
#[cfg(test)]
mod test {
    #[test]
    fn test_object_path_from_str() {
        use super::object_path_from_str;

        // `%` must survive verbatim. The parsed path is expected to equal its input, so no
        // percent-decoding may happen.
        let path = "%25";
        let out = object_path_from_str(path).unwrap();
        assert_eq!(out.as_ref(), path);

        // A plain multi-segment relative path round-trips unchanged.
        let nested = "data/file.parquet";
        let out = object_path_from_str(nested).unwrap();
        assert_eq!(out.as_ref(), nested);
    }
}
268}