polars_io/path_utils/
mod.rs

1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::LazyLock;
4
5use arrow::buffer::Buffer;
6use polars_core::config;
7use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
8use polars_utils::pl_str::PlSmallStr;
9use polars_utils::plpath::{CloudScheme, PlPath, PlPathRef};
10
11#[cfg(feature = "cloud")]
12mod hugging_face;
13
14use crate::cloud::CloudOptions;
15
16#[allow(clippy::bind_instead_of_map)]
17pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
18    (|| {
19        let verbose = config::verbose();
20
21        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
22            if verbose {
23                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
24            }
25            v
26        } else if cfg!(target_family = "unix") {
27            let id = std::env::var("USER")
28                .inspect(|_| {
29                    if verbose {
30                        eprintln!("init_temp_dir: sourced $USER")
31                    }
32                })
33                .or_else(|_e| {
34                    // We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
35                    // available (it is available when file_cache is activated).
36                    #[cfg(feature = "file_cache")]
37                    {
38                        std::env::var("HOME")
39                            .inspect(|_| {
40                                if verbose {
41                                    eprintln!("init_temp_dir: sourced $HOME")
42                                }
43                            })
44                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
45                    }
46                    #[cfg(not(feature = "file_cache"))]
47                    {
48                        Err(_e)
49                    }
50                });
51
52            if let Ok(v) = id {
53                std::env::temp_dir().join(format!("polars-{v}/"))
54            } else {
55                return Err(std::io::Error::other(
56                    "could not load $USER or $HOME environment variables",
57                ));
58            }
59        } else if cfg!(target_family = "windows") {
60            // Setting permissions on Windows is not as easy compared to Unix, but fortunately
61            // the default temporary directory location is underneath the user profile, so we
62            // shouldn't need to do anything.
63            std::env::temp_dir().join("polars/")
64        } else {
65            std::env::temp_dir().join("polars/")
66        }
67        .into_boxed_path();
68
69        if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
70            if !path.is_dir() {
71                panic!(
72                    "failed to create temporary directory: {} (path = {:?})",
73                    err,
74                    path.as_ref()
75                );
76            }
77        }
78
79        #[cfg(target_family = "unix")]
80        {
81            use std::os::unix::fs::PermissionsExt;
82
83            let result = (|| {
84                std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
85                let perms = std::fs::metadata(path.as_ref())?.permissions();
86
87                if (perms.mode() % 0o1000) != 0o700 {
88                    std::io::Result::Err(std::io::Error::other(format!(
89                        "permission mismatch: {perms:?}"
90                    )))
91                } else {
92                    std::io::Result::Ok(())
93                }
94            })()
95            .map_err(|e| {
96                std::io::Error::new(
97                    e.kind(),
98                    format!(
99                        "error setting temporary directory permissions: {} (path = {:?})",
100                        e,
101                        path.as_ref()
102                    ),
103                )
104            });
105
106            if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
107                result?;
108            }
109        }
110
111        std::io::Result::Ok(path)
112    })()
113    .map_err(|e| {
114        std::io::Error::new(
115            e.kind(),
116            format!(
117                "error initializing temporary directory: {e} \
118                 consider explicitly setting POLARS_TEMP_DIR"
119            ),
120        )
121    })
122    .unwrap()
123});
124
125/// Replaces a "~" in the Path with the home directory.
126pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
127    let path = path.as_ref();
128
129    if path.starts_with("~") {
130        // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
131        #[cfg(not(target_family = "wasm"))]
132        if let Some(homedir) = home::home_dir() {
133            return homedir.join(path.strip_prefix("~").unwrap());
134        }
135    }
136
137    path.into()
138}
139
140/// Get the index of the first occurrence of a glob symbol.
141pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
142    memchr::memchr3(b'*', b'?', b'[', path)
143}
144
145/// Returns `true` if `expanded_paths` were expanded from a single directory
146pub fn expanded_from_single_directory(paths: &[PlPath], expanded_paths: &[PlPath]) -> bool {
147    // Single input that isn't a glob
148    paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().strip_scheme().as_bytes()).is_none()
149    // And isn't a file
150    && {
151        (
152            // For local paths, we can just use `is_dir`
153            paths[0].as_ref().as_local_path().is_some_and(|p| p.is_dir())
154        )
155        || (
156            // For cloud paths, we determine that the input path isn't a file by checking that the
157            // output path differs.
158            expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
159        )
160    }
161}
162
163/// Recursively traverses directories and expands globs if `glob` is `true`.
164pub fn expand_paths(
165    paths: &[PlPath],
166    glob: bool,
167    hidden_file_prefix: &[PlSmallStr],
168    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
169) -> PolarsResult<Buffer<PlPath>> {
170    expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false).map(|x| x.0)
171}
172
173struct HiveIdxTracker<'a> {
174    idx: usize,
175    paths: &'a [PlPath],
176    check_directory_level: bool,
177}
178
179impl HiveIdxTracker<'_> {
180    fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
181        let check_directory_level = self.check_directory_level;
182        let paths = self.paths;
183
184        if check_directory_level
185            && ![usize::MAX, i].contains(&self.idx)
186            // They could still be the same directory level, just with different name length
187            && (path_idx > 0 && paths[path_idx].as_ref().parent() != paths[path_idx - 1].as_ref().parent())
188        {
189            polars_bail!(
190                InvalidOperation:
191                "attempted to read from different directory levels with hive partitioning enabled: \
192                first path: {}, second path: {}",
193                paths[path_idx - 1].display(),
194                paths[path_idx].display(),
195            )
196        } else {
197            self.idx = std::cmp::min(self.idx, i);
198            Ok(())
199        }
200    }
201}
202
/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
///
/// Dispatch (decided from the *first* path only):
/// - Cloud URLs, and local paths when `config::force_async()` is set on non-Windows
///   targets, use the object-store / `async_glob` based expansion. This requires the
///   `cloud` feature and panics without it. `hf://` paths are delegated entirely to the
///   Hugging Face implementation.
/// - All other (local) paths are expanded synchronously. Directories are walked
///   breadth-first, with entries sorted per directory. Globs go through the `glob` crate.
///   Plain paths are passed through unchanged.
///
/// Files whose name starts with any entry of `hidden_file_prefix` are dropped from the
/// output. Local directory walks, local glob expansion and object-store listings skip
/// zero-sized files.
///
/// With the `aws` feature, virtual-hosted-style S3 HTTP(S) URLs are rewritten to `s3://`
/// and a region entry may be inserted into `cloud_options`.
///
/// # Errors
/// - `InvalidOperation` if `check_directory_level` is set and inputs appear to be at
///   different directory levels (see `HiveIdxTracker::update`).
/// - `InvalidOperation` if a single non-glob input directory expanded to files with more
///   than one distinct extension.
/// - `ComputeError` for an invalid local glob pattern.
/// - Any I/O or object-store error encountered while expanding.
pub fn expand_paths_hive(
    paths: &[PlPath],
    glob: bool,
    hidden_file_prefix: &[PlSmallStr],
    // Unused when the `cloud` feature is disabled (the whole cloud branch is compiled out).
    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
    check_directory_level: bool,
) -> PolarsResult<(Buffer<PlPath>, usize)> {
    let Some(first_path) = paths.first() else {
        return Ok((vec![].into(), 0));
    };

    // Only the first path decides which expansion branch is used.
    let is_cloud = first_path.as_ref().is_cloud_url();

    // A path is hidden if its final component is valid UTF-8 and starts with any of
    // `hidden_file_prefix`; paths without a UTF-8 file name are never hidden.
    let is_hidden_file = move |path: &PlPath| {
        path.as_ref()
            .file_name()
            .and_then(|x| x.to_str())
            .is_some_and(|file_name| {
                hidden_file_prefix
                    .iter()
                    .any(|x| file_name.starts_with(x.as_str()))
            })
    };

    let mut out_paths = OutPaths {
        paths: vec![],
        exts: [None, None],
        current_idx: 0,
        is_hidden_file: &is_hidden_file,
    };

    // `usize::MAX` marks "no hive start index recorded yet".
    let mut hive_idx_tracker = HiveIdxTracker {
        idx: usize::MAX,
        paths,
        check_directory_level,
    };

    if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
        #[cfg(feature = "cloud")]
        {
            use polars_utils::_limit_path_len_io_err;

            use crate::cloud::object_path_from_str;

            // Hugging Face paths are expanded all at once by a dedicated implementation.
            // Note: this returns early, so the hidden-file filter and the extension check
            // below are not applied to its output.
            if first_path.cloud_scheme() == Some(CloudScheme::Hf) {
                let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
                    hugging_face::expand_paths_hf(
                        paths,
                        check_directory_level,
                        cloud_options,
                        glob,
                    ),
                )?;

                return Ok((paths.into(), expand_start_idx));
            }

            // Cloud inputs produce full `scheme://bucket/location` URLs; local paths taken
            // through FORCE_ASYNC produce `/{location}`.
            let format_path = |scheme: &str, bucket: &str, location: &str| {
                if is_cloud {
                    format!("{scheme}://{bucket}/{location}")
                } else {
                    format!("/{location}")
                }
            };

            // Expands one non-glob path into `(hive_start_idx, paths)`. A single file yields
            // index 0. A listing yields the byte length of the formatted prefix, which is
            // where hive parsing should begin in each listed path.
            let expand_path_cloud = |path: PlPathRef<'_>,
                                     cloud_options: Option<&CloudOptions>|
             -> PolarsResult<(usize, Vec<PlPath>)> {
                crate::pl_async::get_runtime().block_in_place_on(async {
                    let path_str = path.to_str();

                    let (cloud_location, store) =
                        crate::cloud::build_object_store(path, cloud_options, glob).await?;
                    let prefix = object_path_from_str(&cloud_location.prefix)?;

                    let out = if !path_str.ends_with("/")
                        && (!glob || cloud_location.expansion.is_none())
                        && {
                            // We need to check if it is a directory for local paths (we can be here due
                            // to FORCE_ASYNC). For cloud paths the convention is that the user must add
                            // a trailing slash `/` to scan directories. We don't infer it as that would
                            // mean sending one network request per path serially (very slow).
                            path.is_cloud_url() || path.as_local_path().unwrap().is_file()
                        } {
                        (
                            0,
                            vec![PlPath::from_string(format_path(
                                cloud_location.scheme,
                                &cloud_location.bucket,
                                prefix.as_ref(),
                            ))],
                        )
                    } else {
                        use futures::TryStreamExt;

                        if let Some(path) = path.as_local_path() {
                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
                            // for non-existent file paths. Note we can't do this for cloud paths as
                            // there is no concept of a "directory" - a non-existent path is
                            // indistinguishable from an empty directory.
                            if !path.is_dir() {
                                path.metadata()
                                    .map_err(|err| _limit_path_len_io_err(path, err))?;
                            }
                        }

                        let cloud_location = &cloud_location;

                        // List everything under the prefix, keeping only non-empty objects.
                        let mut paths = store
                            .try_exec_rebuild_on_err(|store| {
                                let st = store.clone();

                                async {
                                    let store = st;
                                    let out = store
                                        .list(Some(&prefix))
                                        .try_filter_map(|x| async move {
                                            let out = (x.size > 0).then(|| {
                                                PlPath::from_string({
                                                    format_path(
                                                        cloud_location.scheme,
                                                        &cloud_location.bucket,
                                                        x.location.as_ref(),
                                                    )
                                                })
                                            });
                                            Ok(out)
                                        })
                                        .try_collect::<Vec<_>>()
                                        .await?;

                                    Ok(out)
                                }
                            })
                            .await?;

                        // Since Path::parse() removes any trailing slash ('/'), we may need to restore it
                        // to calculate the right byte offset
                        let mut prefix = prefix.to_string();
                        if path_str.ends_with('/') && !prefix.ends_with('/') {
                            prefix.push('/')
                        };

                        paths.sort_unstable();

                        (
                            format_path(
                                cloud_location.scheme,
                                &cloud_location.bucket,
                                prefix.as_ref(),
                            )
                            .len(),
                            paths,
                        )
                    };

                    PolarsResult::Ok(out)
                })
            };

            for (path_idx, path) in paths.iter().enumerate() {
                use std::borrow::Cow;

                let mut path = Cow::Borrowed(path);

                // HTTP(S) URLs are passed through untouched (glob characters are not expanded),
                // unless the `aws` feature rewrites a virtual-hosted-style S3 URL to `s3://`.
                if matches!(
                    path.cloud_scheme(),
                    Some(CloudScheme::Http | CloudScheme::Https)
                ) {
                    let mut rewrite_aws = false;

                    #[cfg(feature = "aws")]
                    if let Some(p) = (|| {
                        use crate::cloud::CloudConfig;

                        // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
                        // Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
                        let p = path.as_ref().as_ref();
                        let after_scheme = p.strip_scheme();

                        // `+ 4` skips the 4-byte ".s3." separator.
                        let bucket_end = after_scheme.find(".s3.")?;
                        let offset = bucket_end + 4;
                        // Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;

                        // Do not convert if '?' (this can be query parameters for AWS presigned URLs).
                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
                            return None;
                        }

                        // `+ 15` skips the 15-byte ".amazonaws.com/" suffix.
                        let bucket = &after_scheme[..bucket_end];
                        let region = &after_scheme[bucket_end + 4..region_end];
                        let key = &after_scheme[region_end + 15..];

                        if let CloudConfig::Aws(configs) = cloud_options
                            .get_or_insert_default()
                            .config
                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
                        {
                            use object_store::aws::AmazonS3ConfigKey;

                            // Avoid pushing a duplicate when the most recent entry is already a
                            // region (only the last entry is checked).
                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
                                configs.push((AmazonS3ConfigKey::Region, region.into()))
                            }
                        }

                        Some(format!("s3://{bucket}/{key}"))
                    })() {
                        path = Cow::Owned(PlPath::from_string(p));
                        rewrite_aws = true;
                    }

                    if !rewrite_aws {
                        out_paths.push(path.into_owned());
                        hive_idx_tracker.update(0, path_idx)?;
                        continue;
                    }
                }

                let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());

                // Non-glob paths (or globbing disabled) are expanded as a single file or a
                // prefix listing; glob paths fall through to `async_glob` below.
                let path = if glob && glob_start_idx.is_some() {
                    path.clone()
                } else {
                    let (expand_start_idx, paths) =
                        expand_path_cloud(path.as_ref().as_ref(), cloud_options.as_ref())?;
                    out_paths.extend_from_slice(&paths);
                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
                    continue;
                };

                hive_idx_tracker.update(0, path_idx)?;

                let iter = crate::pl_async::get_runtime().block_in_place_on(crate::async_glob(
                    path.as_ref().as_ref(),
                    cloud_options.as_ref(),
                ))?;

                if is_cloud {
                    out_paths.extend(iter.into_iter().map(PlPath::from_string));
                } else {
                    // FORCE_ASYNC, remove leading file:// as not all readers support it.
                    // (`file://` is 7 bytes.)
                    out_paths.extend(
                        iter.iter()
                            .map(|x| &x[7..])
                            .map(|s| PlPathRef::new(s).into_owned()),
                    )
                }
            }
        }
        #[cfg(not(feature = "cloud"))]
        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
    } else {
        // Synchronous local expansion. `stack` is used as a FIFO queue (push_back/pop_front),
        // so directories are walked breadth-first.
        let mut stack = VecDeque::new();
        let mut paths_scratch = vec![];

        for (path_idx, path) in paths.iter().enumerate() {
            let path = path.as_ref();
            let path = path.as_local_path().unwrap();
            stack.clear();

            if path.is_dir() {
                let path = path.to_path_buf();

                // Hive parsing starts at the end of the input directory path.
                let i = path.to_str().unwrap().len();

                hive_idx_tracker.update(i, path_idx)?;

                stack.push_back(path.clone());

                while let Some(dir) = stack.pop_front() {
                    let mut last_err = Ok(());

                    // Collect entries until the first failing one; its error is propagated
                    // right after the iterator is consumed.
                    paths_scratch.clear();
                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| match x {
                        Ok(v) => Some(v.path()),
                        Err(e) => {
                            last_err = Err(e);
                            None
                        },
                    }));

                    last_err?;

                    paths_scratch.sort_unstable();

                    // Queue subdirectories; emit only non-empty files.
                    for path in paths_scratch.drain(..) {
                        if path.is_dir() {
                            stack.push_back(path);
                        } else if path.metadata()?.len() > 0 {
                            out_paths.push(PlPath::Local(path.into()));
                        }
                    }
                }

                continue;
            }

            let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());

            if glob && i.is_some() {
                hive_idx_tracker.update(0, path_idx)?;

                let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
                    polars_bail!(ComputeError: "invalid glob pattern given")
                };

                // Skip directories and empty files matched by the pattern.
                for path in paths {
                    let path = path.map_err(to_compute_err)?;
                    if !path.is_dir() && path.metadata()?.len() > 0 {
                        out_paths.push(PlPath::Local(path.into()));
                    }
                }
            } else {
                // Plain path (or glob characters with globbing disabled): passed through
                // as-is, without checking that it exists.
                hive_idx_tracker.update(0, path_idx)?;
                out_paths.push(PlPath::Local(path.into()));
            }
        }
    }

    // Every path kept by `OutPaths` (i.e. not hidden) advanced `current_idx` exactly once.
    assert_eq!(out_paths.current_idx, out_paths.paths.len());

    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
        if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
            polars_bail!(
                InvalidOperation: "directory contained paths with different file extensions: \
                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
                &out_paths.paths[i1].display(), &out_paths.paths[i2].display()
            )
        }
    }

    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));

    /// Wrapper around `Vec<PlPath>` that drops hidden files and also tracks file extensions,
    /// so that we don't have to traverse the entire list again to validate extensions.
    struct OutPaths<'a, F: Fn(&PlPath) -> bool> {
        paths: Vec<PlPath>,
        // The first extension seen, and the first extension that differs from it, each with
        // the index (into `paths`) of the path it came from.
        exts: [Option<(PlSmallStr, usize)>; 2],
        // Index that the next kept path will occupy in `paths`.
        current_idx: usize,
        is_hidden_file: &'a F,
    }

    impl<F> OutPaths<'_, F>
    where
        F: Fn(&PlPath) -> bool,
    {
        /// Appends `value` unless it is hidden.
        fn push(&mut self, value: PlPath) {
            if (self.is_hidden_file)(&value) {
                return;
            }

            let current_idx = &mut self.current_idx;
            let exts = &mut self.exts;
            Self::update_ext_status(current_idx, exts, value.as_ref());

            self.paths.push(value)
        }

        /// Appends every non-hidden path from `values`.
        fn extend(&mut self, values: impl IntoIterator<Item = PlPath>) {
            let current_idx = &mut self.current_idx;
            let exts = &mut self.exts;

            self.paths.extend(
                values
                    .into_iter()
                    .filter(|x| !(self.is_hidden_file)(x))
                    .inspect(|x| {
                        Self::update_ext_status(current_idx, exts, x.as_ref());
                    }),
            )
        }

        fn extend_from_slice(&mut self, values: &[PlPath]) {
            self.extend(values.iter().cloned())
        }

        /// Records `value`'s extension (empty if it has none) in `exts` and advances
        /// `current_idx`.
        fn update_ext_status(
            current_idx: &mut usize,
            exts: &mut [Option<(PlSmallStr, usize)>; 2],
            value: PlPathRef,
        ) {
            let ext = value
                .extension()
                .map(PlSmallStr::from)
                .unwrap_or(PlSmallStr::EMPTY);

            if exts[0].is_none() {
                exts[0] = Some((ext, *current_idx));
            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
                exts[1] = Some((ext, *current_idx));
            }

            *current_idx += 1;
        }
    }
}
604
/// Creates `path` and any missing parents.
///
/// An error from `std::fs::create_dir_all` is ignored when `path` is a directory afterwards;
/// otherwise it is returned unchanged.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(_) if path.is_dir() => Ok(()),
        outcome => outcome,
    }
}
612
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::plpath::PlPath;

    use super::{expanded_from_single_directory, get_glob_start_idx, resolve_homedir};

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();

        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];

        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();

        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_get_glob_start_idx() {
        assert_eq!(get_glob_start_idx(b""), None);
        assert_eq!(get_glob_start_idx(b"data/file.parquet"), None);
        assert_eq!(get_glob_start_idx(b"data/*.parquet"), Some(5));
        assert_eq!(get_glob_start_idx(b"data/file?.csv"), Some(9));
        assert_eq!(get_glob_start_idx(b"data/[ab].csv"), Some(5));
        // The earliest metacharacter wins, regardless of which one it is.
        assert_eq!(get_glob_start_idx(b"a?b*c[d]"), Some(1));
    }

    #[test]
    fn test_expanded_from_single_directory() {
        let dir = PlPath::new("s3://bucket/dir/");
        let file = PlPath::new("s3://bucket/dir/file.parquet");

        // A single cloud directory that expanded into the files it contains.
        assert!(expanded_from_single_directory(
            std::slice::from_ref(&dir),
            std::slice::from_ref(&file)
        ));
        // A single cloud directory that expanded to nothing.
        assert!(expanded_from_single_directory(std::slice::from_ref(&dir), &[]));
        // A single file expands to itself, so it is not a directory.
        assert!(!expanded_from_single_directory(
            std::slice::from_ref(&file),
            std::slice::from_ref(&file)
        ));
        // Globs and multiple inputs never count as a single directory.
        assert!(!expanded_from_single_directory(
            &[PlPath::new("s3://bucket/dir/*.parquet")],
            std::slice::from_ref(&file)
        ));
        assert!(!expanded_from_single_directory(
            &[dir.clone(), dir],
            std::slice::from_ref(&file)
        ));
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774

        use super::expand_paths;

        let path = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlPath::new(path)];
        let out = expand_paths(paths, true, &[], &mut None).unwrap();
        assert_eq!(out.as_ref(), paths);
    }
}
669        assert_eq!(out.as_ref(), paths);
670    }
671}