polars_io/cloud/
adaptors.rs

//! Interface with the object_store crate: a blocking `std::io::Write` adaptor over its buffered writer.
2
3use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::file::WriteClose;
10use polars_utils::plpath::PlPathRef;
11use tokio::io::AsyncWriteExt;
12
13use super::{CloudOptions, object_path_from_str};
14use crate::pl_async::{get_runtime, get_upload_chunk_size};
15
/// Builds an independent copy of an I/O error.
///
/// The copy carries the same `ErrorKind` as `e` and uses the `Display` text of `e` as its
/// message.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let kind = e.kind();
    let message = e.to_string();
    std::io::Error::new(kind, message)
}
19
20/// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface
21/// which implements `std::io::Write`.
22///
23/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream,
24/// such as with `polars::prelude::CsvWriter`.
25///
26/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
27pub struct BlockingCloudWriter {
28    state: std::io::Result<BufWriter>,
29}
30
31impl BlockingCloudWriter {
32    /// Construct a new BlockingCloudWriter, re-using the given `object_store`
33    ///
34    /// Creates a new (current-thread) Tokio runtime
35    /// which bridges the sync writing process with the async ObjectStore multipart uploading.
36    /// TODO: Naming?
37    pub fn new_with_object_store(
38        object_store: Arc<dyn ObjectStore>,
39        path: Path,
40    ) -> PolarsResult<Self> {
41        let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
42        Ok(BlockingCloudWriter { state: Ok(writer) })
43    }
44
45    /// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions.
46    ///
47    /// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
48    /// TODO: Naming?
49    pub async fn new(
50        uri: PlPathRef<'_>,
51        cloud_options: Option<&CloudOptions>,
52    ) -> PolarsResult<Self> {
53        let (cloud_location, object_store) =
54            crate::cloud::build_object_store(uri, cloud_options, false).await?;
55        Self::new_with_object_store(
56            object_store.to_dyn_object_store().await,
57            object_path_from_str(&cloud_location.prefix)?,
58        )
59    }
60
61    /// Returns the underlying [`object_store::buffered::BufWriter`]
62    pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
63        // We can't just return self.state:
64        // * cannot move out of type `adaptors::BlockingCloudWriter`, which implements the `Drop` trait
65        std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
66    }
67
68    /// Closes the writer, or returns the existing error if it exists. After this function is called
69    /// the writer is guaranteed to be in an error state.
70    pub fn close(&mut self) -> std::io::Result<()> {
71        match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
72            Ok(_) => {
73                self.state = Err(std::io::Error::other("closed"));
74                Ok(())
75            },
76            Err(e) => Err(e),
77        }
78    }
79
80    fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
81    where
82        F: Fn(&mut BufWriter) -> std::io::Result<O>,
83    {
84        let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
85        match func(writer) {
86            Ok(v) => Ok(v),
87            Err(e) => {
88                self.state = Err(clone_io_err(&e));
89                Err(e)
90            },
91        }
92    }
93}
94
95impl std::io::Write for BlockingCloudWriter {
96    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
97        // SAFETY:
98        // We extend the lifetime for the duration of this function. This is safe as we block the
99        // async runtime here
100        let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
101
102        self.try_with_writer(|writer| {
103            get_runtime()
104                .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
105        })
106    }
107
108    fn flush(&mut self) -> std::io::Result<()> {
109        self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
110    }
111}
112
113impl WriteClose for BlockingCloudWriter {
114    fn close(mut self: Box<Self>) -> std::io::Result<()> {
115        BlockingCloudWriter::close(self.as_mut())
116    }
117}
118
119impl Drop for BlockingCloudWriter {
120    fn drop(&mut self) {
121        if self.state.is_err() {
122            return;
123        }
124
125        // Note: We should not hit here - the writer should instead be explicitly closed.
126        // But we still have this here as a safety measure to prevent silently dropping errors.
127        match self.close() {
128            Ok(()) => {},
129            e @ Err(_) => {
130                if std::thread::panicking() {
131                    eprintln!("ERROR: CloudWriter errored on close: {e:?}")
132                } else {
133                    e.unwrap()
134                }
135            },
136        }
137    }
138}
139
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small two-column frame (integers and nullable strings) shared by the tests below.
    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    /// Writes a DataFrame as CSV through a `BlockingCloudWriter` backed by a local-filesystem
    /// object store rooted at the system temp directory.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut df = example_dataframe();

        let object_store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );

        let path: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut cloud_writer =
            BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        // Close explicitly so a failure surfaces here rather than in the `Drop` fallback.
        cloud_writer.close().unwrap();
    }

    /// Writes a DataFrame through a writer built from a `file://` URI, then reads the file
    /// back and checks that it round-trips.
    // Skip this test on Windows since it does not have a convenient /tmp/ location.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        std::fs::File::create(path).unwrap();

        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                PlPathRef::new(&format!("file://{path}")),
                None,
            ))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }
}
215}