polars_io/cloud/
glob.rs

1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::pl_str::PlSmallStr;
8use polars_utils::plpath::{CloudScheme, PlPathRef};
9use regex::Regex;
10
11use super::CloudOptions;
12
13/// Converts a glob to regex form.
14///
15/// # Returns
16/// 1. the prefix part (all path components until the first one with '*')
17/// 2. a regular expression representation of the rest.
18pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
19    // (offset, len, replacement)
20    let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
21
22    // The position after the last slash before glob characters begin.
23    // `a/b/c*/`
24    //      ^
25    let mut pos: usize = if let Some(after_last_slash) =
26        memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
27            path.as_bytes()[..i]
28                .iter()
29                .rposition(|x| *x == b'/')
30                .map_or(0, |x| 1 + x)
31        }) {
32        // First value is used as the starting point later.
33        replacements.push((after_last_slash, 0, &[]));
34        after_last_slash
35    } else {
36        usize::MAX
37    };
38
39    while pos < path.len() {
40        match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
41            None => break,
42            Some(i) => pos += i,
43        }
44
45        let (len, replace): (usize, &[u8]) = match &path[pos..] {
46            // Accept:
47            // - `**/`
48            // - `**` only if it is the end of the path
49            v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
50                // Wrapping in a capture group ensures we also match non-nested paths.
51                (3, b"(.*/)?" as _)
52            },
53            v if v.starts_with("**") => {
54                polars_bail!(ComputeError: "invalid ** glob pattern")
55            },
56            v if v.starts_with('*') => (1, b"[^/]*" as _),
57            // Dots need to be escaped in regex.
58            v if v.starts_with('.') => (1, b"\\." as _),
59            _ => {
60                pos += 1;
61                continue;
62            },
63        };
64
65        replacements.push((pos, len, replace));
66        pos += len;
67    }
68
69    if replacements.is_empty() {
70        return Ok((Cow::Borrowed(path), None));
71    }
72
73    let prefix = Cow::Borrowed(&path[..replacements[0].0]);
74
75    let mut pos = replacements[0].0;
76    let mut expansion = Vec::with_capacity(path.len() - pos);
77    expansion.push(b'^');
78
79    for (offset, len, replace) in replacements {
80        expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
81        expansion.extend_from_slice(replace);
82        pos = offset + len;
83    }
84
85    if pos < path.len() {
86        expansion.extend_from_slice(&path.as_bytes()[pos..]);
87    }
88
89    expansion.push(b'$');
90
91    Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
92}
93
94/// A location on cloud storage, may have wildcards.
95#[derive(PartialEq, Debug, Default)]
96pub struct CloudLocation {
97    /// The scheme (s3, ...).
98    pub scheme: &'static str,
99    /// The bucket name.
100    pub bucket: PlSmallStr,
101    /// The prefix inside the bucket, this will be the full key when wildcards are not used.
102    pub prefix: String,
103    /// The path components that need to be expanded.
104    pub expansion: Option<PlSmallStr>,
105}
106
107impl CloudLocation {
108    pub fn new(path: PlPathRef<'_>, glob: bool) -> PolarsResult<Self> {
109        if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
110            // Http/s does not use this
111            return Ok(CloudLocation {
112                scheme: scheme.as_str(),
113                ..Default::default()
114            });
115        }
116
117        let path_is_local = matches!(
118            path.scheme(),
119            None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
120        );
121
122        let (bucket, key) = path
123            .strip_scheme_split_authority()
124            .ok_or("could not extract bucket/key (path did not contain '/')")
125            .and_then(|x| {
126                let bucket_is_empty = x.0.is_empty();
127
128                if path_is_local {
129                    debug_assert!(bucket_is_empty);
130                }
131
132                if bucket_is_empty && !path_is_local {
133                    Err("empty bucket name")
134                } else {
135                    Ok(x)
136                }
137            })
138            .map_err(|failed_reason| {
139                polars_err!(
140                    ComputeError:
141                    "failed to create CloudLocation: {} (path: '{}')",
142                    failed_reason,
143                    path.to_str(),
144                )
145            })?;
146
147        let key = if path_is_local {
148            key
149        } else {
150            key.strip_prefix('/').unwrap_or(key)
151        };
152
153        let (prefix, expansion) = if glob {
154            let (prefix, expansion) = extract_prefix_expansion(key)?;
155
156            assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
157
158            (prefix, expansion.map(|x| x.into()))
159        } else {
160            (key.into(), None)
161        };
162
163        Ok(CloudLocation {
164            scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
165            bucket: PlSmallStr::from_str(bucket),
166            prefix: prefix.into_owned(),
167            expansion,
168        })
169    }
170}
171
172/// Return a full url from a key relative to the given location.
173fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
174    format!("{scheme}://{bucket}/{key}")
175}
176
177/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
178/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
179pub(crate) struct Matcher {
180    prefix: String,
181    re: Option<Regex>,
182}
183
184impl Matcher {
185    /// Build a Matcher for the given prefix and expansion.
186    pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
187        // Cloud APIs accept a prefix without any expansion, extract it.
188        let re = expansion
189            .map(polars_utils::regex_cache::compile_regex)
190            .transpose()?;
191        Ok(Matcher { prefix, re })
192    }
193
194    pub(crate) fn is_matching(&self, key: &str) -> bool {
195        if !key.starts_with(self.prefix.as_str()) {
196            // Prefix does not match, should not happen.
197            return false;
198        }
199        if self.re.is_none() {
200            return true;
201        }
202        let last = &key[self.prefix.len()..];
203        self.re.as_ref().unwrap().is_match(last.as_ref())
204    }
205}
206
207/// List files with a prefix derived from the pattern.
208pub async fn glob(
209    url: PlPathRef<'_>,
210    cloud_options: Option<&CloudOptions>,
211) -> PolarsResult<Vec<String>> {
212    // Find the fixed prefix, up to the first '*'.
213
214    let (
215        CloudLocation {
216            scheme,
217            bucket,
218            prefix,
219            expansion,
220        },
221        store,
222    ) = super::build_object_store(url, cloud_options, true).await?;
223    let matcher = &Matcher::new(
224        if scheme == "file" {
225            // For local paths the returned location has the leading slash stripped.
226            prefix[1..].into()
227        } else {
228            prefix.clone()
229        },
230        expansion.as_deref(),
231    )?;
232
233    let path = Path::from(prefix.as_str());
234    let path = Some(&path);
235
236    let mut locations = store
237        .try_exec_rebuild_on_err(|store| {
238            let st = store.clone();
239
240            async {
241                let store = st;
242                store
243                    .list(path)
244                    .try_filter_map(|x| async move {
245                        let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
246                            .then_some(x.location);
247                        Ok(out)
248                    })
249                    .try_collect::<Vec<_>>()
250                    .await
251                    .map_err(to_compute_err)
252            }
253        })
254        .await?;
255
256    locations.sort_unstable();
257    Ok(locations
258        .into_iter()
259        .map(|l| full_url(scheme, &bucket, l))
260        .collect::<Vec<_>>())
261}
262
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `url` with globbing enabled.
    fn glob_location(url: &str) -> CloudLocation {
        CloudLocation::new(PlPathRef::new(url), true).unwrap()
    }

    /// Builds the matcher for the prefix/expansion parsed from `url`.
    fn glob_matcher(url: &str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = glob_location(url);
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    #[test]
    fn test_cloud_location() {
        assert_eq!(
            glob_location("s3://a/b"),
            CloudLocation {
                scheme: "s3",
                bucket: "a".into(),
                prefix: "b".into(),
                expansion: None,
            }
        );
        assert_eq!(
            glob_location("s3://a/b/*.c"),
            CloudLocation {
                scheme: "s3",
                bucket: "a".into(),
                prefix: "b/".into(),
                expansion: Some("^[^/]*\\.c$".into()),
            }
        );

        // Both spellings of a local file url keep the leading slash of the key.
        for url in ["file:///a/b", "file:/a/b"] {
            assert_eq!(
                glob_location(url),
                CloudLocation {
                    scheme: "file",
                    bucket: "".into(),
                    prefix: "/a/b".into(),
                    expansion: None,
                },
                "url: {url}"
            );
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        // `**` must be followed by `/` or end the path.
        assert!(extract_prefix_expansion("**url").is_err());

        // No glob characters: the path is returned unchanged.
        assert_eq!(
            extract_prefix_expansion("a/b.c").unwrap(),
            ("a/b.c".into(), None)
        );

        // (pattern, expected prefix, expected regex)
        let cases = [
            ("a/**", "a/", "^(.*/)?$"),
            ("a/**/b", "a/", "^(.*/)?b$"),
            ("a/**/*b", "a/", "^(.*/)?[^/]*b$"),
            ("a/**/data/*b", "a/", "^(.*/)?data/[^/]*b$"),
            ("a/*b", "a/", "^[^/]*b$"),
        ];

        for (pattern, prefix, regex) in cases {
            assert_eq!(
                extract_prefix_expansion(pattern).unwrap(),
                (prefix.into(), Some(regex.into())),
                "pattern: {pattern}"
            );
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let a = glob_matcher("s3://bucket/folder/*.parquet");
        // Regular match.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Require . in the file name.
        assert!(!a.is_matching(Path::from("folder/1parquet").as_ref()));
        // Intermediary folders are not allowed.
        assert!(!a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let a = glob_matcher("s3://bucket/folder/**/*.parquet");
        // Intermediary folders are optional.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Intermediary folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let a = glob_matcher("s3://bucket/folder/**/data/*.parquet");

        // Required folder `data` is missing.
        assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Required folder is present.
        assert!(a.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // Required folder is present and additional folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        // With globbing disabled, glob characters stay in the prefix verbatim.
        let cloud_location = CloudLocation::new(PlPathRef::new("s3://bucket/[*"), false).unwrap();
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "[*".into(),
                expansion: None,
            },
        )
    }

    #[test]
    fn test_cloud_location_percentages() {
        // Percent sequences in the key are kept as written.
        assert_eq!(
            glob_location("s3://bucket/%25"),
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "%25".into(),
                expansion: None,
            }
        );

        // Http(s) locations only carry the scheme.
        assert_eq!(
            glob_location("https://pola.rs/%25"),
            CloudLocation {
                scheme: "https",
                bucket: "".into(),
                prefix: "".into(),
                expansion: None,
            }
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let a = glob_matcher("s3://bucket/folder/**/data.parquet");

        assert!(!a.is_matching("folder/_data.parquet"));

        assert!(a.is_matching("folder/data.parquet"));
        assert!(a.is_matching("folder/abc/data.parquet"));
        assert!(a.is_matching("folder/abc/def/data.parquet"));

        let a = glob_matcher("s3://bucket/folder/data_*.parquet");

        // A matcher that rejects everything must not pass this test.
        assert!(a.is_matching("folder/data_1.parquet"));
        assert!(!a.is_matching("folder/data_1.ipc"))
    }
}