use futures::future::ready;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use polars_core::error::to_compute_err;
use polars_core::prelude::{polars_ensure, polars_err};
use polars_error::{PolarsError, PolarsResult};
use regex::Regex;
use url::Url;
use super::CloudOptions;
/// Separator between path components; used both to split incoming URL paths and to
/// join the pieces of the prefix / expansion that are rebuilt from them.
const DELIMITER: char = '/';
/// Splits a (possibly globbed) path into a literal prefix and a regex for the remainder.
///
/// The path is cut into components on [`DELIMITER`]. Every leading component that does
/// not contain a `*` is accumulated verbatim into the prefix. From the first component
/// containing a `*` onwards, components are translated into a regular expression:
///
/// * a component that is exactly `**` becomes `.*` and absorbs the separator that would
///   otherwise be emitted before the next component;
/// * inside any other component, `.` is escaped as `\.` and `*` becomes `([^/]*)`;
/// * components without `.` or `*` are appended unchanged.
///
/// NOTE(review): regex metacharacters other than `.` and `*` are passed through to the
/// expansion as-is.
///
/// # Returns
/// `(prefix, expansion)`. `expansion` is `None` when no component contains a `*`.
/// Otherwise it is anchored with `^` / `$`, and a non-empty prefix ends with a trailing
/// [`DELIMITER`].
///
/// # Errors
/// Returns a `ComputeError` when `**` appears in a component together with other
/// characters (for example `**url`).
fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Option<String>)> {
    let mut prefix = String::new();
    let mut expansion = String::new();
    // True only while the most recently emitted regex fragment is the `.*` of a `**`
    // component; that fragment already covers the separator that would follow it.
    let mut last_split_was_wildcard = false;

    for split in url.split(DELIMITER) {
        let has_star = split.contains('*');

        // Still in the literal part of the path: extend the prefix.
        if expansion.is_empty() && !has_star {
            if !prefix.is_empty() {
                prefix.push(DELIMITER);
            }
            prefix.push_str(split);
            continue;
        }

        if split == "**" {
            last_split_was_wildcard = true;
            expansion.push_str(".*");
            continue;
        }
        polars_ensure!(
            !split.contains("**"),
            ComputeError: "expected '**' by itself in path component, got {}", url
        );

        if !last_split_was_wildcard && !expansion.is_empty() {
            expansion.push(DELIMITER);
        }
        // Any component other than `**` ends the "directly after `**`" state. Resetting
        // here (rather than only for plain components) keeps the separator in patterns
        // such as `**/*.ext/name`, where a globbed component follows `**`.
        last_split_was_wildcard = false;

        if has_star || split.contains('.') {
            let escaped = split.replace('.', "\\.");
            expansion.push_str(&escaped.replace('*', "([^/]*)"));
        } else {
            expansion.push_str(split);
        }
    }

    if expansion.is_empty() {
        return Ok((prefix, None));
    }

    if !prefix.is_empty() {
        prefix.push(DELIMITER);
    }
    expansion.insert(0, '^');
    expansion.push('$');
    Ok((prefix, Some(expansion)))
}
/// The pieces of a URL that are needed to list and glob objects in a store.
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
    /// URL scheme, e.g. `s3` or `file`.
    pub scheme: String,
    /// Host component of the URL; left empty for `file` URLs and for `http*` URLs.
    pub bucket: String,
    /// Literal part of the path that precedes the first globbed component.
    pub prefix: String,
    /// Regex source for the globbed remainder of the path; `None` when the path has no `*`.
    pub expansion: Option<String>,
}
impl CloudLocation {
    /// Derives a [`CloudLocation`] from an already parsed URL.
    ///
    /// * `file` URLs get an empty bucket and keep the leading `/` of their path in
    ///   `prefix`.
    /// * URLs whose scheme starts with `http` yield a location that carries only the
    ///   scheme; every other field keeps its default value.
    /// * For any other scheme the host becomes the bucket, and the percent-decoded path
    ///   is split into a literal prefix and an optional glob expansion.
    ///
    /// # Errors
    /// Fails when a non-`file`, non-`http*` URL has no host, when the percent-decoded
    /// path is not valid UTF-8, or when splitting the path into prefix and expansion
    /// fails.
    pub fn from_url(parsed: &Url) -> PolarsResult<CloudLocation> {
        let scheme = parsed.scheme();
        let is_local = scheme == "file";

        // http(s) locations are not split into bucket / prefix / expansion.
        if !is_local && scheme.starts_with("http") {
            return Ok(CloudLocation {
                scheme: scheme.into(),
                ..Default::default()
            });
        }

        let bucket = if is_local {
            String::new()
        } else {
            match parsed.host() {
                Some(host) => host.to_string(),
                None => {
                    return Err(
                        polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
                    )
                },
            }
        };

        let decoded = percent_encoding::percent_decode_str(parsed.path())
            .decode_utf8()
            .map_err(to_compute_err)?;
        let (mut prefix, expansion) = extract_prefix_expansion(&decoded)?;

        // Splitting drops the leading separator; local paths need it back.
        if is_local && decoded.starts_with(DELIMITER) {
            prefix.insert(0, DELIMITER);
        }

        Ok(CloudLocation {
            scheme: scheme.into(),
            bucket,
            prefix,
            expansion,
        })
    }

