polars_io/utils/
byte_source.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use polars_core::prelude::PlHashMap;
5use polars_error::PolarsResult;
6use polars_utils::_limit_path_len_io_err;
7use polars_utils::mmap::MemSlice;
8use polars_utils::plpath::PlPathRef;
9
10use crate::cloud::{
11    CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,
12    object_path_from_str,
13};
14
15#[allow(async_fn_in_trait)]
16pub trait ByteSource: Send + Sync {
17    async fn get_size(&self) -> PolarsResult<usize>;
18    /// # Panics
19    /// Panics if `range` is not in bounds.
20    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
21    /// Note: This will mutably sort ranges for coalescing.
22    async fn get_ranges(
23        &self,
24        ranges: &mut [Range<usize>],
25    ) -> PolarsResult<PlHashMap<usize, MemSlice>>;
26}
27
28/// Byte source backed by a `MemSlice`, which can potentially be memory-mapped.
29pub struct MemSliceByteSource(pub MemSlice);
30
31impl MemSliceByteSource {
32    async fn try_new_mmap_from_path(
33        path: PlPathRef<'_>,
34        _cloud_options: Option<&CloudOptions>,
35    ) -> PolarsResult<Self> {
36        let path = path.as_local_path().unwrap();
37
38        let file = Arc::new(
39            tokio::fs::File::open(path)
40                .await
41                .map_err(|err| _limit_path_len_io_err(path, err))?
42                .into_std()
43                .await,
44        );
45
46        Ok(Self(MemSlice::from_file(file.as_ref())?))
47    }
48}
49
50impl ByteSource for MemSliceByteSource {
51    async fn get_size(&self) -> PolarsResult<usize> {
52        Ok(self.0.as_ref().len())
53    }
54
55    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
56        let out = self.0.slice(range);
57        Ok(out)
58    }
59
60    async fn get_ranges(
61        &self,
62        ranges: &mut [Range<usize>],
63    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
64        Ok(ranges
65            .iter()
66            .map(|x| (x.start, self.0.slice(x.clone())))
67            .collect())
68    }
69}
70
71pub struct ObjectStoreByteSource {
72    store: PolarsObjectStore,
73    path: ObjectStorePath,
74}
75
76impl ObjectStoreByteSource {
77    async fn try_new_from_path(
78        path: PlPathRef<'_>,
79        cloud_options: Option<&CloudOptions>,
80    ) -> PolarsResult<Self> {
81        let (CloudLocation { prefix, .. }, store) =
82            build_object_store(path, cloud_options, false).await?;
83        let path = object_path_from_str(&prefix)?;
84
85        Ok(Self { store, path })
86    }
87}
88
89impl ByteSource for ObjectStoreByteSource {
90    async fn get_size(&self) -> PolarsResult<usize> {
91        Ok(self.store.head(&self.path).await?.size as usize)
92    }
93
94    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
95        let bytes = self.store.get_range(&self.path, range).await?;
96        let mem_slice = MemSlice::from_bytes(bytes);
97
98        Ok(mem_slice)
99    }
100
101    async fn get_ranges(
102        &self,
103        ranges: &mut [Range<usize>],
104    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
105        self.store.get_ranges_sort(&self.path, ranges).await
106    }
107}
108
109/// Dynamic dispatch to async functions.
110pub enum DynByteSource {
111    MemSlice(MemSliceByteSource),
112    Cloud(ObjectStoreByteSource),
113}
114
115impl DynByteSource {
116    pub fn variant_name(&self) -> &str {
117        match self {
118            Self::MemSlice(_) => "MemSlice",
119            Self::Cloud(_) => "Cloud",
120        }
121    }
122}
123
124impl Default for DynByteSource {
125    fn default() -> Self {
126        Self::MemSlice(MemSliceByteSource(MemSlice::default()))
127    }
128}
129
130impl ByteSource for DynByteSource {
131    async fn get_size(&self) -> PolarsResult<usize> {
132        match self {
133            Self::MemSlice(v) => v.get_size().await,
134            Self::Cloud(v) => v.get_size().await,
135        }
136    }
137
138    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
139        match self {
140            Self::MemSlice(v) => v.get_range(range).await,
141            Self::Cloud(v) => v.get_range(range).await,
142        }
143    }
144
145    async fn get_ranges(
146        &self,
147        ranges: &mut [Range<usize>],
148    ) -> PolarsResult<PlHashMap<usize, MemSlice>> {
149        match self {
150            Self::MemSlice(v) => v.get_ranges(ranges).await,
151            Self::Cloud(v) => v.get_ranges(ranges).await,
152        }
153    }
154}
155
156impl From<MemSliceByteSource> for DynByteSource {
157    fn from(value: MemSliceByteSource) -> Self {
158        Self::MemSlice(value)
159    }
160}
161
162impl From<ObjectStoreByteSource> for DynByteSource {
163    fn from(value: ObjectStoreByteSource) -> Self {
164        Self::Cloud(value)
165    }
166}
167
168impl From<MemSlice> for DynByteSource {
169    fn from(value: MemSlice) -> Self {
170        Self::MemSlice(MemSliceByteSource(value))
171    }
172}
173
/// Selects which kind of [`DynByteSource`] `try_build_from_path` constructs.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Build a `MemSliceByteSource` from the file at the given path.
    Mmap,
    /// Supports both cloud and local files.
    ObjectStore,
}
180
181impl DynByteSourceBuilder {
182    pub async fn try_build_from_path(
183        &self,
184        path: PlPathRef<'_>,
185        cloud_options: Option<&CloudOptions>,
186    ) -> PolarsResult<DynByteSource> {
187        Ok(match self {
188            Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
189                .await?
190                .into(),
191            Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
192                .await?
193                .into(),
194        })
195    }
196}