polars_io/csv/write/
writer.rs

1use std::io::Write;
2use std::num::NonZeroUsize;
3
4use polars_core::POOL;
5use polars_core::frame::DataFrame;
6use polars_core::schema::Schema;
7use polars_error::PolarsResult;
8
9use super::write_impl::{write, write_bom, write_header};
10use super::{QuoteStyle, SerializeOptions};
11use crate::shared::SerWriter;
12
/// Write a DataFrame to csv.
///
/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
///
/// Configuration happens through the consuming builder methods (`include_header`,
/// `with_separator`, ...), each of which returns the updated writer.
#[must_use]
pub struct CsvWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    /// Serialization settings (separator, quoting, date/time/float formatting, null
    /// value, line terminator) passed to the header and body writers.
    options: SerializeOptions,
    /// Whether a header row with the column names is written before the data.
    header: bool,
    /// Whether a UTF-8 BOM is written before anything else.
    bom: bool,
    /// Batch size handed to the body writer.
    batch_size: NonZeroUsize,
    /// Thread count handed to the body writer.
    n_threads: usize,
}
26
27impl<W> SerWriter<W> for CsvWriter<W>
28where
29    W: Write,
30{
31    fn new(buffer: W) -> Self {
32        // 9f: all nanoseconds
33        let options = SerializeOptions {
34            time_format: Some("%T%.9f".to_string()),
35            ..Default::default()
36        };
37
38        CsvWriter {
39            buffer,
40            options,
41            header: true,
42            bom: false,
43            batch_size: NonZeroUsize::new(1024).unwrap(),
44            n_threads: POOL.current_num_threads(),
45        }
46    }
47
48    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
49        if self.bom {
50            write_bom(&mut self.buffer)?;
51        }
52        let names = df
53            .get_column_names()
54            .into_iter()
55            .map(|x| x.as_str())
56            .collect::<Vec<_>>();
57        if self.header {
58            write_header(&mut self.buffer, names.as_slice(), &self.options)?;
59        }
60        write(
61            &mut self.buffer,
62            df,
63            self.batch_size.into(),
64            &self.options,
65            self.n_threads,
66        )
67    }
68}
69
70impl<W> CsvWriter<W>
71where
72    W: Write,
73{
74    /// Set whether to write UTF-8 BOM.
75    pub fn include_bom(mut self, include_bom: bool) -> Self {
76        self.bom = include_bom;
77        self
78    }
79
80    /// Set whether to write headers.
81    pub fn include_header(mut self, include_header: bool) -> Self {
82        self.header = include_header;
83        self
84    }
85
86    /// Set the CSV file's column separator as a byte character.
87    pub fn with_separator(mut self, separator: u8) -> Self {
88        self.options.separator = separator;
89        self
90    }
91
92    /// Set the batch size to use while writing the CSV.
93    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
94        self.batch_size = batch_size;
95        self
96    }
97
98    /// Set the CSV file's date format.
99    pub fn with_date_format(mut self, format: Option<String>) -> Self {
100        if format.is_some() {
101            self.options.date_format = format;
102        }
103        self
104    }
105
106    /// Set the CSV file's time format.
107    pub fn with_time_format(mut self, format: Option<String>) -> Self {
108        if format.is_some() {
109            self.options.time_format = format;
110        }
111        self
112    }
113
114    /// Set the CSV file's datetime format.
115    pub fn with_datetime_format(mut self, format: Option<String>) -> Self {
116        if format.is_some() {
117            self.options.datetime_format = format;
118        }
119        self
120    }
121
122    /// Set the CSV file's forced scientific notation for floats.
123    pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
124        if scientific.is_some() {
125            self.options.float_scientific = scientific;
126        }
127        self
128    }
129
130    /// Set the CSV file's float precision.
131    pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
132        if precision.is_some() {
133            self.options.float_precision = precision;
134        }
135        self
136    }
137
138    /// Set the CSV decimal separator.
139    pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
140        self.options.decimal_comma = decimal_comma;
141        self
142    }
143
144    /// Set the single byte character used for quoting.
145    pub fn with_quote_char(mut self, char: u8) -> Self {
146        self.options.quote_char = char;
147        self
148    }
149
150    /// Set the CSV file's null value representation.
151    pub fn with_null_value(mut self, null_value: String) -> Self {
152        self.options.null = null_value;
153        self
154    }
155
156    /// Set the CSV file's line terminator.
157    pub fn with_line_terminator(mut self, line_terminator: String) -> Self {
158        self.options.line_terminator = line_terminator;
159        self
160    }
161
162    /// Set the CSV file's quoting behavior.
163    /// See more on [`QuoteStyle`].
164    pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
165        self.options.quote_style = quote_style;
166        self
167    }
168
169    pub fn n_threads(mut self, n_threads: usize) -> Self {
170        self.n_threads = n_threads;
171        self
172    }
173
174    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
175        let expects_bom = self.bom;
176        let expects_header = self.header;
177        Ok(BatchedWriter {
178            writer: self,
179            has_written_bom: !expects_bom,
180            has_written_header: !expects_header,
181            schema: schema.clone(),
182        })
183    }
184}
185
/// Incremental CSV writer created by [`CsvWriter::batched`].
///
/// Tracks whether the BOM and header have been emitted so that each is written at
/// most once across all `write_batch` / `finish` calls.
pub struct BatchedWriter<W: Write> {
    /// Configured writer whose buffer, options, batch size and thread count are used.
    writer: CsvWriter<W>,
    /// `true` once the BOM has been handled; starts out `true` when no BOM was requested.
    has_written_bom: bool,
    /// `true` once the header has been handled; starts out `true` when no header was requested.
    has_written_header: bool,
    /// Schema supplying the header names when `finish` has to write the header itself.
    schema: Schema,
}
192
193impl<W: Write> BatchedWriter<W> {
194    /// Write a batch to the csv writer.
195    ///
196    /// # Panics
197    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
198    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
199        if !self.has_written_bom {
200            self.has_written_bom = true;
201            write_bom(&mut self.writer.buffer)?;
202        }
203
204        if !self.has_written_header {
205            self.has_written_header = true;
206            let names = df
207                .get_column_names()
208                .into_iter()
209                .map(|x| x.as_str())
210                .collect::<Vec<_>>();
211            write_header(
212                &mut self.writer.buffer,
213                names.as_slice(),
214                &self.writer.options,
215            )?;
216        }
217
218        write(
219            &mut self.writer.buffer,
220            df,
221            self.writer.batch_size.into(),
222            &self.writer.options,
223            self.writer.n_threads,
224        )?;
225        Ok(())
226    }
227
228    /// Writes the header of the csv file if not done already. Returns the total size of the file.
229    pub fn finish(&mut self) -> PolarsResult<()> {
230        if !self.has_written_bom {
231            self.has_written_bom = true;
232            write_bom(&mut self.writer.buffer)?;
233        }
234
235        if !self.has_written_header {
236            self.has_written_header = true;
237            let names = self
238                .schema
239                .iter_names()
240                .map(|x| x.as_str())
241                .collect::<Vec<_>>();
242            write_header(&mut self.writer.buffer, &names, &self.writer.options)?;
243        };
244
245        Ok(())
246    }
247}