1use polars_core::frame::DataFrame;
2use polars_core::frame::column::ScalarColumn;
3use polars_core::prelude::Column;
4use polars_core::series::Series;
56/// Materializes hive partitions.
7/// We have a special num_rows arg, as df can be empty when a projection contains
8/// only hive partition columns.
9///
10/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`. The
11/// columns will be materialized by their positions in the file schema if they exist, or otherwise
12/// at the end.
13///
14/// # Safety
15///
16/// num_rows equals the height of the df when the df height is non-zero.
17pub(crate) fn materialize_hive_partitions<D>(
18 df: &mut DataFrame,
19 reader_schema: &polars_schema::Schema<D>,
20 hive_partition_columns: Option<&[Series]>,
21) {
22let num_rows = df.height();
2324if let Some(hive_columns) = hive_partition_columns {
25// Insert these hive columns in the order they are stored in the file.
26if hive_columns.is_empty() {
27return;
28 }
2930let hive_columns = hive_columns
31 .iter()
32 .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
33 .collect::<Vec<Column>>();
3435if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
36// Fast-path - all hive columns are at the end
37if df.width() == 0 {
38unsafe { df.set_height(num_rows) };
39 }
40unsafe { df.hstack_mut_unchecked(&hive_columns) };
41return;
42 }
4344let mut merged = Vec::with_capacity(df.width() + hive_columns.len());
4546// `hive_partitions_from_paths()` guarantees `hive_columns` is sorted by their appearance in `reader_schema`.
47merge_sorted_to_schema_order(
48&mut unsafe { df.get_columns_mut().drain(..) },
49&mut hive_columns.into_iter(),
50 reader_schema,
51&mut merged,
52 );
5354*df = unsafe { DataFrame::new_no_checks(num_rows, merged) };
55 }
56}
5758/// Merge 2 lists of columns into one, where each list contains columns ordered such that their indices
59/// in the `schema` are in ascending order.
60///
61/// Layouts:
62/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
63/// * If the first item in `cols_lhs` is not found in the schema, it will be assumed to be a
64/// `row_index` column and placed first into the result.
65/// * `cols_rhs`: `[..schema_columns? ..other_right?]`
66///
67/// Output:
68/// * `[..schema_columns?, ..other_left?, ..other_right?]`
69///
70/// Note: The `row_index` column should be handled before calling this function.
71///
72/// # Panics
73/// Panics if either `cols_lhs` or `cols_rhs` is empty.
74pub fn merge_sorted_to_schema_order<'a, D>(
75 cols_lhs: &'a mut dyn Iterator<Item = Column>,
76 cols_rhs: &'a mut dyn Iterator<Item = Column>,
77 schema: &polars_schema::Schema<D>,
78 output: &'a mut Vec<Column>,
79) {
80 merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
81}
/// Generic implementation of [`merge_sorted_to_schema_order`].
///
/// Appends the items of `cols_lhs` and `cols_rhs` to `output`. It repeatedly takes from
/// whichever side's next item has the smaller position, as reported by `get_opt_index`. Ties go
/// to `cols_lhs`.
///
/// * The first item of `cols_lhs` is treated as position `0` when `get_opt_index` returns `None`
///   for it. A leading `row_index` item is therefore emitted first.
/// * Any later item of `cols_lhs` without a position (`other_left`) sorts after every
///   positioned item of `cols_rhs`. So `other_left` never precedes remaining schema columns of
///   `cols_rhs`.
/// * Once `cols_rhs` yields an item without a position (`other_right`), or if its first item has
///   none, merging stops. The rest of `cols_lhs` is appended, then the rest of `cols_rhs`.
///
/// Empty inputs are allowed; the other side is then appended as-is.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut lhs = cols_lhs.peekable();
    let mut rhs = cols_rhs.peekable();

    // A leading unpositioned lhs item is a row index and must sort first.
    let first_lhs = lhs.peek().map(|x| get_opt_index(x).unwrap_or(0));
    let first_rhs = rhs.peek().and_then(get_opt_index);

    if let (Some(mut lhs_idx), Some(mut rhs_idx)) = (first_lhs, first_rhs) {
        loop {
            if rhs_idx < lhs_idx {
                output.extend(rhs.next());
                // Stop merging once `cols_rhs` is exhausted or reaches its unpositioned tail.
                match rhs.peek().and_then(get_opt_index) {
                    Some(i) => rhs_idx = i,
                    None => break,
                }
            } else {
                output.extend(lhs.next());
                let Some(next) = lhs.peek() else {
                    break;
                };
                // Unpositioned lhs items (`other_left`) must wait until every remaining schema
                // column of `cols_rhs` has been emitted.
                lhs_idx = get_opt_index(next).unwrap_or(usize::MAX);
            }
        }
    }

    output.extend(lhs);
    output.extend(rhs);
}