polars_io/utils/
compression.rs

1use std::cmp;
2use std::io::{Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9use crate::utils::sync_on_close::SyncOnCloseType;
10
/// Represents the compression algorithms that we have decoders for
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
    /// that family, for unsupported/uncompressed slices, return None.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // Every signature below is checked against a 4-byte prefix; anything
        // shorter can never match.
        let magic = bytes.get(..4)?;
        match magic {
            // gzip: fixed two-byte signature, the remaining header bytes vary.
            [0x1f, 0x8b, _, _] => Some(Self::GZIP),
            // zlib: CMF byte 0x78 followed by one of the FLG bytes used for the
            // different compression levels without a preset dictionary.
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, _, _] => Some(Self::ZLIB),
            // zstd frame magic number.
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
40
/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
///
/// Returns either the borrowed input itself (no supported compression magic
/// found) or a borrow of `out`, which then holds the entire decompressed
/// payload — hence the OOM note in the deprecation: everything is materialized
/// in memory at once.
///
/// # Panics
/// If `out` is not empty. Compressed input additionally requires the
/// `decompress` feature (enforced by `feature_gated!`).
#[allow(clippy::ptr_arg)]
#[deprecated(note = "may cause OOM, use CompressedReader instead")]
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    // `out` is the backing storage for the decompressed result; it must start
    // empty so the returned slice is exactly the decompressed payload.
    assert!(out.is_empty());

    let Some(algo) = SupportedCompression::check(bytes) else {
        // Not a recognized compression format: pass the input through untouched.
        return Ok(bytes);
    };

    feature_gated!("decompress", {
        match algo {
            SupportedCompression::GZIP => {
                // MultiGzDecoder also handles multi-member (concatenated) gzip streams.
                flate2::read::MultiGzDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZLIB => {
                flate2::read::ZlibDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZSTD => {
                // io::Error converts into PolarsResult directly, no map_err needed.
                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
            },
        }

        Ok(out)
    })
}
72
/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    /// Zero-copy pass-through; `offset` tracks how many bytes of `slice` have
    /// already been consumed.
    Uncompressed {
        slice: Buffer<u8>,
        offset: usize,
    },
    /// Streaming gzip decoder (handles multi-member streams).
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// Streaming zlib decoder.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Streaming zstd decoder.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
89
impl CompressedReader {
    /// Wraps `slice` in the decoder matching its magic-byte prefix, or stores it
    /// as-is for zero-copy reading when no supported compression is detected.
    ///
    /// # Panics
    /// If compression is detected but the crate was built without the
    /// `decompress` feature.
    pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                // Decoder construction can fail (io::Error), hence the `?`.
                CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// `true` when reads go through a decompression codec, `false` for the
    /// zero-copy pass-through case.
    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    /// Suggested `read_size` for the first [`Self::read_next_slice`] call.
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    /// Suggested `read_size` for steady-state [`Self::read_next_slice`] calls.
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// Estimates the total number of bytes that will be produced by reading from
    /// the start to finish. Exact for the uncompressed case, a fixed-ratio guess
    /// for the compressed ones.
    pub fn total_len_estimate(&self) -> usize {
        // Assumed typical compression ratios used to scale the compressed input
        // length into an uncompressed-size estimate.
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                // `get_ref().get_ref()` unwraps decoder -> Cursor -> Buffer.
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
            },
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        // Only the compressed paths need an owned buffer; the uncompressed arm
        // below slices the backing `Buffer` without copying.
        let mut buf = Vec::new();
        if self.is_compressed() {
            // Reserve for leftover + new bytes, capped at twice the estimated
            // total output so a huge `read_size` can't force an oversized
            // allocation up front.
            let reserve_size = cmp::min(
                prev_len.saturating_add(read_size),
                self.total_len_estimate().saturating_mul(2),
            );
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        // Shared tail for the compressed arms: at this point `buf` holds
        // `prev_leftover` followed by up to `read_size` freshly decoded bytes.
        let new_slice_from_read =
            |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
                // Defensive: after `read_to_end` on a `take(read_size)` reader the
                // length should already be exactly `prev_len + bytes_read`.
                buf.truncate(prev_len + bytes_read);
                Ok((Buffer::from_vec(buf), bytes_read))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // By contract `prev_leftover` is the tail of this same buffer
                // ending at `*offset`, so the combined slice simply starts
                // `prev_len` bytes before it — zero-copy.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                Ok((new_slice, bytes_read))
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}
221
222/// This implementation is meant for compatibility. Use [`Self::read_next_slice`] for best
223/// performance.
224impl Read for CompressedReader {
225    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
226        match self {
227            CompressedReader::Uncompressed { slice, offset, .. } => {
228                let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
229                buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
230                *offset += bytes_read;
231                Ok(bytes_read)
232            },
233            #[cfg(feature = "decompress")]
234            CompressedReader::Gzip(decoder) => decoder.read(buf),
235            #[cfg(feature = "decompress")]
236            CompressedReader::Zlib(decoder) => decoder.read(buf),
237            #[cfg(feature = "decompress")]
238            CompressedReader::Zstd(decoder) => decoder.read(buf),
239        }
240    }
241}
242
/// Constructor for `WriteableTrait` compressed encoders.
///
/// The `Option` in each variant is only `None` after the encoder has been taken
/// out by `WriteableTrait::close` to finish the stream; using the writer after
/// that point is a logic error (panics on `unwrap`).
pub enum CompressedWriter {
    /// Gzip stream encoder around the underlying writer.
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    /// Zstd stream encoder around the underlying writer.
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
250
251impl CompressedWriter {
252    pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
253        feature_gated!("decompress", {
254            Self::Gzip(Some(flate2::write::GzEncoder::new(
255                writer,
256                level.map(flate2::Compression::new).unwrap_or_default(),
257            )))
258        })
259    }
260
261    pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
262        feature_gated!("decompress", {
263            zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
264                .map(Some)
265                .map(Self::Zstd)
266        })
267    }
268}
269
270impl Write for CompressedWriter {
271    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
272        feature_gated!("decompress", {
273            match self {
274                Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
275                Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
276            }
277        })
278    }
279
280    fn flush(&mut self) -> std::io::Result<()> {
281        feature_gated!("decompress", {
282            match self {
283                Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
284                Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
285            }
286        })
287    }
288}
289
290impl WriteableTrait for CompressedWriter {
291    fn close(&mut self) -> std::io::Result<()> {
292        feature_gated!("decompress", {
293            let writer = match self {
294                Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
295                Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
296            };
297
298            writer.close(SyncOnCloseType::All)
299        })
300    }
301
302    fn sync_all(&self) -> std::io::Result<()> {
303        feature_gated!("decompress", {
304            match self {
305                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
306                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
307            }
308        })
309    }
310
311    fn sync_data(&self) -> std::io::Result<()> {
312        feature_gated!("decompress", {
313            match self {
314                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
315                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
316            }
317        })
318    }
319}