polars_io/cloud/cloud_writer/
writer.rs

1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use polars_error::PolarsResult;
6
7use crate::cloud::PolarsObjectStore;
8use crate::cloud::cloud_writer::bufferer::BytesBufferer;
9use crate::cloud::cloud_writer::internal_writer::{InternalCloudWriter, InternalCloudWriterState};
10use crate::metrics::{IOMetrics, OptIOMetrics};
11
12pub struct CloudWriter {
13    writer: InternalCloudWriter,
14    bufferer: BytesBufferer,
15}
16
17impl CloudWriter {
18    pub fn new(
19        store: PolarsObjectStore,
20        path: object_store::path::Path,
21        upload_chunk_size: usize,
22        max_concurrency: NonZeroUsize,
23        io_metrics: Option<Arc<IOMetrics>>,
24    ) -> Self {
25        let bufferer = BytesBufferer::new(upload_chunk_size);
26
27        Self {
28            writer: InternalCloudWriter {
29                store,
30                path,
31                max_concurrency,
32                io_metrics: OptIOMetrics(io_metrics),
33                state: InternalCloudWriterState::NotStarted,
34            },
35            bufferer,
36        }
37    }
38
39    pub async fn start(&mut self) -> PolarsResult<()> {
40        self.writer.start().await
41    }
42
43    pub async fn write_all_owned(&mut self, mut bytes: Bytes) -> PolarsResult<()> {
44        while !bytes.is_empty() {
45            self.bufferer.push_owned(&mut bytes);
46
47            if let Some(payload) = self.bufferer.flush_complete_chunk() {
48                self.writer.put(payload).await?;
49            }
50        }
51
52        Ok(())
53    }
54
55    pub(super) fn fill_buffer_from_slice(&mut self, bytes: &mut &[u8]) -> bool {
56        self.bufferer.push_slice(bytes);
57        self.bufferer.has_complete_chunk()
58    }
59
60    pub(super) async fn flush_complete_chunk(&mut self) -> PolarsResult<()> {
61        if let Some(payload) = self.bufferer.flush_complete_chunk() {
62            self.writer.put(payload).await?;
63        }
64
65        Ok(())
66    }
67
68    pub(super) async fn flush(&mut self) -> PolarsResult<()> {
69        if let Some(payload) = self.bufferer.flush() {
70            self.writer.put(payload).await?;
71        }
72
73        assert!(self.bufferer.is_empty());
74
75        Ok(())
76    }
77
78    pub(super) fn has_buffered_bytes(&self) -> bool {
79        !self.bufferer.is_empty()
80    }
81
82    pub async fn finish(&mut self) -> PolarsResult<()> {
83        if let Some(payload) = self.bufferer.flush() {
84            self.writer.put(payload).await?;
85        }
86
87        assert!(self.bufferer.is_empty());
88
89        self.writer.finish().await
90    }
91}