1use std::io;
2#[cfg(feature = "cloud")]
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7#[cfg(feature = "cloud")]
8pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9use polars_error::{PolarsResult, feature_gated, polars_err};
10use polars_utils::create_file;
11use polars_utils::file::close_file;
12use polars_utils::mmap::ensure_not_mapped;
13use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15use super::sync_on_close::SyncOnCloseType;
16use crate::cloud::CloudOptions;
17use crate::metrics::IOMetrics;
18use crate::resolve_homedir;
19
/// Blocking writer that, on top of [`io::Write`], can be synced and explicitly closed.
///
/// `Writeable::Dyn` stores a boxed implementor of this trait.
pub trait WriteableTrait: io::Write {
    /// Finishes the writer. `Writeable::close` calls this as its last step.
    fn close(&mut self) -> io::Result<()>;

    /// Counterpart of [`std::fs::File::sync_all`]; what is synced is up to the implementor.
    fn sync_all(&self) -> io::Result<()>;

    /// Counterpart of [`std::fs::File::sync_data`]; what is synced is up to the implementor.
    fn sync_data(&self) -> io::Result<()>;
}
26
27#[allow(clippy::large_enum_variant)] pub enum Writeable {
34 Dyn(Box<dyn WriteableTrait + Send>),
38 Local(std::fs::File),
39 #[cfg(feature = "cloud")]
40 Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
41}
42
43impl Writeable {
44 pub fn try_new(
45 path: PlRefPath,
46 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49 io_metrics: Option<Arc<IOMetrics>>,
50 ) -> PolarsResult<Self> {
51 Ok(if path.has_scheme() {
52 feature_gated!("cloud", {
53 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
54 use crate::pl_async::get_runtime;
55
56 let writer = get_runtime().block_in_place_on(new_cloud_writer(
57 path,
58 cloud_options,
59 cloud_upload_chunk_size,
60 cloud_upload_concurrency.try_into().unwrap(),
61 io_metrics,
62 ))?;
63
64 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
65 })
66 } else if polars_config::config().force_async() {
67 feature_gated!("cloud", {
68 let path = resolve_homedir(path.as_std_path());
69 create_file(&path)?;
70 let path = std::fs::canonicalize(&path)?;
71
72 ensure_not_mapped(&path.metadata()?)?;
73
74 let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
75 let path = format_file_uri(path);
76
77 use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
78 use crate::pl_async::get_runtime;
79
80 let writer = get_runtime().block_in_place_on(new_cloud_writer(
81 path,
82 cloud_options,
83 cloud_upload_chunk_size,
84 cloud_upload_concurrency.try_into().unwrap(),
85 io_metrics,
86 ))?;
87
88 Self::Cloud(CloudWriterIoTraitWrap::from(writer))
89 })
90 } else {
91 let path = resolve_homedir(path.as_std_path());
92 create_file(&path)?;
93
94 Self::Local(polars_utils::open_file_write(&path)?)
95 })
96 }
97
98 #[cfg(feature = "cloud")]
101 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
102 use self::async_writeable::AsyncDynWriteable;
103
104 match self {
105 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
106 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
107 Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
108 }
109 }
110
111 pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
112 match self {
113 Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
114 Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
115 #[cfg(feature = "cloud")]
116 Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
117 }
118 }
119
120 pub fn sync_all(&self) -> io::Result<()> {
121 match self {
122 Self::Dyn(v) => v.sync_all(),
123 Self::Local(v) => v.sync_all(),
124 #[cfg(feature = "cloud")]
125 Self::Cloud(v) => v.sync_all(),
126 }
127 }
128
129 pub fn sync_data(&self) -> io::Result<()> {
130 match self {
131 Self::Dyn(v) => v.sync_data(),
132 Self::Local(v) => v.sync_data(),
133 #[cfg(feature = "cloud")]
134 Self::Cloud(v) => v.sync_data(),
135 }
136 }
137
138 pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
139 match sync {
140 SyncOnCloseType::All => self.sync_all()?,
141 SyncOnCloseType::Data => self.sync_data()?,
142 SyncOnCloseType::None => {},
143 }
144
145 match self {
146 Self::Dyn(mut v) => v.close(),
147 Self::Local(v) => close_file(v),
148 #[cfg(feature = "cloud")]
149 Self::Cloud(mut v) => v.close(),
150 }
151 }
152}
153
154impl io::Write for Writeable {
155 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156 match self {
157 Self::Dyn(v) => v.write(buf),
158 Self::Local(v) => v.write(buf),
159 #[cfg(feature = "cloud")]
160 Self::Cloud(v) => v.write(buf),
161 }
162 }
163
164 fn flush(&mut self) -> io::Result<()> {
165 self.sync_all()
166 }
167}
168
169impl Deref for Writeable {
170 type Target = dyn io::Write + Send;
171
172 fn deref(&self) -> &Self::Target {
173 match self {
174 Self::Dyn(v) => v,
175 Self::Local(v) => v,
176 #[cfg(feature = "cloud")]
177 Self::Cloud(v) => v,
178 }
179 }
180}
181
182impl DerefMut for Writeable {
183 fn deref_mut(&mut self) -> &mut Self::Target {
184 match self {
185 Self::Dyn(v) => v,
186 Self::Local(v) => v,
187 #[cfg(feature = "cloud")]
188 Self::Cloud(v) => v,
189 }
190 }
191}
192
/// Borrowed writer handed out by `Writeable::as_buffered`.
pub enum BufferedWriteable<'a> {
    /// The borrowed writer wrapped in a [`io::BufWriter`].
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// The borrowed writer used as-is, without an additional buffer.
    Direct(&'a mut (dyn io::Write + Send)),
}
198
199impl<'a> Deref for BufferedWriteable<'a> {
200 type Target = dyn io::Write + Send + 'a;
201
202 fn deref(&self) -> &Self::Target {
203 match self {
204 Self::BufWriter(v) => v as _,
205 Self::Direct(v) => v,
206 }
207 }
208}
209
210impl DerefMut for BufferedWriteable<'_> {
211 fn deref_mut(&mut self) -> &mut Self::Target {
212 match self {
213 Self::BufWriter(v) => v as _,
214 Self::Direct(v) => v,
215 }
216 }
217}
218
/// Creates a `CloudWriter` for `path` and starts it before handing it back.
///
/// The object store and location are obtained from `build_object_store`; the object
/// path is derived from the location's `prefix`.
///
/// # Errors
///
/// Fails if the object store cannot be built, the prefix cannot be turned into an
/// object path, or `start` fails.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::{build_object_store, object_path_from_str};

    // NOTE(review): the meaning of the trailing `false` is defined by `build_object_store`.
    let (location, store) = build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut cloud_writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    cloud_writer.start().await?;

    Ok(cloud_writer)
}
245
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::metrics::IOMetrics;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Adapter exposing a blocking [`WriteableTrait`] object through
    /// [`tokio::io::AsyncWrite`].
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        /// Runs the blocking `write` inside `task::block_in_place`; always `Poll::Ready`.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = self.get_mut();
            Poll::Ready(task::block_in_place(|| this.0.write(buf)))
        }

        /// Runs the blocking `flush` inside `task::block_in_place`; always `Poll::Ready`.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let this = self.get_mut();
            Poll::Ready(task::block_in_place(|| this.0.flush()))
        }

        /// Shutdown only flushes; closing is done separately by `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Async counterpart of [`Writeable`], with one variant per `Writeable` variant.
    pub enum AsyncWriteable {
        /// A blocking writer driven through [`AsyncDynWriteable`].
        Dyn(AsyncDynWriteable),
        /// A local file handled by tokio.
        Local(tokio::fs::File),
        /// A cloud writer wrapper.
        Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
    }

    impl AsyncWriteable {
        /// Builds a [`Writeable`] with the same arguments and converts it with
        /// `try_into_async_writeable`.
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_concurrency: usize,
            io_metrics: Option<Arc<IOMetrics>>,
        ) -> PolarsResult<Self> {
            let blocking = Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_concurrency,
                io_metrics,
            )?;

            blocking.try_into_async_writeable()
        }

        /// Syncs `Dyn` and `Local` writers with their `sync_all`; a no-op for `Cloud`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(dyn_writer) => {
                    task::block_in_place(|| dyn_writer.0.as_ref().sync_all())
                },
                Self::Local(file) => file.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Syncs `Dyn` and `Local` writers with their `sync_data`; a no-op for `Cloud`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(dyn_writer) => {
                    task::block_in_place(|| dyn_writer.0.as_ref().sync_data())
                },
                Self::Local(file) => file.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Performs the sync selected by `sync`, then shuts down / closes the writer.
        ///
        /// A failing sync returns early, before any shutdown or close step runs.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut dyn_writer) => {
                    // Flush through the async adapter first, then run the blocking close.
                    dyn_writer.shutdown().await?;
                    task::block_in_place(|| dyn_writer.0.close())?;
                    Ok(())
                },
                Self::Local(file) => {
                    let std_file = file.into_std().await;
                    close_file(std_file).map_err(PolarsError::from)
                },
                Self::Cloud(mut cloud_writer) => {
                    cloud_writer.shutdown().await.map_err(PolarsError::from)
                },
            }
        }
    }

    /// Shared access to the stored writer as a `dyn AsyncWrite + Send + Unpin`.
    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(inner) => inner,
                Self::Local(inner) => inner,
                Self::Cloud(inner) => inner,
            }
        }
    }

    /// Exclusive access to the stored writer as a `dyn AsyncWrite + Send + Unpin`.
    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(inner) => inner,
                Self::Local(inner) => inner,
                Self::Cloud(inner) => inner,
            }
        }
    }
}