1use polars_core::frame::DataFrame;
2use polars_core::frame::column::ScalarColumn;
3use polars_core::prelude::Column;
4use polars_core::series::Series;
5
6pub(crate) fn materialize_hive_partitions<D>(
18 df: &mut DataFrame,
19 reader_schema: &polars_schema::Schema<D>,
20 hive_partition_columns: Option<&[Series]>,
21) {
22 let num_rows = df.height();
23
24 if let Some(hive_columns) = hive_partition_columns {
25 if hive_columns.is_empty() {
27 return;
28 }
29
30 let hive_columns = hive_columns
31 .iter()
32 .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
33 .collect::<Vec<Column>>();
34
35 if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
36 if df.width() == 0 {
38 unsafe { df.set_height(num_rows) };
39 }
40 unsafe { df.hstack_mut_unchecked(&hive_columns) };
41 return;
42 }
43
44 let mut merged = Vec::with_capacity(df.width() + hive_columns.len());
45
46 merge_sorted_to_schema_order(
48 &mut unsafe { df.get_columns_mut().drain(..) },
49 &mut hive_columns.into_iter(),
50 reader_schema,
51 &mut merged,
52 );
53
54 *df = unsafe { DataFrame::new_no_checks(num_rows, merged) };
55 }
56}
57
58pub fn merge_sorted_to_schema_order<'a, D>(
75 cols_lhs: &'a mut dyn Iterator<Item = Column>,
76 cols_rhs: &'a mut dyn Iterator<Item = Column>,
77 schema: &polars_schema::Schema<D>,
78 output: &'a mut Vec<Column>,
79) {
80 merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
81}
82
/// Merges two sequences into `output`, ordered by the position `get_opt_index` reports per item.
///
/// Both inputs are expected to already be sorted by that position; this is not checked.
///
/// While both sides have a next item with a known position, the item with the smaller position
/// is emitted next. Ties go to `cols_lhs`.
///
/// The first item of `cols_lhs` is treated as position 0 when it has no position of its own.
/// NOTE(review): presumably this is meant for a leading row-index column; confirm with callers.
///
/// Interleaving stops as soon as the side just consumed is exhausted, or its next item has no
/// position. It also stops immediately if either side is empty, or if `cols_rhs` starts with a
/// position-less item. Whatever remains of `cols_lhs`, then of `cols_rhs`, is then appended in
/// its original order.
///
/// In debug builds, reaching a position-less item on the `cols_lhs` side during interleaving
/// trips a `debug_assert`.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut lhs = cols_lhs.peekable();
    let mut rhs = cols_rhs.peekable();

    'interleave: {
        // Peek both sides up front; a position-less leading lhs item sorts first.
        let lhs_start = lhs.peek().map(|item| get_opt_index(item).unwrap_or(0));
        let rhs_start = rhs.peek().and_then(get_opt_index);
        let (Some(mut lhs_idx), Some(mut rhs_idx)) = (lhs_start, rhs_start) else {
            break 'interleave;
        };

        loop {
            // Strictly-smaller rhs wins; equal positions keep the lhs item first.
            let take_rhs = rhs_idx < lhs_idx;
            let (side, side_idx) = if take_rhs {
                (&mut rhs, &mut rhs_idx)
            } else {
                (&mut lhs, &mut lhs_idx)
            };

            // The item was peeked on the previous step (or at start), so `next` yields it.
            output.extend(side.next());

            let Some(upcoming) = side.peek() else {
                break 'interleave;
            };
            let Some(idx) = get_opt_index(upcoming) else {
                // Only the rhs side is expected to run into items without a position.
                debug_assert!(take_rhs);
                break 'interleave;
            };
            *side_idx = idx;
        }
    }

    output.extend(lhs);
    output.extend(rhs);
}