1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::AsyncWriteable;
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated};
8use polars_utils::create_file;
9use polars_utils::file::close_file;
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::plpath::PlPathRef;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
17pub trait WriteableTrait: std::io::Write {
18 fn close(&mut self) -> std::io::Result<()>;
19 fn sync_all(&self) -> std::io::Result<()>;
20 fn sync_data(&self) -> std::io::Result<()>;
21}
22
23#[allow(clippy::large_enum_variant)] pub enum Writeable {
30 Dyn(Box<dyn WriteableTrait + Send>),
34 Local(std::fs::File),
35 #[cfg(feature = "cloud")]
36 Cloud(crate::cloud::BlockingCloudWriter),
37}
38
39impl Writeable {
40 pub fn try_new(
41 path: PlPathRef<'_>,
42 #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
43 #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_upload_chunk_size: usize,
44 ) -> PolarsResult<Self> {
45 match path {
46 PlPathRef::Cloud(_) => {
47 feature_gated!("cloud", {
48 use crate::cloud::BlockingCloudWriter;
49
50 let writer = crate::pl_async::get_runtime().block_in_place_on(
51 BlockingCloudWriter::new(path, cloud_options, cloud_upload_chunk_size),
52 )?;
53
54 Ok(Self::Cloud(writer))
55 })
56 },
57 PlPathRef::Local(path) if config::force_async() => {
58 feature_gated!("cloud", {
59 use crate::cloud::BlockingCloudWriter;
60
61 let path = resolve_homedir(&path);
62
63 create_file(&path)?;
64 let path = std::fs::canonicalize(&path)?;
65
66 ensure_not_mapped(&path.metadata()?)?;
67
68 let path = format!(
69 "file://{}",
70 if cfg!(target_family = "windows") {
71 path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
72 } else {
73 path.to_str().unwrap()
74 }
75 );
76
77 let writer = crate::pl_async::get_runtime().block_in_place_on(
78 BlockingCloudWriter::new(
79 PlPathRef::new(&path),
80 cloud_options,
81 cloud_upload_chunk_size,
82 ),
83 )?;
84
85 Ok(Self::Cloud(writer))
86 })
87 },
88 PlPathRef::Local(path) => {
89 let path = resolve_homedir(&path);
90 create_file(&path)?;
91
92 Ok(Self::Local(polars_utils::open_file_write(&path)?))
93 },
94 }
95 }
96
97 #[cfg(feature = "cloud")]
100 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
101 use self::async_writeable::AsyncDynWriteable;
102
103 match self {
104 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
105 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
106 Self::Cloud(v) => v
109 .try_into_inner()
110 .map(AsyncWriteable::Cloud)
111 .map_err(PolarsError::from),
112 }
113 }
114
115 pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
116 match self {
117 Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
118 Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
119 #[cfg(feature = "cloud")]
120 Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
121 }
122 }
123
124 pub fn sync_all(&self) -> io::Result<()> {
125 match self {
126 Self::Dyn(v) => v.sync_all(),
127 Self::Local(v) => v.sync_all(),
128 #[cfg(feature = "cloud")]
129 Self::Cloud(v) => v.sync_all(),
130 }
131 }
132
133 pub fn sync_data(&self) -> io::Result<()> {
134 match self {
135 Self::Dyn(v) => v.sync_data(),
136 Self::Local(v) => v.sync_data(),
137 #[cfg(feature = "cloud")]
138 Self::Cloud(v) => v.sync_data(),
139 }
140 }
141
142 pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
143 match sync {
144 SyncOnCloseType::All => self.sync_all()?,
145 SyncOnCloseType::Data => self.sync_data()?,
146 SyncOnCloseType::None => {},
147 }
148
149 match self {
150 Self::Dyn(mut v) => v.close(),
151 Self::Local(v) => close_file(v),
152 #[cfg(feature = "cloud")]
153 Self::Cloud(mut v) => v.close(),
154 }
155 }
156}
157
158impl Deref for Writeable {
159 type Target = dyn io::Write + Send;
160
161 fn deref(&self) -> &Self::Target {
162 match self {
163 Self::Dyn(v) => v,
164 Self::Local(v) => v,
165 #[cfg(feature = "cloud")]
166 Self::Cloud(v) => v,
167 }
168 }
169}
170
171impl DerefMut for Writeable {
172 fn deref_mut(&mut self) -> &mut Self::Target {
173 match self {
174 Self::Dyn(v) => v,
175 Self::Local(v) => v,
176 #[cfg(feature = "cloud")]
177 Self::Cloud(v) => v,
178 }
179 }
180}
181
182pub enum BufferedWriteable<'a> {
184 BufWriter(std::io::BufWriter<&'a mut (dyn std::io::Write + Send)>),
185 Direct(&'a mut (dyn std::io::Write + Send)),
186}
187
188impl<'a> Deref for BufferedWriteable<'a> {
189 type Target = dyn io::Write + Send + 'a;
190
191 fn deref(&self) -> &Self::Target {
192 match self {
193 Self::BufWriter(v) => v as _,
194 Self::Direct(v) => v,
195 }
196 }
197}
198
199impl DerefMut for BufferedWriteable<'_> {
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 match self {
202 Self::BufWriter(v) => v as _,
203 Self::Direct(v) => v,
204 }
205 }
206}
207#[cfg(feature = "cloud")]
208mod async_writeable {
209 use std::io;
210 use std::ops::{Deref, DerefMut};
211 use std::pin::Pin;
212 use std::task::{Context, Poll};
213
214 use polars_error::{PolarsError, PolarsResult};
215 use polars_utils::file::close_file;
216 use polars_utils::plpath::PlPathRef;
217 use tokio::io::AsyncWriteExt;
218 use tokio::task;
219
220 use super::{Writeable, WriteableTrait};
221 use crate::cloud::CloudOptions;
222 use crate::utils::sync_on_close::SyncOnCloseType;
223
224 pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);
226
227 impl tokio::io::AsyncWrite for AsyncDynWriteable {
228 fn poll_write(
229 self: Pin<&mut Self>,
230 _cx: &mut Context<'_>,
231 buf: &[u8],
232 ) -> Poll<io::Result<usize>> {
233 let result = task::block_in_place(|| self.get_mut().0.write(buf));
234 Poll::Ready(result)
235 }
236
237 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
238 let result = task::block_in_place(|| self.get_mut().0.flush());
239 Poll::Ready(result)
240 }
241
242 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
243 self.poll_flush(cx)
244 }
245 }
246
247 pub enum AsyncWriteable {
254 Dyn(AsyncDynWriteable),
255 Local(tokio::fs::File),
256 Cloud(object_store::buffered::BufWriter),
257 }
258
259 impl AsyncWriteable {
260 pub async fn try_new(
261 path: PlPathRef<'_>,
262 cloud_options: Option<&CloudOptions>,
263 cloud_upload_chunk_size: usize,
264 ) -> PolarsResult<Self> {
265 Writeable::try_new(path, cloud_options, cloud_upload_chunk_size)
267 .and_then(|x| x.try_into_async_writeable())
268 }
269
270 pub async fn sync_all(&mut self) -> io::Result<()> {
271 match self {
272 Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
273 Self::Local(v) => v.sync_all().await,
274 Self::Cloud(_) => Ok(()),
275 }
276 }
277
278 pub async fn sync_data(&mut self) -> io::Result<()> {
279 match self {
280 Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
281 Self::Local(v) => v.sync_data().await,
282 Self::Cloud(_) => Ok(()),
283 }
284 }
285
286 pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
287 match sync {
288 SyncOnCloseType::All => self.sync_all().await?,
289 SyncOnCloseType::Data => self.sync_data().await?,
290 SyncOnCloseType::None => {},
291 }
292
293 match self {
294 Self::Dyn(mut v) => {
295 v.shutdown().await.map_err(PolarsError::from)?;
296 Ok(task::block_in_place(|| v.0.close())?)
297 },
298 Self::Local(v) => async {
299 let f = v.into_std().await;
300 close_file(f)
301 }
302 .await
303 .map_err(PolarsError::from),
304 Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
305 }
306 }
307 }
308
309 impl Deref for AsyncWriteable {
310 type Target = dyn tokio::io::AsyncWrite + Send + Unpin;
311
312 fn deref(&self) -> &Self::Target {
313 match self {
314 Self::Dyn(v) => v,
315 Self::Local(v) => v,
316 Self::Cloud(v) => v,
317 }
318 }
319 }
320
321 impl DerefMut for AsyncWriteable {
322 fn deref_mut(&mut self) -> &mut Self::Target {
323 match self {
324 Self::Dyn(v) => v,
325 Self::Local(v) => v,
326 Self::Cloud(v) => v,
327 }
328 }
329 }
330}