1use std::io;
2#[cfg(feature = "cloud")]
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7#[cfg(feature = "cloud")]
8pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9use polars_error::{PolarsResult, feature_gated, polars_err};
10use polars_utils::create_file;
11use polars_utils::file::close_file;
12use polars_utils::mmap::ensure_not_mapped;
13use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15use super::sync_on_close::SyncOnCloseType;
16use crate::cloud::CloudOptions;
17use crate::metrics::IOMetrics;
18use crate::resolve_homedir;
19
/// A custom synchronous writer that can be wrapped in `Writeable::Dyn`.
///
/// Besides `std::io::Write`, implementors provide explicit close and sync hooks which
/// `Writeable::close`, `Writeable::sync_all` and `Writeable::sync_data` forward to.
pub trait WriteableTrait: std::io::Write {
    /// Closes the writer. `Writeable::close` calls this after its optional sync step.
    fn close(&mut self) -> std::io::Result<()>;
    /// Forwarded from `Writeable::sync_all`; presumably mirrors
    /// `std::fs::File::sync_all` (data + metadata) — confirm per implementor.
    fn sync_all(&self) -> std::io::Result<()>;
    /// Forwarded from `Writeable::sync_data`; presumably mirrors
    /// `std::fs::File::sync_data` (data only) — confirm per implementor.
    fn sync_data(&self) -> std::io::Result<()>;
}
26
/// A synchronous output target: a custom writer, a local file, or (with the `cloud`
/// feature) a cloud writer.
///
/// Usually constructed with `Writeable::try_new`. Finish it with `Writeable::close`, which
/// performs the requested sync and returns close errors instead of dropping them.
// NOTE(review): the lint allowance implies one variant is much larger than the others
// (presumably `Cloud`); the sizes are not visible here — confirm before boxing anything.
#[allow(clippy::large_enum_variant)]
pub enum Writeable {
    /// A user-supplied writer implementing `WriteableTrait`.
    Dyn(Box<dyn WriteableTrait + Send>),
    /// A local file; `try_new` produces this for scheme-less paths when `force_async`
    /// is not enabled.
    Local(std::fs::File),
    /// A cloud writer; `try_new` produces this for paths with a scheme, and for local
    /// paths when `force_async` is enabled.
    #[cfg(feature = "cloud")]
    Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
}
42
43impl Writeable {
44 pub fn try_new(
45 path: PlRefPath,
46 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49 io_metrics: Option<Arc<IOMetrics>>,
50 ) -> PolarsResult<Self> {
51 Ok(if path.has_scheme() {
52 feature_gated!("cloud", {
53 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
54 use crate::pl_async::get_runtime;
55
56 let writer = get_runtime().block_in_place_on(new_cloud_writer(
57 path,
58 cloud_options,
59 cloud_upload_chunk_size,
60 cloud_upload_concurrency.try_into().unwrap(),
61 io_metrics,
62 ))?;
63
64 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
65 })
66 } else if polars_config::config().force_async() {
67 feature_gated!("cloud", {
68 let path = resolve_homedir(path.as_std_path());
69 create_file(&path)?;
70 let path = std::fs::canonicalize(&path)?;
71
72 ensure_not_mapped(&path.metadata()?)?;
73
74 let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
75 let path = format_file_uri(path);
76
77 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
78 use crate::pl_async::get_runtime;
79
80 let writer = get_runtime().block_in_place_on(new_cloud_writer(
81 path,
82 cloud_options,
83 cloud_upload_chunk_size,
84 cloud_upload_concurrency.try_into().unwrap(),
85 io_metrics,
86 ))?;
87
88 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
89 })
90 } else {
91 let path = resolve_homedir(path.as_std_path());
92 create_file(&path)?;
93
94 Self::Local(polars_utils::open_file_write(&path)?)
95 })
96 }
97
98 #[cfg(feature = "cloud")]
101 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
102 use self::async_writeable::AsyncDynWriteable;
103
104 match self {
105 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
106 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
107 Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
108 }
109 }
110
111 pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
112 match self {
113 Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
114 Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
115 #[cfg(feature = "cloud")]
116 Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
117 }
118 }
119
120 pub fn sync_all(&self) -> io::Result<()> {
121 match self {
122 Self::Dyn(v) => v.sync_all(),
123 Self::Local(v) => v.sync_all(),
124 #[cfg(feature = "cloud")]
125 Self::Cloud(v) => v.sync_all(),
126 }
127 }
128
129 pub fn sync_data(&self) -> io::Result<()> {
130 match self {
131 Self::Dyn(v) => v.sync_data(),
132 Self::Local(v) => v.sync_data(),
133 #[cfg(feature = "cloud")]
134 Self::Cloud(v) => v.sync_data(),
135 }
136 }
137
138 pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
139 match sync {
140 SyncOnCloseType::All => self.sync_all()?,
141 SyncOnCloseType::Data => self.sync_data()?,
142 SyncOnCloseType::None => {},
143 }
144
145 match self {
146 Self::Dyn(mut v) => v.close(),
147 Self::Local(v) => close_file(v),
148 #[cfg(feature = "cloud")]
149 Self::Cloud(mut v) => v.close(),
150 }
151 }
152}
153
154impl io::Write for Writeable {
155 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156 match self {
157 Self::Dyn(v) => v.write(buf),
158 Self::Local(v) => v.write(buf),
159 #[cfg(feature = "cloud")]
160 Self::Cloud(v) => v.write(buf),
161 }
162 }
163
164 fn flush(&mut self) -> io::Result<()> {
165 self.sync_all()
166 }
167}
168
169impl Deref for Writeable {
170 type Target = dyn io::Write + Send;
171
172 fn deref(&self) -> &Self::Target {
173 match self {
174 Self::Dyn(v) => v,
175 Self::Local(v) => v,
176 #[cfg(feature = "cloud")]
177 Self::Cloud(v) => v,
178 }
179 }
180}
181
182impl DerefMut for Writeable {
183 fn deref_mut(&mut self) -> &mut Self::Target {
184 match self {
185 Self::Dyn(v) => v,
186 Self::Local(v) => v,
187 #[cfg(feature = "cloud")]
188 Self::Cloud(v) => v,
189 }
190 }
191}
192
/// A borrowed, possibly buffered view of a [`Writeable`], produced by
/// [`Writeable::as_buffered`].
///
/// Both variants dereference to `dyn io::Write + Send`, so callers can write through
/// either one without matching on it.
pub enum BufferedWriteable<'a> {
    /// Writes are staged in a `std::io::BufWriter` in front of the borrowed writer.
    /// `BufWriter` ignores flush errors on drop, so flush explicitly before dropping.
    BufWriter(std::io::BufWriter<&'a mut (dyn std::io::Write + Send)>),
    /// Writes go straight to the borrowed writer.
    Direct(&'a mut (dyn std::io::Write + Send)),
}

impl<'a> Deref for BufferedWriteable<'a> {
    type Target = dyn io::Write + Send + 'a;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &Self::Target,
            Self::Direct(direct) => direct,
        }
    }
}

