polars_utils/mmap.rs

use std::ffi::c_void;
use std::fs::File;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;
use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

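/// Single-threaded pool on which large unmaps are run in the background, so
/// that dropping a big memory map does not stall the thread that owned it.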
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

// Keep track of memory-mapped files so we don't write to them while reading.
// We use a BTreeMap as it uses less memory than a hashmap, and this map never shrinks.
// On Windows the write handle is exclusive, so this is only necessary on Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

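/// A memory map that is tracked in `MEMORY_MAPPED_FILES` (on Unix) for as long
/// as it is alive, so that writers can refuse to modify a mapped file.
///
/// A minimal usage sketch, assuming this module is exposed as
/// `polars_utils::mmap` (the path `"data.bin"` is a placeholder):
///
/// ```no_run
/// use std::fs::File;
/// use polars_utils::mmap::MMapSemaphore;
///
/// let file = File::open("data.bin").unwrap();
/// let sem = MMapSemaphore::new_from_file(&file).unwrap();
/// let bytes: &[u8] = sem.as_ref(); // borrow the mapped contents
/// // Dropping `sem` deregisters the file and unmaps (possibly in the background).
/// ```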
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            // `self.mmap` must not be used after `ManuallyDrop::take`, so read
            // the length from the moved-out value.
            let len = mmap.len();
            // If the unmap is 1 MiB or bigger, we do it in a background thread.
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // If the unmap is bigger than our chunk size (32 MiB), we do it in
                        // chunks. This is because munmap holds the process's address-space
                        // lock (mmap_lock) for the duration of the call, which we don't
                        // want to hold for extended periods of time.
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            // Wrap in ManuallyDrop so Mmap's own Drop doesn't munmap again.
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
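    /// Memory-maps `file` with the given `options` and, on Unix, registers the
    /// mapping under the file's `(device, inode)` pair so that
    /// [`ensure_not_mapped`] can detect it.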
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // Mmap can fail with ENODEV on filesystems which don't support
            // MAP_SHARED; try MAP_PRIVATE instead, see #24343.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            // TODO: We aren't handling the case where the file is already open in write-mode here.

            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

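    /// Memory-maps `file` with default [`MmapOptions`].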
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

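/// Errors if the file described by `file_md` is currently memory mapped.
///
/// A minimal sketch of the intended call site, assuming this module is
/// exposed as `polars_utils::mmap` (`"out.bin"` is a placeholder path):
///
/// ```no_run
/// use polars_utils::mmap::ensure_not_mapped;
///
/// let file = std::fs::File::create("out.bin").unwrap();
/// // Refuse to write while a reader still has the file mapped.
/// ensure_not_mapped(&file.metadata().unwrap()).unwrap();
/// ```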
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    // TODO: We need to actually register that this file has been write-opened and prevent
    // read-opening this file based on that.
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}
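
#[cfg(test)]
mod tests {
    use super::*;

    // A hedged sketch of the intended semantics (Unix-only; the temp-file name
    // is a placeholder): while a file is mapped, `ensure_not_mapped` errors,
    // and once the `MMapSemaphore` is dropped it succeeds again.
    #[test]
    #[cfg(target_family = "unix")]
    fn write_blocked_while_mapped() {
        let path = std::env::temp_dir().join("mmap_semaphore_test.bin");
        std::fs::write(&path, vec![0u8; 4096]).unwrap();

        let file = File::open(&path).unwrap();
        let md = file.metadata().unwrap();

        let sem = MMapSemaphore::new_from_file(&file).unwrap();
        assert!(ensure_not_mapped(&md).is_err());

        // The registry entry is removed synchronously in `drop`, even though
        // large unmaps may finish later on `UNMAP_POOL`.
        drop(sem);
        assert!(ensure_not_mapped(&md).is_ok());
    }
}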