polars_io/utils/compression.rs

use std::cmp;
use std::io::Read;

use polars_core::prelude::*;
use polars_error::{feature_gated, to_compute_err};
use polars_utils::mmap::{MemReader, MemSlice};

/// Represents the compression algorithms that we have decoders for
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family,
    /// return that family. For unsupported or uncompressed slices, return `None`.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 {
            // not enough bytes to perform prefix checks
            return None;
        }
        match bytes[..4] {
            [0x1f, 0x8b, _, _] => Some(Self::GZIP),
            // Different zlib compression levels without preset dictionary.
            [0x78, 0x01, _, _] => Some(Self::ZLIB),
            [0x78, 0x5e, _, _] => Some(Self::ZLIB),
            [0x78, 0x9c, _, _] => Some(Self::ZLIB),
            [0x78, 0xda, _, _] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
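
// A minimal usage sketch of `SupportedCompression::check`, assuming the standard zstd frame
// magic `0x28 0xb5 0x2f 0xfd`: classification is purely prefix-based, and inputs shorter than
// four bytes are never classified.
#[cfg(test)]
mod supported_compression_sketch {
    use super::*;

    #[test]
    fn detects_zstd_and_rejects_short_input() {
        // First bytes of a zstd frame followed by arbitrary payload bytes.
        let zstd_frame_start: [u8; 6] = [0x28, 0xb5, 0x2f, 0xfd, 0x00, 0x00];
        assert!(matches!(
            SupportedCompression::check(&zstd_frame_start),
            Some(SupportedCompression::ZSTD)
        ));
        // A truncated gzip header (fewer than 4 bytes) cannot be classified.
        assert!(SupportedCompression::check(&[0x1fu8, 0x8b]).is_none());
    }
}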

/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given to take ownership of the decompressed data.
#[allow(clippy::ptr_arg)]
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    assert!(out.is_empty());

    let Some(algo) = SupportedCompression::check(bytes) else {
        return Ok(bytes);
    };

    feature_gated!("decompress", {
        match algo {
            SupportedCompression::GZIP => {
                flate2::read::MultiGzDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZLIB => {
                flate2::read::ZlibDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZSTD => {
                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
            },
        }

        Ok(out)
    })
}
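
// A minimal usage sketch of `maybe_decompress_bytes`, assuming uncompressed input: callers pass
// an empty `out` vec that would own any decompressed data; when no compression magic is detected
// the original slice is returned and `out` stays empty.
#[cfg(test)]
mod maybe_decompress_sketch {
    use super::*;

    #[test]
    fn uncompressed_input_is_passed_through() -> PolarsResult<()> {
        let raw = b"a,b\n1,2\n";
        let mut out = Vec::new();
        // No compression magic is detected, so no decompression happens.
        let bytes = maybe_decompress_bytes(raw, &mut out)?;
        assert_eq!(bytes, &raw[..]);
        assert!(out.is_empty());
        Ok(())
    }
}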

/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    Uncompressed {
        slice: MemSlice,
        offset: usize,
    },
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<MemReader>),
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<MemReader>),
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, MemReader>),
}

impl CompressedReader {
    pub fn try_new(slice: MemSlice) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(MemReader::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(MemReader::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                CompressedReader::Zstd(zstd::Decoder::with_buffer(MemReader::new(slice))?)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    /// Returns an estimate of the total number of bytes that will be produced by reading from
    /// start to finish. Exact for uncompressed input; derived from a typical compression ratio
    /// otherwise.
    pub fn total_len_estimate(&self) -> usize {
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => reader.get_ref().total_len() * ESTIMATED_ZSTD_RATIO,
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal reader and creates a new
    /// [`MemSlice`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 once EOF has been
    /// reached and this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`MemSlice::slice`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &MemSlice,
        read_size: usize,
    ) -> std::io::Result<(MemSlice, usize)> {
        // This abstraction supports arbitrarily sized rows, assuming callers correctly handle
        // retrying by growing `prev_leftover` until it contains at least one full row.
        let prev_len = prev_leftover.len();

        let mut buf = Vec::new();
        if self.is_compressed() {
            let reserve_size = if read_size == usize::MAX {
                self.total_len_estimate()
            } else {
                prev_len.saturating_add(read_size)
            };
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        let new_slice_from_read =
            |read_n: usize, mut buf: Vec<u8>| -> std::io::Result<(MemSlice, usize)> {
                buf.truncate(prev_len + read_n);
                Ok((MemSlice::from_vec(buf), read_n))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let read_n = cmp::min(read_size, slice.len() - *offset);
                let new_slice = slice.slice((*offset - prev_len)..(*offset + read_n));
                *offset += read_n;
                Ok((new_slice, read_n))
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}
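
// A minimal streaming sketch of the `read_next_slice` contract, assuming `MemSlice` derefs to
// `&[u8]` and that `MemSlice::from_vec(Vec::new())` yields an empty slice: build a reader over
// an in-memory buffer, pull fixed-size chunks, and feed each returned slice back in as
// `prev_leftover` on the next call.
#[cfg(test)]
mod compressed_reader_sketch {
    use super::*;

    #[test]
    fn chunked_read_over_uncompressed_memory() -> PolarsResult<()> {
        let data = MemSlice::from_vec(b"abcdefghij".to_vec());
        let mut reader = CompressedReader::try_new(data)?;
        assert!(!reader.is_compressed());

        let mut prev = MemSlice::from_vec(Vec::new());
        let mut collected = Vec::new();
        loop {
            // Each call returns `concat(prev, new_bytes)`; `read_n == 0` signals EOF.
            let (slice, read_n) = reader.read_next_slice(&prev, 4)?;
            if read_n == 0 {
                break;
            }
            // Only the freshly read tail is new; the prefix is the previous leftover.
            collected.extend_from_slice(&slice[prev.len()..]);
            prev = slice;
        }
        assert_eq!(collected, b"abcdefghij".to_vec());
        Ok(())
    }
}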