polars_io/utils/file.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::AsyncWriteable;
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated};
8use polars_utils::create_file;
9use polars_utils::file::{ClosableFile, WriteClose};
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::plpath::PlPathRef;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
17pub trait DynWriteable: io::Write + Send {
18    // Needed because trait upcasting is only stable in 1.86.
19    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
20    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
21
22    fn close(self: Box<Self>) -> io::Result<()>;
23    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
24}
25
26impl DynWriteable for ClosableFile {
27    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
28        self as _
29    }
30    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
31        self as _
32    }
33    fn close(self: Box<Self>) -> io::Result<()> {
34        ClosableFile::close(*self)
35    }
36    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
37        super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
38    }
39}
40
41/// Holds a non-async writeable file, abstracted over local files or cloud files.
42///
43/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
44///
45/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
46#[allow(clippy::large_enum_variant)] // It will be boxed
47pub enum Writeable {
48    /// An abstract implementation for writable.
49    ///
50    /// This is used to implement writing to in-memory and arbitrary file descriptors.
51    Dyn(Box<dyn DynWriteable>),
52    Local(std::fs::File),
53    #[cfg(feature = "cloud")]
54    Cloud(crate::cloud::BlockingCloudWriter),
55}
56
57impl Writeable {
58    pub fn try_new(
59        path: PlPathRef<'_>,
60        #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
61    ) -> PolarsResult<Self> {
62        let verbose = config::verbose();
63
64        match path {
65            PlPathRef::Cloud(_) => {
66                feature_gated!("cloud", {
67                    use crate::cloud::BlockingCloudWriter;
68
69                    if verbose {
70                        eprintln!("Writeable: try_new: cloud: {}", path.to_str())
71                    }
72
73                    let writer = crate::pl_async::get_runtime()
74                        .block_in_place_on(BlockingCloudWriter::new(path, cloud_options))?;
75                    Ok(Self::Cloud(writer))
76                })
77            },
78            PlPathRef::Local(path) if config::force_async() => {
79                feature_gated!("cloud", {
80                    use crate::cloud::BlockingCloudWriter;
81
82                    let path = resolve_homedir(&path);
83
84                    if verbose {
85                        eprintln!("Writeable: try_new: forced async: {}", path.display())
86                    }
87
88                    create_file(&path)?;
89                    let path = std::fs::canonicalize(&path)?;
90
91                    ensure_not_mapped(&path.metadata()?)?;
92
93                    let path = format!(
94                        "file://{}",
95                        if cfg!(target_family = "windows") {
96                            path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
97                        } else {
98                            path.to_str().unwrap()
99                        }
100                    );
101
102                    if verbose {
103                        eprintln!("Writeable: try_new: forced async converted path: {path}")
104                    }
105
106                    let writer = crate::pl_async::get_runtime().block_in_place_on(
107                        BlockingCloudWriter::new(PlPathRef::new(&path), cloud_options),
108                    )?;
109                    Ok(Self::Cloud(writer))
110                })
111            },
112            PlPathRef::Local(path) => {
113                let path = resolve_homedir(&path);
114                create_file(&path)?;
115
116                // Note: `canonicalize` does not work on some systems.
117
118                if verbose {
119                    eprintln!(
120                        "Writeable: try_new: local: {} (canonicalize: {:?})",
121                        path.display(),
122                        std::fs::canonicalize(&path)
123                    )
124                }
125
126                Ok(Self::Local(polars_utils::open_file_write(&path)?))
127            },
128        }
129    }
130
131    /// This returns `Result<>` - if a write was performed before calling this,
132    /// `CloudWriter` can be in an Err(_) state.
133    #[cfg(feature = "cloud")]
134    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
135        use self::async_writeable::AsyncDynWriteable;
136
137        match self {
138            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
139            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
140            // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
141            // `BlockingCloudWriter` has a `Drop` impl that we don't want.
142            Self::Cloud(v) => v
143                .try_into_inner()
144                .map(AsyncWriteable::Cloud)
145                .map_err(PolarsError::from),
146        }
147    }
148
149    pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
150        match self {
151            Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
152            Writeable::Local(file) => {
153                crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
154            },
155            #[cfg(feature = "cloud")]
156            Writeable::Cloud(_) => Ok(()),
157        }
158    }
159
160    pub fn close(self) -> std::io::Result<()> {
161        match self {
162            Self::Dyn(v) => v.close(),
163            Self::Local(v) => ClosableFile::from(v).close(),
164            #[cfg(feature = "cloud")]
165            Self::Cloud(mut v) => v.close(),
166        }
167    }
168}
169
170impl Deref for Writeable {
171    type Target = dyn io::Write + Send;
172
173    fn deref(&self) -> &Self::Target {
174        match self {
175            Self::Dyn(v) => v.as_dyn_write(),
176            Self::Local(v) => v,
177            #[cfg(feature = "cloud")]
178            Self::Cloud(v) => v,
179        }
180    }
181}
182
183impl DerefMut for Writeable {
184    fn deref_mut(&mut self) -> &mut Self::Target {
185        match self {
186            Self::Dyn(v) => v.as_mut_dyn_write(),
187            Self::Local(v) => v,
188            #[cfg(feature = "cloud")]
189            Self::Cloud(v) => v,
190        }
191    }
192}
193
194/// Note: Prefer using [`Writeable`] / [`Writeable::try_new`] where possible.
195///
196/// Open a path for writing. Supports cloud paths.
197pub fn try_get_writeable(
198    addr: PlPathRef<'_>,
199    cloud_options: Option<&CloudOptions>,
200) -> PolarsResult<Box<dyn WriteClose + Send>> {
201    Writeable::try_new(addr, cloud_options).map(|x| match x {
202        Writeable::Dyn(_) => unreachable!(),
203        Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
204        #[cfg(feature = "cloud")]
205        Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
206    })
207}
208
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::ClosableFile;
    use polars_utils::plpath::PlPathRef;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{DynWriteable, Writeable};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turn an abstract `io::Write` ([`DynWriteable`]) into an abstract
    /// [`tokio::io::AsyncWrite`].
    ///
    /// Each poll runs the blocking operation inside
    /// [`tokio::task::block_in_place`] and always returns `Poll::Ready`.
    /// NOTE(review): `block_in_place` requires a multi-threaded tokio runtime;
    /// this presumably relies on the polars async runtime being one. Confirm.
    pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Shutdown only flushes. Closing the inner writer is done by
        /// [`AsyncWriteable::close`].
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        Dyn(AsyncDynWriteable),
        Local(tokio::fs::File),
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Open `addr` for async writing.
        ///
        /// Currently implemented by opening a blocking [`Writeable`] and
        /// converting it.
        pub async fn try_new(
            addr: PlPathRef<'_>,
            cloud_options: Option<&CloudOptions>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(addr, cloud_options).and_then(|x| x.try_into_async_writeable())
        }

        /// Apply the requested sync behaviour to the underlying target.
        ///
        /// This is a no-op for cloud writers.
        pub async fn sync_on_close(
            &mut self,
            sync_on_close: SyncOnCloseType,
        ) -> std::io::Result<()> {
            match self {
                Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
                Self::Local(file) => {
                    crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
                },
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Finish writing and close the target, returning the first error seen.
        pub async fn close(self) -> PolarsResult<()> {
            match self {
                Self::Dyn(mut v) => {
                    // Flush through the adapter, then close the inner writer.
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(mut v) => {
                    // `tokio::fs::File` finishes writes in the background, so a failure
                    // of the last write is only reported by a later operation.
                    // `into_std` waits for in-flight operations but does not return
                    // their errors, so flush explicitly first to surface them.
                    v.flush().await.map_err(PolarsError::from)?;
                    let f = v.into_std().await;
                    ClosableFile::from(f).close().map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}