1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use polars_buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::{CloudScheme, PlRefPath};
10use polars_utils::pl_str::PlSmallStr;
11
12#[cfg(feature = "cloud")]
13mod hugging_face;
14
15use crate::cloud::CloudOptions;
16
/// Base directory for Polars' temporary files, resolved lazily on first access.
///
/// Resolution order:
/// 1. `POLARS_TEMP_DIR`, if set (and valid Unicode), is used as-is.
/// 2. On unix: `<std::env::temp_dir()>/polars-<id>/`, where `<id>` is `$USER`.
///    Only when the `file_cache` feature is enabled, the fallback `<id>` is the
///    first 32 hex characters of the blake3 hash of `$HOME`. If no id can be
///    obtained an error is produced; its message mentions both `$USER` and
///    `$HOME` even when only `$USER` was tried.
/// 3. Otherwise (windows and any other target family): `<temp_dir>/polars/`.
///
/// The chosen directory is then created via [`create_dir_owner_only`]. A failure
/// there is propagated unless `POLARS_ALLOW_UNSECURED_TEMP_DIR` is exactly `"1"`,
/// in which case it is ignored.
///
/// # Panics
/// Panics on first access if any propagated error occurs. The panic message
/// suggests setting `POLARS_TEMP_DIR` explicitly.
// NOTE(review): the clippy allow is presumably for the `.or_else(|_e| Err(_e))`
// shape produced when `file_cache` is disabled — confirm.
#[allow(clippy::bind_instead_of_map)]
pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
    // Immediately-invoked closure so `?` / early `return Err(..)` can be used;
    // all errors are decorated and unwrapped below.
    (|| {
        let verbose = config::verbose();

        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
            if verbose {
                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
            }
            v
        } else if cfg!(target_family = "unix") {
            // Per-user subdirectory: prefer $USER, optionally fall back to $HOME.
            let id = std::env::var("USER")
                .inspect(|_| {
                    if verbose {
                        eprintln!("init_temp_dir: sourced $USER")
                    }
                })
                .or_else(|_e| {
                    // $HOME is hashed and truncated to 32 hex chars; presumably so the
                    // id is a fixed-length token without path separators.
                    #[cfg(feature = "file_cache")]
                    {
                        std::env::var("HOME")
                            .inspect(|_| {
                                if verbose {
                                    eprintln!("init_temp_dir: sourced $HOME")
                                }
                            })
                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
                    }
                    #[cfg(not(feature = "file_cache"))]
                    {
                        Err(_e)
                    }
                });

            if let Ok(v) = id {
                std::env::temp_dir().join(format!("polars-{v}/"))
            } else {
                return Err(std::io::Error::other(
                    "could not load $USER or $HOME environment variables",
                ));
            }
        } else if cfg!(target_family = "windows") {
            std::env::temp_dir().join("polars/")
        } else {
            std::env::temp_dir().join("polars/")
        }
        .into_boxed_path();

        // Create the directory and restrict it to the current user (unix).
        let perm_result = create_dir_owner_only(path.as_ref());

        // Opt-out: with POLARS_ALLOW_UNSECURED_TEMP_DIR=1 a failure to create or
        // secure the directory is ignored and the path is used anyway.
        if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
            perm_result?;
        }

        std::io::Result::Ok(path)
    })()
    .map_err(|e| {
        std::io::Error::new(
            e.kind(),
            format!(
                "error initializing temporary directory: {e} \
                consider explicitly setting POLARS_TEMP_DIR"
            ),
        )
    })
    .unwrap()
});
89
/// Creates `path` (including any missing parents) and, on unix, restricts it to
/// owner-only access (`0o700`).
///
/// On unix the permission bits are read back after being set. If they are not
/// exactly `0o700`, an error is returned. This can happen, for example, if the
/// directory already exists and the change did not take effect.
///
/// # Errors
/// Returns any I/O error from directory creation, setting permissions or reading
/// metadata. On unix it also returns an error if the resulting permission bits
/// differ from `0o700`.
pub fn create_dir_owner_only(path: &Path) -> std::io::Result<()> {
    use std::fs;

    fs::create_dir_all(path)?;

    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::PermissionsExt;

        const OWNER_RWX: u32 = 0o700;
        // Masks off file-type / setuid / setgid / sticky bits.
        const PERMISSION_BITS: u32 = 0o777;

        fs::set_permissions(path, fs::Permissions::from_mode(OWNER_RWX))?;
        let perms = fs::metadata(path)?.permissions();

        let actual = perms.mode() & PERMISSION_BITS;
        if actual != OWNER_RWX {
            let message = format!(
                "error setting directory permissions: permission mismatch: {perms:?} (path = {path:?})"
            );
            return Err(std::io::Error::other(message));
        }
    }

    Ok(())
}
110
111pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
113 return inner(path.as_ref());
114
115 fn inner(path: &Path) -> Cow<'_, Path> {
116 if path.starts_with("~") {
117 #[cfg(not(target_family = "wasm"))]
119 if let Some(homedir) = home::home_dir() {
120 return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
121 }
122 }
123
124 Cow::Borrowed(path)
125 }
126}
127
/// Returns `true` if `path` contains any glob metacharacter: `*`, `?` or `[`.
fn has_glob(path: &[u8]) -> bool {
    path.iter()
        .any(|&byte| matches!(byte, b'*' | b'?' | b'['))
}
136
137pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
139 paths.len() == 1 && !has_glob(paths[0].strip_scheme().as_bytes())
141 && {
143 (
144 !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
146 )
147 || (
148 expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
151 )
152 }
153}
154
155pub async fn expand_paths(
157 paths: &[PlRefPath],
158 glob: bool,
159 hidden_file_prefix: &[PlSmallStr],
160 #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
161) -> PolarsResult<Buffer<PlRefPath>> {
162 expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false)
163 .await
164 .map(|x| x.0)
165}
166
167struct HiveIdxTracker<'a> {
168 idx: usize,
169 paths: &'a [PlRefPath],
170 check_directory_level: bool,
171}
172
173impl HiveIdxTracker<'_> {
174 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
175 let check_directory_level = self.check_directory_level;
176 let paths = self.paths;
177
178 if check_directory_level
179 && ![usize::MAX, i].contains(&self.idx)
180 && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
182 {
183 polars_bail!(
184 InvalidOperation:
185 "attempted to read from different directory levels with hive partitioning enabled: \
186 first path: {}, second path: {}",
187 &paths[path_idx - 1],
188 &paths[path_idx],
189 )
190 } else {
191 self.idx = std::cmp::min(self.idx, i);
192 Ok(())
193 }
194 }
195}
196
/// Expands a single path through the object-store layer.
///
/// Returns `(expand_start_idx, paths)`:
/// - Single-path case, index `0`. The input is returned as one re-formatted path
///   when all of these hold:
///   - `path` does not end in `/`;
///   - it is not an expandable glob (`!glob`, or the parsed location has no
///     `expansion`);
///   - it either has a scheme or is an existing local file.
/// - Listing case, used otherwise. Every object under the location's prefix with
///   a non-zero size is listed, formatted and sorted. The index is the byte
///   length of the formatted prefix, with a `/` appended when `path` ended in
///   `/`.
///
/// Paths are formatted as `{scheme}://{bucket}/{location}` when
/// `first_path_has_scheme`, otherwise as `/{location}`.
///
/// # Errors
/// Propagates errors from:
/// - building the object store;
/// - converting the prefix to an object path;
/// - listing objects;
/// - reading metadata, for scheme-less paths only.
#[cfg(feature = "cloud")]
async fn expand_path_cloud(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    glob: bool,
    first_path_has_scheme: bool,
) -> PolarsResult<(usize, Vec<PlRefPath>)> {
    // Rebuilds a full path from its parts. Without a scheme on the first input,
    // the result is rendered as an absolute `/location` path.
    let format_path = |scheme: &str, bucket: &str, location: &str| {
        if first_path_has_scheme {
            format!("{scheme}://{bucket}/{location}")
        } else {
            format!("/{location}")
        }
    };

    use polars_utils::_limit_path_len_io_err;

    use crate::cloud::object_path_from_str;
    let path_str = path.as_str();

    let (cloud_location, store) =
        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
    let prefix = object_path_from_str(&cloud_location.prefix)?;

    // Single concrete object/file: no listing needed.
    let out = if !path_str.ends_with("/") && (!glob || cloud_location.expansion.is_none()) && {
        path.has_scheme() || path.as_std_path().is_file()
    } {
        (
            0,
            vec![PlRefPath::new(format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            ))],
        )
    } else {
        use futures::TryStreamExt;

        // For local paths, propagate an error early if metadata cannot be read.
        // The error is passed through `_limit_path_len_io_err`.
        if !path.has_scheme() {
            path.as_std_path()
                .metadata()
                .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
        }

        let cloud_location = &cloud_location;
        let prefix_ref = &prefix;

        // List everything under the prefix. The operation is wrapped in
        // `exec_with_rebuild_retry_on_err`; presumably the store is rebuilt and
        // the listing retried on error — confirm.
        let mut paths = store
            .exec_with_rebuild_retry_on_err(|s| async move {
                let out = s
                    .list(Some(prefix_ref))
                    .try_filter_map(|x| async move {
                        // Skip zero-sized objects.
                        let out = (x.size > 0).then(|| {
                            PlRefPath::new({
                                format_path(
                                    cloud_location.scheme,
                                    &cloud_location.bucket,
                                    x.location.as_ref(),
                                )
                            })
                        });
                        Ok(out)
                    })
                    .try_collect::<Vec<_>>()
                    .await?;

                Ok(out)
            })
            .await?;

        // The returned index is the length of the formatted prefix. A trailing
        // '/' is kept when the caller supplied one, so the index points past it.
        let mut prefix = prefix.to_string();
        if path_str.ends_with('/') && !prefix.ends_with('/') {
            prefix.push('/')
        };

        paths.sort_unstable();

        (
            format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            )
            .len(),
            paths,
        )
    };

    PolarsResult::Ok(out)
}
297
/// Expands `paths` into a flat list of paths and determines the hive
/// partitioning start index.
///
/// Returns `(expanded_paths, hive_start_idx)`. The output for each input is
/// sorted independently and appended in input order. Files whose name starts
/// with any entry of `hidden_file_prefix` are dropped. An empty `paths` yields
/// `(empty, 0)`.
///
/// The expansion route is chosen from the first path.
///
/// Object-store route: taken when the first path has a scheme, or on
/// non-windows targets when `force_async` is configured. It requires the `cloud`
/// feature.
/// - Hugging Face paths are delegated to `hugging_face::expand_paths_hf`.
/// - HTTP(S) URLs are passed through unexpanded. The exception is the `aws`
///   feature, where `<bucket>.s3.<region>.amazonaws.com/<key>` URLs without `?`
///   are rewritten to `s3://<bucket>/<key>` and expanded like other paths.
/// - Glob patterns (when `glob`) go through `crate::async_glob`.
/// - Anything else goes through `expand_path_cloud`.
///
/// Local route:
/// - Directories are walked recursively. Non-directory entries with non-zero
///   length are kept, and the hive index is the byte length of the directory
///   path.
/// - Glob patterns (when `glob`) use `glob::glob` and keep non-empty,
///   non-directory matches.
/// - Other paths are passed through as-is.
///
/// # Errors
/// - `InvalidOperation` if `check_directory_level` is set and inputs appear to be
///   at different directory levels (see `HiveIdxTracker::update`).
/// - `InvalidOperation` if a single, non-glob directory input expanded to files
///   with more than one distinct extension.
/// - `ComputeError` for an invalid local glob pattern; I/O and object-store
///   errors are propagated.
///
/// # Panics
/// Panics if the object-store route is taken without the `cloud` feature.
pub async fn expand_paths_hive(
    paths: &[PlRefPath],
    glob: bool,
    hidden_file_prefix: &[PlSmallStr],
    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
    check_directory_level: bool,
) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
    let Some(first_path) = paths.first() else {
        return Ok((vec![].into(), 0));
    };

    let first_path_has_scheme = first_path.has_scheme();

    // A file is hidden if its (UTF-8) file name starts with any configured prefix.
    let is_hidden_file = move |path: &PlRefPath| {
        path.file_name()
            .and_then(|x| x.to_str())
            .is_some_and(|file_name| {
                hidden_file_prefix
                    .iter()
                    .any(|x| file_name.starts_with(x.as_str()))
            })
    };

    let mut out_paths = OutPaths {
        paths: vec![],
        exts: [None, None],
        is_hidden_file: &is_hidden_file,
    };

    let mut hive_idx_tracker = HiveIdxTracker {
        idx: usize::MAX,
        paths,
        check_directory_level,
    };

    // Object-store route (see doc comment for the selection rule).
    if first_path_has_scheme || {
        cfg!(not(target_family = "windows")) && polars_config::config().force_async()
    } {
        #[cfg(feature = "cloud")]
        {
            // Hugging Face inputs are expanded entirely by the dedicated module.
            if first_path.scheme() == Some(CloudScheme::Hf) {
                let (expand_start_idx, paths) = hugging_face::expand_paths_hf(
                    paths,
                    check_directory_level,
                    cloud_options,
                    glob,
                )
                .await?;

                return Ok((paths.into(), expand_start_idx));
            }

            for (path_idx, path) in paths.iter().enumerate() {
                use std::borrow::Cow;

                let mut path = Cow::Borrowed(path);

                if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
                    let mut rewrite_aws = false;

                    // Rewrite `<bucket>.s3.<region>.amazonaws.com/<key>` to
                    // `s3://<bucket>/<key>`. When the cloud options hold (or default
                    // to) an AWS config, `<region>` is recorded there. URLs
                    // containing `?`, or a `/` before the region end, are left alone.
                    #[cfg(feature = "aws")]
                    if let Some(p) = (|| {
                        use crate::cloud::CloudConfig;

                        let after_scheme = path.strip_scheme();

                        let bucket_end = after_scheme.find(".s3.")?;
                        let offset = bucket_end + 4;
                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;

                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
                            return None;
                        }

                        // 4 == ".s3.".len(), 15 == ".amazonaws.com/".len()
                        let bucket = &after_scheme[..bucket_end];
                        let region = &after_scheme[bucket_end + 4..region_end];
                        let key = &after_scheme[region_end + 15..];

                        if let CloudConfig::Aws(configs) = cloud_options
                            .get_or_insert_default()
                            .config
                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
                        {
                            use object_store::aws::AmazonS3ConfigKey;

                            // Append the region unless the last entry already is one.
                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
                                configs.push((AmazonS3ConfigKey::Region, region.into()))
                            }
                        }

                        Some(format!("s3://{bucket}/{key}"))
                    })() {
                        path = Cow::Owned(PlRefPath::new(p));
                        rewrite_aws = true;
                    }

                    // Any other HTTP(S) URL is passed through unexpanded.
                    if !rewrite_aws {
                        out_paths.push(path.into_owned());
                        hive_idx_tracker.update(0, path_idx)?;
                        continue;
                    }
                }

                let sort_start_idx = out_paths.paths.len();

                if glob && has_glob(path.as_bytes()) {
                    hive_idx_tracker.update(0, path_idx)?;

                    let iter = crate::pl_async::get_runtime().block_in_place_on(
                        crate::async_glob(path.into_owned(), cloud_options.as_ref()),
                    )?;

                    if first_path_has_scheme {
                        out_paths.extend(iter.into_iter().map(PlRefPath::new))
                    } else {
                        // Drop a 7-byte prefix from each result (presumably `file://`).
                        out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
                    };
                } else {
                    let (expand_start_idx, paths) = expand_path_cloud(
                        path.into_owned(),
                        cloud_options.as_ref(),
                        glob,
                        first_path_has_scheme,
                    )
                    .await?;
                    out_paths.extend_from_slice(&paths);
                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
                };

                // Sort only the paths produced by this input.
                if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                    <[PlRefPath]>::sort_unstable(mut_slice);
                }
            }
        }
        #[cfg(not(feature = "cloud"))]
        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
    } else {
        // Local route. The queue and scratch buffer are reused across inputs.
        let mut stack: VecDeque<Cow<'_, Path>> = VecDeque::new();
        let mut paths_scratch: Vec<PathBuf> = vec![];

        for (path_idx, path) in paths.iter().enumerate() {
            stack.clear();
            let sort_start_idx = out_paths.paths.len();

            if path.as_std_path().is_dir() {
                // Hive keys start right after the directory path.
                let i = path.as_str().len();

                hive_idx_tracker.update(i, path_idx)?;

                // Walk the tree breadth-first: push_back + pop_front (FIFO).
                stack.push_back(Cow::Borrowed(path.as_std_path()));

                while let Some(dir) = stack.pop_front() {
                    let mut last_err = Ok(());

                    paths_scratch.clear();
                    // Collect entries, stopping at (and remembering) the first error.
                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| {
                        match x.map(|x| x.path()) {
                            Ok(v) => Some(v),
                            Err(e) => {
                                last_err = Err(e);
                                None
                            },
                        }
                    }));

                    last_err?;

                    for path in paths_scratch.drain(..) {
                        let md = path.metadata()?;

                        if md.is_dir() {
                            stack.push_back(Cow::Owned(path));
                        } else if md.len() > 0 {
                            // Zero-length files are skipped.
                            out_paths.push(PlRefPath::try_from_path(&path)?);
                        }
                    }
                }
            } else if glob && has_glob(path.as_bytes()) {
                hive_idx_tracker.update(0, path_idx)?;

                let Ok(paths) = glob::glob(path.as_str()) else {
                    polars_bail!(ComputeError: "invalid glob pattern given")
                };

                for path in paths {
                    let path = path.map_err(to_compute_err)?;
                    let md = path.metadata()?;
                    if !md.is_dir() && md.len() > 0 {
                        out_paths.push(PlRefPath::try_from_path(&path)?);
                    }
                }
            } else {
                // Plain path: passed through without an existence check here.
                // Hidden-file filtering in `OutPaths::push` still applies.
                hive_idx_tracker.update(0, path_idx)?;
                out_paths.push(path.clone());
            };

            if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                <[PlRefPath]>::sort_unstable(mut_slice);
            }
        }
    }

    // When a single directory expanded to files with differing extensions,
    // require the caller to disambiguate with an explicit glob.
    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
        if let [Some((_, p1)), Some((_, p2))] = out_paths.exts {
            polars_bail!(
                InvalidOperation: "directory contained paths with different file extensions: \
                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
                &p1, &p2
            )
        }
    }

    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));

    /// Output accumulator that drops hidden files and records up to two distinct
    /// file extensions: the first one seen, and the first one that differs from it.
    struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
        paths: Vec<PlRefPath>,
        exts: [Option<(PlSmallStr, PlRefPath)>; 2],
        is_hidden_file: &'a F,
    }

    impl<F> OutPaths<'_, F>
    where
        F: Fn(&PlRefPath) -> bool,
    {
        /// Appends `value` unless it is hidden, updating extension tracking.
        fn push(&mut self, value: PlRefPath) {
            if (self.is_hidden_file)(&value) {
                return;
            }

            let exts = &mut self.exts;
            Self::update_ext_status(exts, &value);

            self.paths.push(value)
        }

        /// Appends every non-hidden value, updating extension tracking.
        fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
            let exts = &mut self.exts;

            self.paths.extend(
                values
                    .into_iter()
                    .filter(|x| !(self.is_hidden_file)(x))
                    .inspect(|x| {
                        Self::update_ext_status(exts, x);
                    }),
            )
        }

        /// Same as `extend`, cloning from a slice.
        fn extend_from_slice(&mut self, values: &[PlRefPath]) {
            self.extend(values.iter().cloned())
        }

        /// Records `value`'s extension (empty if it has none):
        /// - slot 0 takes the first extension seen;
        /// - slot 1 takes the first later extension that differs from slot 0.
        ///
        /// Once both slots are filled, further values are ignored.
        fn update_ext_status(exts: &mut [Option<(PlSmallStr, PlRefPath)>; 2], value: &PlRefPath) {
            let ext = value
                .extension()
                .map_or(PlSmallStr::EMPTY, PlSmallStr::from);

            if exts[0].is_none() {
                exts[0] = Some((ext, value.clone()));
            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
                exts[1] = Some((ext, value.clone()));
            }
        }
    }
}
575
/// Ensures `path` exists as a directory, creating missing parents as needed.
///
/// If `path` is a directory after the `create_dir_all` attempt, `Ok(())` is
/// returned, even when that call reported an error. Otherwise the
/// `create_dir_all` result is returned unchanged.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        // Tolerate a failure as long as the directory ended up existing
        // (presumably to cope with concurrent creation).
        Err(_) if path.is_dir() => Ok(()),
        outcome => outcome,
    }
}
583
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::pl_path::PlRefPath;

    use super::resolve_homedir;
    use crate::pl_async::get_runtime;

    /// `~`-prefixed paths (including a bare `~`) resolve to absolute paths and
    /// keep their file name; every other path is returned unchanged.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
    }

    /// Windows variant: drive-letter paths are unchanged; `~\...` and `~`
    /// resolve to absolute paths.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    /// The `?` in an HTTP query string must not trigger glob expansion: the URL
    /// is expected back unchanged, even with `glob = true`.
    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use super::expand_paths;

        let path = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlRefPath::new(path)];
        let out = get_runtime()
            .block_on(expand_paths(paths, true, &[], &mut None))
            .unwrap();
        assert_eq!(out.as_ref(), paths);
    }
}