polars_io/cloud/
glob.rs

1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_error::{PolarsResult, polars_bail, polars_err};
6use polars_utils::pl_path::{CloudScheme, PlRefPath};
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9
10use super::CloudOptions;
11
12/// Converts a glob to regex form.
13///
14/// # Returns
15/// 1. the prefix part (all path components until the first one with '*')
16/// 2. a regular expression representation of the rest.
17pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
18    // (offset, len, replacement)
19    let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
20
21    // The position after the last slash before glob characters begin.
22    // `a/b/c*/`
23    //      ^
24    let mut pos: usize = if let Some(after_last_slash) =
25        memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
26            path.as_bytes()[..i]
27                .iter()
28                .rposition(|x| *x == b'/')
29                .map_or(0, |x| 1 + x)
30        }) {
31        // First value is used as the starting point later.
32        replacements.push((after_last_slash, 0, &[]));
33        after_last_slash
34    } else {
35        usize::MAX
36    };
37
38    while pos < path.len() {
39        match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
40            None => break,
41            Some(i) => pos += i,
42        }
43
44        let (len, replace): (usize, &[u8]) = match &path[pos..] {
45            // Accept:
46            // - `**/`
47            // - `**` only if it is the end of the path
48            v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
49                // Wrapping in a capture group ensures we also match non-nested paths.
50                (3, b"(.*/)?" as _)
51            },
52            v if v.starts_with("**") => {
53                polars_bail!(ComputeError: "invalid ** glob pattern")
54            },
55            v if v.starts_with('*') => (1, b"[^/]*" as _),
56            // Dots need to be escaped in regex.
57            v if v.starts_with('.') => (1, b"\\." as _),
58            _ => {
59                pos += 1;
60                continue;
61            },
62        };
63
64        replacements.push((pos, len, replace));
65        pos += len;
66    }
67
68    if replacements.is_empty() {
69        return Ok((Cow::Borrowed(path), None));
70    }
71
72    let prefix = Cow::Borrowed(&path[..replacements[0].0]);
73
74    let mut pos = replacements[0].0;
75    let mut expansion = Vec::with_capacity(path.len() - pos);
76    expansion.push(b'^');
77
78    for (offset, len, replace) in replacements {
79        expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
80        expansion.extend_from_slice(replace);
81        pos = offset + len;
82    }
83
84    if pos < path.len() {
85        expansion.extend_from_slice(&path.as_bytes()[pos..]);
86    }
87
88    expansion.push(b'$');
89
90    Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
91}
92
93/// A location on cloud storage, may have wildcards.
94#[derive(PartialEq, Debug, Default)]
95pub struct CloudLocation {
96    /// The scheme (s3, ...).
97    pub scheme: &'static str,
98    /// The bucket name.
99    pub bucket: PlSmallStr,
100    /// The prefix inside the bucket, this will be the full key when wildcards are not used.
101    pub prefix: String,
102    /// The path components that need to be expanded.
103    pub expansion: Option<PlSmallStr>,
104}
105
106impl CloudLocation {
107    pub fn new(path: PlRefPath, glob: bool) -> PolarsResult<Self> {
108        if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
109            // Http/s does not use this
110            return Ok(CloudLocation {
111                scheme: scheme.as_str(),
112                ..Default::default()
113            });
114        }
115
116        let path_is_local = matches!(
117            path.scheme(),
118            None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
119        );
120
121        let (bucket, key) = path
122            .strip_scheme_split_authority()
123            .ok_or(Cow::Borrowed(
124                "could not extract bucket/key (path did not contain '/')",
125            ))
126            .and_then(|x @ (bucket, _)| {
127                let bucket_is_empty = bucket.is_empty();
128
129                if path_is_local && !bucket_is_empty {
130                    Err(Cow::Owned(format!(
131                        "unsupported: non-empty hostname for 'file:' URI: '{bucket}'",
132                    )))
133                } else if bucket_is_empty && !path_is_local {
134                    Err(Cow::Borrowed("empty bucket name"))
135                } else {
136                    Ok(x)
137                }
138            })
139            .map_err(|failed_reason| {
140                polars_err!(
141                    ComputeError:
142                    "failed to create CloudLocation: {} (path: '{}')",
143                    failed_reason,
144                    path,
145                )
146            })?;
147
148        let key = if path_is_local {
149            key
150        } else {
151            key.strip_prefix('/').unwrap_or(key)
152        };
153
154        let (prefix, expansion) = if glob {
155            let (prefix, expansion) = extract_prefix_expansion(key)?;
156
157            assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
158
159            (prefix, expansion.map(|x| x.into()))
160        } else {
161            (key.into(), None)
162        };
163
164        Ok(CloudLocation {
165            scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
166            bucket: PlSmallStr::from_str(bucket),
167            prefix: prefix.into_owned(),
168            expansion,
169        })
170    }
171}
172
173/// Return a full url from a key relative to the given location.
174fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
175    format!("{scheme}://{bucket}/{key}")
176}
177
178/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
179/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
180pub(crate) struct Matcher {
181    prefix: String,
182    re: Option<Regex>,
183}
184
185impl Matcher {
186    /// Build a Matcher for the given prefix and expansion.
187    pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
188        // Cloud APIs accept a prefix without any expansion, extract it.
189        let re = expansion
190            .map(polars_utils::regex_cache::compile_regex)
191            .transpose()?;
192        Ok(Matcher { prefix, re })
193    }
194
195    pub(crate) fn is_matching(&self, key: &str) -> bool {
196        if !key.starts_with(self.prefix.as_str()) {
197            // Prefix does not match, should not happen.
198            return false;
199        }
200        if self.re.is_none() {
201            return true;
202        }
203        let last = &key[self.prefix.len()..];
204        self.re.as_ref().unwrap().is_match(last.as_ref())
205    }
206}
207
208/// List files with a prefix derived from the pattern.
209pub async fn glob(
210    url: PlRefPath,
211    cloud_options: Option<&CloudOptions>,
212) -> PolarsResult<Vec<String>> {
213    // Find the fixed prefix, up to the first '*'.
214
215    let (
216        CloudLocation {
217            scheme,
218            bucket,
219            prefix,
220            expansion,
221        },
222        store,
223    ) = super::build_object_store(url, cloud_options, true).await?;
224    let matcher = &Matcher::new(
225        if scheme == "file" {
226            // For local paths the returned location has the leading slash stripped.
227            prefix[1..].into()
228        } else {
229            prefix.clone()
230        },
231        expansion.as_deref(),
232    )?;
233
234    let path = Path::from(prefix.as_str());
235    let path = Some(&path);
236
237    let mut locations = store
238        .exec_with_rebuild_retry_on_err(|store| async move {
239            store
240                .list(path)
241                .try_filter_map(|x| async move {
242                    let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
243                        .then_some(x.location);
244                    Ok(out)
245                })
246                .try_collect::<Vec<_>>()
247                .await
248        })
249        .await?;
250
251    locations.sort_unstable();
252    Ok(locations
253        .into_iter()
254        .map(|l| full_url(scheme, &bucket, l))
255        .collect::<Vec<_>>())
256}
257
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `path` into a [`CloudLocation`] with glob expansion enabled.
    fn parse_glob(path: &'static str) -> CloudLocation {
        CloudLocation::new(PlRefPath::new(path), true).unwrap()
    }

    /// Builds the [`Matcher`] for the prefix and expansion that `path` parses into.
    fn matcher_for(path: &'static str) -> Matcher {
        let location = parse_glob(path);
        Matcher::new(location.prefix, location.expansion.as_deref()).unwrap()
    }

    /// Shorthand for an expected [`CloudLocation`].
    fn location(
        scheme: &'static str,
        bucket: &'static str,
        prefix: &'static str,
        expansion: Option<&'static str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme,
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    #[test]
    fn test_cloud_location() {
        let cases = [
            ("s3://a/b", location("s3", "a", "b", None)),
            ("s3://a/b/*.c", location("s3", "a", "b/", Some("^[^/]*\\.c$"))),
            ("file:///a/b", location("file", "", "/a/b", None)),
            ("file:/a/b", location("file", "", "/a/b", None)),
        ];
        for (path, expected) in cases {
            assert_eq!(parse_glob(path), expected, "path: {path}");
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        let cases = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];
        for (pattern, prefix, expansion) in cases {
            let (actual_prefix, actual_expansion) = extract_prefix_expansion(pattern).unwrap();
            assert_eq!(actual_prefix, prefix, "pattern: {pattern}");
            assert_eq!(actual_expansion.as_deref(), expansion, "pattern: {pattern}");
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let matcher = matcher_for("s3://bucket/folder/*.parquet");
        let matches = |key: &str| matcher.is_matching(Path::from(key).as_ref());

        // Regular match.
        assert!(matches("folder/1.parquet"));
        // Require . in the file name.
        assert!(!matches("folder/1parquet"));
        // Intermediary folders are not allowed.
        assert!(!matches("folder/other/1.parquet"));
    }

    #[test]
    fn test_matcher_folders() {
        let nested = matcher_for("s3://bucket/folder/**/*.parquet");
        // Intermediary folders are optional.
        assert!(nested.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Intermediary folders are allowed.
        assert!(nested.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let with_data = matcher_for("s3://bucket/folder/**/data/*.parquet");
        // Required folder `data` is missing.
        assert!(!with_data.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Required folder is present.
        assert!(with_data.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // Required folder is present and additional folders are allowed.
        assert!(with_data.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        // Without globbing, wildcard characters are kept verbatim in the prefix.
        assert_eq!(
            CloudLocation::new(PlRefPath::new("s3://bucket/[*"), false).unwrap(),
            location("s3", "bucket", "[*", None),
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        // Percent sequences are kept verbatim in the key.
        assert_eq!(
            parse_glob("s3://bucket/%25"),
            location("s3", "bucket", "%25", None)
        );
        // HTTP(S) URLs only record their scheme.
        assert_eq!(
            parse_glob("https://pola.rs/%25"),
            location("https", "", "", None)
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let recursive = matcher_for("s3://bucket/folder/**/data.parquet");

        // Only the exact file name matches.
        assert!(!recursive.is_matching("folder/_data.parquet"));

        assert!(recursive.is_matching("folder/data.parquet"));
        assert!(recursive.is_matching("folder/abc/data.parquet"));
        assert!(recursive.is_matching("folder/abc/def/data.parquet"));

        let suffixed = matcher_for("s3://bucket/folder/data_*.parquet");

        assert!(!suffixed.is_matching("folder/data_1.ipc"))
    }
}