// polars_io/parquet/write/writer.rs

use std::io::Write;
use std::sync::Mutex;

use arrow::datatypes::PhysicalType;
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;
use polars_parquet::write::{
    CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,
    to_parquet_schema, transverse,
};

use super::ParquetWriteOptions;
use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
    pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
    where
        F: Write,
    {
        ParquetWriter::new(f)
            .with_compression(self.compression)
            .with_statistics(self.statistics)
            .with_row_group_size(self.row_group_size)
            .with_data_page_size(self.data_page_size)
    }
}
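
// A minimal sketch (illustrative, not from the original file) of how
// `to_writer` is used: the options struct carries the configuration and
// transfers it onto a `ParquetWriter` over any `Write` sink. It assumes
// `ParquetWriteOptions` implements `Default` and that `PolarsError`
// converts from `std::io::Error`; the file name is hypothetical.
//
// fn write_with_options(df: &mut DataFrame) -> PolarsResult<u64> {
//     let file = std::fs::File::create("example.parquet")?;
//     ParquetWriteOptions::default().to_writer(file).finish(df)
// }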

/// Write a [`DataFrame`] to Parquet format.
#[must_use]
pub struct ParquetWriter<W> {
    writer: W,
    /// Data page compression.
    compression: CompressionOptions,
    /// Whether to compute and write column statistics.
    statistics: StatisticsOptions,
    /// Number of rows per row group; if `None`, 512^2 rows are used.
    row_group_size: Option<usize>,
    /// Maximum size of a data page in bytes; if `None`, the writer's default applies.
    data_page_size: Option<usize>,
    /// Serialize columns in parallel.
    parallel: bool,
}

impl<W> ParquetWriter<W>
where
    W: Write,
{
    /// Create a new writer around any sink that implements [`Write`].
    pub fn new(writer: W) -> Self
    where
        W: Write,
    {
        ParquetWriter {
            writer,
            compression: ParquetCompression::default().into(),
            statistics: StatisticsOptions::default(),
            row_group_size: None,
            data_page_size: None,
            parallel: true,
        }
    }

    /// Set the compression used. Defaults to `Zstd`.
    ///
    /// The default `Zstd` compression has very good performance, but may not
    /// be supported by older readers. For wider compatibility, consider `Snappy`.
    pub fn with_compression(mut self, compression: ParquetCompression) -> Self {
        self.compression = compression.into();
        self
    }

    /// Compute and write column statistics.
    pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
        self.statistics = statistics;
        self
    }

    /// Set the row group size (in number of rows) during writing. This can
    /// reduce memory pressure and improve writing performance.
    pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
        self.row_group_size = size;
        self
    }

    /// Set the maximum size of a data page in bytes.
    pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {
        self.data_page_size = limit;
        self
    }

    /// Serialize columns in parallel.
    pub fn set_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }
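
    // A usage sketch for the builder-style setters above (illustrative, not
    // from the original file): each setter consumes `self` and returns it,
    // so a writer is configured in one chained expression. The
    // `ParquetCompression::Zstd(None)` variant shape is an assumption.
    //
    // let writer = ParquetWriter::new(file)
    //     .with_compression(ParquetCompression::Zstd(None))
    //     .with_row_group_size(Some(100_000))
    //     .set_parallel(false);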

    /// Create a [`BatchedWriter`] that can write multiple batches (chunks)
    /// to the same file before it is finalized.
    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
        let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
        let parquet_schema = to_parquet_schema(&schema)?;
        let encodings = get_encodings(&schema);
        let options = self.materialize_options();
        let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);

        Ok(BatchedWriter {
            writer,
            parquet_schema,
            encodings,
            options,
            parallel: self.parallel,
        })
    }
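
    // A sketch of streaming writes through the batched writer (illustrative,
    // not from the original file): several DataFrames sharing one schema are
    // appended before the file is finalized. `write_batch` and `finish` are
    // the same `BatchedWriter` methods used by `ParquetWriter::finish` below.
    //
    // fn write_in_batches(
    //     frames: &[DataFrame],
    //     schema: &Schema,
    //     file: std::fs::File,
    // ) -> PolarsResult<u64> {
    //     let mut batched = ParquetWriter::new(file).batched(schema)?;
    //     for df in frames {
    //         batched.write_batch(df)?;
    //     }
    //     batched.finish()
    // }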

    fn materialize_options(&self) -> WriteOptions {
        WriteOptions {
            statistics: self.statistics,
            compression: self.compression,
            version: Version::V1,
            data_page_size: self.data_page_size,
        }
    }

    /// Write the given [`DataFrame`] through the writer `W`. Returns the
    /// total size of the file in bytes.
    pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
        let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
        let mut batched = self.batched(chunked_df.schema())?;
        batched.write_batch(&chunked_df)?;
        batched.finish()
    }
}
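
// An end-to-end sketch (illustrative, not from the original file): `finish`
// chunks the DataFrame to the configured row group size (512^2 rows by
// default), writes it through a single `BatchedWriter`, and returns the file
// size in bytes. The output path is hypothetical.
//
// fn write_df(df: &mut DataFrame) -> PolarsResult<u64> {
//     let file = std::fs::File::create("df.parquet")?;
//     ParquetWriter::new(file)
//         .with_statistics(StatisticsOptions::default())
//         .finish(df)
// }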

pub fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
    schema
        .iter_values()
        .map(|f| transverse(&f.dtype, encoding_map))
        .collect()
}

/// Choose an encoding for each (nested) physical type in a column.
fn encoding_map(dtype: &ArrowDataType) -> Encoding {
    match dtype.to_physical_type() {
        PhysicalType::Dictionary(_)
        | PhysicalType::LargeBinary
        | PhysicalType::LargeUtf8
        | PhysicalType::Utf8View
        | PhysicalType::BinaryView => Encoding::RleDictionary,
        PhysicalType::Primitive(dt) => {
            use arrow::types::PrimitiveType::*;
            match dt {
                Float32 | Float64 | Float16 => Encoding::Plain,
                _ => Encoding::RleDictionary,
            }
        },
        // Everything else is written plain.
        _ => Encoding::Plain,
    }
}
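
// Worked example of the mapping above (an illustration derived from this
// file, not original text): for a flat schema with columns of physical types
// (Utf8View, Float64, Int64), `get_encodings` yields one leaf encoding per
// column:
//
//     [[RleDictionary], [Plain], [RleDictionary]]
//
// Strings and integers are dictionary-encoded, while floats stay plain:
// floating-point values rarely repeat exactly, so dictionary encoding
// typically does not pay off for them.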