use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;

mod private {
    use std::cmp;
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

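    /// A read-only slice of memory that can be cheaply cloned and shared across threads.
    ///
    /// Note that `slice` is not truly `'static`: it borrows from the owner kept alive in
    /// `inner`. This is sound only because the field is private to this module and all
    /// access goes through `Deref<Target = [u8]>`, which reattaches the correct lifetime.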
    #[derive(Clone)]
    pub struct MemSlice {
        slice: &'static [u8],
        #[allow(unused)]
        inner: MemSliceInner,
    }

    #[derive(Clone)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes),
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

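        /// Copy the contents into a newly allocated `Vec<u8>`.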
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

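        /// Construct a `MemSlice` that takes ownership of `v` without copying.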
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

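        /// Construct a `MemSlice` backed by [`bytes::Bytes`] without copying.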
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

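        /// Construct a `MemSlice` that keeps the memory map alive for its own lifetime.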
        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

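        /// Construct a `MemSlice` from a slice that is kept alive by the given `Arc`.
        ///
        /// # Safety
        /// The caller must guarantee that `slice` borrows from memory owned by `arc`, so
        /// that it stays valid for as long as any clone of the returned `MemSlice` lives.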
        #[inline]
        pub unsafe fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

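        /// Construct a `MemSlice` that simply wraps a `'static` byte slice.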
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

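        /// Attempt to prefetch the memory backing this `MemSlice` into L2 cache.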
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

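        /// Return a new `MemSlice` restricted to `range`, sharing the same backing
        /// memory (zero-copy). Panics if `range` is out of bounds.
        ///
        /// A minimal sketch (marked `ignore` since `MemSlice` is re-exported under a
        /// crate-specific path):
        ///
        /// ```ignore
        /// let mem = MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]);
        /// let sub = mem.slice(1..3);
        /// assert_eq!(&*sub, &[2u8, 3][..]);
        /// ```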
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }

    impl std::fmt::Debug for MemSlice {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            const MAX_PRINT_BYTES: usize = 100;

            write!(f, "MemSlice{{slice: ")?;

            let end = cmp::min(self.slice.len(), MAX_PRINT_BYTES);
            let truncated_slice = &self.slice[..end];

            if let Ok(slice_as_str) = str::from_utf8(truncated_slice) {
                write!(f, "{slice_as_str:?}")?;
            } else {
                write!(f, "{truncated_slice:?}(non-utf8)")?;
            }

            if self.slice.len() > MAX_PRINT_BYTES {
                write!(f, " ... truncated ({MAX_PRINT_BYTES}/{})", self.slice.len())?;
            }

            write!(f, ", inner: ")?;

            match &self.inner {
                MemSliceInner::Arc(x) => {
                    write!(f, "Arc(refcount: {})", Arc::strong_count(x))?;
                },
                MemSliceInner::Bytes(x) => {
                    write!(f, "Bytes(unique: {})", x.is_unique())?;
                },
            }

            write!(f, "}}")
        }
    }
}

use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
pub use private::MemSlice;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

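/// An in-memory cursor over a [`MemSlice`], implementing [`io::Read`], [`io::BufRead`]
/// and [`io::Seek`] without copying the underlying data.
///
/// A minimal usage sketch (marked `ignore` since the exact crate path is not shown here):
///
/// ```ignore
/// use std::io::Read;
///
/// let mut reader = MemReader::from_vec(vec![1u8, 2, 3, 4]);
/// let mut buf = [0u8; 2];
/// reader.read(&mut buf).unwrap(); // reads [1, 2]
/// ```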
#[derive(Debug, Clone)]
pub struct MemReader {
    data: MemSlice,
    position: usize,
}

impl MemReader {
    pub fn new(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }

    #[inline(always)]
    pub fn remaining_len(&self) -> usize {
        self.data.len() - self.position
    }

    #[inline(always)]
    pub fn total_len(&self) -> usize {
        self.data.len()
    }

    #[inline(always)]
    pub fn position(&self) -> usize {
        self.position
    }

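    /// Construct a reader that takes ownership of `v` without copying.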
    #[inline(always)]
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::new(MemSlice::from_vec(v))
    }

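    /// Construct a reader over [`bytes::Bytes`] without copying.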
    #[inline(always)]
    pub fn from_bytes(bytes: bytes::Bytes) -> Self {
        Self::new(MemSlice::from_bytes(bytes))
    }

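    /// Construct a reader over a `'static` byte slice without copying.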
    #[inline]
    pub fn from_slice(slice: &'static [u8]) -> Self {
        Self::new(MemSlice::from_static(slice))
    }

    #[inline(always)]
    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
        let mut vec = Vec::new();
        reader.read_to_end(&mut vec)?;
        Ok(Self::from_vec(vec))
    }

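    /// Read at most `n` bytes as a zero-copy `MemSlice` and advance the position.
    /// The result is shorter than `n` if the end of the buffer is reached.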
    #[inline(always)]
    pub fn read_slice(&mut self, n: usize) -> MemSlice {
        let start = self.position;
        let end = usize::min(self.position + n, self.data.len());
        self.position = end;
        self.data.slice(start..end)
    }
}

impl From<MemSlice> for MemReader {
    fn from(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }
}

impl io::Read for MemReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = usize::min(buf.len(), self.remaining_len());
        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
        self.position += n;
        Ok(n)
    }
}

impl io::BufRead for MemReader {
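    // The entire buffer is already in memory, so expose everything that remains
    // from the current position.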
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        Ok(&self.data[self.position..])
    }

    fn consume(&mut self, amount: usize) {
        assert!(amount <= self.remaining_len());
        self.position += amount;
    }
}

impl io::Seek for MemReader {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let position = match pos {
            io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
            io::SeekFrom::End(offset) => {
                let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek before start of buffer"));
                };

                position
            },
            io::SeekFrom::Current(offset) => {
                let Some(position) = self.position.checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek before start of buffer"));
                };

                position
            },
        };

        self.position = position;

        Ok(position as u64)
    }
}

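/// Unmapping large memory maps can be expensive, so it is offloaded to this
/// dedicated background thread (see the `Drop` impl of [`MMapSemaphore`]).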
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

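// Track which files are currently memory mapped (keyed by device and inode) so that
// writes to a mapped file can be refused. A `BTreeMap` is used since the map stays
// small. This bookkeeping is only done on Unix.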
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

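/// A memory map that registers itself in `MEMORY_MAPPED_FILES` while alive, acting as
/// a per-file reference count on Unix.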
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            let len = mmap.len();
            // Unmapping large maps is expensive, so offload those to the background
            // pool; small maps are simply dropped inline.
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        // Unmap large regions in page-aligned chunks so the OS can
                        // reclaim the memory incrementally.
                        if len > chunk_size {
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // Some filesystems do not support shared mappings and report ENODEV; fall
            // back to a read-only private (copy-on-write) mapping. This arm must
            // precede the generic error arm below.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            use std::os::unix::fs::MetadataExt;

            // Register this mapping under the file's (device, inode) pair so that
            // `ensure_not_mapped` can refuse writes to files that are mapped.
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

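/// Error (on Unix) if the file identified by `file_md` is currently memory mapped,
/// since writing to it could invalidate readers of the mapping.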
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
}