// polars_io/utils/byte_source.rs

1use std::ops::Range;
2use std::path::Path;
3use std::sync::Arc;
4
5use polars_buffer::Buffer;
6use polars_core::prelude::PlHashMap;
7use polars_error::{PolarsResult, feature_gated};
8use polars_utils::_limit_path_len_io_err;
9use polars_utils::mmap::MMapSemaphore;
10use polars_utils::pl_path::PlRefPath;
11
12use crate::cloud::options::CloudOptions;
13#[cfg(feature = "cloud")]
14use crate::cloud::{
15    CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17use crate::metrics::IOMetrics;
18
19#[allow(async_fn_in_trait)]
20pub trait ByteSource: Send + Sync {
21    async fn get_size(&self) -> PolarsResult<usize>;
22    /// # Panics
23    /// Panics if `range` is not in bounds.
24    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;
25    /// Note: This will mutably sort ranges for coalescing.
26    async fn get_ranges(
27        &self,
28        ranges: &mut [Range<usize>],
29    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;
30}
31
32/// Byte source backed by a `Buffer`, which can potentially be memory-mapped.
33pub struct BufferByteSource(pub Buffer<u8>);
34
35impl BufferByteSource {
36    async fn try_new_mmap_from_path(
37        path: &Path,
38        _cloud_options: Option<&CloudOptions>,
39    ) -> PolarsResult<Self> {
40        let file = Arc::new(
41            tokio::fs::File::open(path)
42                .await
43                .map_err(|err| _limit_path_len_io_err(path, err))?
44                .into_std()
45                .await,
46        );
47
48        Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(
49            &file,
50        )?)))
51    }
52}
53
54impl ByteSource for BufferByteSource {
55    async fn get_size(&self) -> PolarsResult<usize> {
56        Ok(self.0.as_ref().len())
57    }
58
59    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
60        let out = self.0.clone().sliced(range);
61        Ok(out)
62    }
63
64    async fn get_ranges(
65        &self,
66        ranges: &mut [Range<usize>],
67    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
68        Ok(ranges
69            .iter()
70            .map(|x| (x.start, self.0.clone().sliced(x.clone())))
71            .collect())
72    }
73}
74
/// Byte source that reads from an object store through `PolarsObjectStore`.
#[cfg(feature = "cloud")]
pub struct ObjectStoreByteSource {
    /// Store handle that all reads are issued against.
    store: PolarsObjectStore,
    /// Object path inside `store`, derived from the resolved location's prefix.
    path: ObjectStorePath,
}

#[cfg(feature = "cloud")]
impl ObjectStoreByteSource {
    /// Resolves `path` to an object store plus an object path within it, and attaches
    /// `io_metrics` to the store.
    ///
    /// # Errors
    /// Returns an error if the store cannot be built or the prefix cannot be converted into
    /// an object path.
    async fn try_new_from_path(
        path: PlRefPath,
        cloud_options: Option<&CloudOptions>,
        io_metrics: Option<Arc<IOMetrics>>,
    ) -> PolarsResult<Self> {
        // NOTE(review): the meaning of the `false` flag is defined by `build_object_store`.
        let (CloudLocation { prefix, .. }, mut store) =
            build_object_store(path, cloud_options, false).await?;
        let object_path = object_path_from_str(&prefix)?;

        store.set_io_metrics(io_metrics);

        Ok(Self {
            store,
            path: object_path,
        })
    }
}
97
#[cfg(feature = "cloud")]
impl ByteSource for ObjectStoreByteSource {
    /// Object size as reported by a `head` request against the store.
    async fn get_size(&self) -> PolarsResult<usize> {
        let meta = self.store.head(&self.path).await?;
        Ok(meta.size as usize)
    }

    /// Single ranged read, delegated to the store.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        let store = &self.store;
        store.get_range(&self.path, range).await
    }

    /// Batched ranged read, delegated to the store's `get_ranges_sort`, which sorts `ranges`
    /// in place.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        let store = &self.store;
        store.get_ranges_sort(&self.path, ranges).await
    }
}
115
116/// Dynamic dispatch to async functions.
117pub enum DynByteSource {
118    Buffer(BufferByteSource),
119    #[cfg(feature = "cloud")]
120    Cloud(ObjectStoreByteSource),
121}
122
123impl DynByteSource {
124    pub fn variant_name(&self) -> &str {
125        match self {
126            Self::Buffer(_) => "Buffer",
127            #[cfg(feature = "cloud")]
128            Self::Cloud(_) => "Cloud",
129        }
130    }
131}
132
133impl Default for DynByteSource {
134    fn default() -> Self {
135        Self::Buffer(BufferByteSource(Buffer::new()))
136    }
137}
138
139impl ByteSource for DynByteSource {
140    async fn get_size(&self) -> PolarsResult<usize> {
141        match self {
142            Self::Buffer(v) => v.get_size().await,
143            #[cfg(feature = "cloud")]
144            Self::Cloud(v) => v.get_size().await,
145        }
146    }
147
148    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
149        match self {
150            Self::Buffer(v) => v.get_range(range).await,
151            #[cfg(feature = "cloud")]
152            Self::Cloud(v) => v.get_range(range).await,
153        }
154    }
155
156    async fn get_ranges(
157        &self,
158        ranges: &mut [Range<usize>],
159    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
160        match self {
161            Self::Buffer(v) => v.get_ranges(ranges).await,
162            #[cfg(feature = "cloud")]
163            Self::Cloud(v) => v.get_ranges(ranges).await,
164        }
165    }
166}
167
168impl From<BufferByteSource> for DynByteSource {
169    fn from(value: BufferByteSource) -> Self {
170        Self::Buffer(value)
171    }
172}
173
174#[cfg(feature = "cloud")]
175impl From<ObjectStoreByteSource> for DynByteSource {
176    fn from(value: ObjectStoreByteSource) -> Self {
177        Self::Cloud(value)
178    }
179}
180
181impl From<Buffer<u8>> for DynByteSource {
182    fn from(value: Buffer<u8>) -> Self {
183        Self::Buffer(BufferByteSource(value))
184    }
185}
186
187#[derive(Clone, Debug)]
188pub enum DynByteSourceBuilder {
189    Mmap,
190    /// Supports both cloud and local files, requires cloud feature.
191    ObjectStore,
192}
193
194impl DynByteSourceBuilder {
195    pub async fn try_build_from_path(
196        &self,
197        path: PlRefPath,
198        cloud_options: Option<&CloudOptions>,
199        io_metrics: Option<Arc<IOMetrics>>,
200    ) -> PolarsResult<DynByteSource> {
201        Ok(match self {
202            Self::Mmap => {
203                BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)
204                    .await?
205                    .into()
206            },
207            Self::ObjectStore => feature_gated!("cloud", {
208                ObjectStoreByteSource::try_new_from_path(path, cloud_options, io_metrics)
209                    .await?
210                    .into()
211            }),
212        })
213    }
214}