polars_ops/frame/join/
general.rs1use polars_utils::format_pl_smallstr;
2
3use super::*;
4use crate::series::coalesce_columns;
5
6pub fn _join_suffix_name(name: &str, suffix: &str) -> PlSmallStr {
7 format_pl_smallstr!("{name}{suffix}")
8}
9
10fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
11 suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))
12}
13
14#[doc(hidden)]
17pub fn _finish_join(
18 mut df_left: DataFrame,
19 mut df_right: DataFrame,
20 suffix: Option<PlSmallStr>,
21) -> PolarsResult<DataFrame> {
22 let mut left_names = PlHashSet::with_capacity(df_left.width());
23
24 df_left.get_columns().iter().for_each(|series| {
25 left_names.insert(series.name());
26 });
27
28 let mut rename_strs = Vec::with_capacity(df_right.width());
29 let right_names = df_right.schema();
30
31 for name in right_names.iter_names() {
32 if left_names.contains(name) {
33 rename_strs.push(name.clone())
34 }
35 }
36
37 let suffix = get_suffix(suffix);
38
39 df_right.rename_many(rename_strs.iter().map(|name| {
40 (
41 name.as_str(),
42 _join_suffix_name(name.as_str(), suffix.as_str()),
43 )
44 }))?;
45
46 drop(left_names);
47 unsafe { df_left.hstack_mut_unchecked(df_right.get_columns()) };
49 Ok(df_left)
50}
51
52pub fn _coalesce_full_join(
53 mut df: DataFrame,
54 keys_left: &[PlSmallStr],
55 keys_right: &[PlSmallStr],
56 suffix: Option<PlSmallStr>,
57 df_left: &DataFrame,
58) -> DataFrame {
59 let schema_left = if keys_left == keys_right {
63 Arc::new(Schema::default())
64 } else {
65 df_left.schema().clone()
66 };
67
68 let schema = df.schema().clone();
69 let mut to_remove = Vec::with_capacity(keys_right.len());
70
71 let columns = unsafe { df.get_columns_mut() };
73 let suffix = get_suffix(suffix);
74 for (l, r) in keys_left.iter().zip(keys_right.iter()) {
75 let pos_l = schema.get_full(l.as_str()).unwrap().0;
76
77 let r = if l == r || schema_left.contains(r.as_str()) {
78 _join_suffix_name(r.as_str(), suffix.as_str())
79 } else {
80 r.clone()
81 };
82 let pos_r = schema.get_full(&r).unwrap().0;
83
84 let l = columns[pos_l].clone();
85 let r = columns[pos_r].clone();
86
87 columns[pos_l] = coalesce_columns(&[l, r]).unwrap();
88 to_remove.push(pos_r);
89 }
90 to_remove.sort_by(|a, b| b.cmp(a));
92 for pos in to_remove {
93 let _ = columns.remove(pos);
94 }
95 df.clear_schema();
96 df
97}
98
/// Builds one `ChunkId` per element across all of `chunks`.
///
/// Entries are emitted chunk by chunk, and within a chunk in ascending element
/// order; each stores the chunk index and the element's index in that chunk.
/// `len` is only used to pre-size the output vector.
#[cfg(feature = "chunked_ids")]
pub(crate) fn create_chunked_index_mapping(chunks: &[ArrayRef], len: usize) -> Vec<ChunkId> {
    let mut mapping = Vec::with_capacity(len);

    mapping.extend(chunks.iter().enumerate().flat_map(|(chunk_idx, chunk)| {
        let chunk_idx = chunk_idx as IdxSize;
        (0..chunk.len()).map(move |row_idx| ChunkId::store(chunk_idx, row_idx as IdxSize))
    }));

    mapping
}