1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail};
7use polars_utils::format_pl_smallstr;
8use polars_utils::pl_str::PlSmallStr;
9use regex::Regex;
10use url::Url;
11
12use super::{CloudOptions, parse_url};
13
/// The `/` character that separates path segments in object keys and local paths.
const DELIMITER: char = '/';
15
16pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
22 let url = url.strip_prefix('/').unwrap_or(url);
23 let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
25
26 let mut pos: usize = if let Some(after_last_slash) = memchr::memchr2(b'*', b'[', url.as_bytes())
30 .map(|i| {
31 url.as_bytes()[..i]
32 .iter()
33 .rposition(|x| *x == b'/')
34 .map_or(0, |x| 1 + x)
35 }) {
36 replacements.push((after_last_slash, 0, &[]));
38 after_last_slash
39 } else {
40 usize::MAX
41 };
42
43 while pos < url.len() {
44 match memchr::memchr2(b'*', b'.', &url.as_bytes()[pos..]) {
45 None => break,
46 Some(i) => pos += i,
47 }
48
49 let (len, replace): (usize, &[u8]) = match &url[pos..] {
50 v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
54 (3, b"(.*/)?" as _)
56 },
57 v if v.starts_with("**") => {
58 polars_bail!(ComputeError: "invalid ** glob pattern")
59 },
60 v if v.starts_with('*') => (1, b"[^/]*" as _),
61 v if v.starts_with('.') => (1, b"\\." as _),
63 _ => {
64 pos += 1;
65 continue;
66 },
67 };
68
69 replacements.push((pos, len, replace));
70 pos += len;
71 }
72
73 if replacements.is_empty() {
74 return Ok((Cow::Borrowed(url), None));
75 }
76
77 let prefix = Cow::Borrowed(&url[..replacements[0].0]);
78
79 let mut pos = replacements[0].0;
80 let mut expansion = Vec::with_capacity(url.len() - pos);
81 expansion.push(b'^');
82
83 for (offset, len, replace) in replacements {
84 expansion.extend_from_slice(&url.as_bytes()[pos..offset]);
85 expansion.extend_from_slice(replace);
86 pos = offset + len;
87 }
88
89 if pos < url.len() {
90 expansion.extend_from_slice(&url.as_bytes()[pos..]);
91 }
92
93 expansion.push(b'$');
94
95 Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
96}
97
98#[derive(PartialEq, Debug, Default)]
100pub struct CloudLocation {
101 pub scheme: PlSmallStr,
103 pub bucket: PlSmallStr,
105 pub prefix: String,
107 pub expansion: Option<PlSmallStr>,
109}
110
111impl CloudLocation {
112 pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
113 let is_local = parsed.scheme() == "file";
114 let (bucket, key) = if is_local {
115 ("".into(), parsed.path())
116 } else {
117 if parsed.scheme().starts_with("http") {
118 return Ok(CloudLocation {
119 scheme: parsed.scheme().into(),
120 ..Default::default()
121 });
122 }
123
124 let key = parsed.path();
125
126 let bucket = format_pl_smallstr!(
127 "{}",
128 &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
129 );
130
131 if bucket.is_empty() {
132 polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
133 }
134
135 (bucket, key)
136 };
137
138 let key = percent_encoding::percent_decode_str(key)
139 .decode_utf8()
140 .map_err(to_compute_err)?;
141 let (prefix, expansion) = if glob {
142 let (prefix, expansion) = extract_prefix_expansion(&key)?;
143 let mut prefix = prefix.into_owned();
144 if is_local && key.starts_with(DELIMITER) && !prefix.starts_with(DELIMITER) {
145 prefix.insert(0, DELIMITER);
146 }
147 (prefix, expansion.map(|x| x.into()))
148 } else {
149 (key.as_ref().into(), None)
150 };
151
152 Ok(CloudLocation {
153 scheme: parsed.scheme().into(),
154 bucket,
155 prefix,
156 expansion,
157 })
158 }
159
160 pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
162 let parsed = parse_url(url).map_err(to_compute_err)?;
163 Self::from_url(&parsed, glob)
164 }
165}
166
167fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
169 format!("{scheme}://{bucket}/{key}")
170}
171
172pub(crate) struct Matcher {
175 prefix: String,
176 re: Option<Regex>,
177}
178
179impl Matcher {
180 pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
182 let re = expansion
184 .map(polars_utils::regex_cache::compile_regex)
185 .transpose()?;
186 Ok(Matcher { prefix, re })
187 }
188
189 pub(crate) fn is_matching(&self, key: &str) -> bool {
190 if !key.starts_with(self.prefix.as_str()) {
191 return false;
193 }
194 if self.re.is_none() {
195 return true;
196 }
197 let last = &key[self.prefix.len()..];
198 self.re.as_ref().unwrap().is_match(last.as_ref())
199 }
200}
201
202pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
204 let (
207 CloudLocation {
208 scheme,
209 bucket,
210 prefix,
211 expansion,
212 },
213 store,
214 ) = super::build_object_store(url, cloud_options, true).await?;
215 let matcher = &Matcher::new(
216 if scheme == "file" {
217 prefix[1..].into()
219 } else {
220 prefix.clone()
221 },
222 expansion.as_deref(),
223 )?;
224
225 let path = Path::from(prefix.as_str());
226 let path = Some(&path);
227
228 let mut locations = store
229 .try_exec_rebuild_on_err(|store| {
230 let st = store.clone();
231
232 async {
233 let store = st;
234 store
235 .list(path)
236 .try_filter_map(|x| async move {
237 let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
238 .then_some(x.location);
239 Ok(out)
240 })
241 .try_collect::<Vec<_>>()
242 .await
243 .map_err(to_compute_err)
244 }
245 })
246 .await?;
247
248 locations.sort_unstable();
249 Ok(locations
250 .into_iter()
251 .map(|l| full_url(&scheme, &bucket, l))
252 .collect::<Vec<_>>())
253}
254
#[cfg(test)]
mod test {
    use super::*;

