// polars_io/cloud/concurrency/admission.rs

use std::pin::pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};

6/// Byte-granularity budget with dynamic resize.
7///
8/// Uses an atomic counter + Notify rather than tokio::Semaphore because
9/// we need instant shrink semantics (tokio::Semaphore requires "stealing"
10/// permits by acquiring them, which blocks under saturation).
11#[derive(Debug)]
12pub(super) struct ByteBudget {
13    // Current budget, measured in number of bytes, reflecting the maximum
14    // allowed in-flight volume.
15    current_budget: AtomicU64,
16    // Lowest allowed budget for the current_budget.
17    floor_budget: u64,
18    // Volume in use for in-flight traffic, as allowed by the current_budget.
19    inflight_in_use: AtomicU64,
20    waiters: Notify,
21}
22
23impl ByteBudget {
24    fn new(initial: u64, floor_budget: u64) -> Self {
25        Self {
26            current_budget: AtomicU64::new(initial),
27            floor_budget,
28            inflight_in_use: AtomicU64::new(0),
29            waiters: Notify::new(),
30        }
31    }
32
33    /// Acquire a bytes-based permit. The call site is responsible for capping the
34    /// request size to prevent deadlock.
35    async fn acquire_strict(&self, n_bytes: u64) {
36        // Pre-empt deadlock.
37        assert!(n_bytes <= self.floor_budget);
38
39        // NOTE: Large waiters can starve under sustained small-request load.
40        // In practice, this may not be material issue.
41        loop {
42            let cap = self.current_budget.load(Ordering::Acquire);
43            let inflight = self.inflight_in_use.load(Ordering::Acquire);
44
45            if inflight + n_bytes <= cap {
46                if self
47                    .inflight_in_use
48                    .compare_exchange_weak(
49                        inflight,
50                        inflight + n_bytes,
51                        Ordering::AcqRel,
52                        Ordering::Relaxed,
53                    )
54                    .is_ok()
55                {
56                    // Fits. There may be leftover capacity for the next
57                    // waiter (e.g. one big release satisfying several small
58                    // acquires), so keep the wake chain alive — but only
59                    // because progress occurred.
60                    self.waiters.notify_one();
61                    return;
62                }
63                continue;
64            }
65
66            // Doesn't fit: register, re-check, park.
67            let notified = self.waiters.notified();
68            let cap = self.current_budget.load(Ordering::Acquire);
69            let inflight = self.inflight_in_use.load(Ordering::Acquire);
70            if inflight + n_bytes <= cap {
71                continue;
72            }
73            notified.await;
74        }
75    }
76
77    fn release(&self, bytes: u64) {
78        self.inflight_in_use.fetch_sub(bytes, Ordering::AcqRel);
79        self.waiters.notify_one();
80    }
81
82    fn resize(&self, new: u64) {
83        let new = new.max(self.floor_budget);
84        let old = self.current_budget.swap(new, Ordering::AcqRel);
85        if new > old {
86            // Grow: maybe someone can now proceed.
87            self.waiters.notify_waiters();
88        }
89    }
90
91    fn current_budget(&self) -> u64 {
92        self.current_budget.load(Ordering::Relaxed)
93    }
94
95    fn floor_byte_budget(&self) -> u64 {
96        self.floor_budget
97    }
98
99    fn inflight_in_use(&self) -> u64 {
100        self.inflight_in_use.load(Ordering::Relaxed)
101    }
102}
103
104/// Request_count-based budget with fixed size.
105#[derive(Debug)]
106pub(super) struct RequestBudget {
107    budget: usize,
108    semaphore: Arc<Semaphore>,
109    // // Budget in use for in-flight traffic.
110    // inflight: AtomicU64,
111}
112
113impl RequestBudget {
114    fn new(budget: usize) -> Self {
115        Self {
116            budget,
117            semaphore: Arc::new(Semaphore::new(budget)),
118        }
119    }
120}
121
/// Point-in-time snapshot of in-flight byte and request usage.
#[derive(Clone, Copy, Debug)]
pub struct InFlightStats {
    /// Current byte budget.
    pub bytes_budget: u64,
    /// Bytes currently reserved by in-flight traffic.
    pub bytes_in_use: u64,
    /// `bytes_in_use / bytes_budget`.
    ///
    /// May exceed `1.0` transiently after a budget shrink, while
    /// previously-admitted traffic drains. Expected, not a bug.
    pub bytes_saturation: f64,
    /// Maximum number of concurrent requests.
    pub request_budget: usize,
    /// Requests currently holding a permit.
    pub requests_in_use: usize,
    /// `requests_in_use / request_budget`.
    pub requests_saturation: f64,
}

