polars_io/csv/write/
writer.rs1use std::io::Write;
2use std::num::NonZeroUsize;
3
4use polars_core::POOL;
5use polars_core::frame::DataFrame;
6use polars_core::schema::Schema;
7use polars_error::PolarsResult;
8
9use super::write_impl::{write, write_bom, write_header};
10use super::{QuoteStyle, SerializeOptions};
11use crate::shared::SerWriter;
12
13#[must_use]
17pub struct CsvWriter<W: Write> {
18 buffer: W,
20 options: SerializeOptions,
21 header: bool,
22 bom: bool,
23 batch_size: NonZeroUsize,
24 n_threads: usize,
25}
26
27impl<W> SerWriter<W> for CsvWriter<W>
28where
29 W: Write,
30{
31 fn new(buffer: W) -> Self {
32 let options = SerializeOptions {
34 time_format: Some("%T%.9f".to_string()),
35 ..Default::default()
36 };
37
38 CsvWriter {
39 buffer,
40 options,
41 header: true,
42 bom: false,
43 batch_size: NonZeroUsize::new(1024).unwrap(),
44 n_threads: POOL.current_num_threads(),
45 }
46 }
47
48 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
49 if self.bom {
50 write_bom(&mut self.buffer)?;
51 }
52 let names = df
53 .get_column_names()
54 .into_iter()
55 .map(|x| x.as_str())
56 .collect::<Vec<_>>();
57 if self.header {
58 write_header(&mut self.buffer, names.as_slice(), &self.options)?;
59 }
60 write(
61 &mut self.buffer,
62 df,
63 self.batch_size.into(),
64 &self.options,
65 self.n_threads,
66 )
67 }
68}
69
70impl<W> CsvWriter<W>
71where
72 W: Write,
73{
74 pub fn include_bom(mut self, include_bom: bool) -> Self {
76 self.bom = include_bom;
77 self
78 }
79
80 pub fn include_header(mut self, include_header: bool) -> Self {
82 self.header = include_header;
83 self
84 }
85
86 pub fn with_separator(mut self, separator: u8) -> Self {
88 self.options.separator = separator;
89 self
90 }
91
92 pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
94 self.batch_size = batch_size;
95 self
96 }
97
98 pub fn with_date_format(mut self, format: Option<String>) -> Self {
100 if format.is_some() {
101 self.options.date_format = format;
102 }
103 self
104 }
105
106 pub fn with_time_format(mut self, format: Option<String>) -> Self {
108 if format.is_some() {
109 self.options.time_format = format;
110 }
111 self
112 }
113
114 pub fn with_datetime_format(mut self, format: Option<String>) -> Self {
116 if format.is_some() {
117 self.options.datetime_format = format;
118 }
119 self
120 }
121
122 pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
124 if scientific.is_some() {
125 self.options.float_scientific = scientific;
126 }
127 self
128 }
129
130 pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
132 if precision.is_some() {
133 self.options.float_precision = precision;
134 }
135 self
136 }
137
138 pub fn with_quote_char(mut self, char: u8) -> Self {
140 self.options.quote_char = char;
141 self
142 }
143
144 pub fn with_null_value(mut self, null_value: String) -> Self {
146 self.options.null = null_value;
147 self
148 }
149
150 pub fn with_line_terminator(mut self, line_terminator: String) -> Self {
152 self.options.line_terminator = line_terminator;
153 self
154 }
155
156 pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
159 self.options.quote_style = quote_style;
160 self
161 }
162
163 pub fn n_threads(mut self, n_threads: usize) -> Self {
164 self.n_threads = n_threads;
165 self
166 }
167
168 pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
169 let expects_bom = self.bom;
170 let expects_header = self.header;
171 Ok(BatchedWriter {
172 writer: self,
173 has_written_bom: !expects_bom,
174 has_written_header: !expects_header,
175 schema: schema.clone(),
176 })
177 }
178}
179
180pub struct BatchedWriter<W: Write> {
181 writer: CsvWriter<W>,
182 has_written_bom: bool,
183 has_written_header: bool,
184 schema: Schema,
185}
186
187impl<W: Write> BatchedWriter<W> {
188 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
193 if !self.has_written_bom {
194 self.has_written_bom = true;
195 write_bom(&mut self.writer.buffer)?;
196 }
197
198 if !self.has_written_header {
199 self.has_written_header = true;
200 let names = df
201 .get_column_names()
202 .into_iter()
203 .map(|x| x.as_str())
204 .collect::<Vec<_>>();
205 write_header(
206 &mut self.writer.buffer,
207 names.as_slice(),
208 &self.writer.options,
209 )?;
210 }
211
212 write(
213 &mut self.writer.buffer,
214 df,
215 self.writer.batch_size.into(),
216 &self.writer.options,
217 self.writer.n_threads,
218 )?;
219 Ok(())
220 }
221
222 pub fn finish(&mut self) -> PolarsResult<()> {
224 if !self.has_written_bom {
225 self.has_written_bom = true;
226 write_bom(&mut self.writer.buffer)?;
227 }
228
229 if !self.has_written_header {
230 self.has_written_header = true;
231 let names = self
232 .schema
233 .iter_names()
234 .map(|x| x.as_str())
235 .collect::<Vec<_>>();
236 write_header(&mut self.writer.buffer, &names, &self.writer.options)?;
237 };
238
239 Ok(())
240 }
241}