polars_io/parquet/write/writer.rs

use std::io::Write;
use std::sync::Mutex;

use arrow::datatypes::PhysicalType;
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;
use polars_parquet::write::{
    CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,
    to_parquet_schema, transverse,
};

use super::ParquetWriteOptions;
use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
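    /// Create a [`ParquetWriter`] for `f`, configured with these options.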
    pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
    where
        F: Write,
    {
        ParquetWriter::new(f)
            .with_compression(self.compression)
            .with_statistics(self.statistics)
            .with_row_group_size(self.row_group_size)
            .with_data_page_size(self.data_page_size)
    }
}

/// Write a DataFrame to Parquet format.
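///
/// # Example
///
/// A minimal usage sketch; the output path is illustrative and the example assumes
/// [`ParquetWriter`] is re-exported through `polars_io::prelude`.
///
/// ```no_run
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn write_parquet(df: &mut DataFrame) -> PolarsResult<u64> {
///     let file = File::create("frame.parquet").expect("could not create file");
///     ParquetWriter::new(file)
///         .with_row_group_size(Some(100_000))
///         .finish(df)
/// }
/// ```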
#[must_use]
pub struct ParquetWriter<W> {
    writer: W,
    /// Data page compression
    compression: CompressionOptions,
    /// Compute and write column statistics.
    statistics: StatisticsOptions,
    /// If `None`, defaults to 512^2 rows.
    row_group_size: Option<usize>,
    /// If `None`, defaults to 1024^2 bytes.
    data_page_size: Option<usize>,
    /// Serialize columns in parallel
    parallel: bool,
}

impl<W> ParquetWriter<W>
where
    W: Write,
{
    /// Create a new writer
    pub fn new(writer: W) -> Self
    where
        W: Write,
    {
        ParquetWriter {
            writer,
            compression: ParquetCompression::default().into(),
            statistics: StatisticsOptions::default(),
            row_group_size: None,
            data_page_size: None,
            parallel: true,
        }
    }

    /// Set the compression used. Defaults to `Zstd`.
    ///
    /// The default compression `Zstd` has very good performance, but may not yet be supported
    /// by older readers. If you want broader compatibility, consider using `Snappy`.
    pub fn with_compression(mut self, compression: ParquetCompression) -> Self {
        self.compression = compression.into();
        self
    }

    /// Compute and write column statistics.
    pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
        self.statistics = statistics;
        self
    }

    /// Set the row group size (in number of rows) during writing. This can reduce memory pressure and improve
    /// writing performance.
    pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
        self.row_group_size = size;
        self
    }

    /// Set the maximum byte size of a data page. If `None`, defaults to 1024^2 bytes.
    pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {
        self.data_page_size = limit;
        self
    }

    /// Serialize columns in parallel
    pub fn set_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }

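    /// Create a [`BatchedWriter`] that writes the file incrementally: the Arrow/Parquet schema
    /// and per-column encodings are fixed up front, and each call to `write_batch` appends
    /// another `DataFrame` to the file.
    ///
    /// A minimal sketch of batched writing; the file path is illustrative and the imports assume
    /// the writer is re-exported through `polars_io::prelude`.
    ///
    /// ```no_run
    /// use std::fs::File;
    ///
    /// use polars_core::prelude::*;
    /// use polars_io::prelude::*;
    ///
    /// fn write_batches(frames: &[DataFrame], schema: &Schema) -> PolarsResult<u64> {
    ///     let file = File::create("frames.parquet").expect("could not create file");
    ///     let mut batched = ParquetWriter::new(file).batched(schema)?;
    ///     for df in frames {
    ///         batched.write_batch(df)?;
    ///     }
    ///     batched.finish()
    /// }
    /// ```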
    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
        let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
        let parquet_schema = to_parquet_schema(&schema)?;
        let encodings = get_encodings(&schema);
        let options = self.materialize_options();
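        // The `FileWriter` goes behind a `Mutex` so the [`BatchedWriter`] can synchronize access
        // to it when columns are serialized in parallel.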
        let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);

        Ok(BatchedWriter {
            writer,
            parquet_schema,
            encodings,
            options,
            parallel: self.parallel,
        })
    }

    fn materialize_options(&self) -> WriteOptions {
        WriteOptions {
            statistics: self.statistics,
            compression: self.compression,
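            // Format version 1 sticks to v1 data pages, which the widest range of readers support.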
            version: Version::V1,
            data_page_size: self.data_page_size,
        }
    }

    /// Write the given DataFrame to the writer `W`. Returns the total size of the file in bytes.
    pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
        let mut batched = self.batched(chunked_df.schema())?;
        batched.write_batch(&chunked_df)?;
        batched.finish()
    }
}

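/// Pick a data-page [`Encoding`] for every leaf column of the Arrow schema.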
pub fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
    schema
        .iter_values()
        .map(|f| transverse(&f.dtype, encoding_map))
        .collect()
}

/// Map a leaf data type to the encoding used for its data pages.
fn encoding_map(dtype: &ArrowDataType) -> Encoding {
    match dtype.to_physical_type() {
        PhysicalType::Dictionary(_)
        | PhysicalType::LargeBinary
        | PhysicalType::LargeUtf8
        | PhysicalType::Utf8View
        | PhysicalType::BinaryView => Encoding::RleDictionary,
        PhysicalType::Primitive(dt) => {
            use arrow::types::PrimitiveType::*;
            match dt {
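                // Floating-point values rarely repeat exactly, so dictionary encoding is
                // unlikely to pay off; write them plain.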
                Float32 | Float64 | Float16 => Encoding::Plain,
                _ => Encoding::RleDictionary,
            }
        },
        // remaining is plain
        _ => Encoding::Plain,
    }
}