//! Functionality for writing a DataFrame partitioned into multiple files.

use std::path::Path;

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use rayon::prelude::*;

use crate::cloud::CloudOptions;
use crate::parquet::write::ParquetWriteOptions;
#[cfg(feature = "ipc")]
use crate::prelude::IpcWriterOptions;
use crate::prelude::URL_ENCODE_CHAR_SET;
use crate::utils::file::try_get_writeable;
use crate::{SerWriter, WriteDataFrameToFile, is_cloud_url};

impl WriteDataFrameToFile for ParquetWriteOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(path, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(path, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

/// Write a partitioned parquet dataset. This functionality is unstable.
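///
/// # Example
///
/// A minimal sketch of a local write partitioned by one column. The output path is
/// hypothetical, and `ParquetWriteOptions::default()` is assumed to be an acceptable
/// writer configuration:
///
/// ```ignore
/// use std::path::Path;
///
/// let mut df = df![
///     "year" => [2023, 2023, 2024],
///     "value" => [1i64, 2, 3],
/// ]?;
///
/// write_partitioned_dataset(
///     &mut df,
///     Path::new("/tmp/dataset"), // hypothetical output directory
///     vec!["year".into()],
///     &ParquetWriteOptions::default(),
///     None,     // no cloud options needed for a local path
///     4 << 20,  // aim for roughly 4 MiB of data per file
/// )?;
/// // Expected layout: /tmp/dataset/year=2023/00000000.parquet, /tmp/dataset/year=2024/...
/// ```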
pub fn write_partitioned_dataset(
    df: &mut DataFrame,
    path: &Path,
    partition_by: Vec<PlSmallStr>,
    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
    cloud_options: Option<&CloudOptions>,
    chunk_size: usize,
) -> PolarsResult<()> {
    // Ensure we have a single chunk as the gather will otherwise rechunk per group.
    df.as_single_chunk_par();

    // Note: When adding support for formats other than Parquet, avoid writing the partitioned
    // columns into the file. We write them for parquet because they are encoded efficiently with
    // RLE, and they also give us a way to get the hive schema from the parquet file for free.
    let get_hive_path_part = {
        let schema = &df.schema();

        let partition_by_col_idx = partition_by
            .iter()
            .map(|x| {
                let Some(i) = schema.index_of(x.as_str()) else {
                    polars_bail!(col_not_found = x)
                };
                Ok(i)
            })
            .collect::<PolarsResult<Vec<_>>>()?;

        move |df: &DataFrame| {
            let cols = df.get_columns();

            partition_by_col_idx
                .iter()
                .map(|&i| {
                    let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();

                    format!(
                        "{}={}",
                        s.name(),
                        percent_encoding::percent_encode(
                            s.str()
                                .unwrap()
                                .get(0)
                                .unwrap_or("__HIVE_DEFAULT_PARTITION__")
                                .as_bytes(),
                            URL_ENCODE_CHAR_SET
                        )
                    )
                })
                .collect::<Vec<_>>()
                .join("/")
        }
    };
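    // For example, a partition column `year` with value `2024` produces the path segment
    // `year=2024`; null partition values fall back to the Hive convention
    // `__HIVE_DEFAULT_PARTITION__`, and special characters are percent-encoded.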

    let base_path = path;
    let is_cloud = is_cloud_url(base_path);
    let groups = df.group_by(partition_by)?.take_groups();

    let init_part_base_dir = |part_df: &DataFrame| {
        let path_part = get_hive_path_part(part_df);
        let dir = base_path.join(path_part);

        if !is_cloud {
            std::fs::create_dir_all(&dir)?;
        }

        PolarsResult::Ok(dir)
    };

    fn get_path_for_index(i: usize) -> String {
        // Use a fixed-width file name so that it sorts properly.
        format!("{:08x}.parquet", i)
    }
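    // E.g. `get_path_for_index(0)` is "00000000.parquet" and `get_path_for_index(255)` is
    // "000000ff.parquet", so lexicographic and numeric order agree for up to 2^32 files.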

    let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
        let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
        let rows_per_file = (part_df.height() / n_files).saturating_add(1);
        (n_files, rows_per_file)
    };
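    // Worked example: a partition estimated at 10x `chunk_size` holding 1_000_000 rows gives
    // n_files = 10 and rows_per_file = 100_001 (the +1 guarantees the slices below cover
    // every row, with the last file slightly smaller than the rest).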

    let write_part = |mut df: DataFrame, path: &Path| {
        file_write_options.write_df_to_file(&mut df, path.to_str().unwrap(), cloud_options)?;
        PolarsResult::Ok(())
    };

    // This is sqrt(N) of the actual limit - we chunk the input both at the groups
    // proxy level and within every group.
    const MAX_OPEN_FILES: usize = 8;
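    // Combined with the chunking inside `finish_part_df` below, at most
    // MAX_OPEN_FILES * MAX_OPEN_FILES = 64 files are written concurrently.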

    let finish_part_df = |df: DataFrame| {
        let dir_path = init_part_base_dir(&df)?;
        let (n_files, rows_per_file) = get_n_files_and_rows_per_file(&df);

        if n_files == 1 {
            write_part(df.clone(), &dir_path.join(get_path_for_index(0)))
        } else {
            (0..df.height())
                .step_by(rows_per_file)
                .enumerate()
                .collect::<Vec<_>>()
                .chunks(MAX_OPEN_FILES)
                .map(|chunk| {
                    chunk
                        .into_par_iter()
                        .map(|&(idx, slice_start)| {
                            let df = df.slice(slice_start as i64, rows_per_file);
                            write_part(df.clone(), &dir_path.join(get_path_for_index(idx)))
                        })
                        .reduce(
                            || PolarsResult::Ok(()),
                            |a, b| if a.is_err() { a } else { b },
                        )
                })
                .collect::<PolarsResult<Vec<()>>>()?;
            Ok(())
        }
    };
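    // E.g. a partition that needs 20 files is written in sequential waves of 8, 8 and 4
    // parallel writes, bounding the number of simultaneously open file handles.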

    POOL.install(|| match groups.as_ref() {
        GroupsType::Idx(idx) => idx
            .all()
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .par_iter()
                    .map(|group| {
                        let df = unsafe {
                            df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
                        };
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
        GroupsType::Slice { groups, .. } => groups
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .into_par_iter()
                    .map(|&[offset, len]| {
                        let df = df.slice(offset as i64, len as usize);
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
    })?;

    Ok(())
}