1use futures::TryStreamExt;
2use object_store::path::Path;
3use polars_core::error::to_compute_err;
4use polars_core::prelude::polars_ensure;
5use polars_error::{PolarsResult, polars_bail};
6use polars_utils::format_pl_smallstr;
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9use url::Url;
10
11use super::{CloudOptions, parse_url};
12
13const DELIMITER: char = '/';
14
15pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Option<String>)> {
19 let splits = url.split(DELIMITER);
20 let mut prefix = String::new();
21 let mut expansion = String::new();
22 let mut last_split_was_wildcard = false;
23 for split in splits {
24 if expansion.is_empty() && memchr::memchr2(b'*', b'[', split.as_bytes()).is_none() {
25 if !prefix.is_empty() {
27 prefix.push(DELIMITER);
28 }
29 prefix.push_str(split);
30 continue;
31 }
32 if split == "**" {
36 last_split_was_wildcard = true;
37 expansion.push_str(".*");
38 continue;
39 }
40 polars_ensure!(
41 !split.contains("**"),
42 ComputeError: "expected '**' by itself in path component, got {}", url
43 );
44 if !last_split_was_wildcard && !expansion.is_empty() {
45 expansion.push(DELIMITER);
46 }
47 if memchr::memchr2(b'.', b'*', split.as_bytes()).is_some() {
49 let processed = split.replace('.', "\\.");
50 expansion.push_str(&processed.replace('*', "([^/]*)"));
51 continue;
52 }
53 last_split_was_wildcard = false;
54 expansion.push_str(split);
55 }
56 if !prefix.is_empty() && !expansion.is_empty() {
58 prefix.push(DELIMITER);
59 }
60 if !expansion.is_empty() {
62 expansion.insert(0, '^');
63 expansion.push('$');
64 }
65 Ok((
66 prefix,
67 if !expansion.is_empty() {
68 Some(expansion)
69 } else {
70 None
71 },
72 ))
73}
74
75#[derive(PartialEq, Debug, Default)]
77pub struct CloudLocation {
78 pub scheme: PlSmallStr,
80 pub bucket: PlSmallStr,
82 pub prefix: String,
84 pub expansion: Option<PlSmallStr>,
86}
87
88impl CloudLocation {
89 pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
90 let is_local = parsed.scheme() == "file";
91 let (bucket, key) = if is_local {
92 ("".into(), parsed.path())
93 } else {
94 if parsed.scheme().starts_with("http") {
95 return Ok(CloudLocation {
96 scheme: parsed.scheme().into(),
97 ..Default::default()
98 });
99 }
100
101 let key = parsed.path();
102
103 let bucket = format_pl_smallstr!(
104 "{}",
105 &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
106 );
107
108 if bucket.is_empty() {
109 polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
110 }
111
112 (bucket, key)
113 };
114
115 let key = percent_encoding::percent_decode_str(key)
116 .decode_utf8()
117 .map_err(to_compute_err)?;
118 let (prefix, expansion) = if glob {
119 let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
120 if is_local && key.starts_with(DELIMITER) {
121 prefix.insert(0, DELIMITER);
122 }
123 (prefix, expansion.map(|x| x.into()))
124 } else {
125 (key.as_ref().into(), None)
126 };
127
128 Ok(CloudLocation {
129 scheme: parsed.scheme().into(),
130 bucket,
131 prefix,
132 expansion,
133 })
134 }
135
136 pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
138 let parsed = parse_url(url).map_err(to_compute_err)?;
139 Self::from_url(&parsed, glob)
140 }
141}
142
143fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
145 format!("{scheme}://{bucket}/{key}")
146}
147
148pub(crate) struct Matcher {
151 prefix: String,
152 re: Option<Regex>,
153}
154
155impl Matcher {
156 pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
158 let re = expansion
160 .map(polars_utils::regex_cache::compile_regex)
161 .transpose()?;
162 Ok(Matcher { prefix, re })
163 }
164
165 pub(crate) fn is_matching(&self, key: &str) -> bool {
166 if !key.starts_with(self.prefix.as_str()) {
167 return false;
169 }
170 if self.re.is_none() {
171 return true;
172 }
173 let last = &key[self.prefix.len()..];
174 self.re.as_ref().unwrap().is_match(last.as_ref())
175 }
176}
177
178pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
180 let (
183 CloudLocation {
184 scheme,
185 bucket,
186 prefix,
187 expansion,
188 },
189 store,
190 ) = super::build_object_store(url, cloud_options, true).await?;
191 let matcher = &Matcher::new(
192 if scheme == "file" {
193 prefix[1..].into()
195 } else {
196 prefix.clone()
197 },
198 expansion.as_deref(),
199 )?;
200
201 let path = Path::from(prefix.as_str());
202 let path = Some(&path);
203
204 let mut locations = store
205 .try_exec_rebuild_on_err(|store| {
206 let st = store.clone();
207
208 async {
209 let store = st;
210 store
211 .list(path)
212 .try_filter_map(|x| async move {
213 let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
214 .then_some(x.location);
215 Ok(out)
216 })
217 .try_collect::<Vec<_>>()
218 .await
219 .map_err(to_compute_err)
220 }
221 })
222 .await?;
223
224 locations.sort_unstable();
225 Ok(locations
226 .into_iter()
227 .map(|l| full_url(&scheme, &bucket, l))
228 .collect::<Vec<_>>())
229}
230
#[cfg(test)]
mod test {
    use super::*;

