1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, LazyLock};
4
5use polars_core::config;
6use polars_core::error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9
10#[cfg(feature = "cloud")]
11mod hugging_face;
12
13use crate::cloud::CloudOptions;
14
15pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
16 (|| {
17 let verbose = config::verbose();
18
19 let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
20 if verbose {
21 eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
22 }
23 v
24 } else if cfg!(target_family = "unix") {
25 let id = std::env::var("USER")
26 .inspect(|_| {
27 if verbose {
28 eprintln!("init_temp_dir: sourced $USER")
29 }
30 })
31 .or_else(|_e| {
32 #[cfg(feature = "file_cache")]
35 {
36 std::env::var("HOME")
37 .inspect(|_| {
38 if verbose {
39 eprintln!("init_temp_dir: sourced $HOME")
40 }
41 })
42 .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
43 }
44 #[cfg(not(feature = "file_cache"))]
45 {
46 Err(_e)
47 }
48 });
49
50 if let Ok(v) = id {
51 std::env::temp_dir().join(format!("polars-{}/", v))
52 } else {
53 return Err(std::io::Error::other(
54 "could not load $USER or $HOME environment variables",
55 ));
56 }
57 } else if cfg!(target_family = "windows") {
58 std::env::temp_dir().join("polars/")
62 } else {
63 std::env::temp_dir().join("polars/")
64 }
65 .into_boxed_path();
66
67 if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
68 if !path.is_dir() {
69 panic!(
70 "failed to create temporary directory: {} (path = {:?})",
71 err,
72 path.as_ref()
73 );
74 }
75 }
76
77 #[cfg(target_family = "unix")]
78 {
79 use std::os::unix::fs::PermissionsExt;
80
81 let result = (|| {
82 std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
83 let perms = std::fs::metadata(path.as_ref())?.permissions();
84
85 if (perms.mode() % 0o1000) != 0o700 {
86 std::io::Result::Err(std::io::Error::other(format!(
87 "permission mismatch: {:?}",
88 perms
89 )))
90 } else {
91 std::io::Result::Ok(())
92 }
93 })()
94 .map_err(|e| {
95 std::io::Error::new(
96 e.kind(),
97 format!(
98 "error setting temporary directory permissions: {} (path = {:?})",
99 e,
100 path.as_ref()
101 ),
102 )
103 });
104
105 if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
106 result?;
107 }
108 }
109
110 std::io::Result::Ok(path)
111 })()
112 .map_err(|e| {
113 std::io::Error::new(
114 e.kind(),
115 format!(
116 "error initializing temporary directory: {} \
117 consider explicitly setting POLARS_TEMP_DIR",
118 e
119 ),
120 )
121 })
122 .unwrap()
123});
124
125pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
127 let path = path.as_ref();
128
129 if path.starts_with("~") {
130 #[cfg(not(target_family = "wasm"))]
132 if let Some(homedir) = home::home_dir() {
133 return homedir.join(path.strip_prefix("~").unwrap());
134 }
135 }
136
137 path.into()
138}
139
140static CLOUD_URL: LazyLock<Regex> =
141 LazyLock::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?|hf)://").unwrap());
142
143pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
145 match p.as_ref().as_os_str().to_str() {
146 Some(s) => CLOUD_URL.is_match(s),
147 _ => false,
148 }
149}
150
151pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
153 memchr::memchr3(b'*', b'?', b'[', path)
154}
155
156pub fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
158 paths: &[P],
159 expanded_paths: &[P],
160) -> bool {
161 paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
163 && {
165 (
166 !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
168 )
169 || (
170 expanded_paths.is_empty() || (paths[0].as_ref() != expanded_paths[0].as_ref())
173 )
174 }
175}
176
177pub fn expand_paths(
179 paths: &[PathBuf],
180 glob: bool,
181 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
182) -> PolarsResult<Arc<[PathBuf]>> {
183 expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
184}
185
186struct HiveIdxTracker<'a> {
187 idx: usize,
188 paths: &'a [PathBuf],
189 check_directory_level: bool,
190}
191
192impl HiveIdxTracker<'_> {
193 fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
194 let check_directory_level = self.check_directory_level;
195 let paths = self.paths;
196
197 if check_directory_level
198 && ![usize::MAX, i].contains(&self.idx)
199 && (paths[path_idx].parent() != paths[path_idx - 1].parent())
201 {
202 polars_bail!(
203 InvalidOperation:
204 "attempted to read from different directory levels with hive partitioning enabled: first path: {}, second path: {}",
205 paths[path_idx - 1].to_str().unwrap(),
206 paths[path_idx].to_str().unwrap(),
207 )
208 } else {
209 self.idx = std::cmp::min(self.idx, i);
210 Ok(())
211 }
212 }
213}
214
215pub fn expand_paths_hive(
219 paths: &[PathBuf],
220 glob: bool,
221 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
222 check_directory_level: bool,
223) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
224 let Some(first_path) = paths.first() else {
225 return Ok((vec![].into(), 0));
226 };
227
228 let is_cloud = is_cloud_url(first_path);
229
230 struct OutPaths {
233 paths: Vec<PathBuf>,
234 exts: [Option<(PlSmallStr, usize)>; 2],
235 current_idx: usize,
236 }
237
238 impl OutPaths {
239 fn update_ext_status(
240 current_idx: &mut usize,
241 exts: &mut [Option<(PlSmallStr, usize)>; 2],
242 value: &Path,
243 ) {
244 let ext = value
245 .extension()
246 .map(|x| PlSmallStr::from(x.to_str().unwrap()))
247 .unwrap_or(PlSmallStr::EMPTY);
248
249 if exts[0].is_none() {
250 exts[0] = Some((ext, *current_idx));
251 } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
252 exts[1] = Some((ext, *current_idx));
253 }
254
255 *current_idx += 1;
256 }
257
258 fn push(&mut self, value: PathBuf) {
259 {
260 let current_idx = &mut self.current_idx;
261 let exts = &mut self.exts;
262 Self::update_ext_status(current_idx, exts, &value);
263 }
264 self.paths.push(value)
265 }
266
267 fn extend(&mut self, values: impl IntoIterator<Item = PathBuf>) {
268 let current_idx = &mut self.current_idx;
269 let exts = &mut self.exts;
270
271 self.paths.extend(values.into_iter().inspect(|x| {
272 Self::update_ext_status(current_idx, exts, x);
273 }))
274 }
275
276 fn extend_from_slice(&mut self, values: &[PathBuf]) {
277 self.extend(values.iter().cloned())
278 }
279 }
280
281 let mut out_paths = OutPaths {
282 paths: vec![],
283 exts: [None, None],
284 current_idx: 0,
285 };
286
287 let mut hive_idx_tracker = HiveIdxTracker {
288 idx: usize::MAX,
289 paths,
290 check_directory_level,
291 };
292
293 if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
294 #[cfg(feature = "cloud")]
295 {
296 use polars_utils::_limit_path_len_io_err;
297
298 use crate::cloud::object_path_from_str;
299
300 if first_path.starts_with("hf://") {
301 let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
302 hugging_face::expand_paths_hf(
303 paths,
304 check_directory_level,
305 cloud_options,
306 glob,
307 ),
308 )?;
309
310 return Ok((Arc::from(paths), expand_start_idx));
311 }
312
313 let format_path = |scheme: &str, bucket: &str, location: &str| {
314 if is_cloud {
315 format!("{}://{}/{}", scheme, bucket, location)
316 } else {
317 format!("/{}", location)
318 }
319 };
320
321 let expand_path_cloud = |path: &str,
322 cloud_options: Option<&CloudOptions>|
323 -> PolarsResult<(usize, Vec<PathBuf>)> {
324 crate::pl_async::get_runtime().block_in_place_on(async {
325 let (cloud_location, store) =
326 crate::cloud::build_object_store(path, cloud_options, glob).await?;
327 let prefix = object_path_from_str(&cloud_location.prefix)?;
328
329 let out = if !path.ends_with("/")
330 && (!glob || cloud_location.expansion.is_none())
331 && {
332 is_cloud || PathBuf::from(path).is_file()
337 } {
338 (
339 0,
340 vec![PathBuf::from(format_path(
341 &cloud_location.scheme,
342 &cloud_location.bucket,
343 prefix.as_ref(),
344 ))],
345 )
346 } else {
347 use futures::TryStreamExt;
348
349 if !is_cloud {
350 let path = PathBuf::from(path);
355 if !path.is_dir() {
356 path.metadata()
357 .map_err(|err| _limit_path_len_io_err(&path, err))?;
358 }
359 }
360
361 let cloud_location = &cloud_location;
362
363 let mut paths = store
364 .try_exec_rebuild_on_err(|store| {
365 let st = store.clone();
366
367 async {
368 let store = st;
369 store
370 .list(Some(&prefix))
371 .try_filter_map(|x| async move {
372 let out = (x.size > 0).then(|| {
373 PathBuf::from({
374 format_path(
375 &cloud_location.scheme,
376 &cloud_location.bucket,
377 x.location.as_ref(),
378 )
379 })
380 });
381 Ok(out)
382 })
383 .try_collect::<Vec<_>>()
384 .await
385 .map_err(to_compute_err)
386 }
387 })
388 .await?;
389
390 paths.sort_unstable();
391 (
392 format_path(
393 &cloud_location.scheme,
394 &cloud_location.bucket,
395 &cloud_location.prefix,
396 )
397 .len(),
398 paths,
399 )
400 };
401
402 PolarsResult::Ok(out)
403 })
404 };
405
406 for (path_idx, path) in paths.iter().enumerate() {
407 if path.to_str().unwrap().starts_with("http") {
408 out_paths.push(path.clone());
409 hive_idx_tracker.update(0, path_idx)?;
410 continue;
411 }
412
413 let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
414
415 let path = if glob && glob_start_idx.is_some() {
416 path.clone()
417 } else {
418 let (expand_start_idx, paths) =
419 expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
420 out_paths.extend_from_slice(&paths);
421 hive_idx_tracker.update(expand_start_idx, path_idx)?;
422 continue;
423 };
424
425 hive_idx_tracker.update(0, path_idx)?;
426
427 let iter = crate::pl_async::get_runtime()
428 .block_in_place_on(crate::async_glob(path.to_str().unwrap(), cloud_options))?;
429
430 if is_cloud {
431 out_paths.extend(iter.into_iter().map(PathBuf::from));
432 } else {
433 out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
435 }
436 }
437 }
438 #[cfg(not(feature = "cloud"))]
439 panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
440 } else {
441 let mut stack = VecDeque::new();
442
443 for path_idx in 0..paths.len() {
444 let path = &paths[path_idx];
445 stack.clear();
446
447 if path.is_dir() {
448 let i = path.to_str().unwrap().len();
449
450 hive_idx_tracker.update(i, path_idx)?;
451
452 stack.push_back(path.clone());
453
454 while let Some(dir) = stack.pop_front() {
455 let mut paths = std::fs::read_dir(dir)
456 .map_err(PolarsError::from)?
457 .map(|x| x.map(|x| x.path()))
458 .collect::<std::io::Result<Vec<_>>>()
459 .map_err(PolarsError::from)?;
460 paths.sort_unstable();
461
462 for path in paths {
463 if path.is_dir() {
464 stack.push_back(path);
465 } else if path.metadata()?.len() > 0 {
466 out_paths.push(path);
467 }
468 }
469 }
470
471 continue;
472 }
473
474 let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
475
476 if glob && i.is_some() {
477 hive_idx_tracker.update(0, path_idx)?;
478
479 let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
480 polars_bail!(ComputeError: "invalid glob pattern given")
481 };
482
483 for path in paths {
484 let path = path.map_err(to_compute_err)?;
485 if !path.is_dir() && path.metadata()?.len() > 0 {
486 out_paths.push(path);
487 }
488 }
489 } else {
490 hive_idx_tracker.update(0, path_idx)?;
491 out_paths.push(path.clone());
492 }
493 }
494 }
495
496 assert_eq!(out_paths.current_idx, out_paths.paths.len());
497
498 if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
499 if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
500 polars_bail!(
501 InvalidOperation: r#"directory contained paths with different file extensions: \
502 first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
503 which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
504 &out_paths.paths[i1].to_string_lossy(), &out_paths.paths[i2].to_string_lossy()
505 )
506 }
507 }
508
509 Ok((out_paths.paths.into(), hive_idx_tracker.idx))
510}
511
512#[cfg(feature = "file_cache")]
/// Ensures that `path` exists as a directory, creating it and any missing parents.
///
/// A creation error is ignored when `path` is a directory afterwards; otherwise the
/// error from `create_dir_all` is returned.
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
519
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::{expand_paths, resolve_homedir};

    /// Applies `resolve_homedir` to every input, preserving order.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs.iter().map(|p| resolve_homedir(p)).collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: Vec<PathBuf> = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_all(&inputs);

        // A `~`-prefixed path keeps its file name and becomes absolute.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());

        // Paths without a leading `~` come back unchanged.
        for (got, want) in resolved[1..=3].iter().zip(&inputs[1..=3]) {
            assert_eq!(got, want);
        }

        // A bare `~` resolves to an absolute path.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: Vec<PathBuf> = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"]
            .into_iter()
            .map(PathBuf::from)
            .collect();

        let resolved = resolve_all(&inputs);

        assert_eq!(resolved[0], inputs[0]);
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // The `?` in the query string must not be treated as a glob metacharacter.
        let url = "https://pola.rs/test.csv?token=bear";
        let inputs = &[PathBuf::from(url)];
        let expanded = expand_paths(inputs, true, None).unwrap();
        assert_eq!(expanded.as_ref(), inputs);
    }
}