polars_utils/
live_timer.rs1use std::sync::atomic::{AtomicU64, Ordering};
2use std::time::Instant;
3
4const TICKING_BIT: u64 = 1 << 63; const SESSION_UNIT: u64 = 1; const SESSION_MASK: u64 = TIMER_UNIT - 1; const TIMER_UNIT: u64 = 1 << 32; pub struct LiveTimer {
16 inner: *mut LiveTimerInner,
19}
20
21unsafe impl Send for LiveTimer {}
22unsafe impl Sync for LiveTimer {}
23
24impl Default for LiveTimer {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl LiveTimer {
31 pub fn new() -> Self {
32 let inner = LiveTimerInner {
33 base_timestamp: Instant::now(),
34 refcount: AtomicU64::new(TIMER_UNIT),
35 _padding: [0; _],
36 state_ns: AtomicU64::new(0),
37 max_live_ns: AtomicU64::new(0),
38 };
39
40 Self {
41 inner: Box::into_raw(Box::new(inner)),
42 }
43 }
44
45 pub fn start_session(&self) -> LiveTimerSession {
46 unsafe { (&*self.inner).start_session() };
47 LiveTimerSession { inner: self.inner }
48 }
49
50 pub fn total_time_live_ns(&self) -> u64 {
51 unsafe { (&*self.inner).total_time_live_ns() }
52 }
53}
54
55impl Clone for LiveTimer {
56 fn clone(&self) -> Self {
57 let inner = unsafe { &*self.inner };
58 inner.refcount.fetch_add(TIMER_UNIT, Ordering::Relaxed);
59 Self { inner: self.inner }
60 }
61}
62
63impl Drop for LiveTimer {
64 fn drop(&mut self) {
65 unsafe {
66 let old_rc = (&*self.inner)
67 .refcount
68 .fetch_sub(TIMER_UNIT, Ordering::AcqRel);
69 if old_rc == TIMER_UNIT {
70 drop(Box::from_raw(self.inner))
71 }
72 }
73 }
74}
75
76pub struct LiveTimerSession {
77 inner: *mut LiveTimerInner,
78}
79
80unsafe impl Send for LiveTimerSession {}
81unsafe impl Sync for LiveTimerSession {}
82
83impl Clone for LiveTimerSession {
84 fn clone(&self) -> Self {
85 unsafe { (*self.inner).start_session() };
86 Self { inner: self.inner }
87 }
88}
89
90impl Drop for LiveTimerSession {
91 fn drop(&mut self) {
92 unsafe {
93 if (*self.inner).stop_session() {
94 drop(Box::from_raw(self.inner))
95 }
96 }
97 }
98}
99
100struct LiveTimerInner {
101 base_timestamp: Instant,
102 refcount: AtomicU64,
105 _padding: [u8; 64],
107 state_ns: AtomicU64,
111 max_live_ns: AtomicU64,
113}
114
115impl LiveTimerInner {
116 fn start_session(&self) {
117 let prev_sessions = self.refcount.fetch_add(SESSION_UNIT, Ordering::Acquire) & SESSION_MASK;
119 if prev_sessions == 0 {
120 let orig_state_ns = self.state_ns.load(Ordering::Relaxed);
121 let start_ns = self.base_timestamp.elapsed().as_nanos() as u64;
122 debug_assert!(orig_state_ns & TICKING_BIT == 0);
123 let new_state_ns = start_ns.saturating_sub(orig_state_ns) | TICKING_BIT;
124 self.state_ns.store(new_state_ns, Ordering::Release); }
126 }
127
128 fn stop_session(&self) -> bool {
130 let mut stopped = false;
131 let mut stopped_at_ns = u64::MAX;
132 let mut orig_state_ns = u64::MAX;
133
134 self.refcount
136 .fetch_update(Ordering::Release, Ordering::Acquire, |rc| {
137 if rc == SESSION_UNIT {
138 return None; }
140
141 let should_stop = rc & SESSION_MASK == SESSION_UNIT;
143 if should_stop && !stopped {
144 if stopped_at_ns == u64::MAX {
145 orig_state_ns = self.state_ns.load(Ordering::Relaxed);
146 stopped_at_ns = self.base_timestamp.elapsed().as_nanos() as u64;
147 }
148 let new_state_ns = stopped_at_ns.saturating_sub(orig_state_ns & !TICKING_BIT);
149 self.state_ns.store(new_state_ns, Ordering::Relaxed);
150 stopped = true;
151 } else if !should_stop && stopped {
152 self.state_ns.store(orig_state_ns, Ordering::Release); stopped = false;
154 }
155
156 Some(rc - SESSION_UNIT)
157 })
158 .is_err()
159 }
160
161 fn total_time_live_ns(&self) -> u64 {
162 let state_ns = self.state_ns.load(Ordering::Acquire);
165 let active_time = if state_ns & TICKING_BIT == 0 {
166 state_ns
167 } else {
168 let now_ns = self.base_timestamp.elapsed().as_nanos() as u64;
169 now_ns.saturating_sub(state_ns & !TICKING_BIT)
170 };
171
172 u64::max(
175 active_time,
176 self.max_live_ns.fetch_max(active_time, Ordering::Relaxed),
177 )
178 }
179}
180
181impl std::fmt::Debug for LiveTimer {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 let refcount = unsafe { (&*self.inner).refcount.load(Ordering::Relaxed) };
184 let active_sessions = refcount & SESSION_MASK;
185 let total_time_live_ns = self.total_time_live_ns();
186
187 return std::fmt::Debug::fmt(
188 &display::LiveTimer {
189 active_sessions,
190 total_time_live_ns,
191 },
192 f,
193 );
194
195 mod display {
196 #[derive(Debug)]
197 #[expect(unused)]
198 pub struct LiveTimer {
199 pub active_sessions: u64,
200 pub total_time_live_ns: u64,
201 }
202 }
203 }
204}
205
206impl std::fmt::Debug for LiveTimerSession {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 return std::fmt::Debug::fmt(&display::LiveTimerSession {}, f);
209
210 mod display {
211 #[derive(Debug)]
212 pub struct LiveTimerSession {}
213 }
214 }
215}