polars_io/utils/
file.rs
1use std::io;
2use std::ops::{Deref, DerefMut};
3use std::path::Path;
4
5#[cfg(feature = "cloud")]
6pub use async_writeable::AsyncWriteable;
7use polars_core::config;
8use polars_error::{PolarsError, PolarsResult, feature_gated};
9use polars_utils::create_file;
10use polars_utils::file::{ClosableFile, WriteClose};
11use polars_utils::mmap::ensure_not_mapped;
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::{is_cloud_url, resolve_homedir};
16
/// A blocking writer that can be stored as a boxed trait object and closed explicitly.
///
/// In addition to [`io::Write`] and [`Send`], implementors expose themselves as a plain
/// `dyn io::Write + Send` trait object, offer a consuming `close`, and provide a
/// `sync_on_close` hook.
pub trait DynWriteable: io::Write + Send {
    /// Returns `self` as a shared `dyn io::Write + Send` trait object.
    // NOTE(review): this and `as_mut_dyn_write` look like manual stand-ins for trait-object
    // upcasting (`dyn DynWriteable` -> `dyn io::Write`) — confirm whether they are still needed.
    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
    /// Returns `self` as an exclusive `dyn io::Write + Send` trait object.
    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);

    /// Consumes the boxed writer and closes it, reporting any I/O error.
    fn close(self: Box<Self>) -> io::Result<()>;
    /// Applies the requested `sync_on_close` policy to this writer.
    fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
}
25
26impl DynWriteable for ClosableFile {
27 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
28 self as _
29 }
30 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
31 self as _
32 }
33 fn close(self: Box<Self>) -> io::Result<()> {
34 ClosableFile::close(*self)
35 }
36 fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
37 super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
38 }
39}
40
/// A blocking (non-async) output target.
///
/// The type dereferences to `dyn io::Write + Send`, so bytes can be written to it directly
/// regardless of which variant is held.
#[allow(clippy::large_enum_variant)]
pub enum Writeable {
    /// Any boxed [`DynWriteable`] implementation.
    Dyn(Box<dyn DynWriteable>),
    /// A [`std::fs::File`] handle.
    Local(std::fs::File),
    /// A blocking cloud writer; only present when the `cloud` feature is enabled.
    #[cfg(feature = "cloud")]
    Cloud(crate::cloud::BlockingCloudWriter),
}
56
57impl Writeable {
58 pub fn try_new(
59 path: &str,
60 #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
61 ) -> PolarsResult<Self> {
62 let is_cloud = is_cloud_url(path);
63 let verbose = config::verbose();
64
65 if is_cloud {
66 feature_gated!("cloud", {
67 use crate::cloud::BlockingCloudWriter;
68
69 if verbose {
70 eprintln!("Writeable: try_new: cloud: {}", path)
71 }
72
73 if path.starts_with("file://") {
74 create_file(Path::new(&path[const { "file://".len() }..]))?;
75 }
76
77 let writer = crate::pl_async::get_runtime()
78 .block_in_place_on(BlockingCloudWriter::new(path, cloud_options))?;
79 Ok(Self::Cloud(writer))
80 })
81 } else if config::force_async() {
82 feature_gated!("cloud", {
83 use crate::cloud::BlockingCloudWriter;
84
85 let path = resolve_homedir(&path);
86
87 if verbose {
88 eprintln!(
89 "Writeable: try_new: forced async: {}",
90 path.to_str().unwrap()
91 )
92 }
93
94 create_file(&path)?;
95 let path = std::fs::canonicalize(&path)?;
96
97 ensure_not_mapped(&path.metadata()?)?;
98
99 let path = format!(
100 "file://{}",
101 if cfg!(target_family = "windows") {
102 path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
103 } else {
104 path.to_str().unwrap()
105 }
106 );
107
108 if verbose {
109 eprintln!("Writeable: try_new: forced async converted path: {}", path)
110 }
111
112 let writer = crate::pl_async::get_runtime()
113 .block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
114 Ok(Self::Cloud(writer))
115 })
116 } else {
117 let path = resolve_homedir(&path);
118 create_file(&path)?;
119
120 if verbose {
123 eprintln!(
124 "Writeable: try_new: local: {} (canonicalize: {:?})",
125 path.to_str().unwrap(),
126 std::fs::canonicalize(&path)
127 )
128 }
129
130 Ok(Self::Local(polars_utils::open_file_write(&path)?))
131 }
132 }
133
134 #[cfg(feature = "cloud")]
137 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
138 use self::async_writeable::AsyncDynWriteable;
139
140 match self {
141 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
142 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
143 Self::Cloud(v) => v
146 .try_into_inner()
147 .map(AsyncWriteable::Cloud)
148 .map_err(PolarsError::from),
149 }
150 }
151
152 pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
153 match self {
154 Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
155 Writeable::Local(file) => {
156 crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
157 },
158 #[cfg(feature = "cloud")]
159 Writeable::Cloud(_) => Ok(()),
160 }
161 }
162
163 pub fn close(self) -> std::io::Result<()> {
164 match self {
165 Self::Dyn(v) => v.close(),
166 Self::Local(v) => ClosableFile::from(v).close(),
167 #[cfg(feature = "cloud")]
168 Self::Cloud(mut v) => v.close(),
169 }
170 }
171}
172
173impl Deref for Writeable {
174 type Target = dyn io::Write + Send;
175
176 fn deref(&self) -> &Self::Target {
177 match self {
178 Self::Dyn(v) => v.as_dyn_write(),
179 Self::Local(v) => v,
180 #[cfg(feature = "cloud")]
181 Self::Cloud(v) => v,
182 }
183 }
184}
185
186impl DerefMut for Writeable {
187 fn deref_mut(&mut self) -> &mut Self::Target {
188 match self {
189 Self::Dyn(v) => v.as_mut_dyn_write(),
190 Self::Local(v) => v,
191 #[cfg(feature = "cloud")]
192 Self::Cloud(v) => v,
193 }
194 }
195}
196
197pub fn try_get_writeable(
201 path: &str,
202 cloud_options: Option<&CloudOptions>,
203) -> PolarsResult<Box<dyn WriteClose + Send>> {
204 Writeable::try_new(path, cloud_options).map(|x| match x {
205 Writeable::Dyn(_) => unreachable!(),
206 Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
207 #[cfg(feature = "cloud")]
208 Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
209 })
210}
211
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::ClosableFile;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{DynWriteable, Writeable};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Adapter that exposes a boxed blocking [`DynWriteable`] through [`tokio::io::AsyncWrite`].
    pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        /// Runs the blocking `write` inside `task::block_in_place`; the result is always ready.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            Poll::Ready(task::block_in_place(|| self.get_mut().0.write(buf)))
        }

        /// Runs the blocking `flush` inside `task::block_in_place`; the result is always ready.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(task::block_in_place(|| self.get_mut().0.flush()))
        }

        /// Shutdown only flushes; the inner writer is closed by `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// An async output target.
    ///
    /// The type dereferences to `dyn tokio::io::AsyncWrite + Send + Unpin`.
    pub enum AsyncWriteable {
        /// A blocking [`DynWriteable`] wrapped in [`AsyncDynWriteable`].
        Dyn(AsyncDynWriteable),
        /// A tokio file handle.
        Local(tokio::fs::File),
        /// An `object_store` buffered writer.
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens `path` via [`Writeable::try_new`] and converts the result into its async form.
        ///
        /// # Errors
        ///
        /// Propagates errors from opening the writer or from the conversion.
        pub async fn try_new(
            path: &str,
            cloud_options: Option<&CloudOptions>,
        ) -> PolarsResult<Self> {
            let blocking = Writeable::try_new(path, cloud_options)?;
            blocking.try_into_async_writeable()
        }

        /// Applies the `sync_on_close` policy to the underlying handle.
        ///
        /// `Dyn` runs the blocking hook inside `task::block_in_place`, `Local` awaits the tokio
        /// helper, and `Cloud` does nothing and returns `Ok(())`.
        pub async fn sync_on_close(
            &mut self,
            sync_on_close: SyncOnCloseType,
        ) -> std::io::Result<()> {
            match self {
                Self::Dyn(writer) => {
                    task::block_in_place(|| writer.0.sync_on_close(sync_on_close))
                },
                Self::Local(file) => {
                    crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
                },
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Consumes the writer, finishing and closing the underlying handle.
        ///
        /// # Errors
        ///
        /// Any I/O error raised while shutting down or closing is converted to a [`PolarsError`].
        pub async fn close(self) -> PolarsResult<()> {
            match self {
                Self::Dyn(mut writer) => {
                    // Flush through the async adapter first, then close the blocking writer.
                    writer.shutdown().await.map_err(PolarsError::from)?;
                    task::block_in_place(|| writer.0.close()).map_err(PolarsError::from)
                },
                Self::Local(file) => {
                    // Convert back to a std file so it can be closed through `ClosableFile`.
                    let std_file = file.into_std().await;
                    ClosableFile::from(std_file)
                        .close()
                        .map_err(PolarsError::from)
                },
                Self::Cloud(mut writer) => writer.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    /// Exposes whichever writer is held as a shared `AsyncWrite` trait object.
    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                AsyncWriteable::Dyn(adapter) => adapter,
                AsyncWriteable::Local(file) => file,
                AsyncWriteable::Cloud(writer) => writer,
            }
        }
    }

    /// Exposes whichever writer is held as an exclusive `AsyncWrite` trait object.
    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                AsyncWriteable::Dyn(adapter) => adapter,
                AsyncWriteable::Local(file) => file,
                AsyncWriteable::Cloud(writer) => writer,
            }
        }
    }
}