polars_io/path_utils/
mod.rs

1use std::collections::VecDeque;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, LazyLock};
4
5use polars_core::config;
6use polars_core::error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::pl_str::PlSmallStr;
8use polars_utils::plpath::{CloudScheme, PlPath, PlPathRef};
9
10#[cfg(feature = "cloud")]
11mod hugging_face;
12
13use crate::cloud::CloudOptions;
14
15#[allow(clippy::bind_instead_of_map)]
16pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
17    (|| {
18        let verbose = config::verbose();
19
20        let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
21            if verbose {
22                eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
23            }
24            v
25        } else if cfg!(target_family = "unix") {
26            let id = std::env::var("USER")
27                .inspect(|_| {
28                    if verbose {
29                        eprintln!("init_temp_dir: sourced $USER")
30                    }
31                })
32                .or_else(|_e| {
33                    // We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
34                    // available (it is available when file_cache is activated).
35                    #[cfg(feature = "file_cache")]
36                    {
37                        std::env::var("HOME")
38                            .inspect(|_| {
39                                if verbose {
40                                    eprintln!("init_temp_dir: sourced $HOME")
41                                }
42                            })
43                            .map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
44                    }
45                    #[cfg(not(feature = "file_cache"))]
46                    {
47                        Err(_e)
48                    }
49                });
50
51            if let Ok(v) = id {
52                std::env::temp_dir().join(format!("polars-{v}/"))
53            } else {
54                return Err(std::io::Error::other(
55                    "could not load $USER or $HOME environment variables",
56                ));
57            }
58        } else if cfg!(target_family = "windows") {
59            // Setting permissions on Windows is not as easy compared to Unix, but fortunately
60            // the default temporary directory location is underneath the user profile, so we
61            // shouldn't need to do anything.
62            std::env::temp_dir().join("polars/")
63        } else {
64            std::env::temp_dir().join("polars/")
65        }
66        .into_boxed_path();
67
68        if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
69            if !path.is_dir() {
70                panic!(
71                    "failed to create temporary directory: {} (path = {:?})",
72                    err,
73                    path.as_ref()
74                );
75            }
76        }
77
78        #[cfg(target_family = "unix")]
79        {
80            use std::os::unix::fs::PermissionsExt;
81
82            let result = (|| {
83                std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
84                let perms = std::fs::metadata(path.as_ref())?.permissions();
85
86                if (perms.mode() % 0o1000) != 0o700 {
87                    std::io::Result::Err(std::io::Error::other(format!(
88                        "permission mismatch: {perms:?}"
89                    )))
90                } else {
91                    std::io::Result::Ok(())
92                }
93            })()
94            .map_err(|e| {
95                std::io::Error::new(
96                    e.kind(),
97                    format!(
98                        "error setting temporary directory permissions: {} (path = {:?})",
99                        e,
100                        path.as_ref()
101                    ),
102                )
103            });
104
105            if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
106                result?;
107            }
108        }
109
110        std::io::Result::Ok(path)
111    })()
112    .map_err(|e| {
113        std::io::Error::new(
114            e.kind(),
115            format!(
116                "error initializing temporary directory: {e} \
117                 consider explicitly setting POLARS_TEMP_DIR"
118            ),
119        )
120    })
121    .unwrap()
122});
123
124/// Replaces a "~" in the Path with the home directory.
125pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
126    let path = path.as_ref();
127
128    if path.starts_with("~") {
129        // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
130        #[cfg(not(target_family = "wasm"))]
131        if let Some(homedir) = home::home_dir() {
132            return homedir.join(path.strip_prefix("~").unwrap());
133        }
134    }
135
136    path.into()
137}
138
/// Returns the byte offset of the first glob metacharacter (`*`, `?` or `[`) in `path`,
/// or `None` if `path` contains none of them.
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
    path.iter()
        .position(|byte| matches!(byte, b'*' | b'?' | b'['))
}
143
144/// Returns `true` if `expanded_paths` were expanded from a single directory
145pub fn expanded_from_single_directory(addrs: &[PlPath], expanded_addrs: &[PlPath]) -> bool {
146    // Single input that isn't a glob
147    addrs.len() == 1 && get_glob_start_idx(addrs[0].as_ref().strip_scheme().as_bytes()).is_none()
148    // And isn't a file
149    && {
150        (
151            // For local paths, we can just use `is_dir`
152            addrs[0].as_ref().as_local_path().is_some_and(|p| p.is_dir())
153        )
154        || (
155            // For cloud paths, we determine that the input path isn't a file by checking that the
156            // output path differs.
157            expanded_addrs.is_empty() || (addrs[0] != expanded_addrs[0])
158        )
159    }
160}
161
162/// Recursively traverses directories and expands globs if `glob` is `true`.
163pub fn expand_paths(
164    paths: &[PlPath],
165    glob: bool,
166    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
167) -> PolarsResult<Arc<[PlPath]>> {
168    expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
169}
170
171struct HiveIdxTracker<'a> {
172    idx: usize,
173    paths: &'a [PlPath],
174    check_directory_level: bool,
175}
176
177impl HiveIdxTracker<'_> {
178    fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
179        let check_directory_level = self.check_directory_level;
180        let paths = self.paths;
181
182        if check_directory_level
183            && ![usize::MAX, i].contains(&self.idx)
184            // They could still be the same directory level, just with different name length
185            && (path_idx > 0 && paths[path_idx].as_ref().parent() != paths[path_idx - 1].as_ref().parent())
186        {
187            polars_bail!(
188                InvalidOperation:
189                "attempted to read from different directory levels with hive partitioning enabled: \
190                first path: {}, second path: {}",
191                paths[path_idx - 1].display(),
192                paths[path_idx].display(),
193            )
194        } else {
195            self.idx = std::cmp::min(self.idx, i);
196            Ok(())
197        }
198    }
199}
200
201/// Recursively traverses directories and expands globs if `glob` is `true`.
202/// Returns the expanded paths and the index at which to start parsing hive
203/// partitions from the path.
204pub fn expand_paths_hive(
205    paths: &[PlPath],
206    glob: bool,
207    #[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
208    check_directory_level: bool,
209) -> PolarsResult<(Arc<[PlPath]>, usize)> {
210    let Some(first_path) = paths.first() else {
211        return Ok((vec![].into(), 0));
212    };
213
214    let is_cloud = first_path.as_ref().is_cloud_url();
215
216    /// Wrapper around `Vec<PathBuf>` that also tracks file extensions, so that
217    /// we don't have to traverse the entire list again to validate extensions.
218    struct OutPaths {
219        paths: Vec<PlPath>,
220        exts: [Option<(PlSmallStr, usize)>; 2],
221        current_idx: usize,
222    }
223
224    impl OutPaths {
225        fn update_ext_status(
226            current_idx: &mut usize,
227            exts: &mut [Option<(PlSmallStr, usize)>; 2],
228            value: PlPathRef,
229        ) {
230            let ext = value
231                .extension()
232                .map(PlSmallStr::from)
233                .unwrap_or(PlSmallStr::EMPTY);
234
235            if exts[0].is_none() {
236                exts[0] = Some((ext, *current_idx));
237            } else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
238                exts[1] = Some((ext, *current_idx));
239            }
240
241            *current_idx += 1;
242        }
243
244        fn push(&mut self, value: PlPath) {
245            {
246                let current_idx = &mut self.current_idx;
247                let exts = &mut self.exts;
248                Self::update_ext_status(current_idx, exts, value.as_ref());
249            }
250            self.paths.push(value)
251        }
252
253        fn extend(&mut self, values: impl IntoIterator<Item = PlPath>) {
254            let current_idx = &mut self.current_idx;
255            let exts = &mut self.exts;
256
257            self.paths.extend(values.into_iter().inspect(|x| {
258                Self::update_ext_status(current_idx, exts, x.as_ref());
259            }))
260        }
261
262        fn extend_from_slice(&mut self, values: &[PlPath]) {
263            self.extend(values.iter().cloned())
264        }
265    }
266
267    let mut out_paths = OutPaths {
268        paths: vec![],
269        exts: [None, None],
270        current_idx: 0,
271    };
272
273    let mut hive_idx_tracker = HiveIdxTracker {
274        idx: usize::MAX,
275        paths,
276        check_directory_level,
277    };
278
279    if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
280        #[cfg(feature = "cloud")]
281        {
282            use polars_utils::_limit_path_len_io_err;
283
284            use crate::cloud::object_path_from_str;
285
286            if first_path.cloud_scheme() == Some(CloudScheme::Hf) {
287                let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
288                    hugging_face::expand_paths_hf(
289                        paths,
290                        check_directory_level,
291                        cloud_options,
292                        glob,
293                    ),
294                )?;
295
296                return Ok((Arc::from(paths), expand_start_idx));
297            }
298
299            let format_path = |scheme: &str, bucket: &str, location: &str| {
300                if is_cloud {
301                    format!("{scheme}://{bucket}/{location}")
302                } else {
303                    format!("/{location}")
304                }
305            };
306
307            let expand_path_cloud = |addr: &str,
308                                     cloud_options: Option<&CloudOptions>|
309             -> PolarsResult<(usize, Vec<PlPath>)> {
310                crate::pl_async::get_runtime().block_in_place_on(async {
311                    let (cloud_location, store) =
312                        crate::cloud::build_object_store(addr, cloud_options, glob).await?;
313                    let prefix = object_path_from_str(&cloud_location.prefix)?;
314
315                    let out = if !addr.ends_with("/")
316                        && (!glob || cloud_location.expansion.is_none())
317                        && {
318                            // We need to check if it is a directory for local paths (we can be here due
319                            // to FORCE_ASYNC). For cloud paths the convention is that the user must add
320                            // a trailing slash `/` to scan directories. We don't infer it as that would
321                            // mean sending one network request per path serially (very slow).
322                            is_cloud || Path::new(addr).is_file()
323                        } {
324                        (
325                            0,
326                            vec![PlPath::from_string(format_path(
327                                &cloud_location.scheme,
328                                &cloud_location.bucket,
329                                prefix.as_ref(),
330                            ))],
331                        )
332                    } else {
333                        use futures::TryStreamExt;
334
335                        if !is_cloud {
336                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
337                            // for non-existent file paths. Note we can't do this for cloud paths as
338                            // there is no concept of a "directory" - a non-existent path is
339                            // indistinguishable from an empty directory.
340                            let path = PathBuf::from(addr);
341                            if !path.is_dir() {
342                                path.metadata()
343                                    .map_err(|err| _limit_path_len_io_err(&path, err))?;
344                            }
345                        }
346
347                        let cloud_location = &cloud_location;
348
349                        let mut paths = store
350                            .try_exec_rebuild_on_err(|store| {
351                                let st = store.clone();
352
353                                async {
354                                    let store = st;
355                                    let out = store
356                                        .list(Some(&prefix))
357                                        .try_filter_map(|x| async move {
358                                            let out = (x.size > 0).then(|| {
359                                                PlPath::from_string({
360                                                    format_path(
361                                                        &cloud_location.scheme,
362                                                        &cloud_location.bucket,
363                                                        x.location.as_ref(),
364                                                    )
365                                                })
366                                            });
367                                            Ok(out)
368                                        })
369                                        .try_collect::<Vec<_>>()
370                                        .await?;
371
372                                    Ok(out)
373                                }
374                            })
375                            .await?;
376
377                        // Since Path::parse() removes any trailing slash ('/'), we may need to restore it
378                        // to calculate the right byte offset
379                        let mut prefix = prefix.to_string();
380                        if addr.ends_with('/') {
381                            prefix.push('/')
382                        };
383
384                        paths.sort_unstable();
385                        (
386                            format_path(
387                                &cloud_location.scheme,
388                                &cloud_location.bucket,
389                                prefix.as_ref(),
390                            )
391                            .len(),
392                            paths,
393                        )
394                    };
395
396                    PolarsResult::Ok(out)
397                })
398            };
399
400            for (path_idx, path) in paths.iter().enumerate() {
401                use std::borrow::Cow;
402
403                let mut path = Cow::Borrowed(path);
404
405                if matches!(
406                    path.cloud_scheme(),
407                    Some(CloudScheme::Http | CloudScheme::Https)
408                ) {
409                    let mut rewrite_aws = false;
410
411                    #[cfg(feature = "aws")]
412                    if let Some(p) = (|| {
413                        use crate::cloud::CloudConfig;
414
415                        // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
416                        // Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
417                        let p = path.as_ref().as_ref();
418                        let after_scheme = p.strip_scheme();
419
420                        let bucket_end = after_scheme.find(".s3.")?;
421                        let offset = bucket_end + 4;
422                        // Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
423                        let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
424
425                        // Do not convert if '?' (this can be query parameters for AWS presigned URLs).
426                        if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
427                            return None;
428                        }
429
430                        let bucket = &after_scheme[..bucket_end];
431                        let region = &after_scheme[bucket_end + 4..region_end];
432                        let key = &after_scheme[region_end + 15..];
433
434                        if let CloudConfig::Aws(configs) = cloud_options
435                            .get_or_insert_default()
436                            .config
437                            .get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
438                        {
439                            use object_store::aws::AmazonS3ConfigKey;
440
441                            if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
442                                configs.push((AmazonS3ConfigKey::Region, region.into()))
443                            }
444                        }
445
446                        Some(format!("s3://{bucket}/{key}"))
447                    })() {
448                        path = Cow::Owned(PlPath::from_string(p));
449                        rewrite_aws = true;
450                    }
451
452                    if !rewrite_aws {
453                        out_paths.push(path.into_owned());
454                        hive_idx_tracker.update(0, path_idx)?;
455                        continue;
456                    }
457                }
458
459                let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());
460
461                let path = if glob && glob_start_idx.is_some() {
462                    path.clone()
463                } else {
464                    let (expand_start_idx, paths) =
465                        expand_path_cloud(path.to_str(), cloud_options.as_ref())?;
466                    out_paths.extend_from_slice(&paths);
467                    hive_idx_tracker.update(expand_start_idx, path_idx)?;
468                    continue;
469                };
470
471                hive_idx_tracker.update(0, path_idx)?;
472
473                let iter = crate::pl_async::get_runtime()
474                    .block_in_place_on(crate::async_glob(path.to_str(), cloud_options.as_ref()))?;
475
476                if is_cloud {
477                    out_paths.extend(iter.into_iter().map(PlPath::from_string));
478                } else {
479                    // FORCE_ASYNC, remove leading file:// as not all readers support it.
480                    out_paths.extend(
481                        iter.iter()
482                            .map(|x| &x[7..])
483                            .map(|s| PlPathRef::new(s).into_owned()),
484                    )
485                }
486            }
487        }
488        #[cfg(not(feature = "cloud"))]
489        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
490    } else {
491        let mut stack = VecDeque::new();
492
493        for path_idx in 0..paths.len() {
494            let path = paths[path_idx]
495                .as_ref()
496                .as_local_path()
497                .unwrap()
498                .to_path_buf();
499            stack.clear();
500
501            if path.is_dir() {
502                let i = path.to_str().unwrap().len();
503
504                hive_idx_tracker.update(i, path_idx)?;
505
506                stack.push_back(path.clone());
507
508                while let Some(dir) = stack.pop_front() {
509                    let mut paths = std::fs::read_dir(dir)
510                        .map_err(PolarsError::from)?
511                        .map(|x| x.map(|x| x.path()))
512                        .collect::<std::io::Result<Vec<_>>>()
513                        .map_err(PolarsError::from)?;
514                    paths.sort_unstable();
515
516                    for path in paths {
517                        if path.is_dir() {
518                            stack.push_back(path);
519                        } else if path.metadata()?.len() > 0 {
520                            out_paths.push(PlPath::Local(path.into()));
521                        }
522                    }
523                }
524
525                continue;
526            }
527
528            let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
529
530            if glob && i.is_some() {
531                hive_idx_tracker.update(0, path_idx)?;
532
533                let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
534                    polars_bail!(ComputeError: "invalid glob pattern given")
535                };
536
537                for path in paths {
538                    let path = path.map_err(to_compute_err)?;
539                    if !path.is_dir() && path.metadata()?.len() > 0 {
540                        out_paths.push(PlPath::Local(path.into()));
541                    }
542                }
543            } else {
544                hive_idx_tracker.update(0, path_idx)?;
545                out_paths.push(PlPath::Local(path.into()));
546            }
547        }
548    }
549
550    assert_eq!(out_paths.current_idx, out_paths.paths.len());
551
552    if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
553        if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
554            polars_bail!(
555                InvalidOperation: r#"directory contained paths with different file extensions: \
556                first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
557                which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
558                &out_paths.paths[i1].display(), &out_paths.paths[i2].display()
559            )
560        }
561    }
562
563    Ok((out_paths.paths.into(), hive_idx_tracker.idx))
564}
565
566/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
567#[cfg(feature = "file_cache")]
568pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
569    let result = std::fs::create_dir_all(path);
570
571    if path.is_dir() { Ok(()) } else { result }
572}
573
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::plpath::PlPath;

    use super::resolve_homedir;

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let inputs: [PathBuf; 5] = [
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];
        let resolved: Vec<PathBuf> = inputs.iter().map(|p| resolve_homedir(p)).collect();

        // A `~/...` path is moved under the home directory and keeps its file name.
        assert_eq!(resolved[0].file_name(), inputs[0].file_name());
        assert!(resolved[0].is_absolute());

        // Paths without a leading `~` component come back unchanged.
        for (got, expected) in resolved[1..4].iter().zip(&inputs[1..4]) {
            assert_eq!(got, expected);
        }

        // A bare `~` becomes the (absolute) home directory itself.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let inputs: [PathBuf; 3] = [
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];
        let resolved: Vec<PathBuf> = inputs.iter().map(|p| resolve_homedir(p)).collect();

        // Absolute paths are left untouched.
        assert_eq!(resolved[0], inputs[0]);

        // A `~\...` path is moved under the home directory and keeps its file name.
        assert_eq!(resolved[1].file_name(), inputs[1].file_name());
        assert!(resolved[1].is_absolute());

        // A bare `~` becomes the (absolute) home directory itself.
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774
        use super::expand_paths;

        let input = [PlPath::new("https://pola.rs/test.csv?token=bear")];
        let expanded = expand_paths(&input, true, &mut None).unwrap();
        assert_eq!(expanded.as_ref(), &input);
    }
}