polars_io/utils/compression.rs

1use std::cmp;
2use std::io::{BufRead, Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9#[cfg(feature = "async")]
10use crate::utils::stream_buf_reader::ReaderSource;
11use crate::utils::sync_on_close::SyncOnCloseType;
12
/// Represents the compression algorithms that we have decoders for
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
    /// that family, for unsupported/uncompressed slices, return None.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // All prefix checks below require at least 4 bytes of input; shorter
        // slices can never be classified.
        let magic = bytes.get(..4)?;
        match magic {
            [0x1f, 0x8b, ..] => Some(Self::GZIP),
            // Zlib headers for the different compression levels, without a preset dictionary.
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, ..] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
42
/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
///
/// # Panics
/// Panics if `out` is not empty on entry.
#[allow(clippy::ptr_arg)]
#[deprecated(note = "may cause OOM, use CompressedReader instead")]
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    // `out` must start empty so the returned slice is exactly the decompressed data.
    assert!(out.is_empty());

    // No recognized magic bytes: treat the input as uncompressed and return it unchanged.
    let Some(algo) = SupportedCompression::check(bytes) else {
        return Ok(bytes);
    };

    feature_gated!("decompress", {
        match algo {
            SupportedCompression::GZIP => {
                // MultiGzDecoder handles concatenated gzip members, not just a single stream.
                flate2::read::MultiGzDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZLIB => {
                flate2::read::ZlibDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZSTD => {
                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
            },
        }

        Ok(out)
    })
}
74
/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    /// Already-uncompressed bytes; reads are served as zero-copy slices of `slice`.
    Uncompressed {
        slice: Buffer<u8>,
        /// Number of bytes already consumed from `slice`.
        offset: usize,
    },
    /// Gzip stream; `MultiGzDecoder` also handles concatenated gzip members.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// Raw zlib stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Zstandard stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
91
impl CompressedReader {
    /// Creates a reader over `slice`, sniffing the compression family from its magic bytes.
    ///
    /// # Panics
    /// Panics if compression is detected but the 'decompress' feature is not enabled.
    pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Returns `true` if reads go through a decompression decoder.
    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    /// Suggested size (in bytes) for the first `read_next_slice` call.
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    /// Suggested size (in bytes) for steady-state `read_next_slice` calls.
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// If possible returns the total number of bytes that will be produced by reading from the
    /// start to finish.
    pub fn total_len_estimate(&self) -> usize {
        // NOTE(review): the ratios below are heuristic guesses of typical compression
        // ratios, not guarantees — the estimate can be off in either direction.
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            // Uncompressed: the exact length is known.
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
            },
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        let mut buf = Vec::new();
        if self.is_compressed() {
            // Reserve for leftover + new bytes, but cap at 2x the estimated total output so a
            // huge `read_size` (e.g. usize::MAX) cannot trigger an absurd allocation.
            let reserve_size = cmp::min(
                prev_len.saturating_add(read_size),
                self.total_len_estimate().saturating_mul(2),
            );
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        // Finalizes `buf` after a decoder read. `read_to_end` appends exactly
        // `bytes_read`, so the truncate is defensive.
        let new_slice_from_read =
            |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
                buf.truncate(prev_len + bytes_read);
                Ok((Buffer::from_vec(buf), bytes_read))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Zero-copy: per the documented precondition, `prev_leftover` is the tail of
                // `slice` ending at `offset`, so re-slicing covers it and the new bytes.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                Ok((new_slice, bytes_read))
            },
            // `take` bounds each read to `read_size` bytes of *decompressed* output.
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}
223
224/// This implementation is meant for compatibility. Use [`Self::read_next_slice`] for best
225/// performance.
226impl Read for CompressedReader {
227    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
228        match self {
229            CompressedReader::Uncompressed { slice, offset, .. } => {
230                let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
231                buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
232                *offset += bytes_read;
233                Ok(bytes_read)
234            },
235            #[cfg(feature = "decompress")]
236            CompressedReader::Gzip(decoder) => decoder.read(buf),
237            #[cfg(feature = "decompress")]
238            CompressedReader::Zlib(decoder) => decoder.read(buf),
239            #[cfg(feature = "decompress")]
240            CompressedReader::Zstd(decoder) => decoder.read(buf),
241        }
242    }
243}
244
/// A byte source that abstracts over in-memory buffers and streaming
/// readers, with optional transparent decompression and buffering.
///
/// Implements `BufRead`, allowing uniform access regardless of whether
/// the underlying data is an in-memory slice, a raw stream, or a
/// compressed stream (gzip/zlib/zstd).
///
/// This is the generic successor to [`CompressedReader`], which only
/// supports in-memory (`Buffer<u8>`) sources.
#[cfg(feature = "async")]
pub enum ByteSourceReader<R: BufRead> {
    /// Uncompressed bytes already in memory; reads are zero-copy slices of `slice`.
    UncompressedMemory {
        slice: Buffer<u8>,
        /// Number of bytes already consumed from `slice`.
        offset: usize,
    },
    /// Uncompressed streaming source.
    UncompressedStream(R),
    /// Gzip stream; `MultiGzDecoder` also handles concatenated gzip members.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<R>),
    /// Raw zlib stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<R>),
    /// Zstandard stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, R>),
}
268
#[cfg(feature = "async")]
impl<R: BufRead> ByteSourceReader<R> {
    /// Wraps `reader` in the decoder matching `compression`, or passes it through
    /// unchanged when `compression` is `None`.
    ///
    /// # Panics
    /// Panics if a compression is given but the 'decompress' feature is not enabled.
    pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
        Ok(match compression {
            None => Self::UncompressedStream(reader),
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => Self::Zstd(zstd::Decoder::with_buffer(reader)?),
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Returns `true` if reads go through a decompression decoder.
    pub fn is_compressed(&self) -> bool {
        !matches!(
            &self,
            Self::UncompressedMemory { .. } | Self::UncompressedStream(_)
        )
    }

    /// Suggested size (in bytes) for the first `read_next_slice` call.
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    /// Suggested size (in bytes) for steady-state `read_next_slice` calls.
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
        uncompressed_size_hint: Option<usize>,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        let reader: &mut dyn Read = match self {
            // Zero-copy fast-path — no allocation required
            Self::UncompressedMemory { slice, offset } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the documented precondition, `prev_leftover` is the tail of `slice`
                // ending at `offset`, so re-slicing covers it and the new bytes.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                return Ok((new_slice, bytes_read));
            },
            Self::UncompressedStream(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Gzip(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zlib(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zstd(reader) => reader,
        };

        let mut buf = Vec::new();

        // Cap the reserve_size, for the scenario where read_size == usize::MAX
        let max_reserve_size = uncompressed_size_hint.unwrap_or(4 * 1024 * 1024);
        let reserve_size = cmp::min(prev_len.saturating_add(read_size), max_reserve_size);
        buf.reserve_exact(reserve_size);
        buf.extend_from_slice(prev_leftover);

        // `take` bounds the read to `read_size` bytes of *decompressed* output.
        let bytes_read = reader.take(read_size as u64).read_to_end(&mut buf)?;
        // `read_to_end` appends exactly `bytes_read`, so this truncate is defensive.
        buf.truncate(prev_len + bytes_read);
        Ok((Buffer::from_vec(buf), bytes_read))
    }
}
370
#[cfg(feature = "async")]
impl ByteSourceReader<ReaderSource> {
    /// Builds a reader over an in-memory buffer.
    ///
    /// Uncompressed buffers take the zero-copy `UncompressedMemory` fast path;
    /// compressed ones are wrapped in the matching streaming decoder.
    pub fn from_memory(
        slice: Buffer<u8>,
        compression: Option<SupportedCompression>,
    ) -> PolarsResult<Self> {
        if compression.is_none() {
            return Ok(Self::UncompressedMemory { slice, offset: 0 });
        }
        Self::try_new(ReaderSource::Memory(Cursor::new(slice)), compression)
    }
}
383
/// Constructor for `WriteableTrait` compressed encoders.
///
/// Each variant holds an `Option` so that [`WriteableTrait::close`] can `take()`
/// the encoder by value and call its consuming `finish()`; after closing, the
/// slot is `None` and further writes panic.
pub enum CompressedWriter {
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
391
392impl CompressedWriter {
393    pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
394        feature_gated!("decompress", {
395            Self::Gzip(Some(flate2::write::GzEncoder::new(
396                writer,
397                level.map(flate2::Compression::new).unwrap_or_default(),
398            )))
399        })
400    }
401
402    pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
403        feature_gated!("decompress", {
404            zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
405                .map(Some)
406                .map(Self::Zstd)
407        })
408    }
409}
410
impl Write for CompressedWriter {
    /// Writes `buf` through the compressing encoder.
    ///
    /// # Panics
    /// Panics if the writer has already been closed ([`WriteableTrait::close`]
    /// takes the encoder, leaving `None`), or when the 'decompress' feature is
    /// not activated (via `feature_gated!`).
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
                Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
            }
        })
    }

    /// Flushes the encoder's buffered data to the underlying writer.
    ///
    /// # Panics
    /// Panics if the writer has already been closed, or when the 'decompress'
    /// feature is not activated.
    fn flush(&mut self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
                Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
            }
        })
    }
}
430
impl WriteableTrait for CompressedWriter {
    /// Finishes the compression stream and closes the underlying writer
    /// (syncing per [`SyncOnCloseType::All`]).
    ///
    /// # Panics
    /// Panics if called more than once: `finish()` consumes the encoder,
    /// leaving `None` in its slot.
    fn close(&mut self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            // `finish()` consumes the encoder, writing any stream trailer, and
            // returns the inner `Writeable`.
            let writer = match self {
                Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
                Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
            };

            writer.close(SyncOnCloseType::All)
        })
    }

    /// Delegates `sync_all` to the writer wrapped by the encoder.
    ///
    /// # Panics
    /// Panics if the writer has already been closed.
    fn sync_all(&self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
            }
        })
    }

    /// Delegates `sync_data` to the writer wrapped by the encoder.
    ///
    /// # Panics
    /// Panics if the writer has already been closed.
    fn sync_data(&self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
            }
        })
    }
}