1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15 #[doc(hidden)]
16 fn hash_join_left(
17 &self,
18 other: &Series,
19 validate: JoinValidation,
20 nulls_equal: bool,
21 ) -> PolarsResult<LeftJoinIds> {
22 let s_self = self.as_series();
23 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24 validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26 let lhs_dtype = lhs.dtype();
27 let rhs_dtype = rhs.dtype();
28
29 use DataType as T;
30 match lhs_dtype {
31 T::String | T::Binary => {
32 let lhs = lhs.cast(&T::Binary).unwrap();
33 let rhs = rhs.cast(&T::Binary).unwrap();
34 let lhs = lhs.binary().unwrap();
35 let rhs = rhs.binary().unwrap();
36 let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37 let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38 let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39 let build_null_count = other.null_count();
40 hash_join_tuples_left(
41 lhs,
42 rhs,
43 None,
44 None,
45 validate,
46 nulls_equal,
47 build_null_count,
48 )
49 },
50 T::BinaryOffset => {
51 let lhs = lhs.binary_offset().unwrap();
52 let rhs = rhs.binary_offset().unwrap();
53 let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57 let build_null_count = other.null_count();
58 hash_join_tuples_left(
59 lhs,
60 rhs,
61 None,
62 None,
63 validate,
64 nulls_equal,
65 build_null_count,
66 )
67 },
68 T::List(_) => {
69 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71 lhs.hash_join_left(rhs, validate, nulls_equal)
72 },
73 #[cfg(feature = "dtype-array")]
74 T::Array(_, _) => {
75 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77 lhs.hash_join_left(rhs, validate, nulls_equal)
78 },
79 #[cfg(feature = "dtype-struct")]
80 T::Struct(_) => {
81 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83 lhs.hash_join_left(rhs, validate, nulls_equal)
84 },
85 x if x.is_float() => {
86 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89 num_group_join_left(lhs, rhs, validate, nulls_equal)
90 })
91 },
92 _ => {
93 let lhs = s_self.bit_repr();
94 let rhs = other.bit_repr();
95
96 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97 polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98 };
99
100 use BitRepr as B;
101 match (lhs, rhs) {
102 (B::Small(lhs), B::Small(rhs)) => {
103 num_group_join_left::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
105 },
106 (B::Large(lhs), B::Large(rhs)) => {
107 num_group_join_left::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
109 },
110 _ => {
111 polars_bail!(
112 nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
113 );
114 },
115 }
116 },
117 }
118 }
119
120 #[cfg(feature = "semi_anti_join")]
121 fn hash_join_semi_anti(
122 &self,
123 other: &Series,
124 anti: bool,
125 nulls_equal: bool,
126 ) -> PolarsResult<Vec<IdxSize>> {
127 let s_self = self.as_series();
128 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
129
130 let lhs_dtype = lhs.dtype();
131 let rhs_dtype = rhs.dtype();
132
133 use DataType as T;
134 Ok(match lhs_dtype {
135 T::String | T::Binary => {
136 let lhs = lhs.cast(&T::Binary).unwrap();
137 let rhs = rhs.cast(&T::Binary).unwrap();
138 let lhs = lhs.binary().unwrap();
139 let rhs = rhs.binary().unwrap();
140 let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
141 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
143 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
144 if anti {
145 hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
146 } else {
147 hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
148 }
149 },
150 T::BinaryOffset => {
151 let lhs = lhs.binary_offset().unwrap();
152 let rhs = rhs.binary_offset().unwrap();
153 let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
154 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
156 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
157 if anti {
158 hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
159 } else {
160 hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
161 }
162 },
163 T::List(_) => {
164 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
165 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
166 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
167 },
168 #[cfg(feature = "dtype-array")]
169 T::Array(_, _) => {
170 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
171 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
172 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
173 },
174 #[cfg(feature = "dtype-struct")]
175 T::Struct(_) => {
176 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
177 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
178 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
179 },
180 x if x.is_float() => {
181 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
182 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
183 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
184 num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
185 })
186 },
187 _ => {
188 let lhs = s_self.bit_repr();
189 let rhs = other.bit_repr();
190
191 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
192 polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
193 };
194
195 use BitRepr as B;
196 match (lhs, rhs) {
197 (B::Small(lhs), B::Small(rhs)) => {
198 num_group_join_anti_semi::<UInt32Type>(&lhs, &rhs, anti, nulls_equal)
200 },
201 (B::Large(lhs), B::Large(rhs)) => {
202 num_group_join_anti_semi::<UInt64Type>(&lhs, &rhs, anti, nulls_equal)
204 },
205 _ => {
206 polars_bail!(
207 nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
208 );
209 },
210 }
211 },
212 })
213 }
214
215 fn hash_join_inner(
217 &self,
218 other: &Series,
219 validate: JoinValidation,
220 nulls_equal: bool,
221 ) -> PolarsResult<(InnerJoinIds, bool)> {
222 let s_self = self.as_series();
223 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
224 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
225
226 let lhs_dtype = lhs.dtype();
227 let rhs_dtype = rhs.dtype();
228
229 use DataType as T;
230 match lhs_dtype {
231 T::String | T::Binary => {
232 let lhs = lhs.cast(&T::Binary).unwrap();
233 let rhs = rhs.cast(&T::Binary).unwrap();
234 let lhs = lhs.binary().unwrap();
235 let rhs = rhs.binary().unwrap();
236 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
237 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
239 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
240 let build_null_count = if swapped {
241 s_self.null_count()
242 } else {
243 other.null_count()
244 };
245 Ok((
246 hash_join_tuples_inner(
247 lhs,
248 rhs,
249 swapped,
250 validate,
251 nulls_equal,
252 build_null_count,
253 )?,
254 !swapped,
255 ))
256 },
257 T::BinaryOffset => {
258 let lhs = lhs.binary_offset().unwrap();
259 let rhs = rhs.binary_offset()?;
260 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
261 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
263 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
264 let build_null_count = if swapped {
265 s_self.null_count()
266 } else {
267 other.null_count()
268 };
269 Ok((
270 hash_join_tuples_inner(
271 lhs,
272 rhs,
273 swapped,
274 validate,
275 nulls_equal,
276 build_null_count,
277 )?,
278 !swapped,
279 ))
280 },
281 T::List(_) => {
282 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
283 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
284 lhs.hash_join_inner(rhs, validate, nulls_equal)
285 },
286 #[cfg(feature = "dtype-array")]
287 T::Array(_, _) => {
288 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
289 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
290 lhs.hash_join_inner(rhs, validate, nulls_equal)
291 },
292 #[cfg(feature = "dtype-struct")]
293 T::Struct(_) => {
294 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
295 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
296 lhs.hash_join_inner(rhs, validate, nulls_equal)
297 },
298 x if x.is_float() => {
299 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
300 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
301 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
302 group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
303 })
304 },
305 _ => {
306 let lhs = s_self.bit_repr();
307 let rhs = other.bit_repr();
308
309 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
310 polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
311 };
312
313 use BitRepr as B;
314 match (lhs, rhs) {
315 (B::Small(lhs), B::Small(rhs)) => {
316 group_join_inner::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
318 },
319 (B::Large(lhs), BitRepr::Large(rhs)) => {
320 group_join_inner::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
322 },
323 _ => {
324 polars_bail!(
325 nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
326 );
327 },
328 }
329 },
330 }
331 }
332
333 fn hash_join_outer(
334 &self,
335 other: &Series,
336 validate: JoinValidation,
337 nulls_equal: bool,
338 ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
339 let s_self = self.as_series();
340 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
341 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
342
343 let lhs_dtype = lhs.dtype();
344 let rhs_dtype = rhs.dtype();
345
346 use DataType as T;
347 match lhs_dtype {
348 T::String | T::Binary => {
349 let lhs = lhs.cast(&T::Binary).unwrap();
350 let rhs = rhs.cast(&T::Binary).unwrap();
351 let lhs = lhs.binary().unwrap();
352 let rhs = rhs.binary().unwrap();
353 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
354 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
356 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
357 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
358 },
359 T::BinaryOffset => {
360 let lhs = lhs.binary_offset().unwrap();
361 let rhs = rhs.binary_offset()?;
362 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
363 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
365 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
366 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
367 },
368 T::List(_) => {
369 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
370 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
371 lhs.hash_join_outer(rhs, validate, nulls_equal)
372 },
373 #[cfg(feature = "dtype-array")]
374 T::Array(_, _) => {
375 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
376 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
377 lhs.hash_join_outer(rhs, validate, nulls_equal)
378 },
379 #[cfg(feature = "dtype-struct")]
380 T::Struct(_) => {
381 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
382 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
383 lhs.hash_join_outer(rhs, validate, nulls_equal)
384 },
385 x if x.is_float() => {
386 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
387 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
388 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
389 hash_join_outer(lhs, rhs, validate, nulls_equal)
390 })
391 },
392 _ => {
393 let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
394 polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
395 };
396
397 use BitRepr as B;
398 match (lhs, rhs) {
399 (B::Small(lhs), B::Small(rhs)) => {
400 hash_join_outer::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
402 },
403 (B::Large(lhs), B::Large(rhs)) => {
404 hash_join_outer::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
406 },
407 _ => {
408 polars_bail!(
409 nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
410 );
411 },
412 }
413 },
414 }
415 }
416}
417
// `Series` implements `SeriesJoin` using only the trait's provided default methods;
// nothing is overridden here.
impl SeriesJoin for Series {}
419
420fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
421where
422 T: PolarsNumericType,
423{
424 splitted
425 .iter()
426 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
427 .collect()
428}
429
430fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
431 cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
432}
433
434fn group_join_inner<T>(
435 left: &ChunkedArray<T>,
436 right: &ChunkedArray<T>,
437 validate: JoinValidation,
438 nulls_equal: bool,
439) -> PolarsResult<(InnerJoinIds, bool)>
440where
441 T: PolarsDataType,
442 for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
443 for<'a> T::Physical<'a>:
444 Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
445 for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
446 Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
447{
448 let n_threads = POOL.current_num_threads();
449 let (a, b, swapped) = det_hash_prone_order!(left, right);
450 let splitted_a = split(a, n_threads);
451 let splitted_b = split(b, n_threads);
452 let splitted_a = get_arrays(&splitted_a);
453 let splitted_b = get_arrays(&splitted_b);
454
455 match (left.null_count(), right.null_count()) {
456 (0, 0) => {
457 let first = &splitted_a[0];
458 if first.as_slice().is_some() {
459 let splitted_a = splitted_a
460 .iter()
461 .map(|arr| arr.as_slice().unwrap())
462 .collect::<Vec<_>>();
463 let splitted_b = splitted_b
464 .iter()
465 .map(|arr| arr.as_slice().unwrap())
466 .collect::<Vec<_>>();
467 Ok((
468 hash_join_tuples_inner(
469 splitted_a,
470 splitted_b,
471 swapped,
472 validate,
473 nulls_equal,
474 0,
475 )?,
476 !swapped,
477 ))
478 } else {
479 Ok((
480 hash_join_tuples_inner(
481 splitted_a,
482 splitted_b,
483 swapped,
484 validate,
485 nulls_equal,
486 0,
487 )?,
488 !swapped,
489 ))
490 }
491 },
492 _ => {
493 let build_null_count = if swapped {
494 left.null_count()
495 } else {
496 right.null_count()
497 };
498 Ok((
499 hash_join_tuples_inner(
500 splitted_a,
501 splitted_b,
502 swapped,
503 validate,
504 nulls_equal,
505 build_null_count,
506 )?,
507 !swapped,
508 ))
509 },
510 }
511}
512
/// Builds the chunked index mapping for each side, running both on `POOL` via `join`.
///
/// A side with at most one chunk gets `None`; otherwise its mapping is created by
/// `create_chunked_index_mapping` from its chunks and total length.
#[cfg(feature = "chunked_ids")]
fn create_mappings(
    chunks_left: &[ArrayRef],
    chunks_right: &[ArrayRef],
    left_len: usize,
    right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    POOL.join(
        || (chunks_left.len() > 1).then(|| create_chunked_index_mapping(chunks_left, left_len)),
        || (chunks_right.len() > 1).then(|| create_chunked_index_mapping(chunks_right, right_len)),
    )
}
538
/// Fallback used when the `chunked_ids` feature is disabled: no chunk mappings are
/// built and both sides are reported as `None`. All arguments are ignored.
#[cfg(not(feature = "chunked_ids"))]
fn create_mappings(
    _chunks_left: &[ArrayRef],
    _chunks_right: &[ArrayRef],
    _left_len: usize,
    _right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    (None, None)
}
548
549fn num_group_join_left<T>(
550 left: &ChunkedArray<T>,
551 right: &ChunkedArray<T>,
552 validate: JoinValidation,
553 nulls_equal: bool,
554) -> PolarsResult<LeftJoinIds>
555where
556 T: PolarsNumericType,
557 T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
558 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
559 T::Native: DirtyHash + Copy + ToTotalOrd,
560 <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
561{
562 let n_threads = POOL.current_num_threads();
563 let splitted_a = split(left, n_threads);
564 let splitted_b = split(right, n_threads);
565 match (
566 left.null_count(),
567 right.null_count(),
568 left.chunks().len(),
569 right.chunks().len(),
570 ) {
571 (0, 0, 1, 1) => {
572 let keys_a = chunks_as_slices(&splitted_a);
573 let keys_b = chunks_as_slices(&splitted_b);
574 hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
575 },
576 (0, 0, _, _) => {
577 let keys_a = chunks_as_slices(&splitted_a);
578 let keys_b = chunks_as_slices(&splitted_b);
579
580 let (mapping_left, mapping_right) =
581 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
582 hash_join_tuples_left(
583 keys_a,
584 keys_b,
585 mapping_left.as_deref(),
586 mapping_right.as_deref(),
587 validate,
588 nulls_equal,
589 0,
590 )
591 },
592 _ => {
593 let keys_a = get_arrays(&splitted_a);
594 let keys_b = get_arrays(&splitted_b);
595 let (mapping_left, mapping_right) =
596 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
597 let build_null_count = right.null_count();
598 hash_join_tuples_left(
599 keys_a,
600 keys_b,
601 mapping_left.as_deref(),
602 mapping_right.as_deref(),
603 validate,
604 nulls_equal,
605 build_null_count,
606 )
607 },
608 }
609}
610
611fn hash_join_outer<T>(
612 ca_in: &ChunkedArray<T>,
613 other: &ChunkedArray<T>,
614 validate: JoinValidation,
615 nulls_equal: bool,
616) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
617where
618 T: PolarsNumericType,
619 T::Native: TotalHash + TotalEq + ToTotalOrd,
620 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
621{
622 let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
623
624 let n_partitions = _set_partition_size();
625 let splitted_a = split(a, n_partitions);
626 let splitted_b = split(b, n_partitions);
627
628 match (a.null_count(), b.null_count()) {
629 (0, 0) => {
630 let iters_a = splitted_a
631 .iter()
632 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
633 .collect::<Vec<_>>();
634 let iters_b = splitted_b
635 .iter()
636 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
637 .collect::<Vec<_>>();
638 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
639 },
640 _ => {
641 let iters_a = splitted_a
642 .iter()
643 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
644 .collect::<Vec<_>>();
645 let iters_b = splitted_b
646 .iter()
647 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
648 .collect::<Vec<_>>();
649 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
650 },
651 }
652}
653
654pub(crate) fn prepare_binary<'a, T>(
655 ca: &'a ChunkedArray<T>,
656 other: &'a ChunkedArray<T>,
657 build_shortest_table: bool,
660) -> (
661 Vec<Vec<BytesHash<'a>>>,
662 Vec<Vec<BytesHash<'a>>>,
663 bool,
664 PlRandomState,
665)
666where
667 T: PolarsDataType,
668 for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
669{
670 let (a, b, swapped) = if build_shortest_table {
671 det_hash_prone_order!(ca, other)
672 } else {
673 (ca, other, false)
674 };
675 let hb = PlRandomState::default();
676 let bh_a = a.to_bytes_hashes(true, hb);
677 let bh_b = b.to_bytes_hashes(true, hb);
678
679 (bh_a, bh_b, swapped, hb)
680}
681
/// Runs a semi join (`anti == false`) or anti join (`anti == true`) on two numeric key
/// columns.
///
/// Both inputs are split into one part per pool thread. When neither side has nulls the
/// join runs on the chunks' value slices; otherwise it runs on the arrays. The number of
/// chunks does not influence which path is taken.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let splitted_a = split(left, n_threads);
    let splitted_b = split(right, n_threads);

    if left.null_count() == 0 && right.null_count() == 0 {
        // Null-free: the raw value slices are sufficient.
        let keys_a = chunks_as_slices(&splitted_a);
        let keys_b = chunks_as_slices(&splitted_b);
        if anti {
            hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
        }
    } else {
        let keys_a = get_arrays(&splitted_a);
        let keys_b = get_arrays(&splitted_b);
        if anti {
            hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
        }
    }
}