// polars_io/utils/compression.rs
use std::cmp;
use std::io::Read;

use polars_core::prelude::*;
use polars_error::{feature_gated, to_compute_err};
use polars_utils::mmap::{MemReader, MemSlice};

/// Compression formats that can be recognised from a stream's leading magic bytes
/// (see [`SupportedCompression::check`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SupportedCompression {
    /// gzip stream (magic bytes `1f 8b`).
    GZIP,
    /// zlib-wrapped deflate stream (first byte `0x78`).
    ZLIB,
    /// Zstandard frame (magic bytes `28 b5 2f fd`).
    ZSTD,
}

15impl SupportedCompression {
16 pub fn check(bytes: &[u8]) -> Option<Self> {
20 if bytes.len() < 4 {
21 return None;
23 }
24 match bytes[..4] {
25 [0x1f, 0x8b, _, _] => Some(Self::GZIP),
26 [0x78, 0x01, _, _] => Some(Self::ZLIB),
28 [0x78, 0x5e, _, _] => Some(Self::ZLIB),
29 [0x78, 0x9c, _, _] => Some(Self::ZLIB),
30 [0x78, 0xda, _, _] => Some(Self::ZLIB),
31 [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
32 _ => None,
33 }
34 }
35}
36
37#[allow(clippy::ptr_arg)]
40pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
41 assert!(out.is_empty());
42
43 let Some(algo) = SupportedCompression::check(bytes) else {
44 return Ok(bytes);
45 };
46
47 feature_gated!("decompress", {
48 match algo {
49 SupportedCompression::GZIP => {
50 flate2::read::MultiGzDecoder::new(bytes)
51 .read_to_end(out)
52 .map_err(to_compute_err)?;
53 },
54 SupportedCompression::ZLIB => {
55 flate2::read::ZlibDecoder::new(bytes)
56 .read_to_end(out)
57 .map_err(to_compute_err)?;
58 },
59 SupportedCompression::ZSTD => {
60 zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
61 },
62 }
63
64 Ok(out)
65 })
66}
67
68pub enum CompressedReader {
73 Uncompressed {
74 slice: MemSlice,
75 offset: usize,
76 },
77 #[cfg(feature = "decompress")]
78 Gzip(flate2::bufread::MultiGzDecoder<MemReader>),
79 #[cfg(feature = "decompress")]
80 Zlib(flate2::bufread::ZlibDecoder<MemReader>),
81 #[cfg(feature = "decompress")]
82 Zstd(zstd::Decoder<'static, MemReader>),
83}
84
85impl CompressedReader {
86 pub fn try_new(slice: MemSlice) -> PolarsResult<Self> {
87 let algo = SupportedCompression::check(&slice);
88
89 Ok(match algo {
90 None => CompressedReader::Uncompressed { slice, offset: 0 },
91 #[cfg(feature = "decompress")]
92 Some(SupportedCompression::GZIP) => {
93 CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(MemReader::new(slice)))
94 },
95 #[cfg(feature = "decompress")]
96 Some(SupportedCompression::ZLIB) => {
97 CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(MemReader::new(slice)))
98 },
99 #[cfg(feature = "decompress")]
100 Some(SupportedCompression::ZSTD) => {
101 CompressedReader::Zstd(zstd::Decoder::with_buffer(MemReader::new(slice))?)
102 },
103 #[cfg(not(feature = "decompress"))]
104 _ => panic!("activate 'decompress' feature"),
105 })
106 }
107
108 pub fn is_compressed(&self) -> bool {
109 !matches!(&self, CompressedReader::Uncompressed { .. })
110 }
111
112 pub fn total_len_estimate(&self) -> usize {
115 const ESTIMATED_DEFLATE_RATIO: usize = 3;
116 const ESTIMATED_ZSTD_RATIO: usize = 5;
117
118 match self {
119 CompressedReader::Uncompressed { slice, .. } => slice.len(),
120 #[cfg(feature = "decompress")]
121 CompressedReader::Gzip(reader) => {
122 reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
123 },
124 #[cfg(feature = "decompress")]
125 CompressedReader::Zlib(reader) => {
126 reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
127 },
128 #[cfg(feature = "decompress")]
129 CompressedReader::Zstd(reader) => reader.get_ref().total_len() * ESTIMATED_ZSTD_RATIO,
130 }
131 }
132
133 pub fn read_next_slice(
148 &mut self,
149 prev_leftover: &MemSlice,
150 read_size: usize,
151 ) -> std::io::Result<(MemSlice, usize)> {
152 let prev_len = prev_leftover.len();
156
157 let mut buf = Vec::new();
158 if self.is_compressed() {
159 let reserve_size = if read_size == usize::MAX {
160 self.total_len_estimate()
161 } else {
162 prev_len.saturating_add(read_size)
163 };
164 buf.reserve_exact(reserve_size);
165 buf.extend_from_slice(prev_leftover);
166 }
167
168 let new_slice_from_read =
169 |read_n: usize, mut buf: Vec<u8>| -> std::io::Result<(MemSlice, usize)> {
170 buf.truncate(prev_len + read_n);
171 Ok((MemSlice::from_vec(buf), read_n))
172 };
173
174 match self {
175 CompressedReader::Uncompressed { slice, offset, .. } => {
176 let read_n = cmp::min(read_size, slice.len() - *offset);
177 let new_slice = slice.slice((*offset - prev_len)..(*offset + read_n));
178 *offset += read_n;
179 Ok((new_slice, read_n))
180 },
181 #[cfg(feature = "decompress")]
182 CompressedReader::Gzip(decoder) => {
183 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
184 },
185 #[cfg(feature = "decompress")]
186 CompressedReader::Zlib(decoder) => {
187 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
188 },
189 #[cfg(feature = "decompress")]
190 CompressedReader::Zstd(decoder) => {
191 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
192 },
193 }
194 }
195}