use polars_core::frame::DataFrame;
use polars_core::frame::column::ScalarColumn;
use polars_core::prelude::{Column, DataType};
use polars_core::series::Series;

use crate::utils::HIVE_VALUE_ENCODE_CHARSET;

/// Materializes hive partition values into `df`, broadcasting each value to the height of `df`.
///
/// If the reader schema does not contain the hive columns, or `df` has no columns, the hive
/// columns are appended at the end. Otherwise the existing columns and the hive columns are
/// merged so that the output follows the column order of `reader_schema`.
pub(crate) fn materialize_hive_partitions<F, M>(
    df: &mut DataFrame,
    reader_schema: &polars_schema::Schema<F, M>,
    hive_partition_columns: Option<&[Series]>,
) {
    let num_rows = df.height();

    if let Some(hive_columns) = hive_partition_columns {
        if hive_columns.is_empty() {
            return;
        }

        // Broadcast each hive value to the full height as a scalar column.
        let hive_columns = hive_columns
            .iter()
            .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
            .collect::<Vec<Column>>();

        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
            // Fast path: the hive columns are not part of the reader schema (or the frame has no
            // columns), so they can simply be appended at the end.
            if df.width() == 0 {
                unsafe { df.set_height(num_rows) };
            }
            unsafe { df.hstack_mut_unchecked(&hive_columns) };
            return;
        }

        let mut merged = Vec::with_capacity(df.width() + hive_columns.len());

        merge_sorted_to_schema_order(
            &mut unsafe { df.columns_mut() }.drain(..),
            &mut hive_columns.into_iter(),
            reader_schema,
            &mut merged,
        );

        *df = unsafe { DataFrame::new_unchecked(num_rows, merged) };
    }
}

/// Merges the columns of `cols_lhs` and `cols_rhs` into `output`, ordered by their position in
/// `schema`. Both inputs must already be sorted by their schema position.
pub fn merge_sorted_to_schema_order<'a, F, M>(
    cols_lhs: &'a mut dyn Iterator<Item = Column>,
    cols_rhs: &'a mut dyn Iterator<Item = Column>,
    schema: &polars_schema::Schema<F, M>,
    output: &'a mut Vec<Column>,
) {
    merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
}

/// Generic merge of two iterators whose items are already sorted by the index returned from
/// `get_opt_index`.
///
/// An item for which `get_opt_index` returns `None` is only expected either at the front of
/// `cols_lhs` (e.g. a row-index column, which is treated as index 0) or in `cols_rhs`; in the
/// latter case the merge stops and the remaining items of both iterators are appended,
/// `cols_lhs` first.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut series_arr = [cols_lhs.peekable(), cols_rhs.peekable()];

    // Immediately-invoked closure used as a block we can `return` out of.
    (|| {
        let (Some(a), Some(b)) = (
            series_arr[0]
                .peek()
                .and_then(|x| get_opt_index(x).or(Some(0))),
            series_arr[1].peek().and_then(get_opt_index),
        ) else {
            return;
        };

        let mut schema_idx_arr = [a, b];

        loop {
            // Take from the side whose next item has the smaller schema index (ties go to lhs).
            let arg_min = if schema_idx_arr[1] < schema_idx_arr[0] {
                1
            } else {
                0
            };

            output.extend([series_arr[arg_min].next().unwrap()]);

            let Some(v) = series_arr[arg_min].peek() else {
                return;
            };

            let Some(i) = get_opt_index(v) else {
                // Only the rhs side is expected to contain further items missing from the schema.
                debug_assert_eq!(arg_min, 1);
                break;
            };

            schema_idx_arr[arg_min] = i;
        }
    })();

    // Flush whatever is left, lhs before rhs.
    let [a, b] = series_arr;
    output.extend(a);
    output.extend(b);
}

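#[cfg(test)]
mod merge_sorted_example {
    use super::*;

    /// A minimal sketch (not part of the original module) of how
    /// `merge_sorted_to_schema_order_impl` interleaves two streams that are each already sorted
    /// by schema position. Plain `&str` names stand in for the `Column`s used by the real
    /// callers, and the "schema" is a hypothetical name list.
    #[test]
    fn interleaves_by_schema_index() {
        let schema = ["a", "b", "c", "d"];

        // File columns ("a", "c") and hive columns ("b", "d").
        let mut lhs = ["a", "c"].into_iter();
        let mut rhs = ["b", "d"].into_iter();

        let mut merged: Vec<&str> = Vec::new();
        merge_sorted_to_schema_order_impl(&mut lhs, &mut rhs, &mut merged, &|v| {
            schema.iter().position(|s| s == v)
        });

        assert_eq!(merged, ["a", "b", "c", "d"]);
    }
}
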
/// Formats hive partition key columns as `key=value/` path segments.
///
/// Each key column must hold exactly one value. Values are percent-encoded with
/// `HIVE_VALUE_ENCODE_CHARSET`, null values are written as `__HIVE_DEFAULT_PARTITION__`, and
/// every segment (including the last) is terminated with a `/`.
pub struct HivePathFormatter<'a> {
    keys: &'a [Column],
}

impl<'a> HivePathFormatter<'a> {
    pub fn new(keys: &'a [Column]) -> Self {
        Self { keys }
    }
}

impl std::fmt::Display for HivePathFormatter<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for column in self.keys {
            assert_eq!(column.len(), 1);
            let column = column.cast(&DataType::String).unwrap();

            let key = column.name();
            // Percent-encode the value; nulls map to the hive default partition marker.
            let value = percent_encoding::percent_encode(
                column
                    .str()
                    .unwrap()
                    .get(0)
                    .unwrap_or("__HIVE_DEFAULT_PARTITION__")
                    .as_bytes(),
                HIVE_VALUE_ENCODE_CHARSET,
            );

            write!(f, "{key}={value}/")?;
        }

        Ok(())
    }
}
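
#[cfg(test)]
mod hive_path_formatter_example {
    use super::*;

    /// A minimal sketch (not part of the original module) of formatting a year/month partition
    /// path. It assumes `Column::new(name, values)` is available for building the length-1 key
    /// columns, and that plain ASCII digits and letters are not percent-encoded by
    /// `HIVE_VALUE_ENCODE_CHARSET`.
    #[test]
    fn formats_key_value_segments() {
        let keys = [
            Column::new("year".into(), [2024i32]),
            Column::new("month".into(), ["01"]),
        ];

        let path = HivePathFormatter::new(&keys).to_string();
        assert_eq!(path, "year=2024/month=01/");
    }
}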