polars_io/
hive.rs

1use polars_core::frame::DataFrame;
2use polars_core::frame::column::ScalarColumn;
3use polars_core::prelude::Column;
4use polars_core::series::Series;
5
6/// Materializes hive partitions.
7/// We have a special num_rows arg, as df can be empty when a projection contains
8/// only hive partition columns.
9///
10/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`. The
11/// columns will be materialized by their positions in the file schema if they exist, or otherwise
12/// at the end.
13///
14/// # Safety
15///
16/// num_rows equals the height of the df when the df height is non-zero.
17pub(crate) fn materialize_hive_partitions<D>(
18    df: &mut DataFrame,
19    reader_schema: &polars_schema::Schema<D>,
20    hive_partition_columns: Option<&[Series]>,
21) {
22    let num_rows = df.height();
23
24    if let Some(hive_columns) = hive_partition_columns {
25        // Insert these hive columns in the order they are stored in the file.
26        if hive_columns.is_empty() {
27            return;
28        }
29
30        let hive_columns = hive_columns
31            .iter()
32            .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
33            .collect::<Vec<Column>>();
34
35        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
36            // Fast-path - all hive columns are at the end
37            if df.width() == 0 {
38                unsafe { df.set_height(num_rows) };
39            }
40            unsafe { df.hstack_mut_unchecked(&hive_columns) };
41            return;
42        }
43
44        let mut merged = Vec::with_capacity(df.width() + hive_columns.len());
45
46        // `hive_partitions_from_paths()` guarantees `hive_columns` is sorted by their appearance in `reader_schema`.
47        merge_sorted_to_schema_order(
48            &mut unsafe { df.get_columns_mut().drain(..) },
49            &mut hive_columns.into_iter(),
50            reader_schema,
51            &mut merged,
52        );
53
54        *df = unsafe { DataFrame::new_no_checks(num_rows, merged) };
55    }
56}
57
58/// Merge 2 lists of columns into one, where each list contains columns ordered such that their indices
59/// in the `schema` are in ascending order.
60///
61/// Layouts:
62/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
63///   * If the first item in `cols_lhs` is not found in the schema, it will be assumed to be a
64///     `row_index` column and placed first into the result.
65/// * `cols_rhs`: `[..schema_columns? ..other_right?]`
66///
67/// Output:
68/// * `[..schema_columns?, ..other_left?, ..other_right?]`
69///
70/// Note: The `row_index` column should be handled before calling this function.
71///
72/// # Panics
73/// Panics if either `cols_lhs` or `cols_rhs` is empty.
74pub fn merge_sorted_to_schema_order<'a, D>(
75    cols_lhs: &'a mut dyn Iterator<Item = Column>,
76    cols_rhs: &'a mut dyn Iterator<Item = Column>,
77    schema: &polars_schema::Schema<D>,
78    output: &'a mut Vec<Column>,
79) {
80    merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
81}
82
/// Generic two-way merge behind [`merge_sorted_to_schema_order`].
///
/// Items are pulled from `cols_lhs` and `cols_rhs` and appended to `output`, always taking the
/// side whose next item has the smaller index according to `get_opt_index`. On a tie the item
/// from `cols_lhs` is taken first.
///
/// * `get_opt_index` returns the target position of an item, or `None` if the item has none.
/// * A first `cols_lhs` item without an index is treated as index `0` (a `row_index` column).
/// * Interleaving only starts if `cols_lhs` is non-empty and the first `cols_rhs` item has an
///   index. It stops as soon as the side that was just consumed is exhausted or its next item has
///   no index. A later `cols_lhs` item without an index is not expected; debug builds assert this.
/// * Whatever is left is then appended unchanged: the rest of `cols_lhs`, then the rest of
///   `cols_rhs`.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    // Index 0 is the left input, index 1 the right input.
    let mut sources = [cols_lhs.peekable(), cols_rhs.peekable()];

    // A leading left item without an index is assumed to be a row_index column: sort it first.
    let lhs_start = sources[0]
        .peek()
        .map(|item| get_opt_index(item).unwrap_or(0));
    let rhs_start = sources[1].peek().and_then(|item| get_opt_index(item));

    if let (Some(lhs_idx), Some(rhs_idx)) = (lhs_start, rhs_start) {
        // Index of the next not-yet-emitted item of each side.
        let mut pending_idx = [lhs_idx, rhs_idx];

        loop {
            // Pick the right side only if it comes strictly earlier; ties go to the left.
            let side = usize::from(pending_idx[1] < pending_idx[0]);

            // The chosen side was peeked successfully before, so `next()` cannot be `None`.
            let taken = sources[side].next().unwrap();
            output.extend(std::iter::once(taken));

            let Some(upcoming) = sources[side].peek() else {
                break;
            };

            match get_opt_index(upcoming) {
                Some(idx) => pending_idx[side] = idx,
                None => {
                    // Only the right side may carry items without an index at this point; a
                    // row_index column on the left is always first and was handled at the start.
                    debug_assert_eq!(side, 1);
                    break;
                },
            }
        }
    }

    // Flush the leftovers: left first, then right.
    let [lhs_rest, rhs_rest] = sources;
    output.extend(lhs_rest);
    output.extend(rhs_rest);
}