// polars_io/cloud/glob.rs

1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail};
7use polars_utils::format_pl_smallstr;
8use polars_utils::pl_str::PlSmallStr;
9use regex::Regex;
10use url::Url;
11
12use super::{CloudOptions, parse_url};
13
/// Separator between path components; used to detect and restore the leading `/` of local
/// (`file`) paths.
const DELIMITER: char = '/';
15
16/// Converts a glob to regex form.
17///
18/// # Returns
19/// 1. the prefix part (all path components until the first one with '*')
20/// 2. a regular expression representation of the rest.
21pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
22    let url = url.strip_prefix('/').unwrap_or(url);
23    // (offset, len, replacement)
24    let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
25
26    // The position after the last slash before glob characters begin.
27    // `a/b/c*/`
28    //      ^
29    let mut pos: usize = if let Some(after_last_slash) = memchr::memchr2(b'*', b'[', url.as_bytes())
30        .map(|i| {
31            url.as_bytes()[..i]
32                .iter()
33                .rposition(|x| *x == b'/')
34                .map_or(0, |x| 1 + x)
35        }) {
36        // First value is used as the starting point later.
37        replacements.push((after_last_slash, 0, &[]));
38        after_last_slash
39    } else {
40        usize::MAX
41    };
42
43    while pos < url.len() {
44        match memchr::memchr2(b'*', b'.', &url.as_bytes()[pos..]) {
45            None => break,
46            Some(i) => pos += i,
47        }
48
49        let (len, replace): (usize, &[u8]) = match &url[pos..] {
50            // Accept:
51            // - `**/`
52            // - `**` only if it is the end of the url
53            v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
54                // Wrapping in a capture group ensures we also match non-nested paths.
55                (3, b"(.*/)?" as _)
56            },
57            v if v.starts_with("**") => {
58                polars_bail!(ComputeError: "invalid ** glob pattern")
59            },
60            v if v.starts_with('*') => (1, b"[^/]*" as _),
61            // Dots need to be escaped in regex.
62            v if v.starts_with('.') => (1, b"\\." as _),
63            _ => {
64                pos += 1;
65                continue;
66            },
67        };
68
69        replacements.push((pos, len, replace));
70        pos += len;
71    }
72
73    if replacements.is_empty() {
74        return Ok((Cow::Borrowed(url), None));
75    }
76
77    let prefix = Cow::Borrowed(&url[..replacements[0].0]);
78
79    let mut pos = replacements[0].0;
80    let mut expansion = Vec::with_capacity(url.len() - pos);
81    expansion.push(b'^');
82
83    for (offset, len, replace) in replacements {
84        expansion.extend_from_slice(&url.as_bytes()[pos..offset]);
85        expansion.extend_from_slice(replace);
86        pos = offset + len;
87    }
88
89    if pos < url.len() {
90        expansion.extend_from_slice(&url.as_bytes()[pos..]);
91    }
92
93    expansion.push(b'$');
94
95    Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
96}
97
/// A location on cloud storage, may have wildcards.
///
/// Produced by `CloudLocation::new` / `CloudLocation::from_url`.
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
    /// The scheme (s3, ...).
    pub scheme: PlSmallStr,
    /// The bucket name.
    ///
    /// Taken from the URL authority (user info, host and port). Empty for `file` URLs and
    /// for URLs whose scheme starts with `http`.
    pub bucket: PlSmallStr,
    /// The prefix inside the bucket, this will be the full key when wildcards are not used.
    ///
    /// The key is percent-decoded. When parsed with `glob = true`, the leading `/` is removed
    /// (and put back for `file` URLs); with `glob = false` the decoded URL path is kept as-is.
    pub prefix: String,
    /// The path components that need to be expanded.
    ///
    /// An anchored regex (from `extract_prefix_expansion`) matched against the part of a key
    /// that follows `prefix`; `None` when there is nothing to expand.
    pub expansion: Option<PlSmallStr>,
}
110
111impl CloudLocation {
112    pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
113        let is_local = parsed.scheme() == "file";
114        let (bucket, key) = if is_local {
115            ("".into(), parsed.path())
116        } else {
117            if parsed.scheme().starts_with("http") {
118                return Ok(CloudLocation {
119                    scheme: parsed.scheme().into(),
120                    ..Default::default()
121                });
122            }
123
124            let key = parsed.path();
125
126            let bucket = format_pl_smallstr!(
127                "{}",
128                &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
129            );
130
131            if bucket.is_empty() {
132                polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
133            }
134
135            (bucket, key)
136        };
137
138        let key = percent_encoding::percent_decode_str(key)
139            .decode_utf8()
140            .map_err(to_compute_err)?;
141        let (prefix, expansion) = if glob {
142            let (prefix, expansion) = extract_prefix_expansion(&key)?;
143            let mut prefix = prefix.into_owned();
144            if is_local && key.starts_with(DELIMITER) && !prefix.starts_with(DELIMITER) {
145                prefix.insert(0, DELIMITER);
146            }
147            (prefix, expansion.map(|x| x.into()))
148        } else {
149            (key.as_ref().into(), None)
150        };
151
152        Ok(CloudLocation {
153            scheme: parsed.scheme().into(),
154            bucket,
155            prefix,
156            expansion,
157        })
158    }
159
160    /// Parse a CloudLocation from an url.
161    pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
162        let parsed = parse_url(url).map_err(to_compute_err)?;
163        Self::from_url(&parsed, glob)
164    }
165}
166
167/// Return a full url from a key relative to the given location.
168fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
169    format!("{scheme}://{bucket}/{key}")
170}
171
/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
/// The cloud list API returns all file names under a prefix, so matching them here has no
/// additional per-directory (`readdir`-like) cost.
///
/// A key matches when it starts with `prefix` and, if `re` is set, the rest of the key
/// matches `re`.
pub(crate) struct Matcher {
    /// Literal prefix that every matching key must start with.
    prefix: String,
    /// Compiled expansion regex applied to the key after `prefix`; `None` accepts any rest.
    re: Option<Regex>,
}
178
179impl Matcher {
180    /// Build a Matcher for the given prefix and expansion.
181    pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
182        // Cloud APIs accept a prefix without any expansion, extract it.
183        let re = expansion
184            .map(polars_utils::regex_cache::compile_regex)
185            .transpose()?;
186        Ok(Matcher { prefix, re })
187    }
188
189    pub(crate) fn is_matching(&self, key: &str) -> bool {
190        if !key.starts_with(self.prefix.as_str()) {
191            // Prefix does not match, should not happen.
192            return false;
193        }
194        if self.re.is_none() {
195            return true;
196        }
197        let last = &key[self.prefix.len()..];
198        self.re.as_ref().unwrap().is_match(last.as_ref())
199    }
200}
201
202/// List files with a prefix derived from the pattern.
203pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
204    // Find the fixed prefix, up to the first '*'.
205
206    let (
207        CloudLocation {
208            scheme,
209            bucket,
210            prefix,
211            expansion,
212        },
213        store,
214    ) = super::build_object_store(url, cloud_options, true).await?;
215    let matcher = &Matcher::new(
216        if scheme == "file" {
217            // For local paths the returned location has the leading slash stripped.
218            prefix[1..].into()
219        } else {
220            prefix.clone()
221        },
222        expansion.as_deref(),
223    )?;
224
225    let path = Path::from(prefix.as_str());
226    let path = Some(&path);
227
228    let mut locations = store
229        .try_exec_rebuild_on_err(|store| {
230            let st = store.clone();
231
232            async {
233                let store = st;
234                store
235                    .list(path)
236                    .try_filter_map(|x| async move {
237                        let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
238                            .then_some(x.location);
239                        Ok(out)
240                    })
241                    .try_collect::<Vec<_>>()
242                    .await
243                    .map_err(to_compute_err)
244            }
245        })
246        .await?;
247
248    locations.sort_unstable();
249    Ok(locations
250        .into_iter()
251        .map(|l| full_url(&scheme, &bucket, l))
252        .collect::<Vec<_>>())
253}
254
#[cfg(test)]
mod test {
    use super::*;

