polars_io/path_utils/
mod.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5
6use arrow::buffer::Buffer;
7use polars_core::config;
8use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9use polars_utils::pl_path::{CloudScheme, PlRefPath};
10use polars_utils::pl_str::PlSmallStr;
11
12#[cfg(feature = "cloud")]
13mod hugging_face;
14
15use crate::cloud::CloudOptions;
16
17#[allow(clippy::bind_instead_of_map)]
18pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
19    (|| {
20        let verbose = config::verbose();
21
22        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
23            if verbose {
24                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
25            }
26            v
27        } else if cfg!(target_family = "unix") {
28            let id = std::env::var("USER")
29                .inspect(|_| {
30                    if verbose {
31                        eprintln!("init_temp_dir: sourced $USER")
32                    }
33                })
34                .or_else(|_e| {
35                    // We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
36                    // available (it is available when file_cache is activated).
37                    #[cfg(feature = "file_cache")]
38                    {
39                        std::env::var("HOME")
40                            .inspect(|_| {
41                                if verbose {
42                                    eprintln!("init_temp_dir: sourced $HOME")
43                                }
44                            })
45                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
46                    }
47                    #[cfg(not(feature = "file_cache"))]
48                    {
49                        Err(_e)
50                    }
51                });
52
53            if let Ok(v) = id {
54                std::env::temp_dir().join(format!("polars-{v}/"))
55            } else {
56                return Err(std::io::Error::other(
57                    "could not load $USER or $HOME environment variables",
58                ));
59            }
60        } else if cfg!(target_family = "windows") {
61            // Setting permissions on Windows is not as easy compared to Unix, but fortunately
62            // the default temporary directory location is underneath the user profile, so we
63            // shouldn't need to do anything.
64            std::env::temp_dir().join("polars/")
65        } else {
66            std::env::temp_dir().join("polars/")
67        }
68        .into_boxed_path();
69
70        if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
71            if !path.is_dir() {
72                panic!(
73                    "failed to create temporary directory: {} (path = {:?})",
74                    err,
75                    path.as_ref()
76                );
77            }
78        }
79
80        #[cfg(target_family = "unix")]
81        {
82            use std::os::unix::fs::PermissionsExt;
83
84            let result = (|| {
85                std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
86                let perms = std::fs::metadata(path.as_ref())?.permissions();
87
88                if (perms.mode() % 0o1000) != 0o700 {
89                    std::io::Result::Err(std::io::Error::other(format!(
90                        "permission mismatch: {perms:?}"
91                    )))
92                } else {
93                    std::io::Result::Ok(())
94                }
95            })()
96            .map_err(|e| {
97                std::io::Error::new(
98                    e.kind(),
99                    format!(
100                        "error setting temporary directory permissions: {} (path = {:?})",
101                        e,
102                        path.as_ref()
103                    ),
104                )
105            });
106
107            if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
108                result?;
109            }
110        }
111
112        std::io::Result::Ok(path)
113    })()
114    .map_err(|e| {
115        std::io::Error::new(
116            e.kind(),
117            format!(
118                "error initializing temporary directory: {e} \
119                 consider explicitly setting POLARS_TEMP_DIR"
120            ),
121        )
122    })
123    .unwrap()
124});
125
126/// Replaces a "~" in the Path with the home directory.
127pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
128    return inner(path.as_ref());
129
130    fn inner(path: &Path) -> Cow<'_, Path> {
131        if path.starts_with("~") {
132            // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
133            #[cfg(not(target_family = "wasm"))]
134            if let Some(homedir) = home::home_dir() {
135                return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
136            }
137        }
138
139        Cow::Borrowed(path)
140    }
141}
142
/// Returns the byte offset of the first glob metacharacter (`*`, `?` or `[`) in `path`,
/// or `None` if the path contains none of them.
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
    path.iter().position(|&byte| matches!(byte, b'*' | b'?' | b'['))
}
147
148/// Returns `true` if `expanded_paths` were expanded from a single directory
149pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
150    // Single input that isn't a glob
151    paths.len() == 1 && get_glob_start_idx(paths[0].strip_scheme().as_bytes()).is_none()
152    // And isn't a file
153    && {
154        (
155            // For local paths, we can just use `is_dir`
156            !paths[0].has_scheme() && paths[0].as_std_path().is_dir()
157        )
158        || (
159            // For cloud paths, we determine that the input path isn't a file by checking that the
160            // output path differs.
161            expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
162        )
163    }
164}
165
166/// Recursively traverses directories and expands globs if `glob` is `true`.
167pub fn expand_paths(
168    paths: &[PlRefPath],
169    glob: bool,
170    hidden_file_prefix: &[PlSmallStr],
171    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
172) -> PolarsResult<Buffer<PlRefPath>> {
173    expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false).map(|x| x.0)
174}
175
176struct HiveIdxTracker<'a> {
177    idx: usize,
178    paths: &'a [PlRefPath],
179    check_directory_level: bool,
180}
181
182impl HiveIdxTracker<'_> {
183    fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
184        let check_directory_level = self.check_directory_level;
185        let paths = self.paths;
186
187        if check_directory_level
188            && ![usize::MAX, i].contains(&self.idx)
189            // They could still be the same directory level, just with different name length
190            && (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
191        {
192            polars_bail!(
193                InvalidOperation:
194                "attempted to read from different directory levels with hive partitioning enabled: \
195                first path: {}, second path: {}",
196                &paths[path_idx - 1],
197                &paths[path_idx],
198            )
199        } else {
200            self.idx = std::cmp::min(self.idx, i);
201            Ok(())
202        }
203    }
204}
205
206/// Recursively traverses directories and expands globs if `glob` is `true`.
207/// Returns the expanded paths and the index at which to start parsing hive
208/// partitions from the path.
209pub fn expand_paths_hive(
210    paths: &[PlRefPath],
211    glob: bool,
212    hidden_file_prefix: &[PlSmallStr],
213    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
214    check_directory_level: bool,
215) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
216    let Some(first_path) = paths.first() else {
217        return Ok((vec![].into(), 0));
218    };
219
220    let first_path_has_scheme = first_path.has_scheme();
221
222    let is_hidden_file = move |path: &PlRefPath| {
223        path.file_name()
224            .and_then(|x| x.to_str())
225            .is_some_and(|file_name| {
226                hidden_file_prefix
227                    .iter()
228                    .any(|x| file_name.starts_with(x.as_str()))
229            })
230    };
231
232    let mut out_paths = OutPaths {
233        paths: vec![],
234        exts: [None, None],
235        current_idx: 0,
236        is_hidden_file: &is_hidden_file,
237    };
238
239    let mut hive_idx_tracker = HiveIdxTracker {
240        idx: usize::MAX,
241        paths,
242        check_directory_level,
243    };
244
245    if first_path_has_scheme || { cfg!(not(target_family = "windows")) && config::force_async() } {
246        #[cfg(feature = "cloud")]
247        {
248            use polars_utils::_limit_path_len_io_err;
249
250            use crate::cloud::object_path_from_str;
251
252            if first_path.scheme() == Some(CloudScheme::Hf) {
253                let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
254                    hugging_face::expand_paths_hf(
255                        paths,
256                        check_directory_level,
257                        cloud_options,
258                        glob,
259                    ),
260                )?;
261
262                return Ok((paths.into(), expand_start_idx));
263            }
264
265            let format_path = |scheme: &str, bucket: &str, location: &str| {
266                if first_path_has_scheme {
267                    format!("{scheme}://{bucket}/{location}")
268                } else {
269                    format!("/{location}")
270                }
271            };
272
273            let expand_path_cloud = |path: PlRefPath,
274                                     cloud_options: Option<&CloudOptions>|
275             -> PolarsResult<(usize, Vec<PlRefPath>)> {
276                crate::pl_async::get_runtime().block_in_place_on(async {
277                    let path_str = path.as_str();
278
279                    let (cloud_location, store) =
280                        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
281                    let prefix = object_path_from_str(&cloud_location.prefix)?;
282
283                    let out = if !path_str.ends_with("/")
284                        && (!glob || cloud_location.expansion.is_none())
285                        && {
286                            // We need to check if it is a directory for local paths (we can be here due
287                            // to FORCE_ASYNC). For cloud paths the convention is that the user must add
288                            // a trailing slash `/` to scan directories. We don't infer it as that would
289                            // mean sending one network request per path serially (very slow).
290                            path.has_scheme() || path.as_std_path().is_file()
291                        } {
292                        (
293                            0,
294                            vec![PlRefPath::new(format_path(
295                                cloud_location.scheme,
296                                &cloud_location.bucket,
297                                prefix.as_ref(),
298                            ))],
299                        )
300                    } else {
301                        use futures::TryStreamExt;
302
303                        if !path.has_scheme() {
304                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
305                            // for non-existent file paths. Note we can't do this for cloud paths as
306                            // there is no concept of a "directory" - a non-existent path is
307                            // indistinguishable from an empty directory.
308                            path.as_std_path()
309                                .metadata()
310                                .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
311                        }
312
313                        let cloud_location = &cloud_location;
314
315                        let mut paths = store
316                            .try_exec_rebuild_on_err(|store| {
317                                let st = store.clone();
318
319                                async {
320                                    let store = st;
321                                    let out = store
322                                        .list(Some(&prefix))
323                                        .try_filter_map(|x| async move {
324                                            let out = (x.size > 0).then(|| {
325                                                PlRefPath::new({
326                                                    format_path(
327                                                        cloud_location.scheme,
328                                                        &cloud_location.bucket,
329                                                        x.location.as_ref(),
330                                                    )
331                                                })
332                                            });
333                                            Ok(out)
334                                        })
335                                        .try_collect::<Vec<_>>()
336                                        .await?;
337
338                                    Ok(out)
339                                }
340                            })
341                            .await?;
342
343                        // Since Path::parse() removes any trailing slash ('/'), we may need to restore it
344                        // to calculate the right byte offset
345                        let mut prefix = prefix.to_string();
346                        if path_str.ends_with('/') && !prefix.ends_with('/') {
347                            prefix.push('/')
348                        };
349
350                        paths.sort_unstable();
351
352                        (
353                            format_path(
354                                cloud_location.scheme,
355                                &cloud_location.bucket,
356                                prefix.as_ref(),
357                            )
358                            .len(),
359                            paths,
360                        )
361                    };
362
363                    PolarsResult::Ok(out)
364                })
365            };
366
367            for (path_idx, path) in paths.iter().enumerate() {
368                use std::borrow::Cow;
369
370                let mut path = Cow::Borrowed(path);
371
372                if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
373                    let mut rewrite_aws = false;
374
375                    #[cfg(feature = "aws")]
376                    if let Some(p) = (|| {
377                        use crate::cloud::CloudConfig;
378
379                        // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
380                        // Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
381                        let after_scheme = path.strip_scheme();
382
383                        let bucket_end = after_scheme.find(".s3.")?;
384                        let offset = bucket_end + 4;
385                        // Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
386                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
387
388                        // Do not convert if '?' (this can be query parameters for AWS presigned URLs).
389                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
390                            return None;
391                        }
392
393                        let bucket = &after_scheme[..bucket_end];
394                        let region = &after_scheme[bucket_end + 4..region_end];
395                        let key = &after_scheme[region_end + 15..];
396
397                        if let CloudConfig::Aws(configs) = cloud_options
398                            .get_or_insert_default()
399                            .config
400                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
401                        {
402                            use object_store::aws::AmazonS3ConfigKey;
403
404                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
405                                configs.push((AmazonS3ConfigKey::Region, region.into()))
406                            }
407                        }
408
409                        Some(format!("s3://{bucket}/{key}"))
410                    })() {
411                        path = Cow::Owned(PlRefPath::new(p));
412                        rewrite_aws = true;
413                    }
414
415                    if !rewrite_aws {
416                        out_paths.push(path.into_owned());
417                        hive_idx_tracker.update(0, path_idx)?;
418                        continue;
419                    }
420                }
421
422                let glob_start_idx = get_glob_start_idx(path.as_bytes());
423
424                let path = if glob && glob_start_idx.is_some() {
425                    path.clone()
426                } else {
427                    let (expand_start_idx, paths) =
428                        expand_path_cloud(path.into_owned(), cloud_options.as_ref())?;
429                    out_paths.extend_from_slice(&paths);
430                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
431                    continue;
432                };
433
434                hive_idx_tracker.update(0, path_idx)?;
435
436                let iter = crate::pl_async::get_runtime().block_in_place_on(crate::async_glob(
437                    path.into_owned(),
438                    cloud_options.as_ref(),
439                ))?;
440
441                if first_path_has_scheme {
442                    out_paths.extend(iter.into_iter().map(PlRefPath::new))
443                } else {
444                    // FORCE_ASYNC, remove leading file:// as the caller may not be expecting a
445                    // URI result.
446                    out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
447                };
448            }
449        }
450        #[cfg(not(feature = "cloud"))]
451        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
452    } else {
453        let mut stack = VecDeque::new();
454        let mut paths_scratch: Vec<Cow<'_, Path>> = vec![];
455
456        for (path_idx, path) in paths.iter().enumerate() {
457            stack.clear();
458
459            if path.as_std_path().is_dir() {
460                let i = path.as_str().len();
461                let path = Cow::Borrowed(path.as_std_path());
462
463                hive_idx_tracker.update(i, path_idx)?;
464
465                stack.push_back(path.clone());
466
467                while let Some(dir) = stack.pop_front() {
468                    let mut last_err = Ok(());
469
470                    paths_scratch.clear();
471                    paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| match x {
472                        Ok(v) => Some(Cow::Owned(v.path())),
473                        Err(e) => {
474                            last_err = Err(e);
475                            None
476                        },
477                    }));
478
479                    last_err?;
480
481                    paths_scratch.sort_unstable();
482
483                    for path in paths_scratch.drain(..) {
484                        if path.is_dir() {
485                            stack.push_back(path);
486                        } else if path.metadata()?.len() > 0 {
487                            out_paths.push(PlRefPath::try_from_path(&path)?);
488                        }
489                    }
490                }
491
492                continue;
493            }
494
495            let i = get_glob_start_idx(path.as_bytes());
496
497            if glob && i.is_some() {
498                hive_idx_tracker.update(0, path_idx)?;
499
500                let Ok(paths) = glob::glob(path.as_str()) else {
501                    polars_bail!(ComputeError: "invalid glob pattern given")
502                };
503
504                for path in paths {
505                    let path = path.map_err(to_compute_err)?;
506                    if !path.is_dir() && path.metadata()?.len() > 0 {
507                        out_paths.push(PlRefPath::try_from_path(&path)?);
508                    }
509                }
510            } else {
511                hive_idx_tracker.update(0, path_idx)?;
512                out_paths.push(path.clone());
513            }
514        }
515    }
516
517    assert_eq!(out_paths.current_idx, out_paths.paths.len());
518
519    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
520        if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
521            polars_bail!(
522                InvalidOperation: "directory contained paths with different file extensions: \
523                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
524                which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
525                &out_paths.paths[i1], &out_paths.paths[i2]
526            )
527        }
528    }
529
530    return Ok((out_paths.paths.into(), hive_idx_tracker.idx));
531
532    /// Wrapper around `Vec<PathBuf>` that also tracks file extensions, so that
533    /// we don't have to traverse the entire list again to validate extensions.
534    struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
535        paths: Vec<PlRefPath>,
536        exts: [Option<(PlSmallStr, usize)>; 2],
537        current_idx: usize,
538        is_hidden_file: &'a F,
539    }
540
541    impl<F> OutPaths<'_, F>
542    where
543        F: Fn(&PlRefPath) -> bool,
544    {
545        fn push(&mut self, value: PlRefPath) {
546            if (self.is_hidden_file)(&value) {
547                return;
548            }
549
550            let current_idx = &mut self.current_idx;
551            let exts = &mut self.exts;
552            Self::update_ext_status(current_idx, exts, &value);
553
554            self.paths.push(value)
555        }
556
557        fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
558            let current_idx = &mut self.current_idx;
559            let exts = &mut self.exts;
560
561            self.paths.extend(
562                values
563                    .into_iter()
564                    .filter(|x| !(self.is_hidden_file)(x))
565                    .inspect(|x| {
566                        Self::update_ext_status(current_idx, exts, x);
567                    }),
568            )
569        }
570
571        fn extend_from_slice(&mut self, values: &[PlRefPath]) {
572            self.extend(values.iter().cloned())
573        }
574
575        fn update_ext_status(
576            current_idx: &mut usize,
577            exts: &mut [Option<(PlSmallStr, usize)>; 2],
578            value: &PlRefPath,
579        ) {
580            let ext = value
581                .extension()
582                .map_or(PlSmallStr::EMPTY, PlSmallStr::from);
583
584            if exts[0].is_none() {
585                exts[0] = Some((ext, *current_idx));
586            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
587                exts[1] = Some((ext, *current_idx));
588            }
589
590            *current_idx += 1;
591        }
592    }
593}
594
/// Creates `path` (and any missing parents) as a directory.
///
/// An error from `std::fs::create_dir_all` is ignored if `path` is a directory afterwards.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        Err(_) if path.is_dir() => Ok(()),
        outcome => outcome,
    }
}
602
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::pl_path::PlRefPath;

    use super::resolve_homedir;

    /// Runs every input through `resolve_homedir` and collects owned results.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs
            .iter()
            .map(|p| resolve_homedir(p).into_owned())
            .collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: Vec<PathBuf> = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_all(&inputs);

        // `~`-prefixed inputs become absolute and keep their file name.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());
        assert!(resolved[4].is_absolute());

        // Inputs without a leading `~` component are returned unchanged.
        assert_eq!(&resolved[1..4], &inputs[1..4]);
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: Vec<PathBuf> = [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"]
            .into_iter()
            .map(PathBuf::from)
            .collect();

        let resolved = resolve_all(&inputs);

        assert_eq!(resolved[0], inputs[0]);
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URLs with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774
        use super::expand_paths;

        let input = [PlRefPath::new("https://pola.rs/test.csv?token=bear")];
        let expanded = expand_paths(&input, true, &[], &mut None).unwrap();
        assert_eq!(expanded.as_ref(), &input);
    }
}