polars_io/utils/
byte_source.rs1use std::ops::Range;
2use std::path::Path;
3use std::sync::Arc;
4
5use polars_buffer::Buffer;
6use polars_core::prelude::PlHashMap;
7use polars_error::{PolarsResult, feature_gated};
8use polars_utils::_limit_path_len_io_err;
9use polars_utils::mmap::MMapSemaphore;
10use polars_utils::pl_path::PlRefPath;
11
12use crate::cloud::options::CloudOptions;
13#[cfg(feature = "cloud")]
14use crate::cloud::{
15 CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17use crate::metrics::IOMetrics;
18
/// An asynchronous source of bytes that can report its total size and be read by byte range.
///
/// Implemented in this module by [`BufferByteSource`] (an in-memory or memory-mapped buffer),
/// by `ObjectStoreByteSource` (with the `cloud` feature), and by the dispatching
/// [`DynByteSource`].
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because callers cannot
// add auto-trait (e.g. `Send`) bounds to the returned futures; it is silenced deliberately.
#[allow(async_fn_in_trait)]
pub trait ByteSource: Send + Sync {
    /// Total number of bytes available from this source.
    async fn get_size(&self) -> PolarsResult<usize>;
    /// Returns the bytes covered by `range`.
    ///
    /// NOTE(review): behaviour for an out-of-bounds `range` is left to the implementation
    /// (the buffer impl forwards to `Buffer::sliced`) — confirm whether it panics or errors.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;
    /// Fetches several ranges at once, returning each range's bytes keyed by the range's
    /// `start` offset (ranges sharing a start therefore collapse to a single entry).
    ///
    /// `ranges` is taken mutably so implementations may reorder it; the object-store
    /// implementation delegates to `get_ranges_sort`, which presumably sorts it in place —
    /// callers should not rely on the order of `ranges` afterwards.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;
}
31
/// A [`ByteSource`] backed by a [`Buffer`] that is either held in memory or memory-mapped
/// from a local file.
pub struct BufferByteSource(pub Buffer<u8>);
34
35impl BufferByteSource {
36 async fn try_new_mmap_from_path(
37 path: &Path,
38 _cloud_options: Option<&CloudOptions>,
39 ) -> PolarsResult<Self> {
40 let file = Arc::new(
41 tokio::fs::File::open(path)
42 .await
43 .map_err(|err| _limit_path_len_io_err(path, err))?
44 .into_std()
45 .await,
46 );
47
48 Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(
49 &file,
50 )?)))
51 }
52}
53
54impl ByteSource for BufferByteSource {
55 async fn get_size(&self) -> PolarsResult<usize> {
56 Ok(self.0.as_ref().len())
57 }
58
59 async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
60 let out = self.0.clone().sliced(range);
61 Ok(out)
62 }
63
64 async fn get_ranges(
65 &self,
66 ranges: &mut [Range<usize>],
67 ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
68 Ok(ranges
69 .iter()
70 .map(|x| (x.start, self.0.clone().sliced(x.clone())))
71 .collect())
72 }
73}
74
/// A [`ByteSource`] that reads a single object from a [`PolarsObjectStore`].
#[cfg(feature = "cloud")]
pub struct ObjectStoreByteSource {
    /// Store through which every request is issued.
    store: PolarsObjectStore,
    /// Location of the object within `store`.
    path: ObjectStorePath,
}
80
#[cfg(feature = "cloud")]
impl ObjectStoreByteSource {
    /// Builds an object store for `path` with `build_object_store`, attaches `io_metrics`
    /// to it, and resolves the object path from the location's `prefix`.
    ///
    /// NOTE(review): the meaning of the literal `false` passed to `build_object_store`
    /// is not visible from this file — confirm against its definition.
    async fn try_new_from_path(
        path: PlRefPath,
        cloud_options: Option<&CloudOptions>,
        io_metrics: Option<Arc<IOMetrics>>,
    ) -> PolarsResult<Self> {
        let (location, mut store) = build_object_store(path, cloud_options, false).await?;
        let CloudLocation { prefix, .. } = location;

        store.set_io_metrics(io_metrics);

        Ok(Self {
            path: object_path_from_str(&prefix)?,
            store,
        })
    }
}
97
#[cfg(feature = "cloud")]
impl ByteSource for ObjectStoreByteSource {
    /// Object size taken from the metadata returned by `PolarsObjectStore::head`.
    // NOTE(review): the `as usize` cast would truncate on targets where `usize` is narrower
    // than the reported size type.
    async fn get_size(&self) -> PolarsResult<usize> {
        let meta = self.store.head(&self.path).await?;
        Ok(meta.size as usize)
    }

