polars_io/
metrics.rs

1use std::sync::Arc;
2
3use polars_utils::live_timer::{LiveTimer, LiveTimerSession};
4use polars_utils::relaxed_cell::RelaxedCell;
5
6pub const HEAD_RESPONSE_SIZE_ESTIMATE: u64 = 1;
7
8#[derive(Debug, Default, Clone)]
9pub struct IOMetrics {
10    pub io_timer: LiveTimer,
11    pub bytes_requested: RelaxedCell<u64>,
12    pub bytes_received: RelaxedCell<u64>,
13    pub bytes_sent: RelaxedCell<u64>,
14}
15
16#[derive(Debug, Clone)]
17pub struct OptIOMetrics(pub Option<Arc<IOMetrics>>);
18
19impl OptIOMetrics {
20    pub fn start_io_session(&self) -> Option<LiveTimerSession> {
21        self.0.as_ref().map(|x| x.io_timer.start_session())
22    }
23
24    pub fn add_bytes_requested(&self, bytes_requested: u64) {
25        self.0
26            .as_ref()
27            .map(|x| x.bytes_requested.fetch_add(bytes_requested));
28    }
29
30    pub fn add_bytes_received(&self, bytes_received: u64) {
31        self.0
32            .as_ref()
33            .map(|x| x.bytes_received.fetch_add(bytes_received));
34    }
35
36    pub fn add_bytes_sent(&self, bytes_sent: u64) {
37        self.0.as_ref().map(|x| x.bytes_sent.fetch_add(bytes_sent));
38    }
39
40    pub async fn record_io_read<F, O>(&self, num_bytes: u64, fut: F) -> O
41    where
42        F: Future<Output = O>,
43    {
44        self.add_bytes_requested(num_bytes);
45
46        let io_session = self.start_io_session();
47
48        let out = fut.await;
49
50        drop(io_session);
51
52        self.add_bytes_received(num_bytes);
53
54        out
55    }
56
57    pub async fn record_bytes_tx<F, O>(&self, num_bytes: u64, fut: F) -> O
58    where
59        F: Future<Output = O>,
60    {
61        let io_session = self.start_io_session();
62
63        let out = fut.await;
64
65        drop(io_session);
66
67        self.add_bytes_sent(num_bytes);
68
69        out
70    }
71}