polars_io/cloud/cloud_writer/
writer.rs1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use polars_error::PolarsResult;
6
7use crate::cloud::PolarsObjectStore;
8use crate::cloud::cloud_writer::bufferer::BytesBufferer;
9use crate::cloud::cloud_writer::internal_writer::{InternalCloudWriter, InternalCloudWriterState};
10use crate::metrics::{IOMetrics, OptIOMetrics};
11
12pub struct CloudWriter {
13 writer: InternalCloudWriter,
14 bufferer: BytesBufferer,
15}
16
17impl CloudWriter {
18 pub fn new(
19 store: PolarsObjectStore,
20 path: object_store::path::Path,
21 upload_chunk_size: usize,
22 max_concurrency: NonZeroUsize,
23 io_metrics: Option<Arc<IOMetrics>>,
24 ) -> Self {
25 let bufferer = BytesBufferer::new(upload_chunk_size);
26
27 Self {
28 writer: InternalCloudWriter {
29 store,
30 path,
31 max_concurrency,
32 io_metrics: OptIOMetrics(io_metrics),
33 state: InternalCloudWriterState::NotStarted,
34 },
35 bufferer,
36 }
37 }
38
39 pub async fn start(&mut self) -> PolarsResult<()> {
40 self.writer.start().await
41 }
42
43 pub async fn write_all_owned(&mut self, mut bytes: Bytes) -> PolarsResult<()> {
44 while !bytes.is_empty() {
45 self.bufferer.push_owned(&mut bytes);
46
47 if let Some(payload) = self.bufferer.flush_complete_chunk() {
48 self.writer.put(payload).await?;
49 }
50 }
51
52 Ok(())
53 }
54
55 pub(super) fn fill_buffer_from_slice(&mut self, bytes: &mut &[u8]) -> bool {
56 self.bufferer.push_slice(bytes);
57 self.bufferer.has_complete_chunk()
58 }
59
60 pub(super) async fn flush_complete_chunk(&mut self) -> PolarsResult<()> {
61 if let Some(payload) = self.bufferer.flush_complete_chunk() {
62 self.writer.put(payload).await?;
63 }
64
65 Ok(())
66 }
67
68 pub(super) async fn flush(&mut self) -> PolarsResult<()> {
69 if let Some(payload) = self.bufferer.flush() {
70 self.writer.put(payload).await?;
71 }
72
73 assert!(self.bufferer.is_empty());
74
75 Ok(())
76 }
77
78 pub(super) fn has_buffered_bytes(&self) -> bool {
79 !self.bufferer.is_empty()
80 }
81
82 pub async fn finish(&mut self) -> PolarsResult<()> {
83 if let Some(payload) = self.bufferer.flush() {
84 self.writer.put(payload).await?;
85 }
86
87 assert!(self.bufferer.is_empty());
88
89 self.writer.finish().await
90 }
91}