Skip to main content

polars_utils/
fixedringbuffer.rs

1use std::mem::ManuallyDrop;
2
3/// A ring-buffer with a size determined at creation-time
4///
5/// This makes it perfectly suited for buffers that produce and consume at different speeds.
6pub struct FixedRingBuffer<T> {
7    start: usize,
8    length: usize,
9    buffer: *mut T,
10    /// The wanted fixed capacity in the buffer
11    capacity: usize,
12
13    /// The actually allocated capacity, this should not be used for any calculations and it purely
14    /// used for the deallocation.
15    _buffer_capacity: usize,
16}
17
18#[inline(always)]
19const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
20    assert!(n <= capacity);
21
22    let sub = if capacity - n <= x { capacity } else { 0 };
23
24    x.wrapping_add(n).wrapping_sub(sub)
25}
26
27impl<T> FixedRingBuffer<T> {
28    pub fn new(capacity: usize) -> Self {
29        let mut buffer = ManuallyDrop::new(Vec::with_capacity(capacity));
30
31        Self {
32            start: 0,
33            length: 0,
34
35            _buffer_capacity: buffer.capacity(),
36            buffer: buffer.as_mut_ptr(),
37            capacity,
38        }
39    }
40
41    #[inline(always)]
42    pub const fn len(&self) -> usize {
43        self.length
44    }
45
46    #[inline(always)]
47    pub const fn capacity(&self) -> usize {
48        self.capacity
49    }
50
51    #[inline(always)]
52    pub const fn remaining_capacity(&self) -> usize {
53        self.capacity - self.len()
54    }
55
56    #[inline(always)]
57    pub const fn is_empty(&self) -> bool {
58        self.length == 0
59    }
60
61    #[inline(always)]
62    pub const fn is_full(&self) -> bool {
63        self.len() == self.capacity
64    }
65
66    /// Get a reference to all elements in the form of two slices.
67    ///
68    /// These are in the listed in the order of being pushed into the buffer.
69    #[inline]
70    pub fn as_slices(&self) -> (&[T], &[T]) {
71        // SAFETY: Only pick the part that is actually defined
72        if self.capacity - self.length > self.start {
73            (
74                unsafe {
75                    std::slice::from_raw_parts(self.buffer.wrapping_add(self.start), self.length)
76                },
77                &[],
78            )
79        } else {
80            (
81                unsafe {
82                    std::slice::from_raw_parts(
83                        self.buffer.wrapping_add(self.start),
84                        self.capacity - self.start,
85                    )
86                },
87                unsafe {
88                    std::slice::from_raw_parts(
89                        self.buffer,
90                        wrapping_add(self.start, self.length, self.capacity),
91                    )
92                },
93            )
94        }
95    }
96
97    /// Pop an item at the front of the [`FixedRingBuffer`]
98    #[inline]
99    pub fn pop_front(&mut self) -> Option<T> {
100        if self.is_empty() {
101            return None;
102        }
103
104        // SAFETY: This value is never read again
105        let item = unsafe { self.buffer.wrapping_add(self.start).read() };
106        self.start = wrapping_add(self.start, 1, self.capacity);
107        self.length -= 1;
108        Some(item)
109    }
110
111    /// Push an item into the [`FixedRingBuffer`]
112    ///
113    /// Returns `None` if there is no more space
114    #[inline]
115    pub fn push(&mut self, value: T) -> Option<()> {
116        if self.is_full() {
117            return None;
118        }
119
120        let offset = wrapping_add(self.start, self.len(), self.capacity);
121
122        unsafe { self.buffer.wrapping_add(offset).write(value) };
123        self.length += 1;
124
125        Some(())
126    }
127}
128
129impl<T: Copy> FixedRingBuffer<T> {
130    /// Add at most `num` items of `value` into the [`FixedRingBuffer`]
131    ///
132    /// This returns the amount of items actually added.
133    pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
134        if num == 0 || self.is_full() {
135            return 0;
136        }
137
138        let num = usize::min(num, self.remaining_capacity());
139
140        let start = wrapping_add(self.start, self.len(), self.capacity);
141        let end = wrapping_add(start, num, self.capacity);
142
143        if start < end {
144            unsafe { std::slice::from_raw_parts_mut(self.buffer.wrapping_add(start), num) }
145                .fill(value);
146        } else {
147            unsafe {
148                std::slice::from_raw_parts_mut(
149                    self.buffer.wrapping_add(start),
150                    self.capacity - start,
151                )
152            }
153            .fill(value);
154
155            if end != 0 {
156                unsafe { std::slice::from_raw_parts_mut(self.buffer, end) }.fill(value);
157            }
158        }
159
160        self.length += num;
161
162        num
163    }
164}
165
166impl<T> Drop for FixedRingBuffer<T> {
167    fn drop(&mut self) {
168        for i in 0..self.length {
169            let offset = wrapping_add(self.start, i, self.capacity);
170            unsafe { self.buffer.wrapping_add(offset).read() };
171        }
172
173        unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) };
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180
181    #[test]
182    fn basic() {
183        let mut frb = FixedRingBuffer::new(256);
184
185        assert!(frb.pop_front().is_none());
186
187        frb.push(1).unwrap();
188        frb.push(3).unwrap();
189
190        assert_eq!(frb.pop_front(), Some(1));
191        assert_eq!(frb.pop_front(), Some(3));
192        assert_eq!(frb.pop_front(), None);
193
194        assert!(!frb.is_full());
195        assert_eq!(frb.fill_repeat(42, 300), 256);
196        assert!(frb.is_full());
197
198        for _ in 0..256 {
199            assert_eq!(frb.pop_front(), Some(42));
200            assert!(!frb.is_full());
201        }
202        assert_eq!(frb.pop_front(), None);
203    }
204
205    #[test]
206    fn boxed() {
207        let mut frb = FixedRingBuffer::new(256);
208
209        assert!(frb.pop_front().is_none());
210
211        frb.push(Box::new(1)).unwrap();
212        frb.push(Box::new(3)).unwrap();
213
214        assert_eq!(frb.pop_front(), Some(Box::new(1)));
215        assert_eq!(frb.pop_front(), Some(Box::new(3)));
216        assert_eq!(frb.pop_front(), None);
217    }
218}