Skip to main content

polars_io/cloud/cloud_writer/
bufferer.rs

1use bytes::Bytes;
2use object_store::PutPayload;
3
4use crate::configs::{cloud_writer_coalesce_run_length, cloud_writer_copy_buffer_size};
5
6/// Utility for byte buffering logic. Accepts both owned [`Bytes`] and borrowed `&[u8]` incoming
7/// bytes. Buffered bytes can be flushed to a [`PutPayload`].
8pub(super) struct BytesBufferer {
9    /// Buffer until this many bytes. If set to `0`, buffering is disabled.
10    target_output_size: usize,
11    buffered_bytes: Vec<Bytes>,
12    /// Copy bytes from small or borrowed (`&[u8]`) incoming buffers.
13    copy_buffer: Vec<u8>,
14    copy_buffer_reserve_size: usize,
15    /// Total bytes buffered, includes both `buffered_bytes` and `copy_buffer.len()`.
16    num_bytes_buffered: usize,
17    tail_coalesce_num_items: usize,
18    tail_coalesce_byte_offset: usize,
19}
20
21impl BytesBufferer {
22    pub(super) fn new(target_output_size: usize) -> Self {
23        let copy_buffer_reserve_size =
24            usize::min(target_output_size, cloud_writer_copy_buffer_size().get());
25
26        BytesBufferer {
27            target_output_size,
28
29            buffered_bytes: Vec::with_capacity(if target_output_size == 0 {
30                1
31            } else {
32                usize::max(
33                    target_output_size.div_ceil(copy_buffer_reserve_size),
34                    match cloud_writer_coalesce_run_length() {
35                        n if n <= copy_buffer_reserve_size => n,
36                        _ => 0,
37                    },
38                )
39            }),
40            copy_buffer: vec![],
41            copy_buffer_reserve_size,
42            num_bytes_buffered: 0,
43            tail_coalesce_num_items: 0,
44            tail_coalesce_byte_offset: 0,
45        }
46    }
47
48    /// Push owned [`Bytes`] into this bufferer. This will consume from a mutable reference
49    /// via [`Bytes::split_to`] until either the bytes is fully consumed, or `self` is full.
50    pub(super) fn push_owned(&mut self, bytes: &mut Bytes) {
51        if bytes.is_empty() {
52            return;
53        }
54
55        let available_capacity = self.available_capacity_current_chunk(bytes.len());
56
57        if available_capacity == 0 {
58            return;
59        }
60
61        loop {
62            let copy_buffer_available_capacity = usize::min(
63                available_capacity,
64                self.copy_buffer.capacity() - self.copy_buffer.len(),
65            );
66
67            if bytes.len() <= copy_buffer_available_capacity {
68                self.copy_buffer.extend_from_slice(bytes);
69                self.num_bytes_buffered += bytes.len();
70                *bytes = Bytes::new();
71
72                return;
73            }
74
75            self.commit_active_copy_buffer();
76
77            if self.tail_coalesce_num_items >= cloud_writer_coalesce_run_length() {
78                self.coalesce_tail();
79                continue;
80            }
81
82            break;
83        }
84
85        let bytes = bytes.split_to(usize::min(bytes.len(), available_capacity));
86
87        let bytes_len = bytes.len();
88        self.buffered_bytes.push(bytes);
89        self.num_bytes_buffered += bytes_len;
90
91        if self.num_bytes_buffered - self.tail_coalesce_byte_offset <= self.copy_buffer_reserve_size
92        {
93            self.tail_coalesce_num_items += 1;
94        } else {
95            self.reset_tail_coalesce_counters();
96        }
97    }
98
99    /// Push borrowed `&[u8]` into this bufferer. This will consume from a mutable reference
100    /// via `split_off` until either the slice is fully consumed, or `self` is full.
101    pub(super) fn push_slice(&mut self, bytes: &mut &[u8]) {
102        while !bytes.is_empty() {
103            let available_capacity = self.available_capacity_current_chunk(bytes.len());
104
105            if available_capacity == 0 {
106                break;
107            }
108
109            let mut copy_buffer_available_capacity = usize::min(
110                available_capacity,
111                self.copy_buffer.capacity() - self.copy_buffer.len(),
112            );
113
114            if copy_buffer_available_capacity == 0 {
115                self.commit_active_copy_buffer();
116                copy_buffer_available_capacity =
117                    self.reserve_active_copy_buffer(available_capacity);
118            }
119
120            let n = usize::min(bytes.len(), copy_buffer_available_capacity);
121
122            self.copy_buffer
123                .extend_from_slice(bytes.split_off(..n).unwrap());
124            self.num_bytes_buffered += n;
125        }
126    }
127
128    fn coalesce_tail(&mut self) {
129        if self.tail_coalesce_num_items < 2 {
130            return;
131        }
132
133        assert_eq!(self.copy_buffer.capacity(), 0);
134        assert!(self.tail_coalesce_byte_offset < self.target_output_size);
135
136        let copy_buffer_reserve = usize::min(
137            self.copy_buffer_reserve_size,
138            self.target_output_size - self.tail_coalesce_byte_offset,
139        );
140
141        assert!(copy_buffer_reserve >= (self.num_bytes_buffered - self.tail_coalesce_byte_offset));
142
143        let drain_start: usize = self.buffered_bytes.len() - self.tail_coalesce_num_items;
144        let drain_range = drain_start..;
145        self.reset_tail_coalesce_counters();
146
147        self.copy_buffer.reserve_exact(copy_buffer_reserve);
148        self.buffered_bytes
149            .drain(drain_range)
150            .for_each(|bytes| self.copy_buffer.extend_from_slice(&bytes));
151    }
152
153    fn reset_tail_coalesce_counters(&mut self) {
154        self.tail_coalesce_byte_offset = self.num_bytes_buffered;
155        self.tail_coalesce_num_items = 0;
156    }
157
158    pub(super) fn is_empty(&self) -> bool {
159        if self.num_bytes_buffered == 0 {
160            assert!(self.buffered_bytes.is_empty());
161            assert_eq!(self.copy_buffer.capacity(), 0);
162            true
163        } else {
164            false
165        }
166    }
167
168    pub(super) fn is_full(&self) -> bool {
169        self.num_bytes_buffered >= usize::max(1, self.target_output_size)
170    }
171
172    pub(super) fn flush_full_chunk(&mut self) -> Option<PutPayload> {
173        self.is_full().then(|| self.flush().unwrap())
174    }
175
176    pub(super) fn flush(&mut self) -> Option<PutPayload> {
177        if self.is_empty() {
178            return None;
179        }
180
181        self.commit_active_copy_buffer();
182
183        self.num_bytes_buffered = 0;
184        self.reset_tail_coalesce_counters();
185
186        let payload = PutPayload::from_iter(self.buffered_bytes.drain(..));
187
188        Some(payload)
189    }
190
191    fn available_capacity_current_chunk(&self, incoming_len: usize) -> usize {
192        if self.target_output_size > 0 {
193            self.target_output_size - self.num_bytes_buffered
194        } else if self.is_empty() {
195            incoming_len
196        } else {
197            0
198        }
199    }
200
201    #[inline]
202    fn commit_active_copy_buffer(&mut self) {
203        if !self.copy_buffer.is_empty() {
204            self.num_bytes_buffered -= self.copy_buffer.len();
205            let mut bytes: Bytes = std::mem::take(&mut self.copy_buffer).into();
206            self.push_owned(&mut bytes);
207            assert!(bytes.is_empty());
208        }
209    }
210
211    fn reserve_active_copy_buffer(&mut self, available_capacity_current_chunk: usize) -> usize {
212        let n = if self.copy_buffer_reserve_size > 0 {
213            usize::min(
214                self.copy_buffer_reserve_size,
215                available_capacity_current_chunk,
216            )
217        } else {
218            available_capacity_current_chunk
219        };
220
221        self.copy_buffer.reserve_exact(n);
222
223        usize::min(
224            self.copy_buffer.capacity() - self.copy_buffer.len(),
225            available_capacity_current_chunk,
226        )
227    }
228}