polars_io/partition.rs

//! Functionality for writing a DataFrame partitioned into multiple files.

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_utils::plpath::PlPathRef;
use rayon::prelude::*;

use crate::cloud::CloudOptions;
use crate::parquet::write::ParquetWriteOptions;
#[cfg(feature = "ipc")]
use crate::prelude::IpcWriterOptions;
use crate::prelude::URL_ENCODE_CHAR_SET;
use crate::utils::file::try_get_writeable;
use crate::{SerWriter, WriteDataFrameToFile};

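// Per-format writer implementations: each opens a writeable handle (a local
// file or a cloud object, depending on `CloudOptions`) and serializes the
// frame into it, which keeps `write_partitioned_dataset` format-agnostic.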
impl WriteDataFrameToFile for ParquetWriteOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        addr: PlPathRef<'_>,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(addr, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        addr: PlPathRef<'_>,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(addr, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

/// Write a partitioned parquet dataset. This functionality is unstable.
pub fn write_partitioned_dataset(
    df: &mut DataFrame,
    addr: PlPathRef<'_>,
    partition_by: Vec<PlSmallStr>,
    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
    cloud_options: Option<&CloudOptions>,
    chunk_size: usize,
) -> PolarsResult<()> {
    // Ensure we have a single chunk as the gather will otherwise rechunk per group.
    df.as_single_chunk_par();

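    // Output layout is hive-style: `<addr>/key=value[/key2=value2/...]/<i>.parquet`,
    // one directory per group, with fixed-width hex file names inside it.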
    // Note: When adding support for formats other than Parquet, avoid writing the partitioned
    // columns into the file. We write them for parquet because they are encoded efficiently with
    // RLE, and they also give us a way to get the hive schema from the parquet file for free.
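    // E.g. a group with `a == 1` and `b == null` maps to the path part
    // `a=1/b=__HIVE_DEFAULT_PARTITION__`; values are percent-encoded using
    // `URL_ENCODE_CHAR_SET`.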
    let get_hive_path_part = {
        let schema = &df.schema();

        let partition_by_col_idx = partition_by
            .iter()
            .map(|x| {
                let Some(i) = schema.index_of(x.as_str()) else {
                    polars_bail!(col_not_found = x)
                };
                Ok(i)
            })
            .collect::<PolarsResult<Vec<_>>>()?;

        move |df: &DataFrame| {
            let cols = df.get_columns();

            partition_by_col_idx
                .iter()
                .map(|&i| {
                    let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();

                    format!(
                        "{}={}",
                        s.name(),
                        percent_encoding::percent_encode(
                            s.str()
                                .unwrap()
                                .get(0)
                                .unwrap_or("__HIVE_DEFAULT_PARTITION__")
                                .as_bytes(),
                            URL_ENCODE_CHAR_SET
                        )
                    )
                })
                .collect::<Vec<_>>()
                .join("/")
        }
    };

    let base_path = addr;
    let groups = df.group_by(partition_by)?.take_groups();

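    // Object stores have no real directories, so `create_dir_all` is only
    // needed (and only possible) when the partition path is local.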
    let init_part_base_dir = |part_df: &DataFrame| {
        let path_part = get_hive_path_part(part_df);
        let dir = base_path.join(path_part);

        if let Some(dir) = dir.as_ref().as_local_path() {
            std::fs::create_dir_all(dir)?;
        }

        PolarsResult::Ok(dir)
    };

    fn get_path_for_index(i: usize) -> String {
        // Use a fixed-width file name so that it sorts properly.
        format!("{i:08x}.parquet")
    }

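    // Aim for files of roughly `chunk_size` bytes; the cap of 0xffff_ffff keeps
    // every file index within the 8 hex digits used by `get_path_for_index`.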
    let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
        let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
        // Round up so that `n_files` slices of `rows_per_file` rows cover the
        // whole partition (note: the partition's height, not the full frame's).
        let rows_per_file = (part_df.height() / n_files).saturating_add(1);
        (n_files, rows_per_file)
    };

    let write_part = |mut df: DataFrame, addr: PlPathRef| {
        file_write_options.write_df_to_file(&mut df, addr, cloud_options)?;
        PolarsResult::Ok(())
    };

    // This is sqrt(N) of the actual limit - we chunk the input both at the groups
    // proxy level and within every group.
    const MAX_OPEN_FILES: usize = 8;

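    // Write a single partition: create its directory, then emit either one file
    // or several row slices, at most `MAX_OPEN_FILES` of them written in parallel.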
    let finish_part_df = |df: DataFrame| {
        let dir_path = init_part_base_dir(&df)?;
        let (n_files, rows_per_file) = get_n_files_and_rows_per_file(&df);

        if n_files == 1 {
            write_part(df, dir_path.as_ref().join(get_path_for_index(0)).as_ref())
        } else {
            (0..df.height())
                .step_by(rows_per_file)
                .enumerate()
                .collect::<Vec<_>>()
                .chunks(MAX_OPEN_FILES)
                .map(|chunk| {
                    chunk
                        .into_par_iter()
                        .map(|&(idx, slice_start)| {
                            let df = df.slice(slice_start as i64, rows_per_file);
                            write_part(df, dir_path.as_ref().join(get_path_for_index(idx)).as_ref())
                        })
                        .reduce(
                            || PolarsResult::Ok(()),
                            |a, b| if a.is_err() { a } else { b },
                        )
                })
                .collect::<PolarsResult<Vec<()>>>()?;
            Ok(())
        }
    };

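    // Dispatch over the two group representations: `Idx` carries explicit row
    // indices per group, while `Slice` carries contiguous `[offset, len]` ranges.
    // Both paths process groups in chunks of `MAX_OPEN_FILES`, and each `reduce`
    // surfaces the first error from its parallel batch.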
    POOL.install(|| match groups.as_ref() {
        GroupsType::Idx(idx) => idx
            .all()
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .par_iter()
                    .map(|group| {
                        let df = unsafe {
                            df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
                        };
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
        GroupsType::Slice { groups, .. } => groups
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .into_par_iter()
                    .map(|&[offset, len]| {
                        let df = df.slice(offset as i64, len as usize);
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
    })?;

    Ok(())
}