// polars_utils/fixedringbuffer.rs
use std::mem::ManuallyDrop;
2
/// A ring buffer whose logical capacity is fixed when it is created.
///
/// Elements are stored in one heap allocation that is addressed through a raw pointer. The
/// occupied region begins at slot `start`, covers `length` slots and wraps back to slot 0 once
/// it reaches `capacity`.
pub struct FixedRingBuffer<T> {
    /// Slot index of the front element, i.e. the next one `pop_front` hands out.
    start: usize,
    /// Number of elements currently stored.
    length: usize,
    /// Base pointer of the backing allocation.
    buffer: *mut T,
    /// Capacity requested by the caller; all slot arithmetic wraps at this value.
    capacity: usize,

    /// Capacity reported by the `Vec` that performed the allocation. Only used to hand the
    /// allocation back when the buffer is dropped.
    _buffer_capacity: usize,
}
17
/// Advances the slot index `x` by `n` positions inside a ring of `capacity` slots.
///
/// The result is `x + n`, reduced by `capacity` when the sum would reach or pass the end of
/// the ring. The arithmetic is done with wrapping operations so an intermediate overflow of
/// `x + n` does not panic.
///
/// # Panics
///
/// Panics if `n > capacity`.
#[inline(always)]
const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
    assert!(n <= capacity);

    // Number of slots `x` may still advance before it has to wrap.
    let wrap_threshold = capacity - n;
    let advanced = x.wrapping_add(n);

    if x >= wrap_threshold {
        advanced.wrapping_sub(capacity)
    } else {
        advanced
    }
}
26
27impl<T> FixedRingBuffer<T> {
28 pub fn new(capacity: usize) -> Self {
29 let mut buffer = ManuallyDrop::new(Vec::with_capacity(capacity));
30
31 Self {
32 start: 0,
33 length: 0,
34
35 _buffer_capacity: buffer.capacity(),
36 buffer: buffer.as_mut_ptr(),
37 capacity,
38 }
39 }
40
41 #[inline(always)]
42 pub const fn len(&self) -> usize {
43 self.length
44 }
45
46 #[inline(always)]
47 pub const fn capacity(&self) -> usize {
48 self.capacity
49 }
50
51 #[inline(always)]
52 pub const fn remaining_capacity(&self) -> usize {
53 self.capacity - self.len()
54 }
55
56 #[inline(always)]
57 pub const fn is_empty(&self) -> bool {
58 self.length == 0
59 }
60
61 #[inline(always)]
62 pub const fn is_full(&self) -> bool {
63 self.len() == self.capacity
64 }
65
66 #[inline]
70 pub fn as_slices(&self) -> (&[T], &[T]) {
71 if self.capacity - self.length > self.start {
73 (
74 unsafe {
75 std::slice::from_raw_parts(self.buffer.wrapping_add(self.start), self.length)
76 },
77 &[],
78 )
79 } else {
80 (
81 unsafe {
82 std::slice::from_raw_parts(
83 self.buffer.wrapping_add(self.start),
84 self.capacity - self.start,
85 )
86 },
87 unsafe {
88 std::slice::from_raw_parts(
89 self.buffer,
90 wrapping_add(self.start, self.length, self.capacity),
91 )
92 },
93 )
94 }
95 }
96
97 #[inline]
99 pub fn pop_front(&mut self) -> Option<T> {
100 if self.is_empty() {
101 return None;
102 }
103
104 let item = unsafe { self.buffer.wrapping_add(self.start).read() };
106 self.start = wrapping_add(self.start, 1, self.capacity);
107 self.length -= 1;
108 Some(item)
109 }
110
111 #[inline]
115 pub fn push(&mut self, value: T) -> Option<()> {
116 if self.is_full() {
117 return None;
118 }
119
120 let offset = wrapping_add(self.start, self.len(), self.capacity);
121
122 unsafe { self.buffer.wrapping_add(offset).write(value) };
123 self.length += 1;
124
125 Some(())
126 }
127}
128
129impl<T: Copy> FixedRingBuffer<T> {
130 pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
134 if num == 0 || self.is_full() {
135 return 0;
136 }
137
138 let num = usize::min(num, self.remaining_capacity());
139
140 let start = wrapping_add(self.start, self.len(), self.capacity);
141 let end = wrapping_add(start, num, self.capacity);
142
143 if start < end {
144 unsafe { std::slice::from_raw_parts_mut(self.buffer.wrapping_add(start), num) }
145 .fill(value);
146 } else {
147 unsafe {
148 std::slice::from_raw_parts_mut(
149 self.buffer.wrapping_add(start),
150 self.capacity - start,
151 )
152 }
153 .fill(value);
154
155 if end != 0 {
156 unsafe { std::slice::from_raw_parts_mut(self.buffer, end) }.fill(value);
157 }
158 }
159
160 self.length += num;
161
162 num
163 }
164}
165
166impl<T> Drop for FixedRingBuffer<T> {
167 fn drop(&mut self) {
168 for i in 0..self.length {
169 let offset = wrapping_add(self.start, i, self.capacity);
170 unsafe { self.buffer.wrapping_add(offset).read() };
171 }
172
173 unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) };
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks FIFO ordering of `push`/`pop_front` and a bulk fill that is clamped to the
    /// capacity.
    #[test]
    fn basic() {
        let mut ring = FixedRingBuffer::new(256);

        assert!(ring.pop_front().is_none());

        for value in [1, 3] {
            ring.push(value).unwrap();
        }

        for expected in [Some(1), Some(3), None] {
            assert_eq!(ring.pop_front(), expected);
        }

        // Asking for more copies than fit stores exactly `capacity` of them.
        assert!(!ring.is_full());
        let stored = ring.fill_repeat(42, 300);
        assert_eq!(stored, 256);
        assert!(ring.is_full());

        let mut remaining = 256;
        while remaining > 0 {
            assert_eq!(ring.pop_front(), Some(42));
            assert!(!ring.is_full());
            remaining -= 1;
        }
        assert_eq!(ring.pop_front(), None);
    }

    /// Same FIFO check with a heap-owning element type.
    #[test]
    fn boxed() {
        let mut ring = FixedRingBuffer::new(256);

        assert!(ring.pop_front().is_none());

        for value in [1, 3] {
            ring.push(Box::new(value)).unwrap();
        }

        for expected in [Some(Box::new(1)), Some(Box::new(3)), None] {
            assert_eq!(ring.pop_front(), expected);
        }
    }
}