Skip to main content

polars_io/path_utils/
mod.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use polars_buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::{CloudScheme, PlRefPath};
10use polars_utils::pl_str::PlSmallStr;
11
12#[cfg(feature = "cloud")]
13mod hugging_face;
14
15use crate::cloud::CloudOptions;
16
17#[allow(clippy::bind_instead_of_map)]
18pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
19    (|| {
20        let verbose = config::verbose();
21
22        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
23            if verbose {
24                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
25            }
26            v
27        } else if cfg!(target_family = "unix") {
28            let id = std::env::var("USER")
29                .inspect(|_| {
30                    if verbose {
31                        eprintln!("init_temp_dir: sourced $USER")
32                    }
33                })
34                .or_else(|_e| {
35                    // We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
36                    // available (it is available when file_cache is activated).
37                    #[cfg(feature = "file_cache")]
38                    {
39                        std::env::var("HOME")
40                            .inspect(|_| {
41                                if verbose {
42                                    eprintln!("init_temp_dir: sourced $HOME")
43                                }
44                            })
45                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
46                    }
47                    #[cfg(not(feature = "file_cache"))]
48                    {
49                        Err(_e)
50                    }
51                });
52
53            if let Ok(v) = id {
54                std::env::temp_dir().join(format!("polars-{v}/"))
55            } else {
56                return Err(std::io::Error::other(
57                    "could not load $USER or $HOME environment variables",
58                ));
59            }
60        } else if cfg!(target_family = "windows") {
61            // Setting permissions on Windows is not as easy compared to Unix, but fortunately
62            // the default temporary directory location is underneath the user profile, so we
63            // shouldn't need to do anything.
64            std::env::temp_dir().join("polars/")
65        } else {
66            std::env::temp_dir().join("polars/")
67        }
68        .into_boxed_path();
69
70        let perm_result = create_dir_owner_only(path.as_ref());
71
72        if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
73            perm_result?;
74        }
75
76        std::io::Result::Ok(path)
77    })()
78    .map_err(|e| {
79        std::io::Error::new(
80            e.kind(),
81            format!(
82                "error initializing temporary directory: {e} \
83                 consider explicitly setting POLARS_TEMP_DIR"
84            ),
85        )
86    })
87    .unwrap()
88});
89
/// Creates the directory `path` (and any missing parents) and, on Unix, restricts `path` itself
/// to owner-only permissions (`0o700`).
///
/// Only the final directory is made owner-only. Missing parent directories are created with the
/// process's default permissions.
///
/// On Unix the final directory is created with mode `0o700` up front, so a freshly created
/// directory is never observable with looser permissions. The mode is then re-applied and read
/// back, which also covers a directory that already existed. On other platforms this only
/// creates the directory and leaves permissions untouched.
///
/// # Errors
///
/// Returns an error if the directory (or a parent) cannot be created, or if `path` exists but is
/// not a directory. On Unix it also returns an error if permissions cannot be set or read back,
/// or if the resulting `rwx` bits are not exactly `0o700`.
pub fn create_dir_owner_only(path: &Path) -> std::io::Result<()> {
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::{DirBuilderExt, PermissionsExt};

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Create the leaf with restrictive permissions from the start. If it already exists
        // (pre-existing, or created concurrently) as a directory, tighten it below instead.
        if let Err(err) = std::fs::DirBuilder::new().mode(0o700).create(path) {
            if !(err.kind() == std::io::ErrorKind::AlreadyExists && path.is_dir()) {
                return Err(err);
            }
        }

        // Re-apply explicitly: the creation mode is subject to the umask, and a pre-existing
        // directory keeps whatever mode it had.
        std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o700))?;
        let perms = std::fs::metadata(path)?.permissions();

        // Compare only the user/group/other rwx bits (ignore setuid/setgid/sticky).
        if (perms.mode() & 0o777) != 0o700 {
            return Err(std::io::Error::other(format!(
                "error setting directory permissions: permission mismatch: {perms:?} (path = {path:?})"
            )));
        }
    }

    #[cfg(not(target_family = "unix"))]
    std::fs::create_dir_all(path)?;

    Ok(())
}
110
111/// Replaces a "~" in the Path with the home directory.
112pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
113    return inner(path.as_ref());
114
115    fn inner(path: &Path) -> Cow<'_, Path> {
116        if path.starts_with("~") {
117            // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
118            #[cfg(not(target_family = "wasm"))]
119            if let Some(homedir) = home::home_dir() {
120                return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
121            }
122        }
123
124        Cow::Borrowed(path)
125    }
126}
127
128fn has_glob(path: &[u8]) -> bool {
129    return get_glob_start_idx(path).is_some();
130
131    /// Get the index of the first occurrence of a glob symbol.
132    fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
133        memchr::memchr3(b'*', b'?', b'[', path)
134    }
135}
136
137/// Returns `true` if `expanded_paths` were expanded from a single directory
138pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
139    // Single input that isn't a glob
140    paths.len() == 1 && !has_glob(paths[0].strip_scheme().as_bytes())
141    // And isn't a file
142    && {
143        (
144            // For local paths, we can just use `is_dir`
145            !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
146        )
147        || (
148            // For cloud paths, we determine that the input path isn't a file by checking that the
149            // output path differs.
150            expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
151        )
152    }
153}
154
155/// Recursively traverses directories and expands globs if `glob` is `true`.
156pub async fn expand_paths(
157    paths: &[PlRefPath],
158    glob: bool,
159    hidden_file_prefix: &[PlSmallStr],
160    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
161) -> PolarsResult<Buffer<PlRefPath>> {
162    expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false)
163        .await
164        .map(|x| x.0)
165}
166
167struct HiveIdxTracker<'a> {
168    idx: usize,
169    paths: &'a [PlRefPath],
170    check_directory_level: bool,
171}
172
173impl HiveIdxTracker<'_> {
174    fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
175        let check_directory_level = self.check_directory_level;
176        let paths = self.paths;
177
178        if check_directory_level
179            && ![usize::MAX, i].contains(&self.idx)
180            // They could still be the same directory level, just with different name length
181            && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
182        {
183            polars_bail!(
184                InvalidOperation:
185                "attempted to read from different directory levels with hive partitioning enabled: \
186                first path: {}, second path: {}",
187                &paths[path_idx - 1],
188                &paths[path_idx],
189            )
190        } else {
191            self.idx = std::cmp::min(self.idx, i);
192            Ok(())
193        }
194    }
195}
196
/// Expands a single path through the object-store API. This covers cloud URLs, and local paths
/// when `FORCE_ASYNC` routes them here.
///
/// Returns `(hive_start_idx, paths)`:
/// - A path is returned as-is (re-formatted) with index `0` when all of these hold:
///   - it does not end with `/`;
///   - globbing is off or the location has no glob expansion;
///   - it has a scheme or is an existing local file.
/// - Otherwise the prefix is listed and zero-sized objects are dropped. The results are
///   sorted, and the index is the byte length of the formatted listing prefix (including a
///   trailing `/` when `path` had one).
///
/// When `first_path_has_scheme` is `false` (the `FORCE_ASYNC` local case), output paths are
/// formatted as `/{location}`. Otherwise they are formatted as `{scheme}://{bucket}/{location}`.
///
/// # Errors
///
/// Propagates errors from building the object store, parsing the prefix, and listing. For
/// scheme-less (local) paths, a `metadata()` error is also returned when the path cannot be
/// stat'ed.
#[cfg(feature = "cloud")]
async fn expand_path_cloud(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    glob: bool,
    first_path_has_scheme: bool,
) -> PolarsResult<(usize, Vec<PlRefPath>)> {
    // Rebuilds a full path string from object-store components. Without a scheme the
    // location is returned as an absolute local path.
    let format_path = |scheme: &str, bucket: &str, location: &str| {
        if first_path_has_scheme {
            format!("{scheme}://{bucket}/{location}")
        } else {
            format!("/{location}")
        }
    };

    use polars_utils::_limit_path_len_io_err;

    use crate::cloud::object_path_from_str;
    let path_str = path.as_str();

    let (cloud_location, store) =
        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
    let prefix = object_path_from_str(&cloud_location.prefix)?;

    // Single-object case: no trailing slash, no glob expansion to perform, and (for local
    // paths) an existing file.
    let out = if !path_str.ends_with("/") && (!glob || cloud_location.expansion.is_none()) && {
        // We need to check if it is a directory for local paths (we can be here due
        // to FORCE_ASYNC). For cloud paths the convention is that the user must add
        // a trailing slash `/` to scan directories. We don't infer it as that would
        // mean sending one network request per path serially (very slow).
        path.has_scheme() || path.as_std_path().is_file()
    } {
        (
            0,
            vec![PlRefPath::new(format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            ))],
        )
    } else {
        use futures::TryStreamExt;

        if !path.has_scheme() {
            // FORCE_ASYNC in the test suite wants us to raise a proper error message
            // for non-existent file paths. Note we can't do this for cloud paths as
            // there is no concept of a "directory" - a non-existent path is
            // indistinguishable from an empty directory.
            path.as_std_path()
                .metadata()
                .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
        }

        // Rebind as references so the `async move` blocks below capture borrows rather
        // than trying to move these values.
        let cloud_location = &cloud_location;
        let prefix_ref = &prefix;

        // List everything under the prefix, keeping only non-empty objects. The closure may
        // be re-run by `exec_with_rebuild_retry_on_err` (presumably with a rebuilt store after
        // an error — confirm against its definition).
        let mut paths = store
            .exec_with_rebuild_retry_on_err(|s| async move {
                let out = s
                    .list(Some(prefix_ref))
                    .try_filter_map(|x| async move {
                        let out = (x.size > 0).then(|| {
                            PlRefPath::new({
                                format_path(
                                    cloud_location.scheme,
                                    &cloud_location.bucket,
                                    x.location.as_ref(),
                                )
                            })
                        });
                        Ok(out)
                    })
                    .try_collect::<Vec<_>>()
                    .await?;

                Ok(out)
            })
            .await?;

        // Since Path::parse() removes any trailing slash ('/'), we may need to restore it
        // to calculate the right byte offset
        let mut prefix = prefix.to_string();
        if path_str.ends_with('/') && !prefix.ends_with('/') {
            prefix.push('/')
        };

        paths.sort_unstable();

        // The hive start index is the byte length of the formatted prefix, i.e. hive parsing
        // begins right after the listed directory.
        (
            format_path(
                cloud_location.scheme,
                &cloud_location.bucket,
                prefix.as_ref(),
            )
            .len(),
            paths,
        )
    };

    PolarsResult::Ok(out)
}
297
/// Recursively traverses directories and expands globs if `glob` is `true`.
///
/// Returns the expanded paths and the byte index at which to start parsing hive partitions
/// from the paths.
///
/// Behavior by input kind:
/// - Object-store route: taken if the *first* path has a scheme, or if `force_async()` is
///   enabled on a non-Windows target. It requires the `cloud` feature and panics otherwise.
///   `hf://` (decided by the first path only) is delegated entirely to `expand_paths_hf`.
/// - Local directories are walked breadth-first; zero-length files are skipped.
/// - Local globs are expanded with the `glob` crate; directories and zero-length files are
///   skipped.
/// - Any other local path is passed through unchanged; its existence is not checked here.
///
/// Files whose name starts with one of `hidden_file_prefix` are dropped from the output.
/// Directories with such a prefix are still traversed. This filtering does not apply to the
/// `hf://` early return.
///
/// The results expanded from each input path are sorted among themselves, and the order of
/// the inputs is preserved.
///
/// # Errors
///
/// - `InvalidOperation` if `check_directory_level` is set and the inputs imply different hive
///   directory levels (see `HiveIdxTracker::update`).
/// - `InvalidOperation` if a single non-glob directory input expanded to files with at least
///   two different extensions.
/// - `ComputeError` for an invalid local glob pattern or a glob iteration error.
/// - I/O errors from directory traversal and metadata lookups are propagated.
///
/// # Panics
///
/// If the object-store route is taken without the `cloud` feature enabled.
pub async fn expand_paths_hive(
    paths: &[PlRefPath],
    glob: bool,
    hidden_file_prefix: &[PlSmallStr],
    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
    check_directory_level: bool,
) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
    let Some(first_path) = paths.first() else {
        return Ok((vec![].into(), 0));
    };

    // The route (object store vs. local filesystem) is decided once, from the first path.
    let first_path_has_scheme = first_path.has_scheme();

    // A path is "hidden" if its file name (UTF-8 only) starts with any configured prefix.
    let is_hidden_file = move |path: &PlRefPath| {
        path.file_name()
            .and_then(|x| x.to_str())
            .is_some_and(|file_name| {
                hidden_file_prefix
                    .iter()
                    .any(|x| file_name.starts_with(x.as_str()))
            })
    };

    let mut out_paths = OutPaths {
        paths: vec![],
        exts: [None, None],
        is_hidden_file: &is_hidden_file,
    };

    let mut hive_idx_tracker = HiveIdxTracker {
        idx: usize::MAX,
        paths,
        check_directory_level,
    };

    if first_path_has_scheme || {
        cfg!(not(target_family = "windows")) && polars_config::config().force_async()
    } {
        #[cfg(feature = "cloud")]
        {
            // NOTE(review): only the first path's scheme is checked; all `paths` are then
            // handed to the Hugging Face expander.
            if first_path.scheme() == Some(CloudScheme::Hf) {
                let (expand_start_idx, paths) = hugging_face::expand_paths_hf(
                    paths,
                    check_directory_level,
                    cloud_options,
                    glob,
                )
                .await?;

                return Ok((paths.into(), expand_start_idx));
            }

            for (path_idx, path) in paths.iter().enumerate() {
                use std::borrow::Cow;

                let mut path = Cow::Borrowed(path);

                // HTTP(S) URLs are passed through as-is, unless they are virtual-hosted S3
                // URLs that can be rewritten to `s3://` (only with the `aws` feature).
                if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
                    let mut rewrite_aws = false;

                    #[cfg(feature = "aws")]
                    if let Some(p) = (|| {
                        use crate::cloud::CloudConfig;

                        // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
                        // Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
                        let after_scheme = path.strip_scheme();

                        let bucket_end = after_scheme.find(".s3.")?;
                        // 4 == ".s3.".len()
                        let offset = bucket_end + 4;
                        // Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;

                        // Do not convert if '?' (this can be query parameters for AWS presigned URLs).
                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
                            return None;
                        }

                        let bucket = &after_scheme[..bucket_end];
                        let region = &after_scheme[bucket_end + 4..region_end];
                        // 15 == ".amazonaws.com/".len()
                        let key = &after_scheme[region_end + 15..];

                        // Record the region parsed from the URL in the AWS cloud options.
                        // NOTE(review): only the *last* config entry is checked for an existing
                        // Region; a Region set earlier in the list does not prevent pushing the
                        // URL-derived one. Confirm this precedence is intended. Non-AWS configs
                        // are left untouched.
                        if let CloudConfig::Aws(configs) = cloud_options
                            .get_or_insert_default()
                            .config
                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
                        {
                            use object_store::aws::AmazonS3ConfigKey;

                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
                                configs.push((AmazonS3ConfigKey::Region, region.into()))
                            }
                        }

                        Some(format!("s3://{bucket}/{key}"))
                    })() {
                        path = Cow::Owned(PlRefPath::new(p));
                        rewrite_aws = true;
                    }

                    // Plain HTTP(S) URL: emitted unchanged, hive parsing from offset 0.
                    if !rewrite_aws {
                        out_paths.push(path.into_owned());
                        hive_idx_tracker.update(0, path_idx)?;
                        continue;
                    }
                }

                // Start of this input's results, so only they get sorted below.
                let sort_start_idx = out_paths.paths.len();

                if glob && has_glob(path.as_bytes()) {
                    hive_idx_tracker.update(0, path_idx)?;

                    let iter = crate::pl_async::get_runtime().block_in_place_on(
                        crate::async_glob(path.into_owned(), cloud_options.as_ref()),
                    )?;

                    if first_path_has_scheme {
                        out_paths.extend(iter.into_iter().map(PlRefPath::new))
                    } else {
                        // FORCE_ASYNC, remove leading file:// as the caller may not be expecting a
                        // URI result.
                        out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
                    };
                } else {
                    let (expand_start_idx, paths) = expand_path_cloud(
                        path.into_owned(),
                        cloud_options.as_ref(),
                        glob,
                        first_path_has_scheme,
                    )
                    .await?;
                    out_paths.extend_from_slice(&paths);
                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
                };

                if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                    <[PlRefPath]>::sort_unstable(mut_slice);
                }
            }
        }
        #[cfg(not(feature = "cloud"))]
        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
    } else {
        // Local filesystem route. `stack` and `paths_scratch` are reused across inputs and
        // directories to avoid reallocating.
        let mut stack: VecDeque<Cow<'_, Path>> = VecDeque::new();
        let mut paths_scratch: Vec<PathBuf> = vec![];

        for (path_idx, path) in paths.iter().enumerate() {
            stack.clear();
            let sort_start_idx = out_paths.paths.len();

            if path.as_std_path().is_dir() {
                // Hive parsing starts right after the input directory path.
                let i = path.as_str().len();

                hive_idx_tracker.update(i, path_idx)?;

                stack.push_back(Cow::Borrowed(path.as_std_path()));

                // Breadth-first walk (push_back / pop_front).
                // NOTE(review): `metadata()` follows symlinks and there is no cycle detection,
                // so a symlink loop is traversed until the OS rejects the path.
                while let Some(dir) = stack.pop_front() {
                    let mut last_err = Ok(());

                    // Collect entries, stopping at (and remembering) the first read error.
                    paths_scratch.clear();
                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| {
                        match x.map(|x| x.path()) {
                            Ok(v) => Some(v),
                            Err(e) => {
                                last_err = Err(e);
                                None
                            },
                        }
                    }));

                    last_err?;

                    for path in paths_scratch.drain(..) {
                        let md = path.metadata()?;

                        if md.is_dir() {
                            stack.push_back(Cow::Owned(path));
                        } else if md.len() > 0 {
                            out_paths.push(PlRefPath::try_from_path(&path)?);
                        }
                    }
                }
            } else if glob && has_glob(path.as_bytes()) {
                hive_idx_tracker.update(0, path_idx)?;

                let Ok(paths) = glob::glob(path.as_str()) else {
                    polars_bail!(ComputeError: "invalid glob pattern given")
                };

                // Keep only non-empty, non-directory matches.
                for path in paths {
                    let path = path.map_err(to_compute_err)?;
                    let md = path.metadata()?;
                    if !md.is_dir() && md.len() > 0 {
                        out_paths.push(PlRefPath::try_from_path(&path)?);
                    }
                }
            } else {
                // Plain path: passed through without checking that it exists.
                hive_idx_tracker.update(0, path_idx)?;
                out_paths.push(path.clone());
            };

            if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
                <[PlRefPath]>::sort_unstable(mut_slice);
            }
        }
    }

    // A single directory input must not silently mix file types; `exts[1]` is only set when
    // a second, different extension was seen.
    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
        if let [Some((_, p1)), Some((_, p2))] = out_paths.exts {
            polars_bail!(
                InvalidOperation: "directory contained paths with different file extensions: \
                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
                &p1, &p2
            )
        }
    }

    // Every branch above calls `hive_idx_tracker.update` for each input path, so `idx` has
    // been recorded for non-empty `paths`.
    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));

    /// Wrapper around `Vec<PlRefPath>` that drops hidden files (per `is_hidden_file`) and
    /// tracks file extensions, so that we don't have to traverse the entire list again to
    /// validate extensions.
    struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
        /// The accepted (non-hidden) output paths.
        paths: Vec<PlRefPath>,
        /// `[first extension seen, first extension different from it]`, each paired with the
        /// path it came from. A missing extension is recorded as an empty string.
        exts: [Option<(PlSmallStr, PlRefPath)>; 2],
        is_hidden_file: &'a F,
    }

    impl<F> OutPaths<'_, F>
    where
        F: Fn(&PlRefPath) -> bool,
    {
        /// Appends `value` unless it is hidden, updating the extension tracking.
        fn push(&mut self, value: PlRefPath) {
            if (self.is_hidden_file)(&value) {
                return;
            }

            let exts = &mut self.exts;
            Self::update_ext_status(exts, &value);

            self.paths.push(value)
        }

        /// Appends every non-hidden path from `values`, updating the extension tracking.
        fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
            let exts = &mut self.exts;

            self.paths.extend(
                values
                    .into_iter()
                    .filter(|x| !(self.is_hidden_file)(x))
                    .inspect(|x| {
                        Self::update_ext_status(exts, x);
                    }),
            )
        }

        fn extend_from_slice(&mut self, values: &[PlRefPath]) {
            self.extend(values.iter().cloned())
        }

        /// Records the first extension seen, then the first one that differs from it.
        fn update_ext_status(exts: &mut [Option<(PlSmallStr, PlRefPath)>; 2], value: &PlRefPath) {
            let ext = value
                .extension()
                .map_or(PlSmallStr::EMPTY, PlSmallStr::from);

            if exts[0].is_none() {
                exts[0] = Some((ext, value.clone()));
            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
                exts[1] = Some((ext, value.clone()));
            }
        }
    }
}
575
/// Creates `path` and any missing parents. A creation error is ignored when `path` is a
/// directory afterwards (e.g. because it already existed).
///
/// # Errors
///
/// Returns the error from `create_dir_all` only when `path` is not a directory afterwards.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(_) if path.is_dir() => Ok(()),
        outcome => outcome,
    }
}
583
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::resolve_homedir;

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
            // `~` is only expanded as a whole path component, not as a name prefix.
            "~user/dir/test.csv".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
        assert_eq!(resolved[5], paths[5]);
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths
            .iter()
            .map(resolve_homedir)
            .map(|x| x.into_owned())
            .collect();

        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    // URLs are only expanded with the `cloud` feature; without it `expand_paths_hive` panics
    // for any path with a scheme, so this test would fail rather than test anything.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774
        use polars_utils::pl_path::PlRefPath;

        use super::expand_paths;
        use crate::pl_async::get_runtime;

        let path = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlRefPath::new(path)];
        let out = get_runtime()
            .block_on(expand_paths(paths, true, &[], &mut None))
            .unwrap();
        assert_eq!(out.as_ref(), paths);
    }
}