1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use arrow::buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::{CloudScheme, PlRefPath};
10use polars_utils::pl_str::PlSmallStr;
11
12#[cfg(feature = "cloud")]
13mod hugging_face;
14
15use crate::cloud::CloudOptions;
16
17#[allow(clippy::bind_instead_of_map)]
18pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
19 (|| {
20 let verbose = config::verbose();
21
22 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
23 if verbose {
24 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
25 }
26 v
27 } else if cfg!(target_family = "unix") {
28 let id = std::env::var("USER")
29 .inspect(|_| {
30 if verbose {
31 eprintln!("init_temp_dir: sourced $USER")
32 }
33 })
34 .or_else(|_e| {
35 #[cfg(feature = "file_cache")]
38 {
39 std::env::var("HOME")
40 .inspect(|_| {
41 if verbose {
42 eprintln!("init_temp_dir: sourced $HOME")
43 }
44 })
45 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
46 }
47 #[cfg(not(feature = "file_cache"))]
48 {
49 Err(_e)
50 }
51 });
52
53 if let Ok(v) = id {
54 std::env::temp_dir().join(format!("polars-{v}/"))
55 } else {
56 return Err(std::io::Error::other(
57 "could not load $USER or $HOME environment variables",
58 ));
59 }
60 } else if cfg!(target_family = "windows") {
61 std::env::temp_dir().join("polars/")
65 } else {
66 std::env::temp_dir().join("polars/")
67 }
68 .into_boxed_path();
69
70 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
71 if !path.is_dir() {
72 panic!(
73 "failed to create temporary directory: {} (path = {:?})",
74 err,
75 path.as_ref()
76 );
77 }
78 }
79
80 #[cfg(target_family = "unix")]
81 {
82 use std::os::unix::fs::PermissionsExt;
83
84 let result = (|| {
85 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
86 let perms = std::fs::metadata(path.as_ref())?.permissions();
87
88 if (perms.mode() % 0o1000) != 0o700 {
89 std::io::Result::Err(std::io::Error::other(format!(
90 "permission mismatch: {perms:?}"
91 )))
92 } else {
93 std::io::Result::Ok(())
94 }
95 })()
96 .map_err(|e| {
97 std::io::Error::new(
98 e.kind(),
99 format!(
100 "error setting temporary directory permissions: {} (path = {:?})",
101 e,
102 path.as_ref()
103 ),
104 )
105 });
106
107 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
108 result?;
109 }
110 }
111
112 std::io::Result::Ok(path)
113 })()
114 .map_err(|e| {
115 std::io::Error::new(
116 e.kind(),
117 format!(
118 "error initializing temporary directory: {e} \
119 consider explicitly setting POLARS_TEMP_DIR"
120 ),
121 )
122 })
123 .unwrap()
124});
125
126pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
128 return inner(path.as_ref());
129
130 fn inner(path: &Path) -> Cow<'_, Path> {
131 if path.starts_with("~") {
132 #[cfg(not(target_family = "wasm"))]
134 if let Some(homedir) = home::home_dir() {
135 return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
136 }
137 }
138
139 Cow::Borrowed(path)
140 }
141}
142
143pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
145 memchr::memchr3(b'*', b'?', b'[', path)
146}
147
148pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
150 paths.len() == 1 && get_glob_start_idx(paths[0].strip_scheme().as_bytes()).is_none()
152 && {
154 (
155 !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
157 )
158 || (
159 expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
162 )
163 }
164}
165
166pub fn expand_paths(
168 paths: &[PlRefPath],
169 glob: bool,
170 hidden_file_prefix: &[PlSmallStr],
171 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
172) -> PolarsResult<Buffer<PlRefPath>> {
173 expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false).map(|x| x.0)
174}
175
176struct HiveIdxTracker<'a> {
177 idx: usize,
178 paths: &'a [PlRefPath],
179 check_directory_level: bool,
180}
181
182impl HiveIdxTracker<'_> {
183 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
184 let check_directory_level = self.check_directory_level;
185 let paths = self.paths;
186
187 if check_directory_level
188 && ![usize::MAX, i].contains(&self.idx)
189 && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
191 {
192 polars_bail!(
193 InvalidOperation:
194 "attempted to read from different directory levels with hive partitioning enabled: \
195 first path: {}, second path: {}",
196 &paths[path_idx - 1],
197 &paths[path_idx],
198 )
199 } else {
200 self.idx = std::cmp::min(self.idx, i);
201 Ok(())
202 }
203 }
204}
205
/// Expands `paths` into the list of files to read and also returns the hive start index.
///
/// Each input path is handled as follows:
/// - a directory is listed recursively, keeping only non-empty files;
/// - when `glob` is true, a path containing `*`, `?` or `[` is expanded as a glob pattern;
/// - any other path is passed through unchanged.
///
/// If the first path has a scheme, or `config::force_async()` is set on a non-Windows target,
/// all paths go through the object-store based expansion, which requires the `cloud` feature.
/// When the first path uses `CloudScheme::Hf`, expansion is delegated to
/// `hugging_face::expand_paths_hf` and its result is returned directly. In that case the
/// hidden-file filter and the extension check below do not apply.
///
/// Output paths whose file name starts with any entry of `hidden_file_prefix` are dropped.
///
/// Returns `(expanded_paths, hive_start_idx)`. `hive_start_idx` is the minimum start offset
/// reported to [`HiveIdxTracker`] across all inputs: the string length of an expanded directory
/// path, the length of the listed cloud prefix, or 0. It is presumably the byte offset at which
/// hive partition segments begin in the expanded paths; confirm against callers. An empty
/// `paths` yields `(empty, 0)`.
///
/// # Errors
///
/// - inputs at different directory levels when `check_directory_level` is set;
/// - I/O errors while reading directories or file metadata;
/// - an invalid local glob pattern;
/// - a single expanded directory containing files with different extensions.
///
/// # Panics
///
/// If the object-store branch is taken without the `cloud` feature.
pub fn expand_paths_hive(
    paths: &[PlRefPath],
    glob: bool,
    hidden_file_prefix: &[PlSmallStr],
    // Only read when the `cloud` feature is enabled.
    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
    check_directory_level: bool,
) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
    let Some(first_path) = paths.first() else {
        return Ok((vec![].into(), 0));
    };

    // Decides which expansion strategy is used for *all* paths.
    let first_path_has_scheme = first_path.has_scheme();

    // A path is hidden when its file name starts with one of the configured prefixes.
    let is_hidden_file = move |path: &PlRefPath| {
        path.file_name()
            .and_then(|x| x.to_str())
            .is_some_and(|file_name| {
                hidden_file_prefix
                    .iter()
                    .any(|x| file_name.starts_with(x.as_str()))
            })
    };

    let mut out_paths = OutPaths {
        paths: vec![],
        exts: [None, None],
        current_idx: 0,
        is_hidden_file: &is_hidden_file,
    };

    let mut hive_idx_tracker = HiveIdxTracker {
        idx: usize::MAX,
        paths,
        check_directory_level,
    };

    if first_path_has_scheme || { cfg!(not(target_family = "windows")) && config::force_async() } {
        #[cfg(feature = "cloud")]
        {
            use polars_utils::_limit_path_len_io_err;

            use crate::cloud::object_path_from_str;

            if first_path.scheme() == Some(CloudScheme::Hf) {
                let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
                    hugging_face::expand_paths_hf(
                        paths,
                        check_directory_level,
                        cloud_options,
                        glob,
                    ),
                )?;

                return Ok((paths.into(), expand_start_idx));
            }

            // Re-assemble an object location into a path string. Local paths taking this branch
            // (force_async) are rendered as absolute paths without a scheme.
            let format_path = |scheme: &str, bucket: &str, location: &str| {
                if first_path_has_scheme {
                    format!("{scheme}://{bucket}/{location}")
                } else {
                    format!("/{location}")
                }
            };

            // Expands a single non-glob path through the object store. Returns the hive start
            // offset together with the expanded paths.
            let expand_path_cloud = |path: PlRefPath,
                                     cloud_options: Option<&CloudOptions>|
             -> PolarsResult<(usize, Vec<PlRefPath>)> {
                crate::pl_async::get_runtime().block_in_place_on(async {
                    let path_str = path.as_str();

                    let (cloud_location, store) =
                        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
                    let prefix = object_path_from_str(&cloud_location.prefix)?;

                    // Treated as a single file (no listing) when it has no trailing '/', no glob
                    // expansion applies, and it is a cloud URL or an existing local file.
                    let out = if !path_str.ends_with("/")
                        && (!glob || cloud_location.expansion.is_none())
                        && {
                            path.has_scheme() || path.as_std_path().is_file()
                        } {
                        (
                            0,
                            vec![PlRefPath::new(format_path(
                                cloud_location.scheme,
                                &cloud_location.bucket,
                                prefix.as_ref(),
                            ))],
                        )
                    } else {
                        use futures::TryStreamExt;

                        // For local paths, surface a missing/inaccessible path as an I/O error
                        // before listing.
                        if !path.has_scheme() {
                            path.as_std_path()
                                .metadata()
                                .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
                        }

                        let cloud_location = &cloud_location;

                        // List everything under the prefix, skipping zero-sized objects.
                        let mut paths = store
                            .try_exec_rebuild_on_err(|store| {
                                let st = store.clone();

                                async {
                                    let store = st;
                                    let out = store
                                        .list(Some(&prefix))
                                        .try_filter_map(|x| async move {
                                            let out = (x.size > 0).then(|| {
                                                PlRefPath::new({
                                                    format_path(
                                                        cloud_location.scheme,
                                                        &cloud_location.bucket,
                                                        x.location.as_ref(),
                                                    )
                                                })
                                            });
                                            Ok(out)
                                        })
                                        .try_collect::<Vec<_>>()
                                        .await?;

                                    Ok(out)
                                }
                            })
                            .await?;

                        // Keep a trailing '/' from the input so the start offset below points
                        // past the directory separator.
                        let mut prefix = prefix.to_string();
                        if path_str.ends_with('/') && !prefix.ends_with('/') {
                            prefix.push('/')
                        };

                        paths.sort_unstable();

                        (
                            format_path(
                                cloud_location.scheme,
                                &cloud_location.bucket,
                                prefix.as_ref(),
                            )
                            .len(),
                            paths,
                        )
                    };

                    PolarsResult::Ok(out)
                })
            };

            for (path_idx, path) in paths.iter().enumerate() {
                use std::borrow::Cow;

                let mut path = Cow::Borrowed(path);

                // HTTP(S) URLs are passed through untouched: they cannot be listed, and a `?`
                // there starts a query string rather than a glob. With the `aws` feature,
                // `https://<bucket>.s3.<region>.amazonaws.com/<key>` is first rewritten to
                // `s3://<bucket>/<key>` and then expanded like any other cloud path.
                if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
                    let mut rewrite_aws = false;

                    #[cfg(feature = "aws")]
                    if let Some(p) = (|| {
                        use crate::cloud::CloudConfig;

                        let after_scheme = path.strip_scheme();

                        // 4 == ".s3.".len()
                        let bucket_end = after_scheme.find(".s3.")?;
                        let offset = bucket_end + 4;
                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;

                        // Only accept the match in the host part, and never URLs with queries.
                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
                            return None;
                        }

                        let bucket = &after_scheme[..bucket_end];
                        let region = &after_scheme[bucket_end + 4..region_end];
                        // 15 == ".amazonaws.com/".len()
                        let key = &after_scheme[region_end + 15..];

                        // Record the region from the URL unless the most recent AWS config entry
                        // is already a region.
                        if let CloudConfig::Aws(configs) = cloud_options
                            .get_or_insert_default()
                            .config
                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
                        {
                            use object_store::aws::AmazonS3ConfigKey;

                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
                                configs.push((AmazonS3ConfigKey::Region, region.into()))
                            }
                        }

                        Some(format!("s3://{bucket}/{key}"))
                    })() {
                        path = Cow::Owned(PlRefPath::new(p));
                        rewrite_aws = true;
                    }

                    if !rewrite_aws {
                        out_paths.push(path.into_owned());
                        hive_idx_tracker.update(0, path_idx)?;
                        continue;
                    }
                }

                let glob_start_idx = get_glob_start_idx(path.as_bytes());

                // Non-glob paths are expanded by `expand_path_cloud`; glob patterns fall through
                // to `async_glob` below.
                let path = if glob && glob_start_idx.is_some() {
                    path.clone()
                } else {
                    let (expand_start_idx, paths) =
                        expand_path_cloud(path.into_owned(), cloud_options.as_ref())?;
                    out_paths.extend_from_slice(&paths);
                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
                    continue;
                };

                hive_idx_tracker.update(0, path_idx)?;

                let iter = crate::pl_async::get_runtime().block_in_place_on(crate::async_glob(
                    path.into_owned(),
                    cloud_options.as_ref(),
                ))?;

                // NOTE(review): for local (force_async) paths this presumably relies on
                // `async_glob` returning `file://`-prefixed strings; `[7..]` drops those 7 bytes.
                if first_path_has_scheme {
                    out_paths.extend(iter.into_iter().map(PlRefPath::new))
                } else {
                    out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
                };
            }
        }
        #[cfg(not(feature = "cloud"))]
        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
    } else {
        // Reused across inputs to avoid reallocating.
        let mut stack = VecDeque::new();
        let mut paths_scratch: Vec<Cow<'_, Path>> = vec![];

        for (path_idx, path) in paths.iter().enumerate() {
            stack.clear();

            if path.as_std_path().is_dir() {
                // The hive start offset is the length of the directory path string.
                let i = path.as_str().len();
                let path = Cow::Borrowed(path.as_std_path());

                hive_idx_tracker.update(i, path_idx)?;

                stack.push_back(path.clone());

                // Breadth-first traversal (push_back/pop_front). Entries of each directory are
                // sorted; sub-directories are queued and non-empty files emitted.
                while let Some(dir) = stack.pop_front() {
                    let mut last_err = Ok(());

                    // Stop at the first failing entry and propagate its error below.
                    paths_scratch.clear();
                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| match x {
                        Ok(v) => Some(Cow::Owned(v.path())),
                        Err(e) => {
                            last_err = Err(e);
                            None
                        },
                    }));

                    last_err?;

                    paths_scratch.sort_unstable();

                    for path in paths_scratch.drain(..) {
                        if path.is_dir() {
                            stack.push_back(path);
                        } else if path.metadata()?.len() > 0 {
                            out_paths.push(PlRefPath::try_from_path(&path)?);
                        }
                    }
                }

                continue;
            }

            let i = get_glob_start_idx(path.as_bytes());

            if glob && i.is_some() {
                hive_idx_tracker.update(0, path_idx)?;

                let Ok(paths) = glob::glob(path.as_str()) else {
                    polars_bail!(ComputeError: "invalid glob pattern given")
                };

                // Directories and empty files matched by the pattern are skipped.
                for path in paths {
                    let path = path.map_err(to_compute_err)?;
                    if !path.is_dir() && path.metadata()?.len() > 0 {
                        out_paths.push(PlRefPath::try_from_path(&path)?);
                    }
                }
            } else {
                // Plain paths are passed through; existence is not checked here.
                hive_idx_tracker.update(0, path_idx)?;
                out_paths.push(path.clone());
            }
        }
    }

    // `current_idx` counts every path accepted by `OutPaths`, so it must equal the output length.
    assert_eq!(out_paths.current_idx, out_paths.paths.len());

    // A bare directory must not silently mix file types; ask for an explicit glob instead.
    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
        if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
            polars_bail!(
                InvalidOperation: "directory contained paths with different file extensions: \
                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
                &out_paths.paths[i1], &out_paths.paths[i2]
            )
        }
    }

    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));

    /// Output accumulator that drops hidden files and records the first two distinct file
    /// extensions seen (with their indices in `paths`).
    struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
        paths: Vec<PlRefPath>,
        // [first extension seen, first extension differing from it], each with its index.
        exts: [Option<(PlSmallStr, usize)>; 2],
        // Number of paths accepted so far, i.e. the index the next accepted path receives.
        current_idx: usize,
        is_hidden_file: &'a F,
    }

    impl<F> OutPaths<'_, F>
    where
        F: Fn(&PlRefPath) -> bool,
    {
        /// Appends `value` unless it is hidden.
        fn push(&mut self, value: PlRefPath) {
            if (self.is_hidden_file)(&value) {
                return;
            }

            let current_idx = &mut self.current_idx;
            let exts = &mut self.exts;
            Self::update_ext_status(current_idx, exts, &value);

            self.paths.push(value)
        }

        /// Appends every non-hidden path from `values`.
        fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
            // Split borrows so the closures below can update the counters while
            // `self.paths` is being extended.
            let current_idx = &mut self.current_idx;
            let exts = &mut self.exts;

            self.paths.extend(
                values
                    .into_iter()
                    .filter(|x| !(self.is_hidden_file)(x))
                    .inspect(|x| {
                        Self::update_ext_status(current_idx, exts, x);
                    }),
            )
        }

        fn extend_from_slice(&mut self, values: &[PlRefPath]) {
            self.extend(values.iter().cloned())
        }

        /// Records `value`'s extension (empty if it has none) at index `*current_idx` when it is
        /// the first extension seen, or the first one differing from it, then advances the index.
        fn update_ext_status(
            current_idx: &mut usize,
            exts: &mut [Option<(PlSmallStr, usize)>; 2],
            value: &PlRefPath,
        ) {
            let ext = value
                .extension()
                .map_or(PlSmallStr::EMPTY, PlSmallStr::from);

            if exts[0].is_none() {
                exts[0] = Some((ext, *current_idx));
            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
                exts[1] = Some((ext, *current_idx));
            }

            *current_idx += 1;
        }
    }
}
594
595#[cfg(feature = "file_cache")]
/// Ensures `path` exists as a directory, creating missing parents as needed.
///
/// A failed `create_dir_all` is tolerated as long as `path` is a directory afterwards (for
/// example when it was created concurrently); otherwise the creation error is returned.
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Ok(()) => Ok(()),
        Err(_) if path.is_dir() => Ok(()),
        Err(err) => Err(err),
    }
}
602
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::resolve_homedir;

    /// `~`-prefixed paths become absolute; all other paths are returned unchanged.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
    }

    /// Windows variant: drive-letter paths are untouched, `~`-prefixed ones become absolute.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    /// HTTP(S) URLs must be passed through as-is: the `?` of a query string is a glob
    /// metacharacter, but must not trigger glob expansion.
    ///
    /// URL inputs take the object-store code path, which panics when the `cloud` feature is
    /// disabled, so this test only runs with that feature.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use polars_utils::pl_path::PlRefPath;

        use super::expand_paths;

        let path = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlRefPath::new(path)];
        let out = expand_paths(paths, true, &[], &mut None).unwrap();
        assert_eq!(out.as_ref(), paths);
    }
}