1use std::sync::Arc;
2
3use polars_utils::live_timer::{LiveTimer, LiveTimerSession};
4use polars_utils::relaxed_cell::RelaxedCell;
5
6#[derive(Debug, Default, Clone)]
7pub struct IOMetrics {
8 pub io_timer: LiveTimer,
9 pub bytes_requested: RelaxedCell<u64>,
10 pub bytes_received: RelaxedCell<u64>,
11 pub bytes_sent: RelaxedCell<u64>,
12}
13
14#[derive(Debug, Clone)]
15pub struct OptIOMetrics(pub Option<Arc<IOMetrics>>);
16
17impl OptIOMetrics {
18 pub fn start_io_session(&self) -> Option<LiveTimerSession> {
19 self.0.as_ref().map(|x| x.io_timer.start_session())
20 }
21
22 pub fn add_bytes_requested(&self, bytes_requested: u64) {
23 self.0
24 .as_ref()
25 .map(|x| x.bytes_requested.fetch_add(bytes_requested));
26 }
27
28 pub fn add_bytes_received(&self, bytes_received: u64) {
29 self.0
30 .as_ref()
31 .map(|x| x.bytes_received.fetch_add(bytes_received));
32 }
33
34 pub fn add_bytes_sent(&self, bytes_sent: u64) {
35 self.0.as_ref().map(|x| x.bytes_sent.fetch_add(bytes_sent));
36 }
37
38 pub async fn record_io_read<F, O>(&self, num_bytes: u64, fut: F) -> O
39 where
40 F: Future<Output = O>,
41 {
42 self.add_bytes_requested(num_bytes);
43
44 let io_session = self.start_io_session();
45
46 let out = fut.await;
47
48 drop(io_session);
49
50 self.add_bytes_received(num_bytes);
51
52 out
53 }
54
55 pub async fn record_bytes_tx<F, O>(&self, num_bytes: u64, fut: F) -> O
56 where
57 F: Future<Output = O>,
58 {
59 let io_session = self.start_io_session();
60
61 let out = fut.await;
62
63 drop(io_session);
64
65 self.add_bytes_sent(num_bytes);
66
67 out
68 }
69}