1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15 #[doc(hidden)]
16 fn hash_join_left(
17 &self,
18 other: &Series,
19 validate: JoinValidation,
20 nulls_equal: bool,
21 ) -> PolarsResult<LeftJoinIds> {
22 let s_self = self.as_series();
23 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24 validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26 let lhs_dtype = lhs.dtype();
27 let rhs_dtype = rhs.dtype();
28
29 use DataType as T;
30 match lhs_dtype {
31 T::String | T::Binary => {
32 let lhs = lhs.cast(&T::Binary).unwrap();
33 let rhs = rhs.cast(&T::Binary).unwrap();
34 let lhs = lhs.binary().unwrap();
35 let rhs = rhs.binary().unwrap();
36 let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37 let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38 let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39 let build_null_count = other.null_count();
40 hash_join_tuples_left(
41 lhs,
42 rhs,
43 None,
44 None,
45 validate,
46 nulls_equal,
47 build_null_count,
48 )
49 },
50 T::BinaryOffset => {
51 let lhs = lhs.binary_offset().unwrap();
52 let rhs = rhs.binary_offset().unwrap();
53 let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57 let build_null_count = other.null_count();
58 hash_join_tuples_left(
59 lhs,
60 rhs,
61 None,
62 None,
63 validate,
64 nulls_equal,
65 build_null_count,
66 )
67 },
68 T::List(_) => {
69 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71 lhs.hash_join_left(rhs, validate, nulls_equal)
72 },
73 #[cfg(feature = "dtype-array")]
74 T::Array(_, _) => {
75 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77 lhs.hash_join_left(rhs, validate, nulls_equal)
78 },
79 #[cfg(feature = "dtype-struct")]
80 T::Struct(_) => {
81 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83 lhs.hash_join_left(rhs, validate, nulls_equal)
84 },
85 x if x.is_float() => {
86 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89 num_group_join_left(lhs, rhs, validate, nulls_equal)
90 })
91 },
92 _ => {
93 let lhs = s_self.bit_repr();
94 let rhs = other.bit_repr();
95
96 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97 polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98 };
99
100 use BitRepr as B;
101 match (lhs, rhs) {
102 (B::U8(lhs), B::U8(rhs)) => {
103 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
104 },
105 (B::U16(lhs), B::U16(rhs)) => {
106 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
107 },
108 (B::U32(lhs), B::U32(rhs)) => {
109 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
110 },
111 (B::U64(lhs), B::U64(rhs)) => {
112 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
113 },
114 #[cfg(feature = "dtype-u128")]
115 (B::U128(lhs), B::U128(rhs)) => {
116 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
117 },
118 _ => {
119 polars_bail!(
120 nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
121 );
122 },
123 }
124 },
125 }
126 }
127
/// Computes the left row indices selected by a hash semi join (`anti == false`) or a
/// hash anti join (`anti == true`) of `self` against `other`.
///
/// Both columns are converted with `to_physical_repr`, then the kernel is picked from
/// the left column's physical dtype:
///
/// * `String` / `Binary`: both sides are cast to `Binary` and joined on pre-hashed bytes.
/// * `BinaryOffset`: joined on pre-hashed bytes without a cast.
/// * `List`, `Array`, `Struct`: each side is row-encoded with `encode_rows_unordered`
///   and the method calls itself on the encoded columns.
/// * floats: joined on the typed `ChunkedArray` via `num_group_join_anti_semi`.
/// * everything else: joined on the `bit_repr` of both columns.
///
/// `nulls_equal` is forwarded untouched to the kernels.
///
/// # Errors
///
/// Returns an error when the right column cannot be viewed as the binary type required
/// by the left column, when row encoding fails, when either column has no bit
/// representation, or when the two bit representations do not match.
#[cfg(feature = "semi_anti_join")]
fn hash_join_semi_anti(
    &self,
    other: &Series,
    anti: bool,
    nulls_equal: bool,
) -> PolarsResult<Vec<IdxSize>> {
    let s_self = self.as_series();
    let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());

    let lhs_dtype = lhs.dtype();
    let rhs_dtype = rhs.dtype();

    use DataType as T;
    Ok(match lhs_dtype {
        T::String | T::Binary => {
            // The left dtype was matched just above, so unwrapping on the left is an
            // invariant. The right dtype is not checked here: propagate its failure
            // as an error instead of panicking.
            let lhs = lhs.cast(&T::Binary).unwrap();
            let rhs = rhs.cast(&T::Binary)?;
            let lhs = lhs.binary().unwrap();
            let rhs = rhs.binary()?;
            let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
            // Hand the kernels slices so the hashed vectors are borrowed, not moved.
            let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
            let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
            if anti {
                hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
            } else {
                hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
            }
        },
        T::BinaryOffset => {
            let lhs = lhs.binary_offset().unwrap();
            // Same error style as `hash_join_inner` / `hash_join_outer`: a right
            // column of another dtype yields an error rather than a panic.
            let rhs = rhs.binary_offset()?;
            let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
            let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
            let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
            if anti {
                hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
            } else {
                hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
            }
        },
        // Nested dtypes are row-encoded and then joined through this same method.
        T::List(_) => {
            let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
            let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
            lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
        },
        #[cfg(feature = "dtype-array")]
        T::Array(_, _) => {
            let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
            let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
            lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
        },
        #[cfg(feature = "dtype-struct")]
        T::Struct(_) => {
            let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
            let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
            lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
        },
        x if x.is_float() => {
            with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
                let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
                let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
                num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
            })
        },
        _ => {
            // Remaining dtypes are joined on their unsigned bit representation.
            let lhs = s_self.bit_repr();
            let rhs = other.bit_repr();

            let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
                polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
            };

            use BitRepr as B;
            match (lhs, rhs) {
                (B::U8(lhs), B::U8(rhs)) => {
                    num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
                },
                (B::U16(lhs), B::U16(rhs)) => {
                    num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
                },
                (B::U32(lhs), B::U32(rhs)) => {
                    num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
                },
                (B::U64(lhs), B::U64(rhs)) => {
                    num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
                },
                #[cfg(feature = "dtype-u128")]
                (B::U128(lhs), B::U128(rhs)) => {
                    num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
                },
                _ => {
                    polars_bail!(
                        nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
                    );
                },
            }
        },
    })
}
230
231 fn hash_join_inner(
233 &self,
234 other: &Series,
235 validate: JoinValidation,
236 nulls_equal: bool,
237 ) -> PolarsResult<(InnerJoinIds, bool)> {
238 let s_self = self.as_series();
239 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
240 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
241
242 let lhs_dtype = lhs.dtype();
243 let rhs_dtype = rhs.dtype();
244
245 use DataType as T;
246 match lhs_dtype {
247 T::String | T::Binary => {
248 let lhs = lhs.cast(&T::Binary).unwrap();
249 let rhs = rhs.cast(&T::Binary).unwrap();
250 let lhs = lhs.binary().unwrap();
251 let rhs = rhs.binary().unwrap();
252 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
253 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
255 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
256 let build_null_count = if swapped {
257 s_self.null_count()
258 } else {
259 other.null_count()
260 };
261 Ok((
262 hash_join_tuples_inner(
263 lhs,
264 rhs,
265 swapped,
266 validate,
267 nulls_equal,
268 build_null_count,
269 )?,
270 !swapped,
271 ))
272 },
273 T::BinaryOffset => {
274 let lhs = lhs.binary_offset().unwrap();
275 let rhs = rhs.binary_offset()?;
276 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
277 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
279 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
280 let build_null_count = if swapped {
281 s_self.null_count()
282 } else {
283 other.null_count()
284 };
285 Ok((
286 hash_join_tuples_inner(
287 lhs,
288 rhs,
289 swapped,
290 validate,
291 nulls_equal,
292 build_null_count,
293 )?,
294 !swapped,
295 ))
296 },
297 T::List(_) => {
298 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
299 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
300 lhs.hash_join_inner(rhs, validate, nulls_equal)
301 },
302 #[cfg(feature = "dtype-array")]
303 T::Array(_, _) => {
304 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
305 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
306 lhs.hash_join_inner(rhs, validate, nulls_equal)
307 },
308 #[cfg(feature = "dtype-struct")]
309 T::Struct(_) => {
310 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
311 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
312 lhs.hash_join_inner(rhs, validate, nulls_equal)
313 },
314 x if x.is_float() => {
315 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
316 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
317 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
318 group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
319 })
320 },
321 _ => {
322 let lhs = s_self.bit_repr();
323 let rhs = other.bit_repr();
324
325 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
326 polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
327 };
328
329 use BitRepr as B;
330 match (lhs, rhs) {
331 (B::U8(lhs), B::U8(rhs)) => group_join_inner(&lhs, &rhs, validate, nulls_equal),
332 (B::U16(lhs), B::U16(rhs)) => {
333 group_join_inner(&lhs, &rhs, validate, nulls_equal)
334 },
335 (B::U32(lhs), B::U32(rhs)) => {
336 group_join_inner(&lhs, &rhs, validate, nulls_equal)
337 },
338 (B::U64(lhs), BitRepr::U64(rhs)) => {
339 group_join_inner(&lhs, &rhs, validate, nulls_equal)
340 },
341 #[cfg(feature = "dtype-u128")]
342 (B::U128(lhs), BitRepr::U128(rhs)) => {
343 group_join_inner(&lhs, &rhs, validate, nulls_equal)
344 },
345 _ => {
346 polars_bail!(
347 nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
348 );
349 },
350 }
351 },
352 }
353 }
354
355 fn hash_join_outer(
356 &self,
357 other: &Series,
358 validate: JoinValidation,
359 nulls_equal: bool,
360 ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
361 let s_self = self.as_series();
362 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
363 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
364
365 let lhs_dtype = lhs.dtype();
366 let rhs_dtype = rhs.dtype();
367
368 use DataType as T;
369 match lhs_dtype {
370 T::String | T::Binary => {
371 let lhs = lhs.cast(&T::Binary).unwrap();
372 let rhs = rhs.cast(&T::Binary).unwrap();
373 let lhs = lhs.binary().unwrap();
374 let rhs = rhs.binary().unwrap();
375 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
376 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
378 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
379 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
380 },
381 T::BinaryOffset => {
382 let lhs = lhs.binary_offset().unwrap();
383 let rhs = rhs.binary_offset()?;
384 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
385 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
387 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
388 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
389 },
390 T::List(_) => {
391 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
392 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
393 lhs.hash_join_outer(rhs, validate, nulls_equal)
394 },
395 #[cfg(feature = "dtype-array")]
396 T::Array(_, _) => {
397 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
398 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
399 lhs.hash_join_outer(rhs, validate, nulls_equal)
400 },
401 #[cfg(feature = "dtype-struct")]
402 T::Struct(_) => {
403 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
404 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
405 lhs.hash_join_outer(rhs, validate, nulls_equal)
406 },
407 x if x.is_float() => {
408 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
409 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
410 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
411 hash_join_outer(lhs, rhs, validate, nulls_equal)
412 })
413 },
414 _ => {
415 let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
416 polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
417 };
418
419 use BitRepr as B;
420 match (lhs, rhs) {
421 (B::U8(lhs), B::U8(rhs)) => hash_join_outer(&lhs, &rhs, validate, nulls_equal),
422 (B::U16(lhs), B::U16(rhs)) => {
423 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
424 },
425 (B::U32(lhs), B::U32(rhs)) => {
426 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
427 },
428 (B::U64(lhs), B::U64(rhs)) => {
429 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
430 },
431 #[cfg(feature = "dtype-u128")]
432 (B::U128(lhs), B::U128(rhs)) => {
433 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
434 },
435 _ => {
436 polars_bail!(
437 nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
438 );
439 },
440 }
441 },
442 }
443 }
444}
445
446impl SeriesJoin for Series {}
447
448fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
449where
450 T: PolarsNumericType,
451{
452 splitted
453 .iter()
454 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
455 .collect()
456}
457
458fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
459 cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
460}
461
462fn group_join_inner<T>(
463 left: &ChunkedArray<T>,
464 right: &ChunkedArray<T>,
465 validate: JoinValidation,
466 nulls_equal: bool,
467) -> PolarsResult<(InnerJoinIds, bool)>
468where
469 T: PolarsDataType,
470 for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
471 for<'a> T::Physical<'a>:
472 Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
473 for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
474 Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
475{
476 let n_threads = POOL.current_num_threads();
477 let (a, b, swapped) = det_hash_prone_order!(left, right);
478 let splitted_a = split(a, n_threads);
479 let splitted_b = split(b, n_threads);
480 let splitted_a = get_arrays(&splitted_a);
481 let splitted_b = get_arrays(&splitted_b);
482
483 match (left.null_count(), right.null_count()) {
484 (0, 0) => {
485 let first = &splitted_a[0];
486 if first.as_slice().is_some() {
487 let splitted_a = splitted_a
488 .iter()
489 .map(|arr| arr.as_slice().unwrap())
490 .collect::<Vec<_>>();
491 let splitted_b = splitted_b
492 .iter()
493 .map(|arr| arr.as_slice().unwrap())
494 .collect::<Vec<_>>();
495 Ok((
496 hash_join_tuples_inner(
497 splitted_a,
498 splitted_b,
499 swapped,
500 validate,
501 nulls_equal,
502 0,
503 )?,
504 !swapped,
505 ))
506 } else {
507 Ok((
508 hash_join_tuples_inner(
509 splitted_a,
510 splitted_b,
511 swapped,
512 validate,
513 nulls_equal,
514 0,
515 )?,
516 !swapped,
517 ))
518 }
519 },
520 _ => {
521 let build_null_count = if swapped {
522 left.null_count()
523 } else {
524 right.null_count()
525 };
526 Ok((
527 hash_join_tuples_inner(
528 splitted_a,
529 splitted_b,
530 swapped,
531 validate,
532 nulls_equal,
533 build_null_count,
534 )?,
535 !swapped,
536 ))
537 },
538 }
539}
540
/// Builds the optional chunked-index mapping for each side of a join.
///
/// A side gets `Some(mapping)` only when it has more than one chunk; a single-chunk
/// (or chunkless) side yields `None`. The two sides are computed through `POOL.join`.
#[cfg(feature = "chunked_ids")]
fn create_mappings(
    chunks_left: &[ArrayRef],
    chunks_right: &[ArrayRef],
    left_len: usize,
    right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    POOL.join(
        || (chunks_left.len() > 1).then(|| create_chunked_index_mapping(chunks_left, left_len)),
        || {
            (chunks_right.len() > 1)
                .then(|| create_chunked_index_mapping(chunks_right, right_len))
        },
    )
}
566
567#[cfg(not(feature = "chunked_ids"))]
568fn create_mappings(
569 _chunks_left: &[ArrayRef],
570 _chunks_right: &[ArrayRef],
571 _left_len: usize,
572 _right_len: usize,
573) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
574 (None, None)
575}
576
577fn num_group_join_left<T>(
578 left: &ChunkedArray<T>,
579 right: &ChunkedArray<T>,
580 validate: JoinValidation,
581 nulls_equal: bool,
582) -> PolarsResult<LeftJoinIds>
583where
584 T: PolarsNumericType,
585 T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
586 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
587 T::Native: DirtyHash + Copy + ToTotalOrd,
588 <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
589{
590 let n_threads = POOL.current_num_threads();
591 let splitted_a = split(left, n_threads);
592 let splitted_b = split(right, n_threads);
593 match (
594 left.null_count(),
595 right.null_count(),
596 left.chunks().len(),
597 right.chunks().len(),
598 ) {
599 (0, 0, 1, 1) => {
600 let keys_a = chunks_as_slices(&splitted_a);
601 let keys_b = chunks_as_slices(&splitted_b);
602 hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
603 },
604 (0, 0, _, _) => {
605 let keys_a = chunks_as_slices(&splitted_a);
606 let keys_b = chunks_as_slices(&splitted_b);
607
608 let (mapping_left, mapping_right) =
609 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
610 hash_join_tuples_left(
611 keys_a,
612 keys_b,
613 mapping_left.as_deref(),
614 mapping_right.as_deref(),
615 validate,
616 nulls_equal,
617 0,
618 )
619 },
620 _ => {
621 let keys_a = get_arrays(&splitted_a);
622 let keys_b = get_arrays(&splitted_b);
623 let (mapping_left, mapping_right) =
624 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
625 let build_null_count = right.null_count();
626 hash_join_tuples_left(
627 keys_a,
628 keys_b,
629 mapping_left.as_deref(),
630 mapping_right.as_deref(),
631 validate,
632 nulls_equal,
633 build_null_count,
634 )
635 },
636 }
637}
638
639fn hash_join_outer<T>(
640 ca_in: &ChunkedArray<T>,
641 other: &ChunkedArray<T>,
642 validate: JoinValidation,
643 nulls_equal: bool,
644) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
645where
646 T: PolarsNumericType,
647 T::Native: TotalHash + TotalEq + ToTotalOrd,
648 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
649{
650 let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
651
652 let n_partitions = _set_partition_size();
653 let splitted_a = split(a, n_partitions);
654 let splitted_b = split(b, n_partitions);
655
656 match (a.null_count(), b.null_count()) {
657 (0, 0) => {
658 let iters_a = splitted_a
659 .iter()
660 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
661 .collect::<Vec<_>>();
662 let iters_b = splitted_b
663 .iter()
664 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
665 .collect::<Vec<_>>();
666 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
667 },
668 _ => {
669 let iters_a = splitted_a
670 .iter()
671 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
672 .collect::<Vec<_>>();
673 let iters_b = splitted_b
674 .iter()
675 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
676 .collect::<Vec<_>>();
677 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
678 },
679 }
680}
681
682pub(crate) fn prepare_binary<'a, T>(
683 ca: &'a ChunkedArray<T>,
684 other: &'a ChunkedArray<T>,
685 build_shortest_table: bool,
688) -> (
689 Vec<Vec<BytesHash<'a>>>,
690 Vec<Vec<BytesHash<'a>>>,
691 bool,
692 PlRandomState,
693)
694where
695 T: PolarsDataType,
696 for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
697{
698 let (a, b, swapped) = if build_shortest_table {
699 det_hash_prone_order!(ca, other)
700 } else {
701 (ca, other, false)
702 };
703 let hb = PlRandomState::default();
704 let bh_a = a.to_bytes_hashes(true, hb);
705 let bh_b = b.to_bytes_hashes(true, hb);
706
707 (bh_a, bh_b, swapped, hb)
708}
709
/// Runs the semi-join (`anti == false`) or anti-join (`anti == true`) kernel on two
/// numeric key columns and returns the selected left row indices.
///
/// Both inputs are split into `POOL.current_num_threads()` parts. Without nulls the
/// kernel receives the raw value slices, regardless of how many chunks each side has;
/// otherwise it receives the arrow arrays.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let parts_left = split(left, n_threads);
    let parts_right = split(right, n_threads);

    let null_free = left.null_count() == 0 && right.null_count() == 0;

    if null_free {
        let keys_left = chunks_as_slices(&parts_left);
        let keys_right = chunks_as_slices(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    } else {
        let keys_left = get_arrays(&parts_left);
        let keys_right = get_arrays(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    }
}