1use std::io;
2use std::ops::{Deref, DerefMut};
3use std::path::Path;
4
5#[cfg(feature = "cloud")]
6pub use async_writeable::AsyncWriteable;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, feature_gated};
9use polars_utils::create_file;
10use polars_utils::file::{ClosableFile, WriteClose};
11use polars_utils::mmap::ensure_not_mapped;
12use polars_utils::plpath::{CloudScheme, PlPathRef};
13
14use super::sync_on_close::SyncOnCloseType;
15use crate::cloud::CloudOptions;
16use crate::resolve_homedir;
17
18pub trait DynWriteable: io::Write + Send {
19 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
21 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
22
23 fn close(self: Box<Self>) -> io::Result<()>;
24 fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
25}
26
27impl DynWriteable for ClosableFile {
28 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
29 self as _
30 }
31 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
32 self as _
33 }
34 fn close(self: Box<Self>) -> io::Result<()> {
35 ClosableFile::close(*self)
36 }
37 fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
38 super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
39 }
40}
41
42#[allow(clippy::large_enum_variant)] pub enum Writeable {
49 Dyn(Box<dyn DynWriteable>),
53 Local(std::fs::File),
54 #[cfg(feature = "cloud")]
55 Cloud(crate::cloud::BlockingCloudWriter),
56}
57
58impl Writeable {
59 pub fn try_new(
60 addr: PlPathRef,
61 #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
62 ) -> PolarsResult<Self> {
63 let verbose = config::verbose();
64
65 match addr {
66 PlPathRef::Cloud(p) => {
67 feature_gated!("cloud", {
68 use crate::cloud::BlockingCloudWriter;
69
70 if verbose {
71 eprintln!("Writeable: try_new: cloud: {p}")
72 }
73
74 if p.scheme() == CloudScheme::File {
75 create_file(Path::new(p.strip_scheme()))?;
76 }
77
78 let writer = crate::pl_async::get_runtime().block_in_place_on(
79 BlockingCloudWriter::new(&p.to_string(), cloud_options),
80 )?;
81 Ok(Self::Cloud(writer))
82 })
83 },
84 PlPathRef::Local(path) if config::force_async() => {
85 feature_gated!("cloud", {
86 use crate::cloud::BlockingCloudWriter;
87
88 let path = resolve_homedir(&path);
89
90 if verbose {
91 eprintln!("Writeable: try_new: forced async: {}", path.display())
92 }
93
94 create_file(&path)?;
95 let path = std::fs::canonicalize(&path)?;
96
97 ensure_not_mapped(&path.metadata()?)?;
98
99 let path = format!(
100 "file://{}",
101 if cfg!(target_family = "windows") {
102 path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
103 } else {
104 path.to_str().unwrap()
105 }
106 );
107
108 if verbose {
109 eprintln!("Writeable: try_new: forced async converted path: {path}")
110 }
111
112 let writer = crate::pl_async::get_runtime()
113 .block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
114 Ok(Self::Cloud(writer))
115 })
116 },
117 PlPathRef::Local(path) => {
118 let path = resolve_homedir(&path);
119 create_file(&path)?;
120
121 if verbose {
124 eprintln!(
125 "Writeable: try_new: local: {} (canonicalize: {:?})",
126 path.display(),
127 std::fs::canonicalize(&path)
128 )
129 }
130
131 Ok(Self::Local(polars_utils::open_file_write(&path)?))
132 },
133 }
134 }
135
136 #[cfg(feature = "cloud")]
139 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
140 use self::async_writeable::AsyncDynWriteable;
141
142 match self {
143 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
144 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
145 Self::Cloud(v) => v
148 .try_into_inner()
149 .map(AsyncWriteable::Cloud)
150 .map_err(PolarsError::from),
151 }
152 }
153
154 pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
155 match self {
156 Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
157 Writeable::Local(file) => {
158 crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
159 },
160 #[cfg(feature = "cloud")]
161 Writeable::Cloud(_) => Ok(()),
162 }
163 }
164
165 pub fn close(self) -> std::io::Result<()> {
166 match self {
167 Self::Dyn(v) => v.close(),
168 Self::Local(v) => ClosableFile::from(v).close(),
169 #[cfg(feature = "cloud")]
170 Self::Cloud(mut v) => v.close(),
171 }
172 }
173}
174
175impl Deref for Writeable {
176 type Target = dyn io::Write + Send;
177
178 fn deref(&self) -> &Self::Target {
179 match self {
180 Self::Dyn(v) => v.as_dyn_write(),
181 Self::Local(v) => v,
182 #[cfg(feature = "cloud")]
183 Self::Cloud(v) => v,
184 }
185 }
186}
187
188impl DerefMut for Writeable {
189 fn deref_mut(&mut self) -> &mut Self::Target {
190 match self {
191 Self::Dyn(v) => v.as_mut_dyn_write(),
192 Self::Local(v) => v,
193 #[cfg(feature = "cloud")]
194 Self::Cloud(v) => v,
195 }
196 }
197}
198
199pub fn try_get_writeable(
203 addr: PlPathRef<'_>,
204 cloud_options: Option<&CloudOptions>,
205) -> PolarsResult<Box<dyn WriteClose + Send>> {
206 Writeable::try_new(addr, cloud_options).map(|x| match x {
207 Writeable::Dyn(_) => unreachable!(),
208 Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
209 #[cfg(feature = "cloud")]
210 Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
211 })
212}
213
214#[cfg(feature = "cloud")]
215mod async_writeable {
216 use std::io;
217 use std::ops::{Deref, DerefMut};
218 use std::pin::Pin;
219 use std::task::{Context, Poll};
220
221 use polars_error::{PolarsError, PolarsResult};
222 use polars_utils::file::ClosableFile;
223 use polars_utils::plpath::PlPathRef;
224 use tokio::io::AsyncWriteExt;
225 use tokio::task;
226
227 use super::{DynWriteable, Writeable};
228 use crate::cloud::CloudOptions;
229 use crate::utils::sync_on_close::SyncOnCloseType;
230
231 pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);
233
234 impl tokio::io::AsyncWrite for AsyncDynWriteable {
235 fn poll_write(
236 self: Pin<&mut Self>,
237 _cx: &mut Context<'_>,
238 buf: &[u8],
239 ) -> Poll<io::Result<usize>> {
240 let result = task::block_in_place(|| self.get_mut().0.write(buf));
241 Poll::Ready(result)
242 }
243
244 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
245 let result = task::block_in_place(|| self.get_mut().0.flush());
246 Poll::Ready(result)
247 }
248
249 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
250 self.poll_flush(cx)
251 }
252 }
253
254 pub enum AsyncWriteable {
261 Dyn(AsyncDynWriteable),
262 Local(tokio::fs::File),
263 Cloud(object_store::buffered::BufWriter),
264 }
265
266 impl AsyncWriteable {
267 pub async fn try_new(
268 addr: PlPathRef<'_>,
269 cloud_options: Option<&CloudOptions>,
270 ) -> PolarsResult<Self> {
271 Writeable::try_new(addr, cloud_options).and_then(|x| x.try_into_async_writeable())
273 }
274
275 pub async fn sync_on_close(
276 &mut self,
277 sync_on_close: SyncOnCloseType,
278 ) -> std::io::Result<()> {
279 match self {
280 Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
281 Self::Local(file) => {
282 crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
283 },
284 Self::Cloud(_) => Ok(()),
285 }
286 }
287
288 pub async fn close(self) -> PolarsResult<()> {
289 match self {
290 Self::Dyn(mut v) => {
291 v.shutdown().await.map_err(PolarsError::from)?;
292 Ok(task::block_in_place(|| v.0.close())?)
293 },
294 Self::Local(v) => async {
295 let f = v.into_std().await;
296 ClosableFile::from(f).close()
297 }
298 .await
299 .map_err(PolarsError::from),
300 Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
301 }
302 }
303 }
304
305 impl Deref for AsyncWriteable {
306 type Target = dyn tokio::io::AsyncWrite + Send + Unpin;
307
308 fn deref(&self) -> &Self::Target {
309 match self {
310 Self::Dyn(v) => v,
311 Self::Local(v) => v,
312 Self::Cloud(v) => v,
313 }
314 }
315 }
316
317 impl DerefMut for AsyncWriteable {
318 fn deref_mut(&mut self) -> &mut Self::Target {
319 match self {
320 Self::Dyn(v) => v,
321 Self::Local(v) => v,
322 Self::Cloud(v) => v,
323 }
324 }
325 }
326}