polars_io/utils/
byte_source.rs1use std::ops::Range;
2use std::sync::Arc;
3
4use polars_core::prelude::PlHashMap;
5use polars_error::PolarsResult;
6use polars_utils::_limit_path_len_io_err;
7use polars_utils::mmap::MemSlice;
8use polars_utils::plpath::PlPathRef;
9
10use crate::cloud::{
11 CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,
12 object_path_from_str,
13};
14
15#[allow(async_fn_in_trait)]
16pub trait ByteSource: Send + Sync {
17 async fn get_size(&self) -> PolarsResult<usize>;
18 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
21 async fn get_ranges(
23 &self,
24 ranges: &mut [Range<usize>],
25 ) -> PolarsResult<PlHashMap<usize, MemSlice>>;
26}
27
28pub struct MemSliceByteSource(pub MemSlice);
30
31impl MemSliceByteSource {
32 async fn try_new_mmap_from_path(
33 path: PlPathRef<'_>,
34 _cloud_options: Option<&CloudOptions>,
35 ) -> PolarsResult<Self> {
36 let path = path.as_local_path().unwrap();
37
38 let file = Arc::new(
39 tokio::fs::File::open(path)
40 .await
41 .map_err(|err| _limit_path_len_io_err(path, err))?
42 .into_std()
43 .await,
44 );
45
46 Ok(Self(MemSlice::from_file(file.as_ref())?))
47 }
48}
49
50impl ByteSource for MemSliceByteSource {
51 async fn get_size(&self) -> PolarsResult<usize> {
52 Ok(self.0.as_ref().len())
53 }
54
55 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
56 let out = self.0.slice(range);
57 Ok(out)
58 }
59
60 async fn get_ranges(
61 &self,
62 ranges: &mut [Range<usize>],
63 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
64 Ok(ranges
65 .iter()
66 .map(|x| (x.start, self.0.slice(x.clone())))
67 .collect())
68 }
69}
70
71pub struct ObjectStoreByteSource {
72 store: PolarsObjectStore,
73 path: ObjectStorePath,
74}
75
76impl ObjectStoreByteSource {
77 async fn try_new_from_path(
78 path: PlPathRef<'_>,
79 cloud_options: Option<&CloudOptions>,
80 ) -> PolarsResult<Self> {
81 let (CloudLocation { prefix, .. }, store) =
82 build_object_store(path, cloud_options, false).await?;
83 let path = object_path_from_str(&prefix)?;
84
85 Ok(Self { store, path })
86 }
87}
88
89impl ByteSource for ObjectStoreByteSource {
90 async fn get_size(&self) -> PolarsResult<usize> {
91 Ok(self.store.head(&self.path).await?.size as usize)
92 }
93
94 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
95 let bytes = self.store.get_range(&self.path, range).await?;
96 let mem_slice = MemSlice::from_bytes(bytes);
97
98 Ok(mem_slice)
99 }
100
101 async fn get_ranges(
102 &self,
103 ranges: &mut [Range<usize>],
104 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
105 self.store.get_ranges_sort(&self.path, ranges).await
106 }
107}
108
109pub enum DynByteSource {
111 MemSlice(MemSliceByteSource),
112 Cloud(ObjectStoreByteSource),
113}
114
115impl DynByteSource {
116 pub fn variant_name(&self) -> &str {
117 match self {
118 Self::MemSlice(_) => "MemSlice",
119 Self::Cloud(_) => "Cloud",
120 }
121 }
122}
123
124impl Default for DynByteSource {
125 fn default() -> Self {
126 Self::MemSlice(MemSliceByteSource(MemSlice::default()))
127 }
128}
129
130impl ByteSource for DynByteSource {
131 async fn get_size(&self) -> PolarsResult<usize> {
132 match self {
133 Self::MemSlice(v) => v.get_size().await,
134 Self::Cloud(v) => v.get_size().await,
135 }
136 }
137
138 async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
139 match self {
140 Self::MemSlice(v) => v.get_range(range).await,
141 Self::Cloud(v) => v.get_range(range).await,
142 }
143 }
144
145 async fn get_ranges(
146 &self,
147 ranges: &mut [Range<usize>],
148 ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
149 match self {
150 Self::MemSlice(v) => v.get_ranges(ranges).await,
151 Self::Cloud(v) => v.get_ranges(ranges).await,
152 }
153 }
154}
155
156impl From<MemSliceByteSource> for DynByteSource {
157 fn from(value: MemSliceByteSource) -> Self {
158 Self::MemSlice(value)
159 }
160}
161
162impl From<ObjectStoreByteSource> for DynByteSource {
163 fn from(value: ObjectStoreByteSource) -> Self {
164 Self::Cloud(value)
165 }
166}
167
168impl From<MemSlice> for DynByteSource {
169 fn from(value: MemSlice) -> Self {
170 Self::MemSlice(MemSliceByteSource(value))
171 }
172}
173
174#[derive(Clone, Debug)]
175pub enum DynByteSourceBuilder {
176 Mmap,
177 ObjectStore,
179}
180
181impl DynByteSourceBuilder {
182 pub async fn try_build_from_path(
183 &self,
184 path: PlPathRef<'_>,
185 cloud_options: Option<&CloudOptions>,
186 ) -> PolarsResult<DynByteSource> {
187 Ok(match self {
188 Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
189 .await?
190 .into(),
191 Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
192 .await?
193 .into(),
194 })
195 }
196}