1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, LazyLock};
4
5use polars_core::config;
6use polars_core::error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::pl_str::PlSmallStr;
8use polars_utils::plpath::{CloudScheme, PlPath, PlPathRef};
9
10#[cfg(feature = "cloud")]
11mod hugging_face;
12
13use crate::cloud::CloudOptions;
14
15#[allow(clippy::bind_instead_of_map)]
16pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
17 (|| {
18 let verbose = config::verbose();
19
20 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
21 if verbose {
22 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
23 }
24 v
25 } else if cfg!(target_family = "unix") {
26 let id = std::env::var("USER")
27 .inspect(|_| {
28 if verbose {
29 eprintln!("init_temp_dir: sourced $USER")
30 }
31 })
32 .or_else(|_e| {
33 #[cfg(feature = "file_cache")]
36 {
37 std::env::var("HOME")
38 .inspect(|_| {
39 if verbose {
40 eprintln!("init_temp_dir: sourced $HOME")
41 }
42 })
43 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
44 }
45 #[cfg(not(feature = "file_cache"))]
46 {
47 Err(_e)
48 }
49 });
50
51 if let Ok(v) = id {
52 std::env::temp_dir().join(format!("polars-{v}/"))
53 } else {
54 return Err(std::io::Error::other(
55 "could not load $USER or $HOME environment variables",
56 ));
57 }
58 } else if cfg!(target_family = "windows") {
59 std::env::temp_dir().join("polars/")
63 } else {
64 std::env::temp_dir().join("polars/")
65 }
66 .into_boxed_path();
67
68 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
69 if !path.is_dir() {
70 panic!(
71 "failed to create temporary directory: {} (path = {:?})",
72 err,
73 path.as_ref()
74 );
75 }
76 }
77
78 #[cfg(target_family = "unix")]
79 {
80 use std::os::unix::fs::PermissionsExt;
81
82 let result = (|| {
83 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
84 let perms = std::fs::metadata(path.as_ref())?.permissions();
85
86 if (perms.mode() % 0o1000) != 0o700 {
87 std::io::Result::Err(std::io::Error::other(format!(
88 "permission mismatch: {perms:?}"
89 )))
90 } else {
91 std::io::Result::Ok(())
92 }
93 })()
94 .map_err(|e| {
95 std::io::Error::new(
96 e.kind(),
97 format!(
98 "error setting temporary directory permissions: {} (path = {:?})",
99 e,
100 path.as_ref()
101 ),
102 )
103 });
104
105 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
106 result?;
107 }
108 }
109
110 std::io::Result::Ok(path)
111 })()
112 .map_err(|e| {
113 std::io::Error::new(
114 e.kind(),
115 format!(
116 "error initializing temporary directory: {e} \
117 consider explicitly setting POLARS_TEMP_DIR"
118 ),
119 )
120 })
121 .unwrap()
122});
123
124pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
126 let path = path.as_ref();
127
128 if path.starts_with("~") {
129 #[cfg(not(target_family = "wasm"))]
131 if let Some(homedir) = home::home_dir() {
132 return homedir.join(path.strip_prefix("~").unwrap());
133 }
134 }
135
136 path.into()
137}
138
/// Returns the byte offset of the first glob metacharacter (`*`, `?` or `[`) in `path`,
/// or `None` if `path` contains none of them.
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
    path.iter()
        .position(|&byte| matches!(byte, b'*' | b'?' | b'['))
}
143
144pub fn expanded_from_single_directory(addrs: &[PlPath], expanded_addrs: &[PlPath]) -> bool {
146 addrs.len() == 1 && get_glob_start_idx(addrs[0].as_ref().strip_scheme().as_bytes()).is_none()
148 && {
150 (
151 addrs[0].as_ref().as_local_path().is_some_and(|p| p.is_dir())
153 )
154 || (
155 expanded_addrs.is_empty() || (addrs[0] != expanded_addrs[0])
158 )
159 }
160}
161
162pub fn expand_paths(
164 paths: &[PlPath],
165 glob: bool,
166 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
167) -> PolarsResult<Arc<[PlPath]>> {
168 expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
169}
170
171struct HiveIdxTracker<'a> {
172 idx: usize,
173 paths: &'a [PlPath],
174 check_directory_level: bool,
175}
176
177impl HiveIdxTracker<'_> {
178 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
179 let check_directory_level = self.check_directory_level;
180 let paths = self.paths;
181
182 if check_directory_level
183 && ![usize::MAX, i].contains(&self.idx)
184 && (path_idx > 0 && paths[path_idx].as_ref().parent() != paths[path_idx - 1].as_ref().parent())
186 {
187 polars_bail!(
188 InvalidOperation:
189 "attempted to read from different directory levels with hive partitioning enabled: \
190 first path: {}, second path: {}",
191 paths[path_idx - 1].display(),
192 paths[path_idx].display(),
193 )
194 } else {
195 self.idx = std::cmp::min(self.idx, i);
196 Ok(())
197 }
198 }
199}
200
201pub fn expand_paths_hive(
205 paths: &[PlPath],
206 glob: bool,
207 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
208 check_directory_level: bool,
209) -> PolarsResult<(Arc<[PlPath]>, usize)> {
210 let Some(first_path) = paths.first() else {
211 return Ok((vec![].into(), 0));
212 };
213
214 let is_cloud = first_path.as_ref().is_cloud_url();
215
216 struct OutPaths {
219 paths: Vec<PlPath>,
220 exts: [Option<(PlSmallStr, usize)>; 2],
221 current_idx: usize,
222 }
223
224 impl OutPaths {
225 fn update_ext_status(
226 current_idx: &mut usize,
227 exts: &mut [Option<(PlSmallStr, usize)>; 2],
228 value: PlPathRef,
229 ) {
230 let ext = value
231 .extension()
232 .map(PlSmallStr::from)
233 .unwrap_or(PlSmallStr::EMPTY);
234
235 if exts[0].is_none() {
236 exts[0] = Some((ext, *current_idx));
237 } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
238 exts[1] = Some((ext, *current_idx));
239 }
240
241 *current_idx += 1;
242 }
243
244 fn push(&mut self, value: PlPath) {
245 {
246 let current_idx = &mut self.current_idx;
247 let exts = &mut self.exts;
248 Self::update_ext_status(current_idx, exts, value.as_ref());
249 }
250 self.paths.push(value)
251 }
252
253 fn extend(&mut self, values: impl IntoIterator<Item = PlPath>) {
254 let current_idx = &mut self.current_idx;
255 let exts = &mut self.exts;
256
257 self.paths.extend(values.into_iter().inspect(|x| {
258 Self::update_ext_status(current_idx, exts, x.as_ref());
259 }))
260 }
261
262 fn extend_from_slice(&mut self, values: &[PlPath]) {
263 self.extend(values.iter().cloned())
264 }
265 }
266
267 let mut out_paths = OutPaths {
268 paths: vec![],
269 exts: [None, None],
270 current_idx: 0,
271 };
272
273 let mut hive_idx_tracker = HiveIdxTracker {
274 idx: usize::MAX,
275 paths,
276 check_directory_level,
277 };
278
279 if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
280 #[cfg(feature = "cloud")]
281 {
282 use polars_utils::_limit_path_len_io_err;
283
284 use crate::cloud::object_path_from_str;
285
286 if first_path.cloud_scheme() == Some(CloudScheme::Hf) {
287 let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
288 hugging_face::expand_paths_hf(
289 paths,
290 check_directory_level,
291 cloud_options,
292 glob,
293 ),
294 )?;
295
296 return Ok((Arc::from(paths), expand_start_idx));
297 }
298
299 let format_path = |scheme: &str, bucket: &str, location: &str| {
300 if is_cloud {
301 format!("{scheme}://{bucket}/{location}")
302 } else {
303 format!("/{location}")
304 }
305 };
306
307 let expand_path_cloud = |addr: &str,
308 cloud_options: Option<&CloudOptions>|
309 -> PolarsResult<(usize, Vec<PlPath>)> {
310 crate::pl_async::get_runtime().block_in_place_on(async {
311 let (cloud_location, store) =
312 crate::cloud::build_object_store(addr, cloud_options, glob).await?;
313 let prefix = object_path_from_str(&cloud_location.prefix)?;
314
315 let out = if !addr.ends_with("/")
316 && (!glob || cloud_location.expansion.is_none())
317 && {
318 is_cloud || Path::new(addr).is_file()
323 } {
324 (
325 0,
326 vec![PlPath::from_string(format_path(
327 &cloud_location.scheme,
328 &cloud_location.bucket,
329 prefix.as_ref(),
330 ))],
331 )
332 } else {
333 use futures::TryStreamExt;
334
335 if !is_cloud {
336 let path = PathBuf::from(addr);
341 if !path.is_dir() {
342 path.metadata()
343 .map_err(|err| _limit_path_len_io_err(&path, err))?;
344 }
345 }
346
347 let cloud_location = &cloud_location;
348
349 let mut paths = store
350 .try_exec_rebuild_on_err(|store| {
351 let st = store.clone();
352
353 async {
354 let store = st;
355 let out = store
356 .list(Some(&prefix))
357 .try_filter_map(|x| async move {
358 let out = (x.size > 0).then(|| {
359 PlPath::from_string({
360 format_path(
361 &cloud_location.scheme,
362 &cloud_location.bucket,
363 x.location.as_ref(),
364 )
365 })
366 });
367 Ok(out)
368 })
369 .try_collect::<Vec<_>>()
370 .await?;
371
372 Ok(out)
373 }
374 })
375 .await?;
376
377 let mut prefix = prefix.to_string();
380 if addr.ends_with('/') {
381 prefix.push('/')
382 };
383
384 paths.sort_unstable();
385 (
386 format_path(
387 &cloud_location.scheme,
388 &cloud_location.bucket,
389 prefix.as_ref(),
390 )
391 .len(),
392 paths,
393 )
394 };
395
396 PolarsResult::Ok(out)
397 })
398 };
399
400 for (path_idx, path) in paths.iter().enumerate() {
401 use std::borrow::Cow;
402
403 let mut path = Cow::Borrowed(path);
404
405 if matches!(
406 path.cloud_scheme(),
407 Some(CloudScheme::Http | CloudScheme::Https)
408 ) {
409 let mut rewrite_aws = false;
410
411 #[cfg(feature = "aws")]
412 if let Some(p) = (|| {
413 use crate::cloud::CloudConfig;
414
415 let p = path.as_ref().as_ref();
418 let after_scheme = p.strip_scheme();
419
420 let bucket_end = after_scheme.find(".s3.")?;
421 let offset = bucket_end + 4;
422 let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
424
425 if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
427 return None;
428 }
429
430 let bucket = &after_scheme[..bucket_end];
431 let region = &after_scheme[bucket_end + 4..region_end];
432 let key = &after_scheme[region_end + 15..];
433
434 if let CloudConfig::Aws(configs) = cloud_options
435 .get_or_insert_default()
436 .config
437 .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
438 {
439 use object_store::aws::AmazonS3ConfigKey;
440
441 if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
442 configs.push((AmazonS3ConfigKey::Region, region.into()))
443 }
444 }
445
446 Some(format!("s3://{bucket}/{key}"))
447 })() {
448 path = Cow::Owned(PlPath::from_string(p));
449 rewrite_aws = true;
450 }
451
452 if !rewrite_aws {
453 out_paths.push(path.into_owned());
454 hive_idx_tracker.update(0, path_idx)?;
455 continue;
456 }
457 }
458
459 let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());
460
461 let path = if glob && glob_start_idx.is_some() {
462 path.clone()
463 } else {
464 let (expand_start_idx, paths) =
465 expand_path_cloud(path.to_str(), cloud_options.as_ref())?;
466 out_paths.extend_from_slice(&paths);
467 hive_idx_tracker.update(expand_start_idx, path_idx)?;
468 continue;
469 };
470
471 hive_idx_tracker.update(0, path_idx)?;
472
473 let iter = crate::pl_async::get_runtime()
474 .block_in_place_on(crate::async_glob(path.to_str(), cloud_options.as_ref()))?;
475
476 if is_cloud {
477 out_paths.extend(iter.into_iter().map(PlPath::from_string));
478 } else {
479 out_paths.extend(
481 iter.iter()
482 .map(|x| &x[7..])
483 .map(|s| PlPathRef::new(s).into_owned()),
484 )
485 }
486 }
487 }
488 #[cfg(not(feature = "cloud"))]
489 panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
490 } else {
491 let mut stack = VecDeque::new();
492
493 for path_idx in 0..paths.len() {
494 let path = paths[path_idx]
495 .as_ref()
496 .as_local_path()
497 .unwrap()
498 .to_path_buf();
499 stack.clear();
500
501 if path.is_dir() {
502 let i = path.to_str().unwrap().len();
503
504 hive_idx_tracker.update(i, path_idx)?;
505
506 stack.push_back(path.clone());
507
508 while let Some(dir) = stack.pop_front() {
509 let mut paths = std::fs::read_dir(dir)
510 .map_err(PolarsError::from)?
511 .map(|x| x.map(|x| x.path()))
512 .collect::<std::io::Result<Vec<_>>>()
513 .map_err(PolarsError::from)?;
514 paths.sort_unstable();
515
516 for path in paths {
517 if path.is_dir() {
518 stack.push_back(path);
519 } else if path.metadata()?.len() > 0 {
520 out_paths.push(PlPath::Local(path.into()));
521 }
522 }
523 }
524
525 continue;
526 }
527
528 let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
529
530 if glob && i.is_some() {
531 hive_idx_tracker.update(0, path_idx)?;
532
533 let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
534 polars_bail!(ComputeError: "invalid glob pattern given")
535 };
536
537 for path in paths {
538 let path = path.map_err(to_compute_err)?;
539 if !path.is_dir() && path.metadata()?.len() > 0 {
540 out_paths.push(PlPath::Local(path.into()));
541 }
542 }
543 } else {
544 hive_idx_tracker.update(0, path_idx)?;
545 out_paths.push(PlPath::Local(path.into()));
546 }
547 }
548 }
549
550 assert_eq!(out_paths.current_idx, out_paths.paths.len());
551
552 if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
553 if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
554 polars_bail!(
555 InvalidOperation: r#"directory contained paths with different file extensions: \
556 first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
557 which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
558 &out_paths.paths[i1].display(), &out_paths.paths[i2].display()
559 )
560 }
561 }
562
563 Ok((out_paths.paths.into(), hive_idx_tracker.idx))
564}
565
/// Makes sure a directory exists at `path`, creating it (and any parents) if needed.
///
/// Succeeds whenever a directory is present at `path` afterwards, even if
/// `create_dir_all` itself reported an error.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(path).or_else(|err| if path.is_dir() { Ok(()) } else { Err(err) })
}
573
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::plpath::PlPath;

    use super::resolve_homedir;

    /// Runs every input through `resolve_homedir`, preserving order.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs.iter().map(|p| resolve_homedir(p)).collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: Vec<PathBuf> = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_all(&inputs);

        // A `~/...` path becomes absolute and keeps its file name.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());

        // Paths without a leading `~` component are returned untouched.
        for (got, want) in resolved[1..=3].iter().zip(&inputs[1..=3]) {
            assert_eq!(got, want);
        }

        // A bare `~` resolves to the home directory itself.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: Vec<PathBuf> = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"]
            .into_iter()
            .map(PathBuf::from)
            .collect();

        let resolved = resolve_all(&inputs);

        assert_eq!(resolved[0], inputs[0]);
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use super::expand_paths;

        // The `?` in the query string must not be treated as a glob character.
        let input = [PlPath::new("https://pola.rs/test.csv?token=bear")];
        let expanded = expand_paths(&input, true, &mut None).unwrap();
        assert_eq!(expanded.as_ref(), &input);
    }
}