polars_io/utils/
file.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::AsyncWriteable;
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated};
8use polars_utils::create_file;
9use polars_utils::file::close_file;
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::plpath::PlPathRef;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
/// A synchronous writer that can also be synced and explicitly closed.
///
/// This is the interface required of the boxed writer held by `Writeable::Dyn`.
pub trait WriteableTrait: io::Write {
    /// Closes the writer, returning any error raised while doing so.
    ///
    /// Called by `Writeable::close` for the `Dyn` variant.
    fn close(&mut self) -> io::Result<()>;

    /// Called by `Writeable::sync_all` for the `Dyn` variant; local files use
    /// [`std::fs::File::sync_all`] in the same position.
    fn sync_all(&self) -> io::Result<()>;

    /// Called by `Writeable::sync_data` for the `Dyn` variant; local files use
    /// [`std::fs::File::sync_data`] in the same position.
    fn sync_data(&self) -> io::Result<()>;
}
22
/// Holds a non-async writeable file, abstracted over local files, cloud files
/// and arbitrary boxed writers.
///
/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
///
/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
#[allow(clippy::large_enum_variant)] // It will be boxed
pub enum Writeable {
    /// An abstract writer supplied as a boxed trait object.
    ///
    /// This is used to implement writing to in-memory and arbitrary file descriptors.
    Dyn(Box<dyn WriteableTrait + Send>),
    /// A file on the local filesystem.
    Local(std::fs::File),
    /// A cloud destination, written through the blocking cloud writer.
    /// Only available with the `cloud` feature.
    #[cfg(feature = "cloud")]
    Cloud(crate::cloud::BlockingCloudWriter),
}
38
39impl Writeable {
40    pub fn try_new(
41        path: PlPathRef<'_>,
42        #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
43        #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_upload_chunk_size: usize,
44    ) -> PolarsResult<Self> {
45        match path {
46            PlPathRef::Cloud(_) => {
47                feature_gated!("cloud", {
48                    use crate::cloud::BlockingCloudWriter;
49
50                    let writer = crate::pl_async::get_runtime().block_in_place_on(
51                        BlockingCloudWriter::new(path, cloud_options, cloud_upload_chunk_size),
52                    )?;
53
54                    Ok(Self::Cloud(writer))
55                })
56            },
57            PlPathRef::Local(path) if config::force_async() => {
58                feature_gated!("cloud", {
59                    use crate::cloud::BlockingCloudWriter;
60
61                    let path = resolve_homedir(&path);
62
63                    create_file(&path)?;
64                    let path = std::fs::canonicalize(&path)?;
65
66                    ensure_not_mapped(&path.metadata()?)?;
67
68                    let path = format!(
69                        "file://{}",
70                        if cfg!(target_family = "windows") {
71                            path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
72                        } else {
73                            path.to_str().unwrap()
74                        }
75                    );
76
77                    let writer = crate::pl_async::get_runtime().block_in_place_on(
78                        BlockingCloudWriter::new(
79                            PlPathRef::new(&path),
80                            cloud_options,
81                            cloud_upload_chunk_size,
82                        ),
83                    )?;
84
85                    Ok(Self::Cloud(writer))
86                })
87            },
88            PlPathRef::Local(path) => {
89                let path = resolve_homedir(&path);
90                create_file(&path)?;
91
92                Ok(Self::Local(polars_utils::open_file_write(&path)?))
93            },
94        }
95    }
96
97    /// This returns `Result<>` - if a write was performed before calling this,
98    /// `CloudWriter` can be in an Err(_) state.
99    #[cfg(feature = "cloud")]
100    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
101        use self::async_writeable::AsyncDynWriteable;
102
103        match self {
104            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
105            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
106            // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
107            // `BlockingCloudWriter` has a `Drop` impl that we don't want.
108            Self::Cloud(v) => v
109                .try_into_inner()
110                .map(AsyncWriteable::Cloud)
111                .map_err(PolarsError::from),
112        }
113    }
114
115    pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
116        match self {
117            Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
118            Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
119            #[cfg(feature = "cloud")]
120            Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
121        }
122    }
123
124    pub fn sync_all(&self) -> io::Result<()> {
125        match self {
126            Self::Dyn(v) => v.sync_all(),
127            Self::Local(v) => v.sync_all(),
128            #[cfg(feature = "cloud")]
129            Self::Cloud(v) => v.sync_all(),
130        }
131    }
132
133    pub fn sync_data(&self) -> io::Result<()> {
134        match self {
135            Self::Dyn(v) => v.sync_data(),
136            Self::Local(v) => v.sync_data(),
137            #[cfg(feature = "cloud")]
138            Self::Cloud(v) => v.sync_data(),
139        }
140    }
141
142    pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
143        match sync {
144            SyncOnCloseType::All => self.sync_all()?,
145            SyncOnCloseType::Data => self.sync_data()?,
146            SyncOnCloseType::None => {},
147        }
148
149        match self {
150            Self::Dyn(mut v) => v.close(),
151            Self::Local(v) => close_file(v),
152            #[cfg(feature = "cloud")]
153            Self::Cloud(mut v) => v.close(),
154        }
155    }
156}
157
158impl Deref for Writeable {
159    type Target = dyn io::Write + Send;
160
161    fn deref(&self) -> &Self::Target {
162        match self {
163            Self::Dyn(v) => v,
164            Self::Local(v) => v,
165            #[cfg(feature = "cloud")]
166            Self::Cloud(v) => v,
167        }
168    }
169}
170
171impl DerefMut for Writeable {
172    fn deref_mut(&mut self) -> &mut Self::Target {
173        match self {
174            Self::Dyn(v) => v,
175            Self::Local(v) => v,
176            #[cfg(feature = "cloud")]
177            Self::Cloud(v) => v,
178        }
179    }
180}
181
/// A borrowed writer that is either wrapped in a [`std::io::BufWriter`] or
/// used as-is.
///
/// The `Direct` variant avoids BufWriter wrapping on writers that already
/// have internal buffering.
pub enum BufferedWriteable<'a> {
    /// The borrowed writer behind a `BufWriter`.
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// The borrowed writer itself, with no additional buffering layer.
    Direct(&'a mut (dyn io::Write + Send)),
}
187
188impl<'a> Deref for BufferedWriteable<'a> {
189    type Target = dyn io::Write + Send + 'a;
190
191    fn deref(&self) -> &Self::Target {
192        match self {
193            Self::BufWriter(v) => v as _,
194            Self::Direct(v) => v,
195        }
196    }
197}
198
199impl DerefMut for BufferedWriteable<'_> {
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        match self {
202            Self::BufWriter(v) => v as _,
203            Self::Direct(v) => v,
204        }
205    }
206}
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::plpath::PlPathRef;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turn an abstract io::Write into an abstract tokio::io::AsyncWrite.
    ///
    /// Every poll runs the underlying blocking call inside
    /// `tokio::task::block_in_place` and returns `Poll::Ready` with its result.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        /// Performs one blocking `write` on the wrapped writer.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        /// Performs one blocking `flush` on the wrapped writer.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Shutting down only flushes; the wrapped writer is closed separately
        /// by `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        /// A boxed blocking writer driven through `AsyncDynWriteable`.
        Dyn(AsyncDynWriteable),
        /// A local file handled by tokio.
        Local(tokio::fs::File),
        /// A buffered `object_store` writer.
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens `path` with [`Writeable::try_new`] and converts the result with
        /// `Writeable::try_into_async_writeable`.
        ///
        /// The open itself is currently performed by the blocking
        /// implementation (see the TODO below).
        pub async fn try_new(
            path: PlPathRef<'_>,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(path, cloud_options, cloud_upload_chunk_size)
                .and_then(|x| x.try_into_async_writeable())
        }

        /// Syncs the `Dyn` and `Local` variants; a no-op for `Cloud`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
                Self::Local(v) => v.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Syncs the data of the `Dyn` and `Local` variants; a no-op for `Cloud`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
                Self::Local(v) => v.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Syncs the writer as requested by `sync`, then closes it.
        ///
        /// # Errors
        ///
        /// Returns the first error produced while flushing, syncing, shutting
        /// down or closing the underlying writer.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            // A `tokio::fs::File` may still have a write in flight after `write`
            // has returned. Flushing waits for it and reports its result, so a
            // failed write surfaces here instead of being lost when the file is
            // converted back with `into_std`.
            if let Self::Local(file) = &mut self {
                file.flush().await?;
            }

            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(v) => {
                    let f = v.into_std().await;
                    close_file(f).map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        /// Borrows the active variant as an `AsyncWrite` trait object.
        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        /// Mutably borrows the active variant as an `AsyncWrite` trait object.
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}