polars_utils/chunked_bytes_cursor.rs

use std::num::NonZeroUsize;

/// Cursor over fixed-size chunks of bytes.
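///
/// For example, chunks `[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]` have
/// `chunk_size = 4` and `total_size = 10`; byte `i` then lives in chunk
/// `i / 4` at offset `i % 4`.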
pub struct FixedSizeChunkedBytesCursor<'a, T> {
    position: usize,
    total_size: usize,
    chunk_size: NonZeroUsize,
    /// Note: the last chunk is allowed to be shorter than `chunk_size`, but
    /// never longer.
    chunked_bytes: &'a [T],
}

#[derive(Debug)]
pub enum FixedSizeChunkedBytesCursorInitErr {
    ChunkLengthMismatch { index: usize },
    EmptyFirstChunk,
    NoChunks,
}

impl<'a, T> FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    /// Expects `chunked_bytes` to be non-empty, with a non-empty first chunk,
    /// every chunk except the last exactly as long as the first, and the last
    /// chunk no longer than the first.
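    ///
    /// Returns `NoChunks` for an empty slice, `EmptyFirstChunk` if the first
    /// chunk has length zero, and `ChunkLengthMismatch` with the index of the
    /// chunk violating the length invariant.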
    pub fn try_new(chunked_bytes: &'a [T]) -> Result<Self, FixedSizeChunkedBytesCursorInitErr> {
        use FixedSizeChunkedBytesCursorInitErr as E;

        if chunked_bytes.is_empty() {
            return Err(E::NoChunks);
        }

        let Ok(chunk_size) = NonZeroUsize::try_from(chunked_bytes[0].as_ref().len()) else {
            return Err(E::EmptyFirstChunk);
        };

        let mut total_size: usize = 0;

        for (i, bytes) in chunked_bytes.iter().enumerate() {
            let bytes = bytes.as_ref();

            // Every chunk except the last must be exactly `chunk_size` long; the
            // last may be shorter, but a longer last chunk would break the
            // chunk/offset arithmetic in `read` and index past `chunked_bytes`.
            let is_last = i == chunked_bytes.len() - 1;
            if (!is_last && bytes.len() != chunk_size.get())
                || (is_last && bytes.len() > chunk_size.get())
            {
                return Err(E::ChunkLengthMismatch { index: i });
            }

            total_size = total_size.checked_add(bytes.len()).unwrap();
        }

        Ok(Self {
            position: 0,
            total_size,
            chunk_size,
            chunked_bytes,
        })
    }
}

impl<'a, T> std::io::Read for FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
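        // A prior seek may have left `position` past the end of the data;
        // saturating here makes the requested range come out empty in that case.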
        let available_bytes = self.total_size.saturating_sub(self.position);
        let new_position = self.position + buf.len().min(available_bytes);

        let requested_byte_range = self.position..new_position;

        if requested_byte_range.is_empty() {
            return Ok(0);
        }

        // First chunk needs special handling as the offset within the chunk can be non-zero.
        let mut bytes_read = {
            let (first_chunk_idx, offset_in_chunk) = (
                requested_byte_range.start / self.chunk_size,
                requested_byte_range.start % self.chunk_size,
            );
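            // E.g. with `chunk_size = 4` and `start = 6`: `first_chunk_idx = 1`
            // and `offset_in_chunk = 2`.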
            let chunk_bytes = self.chunked_bytes[first_chunk_idx].as_ref();
            let len = requested_byte_range
                .len()
                .min(chunk_bytes.len() - offset_in_chunk);

            buf[..len].copy_from_slice(&chunk_bytes[offset_in_chunk..offset_in_chunk + len]);

            len
        };

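        // The first copy either ended exactly on a chunk boundary or satisfied
        // the whole request, so the loop below always consumes chunks from
        // offset 0.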
        assert!(
            (requested_byte_range.start + bytes_read).is_multiple_of(self.chunk_size.get())
                || bytes_read == requested_byte_range.len()
        );

        for chunk_idx in (requested_byte_range.start + bytes_read) / self.chunk_size
            ..requested_byte_range.end.div_ceil(self.chunk_size.get())
        {
            let chunk_bytes = self.chunked_bytes[chunk_idx].as_ref();
            let len = (requested_byte_range.len() - bytes_read).min(chunk_bytes.len());

            buf[bytes_read..bytes_read + len].copy_from_slice(&chunk_bytes[..len]);

            bytes_read += len;
        }

        assert_eq!(bytes_read, requested_byte_range.len());

        self.position = new_position;

        Ok(requested_byte_range.len())
    }
}

impl<'a, T> std::io::Seek for FixedSizeChunkedBytesCursor<'a, T> {
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        // Mostly copied from io::Cursor::seek().
        use std::io::SeekFrom;

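        // Note: `SeekFrom::Start` clamps to `total_size`, while `Current` and
        // `End` may move the position past the end; a subsequent `read` then
        // returns `Ok(0)`.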
        let (base_pos, offset) = match pos {
            SeekFrom::Start(n) => {
                self.position = usize::try_from(n).unwrap().min(self.total_size);
                return Ok(self.position as u64);
            },
            SeekFrom::End(n) => (self.total_size as u64, n),
            SeekFrom::Current(n) => (self.position as u64, n),
        };
        match base_pos.checked_add_signed(offset) {
            Some(n) => {
                self.position = usize::try_from(n).unwrap();
                Ok(self.position as u64)
            },
            None => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )),
        }
    }
}
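
// A minimal sanity-check sketch of the cursor's expected behavior; these tests
// are illustrative additions, not an exhaustive suite.
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom};

    use super::*;

    #[test]
    fn read_across_chunk_boundary() {
        // Two full chunks of size 4 plus a shorter final chunk: 10 bytes total.
        let chunks: [&[u8]; 3] = [&[0, 1, 2, 3], &[4, 5, 6, 7], &[8, 9]];
        let mut cursor = FixedSizeChunkedBytesCursor::try_new(&chunks).unwrap();

        // Seek into the middle of the first chunk, then read past its end.
        cursor.seek(SeekFrom::Start(2)).unwrap();
        let mut buf = [0u8; 5];
        assert_eq!(cursor.read(&mut buf).unwrap(), 5);
        assert_eq!(buf, [2, 3, 4, 5, 6]);

        // Reading at the end yields zero bytes.
        cursor.seek(SeekFrom::End(0)).unwrap();
        assert_eq!(cursor.read(&mut buf).unwrap(), 0);
    }

    #[test]
    fn try_new_rejects_invalid_chunking() {
        use super::FixedSizeChunkedBytesCursorInitErr as E;

        assert!(matches!(
            FixedSizeChunkedBytesCursor::<&[u8]>::try_new(&[]),
            Err(E::NoChunks)
        ));
        assert!(matches!(
            FixedSizeChunkedBytesCursor::try_new(&[&[] as &[u8]]),
            Err(E::EmptyFirstChunk)
        ));
        // The middle chunk differs in length from the first.
        assert!(matches!(
            FixedSizeChunkedBytesCursor::try_new(&[&[1u8, 2] as &[u8], &[3], &[4]]),
            Err(E::ChunkLengthMismatch { index: 1 })
        ));
    }
}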