use std::fmt::Display;
use std::ops::Range;
use std::sync::Arc;

use futures::{Stream, StreamExt as _, TryStreamExt as _};
use hashbrown::hash_map::RawEntryMut;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use polars_buffer::Buffer;
use polars_core::prelude::{InitHashMaps, PlHashMap};
use polars_error::{PolarsError, PolarsResult};
use polars_utils::pl_path::PlRefPath;
use tokio::io::AsyncWriteExt;

use crate::metrics::HEAD_RESPONSE_SIZE_ESTIMATE;
use crate::pl_async::{
    self, MAX_BUDGET_PER_REQUEST, get_concurrency_limit, get_download_chunk_size,
    tune_with_concurrency_budget, with_concurrency_budget,
};

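/// Error wrapper that pairs an [`object_store::Error`] with the base URL of the
/// store that produced it.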
#[derive(Debug)]
pub struct PolarsObjectStoreError {
    pub base_url: PlRefPath,
    pub source: object_store::Error,
}

impl PolarsObjectStoreError {
    pub fn from_url(base_url: &PlRefPath) -> impl FnOnce(object_store::Error) -> Self {
        |error| Self {
            base_url: base_url.clone(),
            source: error,
        }
    }
}

impl Display for PolarsObjectStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "object-store error: {} (path: {})",
            self.source, &self.base_url
        )
    }
}

impl std::error::Error for PolarsObjectStoreError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

impl From<PolarsObjectStoreError> for std::io::Error {
    fn from(value: PolarsObjectStoreError) -> Self {
        std::io::Error::other(value)
    }
}

impl From<PolarsObjectStoreError> for PolarsError {
    fn from(value: PolarsObjectStoreError) -> Self {
        PolarsError::IO {
            error: Arc::new(value.into()),
            msg: None,
        }
    }
}

mod inner {

    use std::borrow::Cow;
    use std::future::Future;
    use std::sync::Arc;

    use object_store::ObjectStore;
    use polars_core::config;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::relaxed_cell::RelaxedCell;

    use crate::cloud::{ObjectStoreErrorContext, PolarsObjectStoreBuilder};
    use crate::metrics::{IOMetrics, OptIOMetrics};

    #[derive(Debug)]
    struct Inner {
        store: tokio::sync::RwLock<Arc<dyn ObjectStore>>,
        builder: PolarsObjectStoreBuilder,
    }

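    /// Handle to a dynamic [`ObjectStore`] that can rebuild the store from its
    /// builder and retry failed operations (e.g. after credential expiry). The
    /// store created at construction time is cached in `initial_store` and read
    /// without locking until `rebuilt` is set, after which the current store is
    /// loaded from the `RwLock` in `Inner`.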
    #[derive(Clone, Debug)]
    pub struct PolarsObjectStore {
        inner: Arc<Inner>,
        initial_store: std::sync::Arc<dyn ObjectStore>,
        rebuilt: RelaxedCell<bool>,
        io_metrics: OptIOMetrics,
    }

    impl PolarsObjectStore {
        pub(crate) fn new_from_inner(
            store: Arc<dyn ObjectStore>,
            builder: PolarsObjectStoreBuilder,
        ) -> Self {
            let initial_store = store.clone();
            Self {
                inner: Arc::new(Inner {
                    store: tokio::sync::RwLock::new(store),
                    builder,
                }),
                initial_store,
                rebuilt: RelaxedCell::from(false),
                io_metrics: OptIOMetrics(None),
            }
        }

        pub fn set_io_metrics(&mut self, io_metrics: Option<Arc<IOMetrics>>) -> &mut Self {
            self.io_metrics = OptIOMetrics(io_metrics);
            self
        }

        pub fn io_metrics(&self) -> &OptIOMetrics {
            &self.io_metrics
        }

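        /// Returns the current store. Borrows the cached `initial_store` without
        /// taking the lock as long as no rebuild has happened yet.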
        async fn to_dyn_object_store(&self) -> Cow<'_, Arc<dyn ObjectStore>> {
            if !self.rebuilt.load() {
                Cow::Borrowed(&self.initial_store)
            } else {
                Cow::Owned(self.inner.store.read().await.clone())
            }
        }

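        /// Rebuilds the store from the builder. If another task already replaced
        /// `from_version`, the rebuild is skipped and the newer store is returned
        /// as-is.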
        pub async fn rebuild_inner(
            &self,
            from_version: &Arc<dyn ObjectStore>,
        ) -> PolarsResult<Arc<dyn ObjectStore>> {
            let mut current_store = self.inner.store.write().await;

            if Arc::ptr_eq(&*current_store, from_version) {
                *current_store =
                    self.inner
                        .builder
                        .clone()
                        .build_impl(true)
                        .await
                        .map_err(|e| {
                            e.wrap_msg(|e| format!("attempt to rebuild object store failed: {e}"))
                        })?;
            }

            self.rebuilt.store(true);

            Ok((*current_store).clone())
        }

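        /// Runs `func` with the current store. On error, the store is rebuilt once
        /// (see [`Self::rebuild_inner`]) and `func` is retried; a second failure is
        /// returned with the store path attached and, on Azure, a hint about
        /// `POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY`.
        ///
        /// A minimal usage sketch (assuming some `path: &Path` in scope; not part
        /// of the original source):
        ///
        /// ```ignore
        /// let meta = store
        ///     .exec_with_rebuild_retry_on_err(|s| async move { s.head(path).await })
        ///     .await?;
        /// ```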
        pub async fn exec_with_rebuild_retry_on_err<'s, 'f, Fn, Fut, O>(
            &'s self,
            mut func: Fn,
        ) -> PolarsResult<O>
        where
            Fn: FnMut(Cow<'s, Arc<dyn ObjectStore>>) -> Fut + 'f,
            Fut: Future<Output = object_store::Result<O>>,
        {
            let store = self.to_dyn_object_store().await;

            let out = func(store.clone()).await;

            let orig_err = match out {
                Ok(v) => return Ok(v),
                Err(e) => e,
            };

            if config::verbose() {
                eprintln!(
                    "[PolarsObjectStore]: got error: {}, will rebuild store and retry",
                    &orig_err
                );
            }

            let store = self
                .rebuild_inner(&store)
                .await
                .map_err(|e| e.wrap_msg(|e| format!("{e}; original error: {orig_err}")))?;

            func(Cow::Owned(store)).await.map_err(|e| {
                let e: PolarsError = self.error_context().attach_err_info(e).into();

                if self.inner.builder.is_azure()
                    && std::env::var("POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY").as_deref()
                        != Ok("1")
                {
                    e.wrap_msg(|e| {
                        format!(
                            "{e}; note: if you are using Python, consider setting \
POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY=1 if you would like polars to try to retrieve \
and use the storage account keys from Azure CLI to authenticate"
                        )
                    })
                } else {
                    e
                }
            })
        }

        pub fn error_context(&self) -> ObjectStoreErrorContext {
            ObjectStoreErrorContext::new(self.inner.builder.path().clone())
        }
    }
}

#[derive(Clone)]
pub struct ObjectStoreErrorContext {
    path: PlRefPath,
}

impl ObjectStoreErrorContext {
    pub fn new(path: PlRefPath) -> Self {
        Self { path }
    }

    pub fn attach_err_info(self, err: object_store::Error) -> PolarsObjectStoreError {
        let ObjectStoreErrorContext { path } = self;

        PolarsObjectStoreError {
            base_url: path,
            source: err,
        }
    }
}

pub use inner::PolarsObjectStore;

pub type ObjectStorePath = object_store::path::Path;

impl PolarsObjectStore {
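    /// Builds a stream that fetches the given byte ranges concurrently (up to the
    /// configured concurrency limit), yielding one buffer per range in input order.
    /// Empty ranges yield empty buffers without issuing a request.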
    pub fn build_buffered_ranges_stream<'a, T: Iterator<Item = Range<usize>>>(
        &'a self,
        path: &'a Path,
        ranges: T,
    ) -> impl Stream<Item = PolarsResult<Buffer<u8>>> + use<'a, T> {
        futures::stream::iter(ranges.map(move |range| async move {
            if range.is_empty() {
                return Ok(Buffer::new());
            }

            let out = self
                .io_metrics()
                .record_io_read(
                    range.len() as u64,
                    self.exec_with_rebuild_retry_on_err(|s| async move {
                        s.get_range(path, range.start as u64..range.end as u64)
                            .await
                    }),
                )
                .await?;

            Ok(Buffer::from_owner(out))
        }))
        .buffered(get_concurrency_limit() as usize)
    }

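    /// Fetches a single byte range. Large ranges are transparently split into
    /// roughly chunk-sized parts (see [`split_range`]) that are downloaded
    /// concurrently and stitched back together.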
    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        if range.is_empty() {
            return Ok(Buffer::new());
        }

        let parts = split_range(range.clone());

        if parts.len() == 1 {
            let out = tune_with_concurrency_budget(1, move || async move {
                let bytes = self
                    .io_metrics()
                    .record_io_read(
                        range.len() as u64,
                        self.exec_with_rebuild_retry_on_err(|s| async move {
                            s.get_range(path, range.start as u64..range.end as u64)
                                .await
                        }),
                    )
                    .await?;

                PolarsResult::Ok(Buffer::from_owner(bytes))
            })
            .await?;

            Ok(out)
        } else {
            let parts = tune_with_concurrency_budget(
                parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
                || {
                    self.build_buffered_ranges_stream(path, parts)
                        .try_collect::<Vec<Buffer<u8>>>()
                },
            )
            .await?;

            let mut combined = Vec::with_capacity(range.len());

            for part in parts {
                combined.extend_from_slice(&part);
            }

            assert_eq!(combined.len(), range.len());

            PolarsResult::Ok(Buffer::from_vec(combined))
        }
    }

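    /// Fetches multiple byte ranges with a reduced number of requests: sorts the
    /// ranges by start, merges ranges that overlap or sit close together (see
    /// [`merge_ranges`]), downloads the merged ranges, and slices the requested
    /// ranges back out of the merged buffers. Returns a map from each range's
    /// start offset to its data; of two ranges sharing a start offset, the longer
    /// one wins.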
    pub async fn get_ranges_sort(
        &self,
        path: &Path,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        if ranges.is_empty() {
            return Ok(Default::default());
        }

        ranges.sort_unstable_by_key(|x| x.start);

        let ranges_len = ranges.len();
        let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip();

        let mut out = PlHashMap::with_capacity(ranges_len);

        let mut stream = self.build_buffered_ranges_stream(path, merged_ranges.iter().cloned());

        tune_with_concurrency_budget(
            merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut len = 0;
                let mut current_offset = 0;
                let mut ends_iter = merged_ends.iter();

                let mut splitted_parts = vec![];

                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    let end = *ends_iter.next().unwrap();

                    // An `end` of 0 marks an intermediate part of a split merged
                    // range; buffer it until the final part arrives.
                    if end == 0 {
                        splitted_parts.push(bytes);
                        continue;
                    }

                    let full_range = ranges[current_offset..end]
                        .iter()
                        .cloned()
                        .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))
                        .unwrap();

                    let bytes = if splitted_parts.is_empty() {
                        bytes
                    } else {
                        let mut out = Vec::with_capacity(full_range.len());

                        for x in splitted_parts.drain(..) {
                            out.extend_from_slice(&x);
                        }

                        out.extend_from_slice(&bytes);
                        Buffer::from(out)
                    };

                    assert_eq!(bytes.len(), full_range.len());

                    for range in &ranges[current_offset..end] {
                        let slice = bytes
                            .clone()
                            .sliced(range.start - full_range.start..range.end - full_range.start);

                        match out.raw_entry_mut().from_key(&range.start) {
                            RawEntryMut::Vacant(slot) => {
                                slot.insert(range.start, slice);
                            },
                            RawEntryMut::Occupied(mut slot) => {
                                if slot.get_mut().len() < slice.len() {
                                    *slot.get_mut() = slice;
                                }
                            },
                        }
                    }

                    current_offset = end;
                }

                assert!(splitted_parts.is_empty());

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

        Ok(out)
    }

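    /// Downloads an object into a local file by fetching chunk-sized parts
    /// concurrently and writing them to the file in order.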
    pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> {
        let size = self.head(path).await?.size;
        let parts = split_range(0..size as usize);

        tune_with_concurrency_budget(
            parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut stream = self.build_buffered_ranges_stream(path, parts);
                let mut len = 0;
                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    file.write_all(&bytes).await?;
                }

                assert_eq!(len, size as usize);

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

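        // Sync explicitly: dropping a tokio `File` does not guarantee that pending
        // writes have been flushed to disk.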
        file.sync_all().await.map_err(PolarsError::from)?;

        Ok(())
    }

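    /// Fetches the metadata of an object via a HEAD request, falling back to a
    /// 1-byte ranged GET for stores where HEAD fails.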
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || {
            self.exec_with_rebuild_retry_on_err(|s| {
                async move {
                    let head_result = self
                        .io_metrics()
                        .record_io_read(HEAD_RESPONSE_SIZE_ESTIMATE, s.head(path))
                        .await;

                    if head_result.is_err() {
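                        // Some object store configurations do not support HEAD
                        // requests; fall back to a 1-byte ranged GET, whose
                        // response also carries the object metadata.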
                        let get_range_0_1_result = self
                            .io_metrics()
                            .record_io_read(
                                HEAD_RESPONSE_SIZE_ESTIMATE + 1,
                                s.get_opts(
                                    path,
                                    object_store::GetOptions {
                                        range: Some((0..1).into()),
                                        ..Default::default()
                                    },
                                ),
                            )
                            .await;

                        if let Ok(v) = get_range_0_1_result {
                            return Ok(v.meta);
                        }
                    }

                    let out = head_result?;

                    Ok(out)
                }
            })
        })
        .await
    }
}

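/// Splits `range` into roughly equally sized parts for concurrent downloading.
/// The part count is chosen as either `ceil(len / chunk_size)` or
/// `floor(len / chunk_size)`, whichever brings the part size closest to the
/// configured download chunk size; any remainder bytes are folded into the
/// first part. E.g. with the default 64 MiB chunk size, a range of
/// `4/3 * chunk_size` bytes is still fetched as a single part, while one byte
/// more splits it into two parts (see the tests below).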
fn split_range(range: Range<usize>) -> impl ExactSizeIterator<Item = Range<usize>> {
    let chunk_size = get_download_chunk_size();

    let n_parts = [
        (range.len().div_ceil(chunk_size)).max(1),
        (range.len() / chunk_size).max(1),
    ]
    .into_iter()
    .min_by_key(|x| (range.len() / *x).abs_diff(chunk_size))
    .unwrap();

    let chunk_size = (range.len() / n_parts).max(1);

    assert_eq!(n_parts, (range.len() / chunk_size).max(1));
    let bytes_rem = range.len() % chunk_size;

    (0..n_parts).map(move |part_no| {
        let (start, end) = if part_no == 0 {
            let end = range.start + chunk_size + bytes_rem;
            let end = if end > range.end { range.end } else { end };
            (range.start, end)
        } else {
            let start = bytes_rem + range.start + part_no * chunk_size;
            (start, start + chunk_size)
        };

        start..end
    })
}

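/// Merges overlapping or nearby ranges to reduce the number of requests, then
/// splits every merged range back into chunk-sized parts via [`split_range`].
/// `ranges` is expected to be sorted by start.
///
/// Yields `(part, end)` pairs, where `end` is the exclusive index into `ranges`
/// of the ranges covered once this part has arrived; intermediate parts of a
/// split merged range carry `end == 0`. Two ranges are merged if they overlap,
/// or if the gap between them is within a tolerance (an eighth of the larger of
/// the accumulated and incoming byte counts, clamped to 1..=8 MiB) and merging
/// does not move the merged length further away from the chunk size.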
fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>, usize)> + '_ {
    let chunk_size = get_download_chunk_size();

    let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone);
    let mut current_n_bytes = current_merged_range.len();

    (0..ranges.len())
        .filter_map(move |current_idx| {
            let current_idx = 1 + current_idx;

            if current_idx == ranges.len() {
                Some((current_merged_range.clone(), current_idx))
            } else {
                let range = ranges[current_idx].clone();

                let new_merged = current_merged_range.start.min(range.start)
                    ..current_merged_range.end.max(range.end);

                let (distance, is_overlapping) = {
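                    // `r` is the larger of the two starts, `l` the smaller of the
                    // two ends. If `r < l` the ranges intersect and `distance` is
                    // the size of the overlap; otherwise it is the size of the gap.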
                    let l = current_merged_range.end.min(range.end);
                    let r = current_merged_range.start.max(range.start);

                    (r.abs_diff(l), r < l)
                };

                let should_merge = is_overlapping || {
                    let leq_current_len_dist_to_chunk_size = new_merged.len().abs_diff(chunk_size)
                        <= current_merged_range.len().abs_diff(chunk_size);
                    let gap_tolerance =
                        (current_n_bytes.max(range.len()) / 8).clamp(1024 * 1024, 8 * 1024 * 1024);

                    leq_current_len_dist_to_chunk_size && distance <= gap_tolerance
                };

                if should_merge {
                    current_merged_range = new_merged;
                    current_n_bytes += if is_overlapping {
                        range.len() - distance
                    } else {
                        range.len()
                    };
                    None
                } else {
                    let out = (current_merged_range.clone(), current_idx);
                    current_merged_range = range;
                    current_n_bytes = current_merged_range.len();
                    Some(out)
                }
            }
        })
        .flat_map(|x| {
            let (range, end) = x;
            let split = split_range(range);
            let len = split.len();

            split
                .enumerate()
                .map(move |(i, range)| (range, if 1 + i == len { end } else { 0 }))
        })
}

#[cfg(test)]
mod tests {

    #[test]
    fn test_split_range() {
        use super::{get_download_chunk_size, split_range};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        #[allow(clippy::single_range_in_vec_init)]
        {
            assert_eq!(split_range(0..0).collect::<Vec<_>>(), [0..0]);
            assert_eq!(split_range(3..3).collect::<Vec<_>>(), [3..3]);
        }

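        // 4/3 * chunk_size is the crossover point: at this length a single part
        // is still closer to chunk_size than two halves would be; one byte more
        // tips the balance to two parts.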
        let n = 4 * chunk_size / 3;

        #[allow(clippy::single_range_in_vec_init)]
        {
            assert_eq!(split_range(0..n).collect::<Vec<_>>(), [0..89478485]);
        }

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..44739243, 44739243..89478486]
        );

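        // 2.4 * chunk_size: two parts fit chunk_size best at this length; one
        // byte more makes three parts the better split.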
        let n = 12 * chunk_size / 5;

        assert_eq!(
            split_range(0..n).collect::<Vec<_>>(),
            [0..80530637, 80530637..161061273]
        );

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..53687092, 53687092..107374183, 107374183..161061274]
        );
    }

    #[test]
    fn test_merge_ranges() {
        use super::{get_download_chunk_size, merge_ranges};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        assert_eq!(merge_ranges(&[]).collect::<Vec<_>>(), []);

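        // Two adjacent ranges merge into one ~127 MiB range, which is then split
        // back into two chunk-sized parts; only the final part carries the end
        // index, intermediate parts are marked 0.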
        assert_eq!(
            merge_ranges(&[0..1, 1..127 * 1024 * 1024]).collect::<Vec<_>>(),
            [(0..66584576, 0), (66584576..133169152, 2)]
        );

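        // A gap of exactly 1 MiB (the minimum gap tolerance) still merges.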
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),
            [(0..1048578, 2)]
        );

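        // One byte beyond the tolerance: the ranges stay separate.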
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 2..1024 * 1024 + 3]).collect::<Vec<_>>(),
            [(0..1, 1), (1048578..1048579, 2)]
        );

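        // Small gaps well below the tolerance merge.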
        assert_eq!(
            merge_ranges(&[0..8, 10..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

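        // The gap tolerance scales with the larger of the two sides.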
        assert_eq!(
            merge_ranges(&[0..1, 3..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

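        // A range fully contained in the previous one merges without growing the
        // merged range.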
        assert_eq!(
            merge_ranges(&[0..80 * 1024 * 1024, 10 * 1024 * 1024..70 * 1024 * 1024])
                .collect::<Vec<_>>(),
            [(0..80 * 1024 * 1024, 2)]
        );
    }
}