polars_io/
hive.rs

1use polars_core::frame::DataFrame;
2use polars_core::frame::column::ScalarColumn;
3use polars_core::prelude::{Column, DataType};
4use polars_core::series::Series;
5
6use crate::utils::HIVE_VALUE_ENCODE_CHARSET;
7
/// Materializes hive partition values as full-height columns of `df`.
///
/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`. The
/// columns will be materialized by their positions in the file schema if they exist, or otherwise
/// at the end.
///
/// NOTE(review): earlier doc mentioned a special `num_rows` arg for when `df` is empty (a
/// projection containing only hive partition columns); in this version `num_rows` is derived
/// from `df.height()`, so that case yields 0-row hive columns — confirm against callers.
pub(crate) fn materialize_hive_partitions<F, M>(
    df: &mut DataFrame,
    reader_schema: &polars_schema::Schema<F, M>,
    hive_partition_columns: Option<&[Series]>,
) {
    // Target height for the materialized hive columns.
    let num_rows = df.height();

    if let Some(hive_columns) = hive_partition_columns {
        // Insert these hive columns in the order they are stored in the file.
        if hive_columns.is_empty() {
            return;
        }

        // Broadcast each partition key's single (first) value to a scalar column of the
        // frame's height.
        let hive_columns = hive_columns
            .iter()
            .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
            .collect::<Vec<Column>>();

        // `hive_columns` is sorted by appearance in `reader_schema`, so if the first one is
        // absent from the schema, all of them are — they all go at the end. A width-0 df
        // trivially has nothing to interleave with.
        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
            // Fast-path - all hive columns are at the end
            if df.width() == 0 {
                // NOTE(review): `num_rows == df.height()`, so this call is currently a
                // no-op; presumably a leftover from when `num_rows` was a parameter.
                unsafe { df.set_height(num_rows) };
            }
            // SAFETY: every column in `hive_columns` was constructed above with length
            // `num_rows`, matching the frame height.
            unsafe { df.hstack_mut_unchecked(&hive_columns) };
            return;
        }

        let mut merged = Vec::with_capacity(df.width() + hive_columns.len());

        // `hive_partitions_from_paths()` guarantees `hive_columns` is sorted by their appearance in `reader_schema`.
        merge_sorted_to_schema_order(
            &mut unsafe { df.columns_mut() }.drain(..),
            &mut hive_columns.into_iter(),
            reader_schema,
            &mut merged,
        );

        // SAFETY: `merged` holds the original df columns (height == `num_rows`) plus hive
        // columns built with length `num_rows`.
        *df = unsafe { DataFrame::new_unchecked(num_rows, merged) };
    }
}
59
60/// Merge 2 lists of columns into one, where each list contains columns ordered such that their indices
61/// in the `schema` are in ascending order.
62///
63/// Layouts:
64/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
65///   * If the first item in `cols_lhs` is not found in the schema, it will be assumed to be a
66///     `row_index` column and placed first into the result.
67/// * `cols_rhs`: `[..schema_columns? ..other_right?]`
68///
69/// Output:
70/// * `[..schema_columns?, ..other_left?, ..other_right?]`
71///
72/// Note: The `row_index` column should be handled before calling this function.
73///
74/// # Panics
75/// Panics if either `cols_lhs` or `cols_rhs` is empty.
76pub fn merge_sorted_to_schema_order<'a, F, M>(
77    cols_lhs: &'a mut dyn Iterator<Item = Column>,
78    cols_rhs: &'a mut dyn Iterator<Item = Column>,
79    schema: &polars_schema::Schema<F, M>,
80    output: &'a mut Vec<Column>,
81) {
82    merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
83}
84
/// Generic merge driver for [`merge_sorted_to_schema_order`].
///
/// Interleaves two streams whose schema-resolvable items are already in ascending schema
/// order, emitting schema items first; any remaining items are flushed afterwards, left side
/// before right side. A left-side head item that is not in the schema is treated as a
/// `row_index` column and emitted first.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut sides = [cols_lhs.peekable(), cols_rhs.peekable()];

    // Schema index of each side's head. The left head defaults to index 0 when absent from
    // the schema (assumed row_index column); a right head absent from the schema disables
    // the merge loop entirely.
    let lhs_head = sides[0].peek().and_then(|x| get_opt_index(x).or(Some(0)));
    let rhs_head = sides[1].peek().and_then(get_opt_index);

    if let (Some(lhs_head), Some(rhs_head)) = (lhs_head, rhs_head) {
        let mut next_idx = [lhs_head, rhs_head];

        loop {
            // Pop from the side whose upcoming column sits earlier in the schema;
            // ties go to the left side.
            let side = usize::from(next_idx[1] < next_idx[0]);
            output.extend([sides[side].next().unwrap()]);

            let Some(peeked) = sides[side].peek() else {
                // This side is exhausted; flush the remainders below.
                break;
            };

            match get_opt_index(peeked) {
                Some(i) => next_idx[side] = i,
                None => {
                    // All columns in `cols_lhs` should be present in `schema` except for a
                    // row_index column, which is assumed to be first and consumed at
                    // initialization — so only the right side may reach this point.
                    debug_assert_eq!(side, 1);
                    break;
                },
            }
        }
    }

    // Flush whatever remains: left leftovers first, then right.
    let [lhs_rest, rhs_rest] = sides;
    output.extend(lhs_rest);
    output.extend(rhs_rest);
}
137
/// Formats hive partition key columns as a `key=value/` path segment sequence.
///
/// # Panics
/// The `Display` impl of this will panic if a column has non-unit length.
pub struct HivePathFormatter<'a> {
    // Partition key columns; each must hold exactly one value when formatted.
    keys: &'a [Column],
}
143
144impl<'a> HivePathFormatter<'a> {
145    pub fn new(keys: &'a [Column]) -> Self {
146        Self { keys }
147    }
148}
149
150impl std::fmt::Display for HivePathFormatter<'_> {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        for column in self.keys {
153            assert_eq!(column.len(), 1);
154            let column = column.cast(&DataType::String).unwrap();
155
156            let key = column.name();
157            let value = percent_encoding::percent_encode(
158                column
159                    .str()
160                    .unwrap()
161                    .get(0)
162                    .unwrap_or("__HIVE_DEFAULT_PARTITION__")
163                    .as_bytes(),
164                HIVE_VALUE_ENCODE_CHARSET,
165            );
166
167            write!(f, "{key}={value}/")?
168        }
169
170        Ok(())
171    }
172}