    /// Scheme / bucket / prefix / expansion decomposition for plain and glob URLs.
    #[test]
    fn test_cloud_location() {
        assert_eq!(
            CloudLocation::new("s3://a/b", true).unwrap(),
            CloudLocation {
                scheme: "s3".into(),
                bucket: "a".into(),
                prefix: "b".into(),
                expansion: None,
            }
        );
        assert_eq!(
            CloudLocation::new("s3://a/b/*.c", true).unwrap(),
            CloudLocation {
                scheme: "s3".into(),
                bucket: "a".into(),
                prefix: "b/".into(),
                expansion: Some("^[^/]*\\.c$".into()),
            }
        );
        // Local paths keep their leading slash.
        assert_eq!(
            CloudLocation::new("file:///a/b", true).unwrap(),
            CloudLocation {
                scheme: "file".into(),
                bucket: "".into(),
                prefix: "/a/b".into(),
                expansion: None,
            }
        );
    }

    /// Glob-to-regex translation, including the rejected `**x` form.
    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());
        assert_eq!(
            extract_prefix_expansion("a/b.c").unwrap(),
            ("a/b.c".into(), None)
        );
        assert_eq!(
            extract_prefix_expansion("a/**").unwrap(),
            ("a/".into(), Some("^(.*/)?$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/b").unwrap(),
            ("a/".into(), Some("^(.*/)?b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/*b").unwrap(),
            ("a/".into(), Some("^(.*/)?[^/]*b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/data/*b").unwrap(),
            ("a/".into(), Some("^(.*/)?data/[^/]*b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/*b").unwrap(),
            ("a/".into(), Some("^[^/]*b$".into()))
        );
    }

    #[test]
    fn test_matcher_file_name() {
        let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet", true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Regular match.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // The `.` in the pattern is literal, so it must be present in the file name.
        assert!(!a.is_matching(Path::from("folder/1parquet").as_ref()));
        // A single `*` does not cross directory boundaries.
        assert!(!a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet", true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // `**/` matches zero intermediate folders ...
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // ... as well as one or more.
        assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let cloud_location =
            CloudLocation::new("s3://bucket/folder/**/data/*.parquet", true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // The literal `data/` segment is required.
        assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
        assert!(a.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    /// With globbing disabled the key is kept whole, glob characters and leading slash included.
    #[test]
    fn test_cloud_location_no_glob() {
        let cloud_location = CloudLocation::new("s3://bucket/[*", false).unwrap();
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3".into(),
                bucket: "bucket".into(),
                prefix: "/[*".into(),
                expansion: None,
            },
        )
    }

    #[test]
    fn test_cloud_location_percentages() {
        let path = "s3://bucket/%25";
        let cloud_location = CloudLocation::new(path, true).unwrap();

        // The key is percent-decoded, so `%25` must come back as a bare `%`.
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3".into(),
                bucket: "bucket".into(),
                prefix: "%".into(),
                expansion: None,
            }
        );

        let path = "https://pola.rs/%25";
        let cloud_location = CloudLocation::new(path, true).unwrap();

        // http(s) locations only carry the scheme.
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "https".into(),
                bucket: "".into(),
                prefix: "".into(),
                expansion: None,
            }
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let url = "s3://bucket/folder/**/data.parquet";
        let cloud_location = CloudLocation::new(url, true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // `**/` must not swallow a partial file-name prefix.
        assert!(!a.is_matching("folder/_data.parquet"));

        assert!(a.is_matching("folder/data.parquet"));
        assert!(a.is_matching("folder/abc/data.parquet"));
        assert!(a.is_matching("folder/abc/def/data.parquet"));

        let url = "s3://bucket/folder/data_*.parquet";
        let cloud_location = CloudLocation::new(url, true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // The extension after `*` is anchored.
        assert!(!a.is_matching("folder/data_1.ipc"))
    }
}