polars_io/cloud/
adaptors.rs1use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::plpath::PlPathRef;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::get_runtime;
14use crate::utils::file::WriteableTrait;
15
/// Builds a new, owned [`std::io::Error`] with the same [`std::io::ErrorKind`]
/// and the same display message as `e`.
///
/// Used where an error that is only available by reference (because it is
/// stored inside the writer state) has to be returned by value.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let kind = e.kind();
    let message = e.to_string();
    std::io::Error::new(kind, message)
}
19
20pub struct BlockingCloudWriter {
28 state: std::io::Result<BufWriter>,
29}
30
31impl BlockingCloudWriter {
32 pub fn new_with_object_store(
38 object_store: Arc<dyn ObjectStore>,
39 path: Path,
40 cloud_upload_chunk_size: usize,
41 ) -> PolarsResult<Self> {
42 let writer = BufWriter::with_capacity(object_store, path, cloud_upload_chunk_size);
43 Ok(BlockingCloudWriter { state: Ok(writer) })
44 }
45
46 pub async fn new(
51 uri: PlPathRef<'_>,
52 cloud_options: Option<&CloudOptions>,
53 cloud_upload_chunk_size: usize,
54 ) -> PolarsResult<Self> {
55 let (cloud_location, object_store) =
56 crate::cloud::build_object_store(uri, cloud_options, false).await?;
57 Self::new_with_object_store(
58 object_store.to_dyn_object_store().await,
59 object_path_from_str(&cloud_location.prefix)?,
60 cloud_upload_chunk_size,
61 )
62 }
63
64 pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
66 std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
69 }
70
71 pub fn close(&mut self) -> std::io::Result<()> {
74 match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
75 Ok(_) => {
76 self.state = Err(std::io::Error::other("closed"));
77 Ok(())
78 },
79 Err(e) => Err(e),
80 }
81 }
82
83 fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
84 where
85 F: Fn(&mut BufWriter) -> std::io::Result<O>,
86 {
87 let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
88 match func(writer) {
89 Ok(v) => Ok(v),
90 Err(e) => {
91 self.state = Err(clone_io_err(&e));
92 Err(e)
93 },
94 }
95 }
96}
97
98impl std::io::Write for BlockingCloudWriter {
99 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
100 let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
104
105 self.try_with_writer(|writer| {
106 get_runtime()
107 .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
108 })
109 }
110
111 fn flush(&mut self) -> std::io::Result<()> {
112 self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
113 }
114}
115
116impl WriteableTrait for BlockingCloudWriter {
117 fn close(&mut self) -> std::io::Result<()> {
118 BlockingCloudWriter::close(self)
119 }
120
121 fn sync_all(&self) -> std::io::Result<()> {
122 Ok(())
123 }
124
125 fn sync_data(&self) -> std::io::Result<()> {
126 Ok(())
127 }
128}
129
130impl Drop for BlockingCloudWriter {
131 fn drop(&mut self) {
132 if self.state.is_err() {
133 return;
134 }
135
136 match self.close() {
139 Ok(()) => {},
140 e @ Err(_) => {
141 if std::thread::panicking() {
142 eprintln!("ERROR: CloudWriter errored on close: {e:?}")
143 } else {
144 e.unwrap()
145 }
146 },
147 }
148 }
149}
150
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    use crate::get_upload_chunk_size;

    /// Small two-column frame (integers and nullable strings) shared by the tests.
    fn example_dataframe() -> DataFrame {
        let frame = df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        );
        frame.unwrap()
    }

    /// Writes the example frame as CSV through a `BlockingCloudWriter` backed by
    /// a local-filesystem object store rooted at the system temp directory.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut df = example_dataframe();

        let local_fs = object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
            .expect("Could not initialize connection");
        let object_store: Arc<dyn ObjectStore> = Arc::new(local_fs);

        let path: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut cloud_writer =
            BlockingCloudWriter::new_with_object_store(object_store, path, get_upload_chunk_size())
                .unwrap();

        let csv_writer = CsvWriter::new(&mut cloud_writer);
        csv_writer
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");
        // The writer is not closed explicitly; it is closed when dropped here.
    }

    /// Writes the example frame through a writer created from a `file://` URI,
    /// closes it, and checks that reading the file back yields the same frame.
    // Ignored on Windows: the test uses a hard-coded `/tmp` path.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        // Create (or truncate) the target file before opening the writer.
        std::fs::File::create(path).unwrap();

        let uri = format!("file://{path}");
        let pending_writer =
            BlockingCloudWriter::new(PlPathRef::new(&uri), None, get_upload_chunk_size());
        let mut cloud_writer = get_runtime().block_on(pending_writer).unwrap();

        let csv_writer = CsvWriter::new(&mut cloud_writer);
        csv_writer
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        let read_back = CsvReadOptions::default()
            .try_into_reader_with_file_path(Some(path.into()))
            .unwrap()
            .finish()
            .unwrap();
        assert_eq!(read_back, df);
    }
}