polars_io/utils/
file.rs

1use std::io;
2#[cfg(feature = "cloud")]
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7#[cfg(feature = "cloud")]
8pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9use polars_error::{PolarsResult, feature_gated, polars_err};
10use polars_utils::create_file;
11use polars_utils::file::close_file;
12use polars_utils::mmap::ensure_not_mapped;
13use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15use super::sync_on_close::SyncOnCloseType;
16use crate::cloud::CloudOptions;
17use crate::metrics::IOMetrics;
18use crate::resolve_homedir;
19
/// Synchronous sink that can be stored in the `Dyn` variant of [`Writeable`].
///
/// On top of [`io::Write`], implementors supply explicit sync and close hooks. Within this
/// module they are used as follows:
/// - `Writeable::sync_all` / `Writeable::sync_data` dispatch to [`WriteableTrait::sync_all`] /
///   [`WriteableTrait::sync_data`] for the `Dyn` variant. `Writeable::close` and
///   `AsyncWriteable::close` call one of them first when the requested `SyncOnCloseType` is
///   `All` / `Data`.
/// - `Writeable::close` and `AsyncWriteable::close` then call [`WriteableTrait::close`].
///
/// The sync hooks take `&self`. An implementation that needs mutable state for them must use
/// interior mutability.
///
/// TODO: document the precise contract. For example: must `close` flush, and what is allowed
/// after `close` has been called?
pub trait WriteableTrait: io::Write {
    /// Counterpart of `std::fs::File::sync_all` for this sink.
    fn sync_all(&self) -> io::Result<()>;

    /// Counterpart of `std::fs::File::sync_data` for this sink.
    fn sync_data(&self) -> io::Result<()>;

    /// Finishes the sink once all writing (and any requested sync) is done.
    fn close(&mut self) -> io::Result<()>;
}
26
27/// Holds a non-async writeable file, abstracted over local files or cloud files.
28///
29/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
30///
31/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
32#[allow(clippy::large_enum_variant)] // It will be boxed
33pub enum Writeable {
34    /// An abstract implementation for writable.
35    ///
36    /// This is used to implement writing to in-memory and arbitrary file descriptors.
37    Dyn(Box<dyn WriteableTrait + Send>),
38    Local(std::fs::File),
39    #[cfg(feature = "cloud")]
40    Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
41}
42
43impl Writeable {
44    pub fn try_new(
45        path: PlRefPath,
46        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49        io_metrics: Option<Arc<IOMetrics>>,
50    ) -> PolarsResult<Self> {
51        Ok(if path.has_scheme() {
52            feature_gated!("cloud", {
53                use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
54                use crate::pl_async::get_runtime;
55
56                let writer = get_runtime().block_in_place_on(new_cloud_writer(
57                    path,
58                    cloud_options,
59                    cloud_upload_chunk_size,
60                    cloud_upload_concurrency.try_into().unwrap(),
61                    io_metrics,
62                ))?;
63
64                Self::Cloud(CloudWriterIoTraitWrap::from(writer))
65            })
66        } else if polars_config::config().force_async() {
67            feature_gated!("cloud", {
68                let path = resolve_homedir(path.as_std_path());
69                create_file(&path)?;
70                let path = std::fs::canonicalize(&path)?;
71
72                ensure_not_mapped(&path.metadata()?)?;
73
74                let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
75                let path = format_file_uri(path);
76
77                use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
78                use crate::pl_async::get_runtime;
79
80                let writer = get_runtime().block_in_place_on(new_cloud_writer(
81                    path,
82                    cloud_options,
83                    cloud_upload_chunk_size,
84                    cloud_upload_concurrency.try_into().unwrap(),
85                    io_metrics,
86                ))?;
87
88                Self::Cloud(CloudWriterIoTraitWrap::from(writer))
89            })
90        } else {
91            let path = resolve_homedir(path.as_std_path());
92            create_file(&path)?;
93
94            Self::Local(polars_utils::open_file_write(&path)?)
95        })
96    }
97
98    /// This returns `Result<>` - if a write was performed before calling this,
99    /// `CloudWriter` can be in an Err(_) state.
100    #[cfg(feature = "cloud")]
101    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
102        use self::async_writeable::AsyncDynWriteable;
103
104        match self {
105            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
106            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
107            Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
108        }
109    }
110
111    pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
112        match self {
113            Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
114            Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
115            #[cfg(feature = "cloud")]
116            Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
117        }
118    }
119
120    pub fn sync_all(&self) -> io::Result<()> {
121        match self {
122            Self::Dyn(v) => v.sync_all(),
123            Self::Local(v) => v.sync_all(),
124            #[cfg(feature = "cloud")]
125            Self::Cloud(v) => v.sync_all(),
126        }
127    }
128
129    pub fn sync_data(&self) -> io::Result<()> {
130        match self {
131            Self::Dyn(v) => v.sync_data(),
132            Self::Local(v) => v.sync_data(),
133            #[cfg(feature = "cloud")]
134            Self::Cloud(v) => v.sync_data(),
135        }
136    }
137
138    pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
139        match sync {
140            SyncOnCloseType::All => self.sync_all()?,
141            SyncOnCloseType::Data => self.sync_data()?,
142            SyncOnCloseType::None => {},
143        }
144
145        match self {
146            Self::Dyn(mut v) => v.close(),
147            Self::Local(v) => close_file(v),
148            #[cfg(feature = "cloud")]
149            Self::Cloud(mut v) => v.close(),
150        }
151    }
152}
153
154impl io::Write for Writeable {
155    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156        match self {
157            Self::Dyn(v) => v.write(buf),
158            Self::Local(v) => v.write(buf),
159            #[cfg(feature = "cloud")]
160            Self::Cloud(v) => v.write(buf),
161        }
162    }
163
164    fn flush(&mut self) -> io::Result<()> {
165        self.sync_all()
166    }
167}
168
169impl Deref for Writeable {
170    type Target = dyn io::Write + Send;
171
172    fn deref(&self) -> &Self::Target {
173        match self {
174            Self::Dyn(v) => v,
175            Self::Local(v) => v,
176            #[cfg(feature = "cloud")]
177            Self::Cloud(v) => v,
178        }
179    }
180}
181
182impl DerefMut for Writeable {
183    fn deref_mut(&mut self) -> &mut Self::Target {
184        match self {
185            Self::Dyn(v) => v,
186            Self::Local(v) => v,
187            #[cfg(feature = "cloud")]
188            Self::Cloud(v) => v,
189        }
190    }
191}
192
/// A borrowed writer that gets a [`std::io::BufWriter`] only when it does not already buffer
/// internally, avoiding double buffering.
///
/// Both variants deref to `dyn io::Write + Send`, so callers can write through either one the
/// same way. For the `BufWriter` variant, flush explicitly before dropping:
/// `BufWriter`'s `Drop` ignores flush errors.
pub enum BufferedWriteable<'a> {
    /// The borrowed writer wrapped in a `BufWriter`.
    BufWriter(std::io::BufWriter<&'a mut (dyn std::io::Write + Send)>),
    /// The borrowed writer used unwrapped, because it already buffers.
    Direct(&'a mut (dyn std::io::Write + Send)),
}

impl<'a> Deref for BufferedWriteable<'a> {
    type Target = dyn io::Write + Send + 'a;

    /// Shared access to the effective writer: the `BufWriter` itself, or the direct writer.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &Self::Target,
            Self::Direct(direct) => direct,
        }
    }
}

