polars_io/parquet/write/
writer.rs

use std::io::Write;
use std::sync::Mutex;

use polars_buffer::Buffer;
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;
use polars_parquet::write::{
    CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,
    get_dtype_encoding, to_parquet_schema,
};

use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use super::{KeyValueMetadata, ParquetWriteOptions};
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
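    /// Build a [`ParquetWriter`] over `f`, configured with these options.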
    pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
    where
        F: Write,
    {
        ParquetWriter::new(f)
            .with_compression(self.compression)
            .with_statistics(self.statistics)
            .with_row_group_size(self.row_group_size)
            .with_data_page_size(self.data_page_size)
            .with_key_value_metadata(self.key_value_metadata.clone())
    }
}

/// Write a DataFrame to Parquet format.
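///
/// # Example
///
/// A minimal sketch of typical usage; the file path, column name, and chosen
/// compression here are made up for illustration:
///
/// ```ignore
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// let mut df = df!["values" => [1i64, 2, 3]]?;
/// let file = File::create("data.parquet")?;
/// ParquetWriter::new(file)
///     .with_compression(ParquetCompression::Snappy)
///     .finish(&mut df)?;
/// ```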
#[must_use]
pub struct ParquetWriter<W> {
    writer: W,
    /// Data page compression
    compression: CompressionOptions,
    /// Compute and write column statistics.
    statistics: StatisticsOptions,
    /// If `None`, defaults to 512^2 rows.
    row_group_size: Option<usize>,
    /// If `None`, defaults to 1024^2 bytes.
    data_page_size: Option<usize>,
    /// Serialize columns in parallel
    parallel: bool,
    /// Custom file-level key-value metadata
    key_value_metadata: Option<KeyValueMetadata>,
    /// Context info for the Parquet file being written.
    context_info: Option<PlHashMap<String, String>>,
}

impl<W> ParquetWriter<W>
where
    W: Write,
{
    /// Create a new writer.
    pub fn new(writer: W) -> Self
    where
        W: Write,
    {
        ParquetWriter {
            writer,
            compression: ParquetCompression::default().into(),
            statistics: StatisticsOptions::default(),
            row_group_size: None,
            data_page_size: None,
            parallel: true,
            key_value_metadata: None,
            context_info: None,
        }
    }

    /// Set the compression used. Defaults to `Zstd`.
    ///
    /// The default compression `Zstd` has very good performance, but may not yet be supported
    /// by older readers. If you want stronger compatibility guarantees, consider using `Snappy`.
    pub fn with_compression(mut self, compression: ParquetCompression) -> Self {
        self.compression = compression.into();
        self
    }

    /// Compute and write statistics.
    pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
        self.statistics = statistics;
        self
    }

    /// Set the row group size (in number of rows) during writing. This can reduce memory
    /// pressure and improve writing performance.
    pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
        self.row_group_size = size;
        self
    }

    /// Set the maximum byte size of a data page. If `None`, defaults to 1024^2 bytes.
    pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {
        self.data_page_size = limit;
        self
    }

    /// Serialize columns in parallel.
    pub fn set_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }

    /// Set custom file-level key-value metadata for the Parquet file.
    pub fn with_key_value_metadata(mut self, key_value_metadata: Option<KeyValueMetadata>) -> Self {
        self.key_value_metadata = key_value_metadata;
        self
    }

    /// Set context information for the writer.
    pub fn with_context_info(mut self, context_info: Option<PlHashMap<String, String>>) -> Self {
        self.context_info = context_info;
        self
    }

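    /// Create a [`BatchedWriter`] that can write multiple `DataFrame` batches to the
    /// same file before it is finalized.
    ///
    /// A minimal sketch of batched usage; the file path and frames are made up for
    /// illustration:
    ///
    /// ```ignore
    /// let file = std::fs::File::create("data.parquet")?;
    /// let df1 = df!["values" => [1i64, 2]]?;
    /// let df2 = df!["values" => [3i64, 4]]?;
    ///
    /// let mut batched = ParquetWriter::new(file).batched(df1.schema())?;
    /// batched.write_batch(&df1)?;
    /// batched.write_batch(&df2)?;
    /// batched.finish()?;
    /// ```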
    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
        let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
        let parquet_schema = to_parquet_schema(&schema)?;
        let encodings = get_encodings(&schema);
        let options = self.materialize_options();
        let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);

        Ok(BatchedWriter {
            writer,
            parquet_schema,
            encodings,
            options,
            parallel: self.parallel,
            key_value_metadata: self.key_value_metadata,
        })
    }

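    /// Gather the configured statistics, compression, format version, and data page size
    /// into the `WriteOptions` passed to the underlying parquet `FileWriter`.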
    fn materialize_options(&self) -> WriteOptions {
        WriteOptions {
            statistics: self.statistics,
            compression: self.compression,
            version: Version::V1,
            data_page_size: self.data_page_size,
        }
    }

    /// Write the given DataFrame to the writer `W`.
    /// Returns the total size of the file.
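    ///
    /// A minimal sketch; the file path and column name are made up for illustration:
    ///
    /// ```ignore
    /// let file = std::fs::File::create("data.parquet")?;
    /// let mut df = df!["values" => [1i64, 2, 3]]?;
    /// let bytes_written = ParquetWriter::new(file).finish(&mut df)?;
    /// ```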
    pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
        let mut batched = self.batched(chunked_df.schema())?;
        batched.write_batch(&chunked_df)?;
        batched.finish()
    }
}

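/// Map every field in the Arrow schema to its default parquet encodings.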
pub fn get_encodings(schema: &ArrowSchema) -> Buffer<Vec<Encoding>> {
    schema
        .iter_values()
        .map(|f| get_dtype_encoding(&f.dtype))
        .collect()
}