    /// Forwards a single range read to the store.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        let ObjectStoreByteSource { store, path } = self;
        store.get_range(path, range).await
    }

    /// Forwards to `get_ranges_sort`, which may reorder `ranges`.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        let ObjectStoreByteSource { store, path } = self;
        store.get_ranges_sort(path, ranges).await
    }
}
115
/// A [`ByteSource`] that dispatches to one of the concrete sources defined in this module.
pub enum DynByteSource {
    /// An in-memory or memory-mapped buffer.
    Buffer(BufferByteSource),
    /// An object-store-backed source (only available with the `cloud` feature).
    #[cfg(feature = "cloud")]
    Cloud(ObjectStoreByteSource),
}
122
123impl DynByteSource {
124 pub fn variant_name(&self) -> &str {
125 match self {
126 Self::Buffer(_) => "Buffer",
127 #[cfg(feature = "cloud")]
128 Self::Cloud(_) => "Cloud",
129 }
130 }
131}
132
133impl Default for DynByteSource {
134 fn default() -> Self {
135 Self::Buffer(BufferByteSource(Buffer::new()))
136 }
137}
138
139impl ByteSource for DynByteSource {
140 async fn get_size(&self) -> PolarsResult<usize> {
141 match self {
142 Self::Buffer(v) => v.get_size().await,
143 #[cfg(feature = "cloud")]
144 Self::Cloud(v) => v.get_size().await,
145 }
146 }
147
148 async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
149 match self {
150 Self::Buffer(v) => v.get_range(range).await,
151 #[cfg(feature = "cloud")]
152 Self::Cloud(v) => v.get_range(range).await,
153 }
154 }
155
156 async fn get_ranges(
157 &self,
158 ranges: &mut [Range<usize>],
159 ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
160 match self {
161 Self::Buffer(v) => v.get_ranges(ranges).await,
162 #[cfg(feature = "cloud")]
163 Self::Cloud(v) => v.get_ranges(ranges).await,
164 }
165 }
166}
167
168impl From<BufferByteSource> for DynByteSource {
169 fn from(value: BufferByteSource) -> Self {
170 Self::Buffer(value)
171 }
172}
173
#[cfg(feature = "cloud")]
impl From<ObjectStoreByteSource> for DynByteSource {
    /// Wraps an object-store source in the `Cloud` variant.
    fn from(value: ObjectStoreByteSource) -> Self {
        DynByteSource::Cloud(value)
    }
}
180
181impl From<Buffer<u8>> for DynByteSource {
182 fn from(value: Buffer<u8>) -> Self {
183 Self::Buffer(BufferByteSource(value))
184 }
185}
186
/// Selects which kind of [`DynByteSource`] `try_build_from_path` constructs.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Memory-map a local file into a [`BufferByteSource`].
    Mmap,
    /// Read through an object store via `ObjectStoreByteSource` (requires the `cloud`
    /// feature; without it, building this variant goes through `feature_gated!`).
    ObjectStore,
}
193
194impl DynByteSourceBuilder {
195 pub async fn try_build_from_path(
196 &self,
197 path: PlRefPath,
198 cloud_options: Option<&CloudOptions>,
199 io_metrics: Option<Arc<IOMetrics>>,
200 ) -> PolarsResult<DynByteSource> {
201 Ok(match self {
202 Self::Mmap => {
203 BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)
204 .await?
205 .into()
206 },
207 Self::ObjectStore => feature_gated!("cloud", {
208 ObjectStoreByteSource::try_new_from_path(path, cloud_options, io_metrics)
209 .await?
210 .into()
211 }),
212 })
213 }
214}