1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, LazyLock};
4
5use polars_core::config;
6use polars_core::error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::pl_str::PlSmallStr;
8
9#[cfg(feature = "cloud")]
10mod hugging_face;
11
12use crate::cloud::CloudOptions;
13
14pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
15 (|| {
16 let verbose = config::verbose();
17
18 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
19 if verbose {
20 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
21 }
22 v
23 } else if cfg!(target_family = "unix") {
24 let id = std::env::var("USER")
25 .inspect(|_| {
26 if verbose {
27 eprintln!("init_temp_dir: sourced $USER")
28 }
29 })
30 .or_else(|_e| {
31 #[cfg(feature = "file_cache")]
34 {
35 std::env::var("HOME")
36 .inspect(|_| {
37 if verbose {
38 eprintln!("init_temp_dir: sourced $HOME")
39 }
40 })
41 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
42 }
43 #[cfg(not(feature = "file_cache"))]
44 {
45 Err(_e)
46 }
47 });
48
49 if let Ok(v) = id {
50 std::env::temp_dir().join(format!("polars-{}/", v))
51 } else {
52 return Err(std::io::Error::other(
53 "could not load $USER or $HOME environment variables",
54 ));
55 }
56 } else if cfg!(target_family = "windows") {
57 std::env::temp_dir().join("polars/")
61 } else {
62 std::env::temp_dir().join("polars/")
63 }
64 .into_boxed_path();
65
66 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
67 if !path.is_dir() {
68 panic!(
69 "failed to create temporary directory: {} (path = {:?})",
70 err,
71 path.as_ref()
72 );
73 }
74 }
75
76 #[cfg(target_family = "unix")]
77 {
78 use std::os::unix::fs::PermissionsExt;
79
80 let result = (|| {
81 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
82 let perms = std::fs::metadata(path.as_ref())?.permissions();
83
84 if (perms.mode() % 0o1000) != 0o700 {
85 std::io::Result::Err(std::io::Error::other(format!(
86 "permission mismatch: {:?}",
87 perms
88 )))
89 } else {
90 std::io::Result::Ok(())
91 }
92 })()
93 .map_err(|e| {
94 std::io::Error::new(
95 e.kind(),
96 format!(
97 "error setting temporary directory permissions: {} (path = {:?})",
98 e,
99 path.as_ref()
100 ),
101 )
102 });
103
104 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
105 result?;
106 }
107 }
108
109 std::io::Result::Ok(path)
110 })()
111 .map_err(|e| {
112 std::io::Error::new(
113 e.kind(),
114 format!(
115 "error initializing temporary directory: {} \
116 consider explicitly setting POLARS_TEMP_DIR",
117 e
118 ),
119 )
120 })
121 .unwrap()
122});
123
124pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
126 let path = path.as_ref();
127
128 if path.starts_with("~") {
129 #[cfg(not(target_family = "wasm"))]
131 if let Some(homedir) = home::home_dir() {
132 return homedir.join(path.strip_prefix("~").unwrap());
133 }
134 }
135
136 path.into()
137}
138
// Matches strings that begin with one of the recognised URL schemes followed by `://`:
// `s3`, `s3a`, `gs`, `gcs`, `file`, `abfs`, `abfss`, `azure`, `az`, `adl`, `http`, `https`, `hf`.
// The pattern is anchored at the start of the string only.
polars_utils::regex_cache::cached_regex! {
    static CLOUD_URL = r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?|hf)://";
}
142
143pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
145 match p.as_ref().as_os_str().to_str() {
146 Some(s) => CLOUD_URL.is_match(s),
147 _ => false,
148 }
149}
150
/// Returns the byte offset of the first glob metacharacter (`*`, `?` or `[`) in `path`,
/// or `None` if `path` contains none of them.
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
    memchr::memchr3(b'*', b'?', b'[', path)
}
155
156pub fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
158 paths: &[P],
159 expanded_paths: &[P],
160) -> bool {
161 paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
163 && {
165 (
166 !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
168 )
169 || (
170 expanded_paths.is_empty() || (paths[0].as_ref() != expanded_paths[0].as_ref())
173 )
174 }
175}
176
177pub fn expand_paths(
179 paths: &[PathBuf],
180 glob: bool,
181 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
182) -> PolarsResult<Arc<[PathBuf]>> {
183 expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
184}
185
186struct HiveIdxTracker<'a> {
187 idx: usize,
188 paths: &'a [PathBuf],
189 check_directory_level: bool,
190}
191
192impl HiveIdxTracker<'_> {
193 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
194 let check_directory_level = self.check_directory_level;
195 let paths = self.paths;
196
197 if check_directory_level
198 && ![usize::MAX, i].contains(&self.idx)
199 && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
201 {
202 polars_bail!(
203 InvalidOperation:
204 "attempted to read from different directory levels with hive partitioning enabled: \
205 first path: {}, second path: {}",
206 paths[path_idx - 1].to_str().unwrap(),
207 paths[path_idx].to_str().unwrap(),
208 )
209 } else {
210 self.idx = std::cmp::min(self.idx, i);
211 Ok(())
212 }
213 }
214}
215
216pub fn expand_paths_hive(
220 paths: &[PathBuf],
221 glob: bool,
222 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
223 check_directory_level: bool,
224) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
225 let Some(first_path) = paths.first() else {
226 return Ok((vec![].into(), 0));
227 };
228
229 let is_cloud = is_cloud_url(first_path);
230
231 struct OutPaths {
234 paths: Vec<PathBuf>,
235 exts: [Option<(PlSmallStr, usize)>; 2],
236 current_idx: usize,
237 }
238
239 impl OutPaths {
240 fn update_ext_status(
241 current_idx: &mut usize,
242 exts: &mut [Option<(PlSmallStr, usize)>; 2],
243 value: &Path,
244 ) {
245 let ext = value
246 .extension()
247 .map(|x| PlSmallStr::from(x.to_str().unwrap()))
248 .unwrap_or(PlSmallStr::EMPTY);
249
250 if exts[0].is_none() {
251 exts[0] = Some((ext, *current_idx));
252 } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
253 exts[1] = Some((ext, *current_idx));
254 }
255
256 *current_idx += 1;
257 }
258
259 fn push(&mut self, value: PathBuf) {
260 {
261 let current_idx = &mut self.current_idx;
262 let exts = &mut self.exts;
263 Self::update_ext_status(current_idx, exts, &value);
264 }
265 self.paths.push(value)
266 }
267
268 fn extend(&mut self, values: impl IntoIterator<Item = PathBuf>) {
269 let current_idx = &mut self.current_idx;
270 let exts = &mut self.exts;
271
272 self.paths.extend(values.into_iter().inspect(|x| {
273 Self::update_ext_status(current_idx, exts, x);
274 }))
275 }
276
277 fn extend_from_slice(&mut self, values: &[PathBuf]) {
278 self.extend(values.iter().cloned())
279 }
280 }
281
282 let mut out_paths = OutPaths {
283 paths: vec![],
284 exts: [None, None],
285 current_idx: 0,
286 };
287
288 let mut hive_idx_tracker = HiveIdxTracker {
289 idx: usize::MAX,
290 paths,
291 check_directory_level,
292 };
293
294 if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
295 #[cfg(feature = "cloud")]
296 {
297 use polars_utils::_limit_path_len_io_err;
298
299 use crate::cloud::object_path_from_str;
300
301 if first_path.starts_with("hf://") {
302 let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
303 hugging_face::expand_paths_hf(
304 paths,
305 check_directory_level,
306 cloud_options,
307 glob,
308 ),
309 )?;
310
311 return Ok((Arc::from(paths), expand_start_idx));
312 }
313
314 let format_path = |scheme: &str, bucket: &str, location: &str| {
315 if is_cloud {
316 format!("{}://{}/{}", scheme, bucket, location)
317 } else {
318 format!("/{}", location)
319 }
320 };
321
322 let expand_path_cloud = |path: &str,
323 cloud_options: Option<&CloudOptions>|
324 -> PolarsResult<(usize, Vec<PathBuf>)> {
325 crate::pl_async::get_runtime().block_in_place_on(async {
326 let (cloud_location, store) =
327 crate::cloud::build_object_store(path, cloud_options, glob).await?;
328 let prefix = object_path_from_str(&cloud_location.prefix)?;
329
330 let out = if !path.ends_with("/")
331 && (!glob || cloud_location.expansion.is_none())
332 && {
333 is_cloud || PathBuf::from(path).is_file()
338 } {
339 (
340 0,
341 vec![PathBuf::from(format_path(
342 &cloud_location.scheme,
343 &cloud_location.bucket,
344 prefix.as_ref(),
345 ))],
346 )
347 } else {
348 use futures::TryStreamExt;
349
350 if !is_cloud {
351 let path = PathBuf::from(path);
356 if !path.is_dir() {
357 path.metadata()
358 .map_err(|err| _limit_path_len_io_err(&path, err))?;
359 }
360 }
361
362 let cloud_location = &cloud_location;
363
364 let mut paths = store
365 .try_exec_rebuild_on_err(|store| {
366 let st = store.clone();
367
368 async {
369 let store = st;
370 let out = store
371 .list(Some(&prefix))
372 .try_filter_map(|x| async move {
373 let out = (x.size > 0).then(|| {
374 PathBuf::from({
375 format_path(
376 &cloud_location.scheme,
377 &cloud_location.bucket,
378 x.location.as_ref(),
379 )
380 })
381 });
382 Ok(out)
383 })
384 .try_collect::<Vec<_>>()
385 .await?;
386
387 Ok(out)
388 }
389 })
390 .await?;
391
392 paths.sort_unstable();
393 (
394 format_path(
395 &cloud_location.scheme,
396 &cloud_location.bucket,
397 &cloud_location.prefix,
398 )
399 .len(),
400 paths,
401 )
402 };
403
404 PolarsResult::Ok(out)
405 })
406 };
407
408 for (path_idx, path) in paths.iter().enumerate() {
409 if path.to_str().unwrap().starts_with("http") {
410 out_paths.push(path.clone());
411 hive_idx_tracker.update(0, path_idx)?;
412 continue;
413 }
414
415 let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
416
417 let path = if glob && glob_start_idx.is_some() {
418 path.clone()
419 } else {
420 let (expand_start_idx, paths) =
421 expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
422 out_paths.extend_from_slice(&paths);
423 hive_idx_tracker.update(expand_start_idx, path_idx)?;
424 continue;
425 };
426
427 hive_idx_tracker.update(0, path_idx)?;
428
429 let iter = crate::pl_async::get_runtime()
430 .block_in_place_on(crate::async_glob(path.to_str().unwrap(), cloud_options))?;
431
432 if is_cloud {
433 out_paths.extend(iter.into_iter().map(PathBuf::from));
434 } else {
435 out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
437 }
438 }
439 }
440 #[cfg(not(feature = "cloud"))]
441 panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
442 } else {
443 let mut stack = VecDeque::new();
444
445 for path_idx in 0..paths.len() {
446 let path = &paths[path_idx];
447 stack.clear();
448
449 if path.is_dir() {
450 let i = path.to_str().unwrap().len();
451
452 hive_idx_tracker.update(i, path_idx)?;
453
454 stack.push_back(path.clone());
455
456 while let Some(dir) = stack.pop_front() {
457 let mut paths = std::fs::read_dir(dir)
458 .map_err(PolarsError::from)?
459 .map(|x| x.map(|x| x.path()))
460 .collect::<std::io::Result<Vec<_>>>()
461 .map_err(PolarsError::from)?;
462 paths.sort_unstable();
463
464 for path in paths {
465 if path.is_dir() {
466 stack.push_back(path);
467 } else if path.metadata()?.len() > 0 {
468 out_paths.push(path);
469 }
470 }
471 }
472
473 continue;
474 }
475
476 let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
477
478 if glob && i.is_some() {
479 hive_idx_tracker.update(0, path_idx)?;
480
481 let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
482 polars_bail!(ComputeError: "invalid glob pattern given")
483 };
484
485 for path in paths {
486 let path = path.map_err(to_compute_err)?;
487 if !path.is_dir() && path.metadata()?.len() > 0 {
488 out_paths.push(path);
489 }
490 }
491 } else {
492 hive_idx_tracker.update(0, path_idx)?;
493 out_paths.push(path.clone());
494 }
495 }
496 }
497
498 assert_eq!(out_paths.current_idx, out_paths.paths.len());
499
500 if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
501 if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
502 polars_bail!(
503 InvalidOperation: r#"directory contained paths with different file extensions: \
504 first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
505 which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
506 &out_paths.paths[i1].to_string_lossy(), &out_paths.paths[i2].to_string_lossy()
507 )
508 }
509 }
510
511 Ok((out_paths.paths.into(), hive_idx_tracker.idx))
512}
513
514#[cfg(feature = "file_cache")]
/// Ensures that `path` exists as a directory, creating it and any missing parents.
///
/// An error from directory creation is returned only if `path` is still not a directory
/// afterwards, so a creation failure is tolerated when the directory already exists.
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
521
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::resolve_homedir;

    /// `~`-prefixed paths resolve to absolute paths; every other path is returned unchanged.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .map(PathBuf::from);

        let resolved: Vec<PathBuf> = inputs.iter().map(|p| resolve_homedir(p)).collect();

        // Home-relative file: same file name, now absolute.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());

        // Paths without a `~` prefix pass through untouched.
        for (got, want) in resolved[1..4].iter().zip(&inputs[1..4]) {
            assert_eq!(got, want);
        }

        // A bare `~` resolves to the (absolute) home directory.
        assert!(resolved[4].is_absolute());
    }

    /// Windows counterpart of `test_resolve_homedir`.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"].map(PathBuf::from);

        let resolved: Vec<PathBuf> = inputs.iter().map(|p| resolve_homedir(p)).collect();

        assert_eq!(resolved[0], inputs[0]);
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    /// A `?` inside an HTTP query string must not be treated as a glob metacharacter.
    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        use super::expand_paths;

        let input = [PathBuf::from("https://pola.rs/test.csv?token=bear")];
        let expanded = expand_paths(&input, true, None).unwrap();
        assert_eq!(&*expanded, &input[..]);
    }
}