1use std::cmp;
2use std::io::{BufRead, Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9use crate::utils::stream_buf_reader::ReaderSource;
10use crate::utils::sync_on_close::SyncOnCloseType;
11
/// Compression formats that can be recognised from the leading bytes of a payload.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    /// Payload starts with the two-byte signature `1f 8b`.
    GZIP,
    /// Payload starts with `78` followed by one of `01`, `5e`, `9c` or `da`.
    ZLIB,
    /// Payload starts with the four-byte signature `28 b5 2f fd`.
    ZSTD,
}

impl SupportedCompression {
    /// Sniffs the start of `bytes` for a known compression signature.
    ///
    /// Returns the matching format, or `None` when no signature matches or when fewer than
    /// four bytes are available.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // Every pattern demands at least four elements, so shorter inputs fall through to
        // the wildcard arm.
        match bytes {
            [0x1f, 0x8b, _, _, ..] => Some(Self::GZIP),
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, _, _, ..] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd, ..] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
41
42#[allow(clippy::ptr_arg)]
45#[deprecated(note = "may cause OOM, use CompressedReader instead")]
46pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
47 assert!(out.is_empty());
48
49 let Some(algo) = SupportedCompression::check(bytes) else {
50 return Ok(bytes);
51 };
52
53 feature_gated!("decompress", {
54 match algo {
55 SupportedCompression::GZIP => {
56 flate2::read::MultiGzDecoder::new(bytes)
57 .read_to_end(out)
58 .map_err(to_compute_err)?;
59 },
60 SupportedCompression::ZLIB => {
61 flate2::read::ZlibDecoder::new(bytes)
62 .read_to_end(out)
63 .map_err(to_compute_err)?;
64 },
65 SupportedCompression::ZSTD => {
66 zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
67 },
68 }
69
70 Ok(out)
71 })
72}
73
74pub enum CompressedReader {
79 Uncompressed {
80 slice: Buffer<u8>,
81 offset: usize,
82 },
83 #[cfg(feature = "decompress")]
84 Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
85 #[cfg(feature = "decompress")]
86 Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
87 #[cfg(feature = "decompress")]
88 Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
89}
90
91impl CompressedReader {
92 pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
93 let algo = SupportedCompression::check(&slice);
94
95 Ok(match algo {
96 None => CompressedReader::Uncompressed { slice, offset: 0 },
97 #[cfg(feature = "decompress")]
98 Some(SupportedCompression::GZIP) => {
99 CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
100 },
101 #[cfg(feature = "decompress")]
102 Some(SupportedCompression::ZLIB) => {
103 CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
104 },
105 #[cfg(feature = "decompress")]
106 Some(SupportedCompression::ZSTD) => {
107 CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
108 },
109 #[cfg(not(feature = "decompress"))]
110 _ => panic!("activate 'decompress' feature"),
111 })
112 }
113
114 pub fn is_compressed(&self) -> bool {
115 !matches!(&self, CompressedReader::Uncompressed { .. })
116 }
117
118 pub const fn initial_read_size() -> usize {
119 32 * 1024
123 }
124
125 pub const fn ideal_read_size() -> usize {
126 512 * 1024
133 }
134
135 pub fn total_len_estimate(&self) -> usize {
138 const ESTIMATED_DEFLATE_RATIO: usize = 3;
139 const ESTIMATED_ZSTD_RATIO: usize = 5;
140
141 match self {
142 CompressedReader::Uncompressed { slice, .. } => slice.len(),
143 #[cfg(feature = "decompress")]
144 CompressedReader::Gzip(reader) => {
145 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
146 },
147 #[cfg(feature = "decompress")]
148 CompressedReader::Zlib(reader) => {
149 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
150 },
151 #[cfg(feature = "decompress")]
152 CompressedReader::Zstd(reader) => {
153 reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
154 },
155 }
156 }
157
158 pub fn read_next_slice(
173 &mut self,
174 prev_leftover: &Buffer<u8>,
175 read_size: usize,
176 ) -> std::io::Result<(Buffer<u8>, usize)> {
177 let prev_len = prev_leftover.len();
181
182 let mut buf = Vec::new();
183 if self.is_compressed() {
184 let reserve_size = cmp::min(
185 prev_len.saturating_add(read_size),
186 self.total_len_estimate().saturating_mul(2),
187 );
188 buf.reserve_exact(reserve_size);
189 buf.extend_from_slice(prev_leftover);
190 }
191
192 let new_slice_from_read =
193 |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
194 buf.truncate(prev_len + bytes_read);
195 Ok((Buffer::from_vec(buf), bytes_read))
196 };
197
198 match self {
199 CompressedReader::Uncompressed { slice, offset, .. } => {
200 let bytes_read = cmp::min(read_size, slice.len() - *offset);
201 let new_slice = slice
202 .clone()
203 .sliced(*offset - prev_len..*offset + bytes_read);
204 *offset += bytes_read;
205 Ok((new_slice, bytes_read))
206 },
207 #[cfg(feature = "decompress")]
208 CompressedReader::Gzip(decoder) => {
209 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
210 },
211 #[cfg(feature = "decompress")]
212 CompressedReader::Zlib(decoder) => {
213 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
214 },
215 #[cfg(feature = "decompress")]
216 CompressedReader::Zstd(decoder) => {
217 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
218 },
219 }
220 }
221}
222
223impl Read for CompressedReader {
226 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
227 match self {
228 CompressedReader::Uncompressed { slice, offset, .. } => {
229 let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
230 buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
231 *offset += bytes_read;
232 Ok(bytes_read)
233 },
234 #[cfg(feature = "decompress")]
235 CompressedReader::Gzip(decoder) => decoder.read(buf),
236 #[cfg(feature = "decompress")]
237 CompressedReader::Zlib(decoder) => decoder.read(buf),
238 #[cfg(feature = "decompress")]
239 CompressedReader::Zstd(decoder) => decoder.read(buf),
240 }
241 }
242}
243
244pub enum ByteSourceReader<R: BufRead> {
254 UncompressedMemory {
255 slice: Buffer<u8>,
256 offset: usize,
257 },
258 UncompressedStream(R),
259 #[cfg(feature = "decompress")]
260 Gzip(flate2::bufread::MultiGzDecoder<R>),
261 #[cfg(feature = "decompress")]
262 Zlib(flate2::bufread::ZlibDecoder<R>),
263 #[cfg(feature = "decompress")]
264 Zstd(zstd::Decoder<'static, R>),
265}
266
267impl<R: BufRead> ByteSourceReader<R> {
268 pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
269 Ok(match compression {
270 None => Self::UncompressedStream(reader),
271 #[cfg(feature = "decompress")]
272 Some(SupportedCompression::GZIP) => {
273 Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
274 },
275 #[cfg(feature = "decompress")]
276 Some(SupportedCompression::ZLIB) => {
277 Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
278 },
279 #[cfg(feature = "decompress")]
280 Some(SupportedCompression::ZSTD) => Self::Zstd(zstd::Decoder::with_buffer(reader)?),
281 #[cfg(not(feature = "decompress"))]
282 _ => panic!("activate 'decompress' feature"),
283 })
284 }
285
286 pub fn is_compressed(&self) -> bool {
287 !matches!(
288 &self,
289 Self::UncompressedMemory { .. } | Self::UncompressedStream(_)
290 )
291 }
292
293 pub fn compression(&self) -> Option<SupportedCompression> {
294 match self {
295 Self::UncompressedMemory { .. } => None,
296 Self::UncompressedStream(_) => None,
297 #[cfg(feature = "decompress")]
298 Self::Gzip(_) => Some(SupportedCompression::GZIP),
299 #[cfg(feature = "decompress")]
300 Self::Zlib(_) => Some(SupportedCompression::ZLIB),
301 #[cfg(feature = "decompress")]
302 Self::Zstd(_) => Some(SupportedCompression::ZSTD),
303 }
304 }
305
306 pub const fn initial_read_size() -> usize {
307 32 * 1024
311 }
312
313 pub const fn ideal_read_size() -> usize {
314 512 * 1024
321 }
322
323 pub fn read_next_slice(
338 &mut self,
339 prev_leftover: &Buffer<u8>,
340 read_size: usize,
341 uncompressed_size_hint: Option<usize>,
342 ) -> std::io::Result<(Buffer<u8>, usize)> {
343 let prev_len = prev_leftover.len();
347
348 let reader: &mut dyn Read = match self {
349 Self::UncompressedMemory { slice, offset } => {
351 let bytes_read = cmp::min(read_size, slice.len() - *offset);
352 let new_slice = slice
353 .clone()
354 .sliced(*offset - prev_len..*offset + bytes_read);
355 *offset += bytes_read;
356 return Ok((new_slice, bytes_read));
357 },
358 Self::UncompressedStream(reader) => reader,
359 #[cfg(feature = "decompress")]
360 Self::Gzip(reader) => reader,
361 #[cfg(feature = "decompress")]
362 Self::Zlib(reader) => reader,
363 #[cfg(feature = "decompress")]
364 Self::Zstd(reader) => reader,
365 };
366
367 let mut buf = Vec::new();
368
369 let max_reserve_size = uncompressed_size_hint.unwrap_or(4 * 1024 * 1024);
371 let reserve_size = cmp::min(prev_len.saturating_add(read_size), max_reserve_size);
372 buf.reserve_exact(reserve_size);
373 buf.extend_from_slice(prev_leftover);
374
375 let bytes_read = reader.take(read_size as u64).read_to_end(&mut buf)?;
376 buf.truncate(prev_len + bytes_read);
377 Ok((Buffer::from_vec(buf), bytes_read))
378 }
379}
380
381impl ByteSourceReader<ReaderSource> {
382 pub fn from_memory(slice: Buffer<u8>) -> PolarsResult<Self> {
383 let compression = SupportedCompression::check(&slice);
384 match compression {
385 None => Ok(Self::UncompressedMemory { slice, offset: 0 }),
386 _ => Self::try_new(ReaderSource::Memory(Cursor::new(slice)), compression),
387 }
388 }
389}
390
/// Writer that compresses everything written to it before forwarding to a [`Writeable`].
///
/// Each variant stores its encoder in an `Option` so that it can be moved out when the writer
/// is closed. Both variants only exist when the `decompress` feature is enabled.
pub enum CompressedWriter {
    /// Gzip encoder; `None` once the writer has been closed.
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    /// Zstd encoder; `None` once the writer has been closed.
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
398
399impl CompressedWriter {
400 pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
401 feature_gated!("decompress", {
402 Self::Gzip(Some(flate2::write::GzEncoder::new(
403 writer,
404 level.map(flate2::Compression::new).unwrap_or_default(),
405 )))
406 })
407 }
408
409 pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
410 feature_gated!("decompress", {
411 zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
412 .map(Some)
413 .map(Self::Zstd)
414 })
415 }
416}
417
418impl Write for CompressedWriter {
419 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
420 feature_gated!("decompress", {
421 match self {
422 Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
423 Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
424 }
425 })
426 }
427
428 fn flush(&mut self) -> std::io::Result<()> {
429 feature_gated!("decompress", {
430 match self {
431 Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
432 Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
433 }
434 })
435 }
436}
437
438impl WriteableTrait for CompressedWriter {
439 fn close(&mut self) -> std::io::Result<()> {
440 feature_gated!("decompress", {
441 let writer = match self {
442 Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
443 Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
444 };
445
446 writer.close(SyncOnCloseType::All)
447 })
448 }
449
450 fn sync_all(&self) -> std::io::Result<()> {
451 feature_gated!("decompress", {
452 match self {
453 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
454 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
455 }
456 })
457 }
458
459 fn sync_data(&self) -> std::io::Result<()> {
460 feature_gated!("decompress", {
461 match self {
462 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
463 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
464 }
465 })
466 }
467}