polars_ops/frame/join/
general.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use polars_utils::format_pl_smallstr;

use super::*;
use crate::series::coalesce_columns;

/// Concatenates `name` and `suffix` into a single [`PlSmallStr`].
///
/// Used to derive the name of a right-hand join column that has to be renamed.
pub fn _join_suffix_name(name: &str, suffix: &str) -> PlSmallStr {
    format_pl_smallstr!("{}{}", name, suffix)
}

/// Resolves the join suffix: the caller-supplied value if present, otherwise `"_right"`.
fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
    match suffix {
        Some(custom) => custom,
        None => PlSmallStr::from_static("_right"),
    }
}

/// Renames the columns on the right to not clash with the left using a specified or otherwise default suffix
/// and then merges the right dataframe into the left
#[doc(hidden)]
pub fn _finish_join(
    mut df_left: DataFrame,
    mut df_right: DataFrame,
    suffix: Option<PlSmallStr>,
) -> PolarsResult<DataFrame> {
    let mut left_names = PlHashSet::with_capacity(df_left.width());

    df_left.get_columns().iter().for_each(|series| {
        left_names.insert(series.name());
    });

    let mut rename_strs = Vec::with_capacity(df_right.width());
    let right_names = df_right.schema();

    for name in right_names.iter_names() {
        if left_names.contains(name) {
            rename_strs.push(name.clone())
        }
    }

    let suffix = get_suffix(suffix);

    for name in rename_strs {
        let new_name = _join_suffix_name(name.as_str(), suffix.as_str());

        df_right.rename(&name, new_name.clone()).map_err(|_| {
            polars_err!(Duplicate: "column with name '{}' already exists\n\n\
            You may want to try:\n\
            - renaming the column prior to joining\n\
            - using the `suffix` parameter to specify a suffix different to the default one ('_right')", new_name)
        })?;
    }

    drop(left_names);
    unsafe { df_left.hstack_mut_unchecked(df_right.get_columns()) };
    Ok(df_left)
}

/// Merges each left/right join-key pair of `df` into a single column.
///
/// For every `(left, right)` pair taken from `keys_left` / `keys_right`, the left key
/// column in `df` is replaced by `coalesce_columns(&[left, right])` and the right key
/// column is removed. The right key column is looked up as `name + suffix` when its
/// name equals the left key or when `df_left` has a column of that name; otherwise it
/// is looked up under its own name. `suffix` falls back to `"_right"` when `None`.
///
/// # Panics
///
/// Panics if a key column cannot be found in `df`, or if coalescing a pair fails.
pub fn _coalesce_full_join(
    mut df: DataFrame,
    keys_left: &[PlSmallStr],
    keys_right: &[PlSmallStr],
    suffix: Option<PlSmallStr>,
    df_left: &DataFrame,
) -> DataFrame {
    let suffix = get_suffix(suffix);
    let joined_schema = df.schema().clone();

    // With identical key lists every right key is known to live under `name + suffix`,
    // so the left schema is never consulted and an empty one is enough.
    let left_schema = if keys_left != keys_right {
        df_left.schema().clone()
    } else {
        Arc::new(Schema::default())
    };

    // SAFETY: we maintain invariants; the cached schema is invalidated below
    // through `clear_schema` once the columns have been edited.
    let columns = unsafe { df.get_columns_mut() };
    let mut right_positions = Vec::with_capacity(keys_right.len());

    for (left_key, right_key) in keys_left.iter().zip(keys_right) {
        let left_pos = joined_schema.get_full(left_key.as_str()).unwrap().0;

        let needs_suffix = left_key == right_key || left_schema.contains(right_key.as_str());
        let right_name = if needs_suffix {
            _join_suffix_name(right_key.as_str(), suffix.as_str())
        } else {
            right_key.clone()
        };
        let right_pos = joined_schema.get_full(right_name.as_str()).unwrap().0;

        let pair = [columns[left_pos].clone(), columns[right_pos].clone()];
        columns[left_pos] = coalesce_columns(&pair).unwrap();
        right_positions.push(right_pos);
    }

    // Remove from the highest index downwards so the remaining positions stay valid.
    right_positions.sort_unstable();
    for pos in right_positions.into_iter().rev() {
        drop(columns.remove(pos));
    }

    df.clear_schema();
    df
}

/// Produces one `ChunkId` per element of `chunks`, ordered by chunk index and then by
/// position inside the chunk.
///
/// `len` is only used to pre-size the output vector (presumably the total element
/// count across all chunks — confirm against callers).
#[cfg(feature = "chunked_ids")]
pub(crate) fn create_chunked_index_mapping(chunks: &[ArrayRef], len: usize) -> Vec<ChunkId> {
    let mut mapping = Vec::with_capacity(len);

    mapping.extend(chunks.iter().enumerate().flat_map(|(chunk_idx, chunk)| {
        (0..chunk.len())
            .map(move |row_idx| ChunkId::store(chunk_idx as IdxSize, row_idx as IdxSize))
    }));

    mapping
}