//! polars_io/utils/compression.rs
//!
//! Compression detection, streaming decompression readers, and compressed writers.

1use std::cmp;
2use std::io::{BufRead, Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9use crate::utils::stream_buf_reader::ReaderSource;
10use crate::utils::sync_on_close::SyncOnCloseType;
11
/// Represents the compression algorithms that we have decoders for
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
    /// that family, for unsupported/uncompressed slices, return None.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // All prefix checks look at the first 4 bytes; shorter inputs are treated
        // as uncompressed.
        let prefix = bytes.get(..4)?;
        match prefix {
            // Gzip member header.
            [0x1f, 0x8b, ..] => Some(Self::GZIP),
            // Zlib CMF byte 0x78 followed by the FLG byte of the common
            // compression levels, all without preset dictionary.
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, ..] => Some(Self::ZLIB),
            // Zstandard frame magic number.
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
41
42/// Decompress `bytes` if compression is detected, otherwise simply return it.
43/// An `out` vec must be given for ownership of the decompressed data.
44#[allow(clippy::ptr_arg)]
45#[deprecated(note = "may cause OOM, use CompressedReader instead")]
46pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
47    assert!(out.is_empty());
48
49    let Some(algo) = SupportedCompression::check(bytes) else {
50        return Ok(bytes);
51    };
52
53    feature_gated!("decompress", {
54        match algo {
55            SupportedCompression::GZIP => {
56                flate2::read::MultiGzDecoder::new(bytes)
57                    .read_to_end(out)
58                    .map_err(to_compute_err)?;
59            },
60            SupportedCompression::ZLIB => {
61                flate2::read::ZlibDecoder::new(bytes)
62                    .read_to_end(out)
63                    .map_err(to_compute_err)?;
64            },
65            SupportedCompression::ZSTD => {
66                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
67            },
68        }
69
70        Ok(out)
71    })
72}
73
/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    /// Plain, already-uncompressed bytes; reads are served zero-copy by slicing `slice`.
    Uncompressed {
        // The entire input.
        slice: Buffer<u8>,
        // Number of bytes already consumed from `slice`.
        offset: usize,
    },
    /// Gzip-compressed input, decoded via `MultiGzDecoder`.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// Zlib-compressed input.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Zstd-compressed input.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
