1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::LazyLock;
4
5use arrow::buffer::Buffer;
6use polars_core::config;
7use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::plpath::{CloudScheme, PlPath, PlPathRef};
10
11#[cfg(feature = "cloud")]
12mod hugging_face;
13
14use crate::cloud::CloudOptions;
15
16#[allow(clippy::bind_instead_of_map)]
17pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
18 (|| {
19 let verbose = config::verbose();
20
21 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
22 if verbose {
23 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
24 }
25 v
26 } else if cfg!(target_family = "unix") {
27 let id = std::env::var("USER")
28 .inspect(|_| {
29 if verbose {
30 eprintln!("init_temp_dir: sourced $USER")
31 }
32 })
33 .or_else(|_e| {
34 #[cfg(feature = "file_cache")]
37 {
38 std::env::var("HOME")
39 .inspect(|_| {
40 if verbose {
41 eprintln!("init_temp_dir: sourced $HOME")
42 }
43 })
44 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
45 }
46 #[cfg(not(feature = "file_cache"))]
47 {
48 Err(_e)
49 }
50 });
51
52 if let Ok(v) = id {
53 std::env::temp_dir().join(format!("polars-{v}/"))
54 } else {
55 return Err(std::io::Error::other(
56 "could not load $USER or $HOME environment variables",
57 ));
58 }
59 } else if cfg!(target_family = "windows") {
60 std::env::temp_dir().join("polars/")
64 } else {
65 std::env::temp_dir().join("polars/")
66 }
67 .into_boxed_path();
68
69 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
70 if !path.is_dir() {
71 panic!(
72 "failed to create temporary directory: {} (path = {:?})",
73 err,
74 path.as_ref()
75 );
76 }
77 }
78
79 #[cfg(target_family = "unix")]
80 {
81 use std::os::unix::fs::PermissionsExt;
82
83 let result = (|| {
84 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
85 let perms = std::fs::metadata(path.as_ref())?.permissions();
86
87 if (perms.mode() % 0o1000) != 0o700 {
88 std::io::Result::Err(std::io::Error::other(format!(
89 "permission mismatch: {perms:?}"
90 )))
91 } else {
92 std::io::Result::Ok(())
93 }
94 })()
95 .map_err(|e| {
96 std::io::Error::new(
97 e.kind(),
98 format!(
99 "error setting temporary directory permissions: {} (path = {:?})",
100 e,
101 path.as_ref()
102 ),
103 )
104 });
105
106 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
107 result?;
108 }
109 }
110
111 std::io::Result::Ok(path)
112 })()
113 .map_err(|e| {
114 std::io::Error::new(
115 e.kind(),
116 format!(
117 "error initializing temporary directory: {e} \
118 consider explicitly setting POLARS_TEMP_DIR"
119 ),
120 )
121 })
122 .unwrap()
123});
124
125pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
127 let path = path.as_ref();
128
129 if path.starts_with("~") {
130 #[cfg(not(target_family = "wasm"))]
132 if let Some(homedir) = home::home_dir() {
133 return homedir.join(path.strip_prefix("~").unwrap());
134 }
135 }
136
137 path.into()
138}
139
140pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
142 memchr::memchr3(b'*', b'?', b'[', path)
143}
144
145pub fn expanded_from_single_directory(paths: &[PlPath], expanded_paths: &[PlPath]) -> bool {
147 paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().strip_scheme().as_bytes()).is_none()
149 && {
151 (
152 paths[0].as_ref().as_local_path().is_some_and(|p| p.is_dir())
154 )
155 || (
156 expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
159 )
160 }
161}
162
163pub fn expand_paths(
165 paths: &[PlPath],
166 glob: bool,
167 hidden_file_prefix: &[PlSmallStr],
168 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
169) -> PolarsResult<Buffer<PlPath>> {
170 expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false).map(|x| x.0)
171}
172
173struct HiveIdxTracker<'a> {
174 idx: usize,
175 paths: &'a [PlPath],
176 check_directory_level: bool,
177}
178
179impl HiveIdxTracker<'_> {
180 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
181 let check_directory_level = self.check_directory_level;
182 let paths = self.paths;
183
184 if check_directory_level
185 && ![usize::MAX, i].contains(&self.idx)
186 && (path_idx > 0 && paths[path_idx].as_ref().parent() != paths[path_idx - 1].as_ref().parent())
188 {
189 polars_bail!(
190 InvalidOperation:
191 "attempted to read from different directory levels with hive partitioning enabled: \
192 first path: {}, second path: {}",
193 paths[path_idx - 1].display(),
194 paths[path_idx].display(),
195 )
196 } else {
197 self.idx = std::cmp::min(self.idx, i);
198 Ok(())
199 }
200 }
201}
202
/// Expands `paths` into the concrete list of files to read and returns it together with a
/// start index (presumably the byte offset at which hive partition parsing should begin;
/// confirm against callers).
///
/// Per input path:
/// - a directory (or cloud prefix) is listed recursively,
/// - a glob pattern (only when `glob` is true and the path contains `*`, `?` or `[`) is expanded,
/// - anything else is passed through as a single path.
///
/// Zero-size files found by listing or local globbing are skipped, and any file whose name
/// starts with one of `hidden_file_prefix` is dropped.
///
/// Whether the cloud/async code path is used is decided from the FIRST path only (or forced by
/// `config::force_async()` on non-Windows targets). `cloud_options` may be mutated on that path:
/// an AWS region parsed from an S3 HTTPS URL is appended to it.
///
/// The returned index is `0` for empty input; otherwise it is the minimum of the indices
/// recorded per input: `0` for globs and single files, the directory path length for local
/// directories, and the formatted prefix length for cloud listings.
///
/// # Errors
/// - I/O and object-store errors hit while listing,
/// - an invalid local glob pattern,
/// - `check_directory_level` is set and inputs come from different directory levels,
/// - a single non-glob directory input expanded to files with differing extensions.
///
/// # Panics
/// - the cloud/async path is taken but the `cloud` feature is disabled,
/// - a path on the local branch is not a local path or is not valid UTF-8.
203pub fn expand_paths_hive(
207 paths: &[PlPath],
208 glob: bool,
209 hidden_file_prefix: &[PlSmallStr],
210 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
211 check_directory_level: bool,
212) -> PolarsResult<(Buffer<PlPath>, usize)> {
 // Empty input: nothing to expand, start index 0.
213 let Some(first_path) = paths.first() else {
214 return Ok((vec![].into(), 0));
215 };
216
 // Only the first path decides whether the whole batch is treated as cloud paths.
217 let is_cloud = first_path.as_ref().is_cloud_url();
218
 // True when the file name starts with any of the configured hidden-file prefixes.
219 let is_hidden_file = move |path: &PlPath| {
220 path.as_ref()
221 .file_name()
222 .and_then(|x| x.to_str())
223 .is_some_and(|file_name| {
224 hidden_file_prefix
225 .iter()
226 .any(|x| file_name.starts_with(x.as_str()))
227 })
228 };
229
 // Output accumulator; it filters hidden files and tracks the file extensions it has seen.
230 let mut out_paths = OutPaths {
231 paths: vec![],
232 exts: [None, None],
233 current_idx: 0,
234 is_hidden_file: &is_hidden_file,
235 };
236
 // `usize::MAX` marks "no index recorded yet"; `update` lowers it to the minimum seen.
237 let mut hive_idx_tracker = HiveIdxTracker {
238 idx: usize::MAX,
239 paths,
240 check_directory_level,
241 };
242
 // Cloud/async branch: cloud URLs, or local paths when async is forced (never on Windows).
243 if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
244 #[cfg(feature = "cloud")]
245 {
246 use polars_utils::_limit_path_len_io_err;
247
248 use crate::cloud::object_path_from_str;
249
 // Hugging Face paths have their own expander; its result is returned directly.
250 if first_path.cloud_scheme() == Some(CloudScheme::Hf) {
251 let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
252 hugging_face::expand_paths_hf(
253 paths,
254 check_directory_level,
255 cloud_options,
256 glob,
257 ),
258 )?;
259
260 return Ok((paths.into(), expand_start_idx));
261 }
262
 // Rebuilds an output path string: a full URL for cloud inputs, or `/<location>` when
 // this branch was entered for local paths via forced async.
263 let format_path = |scheme: &str, bucket: &str, location: &str| {
264 if is_cloud {
265 format!("{scheme}://{bucket}/{location}")
266 } else {
267 format!("/{location}")
268 }
269 };
270
 // Expands one path through the object store and returns `(start index, paths)`.
271 let expand_path_cloud = |path: PlPathRef<'_>,
272 cloud_options: Option<&CloudOptions>|
273 -> PolarsResult<(usize, Vec<PlPath>)> {
274 crate::pl_async::get_runtime().block_in_place_on(async {
275 let path_str = path.to_str();
276
277 let (cloud_location, store) =
278 crate::cloud::build_object_store(path, cloud_options, glob).await?;
279 let prefix = object_path_from_str(&cloud_location.prefix)?;
280
 // Single-file case: no trailing slash, no glob expansion to apply, and the path is
 // either a cloud URL or an existing local file. No listing happens; index is 0.
281 let out = if !path_str.ends_with("/")
282 && (!glob || cloud_location.expansion.is_none())
283 && {
284 path.is_cloud_url() || path.as_local_path().unwrap().is_file()
289 } {
290 (
291 0,
292 vec![PlPath::from_string(format_path(
293 cloud_location.scheme,
294 &cloud_location.bucket,
295 prefix.as_ref(),
296 ))],
297 )
298 } else {
299 use futures::TryStreamExt;
300
 // A local path that is not a directory gets a `metadata()` call here so that an
 // I/O error (e.g. the path does not exist) is returned instead of an empty listing.
301 if let Some(path) = path.as_local_path() {
302 if !path.is_dir() {
307 path.metadata()
308 .map_err(|err| _limit_path_len_io_err(path, err))?;
309 }
310 }
311
312 let cloud_location = &cloud_location;
313
 // List everything under the prefix, dropping zero-size objects.
 // NOTE(review): `try_exec_rebuild_on_err` presumably retries with a rebuilt store on
 // failure - confirm against its definition.
314 let mut paths = store
315 .try_exec_rebuild_on_err(|store| {
316 let st = store.clone();
317
318 async {
319 let store = st;
320 let out = store
321 .list(Some(&prefix))
322 .try_filter_map(|x| async move {
323 let out = (x.size > 0).then(|| {
324 PlPath::from_string({
325 format_path(
326 cloud_location.scheme,
327 &cloud_location.bucket,
328 x.location.as_ref(),
329 )
330 })
331 });
332 Ok(out)
333 })
334 .try_collect::<Vec<_>>()
335 .await?;
336
337 Ok(out)
338 }
339 })
340 .await?;
341
 // Carry the input's trailing slash over to the prefix so it counts towards the index.
342 let mut prefix = prefix.to_string();
345 if path_str.ends_with('/') && !prefix.ends_with('/') {
346 prefix.push('/')
347 };
348
349 paths.sort_unstable();
350
 // The start index is the byte length of the formatted prefix.
351 (
352 format_path(
353 cloud_location.scheme,
354 &cloud_location.bucket,
355 prefix.as_ref(),
356 )
357 .len(),
358 paths,
359 )
360 };
361
362 PolarsResult::Ok(out)
363 })
364 };
365
366 for (path_idx, path) in paths.iter().enumerate() {
367 use std::borrow::Cow;
368
369 let mut path = Cow::Borrowed(path);
370
 // HTTP(S) URLs are passed through unexpanded, unless (with the `aws` feature) the URL
 // has the form `<bucket>.s3.<region>.amazonaws.com/<key>`, in which case it is
 // rewritten to `s3://<bucket>/<key>` and expanded like any other cloud path.
371 if matches!(
372 path.cloud_scheme(),
373 Some(CloudScheme::Http | CloudScheme::Https)
374 ) {
375 let mut rewrite_aws = false;
376
377 #[cfg(feature = "aws")]
378 if let Some(p) = (|| {
379 use crate::cloud::CloudConfig;
380
381 let p = path.as_ref().as_ref();
384 let after_scheme = p.strip_scheme();
385
 // 4 is the length of ".s3."; 15 (used below) is the length of ".amazonaws.com/".
386 let bucket_end = after_scheme.find(".s3.")?;
387 let offset = bucket_end + 4;
388 let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
390
 // Bail out if a `/` occurs before the end of the region (the match was not confined
 // to the host part) or if the URL contains a `?`.
391 if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
393 return None;
394 }
395
396 let bucket = &after_scheme[..bucket_end];
397 let region = &after_scheme[bucket_end + 4..region_end];
398 let key = &after_scheme[region_end + 15..];
399
 // Record the parsed region in the cloud options (created on demand), unless the
 // last configured entry is already a region.
400 if let CloudConfig::Aws(configs) = cloud_options
401 .get_or_insert_default()
402 .config
403 .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
404 {
405 use object_store::aws::AmazonS3ConfigKey;
406
407 if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
408 configs.push((AmazonS3ConfigKey::Region, region.into()))
409 }
410 }
411
412 Some(format!("s3://{bucket}/{key}"))
413 })() {
414 path = Cow::Owned(PlPath::from_string(p));
415 rewrite_aws = true;
416 }
417
 // Plain HTTP(S) URL: keep as-is, with start index 0.
418 if !rewrite_aws {
419 out_paths.push(path.into_owned());
420 hive_idx_tracker.update(0, path_idx)?;
421 continue;
422 }
423 }
424
425 let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());
426
 // Without glob characters (or with globbing disabled) the path is expanded as a file or
 // prefix; otherwise it falls through to the async glob below.
