1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_error::{PolarsResult, polars_bail, polars_err};
6use polars_utils::pl_path::{CloudScheme, PlRefPath};
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9
10use super::CloudOptions;
11
12pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
18 let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
20
21 let mut pos: usize = if let Some(after_last_slash) =
25 memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
26 path.as_bytes()[..i]
27 .iter()
28 .rposition(|x| *x == b'/')
29 .map_or(0, |x| 1 + x)
30 }) {
31 replacements.push((after_last_slash, 0, &[]));
33 after_last_slash
34 } else {
35 usize::MAX
36 };
37
38 while pos < path.len() {
39 match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
40 None => break,
41 Some(i) => pos += i,
42 }
43
44 let (len, replace): (usize, &[u8]) = match &path[pos..] {
45 v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
49 (3, b"(.*/)?" as _)
51 },
52 v if v.starts_with("**") => {
53 polars_bail!(ComputeError: "invalid ** glob pattern")
54 },
55 v if v.starts_with('*') => (1, b"[^/]*" as _),
56 v if v.starts_with('.') => (1, b"\\." as _),
58 _ => {
59 pos += 1;
60 continue;
61 },
62 };
63
64 replacements.push((pos, len, replace));
65 pos += len;
66 }
67
68 if replacements.is_empty() {
69 return Ok((Cow::Borrowed(path), None));
70 }
71
72 let prefix = Cow::Borrowed(&path[..replacements[0].0]);
73
74 let mut pos = replacements[0].0;
75 let mut expansion = Vec::with_capacity(path.len() - pos);
76 expansion.push(b'^');
77
78 for (offset, len, replace) in replacements {
79 expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
80 expansion.extend_from_slice(replace);
81 pos = offset + len;
82 }
83
84 if pos < path.len() {
85 expansion.extend_from_slice(&path.as_bytes()[pos..]);
86 }
87
88 expansion.push(b'$');
89
90 Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
91}
92
93#[derive(PartialEq, Debug, Default)]
95pub struct CloudLocation {
96 pub scheme: &'static str,
98 pub bucket: PlSmallStr,
100 pub prefix: String,
102 pub expansion: Option<PlSmallStr>,
104}
105
106impl CloudLocation {
107 pub fn new(path: PlRefPath, glob: bool) -> PolarsResult<Self> {
108 if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
109 return Ok(CloudLocation {
111 scheme: scheme.as_str(),
112 ..Default::default()
113 });
114 }
115
116 let path_is_local = matches!(
117 path.scheme(),
118 None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
119 );
120
121 let (bucket, key) = path
122 .strip_scheme_split_authority()
123 .ok_or(Cow::Borrowed(
124 "could not extract bucket/key (path did not contain '/')",
125 ))
126 .and_then(|x @ (bucket, _)| {
127 let bucket_is_empty = bucket.is_empty();
128
129 if path_is_local && !bucket_is_empty {
130 Err(Cow::Owned(format!(
131 "unsupported: non-empty hostname for 'file:' URI: '{bucket}'",
132 )))
133 } else if bucket_is_empty && !path_is_local {
134 Err(Cow::Borrowed("empty bucket name"))
135 } else {
136 Ok(x)
137 }
138 })
139 .map_err(|failed_reason| {
140 polars_err!(
141 ComputeError:
142 "failed to create CloudLocation: {} (path: '{}')",
143 failed_reason,
144 path,
145 )
146 })?;
147
148 let key = if path_is_local {
149 key
150 } else {
151 key.strip_prefix('/').unwrap_or(key)
152 };
153
154 let (prefix, expansion) = if glob {
155 let (prefix, expansion) = extract_prefix_expansion(key)?;
156
157 assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
158
159 (prefix, expansion.map(|x| x.into()))
160 } else {
161 (key.into(), None)
162 };
163
164 Ok(CloudLocation {
165 scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
166 bucket: PlSmallStr::from_str(bucket),
167 prefix: prefix.into_owned(),
168 expansion,
169 })
170 }
171}
172
173fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
175 format!("{scheme}://{bucket}/{key}")
176}
177
178pub(crate) struct Matcher {
181 prefix: String,
182 re: Option<Regex>,
183}
184
185impl Matcher {
186 pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
188 let re = expansion
190 .map(polars_utils::regex_cache::compile_regex)
191 .transpose()?;
192 Ok(Matcher { prefix, re })
193 }
194
195 pub(crate) fn is_matching(&self, key: &str) -> bool {
196 if !key.starts_with(self.prefix.as_str()) {
197 return false;
199 }
200 if self.re.is_none() {
201 return true;
202 }
203 let last = &key[self.prefix.len()..];
204 self.re.as_ref().unwrap().is_match(last.as_ref())
205 }
206}
207
208pub async fn glob(
210 url: PlRefPath,
211 cloud_options: Option<&CloudOptions>,
212) -> PolarsResult<Vec<String>> {
213 let (
216 CloudLocation {
217 scheme,
218 bucket,
219 prefix,
220 expansion,
221 },
222 store,
223 ) = super::build_object_store(url, cloud_options, true).await?;
224 let matcher = &Matcher::new(
225 if scheme == "file" {
226 prefix[1..].into()
228 } else {
229 prefix.clone()
230 },
231 expansion.as_deref(),
232 )?;
233
234 let path = Path::from(prefix.as_str());
235 let path = Some(&path);
236
237 let mut locations = store
238 .exec_with_rebuild_retry_on_err(|store| async move {
239 store
240 .list(path)
241 .try_filter_map(|x| async move {
242 let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
243 .then_some(x.location);
244 Ok(out)
245 })
246 .try_collect::<Vec<_>>()
247 .await
248 })
249 .await?;
250
251 locations.sort_unstable();
252 Ok(locations
253 .into_iter()
254 .map(|l| full_url(scheme, &bucket, l))
255 .collect::<Vec<_>>())
256}
257
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `path`, panicking if it is not a valid location.
    fn parse(path: &'static str, glob: bool) -> CloudLocation {
        CloudLocation::new(PlRefPath::new(path), glob).unwrap()
    }

    /// Builds the expected `CloudLocation` for an assertion.
    fn location(
        scheme: &'static str,
        bucket: &'static str,
        prefix: &'static str,
        expansion: Option<&'static str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme,
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    /// Builds a `Matcher` from the globbed cloud `path`.
    fn matcher_for(path: &'static str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = parse(path, true);
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    /// Checks `key` after converting it through an object-store `Path`.
    fn matches_path(matcher: &Matcher, key: &str) -> bool {
        matcher.is_matching(Path::from(key).as_ref())
    }

    #[test]
    fn test_cloud_location() {
        assert_eq!(parse("s3://a/b", true), location("s3", "a", "b", None));
        assert_eq!(
            parse("s3://a/b/*.c", true),
            location("s3", "a", "b/", Some("^[^/]*\\.c$"))
        );
        assert_eq!(
            parse("file:///a/b", true),
            location("file", "", "/a/b", None)
        );
        assert_eq!(
            parse("file:/a/b", true),
            location("file", "", "/a/b", None)
        );
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        let cases: [(&str, &str, Option<&str>); 6] = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];
        for (input, prefix, expansion) in cases {
            let (got_prefix, got_expansion) = extract_prefix_expansion(input).unwrap();
            assert_eq!(got_prefix, prefix, "prefix of {input:?}");
            assert_eq!(got_expansion.as_deref(), expansion, "expansion of {input:?}");
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let m = matcher_for("s3://bucket/folder/*.parquet");
        assert!(matches_path(&m, "folder/1.parquet"));
        // The '.' must match literally.
        assert!(!matches_path(&m, "folder/1parquet"));
        // A single '*' does not descend into sub-folders.
        assert!(!matches_path(&m, "folder/other/1.parquet"));
    }

    #[test]
    fn test_matcher_folders() {
        let m = matcher_for("s3://bucket/folder/**/*.parquet");
        assert!(matches_path(&m, "folder/1.parquet"));
        assert!(matches_path(&m, "folder/other/1.parquet"));

        let m = matcher_for("s3://bucket/folder/**/data/*.parquet");
        assert!(!matches_path(&m, "folder/1.parquet"));
        assert!(matches_path(&m, "folder/data/1.parquet"));
        assert!(matches_path(&m, "folder/other/data/1.parquet"));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        assert_eq!(
            parse("s3://bucket/[*", false),
            location("s3", "bucket", "[*", None),
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        assert_eq!(
            parse("s3://bucket/%25", true),
            location("s3", "bucket", "%25", None)
        );
        // HTTP(S) locations only carry their scheme.
        assert_eq!(
            parse("https://pola.rs/%25", true),
            location("https", "", "", None)
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let m = matcher_for("s3://bucket/folder/**/data.parquet");
        assert!(!m.is_matching("folder/_data.parquet"));
        assert!(m.is_matching("folder/data.parquet"));
        assert!(m.is_matching("folder/abc/data.parquet"));
        assert!(m.is_matching("folder/abc/def/data.parquet"));

        let m = matcher_for("s3://bucket/folder/data_*.parquet");
        assert!(!m.is_matching("folder/data_1.ipc"));
    }
}