polars_io/cloud/
adaptors.rs1use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::plpath::PlPathRef;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::{get_runtime, get_upload_chunk_size};
14use crate::utils::file::WriteableTrait;
15
/// Produces an owned copy of an I/O error.
///
/// The copy is rebuilt from the original's [`std::io::ErrorKind`] and its `Display` text, so any
/// structured payload carried by the original is reduced to its message.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let kind = e.kind();
    let message = e.to_string();
    std::io::Error::new(kind, message)
}
19
20pub struct BlockingCloudWriter {
28 state: std::io::Result<BufWriter>,
29}
30
31impl BlockingCloudWriter {
32 pub fn new_with_object_store(
38 object_store: Arc<dyn ObjectStore>,
39 path: Path,
40 ) -> PolarsResult<Self> {
41 let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
42 Ok(BlockingCloudWriter { state: Ok(writer) })
43 }
44
45 pub async fn new(
50 uri: PlPathRef<'_>,
51 cloud_options: Option<&CloudOptions>,
52 ) -> PolarsResult<Self> {
53 let (cloud_location, object_store) =
54 crate::cloud::build_object_store(uri, cloud_options, false).await?;
55 Self::new_with_object_store(
56 object_store.to_dyn_object_store().await,
57 object_path_from_str(&cloud_location.prefix)?,
58 )
59 }
60
61 pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
63 std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
66 }
67
68 pub fn close(&mut self) -> std::io::Result<()> {
71 match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
72 Ok(_) => {
73 self.state = Err(std::io::Error::other("closed"));
74 Ok(())
75 },
76 Err(e) => Err(e),
77 }
78 }
79
80 fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
81 where
82 F: Fn(&mut BufWriter) -> std::io::Result<O>,
83 {
84 let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
85 match func(writer) {
86 Ok(v) => Ok(v),
87 Err(e) => {
88 self.state = Err(clone_io_err(&e));
89 Err(e)
90 },
91 }
92 }
93}
94
95impl std::io::Write for BlockingCloudWriter {
96 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
97 let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
101
102 self.try_with_writer(|writer| {
103 get_runtime()
104 .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
105 })
106 }
107
108 fn flush(&mut self) -> std::io::Result<()> {
109 self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
110 }
111}
112
113impl WriteableTrait for BlockingCloudWriter {
114 fn close(&mut self) -> std::io::Result<()> {
115 BlockingCloudWriter::close(self)
116 }
117
118 fn sync_all(&self) -> std::io::Result<()> {
119 Ok(())
120 }
121
122 fn sync_data(&self) -> std::io::Result<()> {
123 Ok(())
124 }
125}
126
127impl Drop for BlockingCloudWriter {
128 fn drop(&mut self) {
129 if self.state.is_err() {
130 return;
131 }
132
133 match self.close() {
136 Ok(()) => {},
137 e @ Err(_) => {
138 if std::thread::panicking() {
139 eprintln!("ERROR: CloudWriter errored on close: {e:?}")
140 } else {
141 e.unwrap()
142 }
143 },
144 }
145 }
146}
147
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small two-column frame (integers plus nullable strings) shared by the tests below.
    fn example_dataframe() -> DataFrame {
        let frame = df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        );
        frame.unwrap()
    }

    /// Writes the example frame as CSV through a writer built directly on a local object store
    /// rooted at the system temp directory.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut frame = example_dataframe();

        let local_fs = object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
            .expect("Could not initialize connection");
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);

        let target = object_store::path::Path::from("cloud_writer_example.csv");

        let mut cloud_writer = BlockingCloudWriter::new_with_object_store(store, target).unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut frame)
            .expect("Could not write DataFrame as CSV to remote location");
    }

    /// Writes the example frame through a writer built from a `file://` URI, closes it
    /// explicitly, and checks that reading the file back yields the same frame.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut frame = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        // Make sure the target file exists before the writer is constructed.
        std::fs::File::create(path).unwrap();

        let uri = format!("file://{path}");
        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(PlPathRef::new(&uri), None))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut frame)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        let read_back = CsvReadOptions::default()
            .try_into_reader_with_file_path(Some(path.into()))
            .unwrap()
            .finish()
            .unwrap();
        assert_eq!(read_back, frame);
    }
}