427 let path = if glob && glob_start_idx.is_some() {
428 path.clone()
429 } else {
430 let (expand_start_idx, paths) =
431 expand_path_cloud(path.as_ref().as_ref(), cloud_options.as_ref())?;
432 out_paths.extend_from_slice(&paths);
433 hive_idx_tracker.update(expand_start_idx, path_idx)?;
434 continue;
435 };
436
437 hive_idx_tracker.update(0, path_idx)?;
438
439 let iter = crate::pl_async::get_runtime().block_in_place_on(crate::async_glob(
440 path.as_ref().as_ref(),
441 cloud_options.as_ref(),
442 ))?;
443
444 if is_cloud {
445 out_paths.extend(iter.into_iter().map(PlPath::from_string));
446 } else {
 // Forced-async local paths: the first 7 bytes of each result are dropped.
 // NOTE(review): presumably this strips a leading "file://" - confirm against `async_glob`.
447 out_paths.extend(
449 iter.iter()
450 .map(|x| &x[7..])
451 .map(|s| PlPathRef::new(s).into_owned()),
452 )
453 }
454 }
455 }
456 #[cfg(not(feature = "cloud"))]
457 panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
458 } else {
 // Local branch. `stack` is consumed from the front and filled at the back, so directories
 // are visited breadth-first; `paths_scratch` is a reusable buffer for one directory listing.
459 let mut stack = VecDeque::new();
460 let mut paths_scratch = vec![];
461
462 for (path_idx, path) in paths.iter().enumerate() {
463 let path = path.as_ref();
 // Panics if this entry is not a local path (only the first path was checked above).
464 let path = path.as_local_path().unwrap();
465 stack.clear();
466
467 if path.is_dir() {
468 let path = path.to_path_buf();
469
 // Start index for a directory input is the byte length of the directory path.
470 let i = path.to_str().unwrap().len();
471
472 hive_idx_tracker.update(i, path_idx)?;
473
474 stack.push_back(path.clone());
475
476 while let Some(dir) = stack.pop_front() {
477 let mut last_err = Ok(());
478
 // Collect the directory entries, stopping at the first entry that fails to read; the
 // error is stashed in `last_err` and propagated right after.
479 paths_scratch.clear();
480 paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| match x {
481 Ok(v) => Some(v.path()),
482 Err(e) => {
483 last_err = Err(e);
484 None
485 },
486 }));
487
488 last_err?;
489
 // Sort so the output order does not depend on the order `read_dir` yields entries.
490 paths_scratch.sort_unstable();
491
 // Queue subdirectories for later; keep only non-empty files.
492 for path in paths_scratch.drain(..) {
493 if path.is_dir() {
494 stack.push_back(path);
495 } else if path.metadata()?.len() > 0 {
496 out_paths.push(PlPath::Local(path.into()));
497 }
498 }
499 }
500
501 continue;
502 }
503
504 let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
505
506 if glob && i.is_some() {
507 hive_idx_tracker.update(0, path_idx)?;
508
509 let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
510 polars_bail!(ComputeError: "invalid glob pattern given")
511 };
512
 // Glob matches that are directories or empty files are skipped.
