polars_io/utils/compression.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
use std::io::Read;
use polars_core::prelude::*;
use polars_error::to_compute_err;
/// Represents the compression algorithms that we have decoders for
pub enum SupportedCompression {
GZIP,
ZLIB,
ZSTD,
}
impl SupportedCompression {
/// If the given byte slice starts with the "magic" bytes for a supported compression family, return
/// that family, for unsupported/uncompressed slices, return None
pub fn check(bytes: &[u8]) -> Option<Self> {
if bytes.len() < 4 {
// not enough bytes to perform prefix checks
return None;
}
match bytes[..4] {
[31, 139, _, _] => Some(Self::GZIP),
[0x78, 0x01, _, _] | // ZLIB0
[0x78, 0x9C, _, _] | // ZLIB1
[0x78, 0xDA, _, _] // ZLIB2
=> Some(Self::ZLIB),
[0x28, 0xB5, 0x2F, 0xFD] => Some(Self::ZSTD),
_ => None,
}
}
}
/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
assert!(out.is_empty());
if let Some(algo) = SupportedCompression::check(bytes) {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
{
match algo {
SupportedCompression::GZIP => {
flate2::read::MultiGzDecoder::new(bytes)
.read_to_end(out)
.map_err(to_compute_err)?;
},
SupportedCompression::ZLIB => {
flate2::read::ZlibDecoder::new(bytes)
.read_to_end(out)
.map_err(to_compute_err)?;
},
SupportedCompression::ZSTD => {
zstd::Decoder::new(bytes)?.read_to_end(out)?;
},
}
Ok(out)
}
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
{
panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
}
} else {
Ok(bytes)
}
}