polars_io/csv/write/
writer.rs

1use std::io::Write;
2use std::num::NonZeroUsize;
3
4use polars_core::POOL;
5use polars_core::frame::DataFrame;
6use polars_core::schema::Schema;
7use polars_error::PolarsResult;
8
9use super::write_impl::{write, write_bom, write_header};
10use super::{QuoteStyle, SerializeOptions};
11use crate::shared::SerWriter;
12
13/// Write a DataFrame to csv.
14///
15/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
16#[must_use]
17pub struct CsvWriter<W: Write> {
18    /// File or Stream handler
19    buffer: W,
20    options: SerializeOptions,
21    header: bool,
22    bom: bool,
23    batch_size: NonZeroUsize,
24    n_threads: usize,
25}
26
27impl<W> SerWriter<W> for CsvWriter<W>
28where
29    W: Write,
30{
31    fn new(buffer: W) -> Self {
32        // 9f: all nanoseconds
33        let options = SerializeOptions {
34            time_format: Some("%T%.9f".to_string()),
35            ..Default::default()
36        };
37
38        CsvWriter {
39            buffer,
40            options,
41            header: true,
42            bom: false,
43            batch_size: NonZeroUsize::new(1024).unwrap(),
44            n_threads: POOL.current_num_threads(),
45        }
46    }
47
48    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
49        if self.bom {
50            write_bom(&mut self.buffer)?;
51        }
52        let names = df
53            .get_column_names()
54            .into_iter()
55            .map(|x| x.as_str())
56            .collect::<Vec<_>>();
57        if self.header {
58            write_header(&mut self.buffer, names.as_slice(), &self.options)?;
59        }
60        write(
61            &mut self.buffer,
62            df,
63            self.batch_size.into(),
64            &self.options,
65            self.n_threads,
66        )
67    }
68}
69
70impl<W> CsvWriter<W>
71where
72    W: Write,
73{
74    /// Set whether to write UTF-8 BOM.
75    pub fn include_bom(mut self, include_bom: bool) -> Self {
76        self.bom = include_bom;
77        self
78    }
79
80    /// Set whether to write headers.
81    pub fn include_header(mut self, include_header: bool) -> Self {
82        self.header = include_header;
83        self
84    }
85
86    /// Set the CSV file's column separator as a byte character.
87    pub fn with_separator(mut self, separator: u8) -> Self {
88        self.options.separator = separator;
89        self
90    }
91
92    /// Set the batch size to use while writing the CSV.
93    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
94        self.batch_size = batch_size;
95        self
96    }
97
98    /// Set the CSV file's date format.
99    pub fn with_date_format(mut self, format: Option<String>) -> Self {
100        if format.is_some() {
101            self.options.date_format = format;
102        }
103        self
104    }
105
106    /// Set the CSV file's time format.
107    pub fn with_time_format(mut self, format: Option<String>) -> Self {
108        if format.is_some() {
109            self.options.time_format = format;
110        }
111        self
112    }
113
114    /// Set the CSV file's datetime format.
115    pub fn with_datetime_format(mut self, format: Option<String>) -> Self {
116        if format.is_some() {
117            self.options.datetime_format = format;
118        }
119        self
120    }
121
122    /// Set the CSV file's forced scientific notation for floats.
123    pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
124        if scientific.is_some() {
125            self.options.float_scientific = scientific;
126        }
127        self
128    }
129
130    /// Set the CSV file's float precision.
131    pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
132        if precision.is_some() {
133            self.options.float_precision = precision;
134        }
135        self
136    }
137
138    /// Set the single byte character used for quoting.
139    pub fn with_quote_char(mut self, char: u8) -> Self {
140        self.options.quote_char = char;
141        self
142    }
143
144    /// Set the CSV file's null value representation.
145    pub fn with_null_value(mut self, null_value: String) -> Self {
146        self.options.null = null_value;
147        self
148    }
149
150    /// Set the CSV file's line terminator.
151    pub fn with_line_terminator(mut self, line_terminator: String) -> Self {
152        self.options.line_terminator = line_terminator;
153        self
154    }
155
156    /// Set the CSV file's quoting behavior.
157    /// See more on [`QuoteStyle`].
158    pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
159        self.options.quote_style = quote_style;
160        self
161    }
162
163    pub fn n_threads(mut self, n_threads: usize) -> Self {
164        self.n_threads = n_threads;
165        self
166    }
167
168    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
169        let expects_bom = self.bom;
170        let expects_header = self.header;
171        Ok(BatchedWriter {
172            writer: self,
173            has_written_bom: !expects_bom,
174            has_written_header: !expects_header,
175            schema: schema.clone(),
176        })
177    }
178}
179
180pub struct BatchedWriter<W: Write> {
181    writer: CsvWriter<W>,
182    has_written_bom: bool,
183    has_written_header: bool,
184    schema: Schema,
185}
186
187impl<W: Write> BatchedWriter<W> {
188    /// Write a batch to the csv writer.
189    ///
190    /// # Panics
191    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
192    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
193        if !self.has_written_bom {
194            self.has_written_bom = true;
195            write_bom(&mut self.writer.buffer)?;
196        }
197
198        if !self.has_written_header {
199            self.has_written_header = true;
200            let names = df
201                .get_column_names()
202                .into_iter()
203                .map(|x| x.as_str())
204                .collect::<Vec<_>>();
205            write_header(
206                &mut self.writer.buffer,
207                names.as_slice(),
208                &self.writer.options,
209            )?;
210        }
211
212        write(
213            &mut self.writer.buffer,
214            df,
215            self.writer.batch_size.into(),
216            &self.writer.options,
217            self.writer.n_threads,
218        )?;
219        Ok(())
220    }
221
222    /// Writes the header of the csv file if not done already. Returns the total size of the file.
223    pub fn finish(&mut self) -> PolarsResult<()> {
224        if !self.has_written_bom {
225            self.has_written_bom = true;
226            write_bom(&mut self.writer.buffer)?;
227        }
228
229        if !self.has_written_header {
230            self.has_written_header = true;
231            let names = self
232                .schema
233                .iter_names()
234                .map(|x| x.as_str())
235                .collect::<Vec<_>>();
236            write_header(&mut self.writer.buffer, &names, &self.writer.options)?;
237        };
238
239        Ok(())
240    }
241}