513 for path in paths {
514 let path = path.map_err(to_compute_err)?;
515 if !path.is_dir() && path.metadata()?.len() > 0 {
516 out_paths.push(PlPath::Local(path.into()));
517 }
518 }
519 } else {
 // Not a directory and not an active glob: pass the path through without checking it.
520 hive_idx_tracker.update(0, path_idx)?;
521 out_paths.push(PlPath::Local(path.into()));
522 }
523 }
524 }
525
 // Sanity check: the extension bookkeeping counted exactly the paths that were kept.
526 assert_eq!(out_paths.current_idx, out_paths.paths.len());
527
 // A single directory input must not yield a mix of file extensions; `exts` holds the first
 // extension seen and the first one that differed, each with the index of its path.
528 if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
529 if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
530 polars_bail!(
531 InvalidOperation: "directory contained paths with different file extensions: \
532 first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
533 which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
534 &out_paths.paths[i1].display(), &out_paths.paths[i2].display()
535 )
536 }
537 }
538
539 return Ok((out_paths.paths.into(), hive_idx_tracker.idx));
540
 // Helper items used above; they are declared after the `return` and are not executed code.

 // Accumulates output paths, skipping hidden files and recording up to two distinct extensions.
541 struct OutPaths<'a, F: Fn(&PlPath) -> bool> {
544 paths: Vec<PlPath>,
545 exts: [Option<(PlSmallStr, usize)>; 2],
546 current_idx: usize,
547 is_hidden_file: &'a F,
548 }
549
550 impl<F> OutPaths<'_, F>
551 where
552 F: Fn(&PlPath) -> bool,
553 {
 // Appends one path unless it is a hidden file.
554 fn push(&mut self, value: PlPath) {
555 if (self.is_hidden_file)(&value) {
556 return;
557 }
558
559 let current_idx = &mut self.current_idx;
560 let exts = &mut self.exts;
561 Self::update_ext_status(current_idx, exts, value.as_ref());
562
563 self.paths.push(value)
564 }
565
 // Appends many paths; hidden files are filtered out before the extension bookkeeping runs,
 // so `current_idx` only counts paths that are actually kept.
566 fn extend(&mut self, values: impl IntoIterator<Item = PlPath>) {
567 let current_idx = &mut self.current_idx;
568 let exts = &mut self.exts;
569
570 self.paths.extend(
571 values
572 .into_iter()
573 .filter(|x| !(self.is_hidden_file)(x))
574 .inspect(|x| {
575 Self::update_ext_status(current_idx, exts, x.as_ref());
576 }),
577 )
578 }
579
 // Same as `extend`, cloning each element of the slice.
580 fn extend_from_slice(&mut self, values: &[PlPath]) {
581 self.extend(values.iter().cloned())
582 }
583
 // Records the extension of `value` (empty when it has none): slot 0 takes the first
 // extension seen, slot 1 the first one that differs from slot 0. Each slot also stores the
 // index of the path that set it. `current_idx` is advanced on every call.
584 fn update_ext_status(
585 current_idx: &mut usize,
586 exts: &mut [Option<(PlSmallStr, usize)>; 2],
587 value: PlPathRef,
588 ) {
589 let ext = value
590 .extension()
591 .map(PlSmallStr::from)
592 .unwrap_or(PlSmallStr::EMPTY);
593
594 if exts[0].is_none() {
595 exts[0] = Some((ext, *current_idx));
596 } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
597 exts[1] = Some((ext, *current_idx));
598 }
599
600 *current_idx += 1;
601 }
602 }
603}
604
/// Makes sure `path` exists as a directory, creating it and any missing parents.
///
/// An error from the creation attempt is ignored if `path` is a directory afterwards; otherwise
/// that error is returned.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
612
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::resolve_homedir;

    /// `~`-prefixed paths become absolute; every other path is returned unchanged.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();

        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
    }

    /// Windows counterpart of `test_resolve_homedir`, using backslash-separated paths.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();

        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    /// The `?` of an HTTP query string must not be mistaken for a glob character: the URL has
    /// to come back unchanged even with globbing enabled.
    ///
    /// Gated on the `cloud` feature because URL inputs take the cloud branch of
    /// `expand_paths_hive`, which panics unconditionally when that feature is disabled.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use polars_utils::plpath::PlPath;

        use super::expand_paths;

        let path = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlPath::new(path)];
        let out = expand_paths(paths, true, &[], &mut None).unwrap();
        assert_eq!(out.as_ref(), paths);
    }
}