polars_io/cloud/
adaptors.rs1use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::file::WriteClose;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::{get_runtime, get_upload_chunk_size};
14
/// Produces a fresh [`std::io::Error`] carrying the same [`std::io::ErrorKind`] and the same
/// rendered (`Display`) message as `e`.
///
/// `std::io::Error` does not implement `Clone`. This helper lets the writer keep a copy of an
/// error in its state while also handing the error back to the caller. Any inner error object
/// is flattened into its message string.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let message = e.to_string();
    let kind = e.kind();
    std::io::Error::new(kind, message)
}
18
19pub struct BlockingCloudWriter {
27 state: std::io::Result<BufWriter>,
28}
29
30impl BlockingCloudWriter {
31 pub fn new_with_object_store(
37 object_store: Arc<dyn ObjectStore>,
38 path: Path,
39 ) -> PolarsResult<Self> {
40 let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
41 Ok(BlockingCloudWriter { state: Ok(writer) })
42 }
43
44 pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
49 if let Some(local_path) = uri.strip_prefix("file://") {
50 if !matches!(std::fs::exists(local_path), Ok(true)) {
52 panic!("[BlockingCloudWriter] Expected local file to be created: {local_path}");
53 }
54 }
55
56 let (cloud_location, object_store) =
57 crate::cloud::build_object_store(uri, cloud_options, false).await?;
58 Self::new_with_object_store(
59 object_store.to_dyn_object_store().await,
60 object_path_from_str(&cloud_location.prefix)?,
61 )
62 }
63
64 pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
66 std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
69 }
70
71 pub fn close(&mut self) -> std::io::Result<()> {
74 match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
75 Ok(_) => {
76 self.state = Err(std::io::Error::other("closed"));
77 Ok(())
78 },
79 Err(e) => Err(e),
80 }
81 }
82
83 fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
84 where
85 F: Fn(&mut BufWriter) -> std::io::Result<O>,
86 {
87 let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
88 match func(writer) {
89 Ok(v) => Ok(v),
90 Err(e) => {
91 self.state = Err(clone_io_err(&e));
92 Err(e)
93 },
94 }
95 }
96}
97
98impl std::io::Write for BlockingCloudWriter {
99 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
100 let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
104
105 self.try_with_writer(|writer| {
106 get_runtime()
107 .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
108 })
109 }
110
111 fn flush(&mut self) -> std::io::Result<()> {
112 self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
113 }
114}
115
116impl WriteClose for BlockingCloudWriter {
117 fn close(mut self: Box<Self>) -> std::io::Result<()> {
118 BlockingCloudWriter::close(self.as_mut())
119 }
120}
121
122impl Drop for BlockingCloudWriter {
123 fn drop(&mut self) {
124 if self.state.is_err() {
125 return;
126 }
127
128 match self.close() {
131 Ok(()) => {},
132 e @ Err(_) => {
133 if std::thread::panicking() {
134 eprintln!("ERROR: CloudWriter errored on close: {e:?}")
135 } else {
136 e.unwrap()
137 }
138 },
139 }
140 }
141}
142
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small frame with an integer column and a nullable string column.
    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    /// Writes a CSV through a `BlockingCloudWriter` backed by a local-filesystem object store,
    /// closes it explicitly, and checks that the uploaded file reads back as the same frame.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        let object_store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );

        let file_name = "cloud_writer_example.csv";
        let path: object_store::path::Path = file_name.into();

        let mut cloud_writer =
            BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        // Close explicitly so an upload failure fails the test here rather than panicking
        // inside `Drop`.
        cloud_writer.close().unwrap();

        let local_path = std::env::temp_dir().join(file_name);
        let local_path = local_path.to_str().unwrap();
        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(local_path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }

    /// Round-trips a CSV through a writer constructed from a `file://` URI via
    /// `BlockingCloudWriter::new`.
    // NOTE(review): presumably ignored on Windows because the `file://{path}` URI built below
    // is not a valid form for Windows paths. Confirm.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        // Use the platform temp directory instead of a hard-coded `/tmp`.
        let local_path = std::env::temp_dir().join("cloud_writer_example2.csv");
        let path = local_path.to_str().unwrap();

        // `BlockingCloudWriter::new` requires a `file://` target to exist already.
        std::fs::File::create(path).unwrap();

        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                format!("file://{path}").as_str(),
                None,
            ))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }
}