1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::pl_str::PlSmallStr;
8use polars_utils::plpath::{CloudScheme, PlPathRef};
9use regex::Regex;
10
11use super::CloudOptions;
12
13pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
19 let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
21
22 let mut pos: usize = if let Some(after_last_slash) =
26 memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
27 path.as_bytes()[..i]
28 .iter()
29 .rposition(|x| *x == b'/')
30 .map_or(0, |x| 1 + x)
31 }) {
32 replacements.push((after_last_slash, 0, &[]));
34 after_last_slash
35 } else {
36 usize::MAX
37 };
38
39 while pos < path.len() {
40 match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
41 None => break,
42 Some(i) => pos += i,
43 }
44
45 let (len, replace): (usize, &[u8]) = match &path[pos..] {
46 v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
50 (3, b"(.*/)?" as _)
52 },
53 v if v.starts_with("**") => {
54 polars_bail!(ComputeError: "invalid ** glob pattern")
55 },
56 v if v.starts_with('*') => (1, b"[^/]*" as _),
57 v if v.starts_with('.') => (1, b"\\." as _),
59 _ => {
60 pos += 1;
61 continue;
62 },
63 };
64
65 replacements.push((pos, len, replace));
66 pos += len;
67 }
68
69 if replacements.is_empty() {
70 return Ok((Cow::Borrowed(path), None));
71 }
72
73 let prefix = Cow::Borrowed(&path[..replacements[0].0]);
74
75 let mut pos = replacements[0].0;
76 let mut expansion = Vec::with_capacity(path.len() - pos);
77 expansion.push(b'^');
78
79 for (offset, len, replace) in replacements {
80 expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
81 expansion.extend_from_slice(replace);
82 pos = offset + len;
83 }
84
85 if pos < path.len() {
86 expansion.extend_from_slice(&path.as_bytes()[pos..]);
87 }
88
89 expansion.push(b'$');
90
91 Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
92}
93
/// A cloud (or local) path decomposed into scheme, bucket, key prefix and glob expansion.
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
    /// The scheme as a string, e.g. `"s3"`, `"file"` or `"https"`.
    pub scheme: &'static str,
    /// The bucket (authority) name; empty for local paths and for HTTP(S) locations.
    pub bucket: PlSmallStr,
    /// The literal part of the key. When globbing, this is everything up to and including
    /// the last `/` before the first glob character; otherwise it is the whole key.
    pub prefix: String,
    /// Anchored regex for the part of the key after `prefix`; `None` when globbing is
    /// disabled or the key contains no glob character.
    pub expansion: Option<PlSmallStr>,
}
106
107impl CloudLocation {
108 pub fn new(path: PlPathRef<'_>, glob: bool) -> PolarsResult<Self> {
109 if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
110 return Ok(CloudLocation {
112 scheme: scheme.as_str(),
113 ..Default::default()
114 });
115 }
116
117 let path_is_local = matches!(
118 path.scheme(),
119 None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
120 );
121
122 let (bucket, key) = path
123 .strip_scheme_split_authority()
124 .ok_or("could not extract bucket/key (path did not contain '/')")
125 .and_then(|x| {
126 let bucket_is_empty = x.0.is_empty();
127
128 if path_is_local {
129 debug_assert!(bucket_is_empty);
130 }
131
132 if bucket_is_empty && !path_is_local {
133 Err("empty bucket name")
134 } else {
135 Ok(x)
136 }
137 })
138 .map_err(|failed_reason| {
139 polars_err!(
140 ComputeError:
141 "failed to create CloudLocation: {} (path: '{}')",
142 failed_reason,
143 path.to_str(),
144 )
145 })?;
146
147 let key = if path_is_local {
148 key
149 } else {
150 key.strip_prefix('/').unwrap_or(key)
151 };
152
153 let (prefix, expansion) = if glob {
154 let (prefix, expansion) = extract_prefix_expansion(key)?;
155
156 assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
157
158 (prefix, expansion.map(|x| x.into()))
159 } else {
160 (key.into(), None)
161 };
162
163 Ok(CloudLocation {
164 scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
165 bucket: PlSmallStr::from_str(bucket),
166 prefix: prefix.into_owned(),
167 expansion,
168 })
169 }
170}
171
172fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
174 format!("{scheme}://{bucket}/{key}")
175}
176
/// Matches object keys against a literal prefix followed by an optional regex.
pub(crate) struct Matcher {
    /// Literal string that every matching key must start with.
    prefix: String,
    /// Regex applied to the part of the key after `prefix`; `None` accepts any remainder.
    re: Option<Regex>,
}
183
184impl Matcher {
185 pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
187 let re = expansion
189 .map(polars_utils::regex_cache::compile_regex)
190 .transpose()?;
191 Ok(Matcher { prefix, re })
192 }
193
194 pub(crate) fn is_matching(&self, key: &str) -> bool {
195 if !key.starts_with(self.prefix.as_str()) {
196 return false;
198 }
199 if self.re.is_none() {
200 return true;
201 }
202 let last = &key[self.prefix.len()..];
203 self.re.as_ref().unwrap().is_match(last.as_ref())
204 }
205}
206
207pub async fn glob(
209 url: PlPathRef<'_>,
210 cloud_options: Option<&CloudOptions>,
211) -> PolarsResult<Vec<String>> {
212 let (
215 CloudLocation {
216 scheme,
217 bucket,
218 prefix,
219 expansion,
220 },
221 store,
222 ) = super::build_object_store(url, cloud_options, true).await?;
223 let matcher = &Matcher::new(
224 if scheme == "file" {
225 prefix[1..].into()
227 } else {
228 prefix.clone()
229 },
230 expansion.as_deref(),
231 )?;
232
233 let path = Path::from(prefix.as_str());
234 let path = Some(&path);
235
236 let mut locations = store
237 .try_exec_rebuild_on_err(|store| {
238 let st = store.clone();
239
240 async {
241 let store = st;
242 store
243 .list(path)
244 .try_filter_map(|x| async move {
245 let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
246 .then_some(x.location);
247 Ok(out)
248 })
249 .try_collect::<Vec<_>>()
250 .await
251 .map_err(to_compute_err)
252 }
253 })
254 .await?;
255
256 locations.sort_unstable();
257 Ok(locations
258 .into_iter()
259 .map(|l| full_url(scheme, &bucket, l))
260 .collect::<Vec<_>>())
261}
262
#[cfg(test)]
mod test {
    use super::*;

