Skip to main content

polars_io/utils/
file.rs

1use std::io;
2#[cfg(feature = "cloud")]
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7#[cfg(feature = "cloud")]
8pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9use polars_error::{PolarsResult, feature_gated, polars_err};
10use polars_utils::create_file;
11use polars_utils::file::close_file;
12use polars_utils::mmap::ensure_not_mapped;
13use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15use super::sync_on_close::SyncOnCloseType;
16use crate::cloud::CloudOptions;
17use crate::metrics::IOMetrics;
18use crate::resolve_homedir;
19
/// Blocking sink that can be stored in [`Writeable::Dyn`].
///
/// On top of [`io::Write`], an implementor provides the three lifecycle hooks that
/// [`Writeable`] forwards to: `close`, `sync_all` and `sync_data`.
// TODO document precise contract.
pub trait WriteableTrait: io::Write {
    /// Finishes the writer. [`Writeable::close`] calls this after the requested sync step.
    fn close(&mut self) -> io::Result<()>;

    /// Hook behind [`Writeable::sync_all`].
    ///
    /// NOTE(review): presumably mirrors [`std::fs::File::sync_all`]; the exact guarantee is
    /// left to the implementor until the contract above is written down.
    fn sync_all(&self) -> io::Result<()>;

    /// Hook behind [`Writeable::sync_data`].
    ///
    /// NOTE(review): presumably mirrors [`std::fs::File::sync_data`]; see the note on
    /// `sync_all`.
    fn sync_data(&self) -> io::Result<()>;
}
26
27/// Holds a non-async writeable file, abstracted over local files or cloud files.
28///
29/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
30///
31/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
32#[allow(clippy::large_enum_variant)] // It will be boxed
33pub enum Writeable {
34    /// An abstract implementation for writable.
35    ///
36    /// This is used to implement writing to in-memory and arbitrary file descriptors.
37    Dyn(Box<dyn WriteableTrait + Send>),
38    Local(std::fs::File),
39    #[cfg(feature = "cloud")]
40    Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
41}
42
43impl Writeable {
44    pub fn try_new(
45        path: PlRefPath,
46        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49        io_metrics: Option<Arc<IOMetrics>>,
50    ) -> PolarsResult<Self> {
51        Ok(if path.has_scheme() {
52            feature_gated!("cloud", {
53                use polars_core::runtime::ASYNC;
54
55                use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
56
57                let writer = ASYNC.block_in_place_on(new_cloud_writer(
58                    path,
59                    cloud_options,
60                    cloud_upload_chunk_size,
61                    cloud_upload_concurrency.try_into().unwrap(),
62                    io_metrics,
63                ))?;
64
65                Self::Cloud(CloudWriterIoTraitWrap::from(writer))
66            })
67        } else if polars_config::config().force_async() {
68            feature_gated!("cloud", {
69                let path = resolve_homedir(path.as_std_path());
70                create_file(&path)?;
71                let path = std::fs::canonicalize(&path)?;
72
73                ensure_not_mapped(&path.metadata()?)?;
74
75                let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
76                let path = format_file_uri(path);
77
78                use polars_core::runtime::ASYNC;
79
80                use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
81
82                let writer = ASYNC.block_in_place_on(new_cloud_writer(
83                    path,
84                    cloud_options,
85                    cloud_upload_chunk_size,
86                    cloud_upload_concurrency.try_into().unwrap(),
87                    io_metrics,
88                ))?;
89
90                Self::Cloud(CloudWriterIoTraitWrap::from(writer))
91            })
92        } else {
93            let path = resolve_homedir(path.as_std_path());
94            create_file(&path)?;
95
96            Self::Local(polars_utils::open_file_write(&path)?)
97        })
98    }
99
100    /// This returns `Result<>` - if a write was performed before calling this,
101    /// `CloudWriter` can be in an Err(_) state.
102    #[cfg(feature = "cloud")]
103    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
104        use self::async_writeable::AsyncDynWriteable;
105
106        match self {
107            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
108            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
109            Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
110        }
111    }
112
113    pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
114        match self {
115            Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
116            Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
117            #[cfg(feature = "cloud")]
118            Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
119        }
120    }
121
122    pub fn sync_all(&self) -> io::Result<()> {
123        match self {
124            Self::Dyn(v) => v.sync_all(),
125            Self::Local(v) => v.sync_all(),
126            #[cfg(feature = "cloud")]
127            Self::Cloud(v) => v.sync_all(),
128        }
129    }
130
131    pub fn sync_data(&self) -> io::Result<()> {
132        match self {
133            Self::Dyn(v) => v.sync_data(),
134            Self::Local(v) => v.sync_data(),
135            #[cfg(feature = "cloud")]
136            Self::Cloud(v) => v.sync_data(),
137        }
138    }
139
140    pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
141        match sync {
142            SyncOnCloseType::All => self.sync_all()?,
143            SyncOnCloseType::Data => self.sync_data()?,
144            SyncOnCloseType::None => {},
145        }
146
147        match self {
148            Self::Dyn(mut v) => v.close(),
149            Self::Local(v) => close_file(v),
150            #[cfg(feature = "cloud")]
151            Self::Cloud(mut v) => v.close(),
152        }
153    }
154}
155
156impl io::Write for Writeable {
157    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
158        match self {
159            Self::Dyn(v) => v.write(buf),
160            Self::Local(v) => v.write(buf),
161            #[cfg(feature = "cloud")]
162            Self::Cloud(v) => v.write(buf),
163        }
164    }
165
166    fn flush(&mut self) -> io::Result<()> {
167        self.sync_all()
168    }
169}
170
171impl Deref for Writeable {
172    type Target = dyn io::Write + Send;
173
174    fn deref(&self) -> &Self::Target {
175        match self {
176            Self::Dyn(v) => v,
177            Self::Local(v) => v,
178            #[cfg(feature = "cloud")]
179            Self::Cloud(v) => v,
180        }
181    }
182}
183
184impl DerefMut for Writeable {
185    fn deref_mut(&mut self) -> &mut Self::Target {
186        match self {
187            Self::Dyn(v) => v,
188            Self::Local(v) => v,
189            #[cfg(feature = "cloud")]
190            Self::Cloud(v) => v,
191        }
192    }
193}
194
/// Borrowed writer handed out by [`Writeable::as_buffered`].
///
/// Avoid BufWriter wrapping on writers that already have internal buffering.
pub enum BufferedWriteable<'a> {
    /// The borrowed writer wrapped in a [`std::io::BufWriter`].
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// The borrowed writer used as-is, with no extra buffer.
    Direct(&'a mut (dyn io::Write + Send)),
}
200
201impl<'a> Deref for BufferedWriteable<'a> {
202    type Target = dyn io::Write + Send + 'a;
203
204    fn deref(&self) -> &Self::Target {
205        match self {
206            Self::BufWriter(v) => v as _,
207            Self::Direct(v) => v,
208        }
209    }
210}
211
212impl DerefMut for BufferedWriteable<'_> {
213    fn deref_mut(&mut self) -> &mut Self::Target {
214        match self {
215            Self::BufWriter(v) => v as _,
216            Self::Direct(v) => v,
217        }
218    }
219}
220
/// Builds the object store for `path`, creates a `CloudWriter` on it and starts it.
///
/// # Errors
///
/// Propagates failures from building the object store, from converting the location prefix
/// into an object path, and from `CloudWriter::start`.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::{build_object_store, object_path_from_str};

    // NOTE(review): the meaning of the trailing `false` is defined by `build_object_store`,
    // which is not visible from this file.
    let (location, store) = build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut cloud_writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    cloud_writer.start().await?;

    Ok(cloud_writer)
}
247
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use bytes::Bytes;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::metrics::IOMetrics;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turn an abstract io::Write into an abstract tokio::io::AsyncWrite.
    ///
    /// Every poll runs the blocking call inside `tokio::task::block_in_place` and returns
    /// `Poll::Ready` with its result; the waker in the context is never used.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        /// Performs one blocking `write` on the wrapped writer.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        /// Performs one blocking `flush` on the wrapped writer.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Shutdown only flushes; closing the wrapped writer is left to
        /// [`AsyncWriteable::close`].
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        /// A boxed blocking writer adapted through [`AsyncDynWriteable`].
        Dyn(AsyncDynWriteable),
        /// A local file driven through tokio.
        Local(tokio::fs::File),
        /// A cloud writer.
        Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
    }

    impl AsyncWriteable {
        /// Opens `path` via [`Writeable::try_new`] and converts the result with
        /// [`Writeable::try_into_async_writeable`].
        ///
        /// The open itself is performed by the blocking constructor (see the TODO below).
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_concurrency: usize,
            io_metrics: Option<Arc<IOMetrics>>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_concurrency,
                io_metrics,
            )
            .and_then(|x| x.try_into_async_writeable())
        }

        /// Writes all of `src`.
        ///
        /// If this writer holds a cloud writer, it will `mem::take(T)`. `T` is unmodified for other
        /// writer types.
        pub async fn write_all_owned<T>(&mut self, src: &mut T) -> io::Result<()>
        where
            T: AsRef<[u8]> + Default + Drop, // `Drop` is to exclude `&[u8]` slices.
            Bytes: From<T>,
        {
            match self {
                Self::Cloud(v) => v.write_all_owned(Bytes::from(std::mem::take(src))).await,
                Self::Dyn(_) | Self::Local(_) => self.write_all(src.as_ref()).await,
            }
        }

        /// Runs `sync_all` on the `Dyn` and `Local` variants; a no-op returning `Ok(())` for
        /// `Cloud`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
                Self::Local(v) => v.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Runs `sync_data` on the `Dyn` and `Local` variants; a no-op returning `Ok(())` for
        /// `Cloud`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
                Self::Local(v) => v.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Consumes the writer: flushes a local file, runs the sync step selected by `sync`,
        /// then closes or shuts down the variant.
        ///
        /// # Errors
        ///
        /// Returns the first error from the flush, the sync step, or the close. On an early
        /// error the remaining steps are skipped and the writer is dropped.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            // A write on tokio's `File` can return `Ok` before the data has been handed to
            // the OS. Flushing here waits for that work and reports its error, which would
            // otherwise be dropped when the file is converted back with `into_std`.
            if let Self::Local(file) = &mut self {
                file.flush().await?;
            }

            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut v) => {
                    // `shutdown` flushes the wrapped writer; `close` then finishes it.
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(v) => async {
                    let f = v.into_std().await;
                    close_file(f)
                }
                .await
                .map_err(PolarsError::from),
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    /// Shared access to the active variant as an `AsyncWrite` trait object.
    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    /// Mutable access to the active variant as an `AsyncWrite` trait object.
    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}