polars_io/utils/file.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3use std::path::Path;
4
5#[cfg(feature = "cloud")]
6pub use async_writeable::AsyncWriteable;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, feature_gated};
9use polars_utils::create_file;
10use polars_utils::file::{ClosableFile, WriteClose};
11use polars_utils::mmap::ensure_not_mapped;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::{is_cloud_url, resolve_homedir};
16
17pub trait DynWriteable: io::Write + Send {
18    // Needed because trait upcasting is only stable in 1.86.
19    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
20    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
21
22    fn close(self: Box<Self>) -> io::Result<()>;
23    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
24}
25
26impl DynWriteable for ClosableFile {
27    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
28        self as _
29    }
30    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
31        self as _
32    }
33    fn close(self: Box<Self>) -> io::Result<()> {
34        ClosableFile::close(*self)
35    }
36    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
37        super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
38    }
39}
40
41/// Holds a non-async writeable file, abstracted over local files or cloud files.
42///
43/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
44///
45/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
46#[allow(clippy::large_enum_variant)] // It will be boxed
47pub enum Writeable {
48    /// An abstract implementation for writable.
49    ///
50    /// This is used to implement writing to in-memory and arbitrary file descriptors.
51    Dyn(Box<dyn DynWriteable>),
52    Local(std::fs::File),
53    #[cfg(feature = "cloud")]
54    Cloud(crate::cloud::BlockingCloudWriter),
55}
56
57impl Writeable {
58    pub fn try_new(
59        path: &str,
60        #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
61    ) -> PolarsResult<Self> {
62        let is_cloud = is_cloud_url(path);
63        let verbose = config::verbose();
64
65        if is_cloud {
66            feature_gated!("cloud", {
67                use crate::cloud::BlockingCloudWriter;
68
69                if verbose {
70                    eprintln!("Writeable: try_new: cloud: {}", path)
71                }
72
73                if path.starts_with("file://") {
74                    create_file(Path::new(&path[const { "file://".len() }..]))?;
75                }
76
77                let writer = crate::pl_async::get_runtime()
78                    .block_in_place_on(BlockingCloudWriter::new(path, cloud_options))?;
79                Ok(Self::Cloud(writer))
80            })
81        } else if config::force_async() {
82            feature_gated!("cloud", {
83                use crate::cloud::BlockingCloudWriter;
84
85                let path = resolve_homedir(&path);
86
87                if verbose {
88                    eprintln!(
89                        "Writeable: try_new: forced async: {}",
90                        path.to_str().unwrap()
91                    )
92                }
93
94                create_file(&path)?;
95                let path = std::fs::canonicalize(&path)?;
96
97                ensure_not_mapped(&path.metadata()?)?;
98
99                let path = format!(
100                    "file://{}",
101                    if cfg!(target_family = "windows") {
102                        path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
103                    } else {
104                        path.to_str().unwrap()
105                    }
106                );
107
108                if verbose {
109                    eprintln!("Writeable: try_new: forced async converted path: {}", path)
110                }
111
112                let writer = crate::pl_async::get_runtime()
113                    .block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
114                Ok(Self::Cloud(writer))
115            })
116        } else {
117            let path = resolve_homedir(&path);
118            create_file(&path)?;
119
120            // Note: `canonicalize` does not work on some systems.
121
122            if verbose {
123                eprintln!(
124                    "Writeable: try_new: local: {} (canonicalize: {:?})",
125                    path.to_str().unwrap(),
126                    std::fs::canonicalize(&path)
127                )
128            }
129
130            Ok(Self::Local(polars_utils::open_file_write(&path)?))
131        }
132    }
133
134    /// This returns `Result<>` - if a write was performed before calling this,
135    /// `CloudWriter` can be in an Err(_) state.
136    #[cfg(feature = "cloud")]
137    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
138        use self::async_writeable::AsyncDynWriteable;
139
140        match self {
141            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
142            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
143            // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
144            // `BlockingCloudWriter` has a `Drop` impl that we don't want.
145            Self::Cloud(v) => v
146                .try_into_inner()
147                .map(AsyncWriteable::Cloud)
148                .map_err(PolarsError::from),
149        }
150    }
151
152    pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
153        match self {
154            Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
155            Writeable::Local(file) => {
156                crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
157            },
158            #[cfg(feature = "cloud")]
159            Writeable::Cloud(_) => Ok(()),
160        }
161    }
162
163    pub fn close(self) -> std::io::Result<()> {
164        match self {
165            Self::Dyn(v) => v.close(),
166            Self::Local(v) => ClosableFile::from(v).close(),
167            #[cfg(feature = "cloud")]
168            Self::Cloud(mut v) => v.close(),
169        }
170    }
171}
172
173impl Deref for Writeable {
174    type Target = dyn io::Write + Send;
175
176    fn deref(&self) -> &Self::Target {
177        match self {
178            Self::Dyn(v) => v.as_dyn_write(),
179            Self::Local(v) => v,
180            #[cfg(feature = "cloud")]
181            Self::Cloud(v) => v,
182        }
183    }
184}
185
186impl DerefMut for Writeable {
187    fn deref_mut(&mut self) -> &mut Self::Target {
188        match self {
189            Self::Dyn(v) => v.as_mut_dyn_write(),
190            Self::Local(v) => v,
191            #[cfg(feature = "cloud")]
192            Self::Cloud(v) => v,
193        }
194    }
195}
196
197/// Note: Prefer using [`Writeable`] / [`Writeable::try_new`] where possible.
198///
199/// Open a path for writing. Supports cloud paths.
200pub fn try_get_writeable(
201    path: &str,
202    cloud_options: Option<&CloudOptions>,
203) -> PolarsResult<Box<dyn WriteClose + Send>> {
204    Writeable::try_new(path, cloud_options).map(|x| match x {
205        Writeable::Dyn(_) => unreachable!(),
206        Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
207        #[cfg(feature = "cloud")]
208        Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
209    })
210}
211
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::ClosableFile;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{DynWriteable, Writeable};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turns an abstract blocking [`DynWriteable`] into a [`tokio::io::AsyncWrite`].
    ///
    /// Each poll runs the blocking call to completion inside [`task::block_in_place`]
    /// and returns `Poll::Ready`; it never returns `Poll::Pending`. Note that
    /// `block_in_place` panics when called from a current-thread tokio runtime.
    pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Shutting down only flushes; the inner writer is closed by
        /// [`AsyncWriteable::close`].
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        Dyn(AsyncDynWriteable),
        Local(tokio::fs::File),
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens `path` for async writing.
        ///
        /// Currently this opens a blocking [`Writeable`] and converts it.
        pub async fn try_new(
            path: &str,
            cloud_options: Option<&CloudOptions>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(path, cloud_options).and_then(|x| x.try_into_async_writeable())
        }

        /// Applies the given sync-on-close policy.
        ///
        /// This is a no-op (`Ok(())`) for cloud writers.
        pub async fn sync_on_close(
            &mut self,
            sync_on_close: SyncOnCloseType,
        ) -> std::io::Result<()> {
            match self {
                Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
                Self::Local(file) => {
                    crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
                },
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Flushes and closes the writer, consuming it.
        ///
        /// # Errors
        /// Returns any error raised while flushing, shutting down or closing the
        /// underlying writer.
        pub async fn close(self) -> PolarsResult<()> {
            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(mut v) => {
                    // `tokio::fs::File` completes writes in the background and reports a
                    // failure only on a later operation. `into_std` waits for in-flight
                    // writes but discards such a deferred error, so flush first to
                    // surface it instead of silently losing data.
                    v.flush().await.map_err(PolarsError::from)?;
                    let f = v.into_std().await;
                    ClosableFile::from(f).close().map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}