polars_io/parquet/write/writer.rs

use std::io::Write;
use std::sync::Mutex;

use polars_buffer::Buffer;
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;
use polars_parquet::write::{
    CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,
    get_dtype_encoding, to_parquet_schema,
};

use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use super::{KeyValueMetadata, ParquetWriteOptions};
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
    /// Materialize these options into a configured [`ParquetWriter`] wrapping the sink `f`.
    pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
    where
        F: Write,
    {
        ParquetWriter::new(f)
            .with_compression(self.compression)
            .with_statistics(self.statistics)
            .with_row_group_size(self.row_group_size)
            .with_data_page_size(self.data_page_size)
            .with_key_value_metadata(self.key_value_metadata.clone())
    }
}
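
/// Writes a [`DataFrame`] to a [`Write`] sink in Apache Parquet format.
///
/// A minimal usage sketch (hypothetical: assumes a mutable `DataFrame` named
/// `df` in scope and a writable output path; neither is defined in this file):
///
/// ```ignore
/// let file = std::fs::File::create("example.parquet")?;
/// // `finish` consumes the writer and returns the size of the written file.
/// let file_size = ParquetWriter::new(file)
///     .with_row_group_size(Some(100_000))
///     .finish(&mut df)?;
/// ```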
#[must_use]
pub struct ParquetWriter<W> {
    writer: W,
    /// Data page compression.
    compression: CompressionOptions,
    /// Which column statistics to compute and write.
    statistics: StatisticsOptions,
    /// Rows per row group; `None` falls back to 512^2 rows.
    row_group_size: Option<usize>,
    /// Upper bound on the size of a data page, in bytes.
    data_page_size: Option<usize>,
    /// Serialize columns in parallel.
    parallel: bool,
    /// Optional key-value metadata written into the Parquet footer.
    key_value_metadata: Option<KeyValueMetadata>,
    context_info: Option<PlHashMap<String, String>>,
}

impl<W> ParquetWriter<W>
where
    W: Write,
{
    /// Create a new writer with all settings at their defaults.
    pub fn new(writer: W) -> Self {
        ParquetWriter {
            writer,
            compression: ParquetCompression::default().into(),
            statistics: StatisticsOptions::default(),
            row_group_size: None,
            data_page_size: None,
            parallel: true,
            key_value_metadata: None,
            context_info: None,
        }
    }

    /// Set the compression used. Defaults to `ParquetCompression::default()`.
    pub fn with_compression(mut self, compression: ParquetCompression) -> Self {
        self.compression = compression.into();
        self
    }

    /// Select which column statistics are computed and written.
    pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
        self.statistics = statistics;
        self
    }

    /// Set the number of rows per row group. `None` (the default) means 512^2 rows.
    pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
        self.row_group_size = size;
        self
    }

    /// Set an upper bound on the size of a data page, in bytes.
    pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {
        self.data_page_size = limit;
        self
    }

    /// Serialize columns in parallel. Defaults to `true`.
    pub fn set_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }

    /// Attach optional key-value metadata to write into the Parquet footer.
    pub fn with_key_value_metadata(mut self, key_value_metadata: Option<KeyValueMetadata>) -> Self {
        self.key_value_metadata = key_value_metadata;
        self
    }

    /// Attach optional context information for this write.
    pub fn with_context_info(mut self, context_info: Option<PlHashMap<String, String>>) -> Self {
        self.context_info = context_info;
        self
    }
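
    /// Convert this writer into a [`BatchedWriter`], which writes the file in
    /// multiple calls instead of all at once.
    ///
    /// A sketch of batched usage (hypothetical: `file` is any `Write` sink and
    /// `df_a`, `df_b` are `DataFrame`s sharing one schema; none are defined here):
    ///
    /// ```ignore
    /// let mut batched = ParquetWriter::new(file).batched(df_a.schema())?;
    /// batched.write_batch(&df_a)?;
    /// batched.write_batch(&df_b)?;
    /// // Writes the footer and returns the total file size.
    /// let file_size = batched.finish()?;
    /// ```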
    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
        let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
        let parquet_schema = to_parquet_schema(&schema)?;
        let encodings = get_encodings(&schema);
        let options = self.materialize_options();
        let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);

        Ok(BatchedWriter {
            writer,
            parquet_schema,
            encodings,
            options,
            parallel: self.parallel,
            key_value_metadata: self.key_value_metadata,
        })
    }

    /// Collect the configured settings into [`WriteOptions`] for the
    /// underlying parquet `FileWriter`.
    fn materialize_options(&self) -> WriteOptions {
        WriteOptions {
            statistics: self.statistics,
            compression: self.compression,
            version: Version::V1,
            data_page_size: self.data_page_size,
        }
    }

    /// Write the given [`DataFrame`] to the underlying writer and return the
    /// total size of the written file in bytes.
    pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
        let mut batched = self.batched(chunked_df.schema())?;
        batched.write_batch(&chunked_df)?;
        batched.finish()
    }
}

/// Determine the Parquet encodings to use for each field of an Arrow schema.
pub fn get_encodings(schema: &ArrowSchema) -> Buffer<Vec<Encoding>> {
    schema
        .iter_values()
        .map(|f| get_dtype_encoding(&f.dtype))
        .collect()
}