90
impl CompressedReader {
    /// Sniffs the compression from the magic bytes of `slice` and wraps it in the
    /// matching decoder variant; data without a recognized magic prefix is served
    /// zero-copy via the `Uncompressed` variant.
    ///
    /// # Panics
    /// When the data is compressed but the `decompress` feature is disabled.
    pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
            },
            // Compressed data but no decoders compiled in: hard error.
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Whether reads go through a decompressor (i.e. any variant but `Uncompressed`).
    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    /// Suggested size for the first read(s) from this reader.
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    /// Suggested size for steady-state reads from this reader.
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// If possible returns the total number of bytes that will be produced by reading from the
    /// start to finish.
    ///
    /// Only exact for `Uncompressed`; for the compressed variants this is a rough
    /// estimate derived from typical compression ratios.
    pub fn total_len_estimate(&self) -> usize {
        // Heuristic expansion factors; actual ratios depend entirely on the data.
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
            },
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        // Only the compressed paths need an owned buffer; the uncompressed path
        // returns before this (still unallocated) Vec is ever used.
        let mut buf = Vec::new();
        if self.is_compressed() {
            // Reserve for leftover + new bytes, but never more than twice the
            // estimated total output (guards against huge `read_size` values).
            let reserve_size = cmp::min(
                prev_len.saturating_add(read_size),
                self.total_len_estimate().saturating_mul(2),
            );
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        // Shared tail for the decoder paths: trim to what was actually produced
        // and hand ownership of the bytes to a Buffer.
        let new_slice_from_read =
            |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
                buf.truncate(prev_len + bytes_read);
                Ok((Buffer::from_vec(buf), bytes_read))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the contract in the doc comment, `prev_leftover` is the tail of
                // `slice` ending at `*offset`, so widening the window backwards by
                // `prev_len` re-includes it without copying. (The subtraction would
                // underflow if the contract is violated.)
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                Ok((new_slice, bytes_read))
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                // `take` caps the decoder at `read_size` decompressed bytes.
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}
222
223/// This implementation is meant for compatibility. Use [`Self::read_next_slice`] for best
224/// performance.
225impl Read for CompressedReader {
226    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
227        match self {
228            CompressedReader::Uncompressed { slice, offset, .. } => {
229                let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
230                buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
231                *offset += bytes_read;
232                Ok(bytes_read)
233            },
234            #[cfg(feature = "decompress")]
235            CompressedReader::Gzip(decoder) => decoder.read(buf),
236            #[cfg(feature = "decompress")]
237            CompressedReader::Zlib(decoder) => decoder.read(buf),
238            #[cfg(feature = "decompress")]
239            CompressedReader::Zstd(decoder) => decoder.read(buf),
240        }
241    }
242}
243
/// A byte source that abstracts over in-memory buffers and streaming
/// readers, with optional transparent decompression and buffering.
///
/// Implements `BufRead`, allowing uniform access regardless of whether
/// the underlying data is an in-memory slice, a raw stream, or a
/// compressed stream (gzip/zlib/zstd).
///
/// This is the generic successor to [`CompressedReader`], which only
/// supports in-memory (`Buffer<u8>`) sources.
pub enum ByteSourceReader<R: BufRead> {
    /// Uncompressed bytes fully in memory; reads are served as zero-copy slices.
    UncompressedMemory {
        // The entire input.
        slice: Buffer<u8>,
        // Number of bytes already consumed from `slice`.
        offset: usize,
    },
    /// Uncompressed bytes produced by a stream.
    UncompressedStream(R),
    /// Gzip-compressed stream, decoded via `MultiGzDecoder`.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<R>),
    /// Zlib-compressed stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<R>),
    /// Zstd-compressed stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, R>),
}
266
impl<R: BufRead> ByteSourceReader<R> {
    /// Wraps `reader` in the decoder matching `compression`; `None` means the data is
    /// not compressed. Detecting the compression is the caller's responsibility here,
    /// unlike [`CompressedReader::try_new`] which sniffs it from the bytes.
    ///
    /// # Panics
    /// When `compression` is `Some(..)` but the `decompress` feature is disabled.
    pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
        Ok(match compression {
            None => Self::UncompressedStream(reader),
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => Self::Zstd(zstd::Decoder::with_buffer(reader)?),
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Whether reads go through a decompressor (i.e. any variant other than the two
    /// uncompressed ones).
    pub fn is_compressed(&self) -> bool {
        !matches!(
            &self,
            Self::UncompressedMemory { .. } | Self::UncompressedStream(_)
        )
    }

    /// The compression algorithm being decoded, if any.
    pub fn compression(&self) -> Option<SupportedCompression> {
        match self {
            Self::UncompressedMemory { .. } => None,
            Self::UncompressedStream(_) => None,
            #[cfg(feature = "decompress")]
            Self::Gzip(_) => Some(SupportedCompression::GZIP),
            #[cfg(feature = "decompress")]
            Self::Zlib(_) => Some(SupportedCompression::ZLIB),
            #[cfg(feature = "decompress")]
            Self::Zstd(_) => Some(SupportedCompression::ZSTD),
        }
    }

    /// Suggested size for the first read(s) from this reader.
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    /// Suggested size for steady-state reads from this reader.
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
        uncompressed_size_hint: Option<usize>,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        let reader: &mut dyn Read = match self {
            // Zero-copy fast-path — no allocation required
            Self::UncompressedMemory { slice, offset } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the contract in the doc comment, `prev_leftover` is the tail of
                // `slice` ending at `*offset`, so widening the window backwards by
                // `prev_len` re-includes it without copying. (The subtraction would
                // underflow if the contract is violated.)
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                return Ok((new_slice, bytes_read));
            },
            // The remaining variants all fall through to the common copying read below.
            Self::UncompressedStream(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Gzip(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zlib(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zstd(reader) => reader,
        };

        let mut buf = Vec::new();

        // Cap the reserve_size, for the scenario where read_size == usize::MAX
        let max_reserve_size = uncompressed_size_hint.unwrap_or(4 * 1024 * 1024);
        let reserve_size = cmp::min(prev_len.saturating_add(read_size), max_reserve_size);
        buf.reserve_exact(reserve_size);
        buf.extend_from_slice(prev_leftover);

        // `take` caps the reader at `read_size` (output) bytes; truncate trims the
        // buffer to what was actually produced before handing ownership to a Buffer.
        let bytes_read = reader.take(read_size as u64).read_to_end(&mut buf)?;
        buf.truncate(prev_len + bytes_read);
        Ok((Buffer::from_vec(buf), bytes_read))
    }
}
380
381impl ByteSourceReader<ReaderSource> {
382    pub fn from_memory(slice: Buffer<u8>) -> PolarsResult<Self> {
383        let compression = SupportedCompression::check(&slice);
384        match compression {
385            None => Ok(Self::UncompressedMemory { slice, offset: 0 }),
386            _ => Self::try_new(ReaderSource::Memory(Cursor::new(slice)), compression),
387        }
388    }
389}
390
/// Constructor for `WriteableTrait` compressed encoders.
///
/// Each variant stores its encoder in an `Option` so that `close` can move the
/// encoder out and call its consuming `finish`; after `close`, the `Option` is `None`.
pub enum CompressedWriter {
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
398
399impl CompressedWriter {
400    pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
401        feature_gated!("decompress", {
402            Self::Gzip(Some(flate2::write::GzEncoder::new(
403                writer,
404                level.map(flate2::Compression::new).unwrap_or_default(),
405            )))
406        })
407    }
408
409    pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
410        feature_gated!("decompress", {
411            zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
412                .map(Some)
413                .map(Self::Zstd)
414        })
415    }
416}
417
418impl Write for CompressedWriter {
419    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
420        feature_gated!("decompress", {
421            match self {
422                Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
423                Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
424            }
425        })
426    }
427
428    fn flush(&mut self) -> std::io::Result<()> {
429        feature_gated!("decompress", {
430            match self {
431                Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
432                Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
433            }
434        })
435    }
436}
437
438impl WriteableTrait for CompressedWriter {
439    fn close(&mut self) -> std::io::Result<()> {
440        feature_gated!("decompress", {
441            let writer = match self {
442                Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
443                Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
444            };
445
446            writer.close(SyncOnCloseType::All)
447        })
448    }
449
450    fn sync_all(&self) -> std::io::Result<()> {
451        feature_gated!("decompress", {
452            match self {
453                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
454                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
455            }
456        })
457    }
458
459    fn sync_data(&self) -> std::io::Result<()> {
460        feature_gated!("decompress", {
461            match self {
462                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
463                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
464            }
465        })
466    }
467}