polars_io/cloud/polars_object_store.rs

use std::fmt::Display;
use std::ops::Range;
use std::sync::Arc;

use futures::{Stream, StreamExt as _, TryStreamExt as _};
use hashbrown::hash_map::RawEntryMut;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use polars_buffer::Buffer;
use polars_core::prelude::{InitHashMaps, PlHashMap};
use polars_error::{PolarsError, PolarsResult};
use polars_utils::pl_path::PlRefPath;
use tokio::io::AsyncWriteExt;

use crate::metrics::HEAD_RESPONSE_SIZE_ESTIMATE;
use crate::pl_async::{
    self, MAX_BUDGET_PER_REQUEST, get_concurrency_limit, get_download_chunk_size,
    tune_with_concurrency_budget, with_concurrency_budget,
};

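/// An [`object_store::Error`] annotated with the base URL of the store that produced it.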
#[derive(Debug)]
pub struct PolarsObjectStoreError {
    pub base_url: PlRefPath,
    pub source: object_store::Error,
}

impl PolarsObjectStoreError {
    pub fn from_url(base_url: &PlRefPath) -> impl FnOnce(object_store::Error) -> Self {
        |error| Self {
            base_url: base_url.clone(),
            source: error,
        }
    }
}

impl Display for PolarsObjectStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "object-store error: {} (path: {})",
            self.source, &self.base_url
        )
    }
}

impl std::error::Error for PolarsObjectStoreError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

impl From<PolarsObjectStoreError> for std::io::Error {
    fn from(value: PolarsObjectStoreError) -> Self {
        std::io::Error::other(value)
    }
}

impl From<PolarsObjectStoreError> for PolarsError {
    fn from(value: PolarsObjectStoreError) -> Self {
        PolarsError::IO {
            error: Arc::new(value.into()),
            msg: None,
        }
    }
}

mod inner {

    use std::borrow::Cow;
    use std::future::Future;
    use std::sync::Arc;

    use object_store::ObjectStore;
    use polars_core::config;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::relaxed_cell::RelaxedCell;

    use crate::cloud::{ObjectStoreErrorContext, PolarsObjectStoreBuilder};
    use crate::metrics::{IOMetrics, OptIOMetrics};

    #[derive(Debug)]
    struct Inner {
        store: tokio::sync::RwLock<Arc<dyn ObjectStore>>,
        builder: PolarsObjectStoreBuilder,
    }

    /// Polars wrapper around [`ObjectStore`] functionality. This struct is cheaply cloneable.
    #[derive(Clone, Debug)]
    pub struct PolarsObjectStore {
        inner: Arc<Inner>,
        /// Avoid contending the `RwLock` until the first re-build.
        initial_store: std::sync::Arc<dyn ObjectStore>,
        /// Used for interior mutability. Doesn't need to be shared with other threads so it's not
        /// inside `Arc<>`.
        rebuilt: RelaxedCell<bool>,
        io_metrics: OptIOMetrics,
    }

    impl PolarsObjectStore {
        pub(crate) fn new_from_inner(
            store: Arc<dyn ObjectStore>,
            builder: PolarsObjectStoreBuilder,
        ) -> Self {
            let initial_store = store.clone();
            Self {
                inner: Arc::new(Inner {
                    store: tokio::sync::RwLock::new(store),
                    builder,
                }),
                initial_store,
                rebuilt: RelaxedCell::from(false),
                io_metrics: OptIOMetrics(None),
            }
        }

        pub fn set_io_metrics(&mut self, io_metrics: Option<Arc<IOMetrics>>) -> &mut Self {
            self.io_metrics = OptIOMetrics(io_metrics);
            self
        }

        pub fn io_metrics(&self) -> &OptIOMetrics {
            &self.io_metrics
        }

        /// Gets the underlying [`ObjectStore`] implementation.
        async fn to_dyn_object_store(&self) -> Cow<'_, Arc<dyn ObjectStore>> {
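            // Fast path: while `rebuilt` is false, hand out the initially-built store
            // without taking the `RwLock`.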
            if !self.rebuilt.load() {
                Cow::Borrowed(&self.initial_store)
            } else {
                Cow::Owned(self.inner.store.read().await.clone())
            }
        }

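        /// Rebuilds the inner [`ObjectStore`] from the stored builder. `from_version` is the
        /// store the caller observed; if another task has already rebuilt the store, that
        /// rebuilt store is returned instead of rebuilding again.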
        pub async fn rebuild_inner(
            &self,
            from_version: &Arc<dyn ObjectStore>,
        ) -> PolarsResult<Arc<dyn ObjectStore>> {
            let mut current_store = self.inner.store.write().await;

            // If these are not the same Arc, then `inner` was already re-built by another thread.
            if Arc::ptr_eq(&*current_store, from_version) {
                *current_store =
                    self.inner
                        .builder
                        .clone()
                        .build_impl(true)
                        .await
                        .map_err(|e| {
                            e.wrap_msg(|e| format!("attempt to rebuild object store failed: {e}"))
                        })?;
            }

            self.rebuilt.store(true);

            Ok((*current_store).clone())
        }

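        /// Runs `func` against the current [`ObjectStore`]. If the call fails, the store is
        /// rebuilt (e.g. to refresh expired credentials) and `func` is retried once; a second
        /// failure is returned with this store's error context attached.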
        pub async fn exec_with_rebuild_retry_on_err<'s, 'f, Fn, Fut, O>(
            &'s self,
            mut func: Fn,
        ) -> PolarsResult<O>
        where
            Fn: FnMut(Cow<'s, Arc<dyn ObjectStore>>) -> Fut + 'f,
            Fut: Future<Output = object_store::Result<O>>,
        {
            let store = self.to_dyn_object_store().await;

            let out = func(store.clone()).await;

            let orig_err = match out {
                Ok(v) => return Ok(v),
                Err(e) => e,
            };

            if config::verbose() {
                eprintln!(
                    "[PolarsObjectStore]: got error: {}, will rebuild store and retry",
                    &orig_err
                );
            }

            let store = self
                .rebuild_inner(&store)
                .await
                .map_err(|e| e.wrap_msg(|e| format!("{e}; original error: {orig_err}")))?;

            func(Cow::Owned(store)).await.map_err(|e| {
                let e: PolarsError = self.error_context().attach_err_info(e).into();

                if self.inner.builder.is_azure()
                    && std::env::var("POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY").as_deref()
                        != Ok("1")
                {
                    // Note: This error is intended for Python audiences. The logic for retrieving
                    // these keys exists only on the Python side.
                    e.wrap_msg(|e| {
                        format!(
                            "{e}; note: if you are using Python, consider setting \
POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY=1 if you would like polars to try to retrieve \
and use the storage account keys from Azure CLI to authenticate"
                        )
                    })
                } else {
                    e
                }
            })
        }

        pub fn error_context(&self) -> ObjectStoreErrorContext {
            ObjectStoreErrorContext::new(self.inner.builder.path().clone())
        }
    }
}

#[derive(Clone)]
pub struct ObjectStoreErrorContext {
    path: PlRefPath,
}

impl ObjectStoreErrorContext {
    pub fn new(path: PlRefPath) -> Self {
        Self { path }
    }

    pub fn attach_err_info(self, err: object_store::Error) -> PolarsObjectStoreError {
        let ObjectStoreErrorContext { path } = self;

        PolarsObjectStoreError {
            base_url: path,
            source: err,
        }
    }
}

pub use inner::PolarsObjectStore;

pub type ObjectStorePath = object_store::path::Path;

impl PolarsObjectStore {
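    /// Builds a stream that downloads the given byte ranges concurrently (bounded by the
    /// configured concurrency limit) and yields the resulting buffers in input order.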
    pub fn build_buffered_ranges_stream<'a, T: Iterator<Item = Range<usize>>>(
        &'a self,
        path: &'a Path,
        ranges: T,
    ) -> impl Stream<Item = PolarsResult<Buffer<u8>>> + use<'a, T> {
        futures::stream::iter(ranges.map(move |range| async move {
            if range.is_empty() {
                return Ok(Buffer::new());
            }

            let out = self
                .io_metrics()
                .record_io_read(
                    range.len() as u64,
                    self.exec_with_rebuild_retry_on_err(|s| async move {
                        s.get_range(path, range.start as u64..range.end as u64)
                            .await
                    }),
                )
                .await?;

            Ok(Buffer::from_owner(out))
        }))
        // Add a limit locally as this gets run inside a single `tune_with_concurrency_budget`.
        .buffered(get_concurrency_limit() as usize)
    }

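    /// Downloads a byte range from `path`. Ranges larger than the configured download chunk
    /// size are split via [`split_range`] and fetched concurrently, then reassembled in order.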
    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        if range.is_empty() {
            return Ok(Buffer::new());
        }

        let parts = split_range(range.clone());

        if parts.len() == 1 {
            let out = tune_with_concurrency_budget(1, move || async move {
                let bytes = self
                    .io_metrics()
                    .record_io_read(
                        range.len() as u64,
                        self.exec_with_rebuild_retry_on_err(|s| async move {
                            s.get_range(path, range.start as u64..range.end as u64)
                                .await
                        }),
                    )
                    .await?;

                PolarsResult::Ok(Buffer::from_owner(bytes))
            })
            .await?;

            Ok(out)
        } else {
            let parts = tune_with_concurrency_budget(
                parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
                || {
                    self.build_buffered_ranges_stream(path, parts)
                        .try_collect::<Vec<Buffer<u8>>>()
                },
            )
            .await?;

            let mut combined = Vec::with_capacity(range.len());

            for part in parts {
                combined.extend_from_slice(&part)
            }

            assert_eq!(combined.len(), range.len());

            PolarsResult::Ok(Buffer::from_vec(combined))
        }
    }

    /// Fetch byte ranges into a HashMap keyed by the range start. This will mutably sort the
    /// `ranges` slice for coalescing.
    ///
    /// # Panics
    /// Panics if the same range start is used by more than 1 range.
    pub async fn get_ranges_sort(
        &self,
        path: &Path,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        if ranges.is_empty() {
            return Ok(Default::default());
        }

        ranges.sort_unstable_by_key(|x| x.start);

        let ranges_len = ranges.len();
        let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip();

        let mut out = PlHashMap::with_capacity(ranges_len);

        let mut stream = self.build_buffered_ranges_stream(path, merged_ranges.iter().cloned());

        tune_with_concurrency_budget(
            merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut len = 0;
                let mut current_offset = 0;
                let mut ends_iter = merged_ends.iter();

                let mut splitted_parts = vec![];

                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    let end = *ends_iter.next().unwrap();

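                    // An `end` of 0 marks a split part of a larger merged range (see
                    // `merge_ranges`); buffer it until the final part of that range arrives.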
                    if end == 0 {
                        splitted_parts.push(bytes);
                        continue;
                    }

                    let full_range = ranges[current_offset..end]
                        .iter()
                        .cloned()
                        .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))
                        .unwrap();

                    let bytes = if splitted_parts.is_empty() {
                        bytes
                    } else {
                        let mut out = Vec::with_capacity(full_range.len());

                        for x in splitted_parts.drain(..) {
                            out.extend_from_slice(&x);
                        }

                        out.extend_from_slice(&bytes);
                        Buffer::from(out)
                    };

                    assert_eq!(bytes.len(), full_range.len());

                    for range in &ranges[current_offset..end] {
                        let slice = bytes
                            .clone()
                            .sliced(range.start - full_range.start..range.end - full_range.start);

                        match out.raw_entry_mut().from_key(&range.start) {
                            RawEntryMut::Vacant(slot) => {
                                slot.insert(range.start, slice);
                            },
                            RawEntryMut::Occupied(mut slot) => {
                                if slot.get_mut().len() < slice.len() {
                                    *slot.get_mut() = slice;
                                }
                            },
                        }
                    }

                    current_offset = end;
                }

                assert!(splitted_parts.is_empty());

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

        Ok(out)
    }

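    /// Downloads the full object at `path` into `file`. The object is fetched as (possibly
    /// multiple) concurrent range requests and written to the file sequentially, in order.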
    pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> {
        let size = self.head(path).await?.size;
        let parts = split_range(0..size as usize);

        tune_with_concurrency_budget(
            parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut stream = self.build_buffered_ranges_stream(path, parts);
                let mut len = 0;
                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    file.write_all(&bytes).await?;
                }

                assert_eq!(len, size as usize);

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

        // Dropping is delayed for tokio async files so we need to explicitly
        // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).
        file.sync_all().await.map_err(PolarsError::from)?;

        Ok(())
    }

    /// Fetch the metadata of the object at `path`. The result is not memoized.
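    /// If the HEAD request fails (e.g. pre-signed URLs forbid the HEAD method), this falls
    /// back to a ranged GET of the first byte to obtain the metadata.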
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || {
            self.exec_with_rebuild_retry_on_err(|s| {
                async move {
                    let head_result = self
                        .io_metrics()
                        .record_io_read(HEAD_RESPONSE_SIZE_ESTIMATE, s.head(path))
                        .await;

                    if head_result.is_err() {
                        // Pre-signed URLs forbid the HEAD method, but we can still retrieve the header
                        // information with a range 0-1 request.
                        let get_range_0_1_result = self
                            .io_metrics()
                            .record_io_read(
                                HEAD_RESPONSE_SIZE_ESTIMATE + 1,
                                s.get_opts(
                                    path,
                                    object_store::GetOptions {
                                        range: Some((0..1).into()),
                                        ..Default::default()
                                    },
                                ),
                            )
                            .await;

                        if let Ok(v) = get_range_0_1_result {
                            return Ok(v.meta);
                        }
                    }

                    let out = head_result?;

                    Ok(out)
                }
            })
        })
        .await
    }
}

/// Splits a single range into multiple smaller ranges, which can be downloaded concurrently for
/// much higher throughput.
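///
/// For example, with the default 64 MiB download chunk size, a 100 MiB range is split into two
/// ~50 MiB parts, since 50 MiB is closer to the target chunk size than a single 100 MiB request.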
fn split_range(range: Range<usize>) -> impl ExactSizeIterator<Item = Range<usize>> {
    let chunk_size = get_download_chunk_size();

    // Calculate n_parts such that we are as close as possible to the `chunk_size`.
    let n_parts = [
        (range.len().div_ceil(chunk_size)).max(1),
        (range.len() / chunk_size).max(1),
    ]
    .into_iter()
    .min_by_key(|x| (range.len() / *x).abs_diff(chunk_size))
    .unwrap();

    let chunk_size = (range.len() / n_parts).max(1);

    assert_eq!(n_parts, (range.len() / chunk_size).max(1));
    let bytes_rem = range.len() % chunk_size;

    (0..n_parts).map(move |part_no| {
        let (start, end) = if part_no == 0 {
            // Download remainder length in the first chunk since it starts downloading first.
            let end = range.start + chunk_size + bytes_rem;
            let end = if end > range.end { range.end } else { end };
            (range.start, end)
        } else {
            let start = bytes_rem + range.start + part_no * chunk_size;
            (start, start + chunk_size)
        };

        start..end
    })
}

/// Merges adjacent or nearby ranges into larger requests.
///
/// Note: For optimal performance, `ranges` should be sorted. More generally, ranges that are
/// adjacent in the slice should also be close to each other in value.
///
/// # Returns
/// `[(range1, end1), (range2, end2), ...]`, where:
/// * `range1` contains bytes for the ranges from `ranges[0..end1]`
/// * `range2` contains bytes for the ranges from `ranges[end1..end2]`
/// * etc.
///
/// Note that if an end value is 0, the range is a split part of a larger merged range and should
/// be combined with the part(s) that follow it.
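///
/// For example, `[0..8, 10..11]` is merged into a single request `0..11` with end `2`, while two
/// tiny ranges more than 1 MiB apart are kept as separate requests (see `test_merge_ranges`).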
fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>, usize)> + '_ {
    let chunk_size = get_download_chunk_size();

    let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone);
    // Number of fetched bytes excluding excess.
    let mut current_n_bytes = current_merged_range.len();

    (0..ranges.len())
        .filter_map(move |current_idx| {
            let current_idx = 1 + current_idx;

            if current_idx == ranges.len() {
                // No more items - flush current state.
                Some((current_merged_range.clone(), current_idx))
            } else {
                let range = ranges[current_idx].clone();

                let new_merged = current_merged_range.start.min(range.start)
                    ..current_merged_range.end.max(range.end);

                // E.g.:
                // |--------|
                //  oo        // range1
                //       oo   // range2
                //    ^^^     // distance = 3, is_overlapping = false
                // E.g.:
                // |--------|
                //  ooooo     // range1
                //     ooooo  // range2
                //     ^^     // distance = 2, is_overlapping = true
                let (distance, is_overlapping) = {
                    let l = current_merged_range.end.min(range.end);
                    let r = current_merged_range.start.max(range.start);

                    (r.abs_diff(l), r < l)
                };

                let should_merge = is_overlapping || {
                    let leq_current_len_dist_to_chunk_size = new_merged.len().abs_diff(chunk_size)
                        <= current_merged_range.len().abs_diff(chunk_size);
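                    // Tolerate a gap of up to 1/8th of the larger of the two sides, clamped to
                    // between 1 MiB and 8 MiB.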
                    let gap_tolerance =
                        (current_n_bytes.max(range.len()) / 8).clamp(1024 * 1024, 8 * 1024 * 1024);

                    leq_current_len_dist_to_chunk_size && distance <= gap_tolerance
                };

                if should_merge {
                    // Merge to existing range
                    current_merged_range = new_merged;
                    current_n_bytes += if is_overlapping {
                        range.len() - distance
                    } else {
                        range.len()
                    };
                    None
                } else {
                    let out = (current_merged_range.clone(), current_idx);
                    current_merged_range = range;
                    current_n_bytes = current_merged_range.len();
                    Some(out)
                }
            }
        })
        .flat_map(|x| {
            // Split large individual ranges within the list of ranges.
            let (range, end) = x;
            let split = split_range(range);
            let len = split.len();

            split
                .enumerate()
                .map(move |(i, range)| (range, if 1 + i == len { end } else { 0 }))
        })
}

#[cfg(test)]
mod tests {

    #[test]
    fn test_split_range() {
        use super::{get_download_chunk_size, split_range};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        #[allow(clippy::single_range_in_vec_init)]
        {
            // Round-trip empty ranges.
            assert_eq!(split_range(0..0).collect::<Vec<_>>(), [0..0]);
            assert_eq!(split_range(3..3).collect::<Vec<_>>(), [3..3]);
        }

        // Threshold to start splitting to 2 ranges
        //
        // n - chunk_size == chunk_size - n / 2
        // n + n / 2 == 2 * chunk_size
        // 3 * n == 4 * chunk_size
        // n = 4 * chunk_size / 3
        let n = 4 * chunk_size / 3;

        #[allow(clippy::single_range_in_vec_init)]
        {
            assert_eq!(split_range(0..n).collect::<Vec<_>>(), [0..89478485]);
        }

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..44739243, 44739243..89478486]
        );

        // Threshold to start splitting to 3 ranges
        //
        // n / 2 - chunk_size == chunk_size - n / 3
        // n / 2 + n / 3 == 2 * chunk_size
        // 5 * n == 12 * chunk_size
        // n == 12 * chunk_size / 5
        let n = 12 * chunk_size / 5;

        assert_eq!(
            split_range(0..n).collect::<Vec<_>>(),
            [0..80530637, 80530637..161061273]
        );

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..53687092, 53687092..107374183, 107374183..161061274]
        );
    }

    #[test]
    fn test_merge_ranges() {
        use super::{get_download_chunk_size, merge_ranges};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        // Round-trip empty slice
        assert_eq!(merge_ranges(&[]).collect::<Vec<_>>(), []);

        // We have 1 tiny request followed by 1 huge request. They are combined because doing so
        // reduces the `abs_diff()` to the `chunk_size`, but the merged range is afterwards split
        // into 2 evenly sized requests.
        assert_eq!(
            merge_ranges(&[0..1, 1..127 * 1024 * 1024]).collect::<Vec<_>>(),
            [(0..66584576, 0), (66584576..133169152, 2)]
        );

        // <= 1MiB gap, merge
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),
            [(0..1048578, 2)]
        );

        // > 1MiB gap, do not merge
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 2..1024 * 1024 + 3]).collect::<Vec<_>>(),
            [(0..1, 1), (1048578..1048579, 2)]
        );

        // <= 12.5% gap, merge
        assert_eq!(
            merge_ranges(&[0..8, 10..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

        // <= 12.5% gap relative to RHS, merge
        assert_eq!(
            merge_ranges(&[0..1, 3..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

        // Overlapping range, merge
        assert_eq!(
            merge_ranges(&[0..80 * 1024 * 1024, 10 * 1024 * 1024..70 * 1024 * 1024])
                .collect::<Vec<_>>(),
            [(0..80 * 1024 * 1024, 2)]
        );
    }
}