    /// Parses `url` and derives its [`CloudLocation`] via [`CloudLocation::from_url`].
    ///
    /// # Errors
    /// Fails when `url` cannot be parsed as a URL, or for any reason `from_url` fails.
    pub fn new(url: &str) -> PolarsResult<CloudLocation> {
        Url::parse(url)
            .map_err(to_compute_err)
            .and_then(|parsed| Self::from_url(&parsed))
    }
}
/// Reassembles `scheme://bucket/key` from a location's scheme and bucket plus an object key.
fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
    let key: &str = key.as_ref();
    [scheme, "://", bucket, "/", key].concat()
}
/// Decides whether an object key belongs to a globbed location.
struct Matcher {
    /// Literal text every matching key must start with.
    prefix: String,
    /// Pattern applied to the part of the key after `prefix`; `None` accepts any remainder.
    re: Option<Regex>,
}
impl Matcher {
    /// Builds a matcher from a literal key prefix and an optional regex source.
    ///
    /// # Errors
    /// Fails when `expansion` is present but is not a valid regular expression.
    fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
        let re = expansion.map(Regex::new).transpose()?;
        Ok(Matcher { prefix, re })
    }

    /// Returns `true` when `key` starts with the prefix and, if a regex is configured,
    /// the remainder of the key after the prefix matches it.
    fn is_matching(&self, key: &Path) -> bool {
        let key: &str = key.as_ref();
        match (key.strip_prefix(self.prefix.as_str()), &self.re) {
            // Key lies outside the prefix.
            (None, _) => false,
            // No glob: every key under the prefix is accepted.
            (Some(_), None) => true,
            (Some(rest), Some(re)) => re.is_match(rest),
        }
    }
}
/// Lists the objects addressed by `url` and returns the full URL of every object whose
/// key matches the location's prefix and glob expansion.
///
/// The `#[tokio::main]` attribute wraps the async body in its own single-threaded
/// runtime, so callers invoke `glob` as an ordinary blocking function.
/// NOTE(review): tokio does not allow starting a runtime from inside another runtime's
/// async context — confirm callers are synchronous.
///
/// # Errors
/// Propagates failures from building the object store, from compiling the glob
/// expansion into a regex, and from listing the store (converted with
/// `to_compute_err`).
#[tokio::main(flavor = "current_thread")]
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
    let (
        CloudLocation {
            scheme,
            bucket,
            prefix,
            expansion,
        },
        store,
    ) = super::build_object_store(url, cloud_options).await?;

    // Build the listing path from a borrow so the prefix string can be moved into the
    // matcher without cloning it.
    let list_prefix = Path::from(prefix.as_str());
    let matcher = Matcher::new(prefix, expansion.as_deref())?;

    store
        .list(Some(&list_prefix))
        // Convert the store's error exactly once.
        .map_err(to_compute_err)
        .map_ok(|meta| meta.location)
        // Errors are let through the filter so that `try_collect` reports them.
        .filter(|entry| {
            ready(match entry {
                Ok(location) => matcher.is_matching(location),
                Err(_) => true,
            })
        })
        .map_ok(|location| full_url(&scheme, &bucket, location))
        .try_collect::<Vec<String>>()
        .await
}
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `url` and compiles its prefix / expansion into a `Matcher`.
    fn matcher_for(url: &str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = CloudLocation::new(url).unwrap();
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    /// Checks each `(key, expected)` pair against `matcher`.
    fn assert_matches(matcher: &Matcher, expectations: &[(&str, bool)]) {
        for &(key, expected) in expectations {
            assert_eq!(
                matcher.is_matching(&Path::from(key)),
                expected,
                "unexpected match result for key {key:?}"
            );
        }
    }

    #[test]
    fn test_cloud_location() {
        // (url, scheme, bucket, prefix, expansion)
        let cases = [
            ("s3://a/b", "s3", "a", "b", None),
            ("s3://a/b/*.c", "s3", "a", "b/", Some("^([^/]*)\\.c$")),
            ("file:///a/b", "file", "", "/a/b", None),
        ];
        for (url, scheme, bucket, prefix, expansion) in cases {
            assert_eq!(
                CloudLocation::new(url).unwrap(),
                CloudLocation {
                    scheme: scheme.into(),
                    bucket: bucket.into(),
                    prefix: prefix.into(),
                    expansion: expansion.map(String::from),
                }
            );
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        // (input, prefix, expansion)
        let cases = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^.*$")),
            ("a/**/b", "a/", Some("^.*b$")),
            ("a/**/*b", "a/", Some("^.*([^/]*)b$")),
            ("a/**/data/*b", "a/", Some("^.*data/([^/]*)b$")),
            ("a/*b", "a/", Some("^([^/]*)b$")),
        ];
        for (input, prefix, expansion) in cases {
            assert_eq!(
                extract_prefix_expansion(input).unwrap(),
                (prefix.to_string(), expansion.map(String::from))
            );
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let matcher = matcher_for("s3://bucket/folder/*.parquet");
        assert_matches(
            &matcher,
            &[
                ("folder/1.parquet", true),
                ("folder/1parquet", false),
                ("folder/other/1.parquet", false),
            ],
        );
    }

    #[test]
    fn test_matcher_folders() {
        let any_depth = matcher_for("s3://bucket/folder/**/*.parquet");
        assert_matches(
            &any_depth,
            &[
                ("folder/1.parquet", true),
                ("folder/other/1.parquet", true),
            ],
        );

        let data_dirs = matcher_for("s3://bucket/folder/**/data/*.parquet");
        assert_matches(
            &data_dirs,
            &[
                ("folder/1.parquet", false),
                ("folder/data/1.parquet", true),
                ("folder/other/data/1.parquet", true),
            ],
        );
    }
}