polars_utils/live_timer.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

const TICKING_BIT: u64 = 1 << 63; // Indicates if the timer is currently ticking.
const SESSION_UNIT: u64 = 1; // Base unit for number of sessions.
const SESSION_MASK: u64 = TIMER_UNIT - 1; // Mask used to extract session count.
const TIMER_UNIT: u64 = 1 << 32; // Base unit for number of timers.
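
// Illustration (added note): with two LiveTimer handles and one live session,
// `LiveTimerInner::refcount` below holds 2 * TIMER_UNIT + 1 * SESSION_UNIT, so
// `refcount & SESSION_MASK == 1`. TICKING_BIT is used in the separate `state_ns`
// word, not in the refcount.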

/// Counts how much time this timer was 'live'.
///
/// It is live when there is at least one session, but multiple concurrent
/// sessions don't increase the rate at which the timer ticks.
///
/// Clones of this timer are cheap, and the clones are identical, like an Arc.
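///
/// # Example
///
/// A minimal usage sketch (illustrative; the `polars_utils::live_timer` import path
/// is an assumption and may not match the actual crate layout):
///
/// ```ignore
/// use polars_utils::live_timer::LiveTimer;
///
/// let timer = LiveTimer::new();
/// let outer = timer.start_session();        // The timer starts ticking.
/// let inner = timer.start_session();        // A concurrent session; ticking rate is unchanged.
/// drop(inner);                              // Still live: `outer` is active.
/// drop(outer);                              // Last session gone: the timer stops ticking.
/// let live_ns = timer.total_time_live_ns(); // Wall-clock nanoseconds spent live.
/// assert!(timer.total_time_live_ns() >= live_ns); // Readings never decrease.
/// ```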
pub struct LiveTimer {
    // We use a raw pointer instead of an Arc to ensure starting/stopping sessions only involves
    // a single atomic operation, while still letting LiveTimerSession be lifetime-free.
    inner: *mut LiveTimerInner,
}

unsafe impl Send for LiveTimer {}
unsafe impl Sync for LiveTimer {}

impl Default for LiveTimer {
    fn default() -> Self {
        Self::new()
    }
}

impl LiveTimer {
    pub fn new() -> Self {
        let inner = LiveTimerInner {
            base_timestamp: Instant::now(),
            refcount: AtomicU64::new(TIMER_UNIT),
            _padding: [0; _],
            state_ns: AtomicU64::new(0),
            max_live_ns: AtomicU64::new(0),
        };

        Self {
            inner: Box::into_raw(Box::new(inner)),
        }
    }

    pub fn start_session(&self) -> LiveTimerSession {
        unsafe { (&*self.inner).start_session() };
        LiveTimerSession { inner: self.inner }
    }

    pub fn total_time_live_ns(&self) -> u64 {
        unsafe { (&*self.inner).total_time_live_ns() }
    }
}

impl Clone for LiveTimer {
    fn clone(&self) -> Self {
        let inner = unsafe { &*self.inner };
        inner.refcount.fetch_add(TIMER_UNIT, Ordering::Relaxed);
        Self { inner: self.inner }
    }
}

impl Drop for LiveTimer {
    fn drop(&mut self) {
        unsafe {
            let old_rc = (&*self.inner)
                .refcount
                .fetch_sub(TIMER_UNIT, Ordering::AcqRel);
            if old_rc == TIMER_UNIT {
                drop(Box::from_raw(self.inner))
            }
        }
    }
}

pub struct LiveTimerSession {
    inner: *mut LiveTimerInner,
}

unsafe impl Send for LiveTimerSession {}
unsafe impl Sync for LiveTimerSession {}

impl Clone for LiveTimerSession {
    fn clone(&self) -> Self {
        unsafe { (*self.inner).start_session() };
        Self { inner: self.inner }
    }
}

impl Drop for LiveTimerSession {
    fn drop(&mut self) {
        unsafe {
            if (*self.inner).stop_session() {
                drop(Box::from_raw(self.inner))
            }
        }
    }
}

struct LiveTimerInner {
    base_timestamp: Instant,
    /// Contains two 32-bit refcounts: number of timer references and number of live sessions.
    /// If both are zero this object is destroyed.
    refcount: AtomicU64,
    /// Ensures refcount (commonly modified) is on a different cache line to state_ns.
    _padding: [u8; 64],
    /// Contains the total amount of time the timer was live. Interpreted differently depending on TICKING_BIT:
    ///   0 -> Duration::from_nanos(state)
    ///   1 -> base_timestamp.elapsed() - Duration::from_nanos(state & !TICKING_BIT)
    state_ns: AtomicU64,
    /// Used by `total_time_live_ns` to ensure it returns monotonically nondecreasing values.
    max_live_ns: AtomicU64,
}
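
// Worked example of the `state_ns` encoding (illustrative numbers):
// - The timer is stopped with 5 ms accumulated: state_ns == 5_000_000, TICKING_BIT clear.
// - A session starts when base_timestamp.elapsed() == 12 ms:
//   state_ns becomes (12_000_000 - 5_000_000) | TICKING_BIT.
// - Reading while ticking at elapsed() == 20 ms yields 20_000_000 - 7_000_000 = 13_000_000 ns,
//   i.e. the 5 ms accumulated earlier plus the 8 ms of the ongoing session.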

impl LiveTimerInner {
    fn start_session(&self) {
        // (1) Acquire to ensure we load state_ns from previous stop_session, if necessary.
        let prev_sessions = self.refcount.fetch_add(SESSION_UNIT, Ordering::Acquire) & SESSION_MASK;
        if prev_sessions == 0 {
            let orig_state_ns = self.state_ns.load(Ordering::Relaxed);
            let start_ns = self.base_timestamp.elapsed().as_nanos() as u64;
            debug_assert!(orig_state_ns & TICKING_BIT == 0);
            let new_state_ns = start_ns.saturating_sub(orig_state_ns) | TICKING_BIT;
            self.state_ns.store(new_state_ns, Ordering::Release); // See (2).
        }
    }

    /// Returns true if this timer should be destroyed.
    fn stop_session(&self) -> bool {
        let mut stopped = false;
        let mut stopped_at_ns = u64::MAX;
        let mut orig_state_ns = u64::MAX;

        // (1) Acquire and Release to ensure we load state_ns from previous start_session, if necessary.
        self.refcount
            .fetch_update(Ordering::Release, Ordering::Acquire, |rc| {
                if rc == SESSION_UNIT {
                    return None; // We're the sole reference, can just destroy.
                }

                // Stop or re-start the timer if necessary. fetch_update may invoke this closure
                // multiple times if the refcount changes concurrently; if a new session started
                // after we already stopped the timer, restore the original ticking state.
                let should_stop = rc & SESSION_MASK == SESSION_UNIT;
                if should_stop && !stopped {
                    if stopped_at_ns == u64::MAX {
                        orig_state_ns = self.state_ns.load(Ordering::Relaxed);
                        stopped_at_ns = self.base_timestamp.elapsed().as_nanos() as u64;
                    }
                    let new_state_ns = stopped_at_ns.saturating_sub(orig_state_ns & !TICKING_BIT);
                    self.state_ns.store(new_state_ns, Ordering::Relaxed);
                    stopped = true;
                } else if !should_stop && stopped {
                    self.state_ns.store(orig_state_ns, Ordering::Release); // See (2).
                    stopped = false;
                }

                Some(rc - SESSION_UNIT)
            })
            .is_err()
    }

    fn total_time_live_ns(&self) -> u64 {
        // (2) Acquire ensures the elapsed() call by this function is sequenced after the
        // elapsed() call that the state_ns value was calculated from, if the timer is
        // currently ticking.
        let state_ns = self.state_ns.load(Ordering::Acquire);
        let active_time = if state_ns & TICKING_BIT == 0 {
            state_ns
        } else {
            let now_ns = self.base_timestamp.elapsed().as_nanos() as u64;
            now_ns.saturating_sub(state_ns & !TICKING_BIT)
        };

        // Needed to ensure monotonicity: our load above, interleaved with stops/restarts,
        // could otherwise yield non-monotonic values.
        u64::max(
            active_time,
            self.max_live_ns.fetch_max(active_time, Ordering::Relaxed),
        )
    }
}

impl std::fmt::Debug for LiveTimer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let refcount = unsafe { (&*self.inner).refcount.load(Ordering::Relaxed) };
        let active_sessions = refcount & SESSION_MASK;
        let total_time_live_ns = self.total_time_live_ns();

        return std::fmt::Debug::fmt(
            &display::LiveTimer {
                active_sessions,
                total_time_live_ns,
            },
            f,
        );

        mod display {
            #[derive(Debug)]
            #[expect(unused)]
            pub struct LiveTimer {
                pub active_sessions: u64,
                pub total_time_live_ns: u64,
            }
        }
    }
}

impl std::fmt::Debug for LiveTimerSession {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        return std::fmt::Debug::fmt(&display::LiveTimerSession {}, f);

        mod display {
            #[derive(Debug)]
            pub struct LiveTimerSession {}
        }
    }
}
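
// Minimal smoke test sketch (illustrative) exercising the public API above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn live_time_accumulates_only_while_a_session_exists() {
        let timer = LiveTimer::new();
        assert_eq!(timer.total_time_live_ns(), 0);

        // Two overlapping sessions: the timer ticks once, not twice.
        let a = timer.start_session();
        let b = a.clone();
        std::thread::sleep(std::time::Duration::from_millis(5));
        drop(a);
        drop(b);

        let after_sessions = timer.total_time_live_ns();
        assert!(after_sessions > 0);

        // With no live session the reading is stable and never decreases.
        std::thread::sleep(std::time::Duration::from_millis(5));
        assert_eq!(timer.total_time_live_ns(), after_sessions);
    }
}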