1use std::sync::Arc;
2
3use polars_utils::live_timer::{LiveTimer, LiveTimerSession};
4use polars_utils::relaxed_cell::RelaxedCell;
5
6pub const HEAD_RESPONSE_SIZE_ESTIMATE: u64 = 1;
7
8#[derive(Debug, Default, Clone)]
9pub struct IOMetrics {
10 pub io_timer: LiveTimer,
11 pub bytes_requested: RelaxedCell<u64>,
12 pub bytes_received: RelaxedCell<u64>,
13 pub bytes_sent: RelaxedCell<u64>,
14}
15
16#[derive(Debug, Clone)]
17pub struct OptIOMetrics(pub Option<Arc<IOMetrics>>);
18
19impl OptIOMetrics {
20 pub fn start_io_session(&self) -> Option<LiveTimerSession> {
21 self.0.as_ref().map(|x| x.io_timer.start_session())
22 }
23
24 pub fn add_bytes_requested(&self, bytes_requested: u64) {
25 self.0
26 .as_ref()
27 .map(|x| x.bytes_requested.fetch_add(bytes_requested));
28 }
29
30 pub fn add_bytes_received(&self, bytes_received: u64) {
31 self.0
32 .as_ref()
33 .map(|x| x.bytes_received.fetch_add(bytes_received));
34 }
35
36 pub fn add_bytes_sent(&self, bytes_sent: u64) {
37 self.0.as_ref().map(|x| x.bytes_sent.fetch_add(bytes_sent));
38 }
39
40 pub async fn record_io_read<F, O>(&self, num_bytes: u64, fut: F) -> O
41 where
42 F: Future<Output = O>,
43 {
44 self.add_bytes_requested(num_bytes);
45
46 let io_session = self.start_io_session();
47
48 let out = fut.await;
49
50 drop(io_session);
51
52 self.add_bytes_received(num_bytes);
53
54 out
55 }
56
57 pub async fn record_bytes_tx<F, O>(&self, num_bytes: u64, fut: F) -> O
58 where
59 F: Future<Output = O>,
60 {
61 let io_session = self.start_io_session();
62
63 let out = fut.await;
64
65 drop(io_session);
66
67 self.add_bytes_sent(num_bytes);
68
69 out
70 }
71}