1use std::ffi::c_void;
2use std::fs::File;
3use std::io;
4use std::mem::ManuallyDrop;
5use std::sync::LazyLock;
6
7pub use memmap::Mmap;
8
mod private {
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

    /// A cheaply-cloneable, read-only byte slice that keeps its backing
    /// allocation alive.
    ///
    /// `slice` is typed `&'static [u8]`, but that lifetime is a deliberate
    /// lie: the bytes actually live inside `inner` (a `bytes::Bytes` or an
    /// `Arc`-owned allocation), and `slice` is only valid while `inner` is
    /// alive. The invariant is upheld by keeping both in the same struct,
    /// never exposing `inner`, and never allowing `slice` to escape with the
    /// `'static` lifetime. Cloning clones `inner`, so the backing memory is
    /// reference counted rather than copied.
    #[derive(Clone, Debug)]
    pub struct MemSlice {
        // Borrows into the allocation owned by `inner`; NOT truly 'static.
        slice: &'static [u8],
        // Sole purpose is to keep the allocation behind `slice` alive.
        #[allow(unused)]
        inner: MemSliceInner,
    }

    /// Owner of the backing memory of a [`MemSlice`].
    #[derive(Clone, Debug)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes),
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

        /// Copy the contents into an owned `Vec<u8>`.
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

        /// Take ownership of `v` without copying its contents.
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

        /// Wrap `bytes` without copying its contents.
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                // SAFETY: the pointed-to buffer is heap-allocated and owned
                // by `bytes`, which is stored in `inner` below, so it
                // outlives `slice` (see struct invariant).
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

        /// Wrap a shared memory mapping without copying its contents.
        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                // SAFETY: the mapped region is owned by the `Arc<MMapSemaphore>`
                // stored in `inner` below, so it outlives `slice`.
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

        /// Wrap `slice`, keeping `arc` alive for as long as this `MemSlice`.
        ///
        /// NOTE(review): the caller must ensure `slice` actually points into
        /// memory owned by `arc` — nothing here checks that.
        #[inline]
        pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                // SAFETY: relies on the caller's guarantee that `arc` owns the
                // allocation behind `slice`; `arc` is stored in `inner`.
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

        /// Memory-map `file` and wrap the mapping.
        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

        /// Wrap a truly `'static` slice; no allocation is needed to keep it
        /// alive (`Bytes::from_static` does not allocate).
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

        /// Hint the CPU to pull this slice's pages toward the L2 cache.
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

