polars_io/
partition.rs
1use std::path::Path;
4
5use polars_core::POOL;
6use polars_core::prelude::*;
7use polars_core::series::IsSorted;
8use rayon::prelude::*;
9
10use crate::cloud::CloudOptions;
11use crate::parquet::write::ParquetWriteOptions;
12#[cfg(feature = "ipc")]
13use crate::prelude::IpcWriterOptions;
14use crate::prelude::URL_ENCODE_CHAR_SET;
15use crate::utils::file::try_get_writeable;
16use crate::{SerWriter, WriteDataFrameToFile, is_cloud_url};
17
18impl WriteDataFrameToFile for ParquetWriteOptions {
19 fn write_df_to_file(
20 &self,
21 df: &mut DataFrame,
22 path: &str,
23 cloud_options: Option<&CloudOptions>,
24 ) -> PolarsResult<()> {
25 let f = try_get_writeable(path, cloud_options)?;
26 self.to_writer(f).finish(df)?;
27 Ok(())
28 }
29}
30
#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    /// Serialize `df` to `path` (local file or cloud URL) in the Arrow IPC
    /// format, using these write options.
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let sink = try_get_writeable(path, cloud_options)?;
        self.to_writer(sink).finish(df).map(|_| ())
    }
}
44
45pub fn write_partitioned_dataset(
47 df: &mut DataFrame,
48 path: &Path,
49 partition_by: Vec<PlSmallStr>,
50 file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
51 cloud_options: Option<&CloudOptions>,
52 chunk_size: usize,
53) -> PolarsResult<()> {
54 df.as_single_chunk_par();
56
57 let get_hive_path_part = {
61 let schema = &df.schema();
62
63 let partition_by_col_idx = partition_by
64 .iter()
65 .map(|x| {
66 let Some(i) = schema.index_of(x.as_str()) else {
67 polars_bail!(col_not_found = x)
68 };
69 Ok(i)
70 })
71 .collect::<PolarsResult<Vec<_>>>()?;
72
73 move |df: &DataFrame| {
74 let cols = df.get_columns();
75
76 partition_by_col_idx
77 .iter()
78 .map(|&i| {
79 let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();
80
81 format!(
82 "{}={}",
83 s.name(),
84 percent_encoding::percent_encode(
85 s.str()
86 .unwrap()
87 .get(0)
88 .unwrap_or("__HIVE_DEFAULT_PARTITION__")
89 .as_bytes(),
90 URL_ENCODE_CHAR_SET
91 )
92 )
93 })
94 .collect::<Vec<_>>()
95 .join("/")
96 }
97 };
98
99 let base_path = path;
100 let is_cloud = is_cloud_url(base_path);
101 let groups = df.group_by(partition_by)?.take_groups();
102
103 let init_part_base_dir = |part_df: &DataFrame| {
104 let path_part = get_hive_path_part(part_df);
105 let dir = base_path.join(path_part);
106
107 if !is_cloud {
108 std::fs::create_dir_all(&dir)?;
109 }
110
111 PolarsResult::Ok(dir)
112 };
113
114 fn get_path_for_index(i: usize) -> String {
115 format!("{:08x}.parquet", i)
117 }
118
119 let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
120 let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
121 let rows_per_file = (df.height() / n_files).saturating_add(1);
122 (n_files, rows_per_file)
123 };
124
125 let write_part = |mut df: DataFrame, path: &Path| {
126 file_write_options.write_df_to_file(&mut df, path.to_str().unwrap(), cloud_options)?;
127 PolarsResult::Ok(())
128 };
129
130 const MAX_OPEN_FILES: usize = 8;
133
134 let finish_part_df = |df: DataFrame| {
135 let dir_path = init_part_base_dir(&df)?;
136 let (n_files, rows_per_file) = get_n_files_and_rows_per_file(&df);
137
138 if n_files == 1 {
139 write_part(df.clone(), &dir_path.join(get_path_for_index(0)))
140 } else {
141 (0..df.height())
142 .step_by(rows_per_file)
143 .enumerate()
144 .collect::<Vec<_>>()
145 .chunks(MAX_OPEN_FILES)
146 .map(|chunk| {
147 chunk
148 .into_par_iter()
149 .map(|&(idx, slice_start)| {
150 let df = df.slice(slice_start as i64, rows_per_file);
151 write_part(df.clone(), &dir_path.join(get_path_for_index(idx)))
152 })
153 .reduce(
154 || PolarsResult::Ok(()),
155 |a, b| if a.is_err() { a } else { b },
156 )
157 })
158 .collect::<PolarsResult<Vec<()>>>()?;
159 Ok(())
160 }
161 };
162
163 POOL.install(|| match groups.as_ref() {
164 GroupsType::Idx(idx) => idx
165 .all()
166 .chunks(MAX_OPEN_FILES)
167 .map(|chunk| {
168 chunk
169 .par_iter()
170 .map(|group| {
171 let df = unsafe {
172 df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
173 };
174 finish_part_df(df)
175 })
176 .reduce(
177 || PolarsResult::Ok(()),
178 |a, b| if a.is_err() { a } else { b },
179 )
180 })
181 .collect::<PolarsResult<Vec<()>>>(),
182 GroupsType::Slice { groups, .. } => groups
183 .chunks(MAX_OPEN_FILES)
184 .map(|chunk| {
185 chunk
186 .into_par_iter()
187 .map(|&[offset, len]| {
188 let df = df.slice(offset as i64, len as usize);
189 finish_part_df(df)
190 })
191 .reduce(
192 || PolarsResult::Ok(()),
193 |a, b| if a.is_err() { a } else { b },
194 )
195 })
196 .collect::<PolarsResult<Vec<()>>>(),
197 })?;
198
199 Ok(())
200}