polars_io/cloud/concurrency/
mod.rs1mod admission;
19mod model;
20mod regime;
21
22use std::num::{NonZeroU32, NonZeroU64};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26pub use admission::{InFlightBudget, InFlightPermit, InFlightStats};
27use crossbeam_queue::ArrayQueue;
28pub use model::Model;
29use polars_utils::relaxed_cell::RelaxedCell;
30pub use regime::{Regime, RegimeState};
31
/// Capacity of the bounded queue that carries [`IoSample`]s to the control loop.
///
/// The same value also bounds how many samples the control loop pops per tick.
const SAMPLE_QUEUE_CAPACITY: usize = 8 * 1024;
36
37use crate::cloud::concurrency_config::get_random_access_chunk_size;
38
/// One I/O measurement handed to the concurrency controller via `record_io`.
#[derive(Debug, Clone, Copy)]
pub struct IoSample {
    /// Byte count associated with this sample (presumably the bytes transferred by the request).
    pub n_bytes: u64,
    /// Latency measured for the request; by its name, the time to first byte.
    pub ttfb: Duration,
    /// Timestamp attached to the sample; by its name, the instant the I/O completed.
    pub completion_time: Instant,
}
47
/// Tunables for [`ConcurrencyController`] and its background control loop.
#[derive(Clone, Debug)]
pub struct ControllerConfig {
    /// Window duration handed to `Model::new` by the control loop.
    window: Duration,
    /// Starting byte budget; also the lower bound of the base budget the control loop computes.
    init_byte_budget: u64,
    /// Floor value passed to `InFlightBudget::new`.
    floor_byte_budget: u64,
    /// Request budget passed to `InFlightBudget::new`.
    request_budget: u32,
    /// Period of the control loop ticker.
    control_interval: Duration,
    /// Relative deviation between target and current byte budget that must be exceeded before
    /// the control loop resizes the budget (e.g. `0.05` means more than 5% off).
    budget_resize_threshold: f64,
}
64
65impl Default for ControllerConfig {
66 fn default() -> Self {
67 let target_chunk_size = get_random_access_chunk_size() as u64;
69 Self {
70 window: Duration::from_millis(1000),
71
72 init_byte_budget: get_init_byte_budget(target_chunk_size),
83
84 floor_byte_budget: target_chunk_size,
87
88 request_budget: get_request_budget(),
90 control_interval: Duration::from_millis(100),
91 budget_resize_threshold: 0.05,
92 }
93 }
94}
95
96fn get_init_byte_budget(target_chunk_size: u64) -> u64 {
98 let init_byte_budget = std::env::var("POLARS_INFLIGHT_INIT_BYTE_BUDGET")
99 .map(|x| {
100 x.parse::<NonZeroU64>()
101 .unwrap_or_else(|_| {
102 panic!("invalid value for POLARS_INFLIGHT_INIT_BYTE_BUDGET: {x}")
103 })
104 .get()
105 })
106 .unwrap_or_else(|_| {
107 let n = polars_config::config().max_threads() as u64;
111 n.div_ceil(8).max(4) * target_chunk_size
112 })
113 .max(1);
114
115 if init_byte_budget < target_chunk_size {
116 panic!("in-flight byte budget init must be larger than the target_chunk_size");
117 }
118
119 init_byte_budget
120}
121
/// Returns the in-flight request budget.
///
/// Reads `POLARS_INFLIGHT_REQUEST_BUDGET`; when the variable cannot be read, the budget is 512.
///
/// # Panics
///
/// Panics if the variable is set to something that does not parse as a non-zero `u32`.
pub fn get_request_budget() -> u32 {
    const DEFAULT_REQUEST_BUDGET: u32 = 512;

    let budget = match std::env::var("POLARS_INFLIGHT_REQUEST_BUDGET") {
        Ok(x) => match x.parse::<NonZeroU32>() {
            Ok(parsed) => parsed.get(),
            Err(_) => panic!("invalid value for POLARS_INFLIGHT_REQUEST_BUDGET: {x}"),
        },
        Err(_) => DEFAULT_REQUEST_BUDGET,
    };

    budget.max(1)
}
137
/// Owns the in-flight admission budget together with the background task that resizes it
/// from recorded I/O samples.
#[derive(Debug)]
pub struct ConcurrencyController {
    /// Configuration this controller was created with.
    config: ControllerConfig,
    /// Bounded queue: `record_io` pushes samples, the control loop pops them.
    sample_queue: Arc<ArrayQueue<IoSample>>,
    /// Count of samples rejected because the queue was full; the control loop swaps it back to
    /// 0 on every tick.
    samples_dropped: Arc<RelaxedCell<u64>>,
    /// Admission budget shared with the control loop.
    inflight_budget: Arc<InFlightBudget>,
    /// Handle of the spawned control loop; aborted when the controller is dropped.
    _control_task: tokio::task::JoinHandle<()>,
}
146
147impl ConcurrencyController {
148 pub fn new(config: ControllerConfig) -> Self {
149 let sample_queue = Arc::new(ArrayQueue::new(SAMPLE_QUEUE_CAPACITY));
150 let samples_dropped = Arc::new(RelaxedCell::new_u64(0));
151
152 let inflight_budget = Arc::new(InFlightBudget::new(
153 config.init_byte_budget,
154 config.floor_byte_budget,
155 config.request_budget,
156 ));
157
158 let control_task = Self::spawn_control_loop(
159 sample_queue.clone(),
160 samples_dropped.clone(),
161 inflight_budget.clone(),
162 config.clone(),
163 );
164
165 Self {
166 config,
167 sample_queue,
168 samples_dropped,
169 inflight_budget,
170 _control_task: control_task,
171 }
172 }
173
174 pub fn config(&self) -> &ControllerConfig {
175 &self.config
176 }
177
178 pub fn record_io(&self, sample: IoSample) {
180 if self.sample_queue.push(sample).is_err() {
181 self.samples_dropped.fetch_add(1);
183 }
184 }
185
186 pub fn inflight_budget(&self) -> &Arc<InFlightBudget> {
187 &self.inflight_budget
188 }
189
190 pub async fn acquire(&self, bytes: u64) -> InFlightPermit {
191 self.inflight_budget.acquire(bytes).await
192 }
193
194 fn spawn_control_loop(
195 sample_queue: Arc<ArrayQueue<IoSample>>,
196 samples_dropped: Arc<RelaxedCell<u64>>,
197 admission: Arc<InFlightBudget>,
198 config: ControllerConfig,
199 ) -> tokio::task::JoinHandle<()> {
200 if polars_config::config().verbose() {
201 eprintln!(
202 "[InFlightConcurrency]: spawn control loop: control_interval: {}ms",
203 config.control_interval.as_millis()
204 );
205 }
206 tokio::spawn(async move {
207 let mut model = Model::new(config.window);
208 let mut regime = Regime::new(Instant::now());
209
210 let mut ticker = tokio::time::interval(config.control_interval);
211 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
212
213 loop {
214 ticker.tick().await;
215 let now = Instant::now();
216
217 let (state, signal, dropped, bw_hwm_held) = {
219 for _ in 0..SAMPLE_QUEUE_CAPACITY {
220 let Some(s) = sample_queue.pop() else { break };
221 model.record(s);
222 }
223 let dropped = samples_dropped.swap(0);
224 model.update(now);
225 let signal = model.signal();
226 let state = regime.step(signal, now);
227 let bw_hwm_held = model.bw_hwm_bps();
228 (state, signal, dropped, bw_hwm_held)
229 };
230
231 if !matches!(state, RegimeState::WarmIdle { .. }) {
232 let base_budget = match (state, signal) {
234 (RegimeState::Init, _) | (_, None) => config.init_byte_budget,
235 (_, Some(signal)) => signal.bdp_bytes().max(config.init_byte_budget),
236 };
237
238 let gain = match state {
240 RegimeState::Init => 1.0,
241 RegimeState::RampUp { .. } => 2.0,
242 RegimeState::Stable => 2.0,
244 RegimeState::ProbeUp { .. } => 3.0,
245 RegimeState::WarmIdle { .. } => 1.0,
247 };
248 let target_budget = (base_budget as f64 * gain) as u64;
249
250 let current_byte_budget = admission.current_byte_budget();
252 let threshold = config.budget_resize_threshold;
253 let should_resize = match current_byte_budget {
254 0 => target_budget > 0,
255 current => {
256 let ratio = target_budget as f64 / current as f64;
257 ratio < (1.0 - threshold) || ratio > (1.0 + threshold)
258 },
259 };
260
261 if should_resize {
262 admission.resize_byte_budget(target_budget);
263 }
264 }
265
266 if std::env::var("POLARS_LOG_CONCURRENCY").is_ok() {
268 let stats = admission.stats();
269 eprintln!(
270 "[InFlightConcurrency {}] regime={}, \
271 bw_hwm={:.1} MB/s, \
272 bw_avg={:.1} MB/s, \
273 rtt_min={:.1} ms, \
274 rtt_avg={:.1} ms, \
275 bdp_obs={:.1} MB, \
276 bytes_budget={:.1} MB, \
277 bytes_in_use={:.1} MB, \
278 bytes_sat={:.2}, \
279 req_budget={}, \
280 req_in_use={}, \
281 req_sat={:.2}",
282 chrono::Utc::now(),
283 state.label(),
284 signal.map(|s| s.bw_hwm_bps).or(bw_hwm_held).unwrap_or(0.0) / 1e6,
285 signal.map_or(0.0, |s| s.bw_avg_bps) / 1e6,
286 signal.map_or(0, |s| s.ttfb_min.as_millis()),
287 signal.map_or(0, |s| s.ttfb_avg.as_millis()),
288 signal.map_or(0, |s| s.bdp_bytes()) as f64 / 1e6,
289 stats.bytes_budget as f64 / 1e6,
290 stats.bytes_in_use as f64 / 1e6,
291 stats.bytes_saturation,
292 stats.request_budget,
293 stats.requests_in_use,
294 stats.requests_saturation
295 );
296 if dropped > 0 {
297 eprintln!(
298 "[InFlightConcurrency] WARN: {dropped} samples dropped (queue full)"
299 );
300 }
301 }
302 }
303 })
304 }
305}
306
307impl Drop for ConcurrencyController {
308 fn drop(&mut self) {
309 self._control_task.abort();
310 }
311}