use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;
use crate::utils::resolve_homedir;
use crate::WriterFactory;
pub struct PartitionedWriter<F> {
option: F,
rootdir: PathBuf,
by: Vec<String>,
parallel: bool,
}
impl<F> PartitionedWriter<F>
where
F: WriterFactory + Send + Sync,
{
pub fn new<P, I, S>(option: F, rootdir: P, by: I) -> Self
where
P: Into<PathBuf>,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
Self {
option,
rootdir: rootdir.into(),
by: by.into_iter().map(|s| s.as_ref().to_string()).collect(),
parallel: true,
}
}
pub fn with_parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}
fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> {
let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df);
std::fs::create_dir_all(&path)?;
path.push(format!(
"data-{:04}.{}",
i,
self.option.extension().display()
));
let file = std::fs::File::create(path)?;
let writer = BufWriter::new(file);
self.option
.create_writer::<BufWriter<File>>(writer)
.finish(partition_df)
}
pub fn finish(self, df: &DataFrame) -> PolarsResult<()> {
let groups = df.group_by(self.by.clone())?;
let groups = groups.get_groups();
POOL.install(|| {
match groups {
GroupsProxy::Idx(idx) => {
idx.par_iter()
.enumerate()
.map(|(i, (_, group))| {
let mut part_df = unsafe {
df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
};
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>()
},
GroupsProxy::Slice { groups, .. } => groups
.par_iter()
.enumerate()
.map(|(i, [first, len])| {
let mut part_df = df.slice(*first as i64, *len as usize);
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>(),
}
})?;
Ok(())
}
}
fn resolve_partition_dir<I, S>(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut path = PathBuf::new();
path.push(resolve_homedir(rootdir));
for key in by.into_iter() {
let value = partition_df[key.as_ref()].get(0).unwrap().to_string();
path.push(format!("{}={}", key.as_ref(), value))
}
path
}