Skip to main content

polars_io/
metrics.rs

1use std::sync::Arc;
2
3use polars_utils::live_timer::{LiveTimer, LiveTimerSession};
4use polars_utils::relaxed_cell::RelaxedCell;
5
6#[derive(Debug, Default, Clone)]
7pub struct IOMetrics {
8    pub io_timer: LiveTimer,
9    pub bytes_requested: RelaxedCell<u64>,
10    pub bytes_received: RelaxedCell<u64>,
11    pub bytes_sent: RelaxedCell<u64>,
12}
13
14#[derive(Debug, Clone)]
15pub struct OptIOMetrics(pub Option<Arc<IOMetrics>>);
16
17impl OptIOMetrics {
18    pub fn start_io_session(&self) -> Option<LiveTimerSession> {
19        self.0.as_ref().map(|x| x.io_timer.start_session())
20    }
21
22    pub fn add_bytes_requested(&self, bytes_requested: u64) {
23        self.0
24            .as_ref()
25            .map(|x| x.bytes_requested.fetch_add(bytes_requested));
26    }
27
28    pub fn add_bytes_received(&self, bytes_received: u64) {
29        self.0
30            .as_ref()
31            .map(|x| x.bytes_received.fetch_add(bytes_received));
32    }
33
34    pub fn add_bytes_sent(&self, bytes_sent: u64) {
35        self.0.as_ref().map(|x| x.bytes_sent.fetch_add(bytes_sent));
36    }
37
38    pub async fn record_io_read<F, O>(&self, num_bytes: u64, fut: F) -> O
39    where
40        F: Future<Output = O>,
41    {
42        self.add_bytes_requested(num_bytes);
43
44        let io_session = self.start_io_session();
45
46        let out = fut.await;
47
48        drop(io_session);
49
50        self.add_bytes_received(num_bytes);
51
52        out
53    }
54
55    pub async fn record_bytes_tx<F, O>(&self, num_bytes: u64, fut: F) -> O
56    where
57        F: Future<Output = O>,
58    {
59        let io_session = self.start_io_session();
60
61        let out = fut.await;
62
63        drop(io_session);
64
65        self.add_bytes_sent(num_bytes);
66
67        out
68    }
69}