        /// Return a new [`MemSlice`] over `range` of this slice, sharing the
        /// same backing allocation.
        ///
        /// # Panics
        /// Panics if `range` is out of bounds.
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }
}
154
155use memmap::MmapOptions;
156use polars_error::PolarsResult;
157#[cfg(target_family = "unix")]
158use polars_error::polars_bail;
159pub use private::MemSlice;
160use rayon::{ThreadPool, ThreadPoolBuilder};
161
162use crate::mem::PAGE_SIZE;
163
/// A cursor over a [`MemSlice`], implementing [`io::Read`] and [`io::Seek`].
///
/// Cloning is cheap: the underlying data is shared, only the position is
/// copied.
#[derive(Debug, Clone)]
pub struct MemReader {
    // The buffer being read; shared, never mutated.
    data: MemSlice,
    // Current read offset into `data`.
    position: usize,
}
170
171impl MemReader {
172 pub fn new(data: MemSlice) -> Self {
173 Self { data, position: 0 }
174 }
175
176 #[inline(always)]
177 pub fn remaining_len(&self) -> usize {
178 self.data.len() - self.position
179 }
180
181 #[inline(always)]
182 pub fn total_len(&self) -> usize {
183 self.data.len()
184 }
185
186 #[inline(always)]
187 pub fn position(&self) -> usize {
188 self.position
189 }
190
191 #[inline(always)]
193 pub fn from_vec(v: Vec<u8>) -> Self {
194 Self::new(MemSlice::from_vec(v))
195 }
196
197 #[inline(always)]
199 pub fn from_bytes(bytes: bytes::Bytes) -> Self {
200 Self::new(MemSlice::from_bytes(bytes))
201 }
202
203 #[inline]
206 pub fn from_slice(slice: &'static [u8]) -> Self {
207 Self::new(MemSlice::from_static(slice))
208 }
209
210 #[inline(always)]
211 pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
212 let mut vec = Vec::new();
213 reader.read_to_end(&mut vec)?;
214 Ok(Self::from_vec(vec))
215 }
216
217 #[inline(always)]
218 pub fn read_slice(&mut self, n: usize) -> MemSlice {
219 let start = self.position;
220 let end = usize::min(self.position + n, self.data.len());
221 self.position = end;
222 self.data.slice(start..end)
223 }
224}
225
226impl From<MemSlice> for MemReader {
227 fn from(data: MemSlice) -> Self {
228 Self { data, position: 0 }
229 }
230}
231
232impl io::Read for MemReader {
233 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
234 let n = usize::min(buf.len(), self.remaining_len());
235 buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
236 self.position += n;
237 Ok(n)
238 }
239}
240
241impl io::Seek for MemReader {
242 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
243 let position = match pos {
244 io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
245 io::SeekFrom::End(offset) => {
246 let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
247 return Err(io::Error::other("Seek before to before buffer"));
248 };
249
250 position
251 },
252 io::SeekFrom::Current(offset) => {
253 let Some(position) = self.position.checked_add_signed(offset as isize) else {
254 return Err(io::Error::other("Seek before to before buffer"));
255 };
256
257 position
258 },
259 };
260
261 self.position = position;
262
263 Ok(position as u64)
264 }
265}
266
267pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
268 let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
269 ThreadPoolBuilder::new()
270 .num_threads(1)
271 .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
272 .build()
273 .expect("could not spawn threads")
274});
275
// Global registry of currently memory-mapped files, keyed by (device, inode)
// and holding a count of live mappings per file. Used so writes to a file can
// be refused while it is mapped (see `ensure_not_mapped`). Unix only: the
// (dev, ino) key comes from unix metadata.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));
283
/// A memory mapping registered in the global mapped-files registry (on unix),
/// so that writes to the mapped file can be detected and refused elsewhere.
#[derive(Debug)]
pub struct MMapSemaphore {
    // (device, inode) registry key of the mapped file.
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    // ManuallyDrop: Drop takes the mmap out manually so large unmaps can be
    // offloaded to UNMAP_POOL (see the Drop impl).
    mmap: ManuallyDrop<Mmap>,
}
290
291impl Drop for MMapSemaphore {
292 fn drop(&mut self) {
293 #[cfg(target_family = "unix")]
294 {
295 let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
296 if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
297 let v = e.get_mut();
298 *v -= 1;
299
300 if *v == 0 {
301 e.remove_entry();
302 }
303 }
304 }
305
306 unsafe {
307 let mmap = ManuallyDrop::take(&mut self.mmap);
308 let len = self.mmap.len();
310 if len >= 1024 * 1024 {
311 UNMAP_POOL.spawn(move || {
312 #[cfg(target_family = "unix")]
313 {
314 let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
318 if len > chunk_size {
319 let mmap = ManuallyDrop::new(mmap);
320 let ptr: *const u8 = mmap.as_ptr();
321 let mut offset = 0;
322 while offset < len {
323 let remaining = len - offset;
324 libc::munmap(
325 ptr.add(offset) as *mut c_void,
326 remaining.min(chunk_size),
327 );
328 offset += chunk_size;
329 }
330 return;
331 }
332 }
333 drop(mmap)
334 });
335 } else {
336 drop(mmap);
337 }
338 }
339 }
340}
341
impl MMapSemaphore {
    /// Memory-map `file` with the given `options` and (on unix) register the
    /// mapping in the global registry so concurrent writes can be refused via
    /// [`ensure_not_mapped`].
    ///
    /// # Errors
    /// Propagates mmap and metadata I/O failures.
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // Some filesystems refuse shared mappings with ENODEV; fall back
            // to a private copy-on-write read-only mapping in that case.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            // (device, inode) uniquely identifies the underlying file; bump
            // its mapping count (or insert it with count 1).
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        // Non-unix: no registry, just hold the mapping.
        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

    /// Memory-map `file` with default options.
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    /// Pointer to the start of the mapped region.
    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}
393
394impl AsRef<[u8]> for MMapSemaphore {
395 #[inline]
396 fn as_ref(&self) -> &[u8] {
397 self.mmap.as_ref()
398 }
399}
400
401pub fn ensure_not_mapped(
402 #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
403) -> PolarsResult<()> {
404 #[cfg(target_family = "unix")]
407 {
408 use std::os::unix::fs::MetadataExt;
409 let guard = MEMORY_MAPPED_FILES.lock().unwrap();
410 if guard.contains_key(&(file_md.dev(), file_md.ino())) {
411 polars_bail!(ComputeError: "cannot write to file: already memory mapped");
412 }
413 }
414 Ok(())
415}
416
// `#[cfg(test)]` was missing: without it the module (and its imports) is
// compiled into non-test builds.
#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        // from_vec must reuse the Vec's allocation.
        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        // Still zero-copy when len != capacity.
        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        // from_bytes must share the Bytes' buffer.
        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        // from_mmap must point directly into the mapping.
        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        // from_static must not copy.
        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            // SAFETY (test only): `vec` outlives `mem_slice` in this scope.
            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            // SAFETY (test only): `vec` outlives `mem_slice` in this scope.
            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            // Sub-slicing must share the same memory, not copy.
            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
}
500}