Skip to main content

polars_io/cloud/concurrency/
regime.rs

1//! Regime: state machine driving the admission control behavior and parameters.
2
3use std::time::{Duration, Instant};
4
5use crate::cloud::concurrency::model::SignalStats;
6
/// Phase of the admission-control state machine, together with the small
/// amount of bookkeeping each phase needs.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RegimeState {
    /// Starting state.
    Init,
    /// Rapid increase to gauge the max_bandwidth.
    RampUp {
        /// How many rounds in a row the bandwidth failed to grow.
        consecutive_no_growth: u32,
        /// Bandwidth value recorded on the previous round.
        last_bw_observation: f64,
    },
    /// Nominal steady state.
    Stable,
    /// Actively sense for higher bandwidth.
    ProbeUp {
        /// Instant at which this probe began.
        started_at: Instant,
    },
    /// No signal available, but the prior model still applies.
    WarmIdle {
        /// Instant at which the idle period began.
        started_at: Instant,
    },
}

impl RegimeState {
    /// Returns the snake_case name of the current variant; payload fields
    /// play no part in the result.
    pub fn label(&self) -> &'static str {
        match self {
            Self::Init => "init",
            Self::RampUp { .. } => "ramp_up",
            Self::Stable => "stable",
            Self::ProbeUp { .. } => "probe_up",
            Self::WarmIdle { .. } => "warm_idle",
        }
    }
}
39
40#[derive(Debug, Clone)]
41pub struct Regime {
42    state: RegimeState,
43    last_transition: Instant,
44
45    rampup_growth_threshold: f64,
46    rampup_exit_rounds: u32,
47    probe_interval: Duration,
48    probe_duration: Duration,
49    warm_idle_grace: Duration,
50}
51
52impl Regime {
53    pub fn new(now: Instant) -> Self {
54        Self {
55            state: RegimeState::Init,
56            last_transition: now,
57            rampup_growth_threshold: 1.05,
58            rampup_exit_rounds: 3,
59            // Interval between ProbeUp spikes
60            probe_interval: Duration::from_millis(3000),
61            probe_duration: Duration::from_millis(1000),
62            warm_idle_grace: Duration::from_millis(5000),
63        }
64    }
65
66    // TODO: Add sensing-based app-limited state (see BBR paper). This state avoids model regression in
67    // when bandwidth is artificially constrained by the downstream backpressure kicking in (e.g., from
68    // decode or from the streaming engine execution).
69    // TODO: Add a ProbeDown state if needed.
70    pub fn step(&mut self, signal: Option<SignalStats>, now: Instant) -> RegimeState {
71        let Some(sig) = signal else {
72            return match self.state {
73                RegimeState::Init => RegimeState::Init,
74                RegimeState::WarmIdle { started_at } => {
75                    if now.duration_since(started_at) > self.warm_idle_grace {
76                        self.transition_to(RegimeState::Init, now)
77                    } else {
78                        self.state
79                    }
80                },
81                _ => self.transition_to(RegimeState::WarmIdle { started_at: now }, now),
82            };
83        };
84
85        match self.state {
86            RegimeState::Init => self.transition_to(
87                RegimeState::RampUp {
88                    consecutive_no_growth: 0,
89                    last_bw_observation: sig.bw_avg_bps,
90                },
91                now,
92            ),
93
94            RegimeState::RampUp {
95                consecutive_no_growth,
96                last_bw_observation,
97            } => {
98                let growing = sig.bw_avg_bps > last_bw_observation * self.rampup_growth_threshold;
99                let consecutive_no_growth = if growing {
100                    0
101                } else {
102                    consecutive_no_growth + 1
103                };
104
105                if consecutive_no_growth >= self.rampup_exit_rounds {
106                    self.transition_to(RegimeState::Stable, now)
107                } else {
108                    self.state = RegimeState::RampUp {
109                        consecutive_no_growth,
110                        last_bw_observation: sig.bw_avg_bps,
111                    };
112                    self.state
113                }
114            },
115
116            RegimeState::WarmIdle { .. } => self.transition_to(RegimeState::Stable, now),
117
118            RegimeState::Stable => {
119                if now.duration_since(self.last_transition) > self.probe_interval {
120                    self.transition_to(RegimeState::ProbeUp { started_at: now }, now)
121                } else {
122                    self.state
123                }
124            },
125
126            RegimeState::ProbeUp { started_at } => {
127                if now.duration_since(started_at) > self.probe_duration {
128                    self.transition_to(RegimeState::Stable, now)
129                } else {
130                    self.state
131                }
132            },
133        }
134    }
135
136    pub fn state(&self) -> &RegimeState {
137        &self.state
138    }
139
140    fn transition_to(&mut self, new: RegimeState, now: Instant) -> RegimeState {
141        let old_label = self.state.label();
142        let new_label = new.label();
143
144        if polars_config::config().verbose() && old_label != new_label {
145            eprintln!(
146                "[InFlightConcurrency] regime change from {} to {}, after {:.2}s",
147                old_label,
148                new_label,
149                now.duration_since(self.last_transition).as_secs_f64(),
150            );
151        }
152
153        self.state = new;
154        self.last_transition = now;
155        new
156    }
157}