polars_io/cloud/cloud_writer/
bufferer.rs1use bytes::Bytes;
2use object_store::PutPayload;
3
4use crate::configs::{cloud_writer_coalesce_run_length, cloud_writer_copy_buffer_size};
5
6pub(super) struct BytesBufferer {
9 target_output_size: usize,
11 buffered_bytes: Vec<Bytes>,
12 copy_buffer: Vec<u8>,
14 copy_buffer_reserve_size: usize,
15 num_bytes_buffered: usize,
17 tail_coalesce_num_items: usize,
18 tail_coalesce_byte_offset: usize,
19}
20
21impl BytesBufferer {
22 pub(super) fn new(target_output_size: usize) -> Self {
23 let copy_buffer_reserve_size =
24 usize::min(target_output_size, cloud_writer_copy_buffer_size().get());
25
26 BytesBufferer {
27 target_output_size,
28
29 buffered_bytes: Vec::with_capacity(if target_output_size == 0 {
30 1
31 } else {
32 usize::max(
33 target_output_size.div_ceil(copy_buffer_reserve_size),
34 match cloud_writer_coalesce_run_length() {
35 n if n <= copy_buffer_reserve_size => n,
36 _ => 0,
37 },
38 )
39 }),
40 copy_buffer: vec![],
41 copy_buffer_reserve_size,
42 num_bytes_buffered: 0,
43 tail_coalesce_num_items: 0,
44 tail_coalesce_byte_offset: 0,
45 }
46 }
47
48 pub(super) fn push_owned(&mut self, bytes: &mut Bytes) {
51 if bytes.is_empty() {
52 return;
53 }
54
55 let available_capacity = self.available_capacity_current_chunk(bytes.len());
56
57 if available_capacity == 0 {
58 return;
59 }
60
61 loop {
62 let copy_buffer_available_capacity = usize::min(
63 available_capacity,
64 self.copy_buffer.capacity() - self.copy_buffer.len(),
65 );
66
67 if bytes.len() <= copy_buffer_available_capacity {
68 self.copy_buffer.extend_from_slice(bytes);
69 self.num_bytes_buffered += bytes.len();
70 *bytes = Bytes::new();
71
72 return;
73 }
74
75 self.commit_active_copy_buffer();
76
77 if self.tail_coalesce_num_items >= cloud_writer_coalesce_run_length() {
78 self.coalesce_tail();
79 continue;
80 }
81
82 break;
83 }
84
85 let bytes = bytes.split_to(usize::min(bytes.len(), available_capacity));
86
87 let bytes_len = bytes.len();
88 self.buffered_bytes.push(bytes);
89 self.num_bytes_buffered += bytes_len;
90
91 if self.num_bytes_buffered - self.tail_coalesce_byte_offset <= self.copy_buffer_reserve_size
92 {
93 self.tail_coalesce_num_items += 1;
94 } else {
95 self.reset_tail_coalesce_counters();
96 }
97 }
98
99 pub(super) fn push_slice(&mut self, bytes: &mut &[u8]) {
102 while !bytes.is_empty() {
103 let available_capacity = self.available_capacity_current_chunk(bytes.len());
104
105 if available_capacity == 0 {
106 break;
107 }
108
109 let mut copy_buffer_available_capacity = usize::min(
110 available_capacity,
111 self.copy_buffer.capacity() - self.copy_buffer.len(),
112 );
113
114 if copy_buffer_available_capacity == 0 {
115 self.commit_active_copy_buffer();
116 copy_buffer_available_capacity =
117 self.reserve_active_copy_buffer(available_capacity);
118 }
119
120 let n = usize::min(bytes.len(), copy_buffer_available_capacity);
121
122 self.copy_buffer
123 .extend_from_slice(bytes.split_off(..n).unwrap());
124 self.num_bytes_buffered += n;
125 }
126 }
127
128 fn coalesce_tail(&mut self) {
129 if self.tail_coalesce_num_items < 2 {
130 return;
131 }
132
133 assert_eq!(self.copy_buffer.capacity(), 0);
134 assert!(self.tail_coalesce_byte_offset < self.target_output_size);
135
136 let copy_buffer_reserve = usize::min(
137 self.copy_buffer_reserve_size,
138 self.target_output_size - self.tail_coalesce_byte_offset,
139 );
140
141 assert!(copy_buffer_reserve >= (self.num_bytes_buffered - self.tail_coalesce_byte_offset));
142
143 let drain_start: usize = self.buffered_bytes.len() - self.tail_coalesce_num_items;
144 let drain_range = drain_start..;
145 self.reset_tail_coalesce_counters();
146
147 self.copy_buffer.reserve_exact(copy_buffer_reserve);
148 self.buffered_bytes
149 .drain(drain_range)
150 .for_each(|bytes| self.copy_buffer.extend_from_slice(&bytes));
151 }
152
153 fn reset_tail_coalesce_counters(&mut self) {
154 self.tail_coalesce_byte_offset = self.num_bytes_buffered;
155 self.tail_coalesce_num_items = 0;
156 }
157
158 pub(super) fn is_empty(&self) -> bool {
159 if self.num_bytes_buffered == 0 {
160 assert!(self.buffered_bytes.is_empty());
161 assert_eq!(self.copy_buffer.capacity(), 0);
162 true
163 } else {
164 false
165 }
166 }
167
168 pub(super) fn is_full(&self) -> bool {
169 self.num_bytes_buffered >= usize::max(1, self.target_output_size)
170 }
171
172 pub(super) fn flush_full_chunk(&mut self) -> Option<PutPayload> {
173 self.is_full().then(|| self.flush().unwrap())
174 }
175
176 pub(super) fn flush(&mut self) -> Option<PutPayload> {
177 if self.is_empty() {
178 return None;
179 }
180
181 self.commit_active_copy_buffer();
182
183 self.num_bytes_buffered = 0;
184 self.reset_tail_coalesce_counters();
185
186 let payload = PutPayload::from_iter(self.buffered_bytes.drain(..));
187
188 Some(payload)
189 }
190
191 fn available_capacity_current_chunk(&self, incoming_len: usize) -> usize {
192 if self.target_output_size > 0 {
193 self.target_output_size - self.num_bytes_buffered
194 } else if self.is_empty() {
195 incoming_len
196 } else {
197 0
198 }
199 }
200
201 #[inline]
202 fn commit_active_copy_buffer(&mut self) {
203 if !self.copy_buffer.is_empty() {
204 self.num_bytes_buffered -= self.copy_buffer.len();
205 let mut bytes: Bytes = std::mem::take(&mut self.copy_buffer).into();
206 self.push_owned(&mut bytes);
207 assert!(bytes.is_empty());
208 }
209 }
210
211 fn reserve_active_copy_buffer(&mut self, available_capacity_current_chunk: usize) -> usize {
212 let n = if self.copy_buffer_reserve_size > 0 {
213 usize::min(
214 self.copy_buffer_reserve_size,
215 available_capacity_current_chunk,
216 )
217 } else {
218 available_capacity_current_chunk
219 };
220
221 self.copy_buffer.reserve_exact(n);
222
223 usize::min(
224 self.copy_buffer.capacity() - self.copy_buffer.len(),
225 available_capacity_current_chunk,
226 )
227 }
228}