1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use polars_buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_core::runtime::ASYNC;
10use polars_utils::pl_path::{CloudScheme, PlRefPath};
11use polars_utils::pl_str::PlSmallStr;
12
13#[cfg(feature = "cloud")]
14mod hugging_face;
15
16use crate::cloud::CloudOptions;
17
18#[allow(clippy::bind_instead_of_map)]
19pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
20 (|| {
21 let verbose = config::verbose();
22
23 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
24 if verbose {
25 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
26 }
27 v
28 } else if cfg!(target_family = "unix") {
29 let id = std::env::var("USER")
30 .inspect(|_| {
31 if verbose {
32 eprintln!("init_temp_dir: sourced $USER")
33 }
34 })
35 .or_else(|_e| {
36 #[cfg(feature = "file_cache")]
39 {
40 std::env::var("HOME")
41 .inspect(|_| {
42 if verbose {
43 eprintln!("init_temp_dir: sourced $HOME")
44 }
45 })
46 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
47 }
48 #[cfg(not(feature = "file_cache"))]
49 {
50 Err(_e)
51 }
52 });
53
54 if let Ok(v) = id {
55 std::env::temp_dir().join(format!("polars-{v}/"))
56 } else {
57 return Err(std::io::Error::other(
58 "could not load $USER or $HOME environment variables",
59 ));
60 }
61 } else if cfg!(target_family = "windows") {
62 std::env::temp_dir().join("polars/")
66 } else {
67 std::env::temp_dir().join("polars/")
68 }
69 .into_boxed_path();
70
71 let perm_result = create_dir_owner_only(path.as_ref());
72
73 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
74 perm_result?;
75 }
76
77 std::io::Result::Ok(path)
78 })()
79 .map_err(|e| {
80 std::io::Error::new(
81 e.kind(),
82 format!(
83 "error initializing temporary directory: {e} \
84 consider explicitly setting POLARS_TEMP_DIR"
85 ),
86 )
87 })
88 .unwrap()
89});
90
/// Creates `path` (and any missing parents) and, on unix, restricts it to owner-only access.
///
/// On unix, the mode of `path` is set to `0o700` whether or not the directory already existed.
/// The mode is then read back, and an error is returned if the permission bits do not match.
/// Parent directories created along the way are not chmod-ed. On other targets only the
/// directory creation is performed.
///
/// # Errors
/// Any I/O error from creating, chmod-ing or stat-ing the directory, or a permission-mismatch
/// error when the mode did not stick.
pub fn create_dir_owner_only(path: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(path)?;

    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::PermissionsExt;

        const OWNER_ONLY: u32 = 0o700;

        std::fs::set_permissions(path, std::fs::Permissions::from_mode(OWNER_ONLY))?;

        // Re-read rather than trust `set_permissions`, and compare only the rwx bits
        // (file-type and setuid/setgid/sticky bits are masked off).
        let perms = std::fs::metadata(path)?.permissions();
        let rwx_bits = perms.mode() & 0o777;

        if rwx_bits != OWNER_ONLY {
            return Err(std::io::Error::other(format!(
                "error setting directory permissions: permission mismatch: {perms:?} (path = {path:?})"
            )));
        }
    }

    Ok(())
}
111
112pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
114 return inner(path.as_ref());
115
116 fn inner(path: &Path) -> Cow<'_, Path> {
117 if path.starts_with("~") {
118 #[cfg(not(target_family = "wasm"))]
120 if let Some(homedir) = home::home_dir() {
121 return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
122 }
123 }
124
125 Cow::Borrowed(path)
126 }
127}
128
129fn has_glob(path: &[u8]) -> bool {
130 return get_glob_start_idx(path).is_some();
131
132 fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
134 memchr::memchr3(b'*', b'?', b'[', path)
135 }
136}
137
138fn is_file_uri_with_escape(path: &PlRefPath) -> bool {
140 path.scheme().is_some_and(|s| s.is_file()) && path.strip_scheme().contains('%')
141}
142
143pub fn decode_file_uri_paths(paths: &[PlRefPath], glob: bool) -> Cow<'_, [PlRefPath]> {
158 if !paths.iter().any(is_file_uri_with_escape) {
160 return Cow::Borrowed(paths);
161 }
162
163 Cow::Owned(
164 paths
165 .iter()
166 .map(|path| {
167 if is_file_uri_with_escape(path)
168 && let Some(decoded) = decode_file_uri_path(path.strip_scheme(), glob)
169 {
170 PlRefPath::new(decoded)
171 } else {
172 path.clone()
175 }
176 })
177 .collect(),
178 )
179}
180
181fn decode_file_uri_path(path: &str, glob: bool) -> Option<String> {
185 let decoded = percent_encoding::percent_decode_str(path)
186 .decode_utf8()
187 .ok()?;
188 let path = strip_windows_drive_slash(&decoded);
189 Some(if glob {
190 glob::Pattern::escape(path)
191 } else {
192 path.to_owned()
193 })
194}
195
/// Drops the `/` that file URIs place before a Windows drive letter, so `/C:/data` becomes
/// `C:/data`.
///
/// This only has an effect when compiled for Windows. On every other target the input is
/// returned untouched.
fn strip_windows_drive_slash(path: &str) -> &str {
    #[cfg(target_family = "windows")]
    if matches!(path.as_bytes(), [b'/', drive, b':', ..] if drive.is_ascii_alphabetic()) {
        // Byte 0 is the ASCII `/`, so index 1 is a char boundary.
        return &path[1..];
    }

    path
}
209
210pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
212 paths.len() == 1 && !has_glob(paths[0].strip_scheme().as_bytes())
214 && {
216 (
217 !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
219 )
220 || (
221 expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
224 )
225 }
226}
227
228pub async fn expand_paths(
230 paths: &[PlRefPath],
231 glob: bool,
232 hidden_file_prefix: &[PlSmallStr],
233 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
234) -> PolarsResult<Buffer<PlRefPath>> {
235 expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false)
236 .await
237 .map(|x| x.0)
238}
239
240struct HiveIdxTracker<'a> {
241 idx: usize,
242 paths: &'a [PlRefPath],
243 check_directory_level: bool,
244}
245
246impl HiveIdxTracker<'_> {
247 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
248 let check_directory_level = self.check_directory_level;
249 let paths = self.paths;
250
251 if check_directory_level
252 && ![usize::MAX, i].contains(&self.idx)
253 && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
255 {
256 polars_bail!(
257 InvalidOperation:
258 "attempted to read from different directory levels with hive partitioning enabled: \
259 first path: {}, second path: {}",
260 &paths[path_idx - 1],
261 &paths[path_idx],
262 )
263 } else {
264 self.idx = std::cmp::min(self.idx, i);
265 Ok(())
266 }
267 }
268}
269
/// Expands one path through the object-store layer.
///
/// Returns `(hive_start_idx, paths)`:
/// - Single-file case: `path` is returned as the only entry, with offset `0`, when all of these
///   hold: it does not end in `/`; there is no glob expansion to perform (`!glob`, or
///   `cloud_location.expansion` is `None`); and it is either a URL with a scheme or an existing
///   local file.
/// - Listing case: otherwise every object under the resolved prefix is listed. Zero-size objects
///   are dropped and the rest are sorted. The offset is the byte length of the formatted prefix,
///   with a `/` appended when `path` itself ended in `/`.
///
/// When `first_path_has_scheme` is false, outputs are formatted as `/<location>` instead of
/// `<scheme>://<bucket>/<location>`.
///
/// # Errors
/// - Failures from `build_object_store`, `object_path_from_str` or the listing.
/// - For scheme-less paths in the listing case, a failed local `metadata()` lookup.
#[cfg(feature = "cloud")]
async fn expand_path_cloud(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    glob: bool,
    first_path_has_scheme: bool,
) -> PolarsResult<(usize, Vec<PlRefPath>)> {
    // Rebuilds an output path from object-store components. Scheme-less inputs only get a
    // leading `/` in front of the object location.
    let format_path = |scheme: &str, bucket: &str, location: &str| {
        if first_path_has_scheme {
            format!("{scheme}://{bucket}/{location}")
        } else {
            format!("/{location}")
        }
    };

    use polars_utils::_limit_path_len_io_err;

    use crate::cloud::object_path_from_str;
    let path_str = path.as_str();

    let (cloud_location, store) =
        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
    let prefix = object_path_from_str(&cloud_location.prefix)?;

    // Single-file case: no trailing `/`, nothing to glob-expand, and either a URL or an
    // existing local file.
    let out = if !path_str.ends_with("/") && (!glob || cloud_location.expansion.is_none()) && {
        path.has_scheme() || path.as_std_path().is_file()
    } {
        (
            0,
            vec![PlRefPath::new(format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            ))],
        )
    } else {
        use futures::TryStreamExt;

        // Surface a missing local path as an I/O error here, before listing. The error is
        // passed through `_limit_path_len_io_err` (presumably to keep the message short).
        if !path.has_scheme() {
            path.as_std_path()
                .metadata()
                .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
        }

        let cloud_location = &cloud_location;
        let prefix_ref = &prefix;

        // List everything under the prefix. Only objects with `size > 0` are kept.
        // NOTE(review): `exec_with_rebuild_retry_on_err` presumably retries with a rebuilt
        // store on failure — confirm.
        let mut paths = store
            .exec_with_rebuild_retry_on_err(|s| async move {
                let out = s
                    .list(Some(prefix_ref))
                    .try_filter_map(|x| async move {
                        let out = (x.size > 0).then(|| {
                            PlRefPath::new({
                                format_path(
                                    cloud_location.scheme,
                                    &cloud_location.bucket,
                                    x.location.as_ref(),
                                )
                            })
                        });
                        Ok(out)
                    })
                    .try_collect::<Vec<_>>()
                    .await?;

                Ok(out)
            })
            .await?;

        // Keep the trailing `/` of a directory-style input so the returned offset points past it.
        let mut prefix = prefix.to_string();
        if path_str.ends_with('/') && !prefix.ends_with('/') {
            prefix.push('/')
        };

        paths.sort_unstable();

        (
            format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            )
            .len(),
            paths,
        )
    };

    PolarsResult::Ok(out)
}
370
/// Expands `paths` into the list of files to read, and reports a hive-partition start offset.
///
/// Each input is handled according to its kind:
/// - Hugging Face: if the first path uses the Hugging Face scheme, the whole expansion is
///   delegated to `hugging_face::expand_paths_hf`.
/// - Async route: used when the first path has a scheme, or when `force_async` is enabled on
///   non-Windows targets.
///   - Glob patterns are expanded with `crate::async_glob`; everything else goes through
///     `expand_path_cloud`.
///   - HTTP(S) URLs are kept verbatim, except that with the `aws` feature a URL of the form
///     `https://<bucket>.s3.<region>.amazonaws.com/<key>` (no `?`) is rewritten to
///     `s3://<bucket>/<key>`, and the region is added to the AWS cloud config.
/// - Local route (all other inputs):
///   - directories are walked breadth-first and their non-empty files collected;
///   - glob patterns are matched with the `glob` crate (non-empty, non-directory matches only);
///   - anything else is kept as given.
///
/// Every collected path, including paths kept as given, is dropped if its file name starts with
/// any entry of `hidden_file_prefix`. Directories are never filtered this way. The files produced
/// by each input are sorted; the order of the inputs is preserved.
///
/// Returns `(paths, hive_start_idx)`. `hive_start_idx` is the minimum offset recorded through
/// `HiveIdxTracker`, or `0` for empty input.
///
/// # Errors
/// - Inputs at different directory levels, when `check_directory_level` is set.
/// - A single directory input whose files have differing extensions.
/// - An invalid local glob pattern.
/// - I/O and object-store failures during expansion.
///
/// # Panics
/// If the async route is taken without the `cloud` feature.
pub async fn expand_paths_hive(
    paths: &[PlRefPath],
    glob: bool,
    hidden_file_prefix: &[PlSmallStr],
    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
    check_directory_level: bool,
) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
    let Some(first_path) = paths.first() else {
        return Ok((vec![].into(), 0));
    };

    let first_path_has_scheme = first_path.has_scheme();

    // A path is hidden if its (UTF-8) file name starts with any configured prefix.
    let is_hidden_file = move |path: &PlRefPath| {
        path.file_name()
            .and_then(|x| x.to_str())
            .is_some_and(|file_name| {
                hidden_file_prefix
                    .iter()
                    .any(|x| file_name.starts_with(x.as_str()))
            })
    };

    // Output collector. It filters hidden files and records up to two distinct extensions.
    let mut out_paths = OutPaths {
        paths: vec![],
        exts: [None, None],
        is_hidden_file: &is_hidden_file,
    };

    // `usize::MAX` = no hive offset recorded yet.
    let mut hive_idx_tracker = HiveIdxTracker {
        idx: usize::MAX,
        paths,
        check_directory_level,
    };

    // Async / object-store route. It is taken when the first path has a scheme, or (off Windows)
    // when `force_async` is configured.
    if first_path_has_scheme || {
        cfg!(not(target_family = "windows")) && polars_config::config().force_async()
    } {
        #[cfg(feature = "cloud")]
        {
            // Hugging Face inputs are expanded as a whole by a dedicated module.
            if first_path.scheme() == Some(CloudScheme::Hf) {
                let (expand_start_idx, paths) = hugging_face::expand_paths_hf(
                    paths,
                    check_directory_level,
                    cloud_options,
                    glob,
                )
                .await?;

                return Ok((paths.into(), expand_start_idx));
            }

            for (path_idx, path) in paths.iter().enumerate() {
                use std::borrow::Cow;

                let mut path = Cow::Borrowed(path);

                if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
                    let mut rewrite_aws = false;

                    // Try to parse `<bucket>.s3.<region>.amazonaws.com/<key>` into `s3://bucket/key`.
                    #[cfg(feature = "aws")]
                    if let Some(p) = (|| {
                        use crate::cloud::CloudConfig;

                        let after_scheme = path.strip_scheme();

                        let bucket_end = after_scheme.find(".s3.")?;
                        // 4 == ".s3.".len()
                        let offset = bucket_end + 4;
                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;

                        // Reject if the host part contains `/`, or if the URL has a query string.
                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
                            return None;
                        }

                        let bucket = &after_scheme[..bucket_end];
                        let region = &after_scheme[bucket_end + 4..region_end];
                        // 15 == ".amazonaws.com/".len()
                        let key = &after_scheme[region_end + 15..];

                        // Add the parsed region to the AWS config, unless the last entry is
                        // already a region.
                        if let CloudConfig::Aws(configs) = cloud_options
                            .get_or_insert_default()
                            .config
                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
                        {
                            use object_store::aws::AmazonS3ConfigKey;

                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
                                configs.push((AmazonS3ConfigKey::Region, region.into()))
                            }
                        }

                        Some(format!("s3://{bucket}/{key}"))
                    })() {
                        path = Cow::Owned(PlRefPath::new(p));
                        rewrite_aws = true;
                    }

                    // Plain HTTP(S) URLs are never expanded: keep verbatim, hive offset 0.
                    if !rewrite_aws {
                        out_paths.push(path.into_owned());
                        hive_idx_tracker.update(0, path_idx)?;
                        continue;
                    }
                }

                // Start of this input's outputs, sorted as a group below.
                let sort_start_idx = out_paths.paths.len();

                if glob && has_glob(path.as_bytes()) {
                    hive_idx_tracker.update(0, path_idx)?;

                    let iter = ASYNC.block_in_place_on(crate::async_glob(
                        path.into_owned(),
                        cloud_options.as_ref(),
                    ))?;

                    if first_path_has_scheme {
                        out_paths.extend(iter.into_iter().map(PlRefPath::new))
                    } else {
                        // Drops a 7-byte prefix from each result. NOTE(review): presumably the
                        // `file://` that `async_glob` adds for scheme-less input — confirm.
                        out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
                    };
                } else {
                    let (expand_start_idx, paths) = expand_path_cloud(
                        path.into_owned(),
                        cloud_options.as_ref(),
                        glob,
                        first_path_has_scheme,
                    )
                    .await?;
                    out_paths.extend_from_slice(&paths);
                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
                };

                if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                    <[PlRefPath]>::sort_unstable(mut_slice);
                }
            }
        }
        #[cfg(not(feature = "cloud"))]
        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
    } else {
        // Local filesystem route. Both buffers are reused across inputs.
        let mut stack: VecDeque<Cow<'_, Path>> = VecDeque::new();
        let mut paths_scratch: Vec<PathBuf> = vec![];

        for (path_idx, path) in paths.iter().enumerate() {
            stack.clear();
            let sort_start_idx = out_paths.paths.len();

            if path.as_std_path().is_dir() {
                // Hive offset for a directory input is the byte length of its path string.
                let i = path.as_str().len();

                hive_idx_tracker.update(i, path_idx)?;

                stack.push_back(Cow::Borrowed(path.as_std_path()));

                // Breadth-first walk (push_back / pop_front).
                while let Some(dir) = stack.pop_front() {
                    let mut last_err = Ok(());

                    // Collect entries up to the first failing one, then propagate its error.
                    paths_scratch.clear();
                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| {
                        match x.map(|x| x.path()) {
                            Ok(v) => Some(v),
                            Err(e) => {
                                last_err = Err(e);
                                None
                            },
                        }
                    }));

                    last_err?;

                    // Subdirectories are queued (never hidden-filtered); only non-empty files
                    // are emitted.
                    for path in paths_scratch.drain(..) {
                        let md = path.metadata()?;

                        if md.is_dir() {
                            stack.push_back(Cow::Owned(path));
                        } else if md.len() > 0 {
                            out_paths.push(PlRefPath::try_from_path(&path)?);
                        }
                    }
                }
            } else if glob && has_glob(path.as_bytes()) {
                hive_idx_tracker.update(0, path_idx)?;

                let Ok(paths) = glob::glob(path.as_str()) else {
                    polars_bail!(ComputeError: "invalid glob pattern given")
                };

                for path in paths {
                    let path = path.map_err(to_compute_err)?;
                    let md = path.metadata()?;
                    if !md.is_dir() && md.len() > 0 {
                        out_paths.push(PlRefPath::try_from_path(&path)?);
                    }
                }
            } else {
                // Neither a directory nor a glob: keep it as given (existence is not checked here).
                hive_idx_tracker.update(0, path_idx)?;
                out_paths.push(path.clone());
            };

            if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                <[PlRefPath]>::sort_unstable(mut_slice);
            }
        }
    }

    // Expanding a bare directory must not silently mix file types.
    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
        if let [Some((_, p1)), Some((_, p2))] = out_paths.exts {
            polars_bail!(
                InvalidOperation: "directory contained paths with different file extensions: \
                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
                &p1, &p2
            )
        }
    }

    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));

    /// Output collector that skips hidden files and tracks extension diversity.
    struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
        paths: Vec<PlRefPath>,
        // `exts[0]`: first extension seen; `exts[1]`: first path with a different extension.
        exts: [Option<(PlSmallStr, PlRefPath)>; 2],
        is_hidden_file: &'a F,
    }

    impl<F> OutPaths<'_, F>
    where
        F: Fn(&PlRefPath) -> bool,
    {
        /// Appends `value` unless it is hidden, updating the extension record.
        fn push(&mut self, value: PlRefPath) {
            if (self.is_hidden_file)(&value) {
                return;
            }

            let exts = &mut self.exts;
            Self::update_ext_status(exts, &value);

            self.paths.push(value)
        }

        /// Like `push`, for every item of `values`.
        fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
            let exts = &mut self.exts;

            self.paths.extend(
                values
                    .into_iter()
                    .filter(|x| !(self.is_hidden_file)(x))
                    .inspect(|x| {
                        Self::update_ext_status(exts, x);
                    }),
            )
        }

        fn extend_from_slice(&mut self, values: &[PlRefPath]) {
            self.extend(values.iter().cloned())
        }

        /// Records the first extension seen, then the first path whose extension differs from it.
        /// A missing extension counts as the empty string.
        fn update_ext_status(exts: &mut [Option<(PlSmallStr, PlRefPath)>; 2], value: &PlRefPath) {
            let ext = value
                .extension()
                .map_or(PlSmallStr::EMPTY, PlSmallStr::from);

            if exts[0].is_none() {
                exts[0] = Some((ext, value.clone()));
            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
                exts[1] = Some((ext, value.clone()));
            }
        }
    }
}
649
650#[cfg(feature = "file_cache")]
/// Ensures `path` exists as a directory, creating it (and any parents) if needed.
///
/// A creation error is ignored as long as `path` is a directory afterwards. For example, another
/// process may have created it concurrently.
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(_) if path.is_dir() => Ok(()),
        outcome => outcome,
    }
}
657
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_core::runtime::ASYNC;
    use polars_utils::pl_path::PlRefPath;

    use super::resolve_homedir;

    /// Runs every input through `resolve_homedir` and collects owned results.
    fn resolve_each(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs
            .iter()
            .map(|input| resolve_homedir(input).into_owned())
            .collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: Vec<PathBuf> = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_each(&inputs);

        // A `~`-prefixed path becomes absolute but keeps its file name.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());
        // Paths without a leading `~` component pass through unchanged.
        assert_eq!(resolved[1..4], inputs[1..4]);
        // A bare `~` expands to the home directory itself.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: Vec<PathBuf> = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"]
            .into_iter()
            .map(PathBuf::from)
            .collect();

        let resolved = resolve_each(&inputs);

        // Absolute drive paths are untouched.
        assert_eq!(resolved[0], inputs[0]);
        // `~\...` becomes absolute but keeps its file name.
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        // A bare `~` expands to the home directory itself.
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use super::expand_paths;

        // `?` is treated as a glob wildcard by `has_glob`, but in an HTTP URL it starts the
        // query string. The URL must therefore come back verbatim.
        let input = [PlRefPath::new("https://pola.rs/test.csv?token=bear")];
        let expanded = ASYNC
            .block_on(expand_paths(&input, true, &[], &mut None))
            .unwrap();
        assert_eq!(expanded.as_ref(), &input);
    }
}