polars_io/utils/
byte_source.rs1use std::ops::Range;
2use std::sync::Arc;
3
4use polars_core::prelude::PlHashMap;
5use polars_error::PolarsResult;
6use polars_utils::_limit_path_len_io_err;
7use polars_utils::mmap::MemSlice;
8
9use crate::cloud::{
10 CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,
11 object_path_from_str,
12};
13
14#[allow(async_fn_in_trait)]
15pub trait ByteSource: Send + Sync {
16 async fn get_size(&self) -> PolarsResult<usize>;
17 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
20 async fn get_ranges(
22 &self,
23 ranges: &mut [Range<usize>],
24 ) -> PolarsResult<PlHashMap<usize, MemSlice>>;
25}
26
27pub struct MemSliceByteSource(pub MemSlice);
29
30impl MemSliceByteSource {
31 async fn try_new_mmap_from_path(
32 path: &str,
33 _cloud_options: Option<&CloudOptions>,
34 ) -> PolarsResult<Self> {
35 let file = Arc::new(
36 tokio::fs::File::open(path)
37 .await
38 .map_err(|err| _limit_path_len_io_err(path.as_ref(), err))?
39 .into_std()
40 .await,
41 );
42
43 Ok(Self(MemSlice::from_file(file.as_ref())?))
44 }
45}
46
47impl ByteSource for MemSliceByteSource {
48 async fn get_size(&self) -> PolarsResult<usize> {
49 Ok(self.0.as_ref().len())
50 }
51
52 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
53 let out = self.0.slice(range);
54 Ok(out)
55 }
56
57 async fn get_ranges(
58 &self,
59 ranges: &mut [Range<usize>],
60 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
61 Ok(ranges
62 .iter()
63 .map(|x| (x.start, self.0.slice(x.clone())))
64 .collect())
65 }
66}
67
68pub struct ObjectStoreByteSource {
69 store: PolarsObjectStore,
70 path: ObjectStorePath,
71}
72
73impl ObjectStoreByteSource {
74 async fn try_new_from_path(
75 path: &str,
76 cloud_options: Option<&CloudOptions>,
77 ) -> PolarsResult<Self> {
78 let (CloudLocation { prefix, .. }, store) =
79 build_object_store(path, cloud_options, false).await?;
80 let path = object_path_from_str(&prefix)?;
81
82 Ok(Self { store, path })
83 }
84}
85
86impl ByteSource for ObjectStoreByteSource {
87 async fn get_size(&self) -> PolarsResult<usize> {
88 Ok(self.store.head(&self.path).await?.size)
89 }
90
91 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
92 let bytes = self.store.get_range(&self.path, range).await?;
93 let mem_slice = MemSlice::from_bytes(bytes);
94
95 Ok(mem_slice)
96 }
97
98 async fn get_ranges(
99 &self,
100 ranges: &mut [Range<usize>],
101 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
102 self.store.get_ranges_sort(&self.path, ranges).await
103 }
104}
105
106pub enum DynByteSource {
108 MemSlice(MemSliceByteSource),
109 Cloud(ObjectStoreByteSource),
110}
111
112impl DynByteSource {
113 pub fn variant_name(&self) -> &str {
114 match self {
115 Self::MemSlice(_) => "MemSlice",
116 Self::Cloud(_) => "Cloud",
117 }
118 }
119}
120
121impl Default for DynByteSource {
122 fn default() -> Self {
123 Self::MemSlice(MemSliceByteSource(MemSlice::default()))
124 }
125}
126
127impl ByteSource for DynByteSource {
128 async fn get_size(&self) -> PolarsResult<usize> {
129 match self {
130 Self::MemSlice(v) => v.get_size().await,
131 Self::Cloud(v) => v.get_size().await,
132 }
133 }
134
135 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
136 match self {
137 Self::MemSlice(v) => v.get_range(range).await,
138 Self::Cloud(v) => v.get_range(range).await,
139 }
140 }
141
142 async fn get_ranges(
143 &self,
144 ranges: &mut [Range<usize>],
145 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
146 match self {
147 Self::MemSlice(v) => v.get_ranges(ranges).await,
148 Self::Cloud(v) => v.get_ranges(ranges).await,
149 }
150 }
151}
152
153impl From<MemSliceByteSource> for DynByteSource {
154 fn from(value: MemSliceByteSource) -> Self {
155 Self::MemSlice(value)
156 }
157}
158
159impl From<ObjectStoreByteSource> for DynByteSource {
160 fn from(value: ObjectStoreByteSource) -> Self {
161 Self::Cloud(value)
162 }
163}
164
165impl From<MemSlice> for DynByteSource {
166 fn from(value: MemSlice) -> Self {
167 Self::MemSlice(MemSliceByteSource(value))
168 }
169}
170
171#[derive(Clone, Debug)]
172pub enum DynByteSourceBuilder {
173 Mmap,
174 ObjectStore,
176}
177
178impl DynByteSourceBuilder {
179 pub async fn try_build_from_path(
180 &self,
181 path: &str,
182 cloud_options: Option<&CloudOptions>,
183 ) -> PolarsResult<DynByteSource> {
184 Ok(match self {
185 Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
186 .await?
187 .into(),
188 Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
189 .await?
190 .into(),
191 })
192 }
193}