polars_io/utils/byte_source.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use polars_core::prelude::PlHashMap;
5use polars_error::PolarsResult;
6use polars_utils::_limit_path_len_io_err;
7use polars_utils::mmap::MemSlice;
8
9use crate::cloud::{
10    CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,
11    object_path_from_str,
12};
13
/// Asynchronous, random-access reader over a sequence of bytes.
///
/// Implemented in this module by [`MemSliceByteSource`] (bytes held in a `MemSlice`,
/// which may be memory-mapped), [`ObjectStoreByteSource`] (bytes fetched from an
/// object store) and [`DynByteSource`] (forwards to whichever of the two it wraps).
// The `async_fn_in_trait` lint warns that users of a public trait with `async fn`
// methods cannot require the returned futures to be `Send`; that is accepted here.
#[allow(async_fn_in_trait)]
pub trait ByteSource: Send + Sync {
    /// Returns the total size of the source in bytes.
    async fn get_size(&self) -> PolarsResult<usize>;
    /// Fetches the bytes covered by `range` (byte offsets into the source).
    ///
    /// # Panics
    /// Panics if `range` is not in bounds.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
    /// Fetches several byte ranges in one call.
    ///
    /// The result is keyed by `usize` offsets; the `MemSliceByteSource`
    /// implementation uses each range's `start`, so there ranges that share a start
    /// offset collapse into a single entry.
    ///
    /// Note: implementations may sort `ranges` in place (e.g. for coalescing), so
    /// callers should not rely on the slice's order afterwards.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, MemSlice>>;
}
26
/// Byte source backed by a `MemSlice`, which can potentially be memory-mapped.
///
/// Every read is served by slicing the wrapped `MemSlice`; no I/O happens after
/// construction.
pub struct MemSliceByteSource(pub MemSlice);
29
30impl MemSliceByteSource {
31    async fn try_new_mmap_from_path(
32        path: &str,
33        _cloud_options: Option<&CloudOptions>,
34    ) -> PolarsResult<Self> {
35        let file = Arc::new(
36            tokio::fs::File::open(path)
37                .await
38                .map_err(|err| _limit_path_len_io_err(path.as_ref(), err))?
39                .into_std()
40                .await,
41        );
42
43        Ok(Self(MemSlice::from_file(file.as_ref())?))
44    }
45}
46
47impl ByteSource for MemSliceByteSource {
48    async fn get_size(&self) -> PolarsResult<usize> {
49        Ok(self.0.as_ref().len())
50    }
51
52    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
53        let out = self.0.slice(range);
54        Ok(out)
55    }
56
57    async fn get_ranges(
58        &self,
59        ranges: &mut [Range<usize>],
60    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
61        Ok(ranges
62            .iter()
63            .map(|x| (x.start, self.0.slice(x.clone())))
64            .collect())
65    }
66}
67
/// Byte source that reads a single object from a `PolarsObjectStore`.
pub struct ObjectStoreByteSource {
    // Store client that all reads are issued against.
    store: PolarsObjectStore,
    // Path of the object within `store`.
    path: ObjectStorePath,
}
72
73impl ObjectStoreByteSource {
74    async fn try_new_from_path(
75        path: &str,
76        cloud_options: Option<&CloudOptions>,
77    ) -> PolarsResult<Self> {
78        let (CloudLocation { prefix, .. }, store) =
79            build_object_store(path, cloud_options, false).await?;
80        let path = object_path_from_str(&prefix)?;
81
82        Ok(Self { store, path })
83    }
84}
85
86impl ByteSource for ObjectStoreByteSource {
87    async fn get_size(&self) -> PolarsResult<usize> {
88        Ok(self.store.head(&self.path).await?.size)
89    }
90
91    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
92        let bytes = self.store.get_range(&self.path, range).await?;
93        let mem_slice = MemSlice::from_bytes(bytes);
94
95        Ok(mem_slice)
96    }
97
98    async fn get_ranges(
99        &self,
100        ranges: &mut [Range<usize>],
101    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
102        self.store.get_ranges_sort(&self.path, ranges).await
103    }
104}
105
/// Dynamic dispatch to async functions.
///
/// Traits with `async fn` methods cannot be used as trait objects, so the concrete
/// `ByteSource` implementations are dispatched through this enum instead.
pub enum DynByteSource {
    /// Bytes held in a `MemSlice` (possibly memory-mapped).
    MemSlice(MemSliceByteSource),
    /// Bytes read through an object store; `DynByteSourceBuilder::ObjectStore`
    /// produces this variant for both cloud and local paths.
    Cloud(ObjectStoreByteSource),
}
111
112impl DynByteSource {
113    pub fn variant_name(&self) -> &str {
114        match self {
115            Self::MemSlice(_) => "MemSlice",
116            Self::Cloud(_) => "Cloud",
117        }
118    }
119}
120
121impl Default for DynByteSource {
122    fn default() -> Self {
123        Self::MemSlice(MemSliceByteSource(MemSlice::default()))
124    }
125}
126
127impl ByteSource for DynByteSource {
128    async fn get_size(&self) -> PolarsResult<usize> {
129        match self {
130            Self::MemSlice(v) => v.get_size().await,
131            Self::Cloud(v) => v.get_size().await,
132        }
133    }
134
135    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
136        match self {
137            Self::MemSlice(v) => v.get_range(range).await,
138            Self::Cloud(v) => v.get_range(range).await,
139        }
140    }
141
142    async fn get_ranges(
143        &self,
144        ranges: &mut [Range<usize>],
145    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
146        match self {
147            Self::MemSlice(v) => v.get_ranges(ranges).await,
148            Self::Cloud(v) => v.get_ranges(ranges).await,
149        }
150    }
151}
152
153impl From<MemSliceByteSource> for DynByteSource {
154    fn from(value: MemSliceByteSource) -> Self {
155        Self::MemSlice(value)
156    }
157}
158
159impl From<ObjectStoreByteSource> for DynByteSource {
160    fn from(value: ObjectStoreByteSource) -> Self {
161        Self::Cloud(value)
162    }
163}
164
165impl From<MemSlice> for DynByteSource {
166    fn from(value: MemSlice) -> Self {
167        Self::MemSlice(MemSliceByteSource(value))
168    }
169}
170
/// Selects which kind of [`DynByteSource`] `try_build_from_path` creates.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Open `path` as a local file and map it into a `MemSliceByteSource`
    /// (produces `DynByteSource::MemSlice`).
    Mmap,
    /// Supports both cloud and local files.
    ///
    /// Produces `DynByteSource::Cloud`, backed by an `ObjectStoreByteSource`.
    ObjectStore,
}
177
178impl DynByteSourceBuilder {
179    pub async fn try_build_from_path(
180        &self,
181        path: &str,
182        cloud_options: Option<&CloudOptions>,
183    ) -> PolarsResult<DynByteSource> {
184        Ok(match self {
185            Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
186                .await?
187                .into(),
188            Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
189                .await?
190                .into(),
191        })
192    }
193}