polars_io/cloud/
adaptors.rs
1use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::file::WriteClose;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::{get_runtime, get_upload_chunk_size};
14
/// Creates an independent copy of `e`, since [`std::io::Error`] does not implement `Clone`.
///
/// Errors that carry a raw OS error code are rebuilt from that code, so the copy still
/// reports it through `raw_os_error()`. Every other error is rebuilt from its kind and
/// its display message. In both cases the kind and the display text match the source.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    match e.raw_os_error() {
        // Rebuilding from the code keeps the errno visible to callers that inspect it.
        Some(code) => std::io::Error::from_raw_os_error(code),
        None => std::io::Error::new(e.kind(), e.to_string()),
    }
}
18
19pub struct BlockingCloudWriter {
27 state: std::io::Result<BufWriter>,
28}
29
30impl BlockingCloudWriter {
31 pub fn new_with_object_store(
37 object_store: Arc<dyn ObjectStore>,
38 path: Path,
39 ) -> PolarsResult<Self> {
40 let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
41 Ok(BlockingCloudWriter { state: Ok(writer) })
42 }
43
44 pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
49 if let Some(local_path) = uri.strip_prefix("file://") {
50 if !matches!(std::fs::exists(local_path), Ok(true)) {
52 panic!(
53 "[BlockingCloudWriter] Expected local file to be created: {}",
54 local_path
55 );
56 }
57 }
58
59 let (cloud_location, object_store) =
60 crate::cloud::build_object_store(uri, cloud_options, false).await?;
61 Self::new_with_object_store(
62 object_store.to_dyn_object_store().await,
63 object_path_from_str(&cloud_location.prefix)?,
64 )
65 }
66
67 pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
69 std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
72 }
73
74 pub fn close(&mut self) -> std::io::Result<()> {
77 match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
78 Ok(_) => {
79 self.state = Err(std::io::Error::other("closed"));
80 Ok(())
81 },
82 Err(e) => Err(e),
83 }
84 }
85
86 fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
87 where
88 F: Fn(&mut BufWriter) -> std::io::Result<O>,
89 {
90 let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
91 match func(writer) {
92 Ok(v) => Ok(v),
93 Err(e) => {
94 self.state = Err(clone_io_err(&e));
95 Err(e)
96 },
97 }
98 }
99}
100
101impl std::io::Write for BlockingCloudWriter {
102 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
103 let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
107
108 self.try_with_writer(|writer| {
109 get_runtime()
110 .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
111 })
112 }
113
114 fn flush(&mut self) -> std::io::Result<()> {
115 self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
116 }
117}
118
119impl WriteClose for BlockingCloudWriter {
120 fn close(mut self: Box<Self>) -> std::io::Result<()> {
121 BlockingCloudWriter::close(self.as_mut())
122 }
123}
124
125impl Drop for BlockingCloudWriter {
126 fn drop(&mut self) {
127 if self.state.is_err() {
128 return;
129 }
130
131 match self.close() {
134 Ok(()) => {},
135 e @ Err(_) => {
136 if std::thread::panicking() {
137 eprintln!("ERROR: CloudWriter errored on close: {:?}", e)
138 } else {
139 e.unwrap()
140 }
141 },
142 }
143 }
144}
145
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small two-column frame (integers plus nullable strings) shared by the tests.
    fn example_dataframe() -> DataFrame {
        let frame = df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        );
        frame.unwrap()
    }

    /// Writes a CSV through a writer built directly on a local-filesystem object store.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let local_fs = object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
            .expect("Could not initialize connection");
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let target = object_store::path::Path::from("cloud_writer_example.csv");

        let mut frame = example_dataframe();
        let mut writer = BlockingCloudWriter::new_with_object_store(store, target).unwrap();

        CsvWriter::new(&mut writer)
            .finish(&mut frame)
            .expect("Could not write DataFrame as CSV to remote location");
    }

    /// Writes a CSV through a `file://` URI, closes the writer, and reads the file back.
    // Ignored on Windows: the test uses a hard-coded `/tmp` path.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let path = "/tmp/cloud_writer_example2.csv";

        // `BlockingCloudWriter::new` panics for `file://` targets that do not exist yet.
        std::fs::File::create(path).unwrap();

        let uri = format!("file://{}", path);
        let mut writer = get_runtime()
            .block_on(BlockingCloudWriter::new(uri.as_str(), None))
            .unwrap();

        let mut frame = example_dataframe();
        CsvWriter::new(&mut writer)
            .finish(&mut frame)
            .expect("Could not write DataFrame as CSV to remote location");

        writer.close().unwrap();

        let read_back = CsvReadOptions::default()
            .try_into_reader_with_file_path(Some(path.into()))
            .unwrap()
            .finish()
            .unwrap();
        assert_eq!(read_back, frame);
    }
}