1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//! Functionality for writing a DataFrame partitioned into multiple files.

use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::utils::resolve_homedir;
use crate::WriterFactory;

/// Write a DataFrame to disk, partitioned into multiple files.
///
/// The frame is grouped by the `by` columns. Every group is written to its own
/// `data-NNNN.<extension>` file below `rootdir`, inside nested
/// `<column>=<value>` directories (one directory level per partition column).
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcWriterOption;
/// use polars_io::partition::PartitionedWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
///     let option = IpcWriterOption::default();
///     PartitionedWriter::new(option, "./rootdir", ["a", "b"])
///         .finish(df)
/// }
/// ```
pub struct PartitionedWriter<F> {
    /// Writer factory: supplies the file extension and creates the writer
    /// used for each partition file.
    option: F,
    /// Root directory below which the partition directories are created.
    rootdir: PathBuf,
    /// Names of the columns the DataFrame is partitioned by.
    by: Vec<String>,
    /// Whether parallel writing was requested; initialised to `true` by `new`
    /// and overridden through `with_parallel`.
    parallel: bool,
}

impl<F> PartitionedWriter<F>
where
    F: WriterFactory + Send + Sync,
{
    pub fn new<P, I, S>(option: F, rootdir: P, by: I) -> Self
    where
        P: Into<PathBuf>,
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        Self {
            option,
            rootdir: rootdir.into(),
            by: by.into_iter().map(|s| s.as_ref().to_string()).collect(),
            parallel: true,
        }
    }

    /// Write the parquet file in parallel (default).
    pub fn with_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }

    fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> {
        let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df);
        std::fs::create_dir_all(&path)?;

        path.push(format!(
            "data-{:04}.{}",
            i,
            self.option.extension().display()
        ));

        let file = std::fs::File::create(path)?;
        let writer = BufWriter::new(file);

        self.option
            .create_writer::<BufWriter<File>>(writer)
            .finish(partition_df)
    }

    pub fn finish(self, df: &DataFrame) -> PolarsResult<()> {
        let groups = df.group_by(self.by.clone())?;
        let groups = groups.get_groups();

        // don't parallelize this
        // there is a lot of parallelization in take and this may easily SO
        POOL.install(|| {
            match groups {
                GroupsProxy::Idx(idx) => {
                    idx.par_iter()
                        .enumerate()
                        .map(|(i, (_, group))| {
                            // groups are in bounds
                            // and sorted
                            let mut part_df = unsafe {
                                df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
                            };
                            self.write_partition_df(&mut part_df, i)
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                },
                GroupsProxy::Slice { groups, .. } => groups
                    .par_iter()
                    .enumerate()
                    .map(|(i, [first, len])| {
                        let mut part_df = df.slice(*first as i64, *len as usize);
                        self.write_partition_df(&mut part_df, i)
                    })
                    .collect::<PolarsResult<Vec<_>>>(),
            }
        })?;

        Ok(())
    }
}

/// Build the directory path for a single partition.
///
/// The path starts at `rootdir` (passed through `resolve_homedir`) and gets one
/// `<key>=<value>` component per key in `by`, where `<value>` is the first row
/// of that column in `partition_df`.
///
/// `partition_df` must hold exactly one partition of `by` (i.e. be created in
/// the same way as `partition_by`), because only its first row is inspected.
///
/// # Panics
/// Panics if the first row of a key column cannot be read.
fn resolve_partition_dir<I, S>(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let mut partition_dir = PathBuf::new();
    partition_dir.push(resolve_homedir(rootdir));

    // `PathBuf::extend` pushes every yielded component in order.
    partition_dir.extend(by.into_iter().map(|key| {
        let column = key.as_ref();
        let first_value = partition_df[column].get(0).unwrap();
        format!("{}={}", column, first_value)
    }));

    partition_dir
}