polars_ops/frame/join/hash_join/
mod.rs1#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::POOL;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14pub(super) use single_keys::*;
15pub use single_keys_dispatch::SeriesJoin;
16#[cfg(feature = "asof_join")]
17pub(super) use single_keys_dispatch::prepare_binary;
18use single_keys_inner::*;
19use single_keys_left::*;
20use single_keys_outer::*;
21#[cfg(feature = "semi_anti_join")]
22use single_keys_semi_anti::*;
23pub(crate) use sort_merge::*;
24
25pub use super::*;
26#[cfg(feature = "chunked_ids")]
27use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
28
29pub fn default_join_ids() -> ChunkJoinOptIds {
30 #[cfg(feature = "chunked_ids")]
31 {
32 Either::Left(vec![])
33 }
34 #[cfg(not(feature = "chunked_ids"))]
35 {
36 vec![]
37 }
38}
39
40macro_rules! det_hash_prone_order {
41 ($self:expr, $other:expr) => {{
42 if $self.len() > $other.len() {
44 ($self, $other, false)
45 } else {
46 ($other, $self, true)
47 }
48 }};
49}
50
51#[cfg(feature = "performant")]
52use arrow::legacy::conversion::primitive_to_vec;
53pub(super) use det_hash_prone_order;
54
55pub trait JoinDispatch: IntoDf {
56 #[cfg(feature = "chunked_ids")]
59 unsafe fn create_left_df_chunked(
60 &self,
61 chunk_ids: &[ChunkId],
62 left_join: bool,
63 was_sliced: bool,
64 ) -> DataFrame {
65 let df_self = self.to_df();
66
67 let left_join_no_duplicate_matches =
68 left_join && !was_sliced && chunk_ids.len() == df_self.height();
69
70 if left_join_no_duplicate_matches {
71 df_self.clone()
72 } else {
73 let sorted = if left_join {
75 IsSorted::Ascending
76 } else {
77 IsSorted::Not
78 };
79 df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
80 }
81 }
82
83 unsafe fn _create_left_df_from_slice(
86 &self,
87 join_tuples: &[IdxSize],
88 left_join: bool,
89 was_sliced: bool,
90 sorted_tuple_idx: bool,
91 ) -> DataFrame {
92 let df_self = self.to_df();
93
94 let left_join_no_duplicate_matches =
95 sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
96
97 if left_join_no_duplicate_matches {
98 df_self.clone()
99 } else {
100 let sorted = if sorted_tuple_idx {
101 IsSorted::Ascending
102 } else {
103 IsSorted::Not
104 };
105
106 df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
107 }
108 }
109
110 #[cfg(feature = "semi_anti_join")]
111 unsafe fn _finish_anti_semi_join(
114 &self,
115 mut idx: &[IdxSize],
116 slice: Option<(i64, usize)>,
117 ) -> DataFrame {
118 let ca_self = self.to_df();
119 if let Some((offset, len)) = slice {
120 idx = slice_slice(idx, offset, len);
121 }
122 ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
124 }
125
126 #[cfg(feature = "semi_anti_join")]
127 fn _semi_anti_join_from_series(
128 &self,
129 s_left: &Series,
130 s_right: &Series,
131 slice: Option<(i64, usize)>,
132 anti: bool,
133 nulls_equal: bool,
134 ) -> PolarsResult<DataFrame> {
135 let ca_self = self.to_df();
136
137 let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
138 Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
141 }
142 fn _full_join_from_series(
143 &self,
144 other: &DataFrame,
145 s_left: &Series,
146 s_right: &Series,
147 args: JoinArgs,
148 ) -> PolarsResult<DataFrame> {
149 let df_self = self.to_df();
150
151 let (mut join_idx_l, mut join_idx_r) =
153 s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
154
155 try_raise_keyboard_interrupt();
156 if let Some((offset, len)) = args.slice {
157 let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
158 join_idx_l.slice(offset, len);
159 join_idx_r.slice(offset, len);
160 }
161 let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
162 let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
163
164 let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
165 let mut df = DataFrame::new(vec![
166 idx_ca_l.into_series().into(),
167 idx_ca_r.into_series().into(),
168 ])?;
169
170 let options = SortMultipleOptions::new()
171 .with_order_descending(false)
172 .with_maintain_order(true)
173 .with_nulls_last(true);
174
175 let columns = match args.maintain_order {
176 MaintainOrderJoin::Left => vec!["a"],
177 MaintainOrderJoin::LeftRight => vec!["a", "b"],
178 MaintainOrderJoin::Right => vec!["b"],
179 MaintainOrderJoin::RightLeft => vec!["b", "a"],
180 _ => unreachable!(),
181 };
182
183 df.sort_in_place(columns, options)?;
184
185 let join_tuples_left = df.column("a").unwrap().idx().unwrap();
186 let join_tuples_right = df.column("b").unwrap().idx().unwrap();
187 POOL.join(
188 || unsafe { df_self.take_unchecked(join_tuples_left) },
189 || unsafe { other.take_unchecked(join_tuples_right) },
190 )
191 } else {
192 POOL.join(
193 || unsafe { df_self.take_unchecked(&idx_ca_l) },
194 || unsafe { other.take_unchecked(&idx_ca_r) },
195 )
196 };
197
198 let coalesce = args.coalesce.coalesce(&JoinType::Full);
199 let out = _finish_join(df_left, df_right, args.suffix.clone());
200 if coalesce {
201 Ok(_coalesce_full_join(
202 out?,
203 &[s_left.name().clone()],
204 &[s_right.name().clone()],
205 args.suffix,
206 df_self,
207 ))
208 } else {
209 out
210 }
211 }
212}
213
214impl JoinDispatch for DataFrame {}