polars_utils/mmap.rs

use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;

mod private {
    use std::cmp;
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

    /// A read-only reference to a slice of memory that can potentially be memory-mapped.
    ///
    /// A reference count is kept to the underlying buffer to ensure the memory is kept alive.
    /// [`MemSlice::slice`] can be used to slice the memory in a zero-copy manner.
    ///
    /// This still owns all of the original memory and therefore should probably not be a
    /// long-lived structure.
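    ///
    /// # Example
    ///
    /// A minimal sketch of the zero-copy behavior (illustrative only, not compiled as a
    /// doctest):
    ///
    /// ```ignore
    /// let mem = MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]);
    /// let sub = mem.slice(1..3); // shares the same backing buffer, no copy
    /// assert_eq!(&*sub, &[2, 3]);
    /// ```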
    #[derive(Clone)]
    pub struct MemSlice {
        // Store the `&[u8]` to make the `Deref` free.
        // `slice` is not 'static - it is backed by `inner`. This is safe as long as `slice` is not
        // directly accessed, and we are in a private module to guarantee that. Access should only
        // be done through `Deref<Target = [u8]>`, which automatically gives the correct lifetime.
        slice: &'static [u8],
        #[allow(unused)]
        inner: MemSliceInner,
    }

    /// Keeps the underlying buffer alive. This should be cheaply cloneable.
    #[derive(Clone)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

        /// Copy the contents into a new owned `Vec`.
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

        /// Construct a `MemSlice` from an existing `Vec<u8>`. This is zero-copy.
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

        /// Construct a `MemSlice` from [`bytes::Bytes`]. This is zero-copy.
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

        /// Construct a `MemSlice` from a temporary slice being kept alive by
        /// the arc.
        ///
        /// # Safety
        /// The slice must stay alive while the Arc does.
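        ///
        /// # Example
        ///
        /// A sketch of the intended usage (illustrative only, not compiled as a doctest):
        ///
        /// ```ignore
        /// let owner: Arc<Vec<u8>> = Arc::new(vec![1u8, 2, 3]);
        /// // SAFETY: `owner` keeps the backing Vec (and thus the slice) alive.
        /// let mem = unsafe { MemSlice::from_arc(owner.as_slice(), owner.clone()) };
        /// ```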
        #[inline]
        pub unsafe fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

        /// Construct a `MemSlice` that simply wraps around a `&[u8]`.
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

        /// Attempt to prefetch the memory belonging to this [`MemSlice`].
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

        /// # Panics
        /// Panics if range is not in bounds.
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }

    impl std::fmt::Debug for MemSlice {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            const MAX_PRINT_BYTES: usize = 100;

            write!(f, "MemSlice{{slice: ")?;

            let end = cmp::min(self.slice.len(), MAX_PRINT_BYTES);
            let truncated_slice = &self.slice[..end];

            if let Ok(slice_as_str) = str::from_utf8(truncated_slice) {
                write!(f, "{slice_as_str:?}")?;
            } else {
                write!(f, "{truncated_slice:?}(non-utf8)")?;
            }

            if self.slice.len() > MAX_PRINT_BYTES {
                write!(f, " ... truncated ({MAX_PRINT_BYTES}/{})", self.slice.len())?;
            }

            write!(f, ", inner: ")?;

            match &self.inner {
                MemSliceInner::Arc(x) => {
                    write!(f, "Arc(refcount: {})", Arc::strong_count(x))?;
                },
                MemSliceInner::Bytes(x) => {
                    write!(f, "Bytes(unique: {})", x.is_unique())?;
                },
            }

            write!(f, "}}")
        }
    }
}

use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
pub use private::MemSlice;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

/// A cursor over a [`MemSlice`].
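///
/// # Example
///
/// A sketch of cursor-style reading (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// let mut reader = MemReader::from_vec(vec![1u8, 2, 3, 4, 5]);
/// let head = reader.read_slice(2); // zero-copy MemSlice over the first two bytes
/// assert_eq!(&*head, &[1, 2]);
/// assert_eq!(reader.position(), 2);
/// assert_eq!(reader.remaining_len(), 3);
/// ```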
#[derive(Debug, Clone)]
pub struct MemReader {
    data: MemSlice,
    position: usize,
}

impl MemReader {
    pub fn new(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }

    #[inline(always)]
    pub fn remaining_len(&self) -> usize {
        self.data.len() - self.position
    }

    #[inline(always)]
    pub fn total_len(&self) -> usize {
        self.data.len()
    }

    #[inline(always)]
    pub fn position(&self) -> usize {
        self.position
    }

    /// Construct a `MemReader` from an existing `Vec<u8>`. This is zero-copy.
    #[inline(always)]
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::new(MemSlice::from_vec(v))
    }

    /// Construct a `MemReader` from [`bytes::Bytes`]. This is zero-copy.
    #[inline(always)]
    pub fn from_bytes(bytes: bytes::Bytes) -> Self {
        Self::new(MemSlice::from_bytes(bytes))
    }

    /// Construct a `MemReader` that simply wraps around a `&'static [u8]`.
    #[inline]
    pub fn from_slice(slice: &'static [u8]) -> Self {
        Self::new(MemSlice::from_static(slice))
    }

    #[inline(always)]
    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
        let mut vec = Vec::new();
        reader.read_to_end(&mut vec)?;
        Ok(Self::from_vec(vec))
    }

    #[inline(always)]
    pub fn read_slice(&mut self, n: usize) -> MemSlice {
        let start = self.position;
        let end = usize::min(self.position + n, self.data.len());
        self.position = end;
        self.data.slice(start..end)
    }
}

impl From<MemSlice> for MemReader {
    fn from(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }
}

impl io::Read for MemReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = usize::min(buf.len(), self.remaining_len());
        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
        self.position += n;
        Ok(n)
    }
}

/// A native implementation is more efficient than wrapping in an [`io::BufReader`]: the memory
/// is already available as a slice, so duplicating it into a local buffer would only add an
/// extra copy step. The total number of page faults when the memory is mmap-backed stays the
/// same.
impl io::BufRead for MemReader {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        Ok(&self.data[self.position..])
    }

    fn consume(&mut self, amount: usize) {
        assert!(amount <= self.remaining_len());
        self.position += amount;
    }
}

impl io::Seek for MemReader {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let position = match pos {
            io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
            io::SeekFrom::End(offset) => {
                let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek before start of buffer"));
                };

                position
            },
            io::SeekFrom::Current(offset) => {
                let Some(position) = self.position.checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek before start of buffer"));
                };

                position
            },
        };

        self.position = position;

        Ok(position as u64)
    }
}

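/// Single-threaded pool used to perform large unmaps (>= 1 MiB) in the background; see the
/// `Drop` impl of [`MMapSemaphore`] below.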
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

// Keep track of memory mapped files so we don't write to them while reading.
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
// Write handle in Windows is exclusive, so this is only necessary in Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

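/// A memory map that, on Unix, registers itself in [`MEMORY_MAPPED_FILES`] under the file's
/// `(dev, inode)` key so that [`ensure_not_mapped`] can refuse writes to a file while it is
/// still mapped.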
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            // Read the length from the taken value; `self.mmap` must not be used after the take.
            let len = mmap.len();
            // If the unmap is 1 MiB or bigger, we do it in a background thread.
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // If the unmap is bigger than our chunk size (32 MiB), we do it in chunks.
                        // This is because munmap takes a lock on the process's memory mappings,
                        // which we don't want to hold for extended periods of time.
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // Mmap can fail with ENODEV on filesystems which don't support
            // MAP_SHARED, try MAP_PRIVATE instead, see #24343.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            // FIXME: We aren't handling the case where the file is already open in write-mode here.

            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

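    /// Memory-map `file` with the default [`MmapOptions`].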
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    // TODO: We need to actually register that this file has been write-opened and prevent
    // read-opening this file based on that.
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
}