use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;

mod private {
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

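    /// A read-only slice of memory that also keeps the owner of that memory
    /// alive (a `bytes::Bytes`, a memory-mapped file, or any `Arc`'ed owner).
    /// Cloning and slicing are zero-copy.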
    #[derive(Clone, Debug)]
    pub struct MemSlice {
        // SAFETY: `slice` is not truly 'static; it borrows from the allocation
        // owned by `inner`, which lives exactly as long as this struct. The
        // module is private, so the field can only be read through
        // `Deref`/`AsRef`, which restore the correct lifetime.
        slice: &'static [u8],
        #[allow(unused)]
        inner: MemSliceInner,
    }

    /// Keeps the backing allocation alive.
    #[derive(Clone, Debug)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes),
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

        /// Copy the contents into a newly allocated `Vec<u8>`.
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

        /// Construct a `MemSlice` from an existing `Vec<u8>`. Zero-copy.
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

        /// Construct a `MemSlice` from [`bytes::Bytes`]. Zero-copy.
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                // SAFETY: the transmuted reference borrows from `bytes`, which
                // is stored in `inner` and lives as long as this `MemSlice`.
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                // SAFETY: the transmuted reference borrows from the mapping,
                // which is stored in `inner` and lives as long as this
                // `MemSlice`.
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

        #[inline]
        pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                // SAFETY: the caller must pass a `slice` that points into
                // memory owned by `arc`, which is stored in `inner` below.
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

        /// Construct a `MemSlice` that wraps an existing `&'static [u8]`.
        /// Zero-copy.
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

        /// Attempt to prefetch the memory belonging to this `MemSlice` into
        /// L2 cache.
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

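        /// Return a new `MemSlice` for a sub-range of this one, sharing the
        /// same backing allocation (zero-copy). A minimal usage sketch, marked
        /// `ignore` because the `polars_utils::mmap` re-export path is an
        /// assumption:
        ///
        /// ```ignore
        /// use polars_utils::mmap::MemSlice;
        ///
        /// let mem = MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]);
        /// let sub = mem.slice(1..3);
        /// assert_eq!(&*sub, &[2, 3]);
        /// ```
        ///
        /// # Panics
        /// Panics if `range` is out of bounds.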
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }
}

use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
pub use private::MemSlice;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

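/// A byte cursor over a [`MemSlice`] that implements [`io::Read`] and
/// [`io::Seek`]. A minimal usage sketch, marked `ignore` because the
/// `polars_utils::mmap` re-export path is an assumption:
///
/// ```ignore
/// use std::io::Read;
///
/// use polars_utils::mmap::MemReader;
///
/// let mut reader = MemReader::from_vec(vec![1u8, 2, 3, 4, 5]);
/// let head = reader.read_slice(2); // zero-copy, advances the cursor
/// assert_eq!(&*head, &[1, 2]);
///
/// let mut rest = Vec::new();
/// reader.read_to_end(&mut rest).unwrap(); // `io::Read` works as usual
/// assert_eq!(rest, vec![3, 4, 5]);
/// ```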
#[derive(Debug, Clone)]
pub struct MemReader {
    data: MemSlice,
    position: usize,
}

impl MemReader {
    pub fn new(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }

    #[inline(always)]
    pub fn remaining_len(&self) -> usize {
        self.data.len() - self.position
    }

    #[inline(always)]
    pub fn total_len(&self) -> usize {
        self.data.len()
    }

    #[inline(always)]
    pub fn position(&self) -> usize {
        self.position
    }

    /// Construct a `MemReader` from an existing `Vec<u8>`. Zero-copy.
    #[inline(always)]
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::new(MemSlice::from_vec(v))
    }

    /// Construct a `MemReader` from [`bytes::Bytes`]. Zero-copy.
    #[inline(always)]
    pub fn from_bytes(bytes: bytes::Bytes) -> Self {
        Self::new(MemSlice::from_bytes(bytes))
    }

    /// Construct a `MemReader` that wraps an existing `&'static [u8]`.
    /// Zero-copy.
    #[inline]
    pub fn from_slice(slice: &'static [u8]) -> Self {
        Self::new(MemSlice::from_static(slice))
    }

    #[inline(always)]
    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
        let mut vec = Vec::new();
        reader.read_to_end(&mut vec)?;
        Ok(Self::from_vec(vec))
    }

    /// Read at most `n` bytes, advancing the cursor. The returned `MemSlice`
    /// shares the underlying buffer (zero-copy) and is shorter than `n` when
    /// fewer bytes remain.
    #[inline(always)]
    pub fn read_slice(&mut self, n: usize) -> MemSlice {
        let start = self.position;
        let end = usize::min(self.position + n, self.data.len());
        self.position = end;
        self.data.slice(start..end)
    }
}

impl From<MemSlice> for MemReader {
    fn from(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }
}

impl io::Read for MemReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = usize::min(buf.len(), self.remaining_len());
        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
        self.position += n;
        Ok(n)
    }
}

impl io::Seek for MemReader {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let position = match pos {
            io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
            io::SeekFrom::End(offset) => {
                let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek to before start of buffer"));
                };

                position
            },
            io::SeekFrom::Current(offset) => {
                let Some(position) = self.position.checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek to before start of buffer"));
                };

                position
            },
        };

        self.position = position;

        Ok(position as u64)
    }
}

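/// A dedicated single-threaded pool to which sizeable munmap calls are
/// offloaded, so the dropping thread does not stall while the kernel releases
/// the pages (see the `Drop` impl of [`MMapSemaphore`]).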
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

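// Tracks which files are currently memory mapped, keyed by (device, inode)
// with a reference count, so `ensure_not_mapped` can refuse to write to a
// file that readers still have mapped. On Windows the write handle is
// exclusive, so this bookkeeping is only needed on Unix.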
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

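/// A memory mapping of a file that registers itself in
/// [`MEMORY_MAPPED_FILES`] on Unix and deregisters itself again on drop.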
#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            // Deregister this mapping, dropping the entry entirely once the
            // last mapping of the file is gone.
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            let len = mmap.len();

            // Offload sizeable unmaps to the background pool so this thread
            // does not block on the kernel releasing the pages.
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // For very large mappings, munmap in page-aligned
                        // chunks: a single huge munmap can take long enough in
                        // the kernel to stall other memory operations in the
                        // process.
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            // Wrap in `ManuallyDrop` so the `Mmap` destructor
                            // does not munmap the (already freed) range again.
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = unsafe { options.map(file) }?;

        #[cfg(target_family = "unix")]
        {
            use std::os::unix::fs::MetadataExt;

            // Register the mapping under (device, inode) so that writers can
            // detect it (see `MEMORY_MAPPED_FILES`).
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

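    /// Memory map `file` with default options. A minimal usage sketch; the
    /// file path and the `polars_utils::mmap` re-export path are assumptions,
    /// hence the `ignore`:
    ///
    /// ```ignore
    /// use std::fs::File;
    /// use std::sync::Arc;
    ///
    /// use polars_utils::mmap::{MMapSemaphore, MemSlice};
    ///
    /// let file = File::open("data.bin")?;
    /// let mmap = MMapSemaphore::new_from_file(&file)?;
    /// // Wrap the mapping in a zero-copy `MemSlice`:
    /// let mem = MemSlice::from_mmap(Arc::new(mmap));
    /// assert_eq!(mem.len(), file.metadata()?.len() as usize);
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```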
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

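/// Fail if `file_md` refers to a file that is currently memory mapped by this
/// process, as writing to it could invalidate live mappings.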
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    // Mappings are only tracked on Unix; on Windows the exclusive write
    // handle already prevents writing to a mapped file.
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
}