polars_io/cloud/concurrency/
model.rs1use std::collections::VecDeque;
11use std::time::{Duration, Instant};
12
13use crate::cloud::concurrency::IoSample;
14
/// Snapshot of the bandwidth and time-to-first-byte statistics for one window.
///
/// Bandwidths are in bytes per second: multiplying one by a duration in
/// seconds yields a byte count (see [`SignalStats::bdp_bytes`]).
#[derive(Clone, Copy, Debug)]
pub struct SignalStats {
    /// Average bandwidth, in bytes per second.
    pub bw_avg_bps: f64,
    /// Bandwidth high-water mark, in bytes per second.
    pub bw_hwm_bps: f64,
    /// Smallest time-to-first-byte in the snapshot.
    pub ttfb_min: Duration,
    /// Mean time-to-first-byte in the snapshot.
    pub ttfb_avg: Duration,
}

impl SignalStats {
    /// Round-trip time to use when computing the bandwidth-delay product.
    ///
    /// Returns `ttfb_avg`, but never more than ten times `ttfb_min`.
    /// NOTE(review): the cap presumably stops queuing delay from inflating the
    /// estimate — confirm against the design notes.
    pub fn rtt_for_bdp(&self) -> Duration {
        const QUEUING_RATIO_CAP: f64 = 10.0;

        let ceiling = self.ttfb_min.mul_f64(QUEUING_RATIO_CAP);
        if self.ttfb_avg <= ceiling {
            self.ttfb_avg
        } else {
            ceiling
        }
    }

    /// Bandwidth-delay product in bytes: `bw_hwm_bps * rtt_for_bdp()`.
    ///
    /// The float-to-integer cast truncates towards zero and saturates at the
    /// bounds of `u64` (a NaN product becomes 0).
    pub fn bdp_bytes(&self) -> u64 {
        let rtt_secs = self.rtt_for_bdp().as_secs_f64();
        let product = self.bw_hwm_bps * rtt_secs;
        product as u64
    }
}
44
45#[derive(Debug)]
46pub struct Model {
47 samples: VecDeque<IoSample>,
50 first_sample_time: Option<Instant>,
51
52 window: Duration,
54
55 bw_hwm_bps: Option<f64>,
57 bw_hwm_last_updated: Option<Instant>,
58 signal: Option<SignalStats>,
59}
60
61impl Model {
62 pub fn new(window: Duration) -> Self {
63 Self {
64 samples: VecDeque::with_capacity(1024),
65 first_sample_time: None,
66 window,
67 bw_hwm_bps: None,
68 bw_hwm_last_updated: None,
69 signal: None,
70 }
71 }
72
73 pub fn record(&mut self, sample: IoSample) {
75 if self.first_sample_time.is_none() {
76 self.first_sample_time = Some(sample.completion_time);
77 if polars_config::config().verbose() {
78 eprintln!(
79 "[InFlightConcurrency]: observed first RTT sample: {} ms, for {} bytes",
80 sample.ttfb.as_millis(),
81 sample.n_bytes
82 )
83 }
84 }
85
86 self.samples.push_back(sample);
87 }
88
89 pub fn signal(&self) -> Option<SignalStats> {
90 self.signal }
92
93 pub fn bw_hwm_bps(&self) -> Option<f64> {
94 self.bw_hwm_bps
95 }
96
97 pub fn update(&mut self, now: Instant) {
103 const N_SAMPLE_THRESHOLD: usize = 5;
104
105 self.evict_old(Instant::now());
106
107 if self.samples.len() < N_SAMPLE_THRESHOLD {
108 self.signal = None;
109 return;
110 }
111
112 let window_start = now - self.window;
114 let mut n_bytes: u64 = 0;
115 let mut n_samples: usize = 0;
116 let mut ttfb_sum = Duration::ZERO;
117 let mut ttfb_min: Option<Duration> = None;
118
119 for s in &self.samples {
120 if s.completion_time >= window_start && s.completion_time <= now {
121 n_bytes += s.n_bytes;
122 n_samples += 1;
123 ttfb_sum += s.ttfb;
124 ttfb_min = Some(ttfb_min.map_or(s.ttfb, |m| m.min(s.ttfb)));
125 }
126 }
127
128 let (Some(ttfb_min), true) = (ttfb_min, n_bytes > 0) else {
129 self.signal = None;
130 return;
131 };
132 let ttfb_avg = ttfb_sum.div_f64(n_samples as f64);
133
134 let bw_avg_bps = n_bytes as f64 / self.window.as_secs_f64();
138
139 if self.bw_hwm_bps.is_none_or(|hwm| bw_avg_bps > hwm) {
143 self.bw_hwm_bps = Some(bw_avg_bps);
144 self.bw_hwm_last_updated = Some(now);
145 }
146 let bw_hwm_bps = self.bw_hwm_bps.unwrap();
147
148 self.signal = Some(SignalStats {
149 bw_avg_bps,
150 bw_hwm_bps,
151 ttfb_min,
152 ttfb_avg,
153 });
154 }
155
156 pub fn sample_count(&self) -> usize {
157 self.samples.len()
158 }
159
160 fn evict_old(&mut self, now: Instant) {
161 while let Some(front) = self.samples.front() {
162 if now.duration_since(front.completion_time) > self.window {
163 self.samples.pop_front();
164 } else {
165 break;
166 }
167 }
168 }
169}