impl DerefMut for BufferedWriteable<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &mut Self::Target,
            Self::Direct(direct) => direct,
        }
    }
}
218
/// Builds an object store for `path` and returns a started `CloudWriter` targeting it.
///
/// `cloud_upload_chunk_size`, `cloud_upload_concurrency` and `io_metrics` are passed
/// straight through to `CloudWriter::new`. The writer's `start` is awaited before it is
/// returned, so failures to start surface here.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::{build_object_store, object_path_from_str};

    // The meaning of the trailing `false` flag is defined by `build_object_store`.
    let (location, store) = build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut cloud_writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    cloud_writer.start().await?;

    Ok(cloud_writer)
}
245
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use bytes::Bytes;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::metrics::IOMetrics;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Adapts a synchronous `WriteableTrait` object to `tokio::io::AsyncWrite`.
    ///
    /// Every poll runs the blocking operation to completion inside
    /// `tokio::task::block_in_place` and returns `Poll::Ready`; it never returns
    /// `Poll::Pending`. Per the tokio documentation, `block_in_place` panics when called
    /// from a current-thread runtime.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Shutdown only flushes; the inner writer's `close` is invoked separately by
        /// `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Asynchronous counterpart of `Writeable`, obtained through `AsyncWriteable::try_new`
    /// or `Writeable::try_into_async_writeable`.
    ///
    /// Dereferences to `dyn tokio::io::AsyncWrite + Send + Unpin`, so the
    /// `tokio::io::AsyncWriteExt` methods can be called on it directly.
    pub enum AsyncWriteable {
        /// A synchronous custom writer driven through `block_in_place`.
        Dyn(AsyncDynWriteable),
        /// A local file, converted from `std::fs::File` with `tokio::fs::File::from_std`.
        Local(tokio::fs::File),
        /// A cloud writer.
        Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
    }

    impl AsyncWriteable {
        /// Opens a writer for `path`; see `Writeable::try_new` for how the variant is
        /// chosen and what each parameter controls.
        ///
        /// Not natively async: this delegates to the blocking `Writeable::try_new` and then
        /// converts the result with `try_into_async_writeable`.
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_concurrency: usize,
            io_metrics: Option<Arc<IOMetrics>>,
        ) -> PolarsResult<Self> {
            Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_concurrency,
                io_metrics,
            )
            .and_then(|x| x.try_into_async_writeable())
        }

        /// Writes all bytes of `src`.
        ///
        /// For the `Cloud` variant the buffer is moved out with `std::mem::take` (leaving
        /// `T::default()` in `src`) and handed over as `Bytes`. The `Dyn` and `Local`
        /// variants write from `src` by reference and leave it unchanged, so its
        /// allocation stays with the caller.
        pub async fn write_all_owned<T>(&mut self, src: &mut T) -> io::Result<()>
        where
            T: AsRef<[u8]> + Default,
            Bytes: From<T>,
        {
            match self {
                Self::Cloud(v) => v.write_all_owned(Bytes::from(std::mem::take(src))).await,
                Self::Dyn(_) | Self::Local(_) => self.write_all(src.as_ref()).await,
            }
        }

        /// Requests a full sync.
        ///
        /// `Dyn` forwards to `WriteableTrait::sync_all` inside `block_in_place`, and `Local`
        /// uses tokio's `File::sync_all`. It is a no-op for `Cloud`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
                Self::Local(v) => v.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Requests a data-only sync.
        ///
        /// `Dyn` forwards to `WriteableTrait::sync_data` inside `block_in_place`, and
        /// `Local` uses tokio's `File::sync_data`. It is a no-op for `Cloud`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
                Self::Local(v) => v.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Optionally syncs (according to `sync`) and then finishes the writer.
        ///
        /// - `Dyn`: flushed via `shutdown`, then closed with `WriteableTrait::close`.
        /// - `Local`: converted back into a `std::fs::File` (which, per tokio docs, waits
        ///   for in-flight operations) and closed with `close_file`.
        /// - `Cloud`: finished via `shutdown`.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(v) => async {
                    let f = v.into_std().await;
                    close_file(f)
                }
                .await
                .map_err(PolarsError::from),
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}