polars_io/cloud/glob.rs

1use futures::TryStreamExt;
2use object_store::path::Path;
3use polars_core::error::to_compute_err;
4use polars_core::prelude::polars_ensure;
5use polars_error::{PolarsResult, polars_bail};
6use polars_utils::format_pl_smallstr;
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9use url::Url;
10
11use super::{CloudOptions, parse_url};
12
13const DELIMITER: char = '/';
14
15/// Split the url in
16/// 1. the prefix part (all path components until the first one with '*')
17/// 2. a regular expression representation of the rest.
18pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Option<String>)> {
19    let splits = url.split(DELIMITER);
20    let mut prefix = String::new();
21    let mut expansion = String::new();
22    let mut last_split_was_wildcard = false;
23    for split in splits {
24        if expansion.is_empty() && memchr::memchr2(b'*', b'[', split.as_bytes()).is_none() {
25            // We are still gathering splits in the prefix.
26            if !prefix.is_empty() {
27                prefix.push(DELIMITER);
28            }
29            prefix.push_str(split);
30            continue;
31        }
32        // We are gathering splits for the expansion.
33        //
34        // Handle '**', we expect them to be by themselves in a split.
35        if split == "**" {
36            last_split_was_wildcard = true;
37            expansion.push_str(".*");
38            continue;
39        }
40        polars_ensure!(
41            !split.contains("**"),
42            ComputeError: "expected '**' by itself in path component, got {}", url
43        );
44        if !last_split_was_wildcard && !expansion.is_empty() {
45            expansion.push(DELIMITER);
46        }
47        // Handle '.' inside a split.
48        if memchr::memchr2(b'.', b'*', split.as_bytes()).is_some() {
49            let processed = split.replace('.', "\\.");
50            expansion.push_str(&processed.replace('*', "([^/]*)"));
51            continue;
52        }
53        last_split_was_wildcard = false;
54        expansion.push_str(split);
55    }
56    // Prefix post-processing: when present, prefix should end with '/' in order to simplify matching.
57    if !prefix.is_empty() && !expansion.is_empty() {
58        prefix.push(DELIMITER);
59    }
60    // Expansion post-processing: when present, expansion should cover the whole input.
61    if !expansion.is_empty() {
62        expansion.insert(0, '^');
63        expansion.push('$');
64    }
65    Ok((
66        prefix,
67        if !expansion.is_empty() {
68            Some(expansion)
69        } else {
70            None
71        },
72    ))
73}
74
75/// A location on cloud storage, may have wildcards.
76#[derive(PartialEq, Debug, Default)]
77pub struct CloudLocation {
78    /// The scheme (s3, ...).
79    pub scheme: PlSmallStr,
80    /// The bucket name.
81    pub bucket: PlSmallStr,
82    /// The prefix inside the bucket, this will be the full key when wildcards are not used.
83    pub prefix: String,
84    /// The path components that need to be expanded.
85    pub expansion: Option<PlSmallStr>,
86}
87
88impl CloudLocation {
89    pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
90        let is_local = parsed.scheme() == "file";
91        let (bucket, key) = if is_local {
92            ("".into(), parsed.path())
93        } else {
94            if parsed.scheme().starts_with("http") {
95                return Ok(CloudLocation {
96                    scheme: parsed.scheme().into(),
97                    ..Default::default()
98                });
99            }
100
101            let key = parsed.path();
102
103            let bucket = format_pl_smallstr!(
104                "{}",
105                &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
106            );
107
108            if bucket.is_empty() {
109                polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
110            }
111
112            (bucket, key)
113        };
114
115        let key = percent_encoding::percent_decode_str(key)
116            .decode_utf8()
117            .map_err(to_compute_err)?;
118        let (prefix, expansion) = if glob {
119            let (mut prefix, expansion) = extract_prefix_expansion(&key)?;
120            if is_local && key.starts_with(DELIMITER) {
121                prefix.insert(0, DELIMITER);
122            }
123            (prefix, expansion.map(|x| x.into()))
124        } else {
125            (key.as_ref().into(), None)
126        };
127
128        Ok(CloudLocation {
129            scheme: parsed.scheme().into(),
130            bucket,
131            prefix,
132            expansion,
133        })
134    }
135
136    /// Parse a CloudLocation from an url.
137    pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
138        let parsed = parse_url(url).map_err(to_compute_err)?;
139        Self::from_url(&parsed, glob)
140    }
141}
142
143/// Return a full url from a key relative to the given location.
144fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
145    format!("{scheme}://{bucket}/{key}")
146}
147
148/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
149/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
150pub(crate) struct Matcher {
151    prefix: String,
152    re: Option<Regex>,
153}
154
155impl Matcher {
156    /// Build a Matcher for the given prefix and expansion.
157    pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
158        // Cloud APIs accept a prefix without any expansion, extract it.
159        let re = expansion
160            .map(polars_utils::regex_cache::compile_regex)
161            .transpose()?;
162        Ok(Matcher { prefix, re })
163    }
164
165    pub(crate) fn is_matching(&self, key: &str) -> bool {
166        if !key.starts_with(self.prefix.as_str()) {
167            // Prefix does not match, should not happen.
168            return false;
169        }
170        if self.re.is_none() {
171            return true;
172        }
173        let last = &key[self.prefix.len()..];
174        self.re.as_ref().unwrap().is_match(last.as_ref())
175    }
176}
177
/// List files with a prefix derived from the pattern.
///
/// Builds the object store for `url`, lists every object under the literal prefix, and
/// keeps non-empty objects whose key is accepted by a [`Matcher`] built from the
/// location's prefix and expansion. The result is sorted by key and turned into full
/// urls via [`full_url`].
///
/// # Errors
/// Propagates errors from building the object store, compiling the expansion regex, and
/// listing the store.
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
    // Resolve the store and split `url` into a literal prefix (up to the first component
    // containing a wildcard) and an optional expansion. NOTE(review): the `true` argument
    // is presumably the glob flag; confirm against `build_object_store`.

    let (
        CloudLocation {
            scheme,
            bucket,
            prefix,
            expansion,
        },
        store,
    ) = super::build_object_store(url, cloud_options, true).await?;
    let matcher = &Matcher::new(
        if scheme == "file" {
            // For local paths the returned location has the leading slash stripped,
            // while `prefix` keeps it; drop it so the matcher's prefix check lines up.
            prefix[1..].into()
        } else {
            prefix.clone()
        },
        expansion.as_deref(),
    )?;

    let path = Path::from(prefix.as_str());
    let path = Some(&path);

    // NOTE(review): `try_exec_rebuild_on_err` presumably retries the listing with a rebuilt
    // store on error; the store is cloned so the future owns its handle.
    let mut locations = store
        .try_exec_rebuild_on_err(|store| {
            let st = store.clone();

            async {
                let store = st;
                store
                    .list(path)
                    .try_filter_map(|x| async move {
                        // Zero-sized objects are skipped. NOTE(review): presumably to drop
                        // directory placeholder objects; confirm.
                        let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
                            .then_some(x.location);
                        Ok(out)
                    })
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(to_compute_err)
            }
        })
        .await?;

    // Sorting gives a deterministic output order.
    locations.sort_unstable();
    Ok(locations
        .into_iter()
        .map(|l| full_url(&scheme, &bucket, l))
        .collect::<Vec<_>>())
}
230
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_cloud_location() {
        assert_eq!(
            CloudLocation::new("s3://a/b", true).unwrap(),
            CloudLocation {
                scheme: "s3".into(),
                bucket: "a".into(),
                prefix: "b".into(),
                expansion: None,
            }
        );
        assert_eq!(
            CloudLocation::new("s3://a/b/*.c", true).unwrap(),
            CloudLocation {
                scheme: "s3".into(),
                bucket: "a".into(),
                prefix: "b/".into(),
                expansion: Some("^([^/]*)\\.c$".into()),
            }
        );
        assert_eq!(
            CloudLocation::new("file:///a/b", true).unwrap(),
            CloudLocation {
                scheme: "file".into(),
                bucket: "".into(),
                prefix: "/a/b".into(),
                expansion: None,
            }
        );
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());
        assert_eq!(
            extract_prefix_expansion("a/b.c").unwrap(),
            ("a/b.c".into(), None)
        );
        assert_eq!(
            extract_prefix_expansion("a/**").unwrap(),
            ("a/".into(), Some("^.*$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/b").unwrap(),
            ("a/".into(), Some("^.*b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/*b").unwrap(),
            ("a/".into(), Some("^.*([^/]*)b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/data/*b").unwrap(),
            ("a/".into(), Some("^.*data/([^/]*)b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/*b").unwrap(),
            ("a/".into(), Some("^([^/]*)b$".into()))
        );
    }

    #[test]
    fn test_extract_prefix_expansion_trailing_wildcard_and_class() {
        // A trailing `*` keeps the full directory as prefix.
        assert_eq!(
            extract_prefix_expansion("a/b/*").unwrap(),
            ("a/b/".into(), Some("^([^/]*)$".into()))
        );
        // `[` starts the expansion and is kept as a regex character class.
        assert_eq!(
            extract_prefix_expansion("a/[bc]/d").unwrap(),
            ("a/".into(), Some("^[bc]/d$".into()))
        );
    }

    #[test]
    fn test_matcher_file_name() {
        let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet", true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Regular match.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Require . in the file name.
        assert!(!a.is_matching(Path::from("folder/1parquet").as_ref()));
        // Intermediary folders are not allowed.
        assert!(!a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet", true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Intermediary folders are optional.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Intermediary folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
        let cloud_location =
            CloudLocation::new("s3://bucket/folder/**/data/*.parquet", true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Required folder `data` is missing.
        assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Required folder is present.
        assert!(a.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // Required folder is present and additional folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_without_expansion() {
        // Without an expansion only the prefix is checked.
        let a = Matcher::new("folder/".into(), None).unwrap();
        assert!(a.is_matching("folder/1.parquet"));
        assert!(a.is_matching("folder/other/1.parquet"));
        assert!(!a.is_matching("other/1.parquet"));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        let cloud_location = CloudLocation::new("s3://bucket/[*", false).unwrap();
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3".into(),
                bucket: "bucket".into(),
                prefix: "/[*".into(),
                expansion: None,
            },
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        let path = "s3://bucket/%25";
        let cloud_location = CloudLocation::new(path, true).unwrap();

        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3".into(),
                bucket: "bucket".into(),
                prefix: "%25".into(),
                expansion: None,
            }
        );

        let path = "https://pola.rs/%25";
        let cloud_location = CloudLocation::new(path, true).unwrap();

        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "https".into(),
                bucket: "".into(),
                prefix: "".into(),
                expansion: None,
            }
        );
    }
}