polars_utils/mmap.rs

use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;

mod private {
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

    /// A read-only reference to a slice of memory that can potentially be memory-mapped.
    ///
    /// A reference count is kept to the underlying buffer to ensure the memory is kept alive.
    /// [`MemSlice::slice`] can be used to slice the memory in a zero-copy manner.
    ///
    /// This still owns all of the original memory and therefore should probably not be a
    /// long-lasting structure.
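    ///
    /// A minimal usage sketch; the public crate path `polars_utils::mmap::MemSlice` is
    /// assumed here, hence the `ignore` doc-test:
    ///
    /// ```ignore
    /// use polars_utils::mmap::MemSlice;
    ///
    /// let ms = MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]);
    /// let tail = ms.slice(2..5); // zero-copy view into the same allocation
    /// assert_eq!(tail.as_ref(), &[3, 4, 5]);
    /// ```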
    #[derive(Clone, Debug)]
    pub struct MemSlice {
        // Store the `&[u8]` to make the `Deref` free.
        // `slice` is not 'static - it is backed by `inner`. This is safe as long as `slice` is not
        // directly accessed, and we are in a private module to guarantee that. Access should only
        // be done through `Deref<Target = [u8]>`, which automatically gives the correct lifetime.
        slice: &'static [u8],
        #[allow(unused)]
        inner: MemSliceInner,
    }

    /// Keeps the underlying buffer alive. This should be cheaply cloneable.
    #[derive(Clone, Debug)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

        /// Copy the contents into a new owned `Vec`
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

        /// Construct a `MemSlice` from an existing `Vec<u8>`. This is zero-copy.
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

        /// Construct a `MemSlice` from [`bytes::Bytes`]. This is zero-copy.
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

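        /// Construct a `MemSlice` that wraps a shared memory map. This is zero-copy; the
        /// mapping is kept alive by the reference count.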
        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

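        /// Construct a `MemSlice` from a `slice` whose memory is owned by `arc`. This is
        /// zero-copy; the caller must ensure that `arc` keeps the memory behind `slice` alive.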
        #[inline]
        pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

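        /// Memory-map `file` and wrap the mapping in a `MemSlice`.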
        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

        /// Construct a `MemSlice` that simply wraps around a `&[u8]`.
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

        /// Attempt to prefetch the memory belonging to this [`MemSlice`]
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

        /// # Panics
        /// Panics if range is not in bounds.
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }
}

use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
pub use private::MemSlice;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

/// A cursor over a [`MemSlice`].
#[derive(Debug, Clone)]
pub struct MemReader {
    data: MemSlice,
    position: usize,
}

impl MemReader {
    pub fn new(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }

    #[inline(always)]
    pub fn remaining_len(&self) -> usize {
        self.data.len() - self.position
    }

    #[inline(always)]
    pub fn total_len(&self) -> usize {
        self.data.len()
    }

    #[inline(always)]
    pub fn position(&self) -> usize {
        self.position
    }

    /// Construct a `MemReader` from an existing `Vec<u8>`. This is zero-copy.
    #[inline(always)]
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::new(MemSlice::from_vec(v))
    }

    /// Construct a `MemReader` from [`bytes::Bytes`]. This is zero-copy.
    #[inline(always)]
    pub fn from_bytes(bytes: bytes::Bytes) -> Self {
        Self::new(MemSlice::from_bytes(bytes))
    }

    /// Construct a `MemReader` that simply wraps around a `&'static [u8]`.
    #[inline]
    pub fn from_slice(slice: &'static [u8]) -> Self {
        Self::new(MemSlice::from_static(slice))
    }

    #[inline(always)]
    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
        let mut vec = Vec::new();
        reader.read_to_end(&mut vec)?;
        Ok(Self::from_vec(vec))
    }

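    /// Read at most `n` bytes as a zero-copy [`MemSlice`], advancing the cursor. The returned
    /// slice is shorter than `n` if fewer bytes remain.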
    #[inline(always)]
    pub fn read_slice(&mut self, n: usize) -> MemSlice {
        let start = self.position;
        let end = usize::min(self.position + n, self.data.len());
        self.position = end;
        self.data.slice(start..end)
    }
}

impl From<MemSlice> for MemReader {
    fn from(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }
}

impl io::Read for MemReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = usize::min(buf.len(), self.remaining_len());
        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
        self.position += n;
        Ok(n)
    }
}

impl io::Seek for MemReader {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let position = match pos {
            io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
            io::SeekFrom::End(offset) => {
                let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("Seek to before buffer"));
                };

                position
            },
            io::SeekFrom::Current(offset) => {
                let Some(position) = self.position.checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("Seek to before buffer"));
                };

                position
            },
        };

        self.position = position;

        Ok(position as u64)
    }
}

pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

// Keep track of memory mapped files so we don't write to them while reading.
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
// The write handle on Windows is exclusive, so this is only necessary on Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

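/// A memory map that, on Unix, registers itself in the `MEMORY_MAPPED_FILES` registry keyed
/// by `(device, inode)` so that [`ensure_not_mapped`] can refuse writes to files that are
/// currently mapped. The registry entry is removed when the last mapping of the file is dropped.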
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            // If the unmap is 1 MiB or bigger, we do it in a background thread.
            let len = mmap.len();
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // If the unmap is bigger than our chunk size (32 MiB), we do it in chunks.
                        // This is because munmap holds the process' memory map lock, which we
                        // don't want to hold for extended periods of time.
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = unsafe { options.map(file) }?;

        #[cfg(target_family = "unix")]
        {
            // FIXME: We aren't handling the case where the file is already open in write-mode here.

            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

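/// Check that the file described by `file_md` is not currently memory mapped by this process,
/// so that callers don't write to a file that is being read through a mapping. On non-Unix
/// targets this always succeeds.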
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    // TODO: We need to actually register that this file has been write-opened and prevent
    // read-opening this file based on that.
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }
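
    // A hedged sketch (not part of the original test suite) of how the Unix mapping registry
    // interacts with `ensure_not_mapped`: while a mapping is alive, writes are refused, and
    // dropping the last `MemSlice` referencing the mapping clears the registry entry again.
    // A throwaway file in the system temp directory is used to avoid touching other tests.
    #[cfg(target_family = "unix")]
    #[test]
    fn test_ensure_not_mapped_tracks_open_mappings() {
        use std::sync::Arc;

        use super::{MMapSemaphore, MemSlice, ensure_not_mapped};

        let path = std::env::temp_dir().join("polars_utils_mmap_semaphore_test.bin");
        std::fs::write(&path, b"0123456789").unwrap();
        let file = std::fs::File::open(&path).unwrap();
        let metadata = file.metadata().unwrap();

        // While the mapping is alive, writing to the file must be rejected.
        let mmap = MMapSemaphore::new_from_file(&file).unwrap();
        let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
        assert_eq!(mem_slice.as_ref(), b"0123456789");
        assert!(ensure_not_mapped(&metadata).is_err());

        // Dropping the last reference unregisters the file, so writes are allowed again.
        drop(mem_slice);
        assert!(ensure_not_mapped(&metadata).is_ok());

        std::fs::remove_file(&path).ok();
    }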

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
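
    // A hedged usage sketch (not part of the original test suite) showing how `MemReader`
    // combines zero-copy `read_slice` with the standard `io::Read` / `io::Seek` cursor.
    #[test]
    fn test_mem_reader_read_and_seek() {
        use std::io::{Read, Seek, SeekFrom};

        use super::{MemReader, MemSlice};

        let mut reader = MemReader::new(MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]));
        assert_eq!(reader.total_len(), 5);

        // `read_slice` hands out a zero-copy sub-slice and advances the cursor.
        let head = reader.read_slice(2);
        assert_eq!(head.as_ref(), &[1, 2]);
        assert_eq!(reader.position(), 2);

        // `io::Read` copies into the caller's buffer, clamped to the remaining bytes.
        let mut buf = [0u8; 8];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(&buf[..n], &[3, 4, 5]);

        // `io::Seek` supports Start/End/Current like a regular file cursor.
        let pos = reader.seek(SeekFrom::End(-4)).unwrap();
        assert_eq!(pos, 1);
        assert_eq!(reader.remaining_len(), 4);
    }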
}