134#[derive(Debug)]
135pub struct InFlightBudget {
136    byte_budget: Arc<ByteBudget>,
137    request_budget: Arc<RequestBudget>,
138}
139
140impl InFlightBudget {
141    pub fn new(
142        initial_byte_budget: u64,
143        floor_byte_budget: u64,
144        initial_request_budget: u32,
145    ) -> Self {
146        let inflight_budget = Self {
147            byte_budget: Arc::new(ByteBudget::new(initial_byte_budget, floor_byte_budget)),
148            request_budget: Arc::new(RequestBudget::new(initial_request_budget as usize)),
149        };
150
151        if polars_config::config().verbose() {
152            eprintln!(
153                "[InFlightConcurrency]: \
154                initial_byte_budget: {}, \
155                floor_byte_budget: {}, \
156                request_budget: {}",
157                initial_byte_budget, floor_byte_budget, initial_request_budget
158            );
159        }
160
161        inflight_budget
162    }
163
164    pub async fn acquire(self: &Arc<Self>, n_bytes: u64) -> InFlightPermit {
165        // NOTE: since chunk_size is a target, merge_ranges and split_ranges may overshoot
166        // the floor. Cap'ing prevents deadlock at the expense of memory management precision.
167        let n_bytes = n_bytes.min(self.byte_budget.floor_byte_budget());
168
169        // Byte budget (may wait). Cancel-safe internally.
170        self.byte_budget.acquire_strict(n_bytes).await;
171
172        // Guard immediately — synchronous, so there's no cancellation
173        // window between reservation and guard.
174        let bytes = ByteReservation {
175            budget: self.byte_budget.clone(),
176            n_bytes,
177        };
178
179        // Request permit. If we're cancelled here, `bytes` drops and
180        // releases the reservation (and notifies a waiter).
181        let req_permit = self
182            .request_budget
183            .semaphore
184            .clone()
185            .acquire_owned()
186            .await
187            .expect("semaphore closed");
188
189        InFlightPermit {
190            _byte_reservation: bytes,
191            _req_permit: req_permit,
192        }
193    }
194
195    pub fn current_byte_budget(&self) -> u64 {
196        self.byte_budget.current_budget()
197    }
198
199    pub fn floor_byte_budget(&self) -> u64 {
200        self.byte_budget.floor_byte_budget()
201    }
202
203    pub fn resize_byte_budget(&self, new: u64) {
204        self.byte_budget.resize(new);
205    }
206
207    pub fn stats(&self) -> InFlightStats {
208        let bytes_budget = self.byte_budget.current_budget();
209        let bytes_in_use = self.byte_budget.inflight_in_use();
210        let bytes_saturation = if bytes_budget > 0 {
211            bytes_in_use as f64 / bytes_budget as f64
212        } else {
213            0.0
214        };
215
216        let request_budget = self.request_budget.budget;
217        let requests_in_use = request_budget - self.request_budget.semaphore.available_permits();
218        let requests_saturation = if request_budget > 0 {
219            requests_in_use as f64 / request_budget as f64
220        } else {
221            0.0
222        };
223
224        InFlightStats {
225            bytes_budget,
226            bytes_in_use,
227            bytes_saturation,
228            request_budget,
229            requests_in_use,
230            requests_saturation,
231        }
232    }
233}
234
235struct ByteReservation {
236    budget: Arc<ByteBudget>,
237    n_bytes: u64,
238}
239
240impl Drop for ByteReservation {
241    fn drop(&mut self) {
242        self.budget.release(self.n_bytes);
243    }
244}
245
246pub struct InFlightPermit {
247    _byte_reservation: ByteReservation,
248    _req_permit: OwnedSemaphorePermit,
249}