Skip to main content

polars_io/cloud/concurrency/
model.rs

//! Model: estimate network parameters from observed `IoSample`s.
//!
//! Parameters:
//! - BW_max: maximum bandwidth (tracked as an all-time high-water mark)
//! - RTT_min: minimum round-trip time, approximated by TTFB (time-to-first-byte)
//!
//! Derived parameter:
//! - BDP: bandwidth-delay product (bandwidth × round-trip time)
9
10use std::collections::VecDeque;
11use std::time::{Duration, Instant};
12
13use crate::cloud::concurrency::IoSample;
14
/// Point-in-time view of the network signal, rebuilt by every `update()`.
///
/// Whenever a value of this type exists, every field is populated and all
/// fields were derived from the same window, so they agree with each other.
#[derive(Clone, Copy, Debug)]
pub struct SignalStats {
    /// Average bandwidth over the sliding window, in bytes per second.
    pub bw_avg_bps: f64,
    /// All-time high-water mark of the windowed bandwidth, in bytes per
    /// second. This is a copy; the live value is kept by `Model` and
    /// survives periods without a signal.
    pub bw_hwm_bps: f64,
    /// Smallest time-to-first-byte seen in the window.
    pub ttfb_min: Duration,
    /// Mean time-to-first-byte over the window.
    pub ttfb_avg: Duration,
}
28
29impl SignalStats {
30    /// RTT used to compute BDP.
31    ///
32    /// Average TTFB, capped at `QUEUING_RATIO_CAP × ttfb_min`. The cap
33    /// prevents the BDP estimate from spiraling on self-induced queuing
34    /// while still letting the budget grow with bw_hwm.
35    pub fn rtt_for_bdp(&self) -> Duration {
36        const QUEUING_RATIO_CAP: f64 = 10.0;
37        self.ttfb_avg.min(self.ttfb_min.mul_f64(QUEUING_RATIO_CAP))
38    }
39
40    pub fn bdp_bytes(&self) -> u64 {
41        (self.bw_hwm_bps * self.rtt_for_bdp().as_secs_f64()) as u64
42    }
43}
44
45#[derive(Debug)]
46pub struct Model {
47    // Collect and retain samples
48    // TODO: Introduce 'tick buckets' and move to ring-buffer of bucket-based stats.
49    samples: VecDeque<IoSample>,
50    first_sample_time: Option<Instant>,
51
52    // Model parameters for estimation and lifecycle management.
53    window: Duration,
54
55    // Statistics.
56    bw_hwm_bps: Option<f64>,
57    bw_hwm_last_updated: Option<Instant>,
58    signal: Option<SignalStats>,
59}
60
61impl Model {
62    pub fn new(window: Duration) -> Self {
63        Self {
64            samples: VecDeque::with_capacity(1024),
65            first_sample_time: None,
66            window,
67            bw_hwm_bps: None,
68            bw_hwm_last_updated: None,
69            signal: None,
70        }
71    }
72
73    /// Record a completed IO. Hot path.
74    pub fn record(&mut self, sample: IoSample) {
75        if self.first_sample_time.is_none() {
76            self.first_sample_time = Some(sample.completion_time);
77            if polars_config::config().verbose() {
78                eprintln!(
79                    "[InFlightConcurrency]: observed first RTT sample: {} ms, for {} bytes",
80                    sample.ttfb.as_millis(),
81                    sample.n_bytes
82                )
83            }
84        }
85
86        self.samples.push_back(sample);
87    }
88
89    pub fn signal(&self) -> Option<SignalStats> {
90        self.signal // Copy
91    }
92
93    pub fn bw_hwm_bps(&self) -> Option<f64> {
94        self.bw_hwm_bps
95    }
96
97    /// Recompute statistics from current samples.
98    /// Returns true if the model contains a valid signal.
99    // @TODO. The regime should account for an 'app-limited' state, to
100    // account for slow inbound rate not caused by the upstream network,
101    // but caused by the downstream processing backpressure (see also BBR paper).
102    pub fn update(&mut self, now: Instant) {
103        const N_SAMPLE_THRESHOLD: usize = 5;
104
105        self.evict_old(Instant::now());
106
107        if self.samples.len() < N_SAMPLE_THRESHOLD {
108            self.signal = None;
109            return;
110        }
111
112        // Single pass over the window (eviction is lazy, so still filter).
113        let window_start = now - self.window;
114        let mut n_bytes: u64 = 0;
115        let mut n_samples: usize = 0;
116        let mut ttfb_sum = Duration::ZERO;
117        let mut ttfb_min: Option<Duration> = None;
118
119        for s in &self.samples {
120            if s.completion_time >= window_start && s.completion_time <= now {
121                n_bytes += s.n_bytes;
122                n_samples += 1;
123                ttfb_sum += s.ttfb;
124                ttfb_min = Some(ttfb_min.map_or(s.ttfb, |m| m.min(s.ttfb)));
125            }
126        }
127
128        let (Some(ttfb_min), true) = (ttfb_min, n_bytes > 0) else {
129            self.signal = None;
130            return;
131        };
132        let ttfb_avg = ttfb_sum.div_f64(n_samples as f64);
133
134        // Rate against fixed window duration; bursty traffic may under-report,
135        // but the HWM tracks the peak.
136        // TODO: Improve accuracy by spreading arrival volume over ttlb-ttfb duration.
137        let bw_avg_bps = n_bytes as f64 / self.window.as_secs_f64();
138
139        // All-time high-water-mark (HWM).
140        // TODO: Some form of HWM expiration or decay would be in order.
141        // In the current implementation, HWM can only go up.
142        if self.bw_hwm_bps.is_none_or(|hwm| bw_avg_bps > hwm) {
143            self.bw_hwm_bps = Some(bw_avg_bps);
144            self.bw_hwm_last_updated = Some(now);
145        }
146        let bw_hwm_bps = self.bw_hwm_bps.unwrap();
147
148        self.signal = Some(SignalStats {
149            bw_avg_bps,
150            bw_hwm_bps,
151            ttfb_min,
152            ttfb_avg,
153        });
154    }
155
156    pub fn sample_count(&self) -> usize {
157        self.samples.len()
158    }
159
160    fn evict_old(&mut self, now: Instant) {
161        while let Some(front) = self.samples.front() {
162            if now.duration_since(front.completion_time) > self.window {
163                self.samples.pop_front();
164            } else {
165                break;
166            }
167        }
168    }
169}