polars_utils/
fixedringbuffer.rs

1/// A ring-buffer with a size determined at creation-time
2///
3/// This makes it perfectly suited for buffers that produce and consume at different speeds.
4pub struct FixedRingBuffer<T> {
5    start: usize,
6    length: usize,
7    buffer: *mut T,
8    /// The wanted fixed capacity in the buffer
9    capacity: usize,
10
11    /// The actually allocated capacity, this should not be used for any calculations and it purely
12    /// used for the deallocation.
13    _buffer_capacity: usize,
14}
15
16#[inline(always)]
17const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
18    assert!(n <= capacity);
19
20    let sub = if capacity - n <= x { capacity } else { 0 };
21
22    x.wrapping_add(n).wrapping_sub(sub)
23}
24
25impl<T> FixedRingBuffer<T> {
26    pub fn new(capacity: usize) -> Self {
27        let buffer = Vec::with_capacity(capacity);
28
29        Self {
30            start: 0,
31            length: 0,
32
33            _buffer_capacity: buffer.capacity(),
34            buffer: buffer.leak() as *mut [T] as *mut T,
35            capacity,
36        }
37    }
38
39    #[inline(always)]
40    pub const fn len(&self) -> usize {
41        self.length
42    }
43
44    #[inline(always)]
45    pub const fn capacity(&self) -> usize {
46        self.capacity
47    }
48
49    #[inline(always)]
50    pub const fn remaining_capacity(&self) -> usize {
51        self.capacity - self.len()
52    }
53
54    #[inline(always)]
55    pub const fn is_empty(&self) -> bool {
56        self.length == 0
57    }
58
59    #[inline(always)]
60    pub const fn is_full(&self) -> bool {
61        self.len() == self.capacity
62    }
63
64    /// Get a reference to all elements in the form of two slices.
65    ///
66    /// These are in the listed in the order of being pushed into the buffer.
67    #[inline]
68    pub fn as_slices(&self) -> (&[T], &[T]) {
69        // SAFETY: Only pick the part that is actually defined
70        if self.capacity - self.length > self.start {
71            (
72                unsafe {
73                    std::slice::from_raw_parts(self.buffer.wrapping_add(self.start), self.length)
74                },
75                &[],
76            )
77        } else {
78            (
79                unsafe {
80                    std::slice::from_raw_parts(
81                        self.buffer.wrapping_add(self.start),
82                        self.capacity - self.start,
83                    )
84                },
85                unsafe {
86                    std::slice::from_raw_parts(
87                        self.buffer,
88                        wrapping_add(self.start, self.length, self.capacity),
89                    )
90                },
91            )
92        }
93    }
94
95    /// Pop an item at the front of the [`FixedRingBuffer`]
96    #[inline]
97    pub fn pop_front(&mut self) -> Option<T> {
98        if self.is_empty() {
99            return None;
100        }
101
102        // SAFETY: This value is never read again
103        let item = unsafe { self.buffer.wrapping_add(self.start).read() };
104        self.start = wrapping_add(self.start, 1, self.capacity);
105        self.length -= 1;
106        Some(item)
107    }
108
109    /// Push an item into the [`FixedRingBuffer`]
110    ///
111    /// Returns `None` if there is no more space
112    #[inline]
113    pub fn push(&mut self, value: T) -> Option<()> {
114        if self.is_full() {
115            return None;
116        }
117
118        let offset = wrapping_add(self.start, self.len(), self.capacity);
119
120        unsafe { self.buffer.wrapping_add(offset).write(value) };
121        self.length += 1;
122
123        Some(())
124    }
125}
126
127impl<T: Copy> FixedRingBuffer<T> {
128    /// Add at most `num` items of `value` into the [`FixedRingBuffer`]
129    ///
130    /// This returns the amount of items actually added.
131    pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
132        if num == 0 || self.is_full() {
133            return 0;
134        }
135
136        let num = usize::min(num, self.remaining_capacity());
137
138        let start = wrapping_add(self.start, self.len(), self.capacity);
139        let end = wrapping_add(start, num, self.capacity);
140
141        if start < end {
142            unsafe { std::slice::from_raw_parts_mut(self.buffer.wrapping_add(start), num) }
143                .fill(value);
144        } else {
145            unsafe {
146                std::slice::from_raw_parts_mut(
147                    self.buffer.wrapping_add(start),
148                    self.capacity - start,
149                )
150            }
151            .fill(value);
152
153            if end != 0 {
154                unsafe { std::slice::from_raw_parts_mut(self.buffer, end) }.fill(value);
155            }
156        }
157
158        self.length += num;
159
160        num
161    }
162}
163
164impl<T> Drop for FixedRingBuffer<T> {
165    fn drop(&mut self) {
166        for i in 0..self.length {
167            let offset = wrapping_add(self.start, i, self.capacity);
168            unsafe { self.buffer.wrapping_add(offset).read() };
169        }
170
171        unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) };
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178
179    #[test]
180    fn basic() {
181        let mut frb = FixedRingBuffer::new(256);
182
183        assert!(frb.pop_front().is_none());
184
185        frb.push(1).unwrap();
186        frb.push(3).unwrap();
187
188        assert_eq!(frb.pop_front(), Some(1));
189        assert_eq!(frb.pop_front(), Some(3));
190        assert_eq!(frb.pop_front(), None);
191
192        assert!(!frb.is_full());
193        assert_eq!(frb.fill_repeat(42, 300), 256);
194        assert!(frb.is_full());
195
196        for _ in 0..256 {
197            assert_eq!(frb.pop_front(), Some(42));
198            assert!(!frb.is_full());
199        }
200        assert_eq!(frb.pop_front(), None);
201    }
202
203    #[test]
204    fn boxed() {
205        let mut frb = FixedRingBuffer::new(256);
206
207        assert!(frb.pop_front().is_none());
208
209        frb.push(Box::new(1)).unwrap();
210        frb.push(Box::new(3)).unwrap();
211
212        assert_eq!(frb.pop_front(), Some(Box::new(1)));
213        assert_eq!(frb.pop_front(), Some(Box::new(3)));
214        assert_eq!(frb.pop_front(), None);
215    }
216}