    /// Builds the expected `CloudLocation` for an assertion.
    fn loc(
        scheme: &'static str,
        bucket: &'static str,
        prefix: &'static str,
        expansion: Option<&'static str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme,
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    /// Parses a globbed path and builds a `Matcher` from its prefix and expansion.
    fn matcher_for(path: &'static str) -> Matcher {
        let location = CloudLocation::new(PlPathRef::new(path), true).unwrap();
        Matcher::new(location.prefix, location.expansion.as_deref()).unwrap()
    }

    #[test]
    fn test_cloud_location() {
        let cases = [
            ("s3://a/b", loc("s3", "a", "b", None)),
            ("s3://a/b/*.c", loc("s3", "a", "b/", Some("^[^/]*\\.c$"))),
            ("file:///a/b", loc("file", "", "/a/b", None)),
            ("file:/a/b", loc("file", "", "/a/b", None)),
        ];
        for (path, expected) in cases {
            assert_eq!(CloudLocation::new(PlPathRef::new(path), true).unwrap(), expected);
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        let cases: [(&str, &str, Option<&str>); 6] = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];
        for (path, prefix, expansion) in cases {
            let (got_prefix, got_expansion) = extract_prefix_expansion(path).unwrap();
            assert_eq!(got_prefix, prefix, "prefix of {path}");
            assert_eq!(got_expansion.as_deref(), expansion, "expansion of {path}");
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let m = matcher_for("s3://bucket/folder/*.parquet");

        assert!(m.is_matching(Path::from("folder/1.parquet").as_ref()));
        // The '.' is literal, not "any character".
        assert!(!m.is_matching(Path::from("folder/1parquet").as_ref()));
        // A single '*' does not cross directory boundaries.
        assert!(!m.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let m = matcher_for("s3://bucket/folder/**/*.parquet");
        assert!(m.is_matching(Path::from("folder/1.parquet").as_ref()));
        assert!(m.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let m = matcher_for("s3://bucket/folder/**/data/*.parquet");
        assert!(!m.is_matching(Path::from("folder/1.parquet").as_ref()));
        assert!(m.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        assert!(m.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        assert_eq!(
            CloudLocation::new(PlPathRef::new("s3://bucket/[*"), false).unwrap(),
            loc("s3", "bucket", "[*", None),
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        // Percent-encoded sequences are kept verbatim in the key.
        assert_eq!(
            CloudLocation::new(PlPathRef::new("s3://bucket/%25"), true).unwrap(),
            loc("s3", "bucket", "%25", None),
        );
        // HTTP(S) locations only record their scheme.
        assert_eq!(
            CloudLocation::new(PlPathRef::new("https://pola.rs/%25"), true).unwrap(),
            loc("https", "", "", None),
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let m = matcher_for("s3://bucket/folder/**/data.parquet");
        assert!(!m.is_matching("folder/_data.parquet"));
        assert!(m.is_matching("folder/data.parquet"));
        assert!(m.is_matching("folder/abc/data.parquet"));
        assert!(m.is_matching("folder/abc/def/data.parquet"));

        let m = matcher_for("s3://bucket/folder/data_*.parquet");
        assert!(!m.is_matching("folder/data_1.ipc"));
    }
}
438}