use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use memmap::Mmap;
use once_cell::sync::Lazy;
use polars_error::{polars_bail, PolarsResult};
use polars_utils::create_file;
/// Process-wide table of file paths that have been registered as memory mapped.
///
/// Each path carries a `u32` counter: `MMapSemaphore::new` registers the path,
/// dropping an `MMapSemaphore` decrements the counter and removes the entry
/// once it reaches zero, and `try_create_file` refuses any path that is
/// present in this table.
static MEMORY_MAPPED_FILES: Lazy<Mutex<BTreeMap<PathBuf, u32>>> =
    Lazy::new(|| Mutex::new(BTreeMap::new()));
/// A memory map bundled with the path it is registered under.
///
/// Creating one through `MMapSemaphore::new` records `path` in
/// `MEMORY_MAPPED_FILES`; dropping it decrements that path's counter and
/// removes the entry when the counter reaches zero. While the path is
/// registered, `try_create_file` refuses to create a file at it.
pub(crate) struct MMapSemaphore {
// Key under which this handle is tracked in `MEMORY_MAPPED_FILES`.
path: PathBuf,
// The mapping itself; its bytes are exposed through `AsRef<[u8]>`.
mmap: Mmap,
}
impl MMapSemaphore {
    /// Wraps `mmap` and registers `path` in `MEMORY_MAPPED_FILES`.
    ///
    /// The per-path counter is incremented (starting from zero for a path that
    /// is not yet registered), so several live handles for the same path are
    /// tracked correctly: the entry only disappears once every handle has been
    /// dropped.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub(super) fn new(path: PathBuf, mmap: Mmap) -> Self {
        let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
        // Increment instead of overwriting with `1`: overwriting would reset
        // the count when the same path is mapped more than once, and the first
        // handle to drop would then unregister a path that is still mapped.
        *guard.entry(path.clone()).or_insert(0) += 1;
        Self { path, mmap }
    }
}
impl AsRef<[u8]> for MMapSemaphore {
#[inline]
fn as_ref(&self) -> &[u8] {
self.mmap.as_ref()
}
}
impl Drop for MMapSemaphore {
    /// Gives up this handle's registration in `MEMORY_MAPPED_FILES`.
    ///
    /// The counter stored for the handle's path is decremented, and the entry
    /// is removed once it reaches zero. A path that is not registered is left
    /// alone.
    fn drop(&mut self) {
        let mut registry = MEMORY_MAPPED_FILES.lock().unwrap();
        // The path is moved out of `self` to serve as the lookup key; an empty
        // `PathBuf` is left behind.
        let key = std::mem::take(&mut self.path);
        match registry.entry(key) {
            Entry::Occupied(mut slot) => {
                let remaining = slot.get_mut();
                *remaining -= 1;
                if *remaining == 0 {
                    slot.remove();
                }
            },
            Entry::Vacant(_) => {},
        }
    }
}
/// Creates a file at `path` unless that path is registered as memory mapped.
///
/// # Errors
///
/// Returns a `ComputeError` when `path` is present in `MEMORY_MAPPED_FILES`;
/// otherwise returns whatever `create_file` returns.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn try_create_file(path: &Path) -> PolarsResult<File> {
    {
        let registry = MEMORY_MAPPED_FILES.lock().unwrap();
        if registry.contains_key(path) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped")
        }
        // The lock is released at the end of this scope, before the file is created.
    }
    create_file(path)
}
/// A seekable, thread-safe reader that can optionally expose its backing
/// storage, either as a [`File`] or as an in-memory byte slice.
///
/// Both accessors default to `None`; implementors override whichever one
/// applies to them.
pub trait MmapBytesReader: Read + Seek + Send + Sync {
    /// Returns the underlying [`File`], if the reader is backed by one.
    ///
    /// The default implementation returns `None`.
    fn to_file(&self) -> Option<&File> {
        None
    }

    /// Returns the reader's contents as a byte slice, if they are already in
    /// memory.
    ///
    /// The default implementation returns `None`.
    fn to_bytes(&self) -> Option<&[u8]> {
        None
    }
}
/// A [`File`] is its own backing file; `to_bytes` keeps the default `None`.
impl MmapBytesReader for File {
    fn to_file(&self) -> Option<&File> {
        let file: &File = self;
        Some(file)
    }
}
/// A buffered file reader exposes the [`File`] it wraps; `to_bytes` keeps the
/// default `None`.
impl MmapBytesReader for BufReader<File> {
    fn to_file(&self) -> Option<&File> {
        let inner = BufReader::get_ref(self);
        Some(inner)
    }
}
/// An in-memory cursor exposes the bytes of the value it wraps; `to_file`
/// keeps the default `None`.
impl<T> MmapBytesReader for Cursor<T>
where
    T: AsRef<[u8]> + Send + Sync,
{
    /// Returns the bytes of the wrapped value, taken from `get_ref()`, i.e.
    /// the whole underlying buffer rather than the part after the cursor's
    /// current position.
    fn to_bytes(&self) -> Option<&[u8]> {
        let backing: &T = self.get_ref();
        Some(backing.as_ref())
    }
}
/// A boxed reader (including a boxed trait object) forwards both accessors to
/// the reader inside the box.
impl<T: MmapBytesReader + ?Sized> MmapBytesReader for Box<T> {
    fn to_file(&self) -> Option<&File> {
        // `**self` names the boxed value, so this calls the inner reader's method.
        (**self).to_file()
    }

    fn to_bytes(&self) -> Option<&[u8]> {
        (**self).to_bytes()
    }
}
/// A mutable reference to a reader forwards both accessors to the referenced
/// reader.
impl<T: MmapBytesReader> MmapBytesReader for &mut T {
    fn to_file(&self) -> Option<&File> {
        // `**self` names the referenced reader, so this calls its method.
        (**self).to_file()
    }

    fn to_bytes(&self) -> Option<&[u8]> {
        (**self).to_bytes()
    }
}
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
Owned(Vec<u8>),
Mapped(memmap::Mmap, &'a File),
}
impl std::ops::Deref for ReaderBytes<'_> {
type Target = [u8];
fn deref(&self) -> &[u8] {
match self {
Self::Borrowed(ref_bytes) => ref_bytes,
Self::Owned(vec) => vec,
Self::Mapped(mmap, _) => mmap,
}
}
}
/// Builds a [`ReaderBytes`] view over a reader without copying its contents.
impl<'a, T: 'a + MmapBytesReader> From<&'a T> for ReaderBytes<'a> {
    /// Borrows the reader's in-memory bytes when `to_bytes` provides them;
    /// otherwise memory maps the file returned by `to_file`.
    ///
    /// # Panics
    ///
    /// Panics if the reader provides neither bytes nor a file, or if the file
    /// cannot be memory mapped. `From` cannot report errors, so both
    /// conditions are surfaced as panics with a descriptive message.
    fn from(m: &'a T) -> Self {
        // Prefer bytes that are already in memory: no mapping is needed.
        if let Some(bytes) = m.to_bytes() {
            return ReaderBytes::Borrowed(bytes);
        }

        let f = m
            .to_file()
            .expect("reader exposes neither in-memory bytes nor a file to memory map");
        // SAFETY: `Mmap::map` is unsafe because the mapped bytes are only
        // valid while the underlying file is not truncated or modified. That
        // cannot be checked here and is assumed to be upheld by the caller.
        let mmap = unsafe { memmap::Mmap::map(f) }
            .expect("failed to memory map the reader's file");
        ReaderBytes::Mapped(mmap, f)
    }
}