polars_io/utils/
file.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3use std::path::Path;
4
5#[cfg(feature = "cloud")]
6pub use async_writeable::AsyncWriteable;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, feature_gated};
9use polars_utils::create_file;
10use polars_utils::file::{ClosableFile, WriteClose};
11use polars_utils::mmap::ensure_not_mapped;
12use polars_utils::plpath::{CloudScheme, PlPathRef};
13
14use super::sync_on_close::SyncOnCloseType;
15use crate::cloud::CloudOptions;
16use crate::resolve_homedir;
17
18pub trait DynWriteable: io::Write + Send {
19    // Needed because trait upcasting is only stable in 1.86.
20    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
21    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
22
23    fn close(self: Box<Self>) -> io::Result<()>;
24    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
25}
26
27impl DynWriteable for ClosableFile {
28    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
29        self as _
30    }
31    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
32        self as _
33    }
34    fn close(self: Box<Self>) -> io::Result<()> {
35        ClosableFile::close(*self)
36    }
37    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
38        super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
39    }
40}
41
/// Holds a non-async writeable file, abstracted over local files or cloud files.
///
/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
///
/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
// NOTE(review): the `Cloud` variant is presumably much larger than the others. The note
// below ("It will be boxed") presumably means callers box the whole `Writeable` — confirm.
#[allow(clippy::large_enum_variant)] // It will be boxed
pub enum Writeable {
    /// A caller-supplied, type-erased writer.
    ///
    /// This is used to implement writing to in-memory and arbitrary file descriptors.
    Dyn(Box<dyn DynWriteable>),
    /// A local file opened for writing by `Writeable::try_new`.
    Local(std::fs::File),
    /// A cloud target (or a local path when async writing is forced), written through a
    /// blocking cloud writer. Only available with the `cloud` feature.
    #[cfg(feature = "cloud")]
    Cloud(crate::cloud::BlockingCloudWriter),
}
57
58impl Writeable {
59    pub fn try_new(
60        addr: PlPathRef,
61        #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
62    ) -> PolarsResult<Self> {
63        let verbose = config::verbose();
64
65        match addr {
66            PlPathRef::Cloud(p) => {
67                feature_gated!("cloud", {
68                    use crate::cloud::BlockingCloudWriter;
69
70                    if verbose {
71                        eprintln!("Writeable: try_new: cloud: {p}")
72                    }
73
74                    if p.scheme() == CloudScheme::File {
75                        create_file(Path::new(p.strip_scheme()))?;
76                    }
77
78                    let writer = crate::pl_async::get_runtime().block_in_place_on(
79                        BlockingCloudWriter::new(&p.to_string(), cloud_options),
80                    )?;
81                    Ok(Self::Cloud(writer))
82                })
83            },
84            PlPathRef::Local(path) if config::force_async() => {
85                feature_gated!("cloud", {
86                    use crate::cloud::BlockingCloudWriter;
87
88                    let path = resolve_homedir(&path);
89
90                    if verbose {
91                        eprintln!("Writeable: try_new: forced async: {}", path.display())
92                    }
93
94                    create_file(&path)?;
95                    let path = std::fs::canonicalize(&path)?;
96
97                    ensure_not_mapped(&path.metadata()?)?;
98
99                    let path = format!(
100                        "file://{}",
101                        if cfg!(target_family = "windows") {
102                            path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
103                        } else {
104                            path.to_str().unwrap()
105                        }
106                    );
107
108                    if verbose {
109                        eprintln!("Writeable: try_new: forced async converted path: {path}")
110                    }
111
112                    let writer = crate::pl_async::get_runtime()
113                        .block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
114                    Ok(Self::Cloud(writer))
115                })
116            },
117            PlPathRef::Local(path) => {
118                let path = resolve_homedir(&path);
119                create_file(&path)?;
120
121                // Note: `canonicalize` does not work on some systems.
122
123                if verbose {
124                    eprintln!(
125                        "Writeable: try_new: local: {} (canonicalize: {:?})",
126                        path.display(),
127                        std::fs::canonicalize(&path)
128                    )
129                }
130
131                Ok(Self::Local(polars_utils::open_file_write(&path)?))
132            },
133        }
134    }
135
136    /// This returns `Result<>` - if a write was performed before calling this,
137    /// `CloudWriter` can be in an Err(_) state.
138    #[cfg(feature = "cloud")]
139    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
140        use self::async_writeable::AsyncDynWriteable;
141
142        match self {
143            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
144            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
145            // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
146            // `BlockingCloudWriter` has a `Drop` impl that we don't want.
147            Self::Cloud(v) => v
148                .try_into_inner()
149                .map(AsyncWriteable::Cloud)
150                .map_err(PolarsError::from),
151        }
152    }
153
154    pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
155        match self {
156            Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
157            Writeable::Local(file) => {
158                crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
159            },
160            #[cfg(feature = "cloud")]
161            Writeable::Cloud(_) => Ok(()),
162        }
163    }
164
165    pub fn close(self) -> std::io::Result<()> {
166        match self {
167            Self::Dyn(v) => v.close(),
168            Self::Local(v) => ClosableFile::from(v).close(),
169            #[cfg(feature = "cloud")]
170            Self::Cloud(mut v) => v.close(),
171        }
172    }
173}
174
175impl Deref for Writeable {
176    type Target = dyn io::Write + Send;
177
178    fn deref(&self) -> &Self::Target {
179        match self {
180            Self::Dyn(v) => v.as_dyn_write(),
181            Self::Local(v) => v,
182            #[cfg(feature = "cloud")]
183            Self::Cloud(v) => v,
184        }
185    }
186}
187
188impl DerefMut for Writeable {
189    fn deref_mut(&mut self) -> &mut Self::Target {
190        match self {
191            Self::Dyn(v) => v.as_mut_dyn_write(),
192            Self::Local(v) => v,
193            #[cfg(feature = "cloud")]
194            Self::Cloud(v) => v,
195        }
196    }
197}
198
199/// Note: Prefer using [`Writeable`] / [`Writeable::try_new`] where possible.
200///
201/// Open a path for writing. Supports cloud paths.
202pub fn try_get_writeable(
203    addr: PlPathRef<'_>,
204    cloud_options: Option<&CloudOptions>,
205) -> PolarsResult<Box<dyn WriteClose + Send>> {
206    Writeable::try_new(addr, cloud_options).map(|x| match x {
207        Writeable::Dyn(_) => unreachable!(),
208        Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
209        #[cfg(feature = "cloud")]
210        Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
211    })
212}
213
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::ClosableFile;
    use polars_utils::plpath::PlPathRef;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{DynWriteable, Writeable};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turn an abstract io::Write into an abstract tokio::io::AsyncWrite.
    ///
    /// Each poll runs the blocking call inside [`task::block_in_place`] and completes
    /// immediately (it never returns `Poll::Pending`).
    /// NOTE(review): `block_in_place` cannot be used on a current-thread tokio runtime, so
    /// this presumably must only be polled on a multi-threaded runtime — confirm.
    pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        // Shutdown only flushes; actually closing the inner writer is done by
        // `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        Dyn(AsyncDynWriteable),
        Local(tokio::fs::File),
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens `addr` for async writing by opening a blocking [`Writeable`] and converting
        /// it with `Writeable::try_into_async_writeable`.
        pub async fn try_new(
            addr: PlPathRef<'_>,
            cloud_options: Option<&CloudOptions>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(addr, cloud_options).and_then(|x| x.try_into_async_writeable())
        }

        /// Applies the requested sync behaviour to the underlying writer.
        ///
        /// Cloud writers are a no-op here.
        pub async fn sync_on_close(
            &mut self,
            sync_on_close: SyncOnCloseType,
        ) -> std::io::Result<()> {
            match self {
                Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
                Self::Local(file) => {
                    crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
                },
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Finishes writing and closes the underlying writer, consuming `self`.
        ///
        /// - `Dyn`: flushed via `shutdown`, then closed with `DynWriteable::close`.
        /// - `Local`: flushed (surfacing any pending write error), converted back to a
        ///   `std::fs::File`, and closed through `ClosableFile`.
        /// - `Cloud`: completed via `shutdown`.
        pub async fn close(self) -> PolarsResult<()> {
            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(mut v) => {
                    // `tokio::fs::File` performs writes in the background and reports a failed
                    // write only on the next operation. `into_std` waits for in-flight work but
                    // drops such an error, so flush first to make it observable.
                    v.flush().await.map_err(PolarsError::from)?;
                    let f = v.into_std().await;
                    ClosableFile::from(f).close().map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}