polars_io/utils/
bytes_bufferer.rs1use std::num::NonZeroUsize;
2use std::ops::Range;
3
4use arrow::io::write_owned::WriteBytesOwned;
5use polars_buffer::Buffer;
6
/// Size settings used to construct a [`BytesBufferer`].
#[derive(Clone, Debug)]
pub struct BytesBuffererConfig {
    /// Target chunk-size bounds.
    ///
    /// Half of `start` is the threshold below which two adjacent owned buffers are merged by
    /// copying; `end` is the size used when splitting large owned buffers into parts.
    pub target_size: Range<NonZeroUsize>,
    /// Bounds (`start` = lower, `end` = upper) that the copy-buffer reservation size is
    /// clamped to.
    pub copy_buffer_reserve_size: Range<NonZeroUsize>,
}
14
/// Point-in-time counters reported by `BytesBufferer::stats`.
#[derive(Clone, Debug, Default)]
pub struct BytesBuffererStats {
    /// Number of bytes currently buffered.
    pub total_bytes: usize,
    /// Number of committed chunks, plus one if the copy buffer currently holds bytes.
    pub chunks: usize,
    /// Total number of bytes that were copied into the copy buffer.
    pub copied_bytes: usize,
    /// Number of times a copy buffer was reserved.
    pub allocations: u64,
}
22
23pub struct BytesBufferer {
26 target_size: Range<NonZeroUsize>,
27 copy_buffer_reserve_size: Range<NonZeroUsize>,
28 buffered_bytes: Vec<Buffer<u8>>,
29 copy_buffer: Vec<u8>,
30 num_bytes_buffered: usize,
31
32 num_copied_bytes: usize,
34 num_allocations: u64,
35}
36
37impl BytesBufferer {
38 pub fn new(config: &BytesBuffererConfig) -> Self {
39 let BytesBuffererConfig {
40 target_size,
41 copy_buffer_reserve_size,
42 } = config;
43
44 BytesBufferer {
45 target_size: target_size.clone(),
46 copy_buffer_reserve_size: copy_buffer_reserve_size.clone(),
47 buffered_bytes: Vec::with_capacity(8),
48 copy_buffer: vec![],
49 num_bytes_buffered: 0,
50
51 num_copied_bytes: 0,
52 num_allocations: 0,
53 }
54 }
55
56 pub fn stats(&self) -> BytesBuffererStats {
57 BytesBuffererStats {
58 total_bytes: self.num_bytes_buffered,
59 chunks: self.buffered_bytes.len() + !self.copy_buffer.is_empty() as usize,
60 copied_bytes: self.num_copied_bytes,
61 allocations: self.num_allocations,
62 }
63 }
64
65 pub fn is_empty(&self) -> bool {
66 self.len() == 0
67 }
68
69 pub fn len(&self) -> usize {
70 self.num_bytes_buffered
71 }
72
73 pub fn push_owned(&mut self, bytes: &Buffer<u8>) {
74 if bytes.len() <= self.copy_buffer.capacity() - self.copy_buffer.len() {
75 self.push_slice(bytes);
76 return;
77 }
78
79 let copy_buffer_was_empty = !self.commit_copy_buffer();
80 let half_min_target_size = self.target_size.start.get() / 2;
81
82 if copy_buffer_was_empty
83 && bytes.len() < half_min_target_size
84 && let Some(prev_bytes) = self
85 .buffered_bytes
86 .pop_if(|prev_bytes| prev_bytes.len() < half_min_target_size)
87 {
88 self.num_bytes_buffered -= prev_bytes.len();
89 self.reserve_copy_buffer(prev_bytes.len() + bytes.len());
90 self.push_slice(&prev_bytes);
91 self.push_slice(bytes);
92 return;
93 }
94
95 let bytes_len = bytes.len();
96
97 if let Some((n_parts, part_size, rem)) = (bytes.len() / self.target_size.end.get()
98 ..=bytes.len().div_ceil(self.target_size.end.get()))
99 .filter(|n_parts| *n_parts != 0)
100 .map(|n_parts| (n_parts, bytes.len() / n_parts, bytes.len() % n_parts))
101 .max_by_key(|(_, part_size, _)| part_size.abs_diff(self.target_size.end.get()))
102 && n_parts > 1
103 {
104 for i in 0..n_parts {
105 let start = i * part_size + usize::min(rem, i);
106 let end = start + part_size + (i < rem) as usize;
107
108 self.buffered_bytes
109 .push(Buffer::clone(bytes).sliced(start..end));
110 }
111 } else {
112 self.buffered_bytes.push(Buffer::clone(bytes));
113 }
114
115 self.num_bytes_buffered += bytes_len;
116 }
117
118 pub fn push_slice(&mut self, mut bytes: &[u8]) {
119 while !bytes.is_empty() {
120 if self.copy_buffer.is_empty() {
121 self.reserve_copy_buffer(bytes.len());
122 }
123
124 if let n = self.copy_buffer.capacity() - self.copy_buffer.len()
125 && n != 0
126 {
127 let n = usize::min(n, bytes.len());
128
129 self.copy_buffer
130 .extend_from_slice(bytes.split_off(..n).unwrap());
131 self.num_bytes_buffered += n;
132 self.num_copied_bytes += n;
133 }
134
135 if self.copy_buffer.len() == self.copy_buffer.capacity() {
136 self.commit_copy_buffer();
137 } else {
138 assert!(bytes.is_empty());
139 }
140 }
141 }
142
143 pub fn drain(&mut self) -> std::vec::Drain<'_, Buffer<u8>> {
144 self.commit_copy_buffer();
145 self.num_bytes_buffered = 0;
146 self.buffered_bytes.drain(..)
147 }
148
149 fn reserve_copy_buffer(&mut self, incoming_len: usize) {
155 assert_eq!(self.copy_buffer.len(), 0);
156
157 if self.copy_buffer.capacity() != 0 {
158 return;
159 }
160
161 self.num_allocations += 1;
162
163 let reserve_size = usize::max(self.num_bytes_buffered.saturating_mul(2), incoming_len)
164 .clamp(
165 self.copy_buffer_reserve_size.start.get(),
166 self.copy_buffer_reserve_size.end.get(),
167 );
168
169 if incoming_len.saturating_mul(128) < reserve_size
171 && self.buffered_bytes.last().is_none_or(|bytes| {
172 bytes.len() > 16 * incoming_len
175 })
176 {
177 self.copy_buffer.reserve_exact(8 * incoming_len);
178 return;
179 }
180
181 self.copy_buffer.reserve_exact(reserve_size);
182 }
183
184 #[inline]
185 fn commit_copy_buffer(&mut self) -> bool {
186 if !self.copy_buffer.is_empty() {
187 self.buffered_bytes
188 .push(Buffer::from_vec(std::mem::take(&mut self.copy_buffer)));
189 true
190 } else {
191 false
192 }
193 }
194}
195
196impl WriteBytesOwned for BytesBufferer {
197 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
198 self.push_slice(buf);
199 Ok(())
200 }
201
202 fn write_all_owned(&mut self, bytes: &Buffer<u8>) -> std::io::Result<()> {
203 self.push_owned(bytes);
204 Ok(())
205 }
206
207 fn len(&self) -> usize {
208 BytesBufferer::len(self)
209 }
210}
211
212impl IntoIterator for BytesBufferer {
213 type Item = <Vec<Buffer<u8>> as IntoIterator>::Item;
214 type IntoIter = <Vec<Buffer<u8>> as IntoIterator>::IntoIter;
215
216 fn into_iter(mut self) -> Self::IntoIter {
217 self.commit_copy_buffer();
218 self.buffered_bytes.into_iter()
219 }
220}