1use polars_core::POOL;
4use polars_core::prelude::*;
5use polars_core::series::IsSorted;
6use polars_utils::plpath::PlPathRef;
7use rayon::prelude::*;
8
9use crate::cloud::CloudOptions;
10use crate::parquet::write::ParquetWriteOptions;
11#[cfg(feature = "ipc")]
12use crate::prelude::IpcWriterOptions;
13use crate::prelude::URL_ENCODE_CHAR_SET;
14use crate::utils::file::try_get_writeable;
15use crate::{SerWriter, WriteDataFrameToFile};
16
17impl WriteDataFrameToFile for ParquetWriteOptions {
18 fn write_df_to_file(
19 &self,
20 df: &mut DataFrame,
21 addr: PlPathRef<'_>,
22 cloud_options: Option<&CloudOptions>,
23 ) -> PolarsResult<()> {
24 let f = try_get_writeable(addr, cloud_options)?;
25 self.to_writer(f).finish(df)?;
26 Ok(())
27 }
28}
29
#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    /// Write `df` to `addr` (local path or cloud URI) as an IPC file,
    /// using these write options.
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        addr: PlPathRef<'_>,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let writeable = try_get_writeable(addr, cloud_options)?;
        self.to_writer(writeable).finish(df).map(|_| ())
    }
}
43
44pub fn write_partitioned_dataset(
46 df: &mut DataFrame,
47 addr: PlPathRef<'_>,
48 partition_by: Vec<PlSmallStr>,
49 file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
50 cloud_options: Option<&CloudOptions>,
51 chunk_size: usize,
52) -> PolarsResult<()> {
53 df.as_single_chunk_par();
55
56 let get_hive_path_part = {
60 let schema = &df.schema();
61
62 let partition_by_col_idx = partition_by
63 .iter()
64 .map(|x| {
65 let Some(i) = schema.index_of(x.as_str()) else {
66 polars_bail!(col_not_found = x)
67 };
68 Ok(i)
69 })
70 .collect::<PolarsResult<Vec<_>>>()?;
71
72 move |df: &DataFrame| {
73 let cols = df.get_columns();
74
75 partition_by_col_idx
76 .iter()
77 .map(|&i| {
78 let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();
79
80 format!(
81 "{}={}",
82 s.name(),
83 percent_encoding::percent_encode(
84 s.str()
85 .unwrap()
86 .get(0)
87 .unwrap_or("__HIVE_DEFAULT_PARTITION__")
88 .as_bytes(),
89 URL_ENCODE_CHAR_SET
90 )
91 )
92 })
93 .collect::<Vec<_>>()
94 .join("/")
95 }
96 };
97
98 let base_path = addr;
99 let groups = df.group_by(partition_by)?.take_groups();
100
101 let init_part_base_dir = |part_df: &DataFrame| {
102 let path_part = get_hive_path_part(part_df);
103 let dir = base_path.join(path_part);
104
105 if let Some(dir) = dir.as_ref().as_local_path() {
106 std::fs::create_dir_all(dir)?;
107 }
108
109 PolarsResult::Ok(dir)
110 };
111
112 fn get_path_for_index(i: usize) -> String {
113 format!("{i:08x}.parquet")
115 }
116
117 let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
118 let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
119 let rows_per_file = (df.height() / n_files).saturating_add(1);
120 (n_files, rows_per_file)
121 };
122
123 let write_part = |mut df: DataFrame, addr: PlPathRef| {
124 file_write_options.write_df_to_file(&mut df, addr, cloud_options)?;
125 PolarsResult::Ok(())
126 };
127
128 const MAX_OPEN_FILES: usize = 8;
131
132 let finish_part_df = |df: DataFrame| {
133 let dir_path = init_part_base_dir(&df)?;
134 let (n_files, rows_per_file) = get_n_files_and_rows_per_file(&df);
135
136 if n_files == 1 {
137 write_part(df, dir_path.as_ref().join(get_path_for_index(0)).as_ref())
138 } else {
139 (0..df.height())
140 .step_by(rows_per_file)
141 .enumerate()
142 .collect::<Vec<_>>()
143 .chunks(MAX_OPEN_FILES)
144 .map(|chunk| {
145 chunk
146 .into_par_iter()
147 .map(|&(idx, slice_start)| {
148 let df = df.slice(slice_start as i64, rows_per_file);
149 write_part(df, dir_path.as_ref().join(get_path_for_index(idx)).as_ref())
150 })
151 .reduce(
152 || PolarsResult::Ok(()),
153 |a, b| if a.is_err() { a } else { b },
154 )
155 })
156 .collect::<PolarsResult<Vec<()>>>()?;
157 Ok(())
158 }
159 };
160
161 POOL.install(|| match groups.as_ref() {
162 GroupsType::Idx(idx) => idx
163 .all()
164 .chunks(MAX_OPEN_FILES)
165 .map(|chunk| {
166 chunk
167 .par_iter()
168 .map(|group| {
169 let df = unsafe {
170 df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
171 };
172 finish_part_df(df)
173 })
174 .reduce(
175 || PolarsResult::Ok(()),
176 |a, b| if a.is_err() { a } else { b },
177 )
178 })
179 .collect::<PolarsResult<Vec<()>>>(),
180 GroupsType::Slice { groups, .. } => groups
181 .chunks(MAX_OPEN_FILES)
182 .map(|chunk| {
183 chunk
184 .into_par_iter()
185 .map(|&[offset, len]| {
186 let df = df.slice(offset as i64, len as usize);
187 finish_part_df(df)
188 })
189 .reduce(
190 || PolarsResult::Ok(()),
191 |a, b| if a.is_err() { a } else { b },
192 )
193 })
194 .collect::<PolarsResult<Vec<()>>>(),
195 })?;
196
197 Ok(())
198}