polars_ops/frame/join/hash_join/
mod.rs1#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::POOL;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14pub(super) use single_keys::*;
15pub use single_keys_dispatch::SeriesJoin;
16#[cfg(feature = "asof_join")]
17pub(super) use single_keys_dispatch::prepare_binary;
18use single_keys_inner::*;
19use single_keys_left::*;
20use single_keys_outer::*;
21#[cfg(feature = "semi_anti_join")]
22use single_keys_semi_anti::*;
23pub(crate) use sort_merge::*;
24
25pub use super::*;
26#[cfg(feature = "chunked_ids")]
27use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
28
29pub fn default_join_ids() -> ChunkJoinOptIds {
30 #[cfg(feature = "chunked_ids")]
31 {
32 Either::Left(vec![])
33 }
34 #[cfg(not(feature = "chunked_ids"))]
35 {
36 vec![]
37 }
38}
39
40macro_rules! det_hash_prone_order {
41 ($self:expr, $other:expr) => {{
42 if $self.len() > $other.len() {
44 ($self, $other, false)
45 } else {
46 ($other, $self, true)
47 }
48 }};
49}
50
51#[cfg(feature = "performant")]
52use arrow::legacy::conversion::primitive_to_vec;
53pub(super) use det_hash_prone_order;
54
55pub trait JoinDispatch: IntoDf {
56 #[cfg(feature = "chunked_ids")]
59 unsafe fn create_left_df_chunked(
60 &self,
61 chunk_ids: &[ChunkId],
62 left_join: bool,
63 was_sliced: bool,
64 ) -> DataFrame {
65 let df_self = self.to_df();
66
67 let left_join_no_duplicate_matches =
68 left_join && !was_sliced && chunk_ids.len() == df_self.height();
69
70 if left_join_no_duplicate_matches {
71 df_self.clone()
72 } else {
73 let sorted = if left_join {
75 IsSorted::Ascending
76 } else {
77 IsSorted::Not
78 };
79 df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
80 }
81 }
82
83 unsafe fn _create_left_df_from_slice(
86 &self,
87 join_tuples: &[IdxSize],
88 left_join: bool,
89 was_sliced: bool,
90 sorted_tuple_idx: bool,
91 ) -> DataFrame {
92 let df_self = self.to_df();
93
94 let left_join_no_duplicate_matches =
95 sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
96
97 if left_join_no_duplicate_matches {
98 df_self.clone()
99 } else {
100 let sorted = if sorted_tuple_idx {
101 IsSorted::Ascending
102 } else {
103 IsSorted::Not
104 };
105
106 df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
107 }
108 }
109
110 #[cfg(feature = "semi_anti_join")]
111 unsafe fn _finish_anti_semi_join(
114 &self,
115 mut idx: &[IdxSize],
116 slice: Option<(i64, usize)>,
117 ) -> DataFrame {
118 let ca_self = self.to_df();
119 if let Some((offset, len)) = slice {
120 idx = slice_slice(idx, offset, len);
121 }
122 ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
124 }
125
126 #[cfg(feature = "semi_anti_join")]
127 fn _semi_anti_join_from_series(
128 &self,
129 s_left: &Series,
130 s_right: &Series,
131 slice: Option<(i64, usize)>,
132 anti: bool,
133 nulls_equal: bool,
134 ) -> PolarsResult<DataFrame> {
135 let ca_self = self.to_df();
136 #[cfg(feature = "dtype-categorical")]
137 _check_categorical_src(s_left.dtype(), s_right.dtype())?;
138
139 let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
140 Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
143 }
144 fn _full_join_from_series(
145 &self,
146 other: &DataFrame,
147 s_left: &Series,
148 s_right: &Series,
149 args: JoinArgs,
150 ) -> PolarsResult<DataFrame> {
151 let df_self = self.to_df();
152 #[cfg(feature = "dtype-categorical")]
153 _check_categorical_src(s_left.dtype(), s_right.dtype())?;
154
155 let (mut join_idx_l, mut join_idx_r) =
157 s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
158
159 try_raise_keyboard_interrupt();
160 if let Some((offset, len)) = args.slice {
161 let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
162 join_idx_l.slice(offset, len);
163 join_idx_r.slice(offset, len);
164 }
165 let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
166 let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
167
168 let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
169 let mut df = DataFrame::new(vec![
170 idx_ca_l.into_series().into(),
171 idx_ca_r.into_series().into(),
172 ])?;
173
174 let options = SortMultipleOptions::new()
175 .with_order_descending(false)
176 .with_maintain_order(true)
177 .with_nulls_last(true);
178
179 let columns = match args.maintain_order {
180 MaintainOrderJoin::Left => vec!["a"],
181 MaintainOrderJoin::LeftRight => vec!["a", "b"],
182 MaintainOrderJoin::Right => vec!["b"],
183 MaintainOrderJoin::RightLeft => vec!["b", "a"],
184 _ => unreachable!(),
185 };
186
187 df.sort_in_place(columns, options)?;
188
189 let join_tuples_left = df.column("a").unwrap().idx().unwrap();
190 let join_tuples_right = df.column("b").unwrap().idx().unwrap();
191 POOL.join(
192 || unsafe { df_self.take_unchecked(join_tuples_left) },
193 || unsafe { other.take_unchecked(join_tuples_right) },
194 )
195 } else {
196 POOL.join(
197 || unsafe { df_self.take_unchecked(&idx_ca_l) },
198 || unsafe { other.take_unchecked(&idx_ca_r) },
199 )
200 };
201
202 let coalesce = args.coalesce.coalesce(&JoinType::Full);
203 let out = _finish_join(df_left, df_right, args.suffix.clone());
204 if coalesce {
205 Ok(_coalesce_full_join(
206 out?,
207 &[s_left.name().clone()],
208 &[s_right.name().clone()],
209 args.suffix.clone(),
210 df_self,
211 ))
212 } else {
213 out
214 }
215 }
216}
217
218impl JoinDispatch for DataFrame {}