polars_utils/chunked_bytes_cursor.rs

use std::num::NonZeroUsize;
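
/// A read-only cursor over a slice of byte chunks that all share the same length, except
/// possibly the last one, exposing them as a single contiguous stream through
/// `std::io::Read` and `std::io::Seek`.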
pub struct FixedSizeChunkedBytesCursor<'a, T> {
    position: usize,
    total_size: usize,
    chunk_size: NonZeroUsize,
    chunked_bytes: &'a [T],
}

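/// Error returned by [`FixedSizeChunkedBytesCursor::try_new`] when the chunk layout is invalid.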
#[derive(Debug)]
pub enum FixedSizeChunkedBytesCursorInitErr {
    ChunkLengthMismatch { index: usize },
    EmptyFirstChunk,
    NoChunks,
}

impl<'a, T> FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    pub fn try_new(chunked_bytes: &'a [T]) -> Result<Self, FixedSizeChunkedBytesCursorInitErr> {
        use FixedSizeChunkedBytesCursorInitErr as E;

        if chunked_bytes.is_empty() {
            return Err(E::NoChunks);
        }

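        // The first chunk defines the fixed chunk size; it must be non-empty.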
        let Ok(chunk_size) = NonZeroUsize::try_from(chunked_bytes[0].as_ref().len()) else {
            return Err(E::EmptyFirstChunk);
        };

        let mut total_size: usize = 0;

        for (i, bytes) in chunked_bytes.iter().enumerate() {
            let bytes = bytes.as_ref();

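            // Every chunk except the last must match the fixed chunk size.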
            if bytes.len() != chunk_size.get() && chunked_bytes.len() - i > 1 {
                return Err(E::ChunkLengthMismatch { index: i });
            }

            total_size = total_size.checked_add(bytes.len()).unwrap();
        }

        Ok(Self {
            position: 0,
            total_size,
            chunk_size,
            chunked_bytes,
        })
    }
}

impl<'a, T> std::io::Read for FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available_bytes = self.total_size.saturating_sub(self.position);
        let new_position = self.position + buf.len().min(available_bytes);

        let requested_byte_range = self.position..new_position;

        if requested_byte_range.is_empty() {
            return Ok(0);
        }

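        // Copy from the first chunk touched by the request, starting at the offset of
        // `position` within that chunk.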
        let mut bytes_read = {
            let (first_chunk_idx, offset_in_chunk) = (
                requested_byte_range.start / self.chunk_size,
                requested_byte_range.start % self.chunk_size,
            );
            let chunk_bytes = self.chunked_bytes[first_chunk_idx].as_ref();
            let len = requested_byte_range
                .len()
                .min(chunk_bytes.len() - offset_in_chunk);

            buf[..len].copy_from_slice(&chunk_bytes[offset_in_chunk..offset_in_chunk + len]);

            len
        };

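        // Either the first copy stopped exactly at a chunk boundary, or it already satisfied
        // the whole request.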
        assert!(
            (requested_byte_range.start + bytes_read).is_multiple_of(self.chunk_size.get())
                || bytes_read == requested_byte_range.len()
        );

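        // Copy the remaining chunks; each of them is read starting from offset 0.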
        for chunk_idx in (requested_byte_range.start + bytes_read) / self.chunk_size
            ..requested_byte_range.end.div_ceil(self.chunk_size.get())
        {
            let chunk_bytes = self.chunked_bytes[chunk_idx].as_ref();
            let len = (requested_byte_range.len() - bytes_read).min(chunk_bytes.len());

            buf[bytes_read..bytes_read + len].copy_from_slice(&chunk_bytes[..len]);

            bytes_read += len;
        }

        assert_eq!(bytes_read, requested_byte_range.len());

        self.position = new_position;

        Ok(requested_byte_range.len())
    }
}

impl<'a, T> std::io::Seek for FixedSizeChunkedBytesCursor<'a, T> {
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        use std::io::SeekFrom;

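        // `SeekFrom::Start` clamps to the end of the data and returns immediately; the other
        // variants compute a signed offset from the end or the current position.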
        let (base_pos, offset) = match pos {
            SeekFrom::Start(n) => {
                self.position = usize::try_from(n).unwrap().min(self.total_size);
                return Ok(self.position as u64);
            },
            SeekFrom::End(n) => (self.total_size as u64, n),
            SeekFrom::Current(n) => (self.position as u64, n),
        };
        match base_pos.checked_add_signed(offset) {
            Some(n) => {
                self.position = usize::try_from(n).unwrap();
                Ok(self.position as u64)
            },
            None => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )),
        }
    }
}
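
// A minimal usage sketch, not part of the original module: it exercises the cursor through the
// standard `Read`/`Seek` traits with two full 4-byte chunks and a shorter trailing chunk. The
// test name and values are illustrative assumptions, not existing tests from the crate.
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom};

    use super::FixedSizeChunkedBytesCursor;

    #[test]
    fn reads_across_chunk_boundaries() {
        // Two full 4-byte chunks followed by a shorter trailing chunk.
        let chunks: Vec<Vec<u8>> = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9]];
        let mut cursor = FixedSizeChunkedBytesCursor::try_new(chunks.as_slice()).unwrap();

        // A read spanning the boundary between the first and second chunks.
        let mut buf = [0u8; 6];
        cursor.seek(SeekFrom::Start(2)).unwrap();
        assert_eq!(cursor.read(&mut buf).unwrap(), 6);
        assert_eq!(buf, [2, 3, 4, 5, 6, 7]);

        // Reading past the end only returns the bytes that are left.
        let mut rest = Vec::new();
        cursor.read_to_end(&mut rest).unwrap();
        assert_eq!(rest, vec![8, 9]);
    }
}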