polars_io/path_utils/
mod.rs

1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, LazyLock};
4
5use polars_core::config;
6use polars_core::error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::pl_str::PlSmallStr;
8use regex::Regex;
9
10#[cfg(feature = "cloud")]
11mod hugging_face;
12
13use crate::cloud::CloudOptions;
14
15pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
16    (|| {
17        let verbose = config::verbose();
18
19        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
20            if verbose {
21                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
22            }
23            v
24        } else if cfg!(target_family = "unix") {
25            let id = std::env::var("USER")
26                .inspect(|_| {
27                    if verbose {
28                        eprintln!("init_temp_dir: sourced $USER")
29                    }
30                })
31                .or_else(|_e| {
32                    // We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
33                    // available (it is available when file_cache is activated).
34                    #[cfg(feature = "file_cache")]
35                    {
36                        std::env::var("HOME")
37                            .inspect(|_| {
38                                if verbose {
39                                    eprintln!("init_temp_dir: sourced $HOME")
40                                }
41                            })
42                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
43                    }
44                    #[cfg(not(feature = "file_cache"))]
45                    {
46                        Err(_e)
47                    }
48                });
49
50            if let Ok(v) = id {
51                std::env::temp_dir().join(format!("polars-{}/", v))
52            } else {
53                return Err(std::io::Error::other(
54                    "could not load $USER or $HOME environment variables",
55                ));
56            }
57        } else if cfg!(target_family = "windows") {
58            // Setting permissions on Windows is not as easy compared to Unix, but fortunately
59            // the default temporary directory location is underneath the user profile, so we
60            // shouldn't need to do anything.
61            std::env::temp_dir().join("polars/")
62        } else {
63            std::env::temp_dir().join("polars/")
64        }
65        .into_boxed_path();
66
67        if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
68            if !path.is_dir() {
69                panic!(
70                    "failed to create temporary directory: {} (path = {:?})",
71                    err,
72                    path.as_ref()
73                );
74            }
75        }
76
77        #[cfg(target_family = "unix")]
78        {
79            use std::os::unix::fs::PermissionsExt;
80
81            let result = (|| {
82                std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
83                let perms = std::fs::metadata(path.as_ref())?.permissions();
84
85                if (perms.mode() % 0o1000) != 0o700 {
86                    std::io::Result::Err(std::io::Error::other(format!(
87                        "permission mismatch: {:?}",
88                        perms
89                    )))
90                } else {
91                    std::io::Result::Ok(())
92                }
93            })()
94            .map_err(|e| {
95                std::io::Error::new(
96                    e.kind(),
97                    format!(
98                        "error setting temporary directory permissions: {} (path = {:?})",
99                        e,
100                        path.as_ref()
101                    ),
102                )
103            });
104
105            if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
106                result?;
107            }
108        }
109
110        std::io::Result::Ok(path)
111    })()
112    .map_err(|e| {
113        std::io::Error::new(
114            e.kind(),
115            format!(
116                "error initializing temporary directory: {} \
117                 consider explicitly setting POLARS_TEMP_DIR",
118                e
119            ),
120        )
121    })
122    .unwrap()
123});
124
125/// Replaces a "~" in the Path with the home directory.
126pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
127    let path = path.as_ref();
128
129    if path.starts_with("~") {
130        // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
131        #[cfg(not(target_family = "wasm"))]
132        if let Some(homedir) = home::home_dir() {
133            return homedir.join(path.strip_prefix("~").unwrap());
134        }
135    }
136
137    path.into()
138}
139
140static CLOUD_URL: LazyLock<Regex> =
141    LazyLock::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?|hf)://").unwrap());
142
143/// Check if the path is a cloud url.
144pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
145    match p.as_ref().as_os_str().to_str() {
146        Some(s) => CLOUD_URL.is_match(s),
147        _ => false,
148    }
149}
150
/// Returns the byte offset of the first glob metacharacter (`*`, `?` or `[`) in `path`,
/// or `None` when the path contains none of them.
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
    path.iter()
        .position(|&byte| matches!(byte, b'*' | b'?' | b'['))
}
155
156/// Returns `true` if `expanded_paths` were expanded from a single directory
157pub fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
158    paths: &[P],
159    expanded_paths: &[P],
160) -> bool {
161    // Single input that isn't a glob
162    paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
163    // And isn't a file
164    && {
165        (
166            // For local paths, we can just use `is_dir`
167            !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
168        )
169        || (
170            // For cloud paths, we determine that the input path isn't a file by checking that the
171            // output path differs.
172            expanded_paths.is_empty() || (paths[0].as_ref() != expanded_paths[0].as_ref())
173        )
174    }
175}
176
177/// Recursively traverses directories and expands globs if `glob` is `true`.
178pub fn expand_paths(
179    paths: &[PathBuf],
180    glob: bool,
181    #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
182) -> PolarsResult<Arc<[PathBuf]>> {
183    expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
184}
185
186struct HiveIdxTracker<'a> {
187    idx: usize,
188    paths: &'a [PathBuf],
189    check_directory_level: bool,
190}
191
192impl HiveIdxTracker<'_> {
193    fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
194        let check_directory_level = self.check_directory_level;
195        let paths = self.paths;
196
197        if check_directory_level
198            && ![usize::MAX, i].contains(&self.idx)
199            // They could still be the same directory level, just with different name length
200            && (paths[path_idx].parent() != paths[path_idx - 1].parent())
201        {
202            polars_bail!(
203                InvalidOperation:
204                "attempted to read from different directory levels with hive partitioning enabled: first path: {}, second path: {}",
205                paths[path_idx - 1].to_str().unwrap(),
206                paths[path_idx].to_str().unwrap(),
207            )
208        } else {
209            self.idx = std::cmp::min(self.idx, i);
210            Ok(())
211        }
212    }
213}
214
215/// Recursively traverses directories and expands globs if `glob` is `true`.
216/// Returns the expanded paths and the index at which to start parsing hive
217/// partitions from the path.
218pub fn expand_paths_hive(
219    paths: &[PathBuf],
220    glob: bool,
221    #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
222    check_directory_level: bool,
223) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
224    let Some(first_path) = paths.first() else {
225        return Ok((vec![].into(), 0));
226    };
227
228    let is_cloud = is_cloud_url(first_path);
229
230    /// Wrapper around `Vec<PathBuf>` that also tracks file extensions, so that
231    /// we don't have to traverse the entire list again to validate extensions.
232    struct OutPaths {
233        paths: Vec<PathBuf>,
234        exts: [Option<(PlSmallStr, usize)>; 2],
235        current_idx: usize,
236    }
237
238    impl OutPaths {
239        fn update_ext_status(
240            current_idx: &mut usize,
241            exts: &mut [Option<(PlSmallStr, usize)>; 2],
242            value: &Path,
243        ) {
244            let ext = value
245                .extension()
246                .map(|x| PlSmallStr::from(x.to_str().unwrap()))
247                .unwrap_or(PlSmallStr::EMPTY);
248
249            if exts[0].is_none() {
250                exts[0] = Some((ext, *current_idx));
251            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
252                exts[1] = Some((ext, *current_idx));
253            }
254
255            *current_idx += 1;
256        }
257
258        fn push(&mut self, value: PathBuf) {
259            {
260                let current_idx = &mut self.current_idx;
261                let exts = &mut self.exts;
262                Self::update_ext_status(current_idx, exts, &value);
263            }
264            self.paths.push(value)
265        }
266
267        fn extend(&mut self, values: impl IntoIterator<Item = PathBuf>) {
268            let current_idx = &mut self.current_idx;
269            let exts = &mut self.exts;
270
271            self.paths.extend(values.into_iter().inspect(|x| {
272                Self::update_ext_status(current_idx, exts, x);
273            }))
274        }
275
276        fn extend_from_slice(&mut self, values: &[PathBuf]) {
277            self.extend(values.iter().cloned())
278        }
279    }
280
281    let mut out_paths = OutPaths {
282        paths: vec![],
283        exts: [None, None],
284        current_idx: 0,
285    };
286
287    let mut hive_idx_tracker = HiveIdxTracker {
288        idx: usize::MAX,
289        paths,
290        check_directory_level,
291    };
292
293    if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
294        #[cfg(feature = "cloud")]
295        {
296            use polars_utils::_limit_path_len_io_err;
297
298            use crate::cloud::object_path_from_str;
299
300            if first_path.starts_with("hf://") {
301                let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
302                    hugging_face::expand_paths_hf(
303                        paths,
304                        check_directory_level,
305                        cloud_options,
306                        glob,
307                    ),
308                )?;
309
310                return Ok((Arc::from(paths), expand_start_idx));
311            }
312
313            let format_path = |scheme: &str, bucket: &str, location: &str| {
314                if is_cloud {
315                    format!("{}://{}/{}", scheme, bucket, location)
316                } else {
317                    format!("/{}", location)
318                }
319            };
320
321            let expand_path_cloud = |path: &str,
322                                     cloud_options: Option<&CloudOptions>|
323             -> PolarsResult<(usize, Vec<PathBuf>)> {
324                crate::pl_async::get_runtime().block_in_place_on(async {
325                    let (cloud_location, store) =
326                        crate::cloud::build_object_store(path, cloud_options, glob).await?;
327                    let prefix = object_path_from_str(&cloud_location.prefix)?;
328
329                    let out = if !path.ends_with("/")
330                        && (!glob || cloud_location.expansion.is_none())
331                        && {
332                            // We need to check if it is a directory for local paths (we can be here due
333                            // to FORCE_ASYNC). For cloud paths the convention is that the user must add
334                            // a trailing slash `/` to scan directories. We don't infer it as that would
335                            // mean sending one network request per path serially (very slow).
336                            is_cloud || PathBuf::from(path).is_file()
337                        } {
338                        (
339                            0,
340                            vec![PathBuf::from(format_path(
341                                &cloud_location.scheme,
342                                &cloud_location.bucket,
343                                prefix.as_ref(),
344                            ))],
345                        )
346                    } else {
347                        use futures::TryStreamExt;
348
349                        if !is_cloud {
350                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
351                            // for non-existent file paths. Note we can't do this for cloud paths as
352                            // there is no concept of a "directory" - a non-existent path is
353                            // indistinguishable from an empty directory.
354                            let path = PathBuf::from(path);
355                            if !path.is_dir() {
356                                path.metadata()
357                                    .map_err(|err| _limit_path_len_io_err(&path, err))?;
358                            }
359                        }
360
361                        let cloud_location = &cloud_location;
362
363                        let mut paths = store
364                            .try_exec_rebuild_on_err(|store| {
365                                let st = store.clone();
366
367                                async {
368                                    let store = st;
369                                    store
370                                        .list(Some(&prefix))
371                                        .try_filter_map(|x| async move {
372                                            let out = (x.size > 0).then(|| {
373                                                PathBuf::from({
374                                                    format_path(
375                                                        &cloud_location.scheme,
376                                                        &cloud_location.bucket,
377                                                        x.location.as_ref(),
378                                                    )
379                                                })
380                                            });
381                                            Ok(out)
382                                        })
383                                        .try_collect::<Vec<_>>()
384                                        .await
385                                        .map_err(to_compute_err)
386                                }
387                            })
388                            .await?;
389
390                        paths.sort_unstable();
391                        (
392                            format_path(
393                                &cloud_location.scheme,
394                                &cloud_location.bucket,
395                                &cloud_location.prefix,
396                            )
397                            .len(),
398                            paths,
399                        )
400                    };
401
402                    PolarsResult::Ok(out)
403                })
404            };
405
406            for (path_idx, path) in paths.iter().enumerate() {
407                if path.to_str().unwrap().starts_with("http") {
408                    out_paths.push(path.clone());
409                    hive_idx_tracker.update(0, path_idx)?;
410                    continue;
411                }
412
413                let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
414
415                let path = if glob && glob_start_idx.is_some() {
416                    path.clone()
417                } else {
418                    let (expand_start_idx, paths) =
419                        expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
420                    out_paths.extend_from_slice(&paths);
421                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
422                    continue;
423                };
424
425                hive_idx_tracker.update(0, path_idx)?;
426
427                let iter = crate::pl_async::get_runtime()
428                    .block_in_place_on(crate::async_glob(path.to_str().unwrap(), cloud_options))?;
429
430                if is_cloud {
431                    out_paths.extend(iter.into_iter().map(PathBuf::from));
432                } else {
433                    // FORCE_ASYNC, remove leading file:// as not all readers support it.
434                    out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
435                }
436            }
437        }
438        #[cfg(not(feature = "cloud"))]
439        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
440    } else {
441        let mut stack = VecDeque::new();
442
443        for path_idx in 0..paths.len() {
444            let path = &paths[path_idx];
445            stack.clear();
446
447            if path.is_dir() {
448                let i = path.to_str().unwrap().len();
449
450                hive_idx_tracker.update(i, path_idx)?;
451
452                stack.push_back(path.clone());
453
454                while let Some(dir) = stack.pop_front() {
455                    let mut paths = std::fs::read_dir(dir)
456                        .map_err(PolarsError::from)?
457                        .map(|x| x.map(|x| x.path()))
458                        .collect::<std::io::Result<Vec<_>>>()
459                        .map_err(PolarsError::from)?;
460                    paths.sort_unstable();
461
462                    for path in paths {
463                        if path.is_dir() {
464                            stack.push_back(path);
465                        } else if path.metadata()?.len() > 0 {
466                            out_paths.push(path);
467                        }
468                    }
469                }
470
471                continue;
472            }
473
474            let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
475
476            if glob && i.is_some() {
477                hive_idx_tracker.update(0, path_idx)?;
478
479                let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
480                    polars_bail!(ComputeError: "invalid glob pattern given")
481                };
482
483                for path in paths {
484                    let path = path.map_err(to_compute_err)?;
485                    if !path.is_dir() && path.metadata()?.len() > 0 {
486                        out_paths.push(path);
487                    }
488                }
489            } else {
490                hive_idx_tracker.update(0, path_idx)?;
491                out_paths.push(path.clone());
492            }
493        }
494    }
495
496    assert_eq!(out_paths.current_idx, out_paths.paths.len());
497
498    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
499        if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
500            polars_bail!(
501                InvalidOperation: r#"directory contained paths with different file extensions: \
502                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
503                which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
504                &out_paths.paths[i1].to_string_lossy(), &out_paths.paths[i2].to_string_lossy()
505            )
506        }
507    }
508
509    Ok((out_paths.paths.into(), hive_idx_tracker.idx))
510}
511
/// Creates `path` and any missing parent directories.
///
/// An error from `std::fs::create_dir_all` is returned only when `path` is still not a
/// directory afterwards; if the directory exists the error is discarded.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
519
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::{expand_paths, resolve_homedir};

    /// Runs every input through `resolve_homedir`, keeping the input order.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs.iter().map(|p| resolve_homedir(p)).collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: [PathBuf; 5] = [
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];

        let resolved = resolve_all(&inputs);

        // A leading `~` is expanded to an absolute path and the file name is kept.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());
        // Paths without a leading `~` come back untouched.
        assert_eq!(resolved[1], inputs[1]);
        assert_eq!(resolved[2], inputs[2]);
        assert_eq!(resolved[3], inputs[3]);
        // A bare `~` resolves to an absolute path.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: [PathBuf; 3] = [
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved = resolve_all(&inputs);

        assert_eq!(resolved[0], inputs[0]);
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774
        let url = "https://pola.rs/test.csv?token=bear";
        let inputs = &[PathBuf::from(url)];

        let expanded = expand_paths(inputs, true, None).unwrap();

        assert_eq!(expanded.as_ref(), inputs);
    }
}