1use std::io;
2#[cfg(feature = "cloud")]
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7#[cfg(feature = "cloud")]
8pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9use polars_error::{PolarsResult, feature_gated, polars_err};
10use polars_utils::create_file;
11use polars_utils::file::close_file;
12use polars_utils::mmap::ensure_not_mapped;
13use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15use super::sync_on_close::SyncOnCloseType;
16use crate::cloud::CloudOptions;
17use crate::metrics::IOMetrics;
18use crate::resolve_homedir;
19
/// A blocking byte sink that, in addition to [`io::Write`], can be synced and closed
/// explicitly.
///
/// Boxed implementors are what the `Dyn` variants of the writeable enums in this file hold.
pub trait WriteableTrait: io::Write {
    /// Closes the writer, reporting any I/O error raised while doing so.
    fn close(&mut self) -> io::Result<()>;

    /// Called where a local file would have [`std::fs::File::sync_all`] invoked.
    fn sync_all(&self) -> io::Result<()>;

    /// Called where a local file would have [`std::fs::File::sync_data`] invoked.
    fn sync_data(&self) -> io::Result<()>;
}
26
27#[allow(clippy::large_enum_variant)] pub enum Writeable {
34 Dyn(Box<dyn WriteableTrait + Send>),
38 Local(std::fs::File),
39 #[cfg(feature = "cloud")]
40 Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
41}
42
43impl Writeable {
44 pub fn try_new(
45 path: PlRefPath,
46 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49 io_metrics: Option<Arc<IOMetrics>>,
50 ) -> PolarsResult<Self> {
51 Ok(if path.has_scheme() {
52 feature_gated!("cloud", {
53 use polars_core::runtime::ASYNC;
54
55 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
56
57 let writer = ASYNC.block_in_place_on(new_cloud_writer(
58 path,
59 cloud_options,
60 cloud_upload_chunk_size,
61 cloud_upload_concurrency.try_into().unwrap(),
62 io_metrics,
63 ))?;
64
65 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
66 })
67 } else if polars_config::config().force_async() {
68 feature_gated!("cloud", {
69 let path = resolve_homedir(path.as_std_path());
70 create_file(&path)?;
71 let path = std::fs::canonicalize(&path)?;
72
73 ensure_not_mapped(&path.metadata()?)?;
74
75 let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
76 let path = format_file_uri(path);
77
78 use polars_core::runtime::ASYNC;
79
80 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
81
82 let writer = ASYNC.block_in_place_on(new_cloud_writer(
83 path,
84 cloud_options,
85 cloud_upload_chunk_size,
86 cloud_upload_concurrency.try_into().unwrap(),
87 io_metrics,
88 ))?;
89
90 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
91 })
92 } else {
93 let path = resolve_homedir(path.as_std_path());
94 create_file(&path)?;
95
96 Self::Local(polars_utils::open_file_write(&path)?)
97 })
98 }
99
100 #[cfg(feature = "cloud")]
103 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
104 use self::async_writeable::AsyncDynWriteable;
105
106 match self {
107 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
108 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
109 Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
110 }
111 }
112
113 pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
114 match self {
115 Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
116 Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
117 #[cfg(feature = "cloud")]
118 Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
119 }
120 }
121
122 pub fn sync_all(&self) -> io::Result<()> {
123 match self {
124 Self::Dyn(v) => v.sync_all(),
125 Self::Local(v) => v.sync_all(),
126 #[cfg(feature = "cloud")]
127 Self::Cloud(v) => v.sync_all(),
128 }
129 }
130
131 pub fn sync_data(&self) -> io::Result<()> {
132 match self {
133 Self::Dyn(v) => v.sync_data(),
134 Self::Local(v) => v.sync_data(),
135 #[cfg(feature = "cloud")]
136 Self::Cloud(v) => v.sync_data(),
137 }
138 }
139
140 pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
141 match sync {
142 SyncOnCloseType::All => self.sync_all()?,
143 SyncOnCloseType::Data => self.sync_data()?,
144 SyncOnCloseType::None => {},
145 }
146
147 match self {
148 Self::Dyn(mut v) => v.close(),
149 Self::Local(v) => close_file(v),
150 #[cfg(feature = "cloud")]
151 Self::Cloud(mut v) => v.close(),
152 }
153 }
154}
155
156impl io::Write for Writeable {
157 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
158 match self {
159 Self::Dyn(v) => v.write(buf),
160 Self::Local(v) => v.write(buf),
161 #[cfg(feature = "cloud")]
162 Self::Cloud(v) => v.write(buf),
163 }
164 }
165
166 fn flush(&mut self) -> io::Result<()> {
167 self.sync_all()
168 }
169}
170
171impl Deref for Writeable {
172 type Target = dyn io::Write + Send;
173
174 fn deref(&self) -> &Self::Target {
175 match self {
176 Self::Dyn(v) => v,
177 Self::Local(v) => v,
178 #[cfg(feature = "cloud")]
179 Self::Cloud(v) => v,
180 }
181 }
182}
183
184impl DerefMut for Writeable {
185 fn deref_mut(&mut self) -> &mut Self::Target {
186 match self {
187 Self::Dyn(v) => v,
188 Self::Local(v) => v,
189 #[cfg(feature = "cloud")]
190 Self::Cloud(v) => v,
191 }
192 }
193}
194
/// A borrowed writer as produced by `Writeable::as_buffered`.
pub enum BufferedWriteable<'a> {
    /// The borrowed writer wrapped in a [`io::BufWriter`].
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// The borrowed writer used as-is, without an extra buffer in front of it.
    Direct(&'a mut (dyn io::Write + Send)),
}
200
201impl<'a> Deref for BufferedWriteable<'a> {
202 type Target = dyn io::Write + Send + 'a;
203
204 fn deref(&self) -> &Self::Target {
205 match self {
206 Self::BufWriter(v) => v as _,
207 Self::Direct(v) => v,
208 }
209 }
210}
211
212impl DerefMut for BufferedWriteable<'_> {
213 fn deref_mut(&mut self) -> &mut Self::Target {
214 match self {
215 Self::BufWriter(v) => v as _,
216 Self::Direct(v) => v,
217 }
218 }
219}
220
/// Builds the object store for `path`, creates a `CloudWriter` aimed at the resolved location's
/// prefix, starts it and returns it.
///
/// Any error from building the store, parsing the object path or starting the writer is
/// propagated.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::{build_object_store, object_path_from_str};

    let (location, store) = build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut cloud_writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    cloud_writer.start().await?;

    Ok(cloud_writer)
}
247
248#[cfg(feature = "cloud")]
249mod async_writeable {
250 use std::io;
251 use std::ops::{Deref, DerefMut};
252 use std::pin::Pin;
253 use std::sync::Arc;
254 use std::task::{Context, Poll};
255
256 use bytes::Bytes;
257 use polars_error::{PolarsError, PolarsResult};
258 use polars_utils::file::close_file;
259 use polars_utils::pl_path::PlRefPath;
260 use tokio::io::AsyncWriteExt;
261 use tokio::task;
262
263 use super::{Writeable, WriteableTrait};
264 use crate::cloud::CloudOptions;
265 use crate::metrics::IOMetrics;
266 use crate::utils::sync_on_close::SyncOnCloseType;
267
268 pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);
270
271 impl tokio::io::AsyncWrite for AsyncDynWriteable {
272 fn poll_write(
273 self: Pin<&mut Self>,
274 _cx: &mut Context<'_>,
275 buf: &[u8],
276 ) -> Poll<io::Result<usize>> {
277 let result = task::block_in_place(|| self.get_mut().0.write(buf));
278 Poll::Ready(result)
279 }
280
281 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
282 let result = task::block_in_place(|| self.get_mut().0.flush());
283 Poll::Ready(result)
284 }
285
286 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
287 self.poll_flush(cx)
288 }
289 }
290
291 pub enum AsyncWriteable {
298 Dyn(AsyncDynWriteable),
299 Local(tokio::fs::File),
300 Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
301 }
302
303 impl AsyncWriteable {
304 pub async fn try_new(
305 path: PlRefPath,
306 cloud_options: Option<&CloudOptions>,
307 cloud_upload_chunk_size: usize,
308 cloud_upload_concurrency: usize,
309 io_metrics: Option<Arc<IOMetrics>>,
310 ) -> PolarsResult<Self> {
311 Writeable::try_new(
313 path,
314 cloud_options,
315 cloud_upload_chunk_size,
316 cloud_upload_concurrency,
317 io_metrics,
318 )
319 .and_then(|x| x.try_into_async_writeable())
320 }
321
322 pub async fn write_all_owned<T>(&mut self, src: &mut T) -> io::Result<()>
325 where
326 T: AsRef<[u8]> + Default + Drop, Bytes: From<T>,
328 {
329 match self {
330 Self::Cloud(v) => v.write_all_owned(Bytes::from(std::mem::take(src))).await,
331 Self::Dyn(_) | Self::Local(_) => self.write_all(src.as_ref()).await,
332 }
333 }
334
335 pub async fn sync_all(&mut self) -> io::Result<()> {
336 match self {
337 Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
338 Self::Local(v) => v.sync_all().await,
339 Self::Cloud(_) => Ok(()),
340 }
341 }
342
343 pub async fn sync_data(&mut self) -> io::Result<()> {
344 match self {
345 Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
346 Self::Local(v) => v.sync_data().await,
347 Self::Cloud(_) => Ok(()),
348 }
349 }
350
351 pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
352 match sync {
353 SyncOnCloseType::All => self.sync_all().await?,
354 SyncOnCloseType::Data => self.sync_data().await?,
355 SyncOnCloseType::None => {},
356 }
357
358 match self {
359 Self::Dyn(mut v) => {
360 v.shutdown().await.map_err(PolarsError::from)?;
361 Ok(task::block_in_place(|| v.0.close())?)
362 },
363 Self::Local(v) => async {
364 let f = v.into_std().await;
365 close_file(f)
366 }
367 .await
368 .map_err(PolarsError::from),
369 Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
370 }
371 }
372 }
373
374 impl Deref for AsyncWriteable {
375 type Target = dyn tokio::io::AsyncWrite + Send + Unpin;
376
377 fn deref(&self) -> &Self::Target {
378 match self {
379 Self::Dyn(v) => v,
380 Self::Local(v) => v,
381 Self::Cloud(v) => v,
382 }
383 }
384 }
385
386 impl DerefMut for AsyncWriteable {
387 fn deref_mut(&mut self) -> &mut Self::Target {
388 match self {
389 Self::Dyn(v) => v,
390 Self::Local(v) => v,
391 Self::Cloud(v) => v,
392 }
393 }
394 }
395}