Skip to main content

polars_io/cloud/concurrency/
mod.rs

1//! Adaptive in-flight concurrency controller for cloud IO (Input only).
2//!
3//! Admission control for concurrency uses two budgets:
4//! - A (primary) bytes-based budget to model the bandwidth-delay product (BDP)
5//! - A (secondary) count-based budget to limit the number of in-flight requests
6//!
7//! The bytes-based budget models the BDP as
8//!   BDP = BW_max * TTFB_est
9//!
10//! Three components cooperate:
11//! - Model: records IO observations and models the network (BW_max, TTFB_est, BDP)
12//! - Regime: state machine driving the admission (Init / RampUp / Stable / ProbeUp)
13//! - Admission: admission control, enforces byte-based + request-based budgets
14
15// Loosely based on BBR: Congestion-Based Congestion Control
16// see https://queue.acm.org/detail.cfm?id=3022184
17
18mod admission;
19mod model;
20mod regime;
21
22use std::num::{NonZeroU32, NonZeroU64};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26pub use admission::{InFlightBudget, InFlightPermit, InFlightStats};
27use crossbeam_queue::ArrayQueue;
28pub use model::Model;
29use polars_utils::relaxed_cell::RelaxedCell;
30pub use regime::{Regime, RegimeState};
31
/// Capacity of the bounded IO-sample queue; the control loop drains it on every tick.
///
/// Sizing: at 50k requests per second and a 100 ms tick window, roughly 5k samples arrive
/// per tick, so 8 Ki leaves headroom.
// kdn TODO TEST & TUNE: Refactor to a run-time config-based value.
const SAMPLE_QUEUE_CAPACITY: usize = 8 * 1024;
36
37use crate::cloud::concurrency_config::get_random_access_chunk_size;
38
/// A single completed IO, reported to the controller via `ConcurrencyController::record_io`.
#[derive(Clone, Copy, Debug)]
pub struct IoSample {
    /// Number of bytes of the IO.
    pub n_bytes: u64,
    /// Time-to-first-byte.
    pub ttfb: Duration,
    /// Point in time at which the IO completed.
    // TODO: Factor out as we only care about per-tick_window stats.
    pub completion_time: Instant,
}
47
/// Tuning parameters for the adaptive in-flight concurrency controller.
#[derive(Debug, Clone)]
pub struct ControllerConfig {
    /// Sliding window over which the most recent round-trip time (RTT) and bandwidth (BW)
    /// are calculated. Also acts as the retention window.
    window: Duration,
    /// Byte-based budget during the Init phase, and the base for the RampUp phase.
    init_byte_budget: u64,
    /// Lower limit for the byte-based budget; needed to avoid deadlock.
    floor_byte_budget: u64,
    /// Count-based request budget.
    request_budget: u32,
    /// Period at which the control loop updates the model, steps the regime and resizes the
    /// byte budget.
    control_interval: Duration,
    /// The byte budget is only resized when the relative change exceeds this threshold
    /// (e.g. `0.05` means a change of more than 5%).
    budget_resize_threshold: f64,
}
64
65impl Default for ControllerConfig {
66    fn default() -> Self {
67        // Only used for bytes-based budget.
68        let target_chunk_size = get_random_access_chunk_size() as u64;
69        Self {
70            window: Duration::from_millis(1000),
71
72            // Byte-based budget during the ramp-up phase.
73            // Starting too low results in lost opportunity (time) during ramp-up.
74            // Starting too high results in early congestion, delayed completion of the first chunk,
75            // and inflated bandwidth estimation.
76            //
77            // Some BDP numbers for reference:
78            //   1 Gbps x 20 ms = 2.5 MB
79            //   1 Gbps x 50 ms = 6.25 MB
80            //   10 Gbps x 50 ms = 62.5 MB
81            //   100 Gbps x 50 ms = 625 MB
82            init_byte_budget: get_init_byte_budget(target_chunk_size),
83
84            // Byte-based budget floor.
85            // Must be >=larger than target_chunk_size to avoid potential deadlock.
86            floor_byte_budget: target_chunk_size,
87
88            // Count-based budget, currently fixed
89            request_budget: get_request_budget(),
90            control_interval: Duration::from_millis(100),
91            budget_resize_threshold: 0.05,
92        }
93    }
94}
95
96/// Max number of bytes concurrently in flight during the init and start of rampup phase.
97fn get_init_byte_budget(target_chunk_size: u64) -> u64 {
98    let init_byte_budget = std::env::var("POLARS_INFLIGHT_INIT_BYTE_BUDGET")
99        .map(|x| {
100            x.parse::<NonZeroU64>()
101                .unwrap_or_else(|_| {
102                    panic!("invalid value for POLARS_INFLIGHT_INIT_BYTE_BUDGET: {x}")
103                })
104                .get()
105        })
106        .unwrap_or_else(|_| {
107            // This should be lower than the expected BDP so it can ramp-up, but
108            // too low a value delays the transition to stable.
109            // Heuristic: higher bandwidth is expected on larger instances.
110            let n = polars_config::config().max_threads() as u64;
111            n.div_ceil(8).max(4) * target_chunk_size
112        })
113        .max(1);
114
115    if init_byte_budget < target_chunk_size {
116        panic!("in-flight byte budget init must be larger than the target_chunk_size");
117    }
118
119    init_byte_budget
120}
121
/// Maximum number of requests concurrently in flight.
///
/// Reads `POLARS_INFLIGHT_REQUEST_BUDGET`; falls back to 512 when it is not set.
///
/// # Panics
///
/// If `POLARS_INFLIGHT_REQUEST_BUDGET` is set but is not a positive integer.
pub fn get_request_budget() -> u32 {
    // object_store/reqwest use HTTP/1 with a connection pool, so this value caps the number
    // of concurrent TCP sessions to S3 for the pipeline. When changing it, keep in mind the
    // max_thread configuration(s), OS limits (e.g. `ulimit -n`), and any cloud
    // infrastructure limits.
    const DEFAULT_REQUEST_BUDGET: u32 = 512;

    let Ok(raw) = std::env::var("POLARS_INFLIGHT_REQUEST_BUDGET") else {
        return DEFAULT_REQUEST_BUDGET;
    };

    // NonZeroU32 guarantees the result is at least 1.
    match raw.parse::<NonZeroU32>() {
        Ok(budget) => budget.get(),
        Err(_) => panic!("invalid value for POLARS_INFLIGHT_REQUEST_BUDGET: {raw}"),
    }
}
137
138#[derive(Debug)]
139pub struct ConcurrencyController {
140    config: ControllerConfig,
141    sample_queue: Arc<ArrayQueue<IoSample>>,
142    samples_dropped: Arc<RelaxedCell<u64>>,
143    inflight_budget: Arc<InFlightBudget>,
144    _control_task: tokio::task::JoinHandle<()>,
145}
146
147impl ConcurrencyController {
148    pub fn new(config: ControllerConfig) -> Self {
149        let sample_queue = Arc::new(ArrayQueue::new(SAMPLE_QUEUE_CAPACITY));
150        let samples_dropped = Arc::new(RelaxedCell::new_u64(0));
151
152        let inflight_budget = Arc::new(InFlightBudget::new(
153            config.init_byte_budget,
154            config.floor_byte_budget,
155            config.request_budget,
156        ));
157
158        let control_task = Self::spawn_control_loop(
159            sample_queue.clone(),
160            samples_dropped.clone(),
161            inflight_budget.clone(),
162            config.clone(),
163        );
164
165        Self {
166            config,
167            sample_queue,
168            samples_dropped,
169            inflight_budget,
170            _control_task: control_task,
171        }
172    }
173
174    pub fn config(&self) -> &ControllerConfig {
175        &self.config
176    }
177
178    /// Record a completed IO. Hot path.
179    pub fn record_io(&self, sample: IoSample) {
180        if self.sample_queue.push(sample).is_err() {
181            // Queue full: drop. Samples are statistics is considered acceptable.
182            self.samples_dropped.fetch_add(1);
183        }
184    }
185
186    pub fn inflight_budget(&self) -> &Arc<InFlightBudget> {
187        &self.inflight_budget
188    }
189
190    pub async fn acquire(&self, bytes: u64) -> InFlightPermit {
191        self.inflight_budget.acquire(bytes).await
192    }
193
194    fn spawn_control_loop(
195        sample_queue: Arc<ArrayQueue<IoSample>>,
196        samples_dropped: Arc<RelaxedCell<u64>>,
197        admission: Arc<InFlightBudget>,
198        config: ControllerConfig,
199    ) -> tokio::task::JoinHandle<()> {
200        if polars_config::config().verbose() {
201            eprintln!(
202                "[InFlightConcurrency]: spawn control loop: control_interval: {}ms",
203                config.control_interval.as_millis()
204            );
205        }
206        tokio::spawn(async move {
207            let mut model = Model::new(config.window);
208            let mut regime = Regime::new(Instant::now());
209
210            let mut ticker = tokio::time::interval(config.control_interval);
211            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
212
213            loop {
214                ticker.tick().await;
215                let now = Instant::now();
216
217                // Update model statistics and step regime.
218                let (state, signal, dropped, bw_hwm_held) = {
219                    for _ in 0..SAMPLE_QUEUE_CAPACITY {
220                        let Some(s) = sample_queue.pop() else { break };
221                        model.record(s);
222                    }
223                    let dropped = samples_dropped.swap(0);
224                    model.update(now);
225                    let signal = model.signal();
226                    let state = regime.step(signal, now);
227                    let bw_hwm_held = model.bw_hwm_bps();
228                    (state, signal, dropped, bw_hwm_held)
229                };
230
231                if !matches!(state, RegimeState::WarmIdle { .. }) {
232                    // Compute base BDP
233                    let base_budget = match (state, signal) {
234                        (RegimeState::Init, _) | (_, None) => config.init_byte_budget,
235                        (_, Some(signal)) => signal.bdp_bytes().max(config.init_byte_budget),
236                    };
237
238                    // Compute target BDP using the gain multiplier. This is similar to BBR cwnd_gain.
239                    let gain = match state {
240                        RegimeState::Init => 1.0,
241                        RegimeState::RampUp { .. } => 2.0,
242                        // NOTE: >> 1.0 for the purpose of absorbing environment noise.
243                        RegimeState::Stable => 2.0,
244                        RegimeState::ProbeUp { .. } => 3.0,
245                        // Unreachable.
246                        RegimeState::WarmIdle { .. } => 1.0,
247                    };
248                    let target_budget = (base_budget as f64 * gain) as u64;
249
250                    // Resize if needed.
251                    let current_byte_budget = admission.current_byte_budget();
252                    let threshold = config.budget_resize_threshold;
253                    let should_resize = match current_byte_budget {
254                        0 => target_budget > 0,
255                        current => {
256                            let ratio = target_budget as f64 / current as f64;
257                            ratio < (1.0 - threshold) || ratio > (1.0 + threshold)
258                        },
259                    };
260
261                    if should_resize {
262                        admission.resize_byte_budget(target_budget);
263                    }
264                }
265
266                // Log snapshot.
267                if std::env::var("POLARS_LOG_CONCURRENCY").is_ok() {
268                    let stats = admission.stats();
269                    eprintln!(
270                        "[InFlightConcurrency {}] regime={}, \
271                        bw_hwm={:.1} MB/s, \
272                        bw_avg={:.1} MB/s, \
273                        rtt_min={:.1} ms, \
274                        rtt_avg={:.1} ms, \
275                        bdp_obs={:.1} MB, \
276                        bytes_budget={:.1} MB, \
277                        bytes_in_use={:.1} MB, \
278                        bytes_sat={:.2}, \
279                        req_budget={}, \
280                        req_in_use={}, \
281                        req_sat={:.2}",
282                        chrono::Utc::now(),
283                        state.label(),
284                        signal.map(|s| s.bw_hwm_bps).or(bw_hwm_held).unwrap_or(0.0) / 1e6,
285                        signal.map_or(0.0, |s| s.bw_avg_bps) / 1e6,
286                        signal.map_or(0, |s| s.ttfb_min.as_millis()),
287                        signal.map_or(0, |s| s.ttfb_avg.as_millis()),
288                        signal.map_or(0, |s| s.bdp_bytes()) as f64 / 1e6,
289                        stats.bytes_budget as f64 / 1e6,
290                        stats.bytes_in_use as f64 / 1e6,
291                        stats.bytes_saturation,
292                        stats.request_budget,
293                        stats.requests_in_use,
294                        stats.requests_saturation
295                    );
296                    if dropped > 0 {
297                        eprintln!(
298                            "[InFlightConcurrency] WARN: {dropped} samples dropped (queue full)"
299                        );
300                    }
301                }
302            }
303        })
304    }
305}
306
307impl Drop for ConcurrencyController {
308    fn drop(&mut self) {
309        self._control_task.abort();
310    }
311}