polars_io/cloud/concurrency/
admission.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
5
6#[derive(Debug)]
12pub(super) struct ByteBudget {
13 current_budget: AtomicU64,
16 floor_budget: u64,
18 inflight_in_use: AtomicU64,
20 waiters: Notify,
21}
22
23impl ByteBudget {
24 fn new(initial: u64, floor_budget: u64) -> Self {
25 Self {
26 current_budget: AtomicU64::new(initial),
27 floor_budget,
28 inflight_in_use: AtomicU64::new(0),
29 waiters: Notify::new(),
30 }
31 }
32
33 async fn acquire_strict(&self, n_bytes: u64) {
36 assert!(n_bytes <= self.floor_budget);
38
39 loop {
42 let cap = self.current_budget.load(Ordering::Acquire);
43 let inflight = self.inflight_in_use.load(Ordering::Acquire);
44
45 if inflight + n_bytes <= cap {
46 if self
47 .inflight_in_use
48 .compare_exchange_weak(
49 inflight,
50 inflight + n_bytes,
51 Ordering::AcqRel,
52 Ordering::Relaxed,
53 )
54 .is_ok()
55 {
56 self.waiters.notify_one();
61 return;
62 }
63 continue;
64 }
65
66 let notified = self.waiters.notified();
68 let cap = self.current_budget.load(Ordering::Acquire);
69 let inflight = self.inflight_in_use.load(Ordering::Acquire);
70 if inflight + n_bytes <= cap {
71 continue;
72 }
73 notified.await;
74 }
75 }
76
77 fn release(&self, bytes: u64) {
78 self.inflight_in_use.fetch_sub(bytes, Ordering::AcqRel);
79 self.waiters.notify_one();
80 }
81
82 fn resize(&self, new: u64) {
83 let new = new.max(self.floor_budget);
84 let old = self.current_budget.swap(new, Ordering::AcqRel);
85 if new > old {
86 self.waiters.notify_waiters();
88 }
89 }
90
91 fn current_budget(&self) -> u64 {
92 self.current_budget.load(Ordering::Relaxed)
93 }
94
95 fn floor_byte_budget(&self) -> u64 {
96 self.floor_budget
97 }
98
99 fn inflight_in_use(&self) -> u64 {
100 self.inflight_in_use.load(Ordering::Relaxed)
101 }
102}
103
104#[derive(Debug)]
106pub(super) struct RequestBudget {
107 budget: usize,
108 semaphore: Arc<Semaphore>,
109 }
112
113impl RequestBudget {
114 fn new(budget: usize) -> Self {
115 Self {
116 budget,
117 semaphore: Arc::new(Semaphore::new(budget)),
118 }
119 }
120}
121
/// Point-in-time snapshot of byte and request budget usage.
#[derive(Clone, Copy, Debug)]
pub struct InFlightStats {
    /// Byte ceiling at the time of the snapshot.
    pub bytes_budget: u64,
    /// Bytes reserved at the time of the snapshot.
    pub bytes_in_use: u64,
    /// `bytes_in_use / bytes_budget`, or `0.0` when the budget is zero.
    pub bytes_saturation: f64,
    /// Total number of request permits.
    pub request_budget: usize,
    /// Request permits handed out at the time of the snapshot.
    pub requests_in_use: usize,
    /// `requests_in_use / request_budget`, or `0.0` when the budget is zero.
    pub requests_saturation: f64,
}
133
134#[derive(Debug)]
135pub struct InFlightBudget {
136 byte_budget: Arc<ByteBudget>,
137 request_budget: Arc<RequestBudget>,
138}
139
140impl InFlightBudget {
141 pub fn new(
142 initial_byte_budget: u64,
143 floor_byte_budget: u64,
144 initial_request_budget: u32,
145 ) -> Self {
146 let inflight_budget = Self {
147 byte_budget: Arc::new(ByteBudget::new(initial_byte_budget, floor_byte_budget)),
148 request_budget: Arc::new(RequestBudget::new(initial_request_budget as usize)),
149 };
150
151 if polars_config::config().verbose() {
152 eprintln!(
153 "[InFlightConcurrency]: \
154 initial_byte_budget: {}, \
155 floor_byte_budget: {}, \
156 request_budget: {}",
157 initial_byte_budget, floor_byte_budget, initial_request_budget
158 );
159 }
160
161 inflight_budget
162 }
163
164 pub async fn acquire(self: &Arc<Self>, n_bytes: u64) -> InFlightPermit {
165 let n_bytes = n_bytes.min(self.byte_budget.floor_byte_budget());
168
169 self.byte_budget.acquire_strict(n_bytes).await;
171
172 let bytes = ByteReservation {
175 budget: self.byte_budget.clone(),
176 n_bytes,
177 };
178
179 let req_permit = self
182 .request_budget
183 .semaphore
184 .clone()
185 .acquire_owned()
186 .await
187 .expect("semaphore closed");
188
189 InFlightPermit {
190 _byte_reservation: bytes,
191 _req_permit: req_permit,
192 }
193 }
194
195 pub fn current_byte_budget(&self) -> u64 {
196 self.byte_budget.current_budget()
197 }
198
199 pub fn floor_byte_budget(&self) -> u64 {
200 self.byte_budget.floor_byte_budget()
201 }
202
203 pub fn resize_byte_budget(&self, new: u64) {
204 self.byte_budget.resize(new);
205 }
206
207 pub fn stats(&self) -> InFlightStats {
208 let bytes_budget = self.byte_budget.current_budget();
209 let bytes_in_use = self.byte_budget.inflight_in_use();
210 let bytes_saturation = if bytes_budget > 0 {
211 bytes_in_use as f64 / bytes_budget as f64
212 } else {
213 0.0
214 };
215
216 let request_budget = self.request_budget.budget;
217 let requests_in_use = request_budget - self.request_budget.semaphore.available_permits();
218 let requests_saturation = if request_budget > 0 {
219 requests_in_use as f64 / request_budget as f64
220 } else {
221 0.0
222 };
223
224 InFlightStats {
225 bytes_budget,
226 bytes_in_use,
227 bytes_saturation,
228 request_budget,
229 requests_in_use,
230 requests_saturation,
231 }
232 }
233}
234
235struct ByteReservation {
236 budget: Arc<ByteBudget>,
237 n_bytes: u64,
238}
239
240impl Drop for ByteReservation {
241 fn drop(&mut self) {
242 self.budget.release(self.n_bytes);
243 }
244}
245
246pub struct InFlightPermit {
247 _byte_reservation: ByteReservation,
248 _req_permit: OwnedSemaphorePermit,
249}