    /// Build the expected [`CloudLocation`] for a comparison.
    fn location(
        scheme: &str,
        bucket: &str,
        prefix: &str,
        expansion: Option<&str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme: scheme.into(),
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    /// Build the [`Matcher`] for a glob URL.
    fn matcher_for(url: &str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = CloudLocation::new(url, true).unwrap();
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    #[test]
    fn test_cloud_location() {
        assert_eq!(
            CloudLocation::new("s3://a/b", true).unwrap(),
            location("s3", "a", "b", None)
        );
        assert_eq!(
            CloudLocation::new("s3://a/b/*.c", true).unwrap(),
            location("s3", "a", "b/", Some("^([^/]*)\\.c$"))
        );
        assert_eq!(
            CloudLocation::new("file:///a/b", true).unwrap(),
            location("file", "", "/a/b", None)
        );
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        // (input, expected prefix, expected expansion)
        let cases: [(&str, &str, Option<&str>); 6] = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^.*$")),
            ("a/**/b", "a/", Some("^.*b$")),
            ("a/**/*b", "a/", Some("^.*([^/]*)b$")),
            ("a/**/data/*b", "a/", Some("^.*data/([^/]*)b$")),
            ("a/*b", "a/", Some("^([^/]*)b$")),
        ];
        for (input, prefix, expansion) in cases {
            assert_eq!(
                extract_prefix_expansion(input).unwrap(),
                (String::from(prefix), expansion.map(String::from))
            );
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let matcher = matcher_for("s3://bucket/folder/*.parquet");
        // A regular file in the folder matches.
        assert!(matcher.is_matching(Path::from("folder/1.parquet").as_ref()));
        // The dot of the extension is required.
        assert!(!matcher.is_matching(Path::from("folder/1parquet").as_ref()));
        // A single `*` does not cross folder boundaries.
        assert!(!matcher.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let matcher = matcher_for("s3://bucket/folder/**/*.parquet");
        // `**` matches zero or more intermediate folders.
        assert!(matcher.is_matching(Path::from("folder/1.parquet").as_ref()));
        assert!(matcher.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let matcher = matcher_for("s3://bucket/folder/**/data/*.parquet");
        // The literal `data` component is required.
        assert!(!matcher.is_matching(Path::from("folder/1.parquet").as_ref()));
        assert!(matcher.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        assert!(matcher.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        // Without globbing, wildcard characters stay in the prefix untouched.
        assert_eq!(
            CloudLocation::new("s3://bucket/[*", false).unwrap(),
            location("s3", "bucket", "/[*", None)
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        use super::CloudLocation;

        assert_eq!(
            CloudLocation::new("s3://bucket/%25", true).unwrap(),
            location("s3", "bucket", "%25", None)
        );

        assert_eq!(
            CloudLocation::new("https://pola.rs/%25", true).unwrap(),
            location("https", "", "", None)
        );
    }
}