polars_io/csv/write/
writer.rs1use std::io::Write;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use polars_core::POOL;
6use polars_core::frame::DataFrame;
7use polars_core::schema::Schema;
8use polars_error::PolarsResult;
9use polars_utils::pl_str::PlSmallStr;
10
11use super::write_impl::{write, write_bom, write_csv_header};
12use super::{QuoteStyle, SerializeOptions};
13use crate::shared::SerWriter;
14
15#[must_use]
19pub struct CsvWriter<W: Write> {
20 buffer: W,
22 options: Arc<SerializeOptions>,
23 header: bool,
24 bom: bool,
25 batch_size: NonZeroUsize,
26 n_threads: usize,
27}
28
29impl<W> SerWriter<W> for CsvWriter<W>
30where
31 W: Write,
32{
33 fn new(buffer: W) -> Self {
34 let options = SerializeOptions::default();
35
36 CsvWriter {
37 buffer,
38 options: options.into(),
39 header: true,
40 bom: false,
41 batch_size: NonZeroUsize::new(1024).unwrap(),
42 n_threads: POOL.current_num_threads(),
43 }
44 }
45
46 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
47 if self.bom {
48 write_bom(&mut self.buffer)?;
49 }
50 let names = df
51 .get_column_names()
52 .into_iter()
53 .map(|x| x.as_str())
54 .collect::<Vec<_>>();
55 if self.header {
56 write_csv_header(&mut self.buffer, names.as_slice(), &self.options)?;
57 }
58 write(
59 &mut self.buffer,
60 df,
61 self.batch_size.into(),
62 self.options.clone(),
63 self.n_threads,
64 )
65 }
66}
67
68impl<W> CsvWriter<W>
69where
70 W: Write,
71{
72 fn options_mut(&mut self) -> &mut SerializeOptions {
73 Arc::make_mut(&mut self.options)
74 }
75
76 pub fn include_bom(mut self, include_bom: bool) -> Self {
78 self.bom = include_bom;
79 self
80 }
81
82 pub fn include_header(mut self, include_header: bool) -> Self {
84 self.header = include_header;
85 self
86 }
87
88 pub fn with_separator(mut self, separator: u8) -> Self {
90 self.options_mut().separator = separator;
91 self
92 }
93
94 pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
96 self.batch_size = batch_size;
97 self
98 }
99
100 pub fn with_date_format(mut self, format: Option<PlSmallStr>) -> Self {
102 if format.is_some() {
103 self.options_mut().date_format = format;
104 }
105 self
106 }
107
108 pub fn with_time_format(mut self, format: Option<PlSmallStr>) -> Self {
110 if format.is_some() {
111 self.options_mut().time_format = format;
112 }
113 self
114 }
115
116 pub fn with_datetime_format(mut self, format: Option<PlSmallStr>) -> Self {
118 if format.is_some() {
119 self.options_mut().datetime_format = format;
120 }
121 self
122 }
123
124 pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
126 if scientific.is_some() {
127 self.options_mut().float_scientific = scientific;
128 }
129 self
130 }
131
132 pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
134 if precision.is_some() {
135 self.options_mut().float_precision = precision;
136 }
137 self
138 }
139
140 pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
142 self.options_mut().decimal_comma = decimal_comma;
143 self
144 }
145
146 pub fn with_quote_char(mut self, char: u8) -> Self {
148 self.options_mut().quote_char = char;
149 self
150 }
151
152 pub fn with_null_value(mut self, null_value: PlSmallStr) -> Self {
154 self.options_mut().null = null_value;
155 self
156 }
157
158 pub fn with_line_terminator(mut self, line_terminator: PlSmallStr) -> Self {
160 self.options_mut().line_terminator = line_terminator;
161 self
162 }
163
164 pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
167 self.options_mut().quote_style = quote_style;
168 self
169 }
170
171 pub fn n_threads(mut self, n_threads: usize) -> Self {
172 self.n_threads = n_threads;
173 self
174 }
175
176 pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
177 let expects_bom = self.bom;
178 let expects_header = self.header;
179 Ok(BatchedWriter {
180 writer: self,
181 has_written_bom: !expects_bom,
182 has_written_header: !expects_header,
183 schema: schema.clone(),
184 })
185 }
186}
187
188pub struct BatchedWriter<W: Write> {
189 writer: CsvWriter<W>,
190 has_written_bom: bool,
191 has_written_header: bool,
192 schema: Schema,
193}
194
195impl<W: Write> BatchedWriter<W> {
196 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
201 if !self.has_written_bom {
202 self.has_written_bom = true;
203 write_bom(&mut self.writer.buffer)?;
204 }
205
206 if !self.has_written_header {
207 self.has_written_header = true;
208 let names = df
209 .get_column_names()
210 .into_iter()
211 .map(|x| x.as_str())
212 .collect::<Vec<_>>();
213 write_csv_header(
214 &mut self.writer.buffer,
215 names.as_slice(),
216 &self.writer.options,
217 )?;
218 }
219
220 write(
221 &mut self.writer.buffer,
222 df,
223 self.writer.batch_size.into(),
224 self.writer.options.clone(),
225 self.writer.n_threads,
226 )?;
227 Ok(())
228 }
229
230 pub fn finish(&mut self) -> PolarsResult<()> {
232 if !self.has_written_bom {
233 self.has_written_bom = true;
234 write_bom(&mut self.writer.buffer)?;
235 }
236
237 if !self.has_written_header {
238 self.has_written_header = true;
239 let names = self
240 .schema
241 .iter_names()
242 .map(|x| x.as_str())
243 .collect::<Vec<_>>();
244 write_csv_header(&mut self.writer.buffer, &names, &self.writer.options)?;
245 };
246
247 Ok(())
248 }
249}