use std::ffi::c_void;
use std::fs::File;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;
use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

/// Single-threaded pool used to unmap large memory maps in the background,
/// so the dropping thread does not pay the full `munmap` cost.
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

/// Open memory maps tracked per (device, inode) with a reference count, so that
/// writes to files that are currently mapped can be detected and rejected.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

/// A memory map whose existence is tracked in [`MEMORY_MAPPED_FILES`] (keyed by
/// device and inode on unix). Dropping it decrements the tracking count and hands
/// large maps to the background [`UNMAP_POOL`] for unmapping.
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            // Decrement the open-mapping count for this file and remove the
            // entry once the last mapping is gone.
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            // Read the length from the moved-out value: `self.mmap` must not be
            // used again after `ManuallyDrop::take`.
            let len = mmap.len();
            if len >= 1024 * 1024 {
                // Unmapping large maps is expensive; push it onto the background pool.
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // Release very large maps in page-aligned chunks so no single
                        // munmap call stalls for too long.
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            // Suppress Mmap's own Drop; the chunked munmap calls below
                            // release the whole range instead.
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}
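
// Illustrative arithmetic (not from the original source): with a 4 KiB page size the
// chunk size above is exactly 32 MiB, so dropping a 100 MiB mapping issues three
// 32 MiB `munmap` calls plus one final 4 MiB call on the background thread.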

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // mmap returned ENODEV (mapping not supported by the filesystem for this
            // request); retry as a private, read-only copy-on-write mapping.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            // Register this mapping under its (device, inode) key so that
            // `ensure_not_mapped` can detect it.
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}
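
// Illustrative usage (a minimal sketch, not part of the original source; `path` is a
// hypothetical placeholder): map a file read-only and borrow its bytes.
//
//     let file = File::open(path)?;
//     let mmap = MMapSemaphore::new_from_file(&file)?;
//     let bytes: &[u8] = mmap.as_ref(); // valid for as long as `mmap` is alive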

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

/// Errors if the file described by `file_md` is currently memory mapped by this
/// process (unix only), so callers do not write to a file that is being read
/// through a map.
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}
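
// Illustrative write-side check (a sketch, not part of the original source; `path` is a
// hypothetical placeholder): a caller about to overwrite a file could first verify that
// no mapping of it is live in this process.
//
//     let md = std::fs::metadata(path)?;
//     ensure_not_mapped(&md)?;
//     let file = std::fs::OpenOptions::new().write(true).truncate(true).open(path)?;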