polars_io/utils/
compression.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use std::io::Read;

use polars_core::prelude::*;
use polars_error::to_compute_err;

/// Represents the compression algorithms that we have decoders for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family,
    /// return that family; for unsupported or uncompressed slices, return `None`.
    ///
    /// Slices shorter than 4 bytes are always reported as uncompressed, because the ZSTD magic
    /// number spans all 4 bytes of the inspected prefix.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 {
            // not enough bytes to perform prefix checks
            return None;
        }
        match bytes[..4] {
            // gzip member header: ID1 = 0x1f, ID2 = 0x8b.
            [0x1f, 0x8b, _, _] => Some(Self::GZIP),
            // zlib header: CMF = 0x78 (deflate, 32 KiB window) followed by the FLG byte for
            // each compression level (FLEVEL 0..=3) without a preset dictionary.
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, _, _] => Some(Self::ZLIB),
            // zstd frame magic number 0xFD2FB528, stored little-endian.
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}

/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    assert!(out.is_empty());

    if let Some(algo) = SupportedCompression::check(bytes) {
        #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
        {
            match algo {
                SupportedCompression::GZIP => {
                    flate2::read::MultiGzDecoder::new(bytes)
                        .read_to_end(out)
                        .map_err(to_compute_err)?;
                },
                SupportedCompression::ZLIB => {
                    flate2::read::ZlibDecoder::new(bytes)
                        .read_to_end(out)
                        .map_err(to_compute_err)?;
                },
                SupportedCompression::ZSTD => {
                    zstd::Decoder::new(bytes)?.read_to_end(out)?;
                },
            }

            Ok(out)
        }
        #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
        {
            panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
        }
    } else {
        Ok(bytes)
    }
}