polars_io/csv/write/
writer.rs

1use std::io::Write;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use polars_core::POOL;
6use polars_core::frame::DataFrame;
7use polars_core::schema::Schema;
8use polars_error::PolarsResult;
9use polars_utils::pl_str::PlSmallStr;
10
11use super::write_impl::{write, write_bom, write_csv_header};
12use super::{QuoteStyle, SerializeOptions};
13use crate::shared::SerWriter;
14
15/// Write a DataFrame to csv.
16///
17/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
18#[must_use]
19pub struct CsvWriter<W: Write> {
20    /// File or Stream handler
21    buffer: W,
22    options: Arc<SerializeOptions>,
23    header: bool,
24    bom: bool,
25    batch_size: NonZeroUsize,
26    n_threads: usize,
27}
28
29impl<W> SerWriter<W> for CsvWriter<W>
30where
31    W: Write,
32{
33    fn new(buffer: W) -> Self {
34        let options = SerializeOptions::default();
35
36        CsvWriter {
37            buffer,
38            options: options.into(),
39            header: true,
40            bom: false,
41            batch_size: NonZeroUsize::new(1024).unwrap(),
42            n_threads: POOL.current_num_threads(),
43        }
44    }
45
46    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
47        if self.bom {
48            write_bom(&mut self.buffer)?;
49        }
50        let names = df
51            .get_column_names()
52            .into_iter()
53            .map(|x| x.as_str())
54            .collect::<Vec<_>>();
55        if self.header {
56            write_csv_header(&mut self.buffer, names.as_slice(), &self.options)?;
57        }
58        write(
59            &mut self.buffer,
60            df,
61            self.batch_size.into(),
62            self.options.clone(),
63            self.n_threads,
64        )
65    }
66}
67
68impl<W> CsvWriter<W>
69where
70    W: Write,
71{
72    fn options_mut(&mut self) -> &mut SerializeOptions {
73        Arc::make_mut(&mut self.options)
74    }
75
76    /// Set whether to write UTF-8 BOM.
77    pub fn include_bom(mut self, include_bom: bool) -> Self {
78        self.bom = include_bom;
79        self
80    }
81
82    /// Set whether to write headers.
83    pub fn include_header(mut self, include_header: bool) -> Self {
84        self.header = include_header;
85        self
86    }
87
88    /// Set the CSV file's column separator as a byte character.
89    pub fn with_separator(mut self, separator: u8) -> Self {
90        self.options_mut().separator = separator;
91        self
92    }
93
94    /// Set the batch size to use while writing the CSV.
95    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
96        self.batch_size = batch_size;
97        self
98    }
99
100    /// Set the CSV file's date format.
101    pub fn with_date_format(mut self, format: Option<PlSmallStr>) -> Self {
102        if format.is_some() {
103            self.options_mut().date_format = format;
104        }
105        self
106    }
107
108    /// Set the CSV file's time format.
109    pub fn with_time_format(mut self, format: Option<PlSmallStr>) -> Self {
110        if format.is_some() {
111            self.options_mut().time_format = format;
112        }
113        self
114    }
115
116    /// Set the CSV file's datetime format.
117    pub fn with_datetime_format(mut self, format: Option<PlSmallStr>) -> Self {
118        if format.is_some() {
119            self.options_mut().datetime_format = format;
120        }
121        self
122    }
123
124    /// Set the CSV file's forced scientific notation for floats.
125    pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
126        if scientific.is_some() {
127            self.options_mut().float_scientific = scientific;
128        }
129        self
130    }
131
132    /// Set the CSV file's float precision.
133    pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
134        if precision.is_some() {
135            self.options_mut().float_precision = precision;
136        }
137        self
138    }
139
140    /// Set the CSV decimal separator.
141    pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
142        self.options_mut().decimal_comma = decimal_comma;
143        self
144    }
145
146    /// Set the single byte character used for quoting.
147    pub fn with_quote_char(mut self, char: u8) -> Self {
148        self.options_mut().quote_char = char;
149        self
150    }
151
152    /// Set the CSV file's null value representation.
153    pub fn with_null_value(mut self, null_value: PlSmallStr) -> Self {
154        self.options_mut().null = null_value;
155        self
156    }
157
158    /// Set the CSV file's line terminator.
159    pub fn with_line_terminator(mut self, line_terminator: PlSmallStr) -> Self {
160        self.options_mut().line_terminator = line_terminator;
161        self
162    }
163
164    /// Set the CSV file's quoting behavior.
165    /// See more on [`QuoteStyle`].
166    pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
167        self.options_mut().quote_style = quote_style;
168        self
169    }
170
171    pub fn n_threads(mut self, n_threads: usize) -> Self {
172        self.n_threads = n_threads;
173        self
174    }
175
176    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
177        let expects_bom = self.bom;
178        let expects_header = self.header;
179        Ok(BatchedWriter {
180            writer: self,
181            has_written_bom: !expects_bom,
182            has_written_header: !expects_header,
183            schema: schema.clone(),
184        })
185    }
186}
187
188pub struct BatchedWriter<W: Write> {
189    writer: CsvWriter<W>,
190    has_written_bom: bool,
191    has_written_header: bool,
192    schema: Schema,
193}
194
195impl<W: Write> BatchedWriter<W> {
196    /// Write a batch to the csv writer.
197    ///
198    /// # Panics
199    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
200    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
201        if !self.has_written_bom {
202            self.has_written_bom = true;
203            write_bom(&mut self.writer.buffer)?;
204        }
205
206        if !self.has_written_header {
207            self.has_written_header = true;
208            let names = df
209                .get_column_names()
210                .into_iter()
211                .map(|x| x.as_str())
212                .collect::<Vec<_>>();
213            write_csv_header(
214                &mut self.writer.buffer,
215                names.as_slice(),
216                &self.writer.options,
217            )?;
218        }
219
220        write(
221            &mut self.writer.buffer,
222            df,
223            self.writer.batch_size.into(),
224            self.writer.options.clone(),
225            self.writer.n_threads,
226        )?;
227        Ok(())
228    }
229
230    /// Writes the header of the csv file if not done already. Returns the total size of the file.
231    pub fn finish(&mut self) -> PolarsResult<()> {
232        if !self.has_written_bom {
233            self.has_written_bom = true;
234            write_bom(&mut self.writer.buffer)?;
235        }
236
237        if !self.has_written_header {
238            self.has_written_header = true;
239            let names = self
240                .schema
241                .iter_names()
242                .map(|x| x.as_str())
243                .collect::<Vec<_>>();
244            write_csv_header(&mut self.writer.buffer, &names, &self.writer.options)?;
245        };
246
247        Ok(())
248    }
249}