// polars_ops/frame/join/hash_join/mod.rs
#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::runtime::RAYON;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14use polars_utils::unique_column_name;
15pub(super) use single_keys::*;
16pub use single_keys_dispatch::SeriesJoin;
17#[cfg(feature = "asof_join")]
18pub(super) use single_keys_dispatch::prepare_binary;
19use single_keys_inner::*;
20use single_keys_left::*;
21use single_keys_outer::*;
22#[cfg(feature = "semi_anti_join")]
23use single_keys_semi_anti::*;
24pub(crate) use sort_merge::*;
25
26pub use super::*;
27#[cfg(feature = "chunked_ids")]
28use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
29
30pub fn default_join_ids() -> ChunkJoinOptIds {
31 #[cfg(feature = "chunked_ids")]
32 {
33 Either::Left(vec![])
34 }
35 #[cfg(not(feature = "chunked_ids"))]
36 {
37 vec![]
38 }
39}
40
/// Orders two relations by length.
///
/// Expands to a tuple `(longer, shorter, swapped)`. `swapped` is `false` when
/// the first operand is strictly longer than the second, and `true` otherwise
/// (including when both have the same length), in which case the operands are
/// returned in exchanged order.
///
/// Both operands must expose a `len()` method and have the same type. Each
/// operand expression is evaluated exactly once, so passing a call or a
/// constructor expression does not duplicate work or side effects.
macro_rules! det_hash_prone_order {
    ($self:expr, $other:expr) => {{
        // Bind each operand once: substituting the expressions directly would
        // evaluate them a second time when building the result tuple.
        let (lhs, rhs) = ($self, $other);
        // NOTE(review): the shorter relation (second element) is presumably the
        // side the hash table is built from — confirm at the call sites.
        if lhs.len() > rhs.len() {
            (lhs, rhs, false)
        } else {
            (rhs, lhs, true)
        }
    }};
}
51
52#[cfg(feature = "performant")]
53use arrow::legacy::conversion::primitive_to_vec;
54pub(super) use det_hash_prone_order;
55
56pub trait JoinDispatch: IntoDf {
57 #[cfg(feature = "chunked_ids")]
60 unsafe fn create_left_df_chunked(
61 &self,
62 chunk_ids: &[ChunkId],
63 left_join: bool,
64 was_sliced: bool,
65 ) -> DataFrame {
66 let df_self = self.to_df();
67
68 let left_join_no_duplicate_matches =
69 left_join && !was_sliced && chunk_ids.len() == df_self.height();
70
71 if left_join_no_duplicate_matches {
72 df_self.clone()
73 } else {
74 let sorted = if left_join {
76 IsSorted::Ascending
77 } else {
78 IsSorted::Not
79 };
80 df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
81 }
82 }
83
84 unsafe fn _create_left_df_from_slice(
87 &self,
88 join_tuples: &[IdxSize],
89 left_join: bool,
90 was_sliced: bool,
91 sorted_tuple_idx: bool,
92 ) -> DataFrame {
93 let df_self = self.to_df();
94
95 let left_join_no_duplicate_matches =
96 sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
97
98 if left_join_no_duplicate_matches {
99 df_self.clone()
100 } else {
101 let sorted = if sorted_tuple_idx {
102 IsSorted::Ascending
103 } else {
104 IsSorted::Not
105 };
106
107 df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
108 }
109 }
110
111 #[cfg(feature = "semi_anti_join")]
112 unsafe fn _finish_anti_semi_join(
115 &self,
116 mut idx: &[IdxSize],
117 slice: Option<(i64, usize)>,
118 ) -> DataFrame {
119 let ca_self = self.to_df();
120 if let Some((offset, len)) = slice {
121 idx = slice_slice(idx, offset, len);
122 }
123 ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
125 }
126
127 #[cfg(feature = "semi_anti_join")]
128 fn _semi_anti_join_from_series(
129 &self,
130 s_left: &Series,
131 s_right: &Series,
132 slice: Option<(i64, usize)>,
133 anti: bool,
134 nulls_equal: bool,
135 ) -> PolarsResult<DataFrame> {
136 let ca_self = self.to_df();
137
138 let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
139 Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
142 }
143 fn _full_join_from_series(
144 &self,
145 other: &DataFrame,
146 s_left: &Series,
147 s_right: &Series,
148 args: JoinArgs,
149 ) -> PolarsResult<DataFrame> {
150 let df_self = self.to_df();
151
152 let (mut join_idx_l, mut join_idx_r) =
154 s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
155
156 try_raise_keyboard_interrupt();
157 if let Some((offset, len)) = args.slice {
158 let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
159 join_idx_l.slice(offset, len);
160 join_idx_r.slice(offset, len);
161 }
162 let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
163 let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
164
165 let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
166 let mut df = unsafe {
167 DataFrame::new_unchecked_infer_height(vec![
168 idx_ca_l.into_series().into(),
169 idx_ca_r.into_series().into(),
170 ])
171 };
172
173 let options = SortMultipleOptions::new()
174 .with_order_descending(false)
175 .with_maintain_order(true)
176 .with_nulls_last(true);
177
178 let columns = match args.maintain_order {
179 MaintainOrderJoin::Left => vec!["a"],
180 MaintainOrderJoin::LeftRight => vec!["a", "b"],
181 MaintainOrderJoin::Right => vec!["b"],
182 MaintainOrderJoin::RightLeft => vec!["b", "a"],
183 _ => unreachable!(),
184 };
185
186 df.sort_in_place(columns, options)?;
187
188 let join_tuples_left = df.column("a").unwrap().idx().unwrap();
189 let join_tuples_right = df.column("b").unwrap().idx().unwrap();
190 RAYON.join(
191 || unsafe { df_self.take_unchecked(join_tuples_left) },
192 || unsafe { other.take_unchecked(join_tuples_right) },
193 )
194 } else {
195 RAYON.join(
196 || unsafe { df_self.take_unchecked(&idx_ca_l) },
197 || unsafe { other.take_unchecked(&idx_ca_r) },
198 )
199 };
200
201 let coalesce = args.coalesce.coalesce(&JoinType::Full);
202 if coalesce {
203 let tmp_right_name = unique_column_name();
204 let mut df_right = df_right;
205 df_right.rename(s_right.name().as_str(), tmp_right_name.clone())?;
206 let out = _finish_join(df_left, df_right, args.suffix.clone())?;
207 Ok(_coalesce_full_join(
208 out,
209 &[s_left.name().clone()],
210 &[tmp_right_name],
211 args.suffix,
212 df_self,
213 ))
214 } else {
215 _finish_join(df_left, df_right, args.suffix.clone())
216 }
217 }
218}
219
// The impl body is empty, so `DataFrame` uses the trait's default method
// bodies for every `JoinDispatch` method.
impl JoinDispatch for DataFrame {}