    /// Build an expected [`CloudLocation`] from plain string parts.
    fn location(
        scheme: &str,
        bucket: &str,
        prefix: &str,
        expansion: Option<&str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme: scheme.into(),
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(|e| e.into()),
        }
    }

    /// Parse `url` with globbing enabled and build the matcher for its prefix/expansion.
    fn matcher_for(url: &str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = CloudLocation::new(url, true).unwrap();
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    #[test]
    fn test_cloud_location() {
        let cases = [
            ("s3://a/b", location("s3", "a", "b", None)),
            ("s3://a/b/*.c", location("s3", "a", "b/", Some("^[^/]*\\.c$"))),
            ("file:///a/b", location("file", "", "/a/b", None)),
        ];
        for (url, expected) in cases {
            assert_eq!(CloudLocation::new(url, true).unwrap(), expected, "url: {url}");
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        let cases: [(&str, &str, Option<&str>); 6] = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];
        for (pattern, prefix, expansion) in cases {
            let (actual_prefix, actual_expansion) = extract_prefix_expansion(pattern).unwrap();
            assert_eq!(actual_prefix, prefix, "prefix of {pattern}");
            assert_eq!(actual_expansion.as_deref(), expansion, "expansion of {pattern}");
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let matcher = matcher_for("s3://bucket/folder/*.parquet");
        let matches = |key: &str| matcher.is_matching(Path::from(key).as_ref());

        // Regular match.
        assert!(matches("folder/1.parquet"));
        // Require . in the file name.
        assert!(!matches("folder/1parquet"));
        // Intermediary folders are not allowed.
        assert!(!matches("folder/other/1.parquet"));
    }

    #[test]
    fn test_matcher_folders() {
        let any_depth = matcher_for("s3://bucket/folder/**/*.parquet");
        let matches = |key: &str| any_depth.is_matching(Path::from(key).as_ref());
        // Intermediary folders are optional.
        assert!(matches("folder/1.parquet"));
        // Intermediary folders are allowed.
        assert!(matches("folder/other/1.parquet"));

        let needs_data = matcher_for("s3://bucket/folder/**/data/*.parquet");
        let matches = |key: &str| needs_data.is_matching(Path::from(key).as_ref());
        // Required folder `data` is missing.
        assert!(!matches("folder/1.parquet"));
        // Required folder is present.
        assert!(matches("folder/data/1.parquet"));
        // Required folder is present and additional folders are allowed.
        assert!(matches("folder/other/data/1.parquet"));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        // Without globbing, wildcard characters are kept verbatim in the prefix.
        assert_eq!(
            CloudLocation::new("s3://bucket/[*", false).unwrap(),
            location("s3", "bucket", "/[*", None),
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        // `%25` in the input ends up verbatim in the prefix.
        assert_eq!(
            CloudLocation::new("s3://bucket/%25", true).unwrap(),
            location("s3", "bucket", "%25", None),
        );

        // HTTP(S) locations only carry their scheme.
        assert_eq!(
            CloudLocation::new("https://pola.rs/%25", true).unwrap(),
            location("https", "", "", None),
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let nested = matcher_for("s3://bucket/folder/**/data.parquet");

        assert!(!nested.is_matching("folder/_data.parquet"));
        for key in [
            "folder/data.parquet",
            "folder/abc/data.parquet",
            "folder/abc/def/data.parquet",
        ] {
            assert!(nested.is_matching(key), "expected match: {key}");
        }

        let flat = matcher_for("s3://bucket/folder/data_*.parquet");
        assert!(!flat.is_matching("folder/data_1.ipc"))
    }
}