polars_io/cloud/concurrency/
regime.rs1use std::time::{Duration, Instant};
4
5use crate::cloud::concurrency::model::SignalStats;
6
/// The phase a [`Regime`] state machine is currently in.
///
/// The value is `Copy`, so it is handed around by value; variants that need
/// bookkeeping carry it inline.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RegimeState {
    /// Starting phase. Also re-entered when a `WarmIdle` period outlasts the
    /// configured grace window without any signal arriving.
    Init,
    /// Entered from `Init` on the first step that carries a signal.
    RampUp {
        /// Number of consecutive steps whose bandwidth did not exceed the
        /// previous observation by the growth threshold.
        consecutive_no_growth: u32,
        /// Average bandwidth reported by the previous step.
        last_bw_observation: f64,
    },
    /// Entered when ramp-up stops seeing growth, when a probe window ends, or
    /// when a signal arrives while in `WarmIdle`.
    Stable,
    /// Entered from `Stable` once the probe interval has elapsed.
    ProbeUp {
        /// Instant at which this probe window was entered.
        started_at: Instant,
    },
    /// Entered when a step carries no signal while in `RampUp`, `Stable` or
    /// `ProbeUp`.
    WarmIdle {
        /// Instant at which the idle period was entered.
        started_at: Instant,
    },
}
27
28impl RegimeState {
29 pub fn label(&self) -> &'static str {
30 match *self {
31 RegimeState::Init => "init",
32 RegimeState::RampUp { .. } => "ramp_up",
33 RegimeState::Stable => "stable",
34 RegimeState::ProbeUp { .. } => "probe_up",
35 RegimeState::WarmIdle { .. } => "warm_idle",
36 }
37 }
38}
39
40#[derive(Debug, Clone)]
41pub struct Regime {
42 state: RegimeState,
43 last_transition: Instant,
44
45 rampup_growth_threshold: f64,
46 rampup_exit_rounds: u32,
47 probe_interval: Duration,
48 probe_duration: Duration,
49 warm_idle_grace: Duration,
50}
51
52impl Regime {
53 pub fn new(now: Instant) -> Self {
54 Self {
55 state: RegimeState::Init,
56 last_transition: now,
57 rampup_growth_threshold: 1.05,
58 rampup_exit_rounds: 3,
59 probe_interval: Duration::from_millis(3000),
61 probe_duration: Duration::from_millis(1000),
62 warm_idle_grace: Duration::from_millis(5000),
63 }
64 }
65
66 pub fn step(&mut self, signal: Option<SignalStats>, now: Instant) -> RegimeState {
71 let Some(sig) = signal else {
72 return match self.state {
73 RegimeState::Init => RegimeState::Init,
74 RegimeState::WarmIdle { started_at } => {
75 if now.duration_since(started_at) > self.warm_idle_grace {
76 self.transition_to(RegimeState::Init, now)
77 } else {
78 self.state
79 }
80 },
81 _ => self.transition_to(RegimeState::WarmIdle { started_at: now }, now),
82 };
83 };
84
85 match self.state {
86 RegimeState::Init => self.transition_to(
87 RegimeState::RampUp {
88 consecutive_no_growth: 0,
89 last_bw_observation: sig.bw_avg_bps,
90 },
91 now,
92 ),
93
94 RegimeState::RampUp {
95 consecutive_no_growth,
96 last_bw_observation,
97 } => {
98 let growing = sig.bw_avg_bps > last_bw_observation * self.rampup_growth_threshold;
99 let consecutive_no_growth = if growing {
100 0
101 } else {
102 consecutive_no_growth + 1
103 };
104
105 if consecutive_no_growth >= self.rampup_exit_rounds {
106 self.transition_to(RegimeState::Stable, now)
107 } else {
108 self.state = RegimeState::RampUp {
109 consecutive_no_growth,
110 last_bw_observation: sig.bw_avg_bps,
111 };
112 self.state
113 }
114 },
115
116 RegimeState::WarmIdle { .. } => self.transition_to(RegimeState::Stable, now),
117
118 RegimeState::Stable => {
119 if now.duration_since(self.last_transition) > self.probe_interval {
120 self.transition_to(RegimeState::ProbeUp { started_at: now }, now)
121 } else {
122 self.state
123 }
124 },
125
126 RegimeState::ProbeUp { started_at } => {
127 if now.duration_since(started_at) > self.probe_duration {
128 self.transition_to(RegimeState::Stable, now)
129 } else {
130 self.state
131 }
132 },
133 }
134 }
135
136 pub fn state(&self) -> &RegimeState {
137 &self.state
138 }
139
140 fn transition_to(&mut self, new: RegimeState, now: Instant) -> RegimeState {
141 let old_label = self.state.label();
142 let new_label = new.label();
143
144 if polars_config::config().verbose() && old_label != new_label {
145 eprintln!(
146 "[InFlightConcurrency] regime change from {} to {}, after {:.2}s",
147 old_label,
148 new_label,
149 now.duration_since(self.last_transition).as_secs_f64(),
150 );
151 }
152
153 self.state = new;
154 self.last_transition = now;
155 new
156 }
157}