impl DerefMut for BufferedWriteable<'_> {
    /// Mutable access to the effective writer: the `BufWriter` itself, or the direct writer.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &mut Self::Target,
            Self::Direct(direct) => direct,
        }
    }
}
218
/// Creates a `CloudWriter` for `path` and starts it.
///
/// The object store and location are resolved with `crate::cloud::build_object_store`. The
/// location prefix becomes the object path. `start()` is awaited before the writer is returned.
///
/// # Errors
///
/// Propagates errors from:
/// - building the object store,
/// - converting the location prefix to an object path,
/// - starting the writer.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::object_path_from_str;

    let (location, store) = crate::cloud::build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut cloud_writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    cloud_writer.start().await?;

    Ok(cloud_writer)
}
245
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::metrics::IOMetrics;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turn an abstract io::Write into an abstract tokio::io::AsyncWrite.
    ///
    /// Each poll runs the corresponding blocking call inside [`task::block_in_place`] and
    /// always returns `Poll::Ready`. Note that `block_in_place` panics when called from a
    /// `current_thread` tokio runtime.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Only flushes. The inner writer is closed by [`AsyncWriteable::close`].
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        Dyn(AsyncDynWriteable),
        Local(tokio::fs::File),
        Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
    }

    impl AsyncWriteable {
        /// Opens `path` for async writing.
        ///
        /// Currently this opens synchronously through `Writeable::try_new` and converts the
        /// result with `Writeable::try_into_async_writeable`.
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_concurrency: usize,
            io_metrics: Option<Arc<IOMetrics>>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_concurrency,
                io_metrics,
            )
            .and_then(|x| x.try_into_async_writeable())
        }

        /// Calls `sync_all` on the underlying writer. This is a no-op for `Cloud`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
                Self::Local(v) => v.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Calls `sync_data` on the underlying writer. This is a no-op for `Cloud`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
                Self::Local(v) => v.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Finishes the writer.
        ///
        /// Order of operations:
        /// 1. `Dyn` and `Local` writers are flushed.
        /// 2. The requested `sync` is applied.
        /// 3. The writer is shut down and closed.
        ///
        /// # Errors
        ///
        /// Returns the first error from flushing, syncing, shutting down or closing.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            // Flush before syncing, for two reasons:
            // - Data still buffered by the writer is handed off before the sync runs.
            // - An error from a write tokio still has in flight is reported here.
            //   `tokio::fs::File::into_std` (used below) waits for that write but returns a
            //   bare `std::fs::File`, so it cannot report the error.
            // Cloud writers are left to `shutdown()` below.
            match &mut self {
                Self::Dyn(v) => v.flush().await?,
                Self::Local(v) => v.flush().await?,
                Self::Cloud(_) => {},
            }

            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(v) => close_file(v.into_std().await).map_err(PolarsError::from),
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}