Skip to main content

polars_io/utils/
bytes_bufferer.rs

1use std::num::NonZeroUsize;
2use std::ops::Range;
3
4use arrow::io::write_owned::WriteBytesOwned;
5use polars_buffer::Buffer;
6
/// Sizing parameters for a `BytesBufferer`.
#[derive(Clone, Debug)]
pub struct BytesBuffererConfig {
    /// `min..max` target size of a buffered chunk. Half of `min` is the threshold below which
    /// adjacent small owned buffers are merged by copying; `max` drives how large owned buffers
    /// are split.
    pub target_size: Range<NonZeroUsize>,
    /// `min..max` size of a single copy-buffer allocation. `min` / `max` must be less than
    /// `target_size.min` / `target_size.max` respectively.
    pub copy_buffer_reserve_size: Range<NonZeroUsize>,
}
14
/// Snapshot of a bufferer's counters.
#[derive(Clone, Debug, Default)]
pub struct BytesBuffererStats {
    /// Number of bytes currently buffered.
    pub total_bytes: usize,
    /// Number of chunks currently held, counting a non-empty pending copy buffer as one chunk.
    pub chunks: usize,
    /// Number of bytes that were copied into a copy buffer rather than stored as-is.
    pub copied_bytes: usize,
    /// Number of copy-buffer allocations performed.
    pub allocations: u64,
}
22
23/// Utility for byte buffering logic. Accepts both owned [`Buffer<u8>`] and borrowed `&[u8]` incoming
24/// bytes.
25pub struct BytesBufferer {
26    target_size: Range<NonZeroUsize>,
27    copy_buffer_reserve_size: Range<NonZeroUsize>,
28    buffered_bytes: Vec<Buffer<u8>>,
29    copy_buffer: Vec<u8>,
30    num_bytes_buffered: usize,
31
32    // Metrics
33    num_copied_bytes: usize,
34    num_allocations: u64,
35}
36
37impl BytesBufferer {
38    pub fn new(config: &BytesBuffererConfig) -> Self {
39        let BytesBuffererConfig {
40            target_size,
41            copy_buffer_reserve_size,
42        } = config;
43
44        BytesBufferer {
45            target_size: target_size.clone(),
46            copy_buffer_reserve_size: copy_buffer_reserve_size.clone(),
47            buffered_bytes: Vec::with_capacity(8),
48            copy_buffer: vec![],
49            num_bytes_buffered: 0,
50
51            num_copied_bytes: 0,
52            num_allocations: 0,
53        }
54    }
55
56    pub fn stats(&self) -> BytesBuffererStats {
57        BytesBuffererStats {
58            total_bytes: self.num_bytes_buffered,
59            chunks: self.buffered_bytes.len() + !self.copy_buffer.is_empty() as usize,
60            copied_bytes: self.num_copied_bytes,
61            allocations: self.num_allocations,
62        }
63    }
64
65    pub fn is_empty(&self) -> bool {
66        self.len() == 0
67    }
68
69    pub fn len(&self) -> usize {
70        self.num_bytes_buffered
71    }
72
73    pub fn push_owned(&mut self, bytes: &Buffer<u8>) {
74        if bytes.len() <= self.copy_buffer.capacity() - self.copy_buffer.len() {
75            self.push_slice(bytes);
76            return;
77        }
78
79        let copy_buffer_was_empty = !self.commit_copy_buffer();
80        let half_min_target_size = self.target_size.start.get() / 2;
81
82        if copy_buffer_was_empty
83            && bytes.len() < half_min_target_size
84            && let Some(prev_bytes) = self
85                .buffered_bytes
86                .pop_if(|prev_bytes| prev_bytes.len() < half_min_target_size)
87        {
88            self.num_bytes_buffered -= prev_bytes.len();
89            self.reserve_copy_buffer(prev_bytes.len() + bytes.len());
90            self.push_slice(&prev_bytes);
91            self.push_slice(bytes);
92            return;
93        }
94
95        let bytes_len = bytes.len();
96
97        if let Some((n_parts, part_size, rem)) = (bytes.len() / self.target_size.end.get()
98            ..=bytes.len().div_ceil(self.target_size.end.get()))
99            .filter(|n_parts| *n_parts != 0)
100            .map(|n_parts| (n_parts, bytes.len() / n_parts, bytes.len() % n_parts))
101            .max_by_key(|(_, part_size, _)| part_size.abs_diff(self.target_size.end.get()))
102            && n_parts > 1
103        {
104            for i in 0..n_parts {
105                let start = i * part_size + usize::min(rem, i);
106                let end = start + part_size + (i < rem) as usize;
107
108                self.buffered_bytes
109                    .push(Buffer::clone(bytes).sliced(start..end));
110            }
111        } else {
112            self.buffered_bytes.push(Buffer::clone(bytes));
113        }
114
115        self.num_bytes_buffered += bytes_len;
116    }
117
118    pub fn push_slice(&mut self, mut bytes: &[u8]) {
119        while !bytes.is_empty() {
120            if self.copy_buffer.is_empty() {
121                self.reserve_copy_buffer(bytes.len());
122            }
123
124            if let n = self.copy_buffer.capacity() - self.copy_buffer.len()
125                && n != 0
126            {
127                let n = usize::min(n, bytes.len());
128
129                self.copy_buffer
130                    .extend_from_slice(bytes.split_off(..n).unwrap());
131                self.num_bytes_buffered += n;
132                self.num_copied_bytes += n;
133            }
134
135            if self.copy_buffer.len() == self.copy_buffer.capacity() {
136                self.commit_copy_buffer();
137            } else {
138                assert!(bytes.is_empty());
139            }
140        }
141    }
142
143    pub fn drain(&mut self) -> std::vec::Drain<'_, Buffer<u8>> {
144        self.commit_copy_buffer();
145        self.num_bytes_buffered = 0;
146        self.buffered_bytes.drain(..)
147    }
148
149    /// Guarantees that `self.copy_buffer` has spare capacity. Does not guarantee how much spare
150    /// capacity (should be re-called if more capacity is needed).
151    ///
152    /// # Panics
153    /// Panics if `self.copy_buffer` is not empty.
154    fn reserve_copy_buffer(&mut self, incoming_len: usize) {
155        assert_eq!(self.copy_buffer.len(), 0);
156
157        if self.copy_buffer.capacity() != 0 {
158            return;
159        }
160
161        self.num_allocations += 1;
162
163        let reserve_size = usize::max(self.num_bytes_buffered.saturating_mul(2), incoming_len)
164            .clamp(
165                self.copy_buffer_reserve_size.start.get(),
166                self.copy_buffer_reserve_size.end.get(),
167            );
168
169        // Avoid over-allocating for small header pushes.
170        if incoming_len.saturating_mul(128) < reserve_size
171            && self.buffered_bytes.last().is_none_or(|bytes| {
172                // Prevent this branch from being hit successively on multiple
173                // small writes that accumulate to a large amount.
174                bytes.len() > 16 * incoming_len
175            })
176        {
177            self.copy_buffer.reserve_exact(8 * incoming_len);
178            return;
179        }
180
181        self.copy_buffer.reserve_exact(reserve_size);
182    }
183
184    #[inline]
185    fn commit_copy_buffer(&mut self) -> bool {
186        if !self.copy_buffer.is_empty() {
187            self.buffered_bytes
188                .push(Buffer::from_vec(std::mem::take(&mut self.copy_buffer)));
189            true
190        } else {
191            false
192        }
193    }
194}
195
196impl WriteBytesOwned for BytesBufferer {
197    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
198        self.push_slice(buf);
199        Ok(())
200    }
201
202    fn write_all_owned(&mut self, bytes: &Buffer<u8>) -> std::io::Result<()> {
203        self.push_owned(bytes);
204        Ok(())
205    }
206
207    fn len(&self) -> usize {
208        BytesBufferer::len(self)
209    }
210}
211
212impl IntoIterator for BytesBufferer {
213    type Item = <Vec<Buffer<u8>> as IntoIterator>::Item;
214    type IntoIter = <Vec<Buffer<u8>> as IntoIterator>::IntoIter;
215
216    fn into_iter(mut self) -> Self::IntoIter {
217        self.commit_copy_buffer();
218        self.buffered_bytes.into_iter()
219    }
220}