Skip to main content

polars_io/cloud/
concurrency_config.rs

1use std::sync::LazyLock;
2
3use polars_core::config;
4
5/// Used to determine chunks when splitting large ranges, or combining small
6/// ranges.
7static DOWNLOAD_CHUNK_SIZE: LazyLock<usize> = LazyLock::new(|| {
8    let v: usize = std::env::var("POLARS_DOWNLOAD_CHUNK_SIZE")
9        .as_deref()
10        .map(|x| x.parse().expect("integer"))
11        .unwrap_or(64 * 1024 * 1024);
12
13    if config::verbose() {
14        eprintln!("async download_chunk_size: {v}")
15    }
16
17    v
18});
19
20static RANDOM_ACCESS_CHUNK_SIZE: LazyLock<usize> = LazyLock::new(|| {
21    let v = std::env::var("POLARS_DOWNLOAD_CHUNK_SIZE_RANDOM_ACCESS")
22        .as_deref()
23        .map(|x| x.parse().expect("integer"))
24        .unwrap_or(8 * 1024 * 1024);
25
26    if config::verbose() {
27        eprintln!("async download_chunk_size_random_access: {v}")
28    }
29
30    v
31});
32
33static STREAMING_CHUNK_SIZE: LazyLock<usize> = LazyLock::new(|| {
34    let v = std::env::var("POLARS_DOWNLOAD_CHUNK_SIZE_STREAMING")
35        .as_deref()
36        .map(|x| x.parse().expect("integer"))
37        .unwrap_or(32 * 1024 * 1024);
38
39    if config::verbose() {
40        eprintln!("async download_chunk_size_streaming: {v}")
41    }
42
43    v
44});
45
46pub fn get_download_chunk_size() -> usize {
47    *DOWNLOAD_CHUNK_SIZE
48}
49
50pub fn get_random_access_chunk_size() -> usize {
51    *RANDOM_ACCESS_CHUNK_SIZE
52}
53pub fn get_streaming_chunk_size() -> usize {
54    *STREAMING_CHUNK_SIZE
55}
56
/// Strategy for limiting how many requests to the back-end store may be in flight at once.
#[derive(Clone, Copy, Debug)]
pub enum ConcurrencyStrategy {
    /// Effectively no in-flight concurrency limit.
    ///
    /// Warning: the API call rate may become unbounded, so only choose this when the
    /// surrounding pipeline already applies its own rate limiting.
    Unbounded,
    /// Limits in-flight requests with a semi-static, count-based budget.
    ///
    /// NOTE: legacy strategy; it does not scale up to the full potential.
    Legacy,
    /// Limits in-flight requests with a dynamically sensed byte budget, using a count-based
    /// budget as a fallback.
    BytesBased,
}
71
72#[derive(Clone, Copy, Debug)]
73pub struct FetchConfig {
74    pub chunk_size: usize,
75    pub strategy: ConcurrencyStrategy,
76}
77
78impl FetchConfig {
79    /// Use for cloud-based file formats that are randomly accessible, i.e. individual
80    /// row groups (or record batches) and/or individual columns can be fetched
81    /// directly using the metadata as an input. Example: Parquet, IPC with internal
82    /// extensions.
83    ///
84    /// The chunk_size should be small enough to enable smooth operation of the
85    /// bytes-based in-flight concurrency controller.
86    pub fn random_access() -> Self {
87        Self {
88            chunk_size: get_random_access_chunk_size(),
89            strategy: ConcurrencyStrategy::BytesBased,
90        }
91    }
92
93    /// Use for cloud-based file formats that have a sequential layout, i.e. the file bytes
94    /// must be fetched and parsed sequentially. The pipeline is responsible for
95    /// managing back-pressure and rate-limiting. Example: CSV.
96    pub fn streaming() -> Self {
97        Self {
98            chunk_size: get_streaming_chunk_size(),
99            // TODO: For now - keep as Legacy. Switch to Unbounded in a future PR.
100            strategy: ConcurrencyStrategy::Legacy,
101        }
102    }
103
104    /// Used for legacy fetch.
105    /// @TODO: Deprecate over time.
106    pub fn legacy() -> Self {
107        Self {
108            chunk_size: get_download_chunk_size(),
109            strategy: ConcurrencyStrategy::Legacy,
110        }
111    }
112}