polars_io/csv/write/
writer.rs1use std::io::Write;
2use std::num::NonZeroUsize;
3
4use polars_core::POOL;
5use polars_core::frame::DataFrame;
6use polars_core::schema::Schema;
7use polars_error::PolarsResult;
8
9use super::write_impl::{write, write_bom, write_header};
10use super::{QuoteStyle, SerializeOptions};
11use crate::shared::SerWriter;
12
13#[must_use]
17pub struct CsvWriter<W: Write> {
18 buffer: W,
20 options: SerializeOptions,
21 header: bool,
22 bom: bool,
23 batch_size: NonZeroUsize,
24 n_threads: usize,
25}
26
27impl<W> SerWriter<W> for CsvWriter<W>
28where
29 W: Write,
30{
31 fn new(buffer: W) -> Self {
32 let options = SerializeOptions {
34 time_format: Some("%T%.9f".to_string()),
35 ..Default::default()
36 };
37
38 CsvWriter {
39 buffer,
40 options,
41 header: true,
42 bom: false,
43 batch_size: NonZeroUsize::new(1024).unwrap(),
44 n_threads: POOL.current_num_threads(),
45 }
46 }
47
48 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
49 if self.bom {
50 write_bom(&mut self.buffer)?;
51 }
52 let names = df
53 .get_column_names()
54 .into_iter()
55 .map(|x| x.as_str())
56 .collect::<Vec<_>>();
57 if self.header {
58 write_header(&mut self.buffer, names.as_slice(), &self.options)?;
59 }
60 write(
61 &mut self.buffer,
62 df,
63 self.batch_size.into(),
64 &self.options,
65 self.n_threads,
66 )
67 }
68}
69
70impl<W> CsvWriter<W>
71where
72 W: Write,
73{
74 pub fn include_bom(mut self, include_bom: bool) -> Self {
76 self.bom = include_bom;
77 self
78 }
79
80 pub fn include_header(mut self, include_header: bool) -> Self {
82 self.header = include_header;
83 self
84 }
85
86 pub fn with_separator(mut self, separator: u8) -> Self {
88 self.options.separator = separator;
89 self
90 }
91
92 pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
94 self.batch_size = batch_size;
95 self
96 }
97
98 pub fn with_date_format(mut self, format: Option<String>) -> Self {
100 if format.is_some() {
101 self.options.date_format = format;
102 }
103 self
104 }
105
106 pub fn with_time_format(mut self, format: Option<String>) -> Self {
108 if format.is_some() {
109 self.options.time_format = format;
110 }
111 self
112 }
113
114 pub fn with_datetime_format(mut self, format: Option<String>) -> Self {
116 if format.is_some() {
117 self.options.datetime_format = format;
118 }
119 self
120 }
121
122 pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
124 if scientific.is_some() {
125 self.options.float_scientific = scientific;
126 }
127 self
128 }
129
130 pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
132 if precision.is_some() {
133 self.options.float_precision = precision;
134 }
135 self
136 }
137
138 pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
140 self.options.decimal_comma = decimal_comma;
141 self
142 }
143
144 pub fn with_quote_char(mut self, char: u8) -> Self {
146 self.options.quote_char = char;
147 self
148 }
149
150 pub fn with_null_value(mut self, null_value: String) -> Self {
152 self.options.null = null_value;
153 self
154 }
155
156 pub fn with_line_terminator(mut self, line_terminator: String) -> Self {
158 self.options.line_terminator = line_terminator;
159 self
160 }
161
162 pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
165 self.options.quote_style = quote_style;
166 self
167 }
168
169 pub fn n_threads(mut self, n_threads: usize) -> Self {
170 self.n_threads = n_threads;
171 self
172 }
173
174 pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
175 let expects_bom = self.bom;
176 let expects_header = self.header;
177 Ok(BatchedWriter {
178 writer: self,
179 has_written_bom: !expects_bom,
180 has_written_header: !expects_header,
181 schema: schema.clone(),
182 })
183 }
184}
185
186pub struct BatchedWriter<W: Write> {
187 writer: CsvWriter<W>,
188 has_written_bom: bool,
189 has_written_header: bool,
190 schema: Schema,
191}
192
193impl<W: Write> BatchedWriter<W> {
194 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
199 if !self.has_written_bom {
200 self.has_written_bom = true;
201 write_bom(&mut self.writer.buffer)?;
202 }
203
204 if !self.has_written_header {
205 self.has_written_header = true;
206 let names = df
207 .get_column_names()
208 .into_iter()
209 .map(|x| x.as_str())
210 .collect::<Vec<_>>();
211 write_header(
212 &mut self.writer.buffer,
213 names.as_slice(),
214 &self.writer.options,
215 )?;
216 }
217
218 write(
219 &mut self.writer.buffer,
220 df,
221 self.writer.batch_size.into(),
222 &self.writer.options,
223 self.writer.n_threads,
224 )?;
225 Ok(())
226 }
227
228 pub fn finish(&mut self) -> PolarsResult<()> {
230 if !self.has_written_bom {
231 self.has_written_bom = true;
232 write_bom(&mut self.writer.buffer)?;
233 }
234
235 if !self.has_written_header {
236 self.has_written_header = true;
237 let names = self
238 .schema
239 .iter_names()
240 .map(|x| x.as_str())
241 .collect::<Vec<_>>();
242 write_header(&mut self.writer.buffer, &names, &self.writer.options)?;
243 };
244
245 Ok(())
246 }
247}