polars_io/cloud/
adaptors.rs

//! Interface with the object_store crate and expose a blocking `std::io::Write` adaptor for cloud uploads.
2
3use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::file::WriteClose;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::{get_runtime, get_upload_chunk_size};
14
/// Produces an independent copy of an I/O error.
///
/// `std::io::Error` does not implement `Clone`, so the copy has to be rebuilt by hand.
/// The copy always has the same `kind()` and the same `Display` output as `e`.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    match e.raw_os_error() {
        // An OS-originated error is fully described by its code, so rebuilding it from that
        // code keeps `raw_os_error()` available on the copy instead of discarding it.
        Some(code) => std::io::Error::from_raw_os_error(code),
        // Any other error is rebuilt from its kind and its rendered message.
        None => std::io::Error::new(e.kind(), e.to_string()),
    }
}
18
19/// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface
20/// which implements `std::io::Write`.
21///
22/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream,
23/// such as with `polars::prelude::CsvWriter`.
24///
25/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
26pub struct BlockingCloudWriter {
27    state: std::io::Result<BufWriter>,
28}
29
30impl BlockingCloudWriter {
31    /// Construct a new BlockingCloudWriter, re-using the given `object_store`
32    ///
33    /// Creates a new (current-thread) Tokio runtime
34    /// which bridges the sync writing process with the async ObjectStore multipart uploading.
35    /// TODO: Naming?
36    pub fn new_with_object_store(
37        object_store: Arc<dyn ObjectStore>,
38        path: Path,
39    ) -> PolarsResult<Self> {
40        let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
41        Ok(BlockingCloudWriter { state: Ok(writer) })
42    }
43
44    /// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions.
45    ///
46    /// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
47    /// TODO: Naming?
48    pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
49        if let Some(local_path) = uri.strip_prefix("file://") {
50            // Local paths must be created first, otherwise object store will not write anything.
51            if !matches!(std::fs::exists(local_path), Ok(true)) {
52                panic!(
53                    "[BlockingCloudWriter] Expected local file to be created: {}",
54                    local_path
55                );
56            }
57        }
58
59        let (cloud_location, object_store) =
60            crate::cloud::build_object_store(uri, cloud_options, false).await?;
61        Self::new_with_object_store(
62            object_store.to_dyn_object_store().await,
63            object_path_from_str(&cloud_location.prefix)?,
64        )
65    }
66
67    /// Returns the underlying [`object_store::buffered::BufWriter`]
68    pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
69        // We can't just return self.state:
70        // * cannot move out of type `adaptors::BlockingCloudWriter`, which implements the `Drop` trait
71        std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
72    }
73
74    /// Closes the writer, or returns the existing error if it exists. After this function is called
75    /// the writer is guaranteed to be in an error state.
76    pub fn close(&mut self) -> std::io::Result<()> {
77        match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
78            Ok(_) => {
79                self.state = Err(std::io::Error::other("closed"));
80                Ok(())
81            },
82            Err(e) => Err(e),
83        }
84    }
85
86    fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
87    where
88        F: Fn(&mut BufWriter) -> std::io::Result<O>,
89    {
90        let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
91        match func(writer) {
92            Ok(v) => Ok(v),
93            Err(e) => {
94                self.state = Err(clone_io_err(&e));
95                Err(e)
96            },
97        }
98    }
99}
100
101impl std::io::Write for BlockingCloudWriter {
102    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
103        // SAFETY:
104        // We extend the lifetime for the duration of this function. This is safe as we block the
105        // async runtime here
106        let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
107
108        self.try_with_writer(|writer| {
109            get_runtime()
110                .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
111        })
112    }
113
114    fn flush(&mut self) -> std::io::Result<()> {
115        self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
116    }
117}
118
119impl WriteClose for BlockingCloudWriter {
120    fn close(mut self: Box<Self>) -> std::io::Result<()> {
121        BlockingCloudWriter::close(self.as_mut())
122    }
123}
124
125impl Drop for BlockingCloudWriter {
126    fn drop(&mut self) {
127        if self.state.is_err() {
128            return;
129        }
130
131        // Note: We should not hit here - the writer should instead be explicitly closed.
132        // But we still have this here as a safety measure to prevent silently dropping errors.
133        match self.close() {
134            Ok(()) => {},
135            e @ Err(_) => {
136                if std::thread::panicking() {
137                    eprintln!("ERROR: CloudWriter errored on close: {:?}", e)
138                } else {
139                    e.unwrap()
140                }
141            },
142        }
143    }
144}
145
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small two-column frame (integers plus nullable strings) shared by the tests below.
    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    /// Writes a CSV through a writer built directly on a local-filesystem object store.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut df = example_dataframe();

        let object_store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );

        let path: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut cloud_writer =
            BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        // Close explicitly so a failed upload surfaces here instead of relying on the
        // `Drop` fallback, which is only meant as a safety net.
        cloud_writer.close().unwrap();
    }

    /// Writes a CSV through a writer built from a `file://` URI and reads it back.
    // Skip this test on Windows since it does not have a convenient /tmp/ location.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        // `BlockingCloudWriter::new` panics for `file://` URIs whose target does not exist yet.
        std::fs::File::create(path).unwrap();

        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                format!("file://{}", path).as_str(),
                None,
            ))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }
}
221}