1use std::cmp;
2use std::io::{BufRead, Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9#[cfg(feature = "async")]
10use crate::utils::stream_buf_reader::ReaderSource;
11use crate::utils::sync_on_close::SyncOnCloseType;
12
/// Compression formats that can be recognised from a buffer's leading magic bytes.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    /// gzip stream, magic bytes `1f 8b`.
    GZIP,
    /// zlib stream with a `78` CMF byte (deflate, 32 KiB window) and one of the
    /// FLG bytes `01`, `5e`, `9c`, `da`.
    ZLIB,
    /// Zstandard frame, magic bytes `28 b5 2f fd`.
    ZSTD,
}

impl SupportedCompression {
    /// Detects the compression format from the first four bytes of `bytes`.
    ///
    /// Returns `None` when the prefix matches no known magic number, and also for any
    /// input shorter than four bytes (even though the gzip/zlib magic is only two bytes).
    pub fn check(bytes: &[u8]) -> Option<Self> {
        match bytes {
            [0x1f, 0x8b, _, _, ..] => Some(Self::GZIP),
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, _, _, ..] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd, ..] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
42
43#[allow(clippy::ptr_arg)]
46#[deprecated(note = "may cause OOM, use CompressedReader instead")]
47pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
48 assert!(out.is_empty());
49
50 let Some(algo) = SupportedCompression::check(bytes) else {
51 return Ok(bytes);
52 };
53
54 feature_gated!("decompress", {
55 match algo {
56 SupportedCompression::GZIP => {
57 flate2::read::MultiGzDecoder::new(bytes)
58 .read_to_end(out)
59 .map_err(to_compute_err)?;
60 },
61 SupportedCompression::ZLIB => {
62 flate2::read::ZlibDecoder::new(bytes)
63 .read_to_end(out)
64 .map_err(to_compute_err)?;
65 },
66 SupportedCompression::ZSTD => {
67 zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
68 },
69 }
70
71 Ok(out)
72 })
73}
74
/// In-memory reader that transparently decompresses gzip, zlib or zstd input.
///
/// Build it with [`CompressedReader::try_new`], which selects the variant from the buffer's
/// leading magic bytes. The decoding variants only exist when the `decompress` feature is
/// enabled.
pub enum CompressedReader {
    /// Input not recognised as compressed; bytes are served straight from `slice`.
    Uncompressed {
        /// The complete input buffer.
        slice: Buffer<u8>,
        /// Number of bytes of `slice` already consumed by reads.
        offset: usize,
    },
    /// Multi-member gzip stream decoded from the in-memory buffer.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// zlib stream decoded from the in-memory buffer.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Zstandard stream decoded from the in-memory buffer.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
91
92impl CompressedReader {
93 pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
94 let algo = SupportedCompression::check(&slice);
95
96 Ok(match algo {
97 None => CompressedReader::Uncompressed { slice, offset: 0 },
98 #[cfg(feature = "decompress")]
99 Some(SupportedCompression::GZIP) => {
100 CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
101 },
102 #[cfg(feature = "decompress")]
103 Some(SupportedCompression::ZLIB) => {
104 CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
105 },
106 #[cfg(feature = "decompress")]
107 Some(SupportedCompression::ZSTD) => {
108 CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
109 },
110 #[cfg(not(feature = "decompress"))]
111 _ => panic!("activate 'decompress' feature"),
112 })
113 }
114
115 pub fn is_compressed(&self) -> bool {
116 !matches!(&self, CompressedReader::Uncompressed { .. })
117 }
118
119 pub const fn initial_read_size() -> usize {
120 32 * 1024
124 }
125
126 pub const fn ideal_read_size() -> usize {
127 512 * 1024
134 }
135
136 pub fn total_len_estimate(&self) -> usize {
139 const ESTIMATED_DEFLATE_RATIO: usize = 3;
140 const ESTIMATED_ZSTD_RATIO: usize = 5;
141
142 match self {
143 CompressedReader::Uncompressed { slice, .. } => slice.len(),
144 #[cfg(feature = "decompress")]
145 CompressedReader::Gzip(reader) => {
146 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
147 },
148 #[cfg(feature = "decompress")]
149 CompressedReader::Zlib(reader) => {
150 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
151 },
152 #[cfg(feature = "decompress")]
153 CompressedReader::Zstd(reader) => {
154 reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
155 },
156 }
157 }
158
159 pub fn read_next_slice(
174 &mut self,
175 prev_leftover: &Buffer<u8>,
176 read_size: usize,
177 ) -> std::io::Result<(Buffer<u8>, usize)> {
178 let prev_len = prev_leftover.len();
182
183 let mut buf = Vec::new();
184 if self.is_compressed() {
185 let reserve_size = cmp::min(
186 prev_len.saturating_add(read_size),
187 self.total_len_estimate().saturating_mul(2),
188 );
189 buf.reserve_exact(reserve_size);
190 buf.extend_from_slice(prev_leftover);
191 }
192
193 let new_slice_from_read =
194 |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
195 buf.truncate(prev_len + bytes_read);
196 Ok((Buffer::from_vec(buf), bytes_read))
197 };
198
199 match self {
200 CompressedReader::Uncompressed { slice, offset, .. } => {
201 let bytes_read = cmp::min(read_size, slice.len() - *offset);
202 let new_slice = slice
203 .clone()
204 .sliced(*offset - prev_len..*offset + bytes_read);
205 *offset += bytes_read;
206 Ok((new_slice, bytes_read))
207 },
208 #[cfg(feature = "decompress")]
209 CompressedReader::Gzip(decoder) => {
210 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
211 },
212 #[cfg(feature = "decompress")]
213 CompressedReader::Zlib(decoder) => {
214 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
215 },
216 #[cfg(feature = "decompress")]
217 CompressedReader::Zstd(decoder) => {
218 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
219 },
220 }
221 }
222}
223
224impl Read for CompressedReader {
227 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
228 match self {
229 CompressedReader::Uncompressed { slice, offset, .. } => {
230 let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
231 buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
232 *offset += bytes_read;
233 Ok(bytes_read)
234 },
235 #[cfg(feature = "decompress")]
236 CompressedReader::Gzip(decoder) => decoder.read(buf),
237 #[cfg(feature = "decompress")]
238 CompressedReader::Zlib(decoder) => decoder.read(buf),
239 #[cfg(feature = "decompress")]
240 CompressedReader::Zstd(decoder) => decoder.read(buf),
241 }
242 }
243}
244
/// Reader over either an in-memory buffer or a `BufRead` stream, optionally decoding
/// gzip, zlib or zstd on the fly.
///
/// Unlike `CompressedReader`, the compression is not sniffed here: the caller passes it
/// explicitly to `try_new` (or `from_memory`).
#[cfg(feature = "async")]
pub enum ByteSourceReader<R: BufRead> {
    /// Uncompressed in-memory data, served by slicing `slice` directly.
    UncompressedMemory {
        /// The complete input buffer.
        slice: Buffer<u8>,
        /// Number of bytes of `slice` already handed out.
        offset: usize,
    },
    /// Uncompressed data read from the wrapped stream as-is.
    UncompressedStream(R),
    /// Multi-member gzip decoder over the wrapped stream.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<R>),
    /// zlib decoder over the wrapped stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<R>),
    /// Zstandard decoder over the wrapped stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, R>),
}
268
#[cfg(feature = "async")]
impl<R: BufRead> ByteSourceReader<R> {
    /// Wraps `reader` in the decoder for `compression`, or passes it through unchanged
    /// when `compression` is `None`.
    ///
    /// # Errors
    /// Returns an error if the zstd decoder cannot be created.
    ///
    /// # Panics
    /// Panics if a compression is requested but the `decompress` feature is disabled.
    pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
        let this = match compression {
            None => Self::UncompressedStream(reader),
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                let decoder = zstd::Decoder::with_buffer(reader)?;
                Self::Zstd(decoder)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        };
        Ok(this)
    }

    /// Returns `true` when bytes pass through a decoder.
    pub fn is_compressed(&self) -> bool {
        match self {
            Self::UncompressedMemory { .. } | Self::UncompressedStream(_) => false,
            #[cfg(feature = "decompress")]
            Self::Gzip(_) | Self::Zlib(_) | Self::Zstd(_) => true,
        }
    }

    /// Fixed read size of 32 KiB, intended (by name) for the first read.
    pub const fn initial_read_size() -> usize {
        32 * 1024
    }

    /// Fixed read size of 512 KiB, intended (by name) for steady-state reads.
    pub const fn ideal_read_size() -> usize {
        512 * 1024
    }

    /// Reads up to `read_size` new bytes and returns them appended to `prev_leftover`.
    ///
    /// Returns `(data, bytes_read)`, where `data` is `prev_leftover` followed by the
    /// `bytes_read` fresh bytes.
    ///
    /// For `UncompressedMemory`, `data` is sliced from the buffer, starting
    /// `prev_leftover.len()` bytes before the current position. Only the leftover's length
    /// is used, so it must be the tail of the previously returned data.
    ///
    /// Otherwise the leftover is copied into a new buffer whose initial capacity is capped
    /// by `uncompressed_size_hint` (4 MiB when absent), and the stream is read into it.
    ///
    /// # Errors
    /// Propagates I/O and decoding errors from the underlying reader.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
        uncompressed_size_hint: Option<usize>,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Reservation cap used when the caller provides no size hint.
        const DEFAULT_MAX_RESERVE: usize = 4 * 1024 * 1024;

        let prev_len = prev_leftover.len();

        let source: &mut dyn Read = match self {
            Self::UncompressedMemory { slice, offset } => {
                let fresh = cmp::min(read_size, slice.len() - *offset);
                let start = *offset - prev_len;
                let end = *offset + fresh;
                *offset = end;
                return Ok((slice.clone().sliced(start..end), fresh));
            },
            Self::UncompressedStream(inner) => inner,
            #[cfg(feature = "decompress")]
            Self::Gzip(inner) => inner,
            #[cfg(feature = "decompress")]
            Self::Zlib(inner) => inner,
            #[cfg(feature = "decompress")]
            Self::Zstd(inner) => inner,
        };

        let capacity = cmp::min(
            prev_len.saturating_add(read_size),
            uncompressed_size_hint.unwrap_or(DEFAULT_MAX_RESERVE),
        );
        let mut out = Vec::with_capacity(capacity);
        out.extend_from_slice(prev_leftover);

        // `read_to_end` only appends, so `out` holds exactly leftover + `fresh` bytes.
        let fresh = source.take(read_size as u64).read_to_end(&mut out)?;
        Ok((Buffer::from_vec(out), fresh))
    }
}
370
#[cfg(feature = "async")]
impl ByteSourceReader<ReaderSource> {
    /// Builds a reader over the in-memory `slice`.
    ///
    /// Without compression the buffer is served by slicing it directly
    /// (`UncompressedMemory`). Otherwise it is wrapped as `ReaderSource::Memory` and handed
    /// to `try_new` to attach the matching decoder.
    ///
    /// # Errors
    /// Propagates errors from `try_new`.
    pub fn from_memory(
        slice: Buffer<u8>,
        compression: Option<SupportedCompression>,
    ) -> PolarsResult<Self> {
        if compression.is_none() {
            return Ok(Self::UncompressedMemory { slice, offset: 0 });
        }
        let source = ReaderSource::Memory(Cursor::new(slice));
        Self::try_new(source, compression)
    }
}
383
/// Writer that compresses everything written to it before forwarding it to a `Writeable`.
///
/// Each encoder is held in an `Option` so that `close` can take ownership of it and call
/// `finish`; afterwards the slot is `None`. Both variants require the `decompress` feature,
/// so without it the enum has no variants.
pub enum CompressedWriter {
    /// gzip encoder (flate2).
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    /// Zstandard encoder.
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
391
392impl CompressedWriter {
393 pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
394 feature_gated!("decompress", {
395 Self::Gzip(Some(flate2::write::GzEncoder::new(
396 writer,
397 level.map(flate2::Compression::new).unwrap_or_default(),
398 )))
399 })
400 }
401
402 pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
403 feature_gated!("decompress", {
404 zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
405 .map(Some)
406 .map(Self::Zstd)
407 })
408 }
409}
410
411impl Write for CompressedWriter {
412 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
413 feature_gated!("decompress", {
414 match self {
415 Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
416 Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
417 }
418 })
419 }
420
421 fn flush(&mut self) -> std::io::Result<()> {
422 feature_gated!("decompress", {
423 match self {
424 Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
425 Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
426 }
427 })
428 }
429}
430
431impl WriteableTrait for CompressedWriter {
432 fn close(&mut self) -> std::io::Result<()> {
433 feature_gated!("decompress", {
434 let writer = match self {
435 Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
436 Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
437 };
438
439 writer.close(SyncOnCloseType::All)
440 })
441 }
442
443 fn sync_all(&self) -> std::io::Result<()> {
444 feature_gated!("decompress", {
445 match self {
446 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
447 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
448 }
449 })
450 }
451
452 fn sync_data(&self) -> std::io::Result<()> {
453 feature_gated!("decompress", {
454 match self {
455 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
456 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
457 }
458 })
459 }
460}