1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::AsyncWriteable;
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated};
8use polars_utils::create_file;
9use polars_utils::file::{ClosableFile, WriteClose};
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::plpath::PlPathRef;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
17pub trait DynWriteable: io::Write + Send {
18 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
20 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
21
22 fn close(self: Box<Self>) -> io::Result<()>;
23 fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
24}
25
26impl DynWriteable for ClosableFile {
27 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
28 self as _
29 }
30 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
31 self as _
32 }
33 fn close(self: Box<Self>) -> io::Result<()> {
34 ClosableFile::close(*self)
35 }
36 fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
37 super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
38 }
39}
40
41#[allow(clippy::large_enum_variant)] pub enum Writeable {
48 Dyn(Box<dyn DynWriteable>),
52 Local(std::fs::File),
53 #[cfg(feature = "cloud")]
54 Cloud(crate::cloud::BlockingCloudWriter),
55}
56
57impl Writeable {
58 pub fn try_new(
59 path: PlPathRef<'_>,
60 #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
61 ) -> PolarsResult<Self> {
62 let verbose = config::verbose();
63
64 match path {
65 PlPathRef::Cloud(_) => {
66 feature_gated!("cloud", {
67 use crate::cloud::BlockingCloudWriter;
68
69 if verbose {
70 eprintln!("Writeable: try_new: cloud: {}", path.to_str())
71 }
72
73 let writer = crate::pl_async::get_runtime()
74 .block_in_place_on(BlockingCloudWriter::new(path, cloud_options))?;
75 Ok(Self::Cloud(writer))
76 })
77 },
78 PlPathRef::Local(path) if config::force_async() => {
79 feature_gated!("cloud", {
80 use crate::cloud::BlockingCloudWriter;
81
82 let path = resolve_homedir(&path);
83
84 if verbose {
85 eprintln!("Writeable: try_new: forced async: {}", path.display())
86 }
87
88 create_file(&path)?;
89 let path = std::fs::canonicalize(&path)?;
90
91 ensure_not_mapped(&path.metadata()?)?;
92
93 let path = format!(
94 "file://{}",
95 if cfg!(target_family = "windows") {
96 path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
97 } else {
98 path.to_str().unwrap()
99 }
100 );
101
102 if verbose {
103 eprintln!("Writeable: try_new: forced async converted path: {path}")
104 }
105
106 let writer = crate::pl_async::get_runtime().block_in_place_on(
107 BlockingCloudWriter::new(PlPathRef::new(&path), cloud_options),
108 )?;
109 Ok(Self::Cloud(writer))
110 })
111 },
112 PlPathRef::Local(path) => {
113 let path = resolve_homedir(&path);
114 create_file(&path)?;
115
116 if verbose {
119 eprintln!(
120 "Writeable: try_new: local: {} (canonicalize: {:?})",
121 path.display(),
122 std::fs::canonicalize(&path)
123 )
124 }
125
126 Ok(Self::Local(polars_utils::open_file_write(&path)?))
127 },
128 }
129 }
130
131 #[cfg(feature = "cloud")]
134 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
135 use self::async_writeable::AsyncDynWriteable;
136
137 match self {
138 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
139 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
140 Self::Cloud(v) => v
143 .try_into_inner()
144 .map(AsyncWriteable::Cloud)
145 .map_err(PolarsError::from),
146 }
147 }
148
149 pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
150 match self {
151 Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
152 Writeable::Local(file) => {
153 crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
154 },
155 #[cfg(feature = "cloud")]
156 Writeable::Cloud(_) => Ok(()),
157 }
158 }
159
160 pub fn close(self) -> std::io::Result<()> {
161 match self {
162 Self::Dyn(v) => v.close(),
163 Self::Local(v) => ClosableFile::from(v).close(),
164 #[cfg(feature = "cloud")]
165 Self::Cloud(mut v) => v.close(),
166 }
167 }
168}
169
170impl Deref for Writeable {
171 type Target = dyn io::Write + Send;
172
173 fn deref(&self) -> &Self::Target {
174 match self {
175 Self::Dyn(v) => v.as_dyn_write(),
176 Self::Local(v) => v,
177 #[cfg(feature = "cloud")]
178 Self::Cloud(v) => v,
179 }
180 }
181}
182
183impl DerefMut for Writeable {
184 fn deref_mut(&mut self) -> &mut Self::Target {
185 match self {
186 Self::Dyn(v) => v.as_mut_dyn_write(),
187 Self::Local(v) => v,
188 #[cfg(feature = "cloud")]
189 Self::Cloud(v) => v,
190 }
191 }
192}
193
194pub fn try_get_writeable(
198 addr: PlPathRef<'_>,
199 cloud_options: Option<&CloudOptions>,
200) -> PolarsResult<Box<dyn WriteClose + Send>> {
201 Writeable::try_new(addr, cloud_options).map(|x| match x {
202 Writeable::Dyn(_) => unreachable!(),
203 Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
204 #[cfg(feature = "cloud")]
205 Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
206 })
207}
208
209#[cfg(feature = "cloud")]
210mod async_writeable {
211 use std::io;
212 use std::ops::{Deref, DerefMut};
213 use std::pin::Pin;
214 use std::task::{Context, Poll};
215
216 use polars_error::{PolarsError, PolarsResult};
217 use polars_utils::file::ClosableFile;
218 use polars_utils::plpath::PlPathRef;
219 use tokio::io::AsyncWriteExt;
220 use tokio::task;
221
222 use super::{DynWriteable, Writeable};
223 use crate::cloud::CloudOptions;
224 use crate::utils::sync_on_close::SyncOnCloseType;
225
226 pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);
228
229 impl tokio::io::AsyncWrite for AsyncDynWriteable {
230 fn poll_write(
231 self: Pin<&mut Self>,
232 _cx: &mut Context<'_>,
233 buf: &[u8],
234 ) -> Poll<io::Result<usize>> {
235 let result = task::block_in_place(|| self.get_mut().0.write(buf));
236 Poll::Ready(result)
237 }
238
239 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
240 let result = task::block_in_place(|| self.get_mut().0.flush());
241 Poll::Ready(result)
242 }
243
244 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
245 self.poll_flush(cx)
246 }
247 }
248
249 pub enum AsyncWriteable {
256 Dyn(AsyncDynWriteable),
257 Local(tokio::fs::File),
258 Cloud(object_store::buffered::BufWriter),
259 }
260
261 impl AsyncWriteable {
262 pub async fn try_new(
263 addr: PlPathRef<'_>,
264 cloud_options: Option<&CloudOptions>,
265 ) -> PolarsResult<Self> {
266 Writeable::try_new(addr, cloud_options).and_then(|x| x.try_into_async_writeable())
268 }
269
270 pub async fn sync_on_close(
271 &mut self,
272 sync_on_close: SyncOnCloseType,
273 ) -> std::io::Result<()> {
274 match self {
275 Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
276 Self::Local(file) => {
277 crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
278 },
279 Self::Cloud(_) => Ok(()),
280 }
281 }
282
283 pub async fn close(self) -> PolarsResult<()> {
284 match self {
285 Self::Dyn(mut v) => {
286 v.shutdown().await.map_err(PolarsError::from)?;
287 Ok(task::block_in_place(|| v.0.close())?)
288 },
289 Self::Local(v) => async {
290 let f = v.into_std().await;
291 ClosableFile::from(f).close()
292 }
293 .await
294 .map_err(PolarsError::from),
295 Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
296 }
297 }
298 }
299
300 impl Deref for AsyncWriteable {
301 type Target = dyn tokio::io::AsyncWrite + Send + Unpin;
302
303 fn deref(&self) -> &Self::Target {
304 match self {
305 Self::Dyn(v) => v,
306 Self::Local(v) => v,
307 Self::Cloud(v) => v,
308 }
309 }
310 }
311
312 impl DerefMut for AsyncWriteable {
313 fn deref_mut(&mut self) -> &mut Self::Target {
314 match self {
315 Self::Dyn(v) => v,
316 Self::Local(v) => v,
317 Self::Cloud(v) => v,
318 }
319 }
320 }
321}