1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use polars_buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::{CloudScheme, PlRefPath};
10use polars_utils::pl_str::PlSmallStr;
11
12#[cfg(feature = "cloud")]
13mod hugging_face;
14
15use crate::cloud::CloudOptions;
16
17#[allow(clippy::bind_instead_of_map)]
18pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
19 (|| {
20 let verbose = config::verbose();
21
22 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
23 if verbose {
24 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
25 }
26 v
27 } else if cfg!(target_family = "unix") {
28 let id = std::env::var("USER")
29 .inspect(|_| {
30 if verbose {
31 eprintln!("init_temp_dir: sourced $USER")
32 }
33 })
34 .or_else(|_e| {
35 #[cfg(feature = "file_cache")]
38 {
39 std::env::var("HOME")
40 .inspect(|_| {
41 if verbose {
42 eprintln!("init_temp_dir: sourced $HOME")
43 }
44 })
45 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
46 }
47 #[cfg(not(feature = "file_cache"))]
48 {
49 Err(_e)
50 }
51 });
52
53 if let Ok(v) = id {
54 std::env::temp_dir().join(format!("polars-{v}/"))
55 } else {
56 return Err(std::io::Error::other(
57 "could not load $USER or $HOME environment variables",
58 ));
59 }
60 } else if cfg!(target_family = "windows") {
61 std::env::temp_dir().join("polars/")
65 } else {
66 std::env::temp_dir().join("polars/")
67 }
68 .into_boxed_path();
69
70 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
71 if !path.is_dir() {
72 panic!(
73 "failed to create temporary directory: {} (path = {:?})",
74 err,
75 path.as_ref()
76 );
77 }
78 }
79
80 #[cfg(target_family = "unix")]
81 {
82 use std::os::unix::fs::PermissionsExt;
83
84 let result = (|| {
85 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
86 let perms = std::fs::metadata(path.as_ref())?.permissions();
87
88 if (perms.mode() % 0o1000) != 0o700 {
89 std::io::Result::Err(std::io::Error::other(format!(
90 "permission mismatch: {perms:?}"
91 )))
92 } else {
93 std::io::Result::Ok(())
94 }
95 })()
96 .map_err(|e| {
97 std::io::Error::new(
98 e.kind(),
99 format!(
100 "error setting temporary directory permissions: {} (path = {:?})",
101 e,
102 path.as_ref()
103 ),
104 )
105 });
106
107 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
108 result?;
109 }
110 }
111
112 std::io::Result::Ok(path)
113 })()
114 .map_err(|e| {
115 std::io::Error::new(
116 e.kind(),
117 format!(
118 "error initializing temporary directory: {e} \
119 consider explicitly setting POLARS_TEMP_DIR"
120 ),
121 )
122 })
123 .unwrap()
124});
125
126pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
128 return inner(path.as_ref());
129
130 fn inner(path: &Path) -> Cow<'_, Path> {
131 if path.starts_with("~") {
132 #[cfg(not(target_family = "wasm"))]
134 if let Some(homedir) = home::home_dir() {
135 return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
136 }
137 }
138
139 Cow::Borrowed(path)
140 }
141}
142
143fn has_glob(path: &[u8]) -> bool {
144 return get_glob_start_idx(path).is_some();
145
146 fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
148 memchr::memchr3(b'*', b'?', b'[', path)
149 }
150}
151
152pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
154 paths.len() == 1 && !has_glob(paths[0].strip_scheme().as_bytes())
156 && {
158 (
159 !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
161 )
162 || (
163 expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
166 )
167 }
168}
169
170pub async fn expand_paths(
172 paths: &[PlRefPath],
173 glob: bool,
174 hidden_file_prefix: &[PlSmallStr],
175 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
176) -> PolarsResult<Buffer<PlRefPath>> {
177 expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false)
178 .await
179 .map(|x| x.0)
180}
181
182struct HiveIdxTracker<'a> {
183 idx: usize,
184 paths: &'a [PlRefPath],
185 check_directory_level: bool,
186}
187
188impl HiveIdxTracker<'_> {
189 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
190 let check_directory_level = self.check_directory_level;
191 let paths = self.paths;
192
193 if check_directory_level
194 && ![usize::MAX, i].contains(&self.idx)
195 && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
197 {
198 polars_bail!(
199 InvalidOperation:
200 "attempted to read from different directory levels with hive partitioning enabled: \
201 first path: {}, second path: {}",
202 &paths[path_idx - 1],
203 &paths[path_idx],
204 )
205 } else {
206 self.idx = std::cmp::min(self.idx, i);
207 Ok(())
208 }
209 }
210}
211
/// Expands a single path through the object store that is built for it.
///
/// Returns `(expand_start_idx, paths)`:
/// * When the path has no trailing `/`, no glob expansion applies (`glob` is off or the cloud
///   location carries no expansion), and the path either has a scheme or is a local file, the
///   path is returned as the only entry with index `0`.
/// * Otherwise all objects with a non-zero size listed under the path's prefix are returned in
///   sorted order, and the index is the byte length of the formatted prefix.
///
/// Paths are formatted as `scheme://bucket/location` when `first_path_has_scheme` is set and as
/// `/location` otherwise.
///
/// # Errors
///
/// Fails when the object store cannot be built, the prefix cannot be parsed, a scheme-less path
/// has no readable metadata, or listing the store fails.
#[cfg(feature = "cloud")]
async fn expand_path_cloud(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    glob: bool,
    first_path_has_scheme: bool,
) -> PolarsResult<(usize, Vec<PlRefPath>)> {
    use futures::TryStreamExt;
    use polars_utils::_limit_path_len_io_err;

    use crate::cloud::object_path_from_str;

    let format_path = |scheme: &str, bucket: &str, location: &str| -> String {
        if first_path_has_scheme {
            format!("{scheme}://{bucket}/{location}")
        } else {
            format!("/{location}")
        }
    };

    let path_str = path.as_str();

    let (cloud_location, store) =
        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
    let prefix = object_path_from_str(&cloud_location.prefix)?;

    // Paths with a scheme are taken as single objects unless they end in `/`; scheme-less
    // paths are checked on the local filesystem instead.
    let is_single_object = !path_str.ends_with("/")
        && (!glob || cloud_location.expansion.is_none())
        && (path.has_scheme() || path.as_std_path().is_file());

    if is_single_object {
        let only = PlRefPath::new(format_path(
            cloud_location.scheme,
            &cloud_location.bucket,
            prefix.as_ref(),
        ));

        return Ok((0, vec![only]));
    }

    // For scheme-less paths, surface a missing or unreadable path as an error before listing.
    if !path.has_scheme() {
        path.as_std_path()
            .metadata()
            .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
    }

    let cloud_location = &cloud_location;
    let prefix_ref = &prefix;

    let mut listed = store
        .exec_with_rebuild_retry_on_err(|s| async move {
            let non_empty = s
                .list(Some(prefix_ref))
                .try_filter_map(|meta| async move {
                    // Objects with a size of zero are skipped.
                    let kept = (meta.size > 0).then(|| {
                        PlRefPath::new(format_path(
                            cloud_location.scheme,
                            &cloud_location.bucket,
                            meta.location.as_ref(),
                        ))
                    });
                    Ok(kept)
                })
                .try_collect::<Vec<_>>()
                .await?;

            Ok(non_empty)
        })
        .await?;

    // Restore a trailing slash that the parsed prefix does not carry, so that the returned
    // length matches the input path.
    let mut dir_prefix = prefix.to_string();
    if path_str.ends_with('/') && !dir_prefix.ends_with('/') {
        dir_prefix.push('/')
    };

    listed.sort_unstable();

    let expand_start_idx = format_path(
        cloud_location.scheme,
        &cloud_location.bucket,
        dir_prefix.as_str(),
    )
    .len();

    Ok((expand_start_idx, listed))
}
312
313pub async fn expand_paths_hive(
317 paths: &[PlRefPath],
318 glob: bool,
319 hidden_file_prefix: &[PlSmallStr],
320 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
321 check_directory_level: bool,
322) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
323 let Some(first_path) = paths.first() else {
324 return Ok((vec![].into(), 0));
325 };
326
327 let first_path_has_scheme = first_path.has_scheme();
328
329 let is_hidden_file = move |path: &PlRefPath| {
330 path.file_name()
331 .and_then(|x| x.to_str())
332 .is_some_and(|file_name| {
333 hidden_file_prefix
334 .iter()
335 .any(|x| file_name.starts_with(x.as_str()))
336 })
337 };
338
339 let mut out_paths = OutPaths {
340 paths: vec![],
341 exts: [None, None],
342 is_hidden_file: &is_hidden_file,
343 };
344
345 let mut hive_idx_tracker = HiveIdxTracker {
346 idx: usize::MAX,
347 paths,
348 check_directory_level,
349 };
350
351 if first_path_has_scheme || {
352 cfg!(not(target_family = "windows")) && polars_config::config().force_async()
353 } {
354 #[cfg(feature = "cloud")]
355 {
356 if first_path.scheme() == Some(CloudScheme::Hf) {
357 let (expand_start_idx, paths) = hugging_face::expand_paths_hf(
358 paths,
359 check_directory_level,
360 cloud_options,
361 glob,
362 )
363 .await?;
364
365 return Ok((paths.into(), expand_start_idx));
366 }
367
368 for (path_idx, path) in paths.iter().enumerate() {
369 use std::borrow::Cow;
370
371 let mut path = Cow::Borrowed(path);
372
373 if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
374 let mut rewrite_aws = false;
375
376 #[cfg(feature = "aws")]
377 if let Some(p) = (|| {
378 use crate::cloud::CloudConfig;
379
380 let after_scheme = path.strip_scheme();
383
384 let bucket_end = after_scheme.find(".s3.")?;
385 let offset = bucket_end + 4;
386 let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
388
389 if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
391 return None;
392 }
393
394 let bucket = &after_scheme[..bucket_end];
395 let region = &after_scheme[bucket_end + 4..region_end];
396 let key = &after_scheme[region_end + 15..];
397
398 if let CloudConfig::Aws(configs) = cloud_options
399 .get_or_insert_default()
400 .config
401 .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
402 {
403 use object_store::aws::AmazonS3ConfigKey;
404
405 if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
406 configs.push((AmazonS3ConfigKey::Region, region.into()))
407 }
408 }
409
410 Some(format!("s3://{bucket}/{key}"))
411 })() {
412 path = Cow::Owned(PlRefPath::new(p));
413 rewrite_aws = true;
414 }
415
416 if !rewrite_aws {
417 out_paths.push(path.into_owned());
418 hive_idx_tracker.update(0, path_idx)?;
419 continue;
420 }
421 }
422
423 let sort_start_idx = out_paths.paths.len();
424
425 if glob && has_glob(path.as_bytes()) {
426 hive_idx_tracker.update(0, path_idx)?;
427
428 let iter = crate::pl_async::get_runtime().block_in_place_on(
429 crate::async_glob(path.into_owned(), cloud_options.as_ref()),
430 )?;
431
432 if first_path_has_scheme {
433 out_paths.extend(iter.into_iter().map(PlRefPath::new))
434 } else {
435 out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
438 };
439 } else {
440 let (expand_start_idx, paths) = expand_path_cloud(
441 path.into_owned(),
442 cloud_options.as_ref(),
443 glob,
444 first_path_has_scheme,
445 )
446 .await?;
447 out_paths.extend_from_slice(&paths);
448 hive_idx_tracker.update(expand_start_idx, path_idx)?;
449 };
450
451 if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
452 <[PlRefPath]>::sort_unstable(mut_slice);
453 }
454 }
455 }
456 #[cfg(not(feature = "cloud"))]
457 panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
458 } else {
459 let mut stack: VecDeque<Cow<'_, Path>> = VecDeque::new();
460 let mut paths_scratch: Vec<PathBuf> = vec![];
461
462 for (path_idx, path) in paths.iter().enumerate() {
463 stack.clear();
464 let sort_start_idx = out_paths.paths.len();
465
466 if path.as_std_path().is_dir() {
467 let i = path.as_str().len();
468
469 hive_idx_tracker.update(i, path_idx)?;
470
471 stack.push_back(Cow::Borrowed(path.as_std_path()));
472
473 while let Some(dir) = stack.pop_front() {
474 let mut last_err = Ok(());
475
476 paths_scratch.clear();
477 paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| {
478 match x.map(|x| x.path()) {
479 Ok(v) => Some(v),
480 Err(e) => {
481 last_err = Err(e);
482 None
483 },
484 }
485 }));
486
487 last_err?;
488
489 for path in paths_scratch.drain(..) {
490 let md = path.metadata()?;
491
492 if md.is_dir() {
493 stack.push_back(Cow::Owned(path));
494 } else if md.len() > 0 {
495 out_paths.push(PlRefPath::try_from_path(&path)?);
496 }
497 }
498 }
499 } else if glob && has_glob(path.as_bytes()) {
500 hive_idx_tracker.update(0, path_idx)?;
501
502 let Ok(paths) = glob::glob(path.as_str()) else {
503 polars_bail!(ComputeError: "invalid glob pattern given")
504 };
505
506 for path in paths {
507 let path = path.map_err(to_compute_err)?;
508 let md = path.metadata()?;
509 if !md.is_dir() && md.len() > 0 {
510 out_paths.push(PlRefPath::try_from_path(&path)?);
511 }
512 }
513 } else {
514 hive_idx_tracker.update(0, path_idx)?;
515 out_paths.push(path.clone());
516 };
517
518 if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
519 <[PlRefPath]>::sort_unstable(mut_slice);
520 }
521 }
522 }
523
524 if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
525 if let [Some((_, p1)), Some((_, p2))] = out_paths.exts {
526 polars_bail!(
527 InvalidOperation: "directory contained paths with different file extensions: \
528 first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
529 which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
530 &p1, &p2
531 )
532 }
533 }
534
535 return Ok((out_paths.paths.into(), hive_idx_tracker.idx));
536
537 struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
540 paths: Vec<PlRefPath>,
541 exts: [Option<(PlSmallStr, PlRefPath)>; 2],
542 is_hidden_file: &'a F,
543 }
544
545 impl<F> OutPaths<'_, F>
546 where
547 F: Fn(&PlRefPath) -> bool,
548 {
549 fn push(&mut self, value: PlRefPath) {
550 if (self.is_hidden_file)(&value) {
551 return;
552 }
553
554 let exts = &mut self.exts;
555 Self::update_ext_status(exts, &value);
556
557 self.paths.push(value)
558 }
559
560 fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
561 let exts = &mut self.exts;
562
563 self.paths.extend(
564 values
565 .into_iter()
566 .filter(|x| !(self.is_hidden_file)(x))
567 .inspect(|x| {
568 Self::update_ext_status(exts, x);
569 }),
570 )
571 }
572
573 fn extend_from_slice(&mut self, values: &[PlRefPath]) {
574 self.extend(values.iter().cloned())
575 }
576
577 fn update_ext_status(exts: &mut [Option<(PlSmallStr, PlRefPath)>; 2], value: &PlRefPath) {
578 let ext = value
579 .extension()
580 .map_or(PlSmallStr::EMPTY, PlSmallStr::from);
581
582 if exts[0].is_none() {
583 exts[0] = Some((ext, value.clone()));
584 } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
585 exts[1] = Some((ext, value.clone()));
586 }
587 }
588 }
589}
590
/// Ensures that `path` exists as a directory, creating it and any missing parents.
///
/// An error from the creation attempt is returned only when `path` is not a directory
/// afterwards.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
598
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::pl_path::PlRefPath;

    use super::resolve_homedir;
    use crate::pl_async::get_runtime;

    /// Runs every input through `resolve_homedir` and returns the owned results in order.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs
            .iter()
            .map(|input| resolve_homedir(input).into_owned())
            .collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .map(PathBuf::from);

        let resolved = resolve_all(&paths);

        // A leading `~` is expanded: the file name is kept and the result is absolute.
        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        // Paths without a leading `~` come back unchanged.
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        // A bare `~` resolves to an absolute path.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"].map(PathBuf::from);

        let resolved = resolve_all(&paths);

        // Paths without a leading `~` come back unchanged.
        assert_eq!(resolved[0], paths[0]);
        // A leading `~` is expanded: the file name is kept and the result is absolute.
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use super::expand_paths;

        // The `?` of the query string must not be treated as a glob symbol: the URL has to
        // come back exactly as it was passed in, even with globbing enabled.
        let url = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlRefPath::new(url)];

        let out = get_runtime()
            .block_on(expand_paths(paths, true, &[], &mut None))
            .unwrap();

        assert_eq!(out.as_ref(), paths);
    }
}