// polars_io/hive.rs

use polars_core::frame::column::ScalarColumn;
use polars_core::frame::DataFrame;
use polars_core::prelude::Column;
use polars_core::series::Series;

/// Adds the hive partition columns to `df` as constant columns of length `num_rows`.
///
/// `num_rows` is passed separately because `df` can be empty (no columns) when a projection
/// contains only hive partition columns; in that case the frame height is set to `num_rows`
/// before the partition columns are appended.
///
/// `hive_partition_columns` must be ordered by their position in `reader_schema`. Partition
/// columns that exist in the file schema are placed at their schema position; otherwise they are
/// appended at the end. Only the first partition column is probed against `reader_schema` to
/// decide between appending and merging.
///
/// Nothing happens when `hive_partition_columns` is `None` or an empty slice.
///
/// # Safety
///
/// `num_rows` must equal the height of `df` whenever the height of `df` is non-zero. The unsafe
/// operations inside this function rely on that precondition.
pub(crate) fn materialize_hive_partitions<D>(
    df: &mut DataFrame,
    reader_schema: &polars_schema::Schema<D>,
    hive_partition_columns: Option<&[Series]>,
    num_rows: usize,
) {
    let Some(partition_series) = hive_partition_columns else {
        return;
    };
    if partition_series.is_empty() {
        return;
    }

    // Broadcast the first value of every partition series to a scalar column of `num_rows` rows.
    let mut partition_columns: Vec<Column> = Vec::with_capacity(partition_series.len());
    for s in partition_series {
        partition_columns.push(ScalarColumn::new(s.name().clone(), s.first(), num_rows).into());
    }

    let frame_has_no_columns = df.width() == 0;
    let first_is_in_schema = reader_schema
        .index_of(partition_columns[0].name())
        .is_some();

    if frame_has_no_columns || !first_is_in_schema {
        // Fast path: every partition column goes at the end.
        if frame_has_no_columns {
            // SAFETY: the frame holds no columns, so there is no existing column whose length
            // could disagree with the new height.
            unsafe { df.set_height(num_rows) };
        }
        // SAFETY: relies on the documented precondition that `num_rows` matches the height of a
        // non-empty `df`; every partition column was built with length `num_rows`.
        unsafe { df.hstack_mut_unchecked(&partition_columns) };
        return;
    }

    let mut merged = Vec::with_capacity(df.width() + partition_columns.len());

    {
        // SAFETY: the drained columns are all moved into `merged`, and `df` is replaced as a whole
        // right after this scope, so the emptied column vector is never observed.
        let mut existing_columns = unsafe { df.get_columns_mut().drain(..) };
        let mut new_columns = partition_columns.into_iter();

        // `hive_partitions_from_paths()` guarantees the partition columns are sorted by their
        // appearance in `reader_schema`.
        merge_sorted_to_schema_order(
            &mut existing_columns,
            &mut new_columns,
            reader_schema,
            &mut merged,
        );
    }

    // SAFETY: relies on the documented precondition for the pre-existing columns; the partition
    // columns were created with length `num_rows`.
    *df = unsafe { DataFrame::new_no_checks(num_rows, merged) };
}

/// Interleaves two column sequences into `output` so that the result follows the column order of
/// `schema`.
///
/// Each input must already be ordered such that the `schema` indices of its columns are
/// ascending. A column's position is looked up by name via `schema.index_of`.
///
/// Input layouts:
/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
///   * If the first item of `cols_lhs` is not found in the schema it is assumed to be a
///     `row_index` column; it is treated as having schema position 0 and is therefore emitted
///     first.
/// * `cols_rhs`: `[..schema_columns?, ..other_right?]`
///
/// Output:
/// * `[row_index?, ..schema_columns?, ..other_left?, ..other_right?]`
///
/// If either input is empty, or the first column of `cols_rhs` is not in the schema, no
/// interleaving takes place: all of `cols_lhs` is appended to `output`, followed by all of
/// `cols_rhs`.
///
/// # Panics
/// In builds with debug assertions enabled, panics when a column of `cols_lhs` other than the
/// first one is reached during merging and is not present in `schema`.
pub fn merge_sorted_to_schema_order<'a, D>(
    cols_lhs: &'a mut dyn Iterator<Item = Column>,
    cols_rhs: &'a mut dyn Iterator<Item = Column>,
    schema: &polars_schema::Schema<D>,
    output: &'a mut Vec<Column>,
) {
    // Position of a column within `schema`, or `None` if the schema has no column of that name.
    let schema_position = |column: &Column| schema.index_of(column.name());

    merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &schema_position);
}

/// Generic two-way merge of `cols_lhs` and `cols_rhs` into `output`, ordered by the index that
/// `get_opt_index` reports for each item.
///
/// * `cols_lhs` / `cols_rhs`: item sources, each expected to yield items in ascending index order.
/// * `output`: sink that receives every item of both sources exactly once.
/// * `get_opt_index`: returns the target position of an item, or `None` if it has none.
///
/// Behaviour:
/// * If the first item of `cols_lhs` has no index, it is treated as having index 0.
/// * On equal indices the item from `cols_lhs` is emitted first.
/// * Merging stops as soon as one source is exhausted or an item without an index is reached;
///   whatever is left of `cols_lhs` is then appended, followed by whatever is left of `cols_rhs`.
/// * If either source is empty, or the first item of `cols_rhs` has no index, nothing is merged
///   and the sources are simply appended in that order.
///
/// # Panics
/// With debug assertions enabled, panics if an item of `cols_lhs` other than its first one is
/// reached during merging and has no index.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    // Slot 0 is the left-hand source, slot 1 the right-hand source.
    let mut sources = [cols_lhs.peekable(), cols_rhs.peekable()];

    // A leading left-hand item without an index is pinned to position 0.
    let first_lhs = sources[0]
        .peek()
        .map(|item| get_opt_index(item).unwrap_or(0));
    let first_rhs = sources[1].peek().and_then(|item| get_opt_index(item));

    if let (Some(lhs_pos), Some(rhs_pos)) = (first_lhs, first_rhs) {
        // Index of the next pending item of each source.
        let mut positions = [lhs_pos, rhs_pos];

        loop {
            // Pick the right-hand source only when its pending item comes strictly earlier.
            let side = usize::from(positions[1] < positions[0]);

            let taken = sources[side].next().unwrap();
            output.extend(Some(taken));

            let Some(upcoming) = sources[side].peek() else {
                break;
            };

            match get_opt_index(upcoming) {
                Some(pos) => positions[side] = pos,
                None => {
                    // Every left-hand item is expected to have an index, except for a leading
                    // row_index item which is already handled at initialization above.
                    debug_assert_eq!(side, 1);
                    break;
                },
            }
        }
    }

    // Flush the remainders: left-hand items first, then right-hand items.
    let [rest_lhs, rest_rhs] = sources;
    output.extend(rest_lhs);
    output.extend(rest_rhs);
}