1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15 #[doc(hidden)]
16 fn hash_join_left(
17 &self,
18 other: &Series,
19 validate: JoinValidation,
20 nulls_equal: bool,
21 ) -> PolarsResult<LeftJoinIds> {
22 let s_self = self.as_series();
23 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24 validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26 let lhs_dtype = lhs.dtype();
27 let rhs_dtype = rhs.dtype();
28
29 use DataType as T;
30 match lhs_dtype {
31 T::String | T::Binary => {
32 let lhs = lhs.cast(&T::Binary).unwrap();
33 let rhs = rhs.cast(&T::Binary).unwrap();
34 let lhs = lhs.binary().unwrap();
35 let rhs = rhs.binary().unwrap();
36 let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37 let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38 let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39 let build_null_count = other.null_count();
40 hash_join_tuples_left(
41 lhs,
42 rhs,
43 None,
44 None,
45 validate,
46 nulls_equal,
47 build_null_count,
48 )
49 },
50 T::BinaryOffset => {
51 let lhs = lhs.binary_offset().unwrap();
52 let rhs = rhs.binary_offset().unwrap();
53 let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57 let build_null_count = other.null_count();
58 hash_join_tuples_left(
59 lhs,
60 rhs,
61 None,
62 None,
63 validate,
64 nulls_equal,
65 build_null_count,
66 )
67 },
68 T::List(_) => {
69 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71 lhs.hash_join_left(rhs, validate, nulls_equal)
72 },
73 #[cfg(feature = "dtype-array")]
74 T::Array(_, _) => {
75 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77 lhs.hash_join_left(rhs, validate, nulls_equal)
78 },
79 #[cfg(feature = "dtype-struct")]
80 T::Struct(_) => {
81 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83 lhs.hash_join_left(rhs, validate, nulls_equal)
84 },
85 x if x.is_float() => {
86 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89 num_group_join_left(lhs, rhs, validate, nulls_equal)
90 })
91 },
92 _ => {
93 let lhs = s_self.bit_repr();
94 let rhs = other.bit_repr();
95
96 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97 polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98 };
99
100 use BitRepr as B;
101 match (lhs, rhs) {
102 (B::U8(lhs), B::U8(rhs)) => {
103 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
104 },
105 (B::U16(lhs), B::U16(rhs)) => {
106 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
107 },
108 (B::U32(lhs), B::U32(rhs)) => {
109 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
110 },
111 (B::U64(lhs), B::U64(rhs)) => {
112 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
113 },
114 #[cfg(feature = "dtype-u128")]
115 (B::U128(lhs), B::U128(rhs)) => {
116 num_group_join_left(&lhs, &rhs, validate, nulls_equal)
117 },
118 _ => {
119 polars_bail!(
120 nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
121 );
122 },
123 }
124 },
125 }
126 }
127
128 #[cfg(feature = "semi_anti_join")]
129 fn hash_join_semi_anti(
130 &self,
131 other: &Series,
132 anti: bool,
133 nulls_equal: bool,
134 ) -> PolarsResult<Vec<IdxSize>> {
135 let s_self = self.as_series();
136 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
137
138 let lhs_dtype = lhs.dtype();
139 let rhs_dtype = rhs.dtype();
140
141 use DataType as T;
142 Ok(match lhs_dtype {
143 T::String | T::Binary => {
144 let lhs = lhs.cast(&T::Binary).unwrap();
145 let rhs = rhs.cast(&T::Binary).unwrap();
146 let lhs = lhs.binary().unwrap();
147 let rhs = rhs.binary().unwrap();
148 let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
149 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
151 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
152 if anti {
153 hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
154 } else {
155 hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
156 }
157 },
158 T::BinaryOffset => {
159 let lhs = lhs.binary_offset().unwrap();
160 let rhs = rhs.binary_offset().unwrap();
161 let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
162 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
164 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
165 if anti {
166 hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
167 } else {
168 hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
169 }
170 },
171 T::List(_) => {
172 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
173 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
174 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
175 },
176 #[cfg(feature = "dtype-array")]
177 T::Array(_, _) => {
178 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
179 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
180 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
181 },
182 #[cfg(feature = "dtype-struct")]
183 T::Struct(_) => {
184 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
185 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
186 lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
187 },
188 x if x.is_float() => {
189 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
190 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
191 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
192 num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
193 })
194 },
195 _ => {
196 let lhs = s_self.bit_repr();
197 let rhs = other.bit_repr();
198
199 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
200 polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
201 };
202
203 use BitRepr as B;
204 match (lhs, rhs) {
205 (B::U8(lhs), B::U8(rhs)) => {
206 num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
207 },
208 (B::U16(lhs), B::U16(rhs)) => {
209 num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
210 },
211 (B::U32(lhs), B::U32(rhs)) => {
212 num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
213 },
214 (B::U64(lhs), B::U64(rhs)) => {
215 num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
216 },
217 #[cfg(feature = "dtype-u128")]
218 (B::U128(lhs), B::U128(rhs)) => {
219 num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
220 },
221 _ => {
222 polars_bail!(
223 nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
224 );
225 },
226 }
227 },
228 })
229 }
230
231 fn hash_join_inner(
233 &self,
234 other: &Series,
235 validate: JoinValidation,
236 nulls_equal: bool,
237 ) -> PolarsResult<(InnerJoinIds, bool)> {
238 let s_self = self.as_series();
239 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
240 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
241
242 let lhs_dtype = lhs.dtype();
243 let rhs_dtype = rhs.dtype();
244
245 use DataType as T;
246 match lhs_dtype {
247 T::String | T::Binary => {
248 let lhs = lhs.cast(&T::Binary).unwrap();
249 let rhs = rhs.cast(&T::Binary).unwrap();
250 let lhs = lhs.binary().unwrap();
251 let rhs = rhs.binary().unwrap();
252 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
253 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
255 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
256 let build_null_count = if swapped {
257 s_self.null_count()
258 } else {
259 other.null_count()
260 };
261 Ok((
262 hash_join_tuples_inner(
263 lhs,
264 rhs,
265 swapped,
266 validate,
267 nulls_equal,
268 build_null_count,
269 )?,
270 !swapped,
271 ))
272 },
273 T::BinaryOffset => {
274 let lhs = lhs.binary_offset().unwrap();
275 let rhs = rhs.binary_offset()?;
276 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
277 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
279 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
280 let build_null_count = if swapped {
281 s_self.null_count()
282 } else {
283 other.null_count()
284 };
285 Ok((
286 hash_join_tuples_inner(
287 lhs,
288 rhs,
289 swapped,
290 validate,
291 nulls_equal,
292 build_null_count,
293 )?,
294 !swapped,
295 ))
296 },
297 T::List(_) => {
298 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
299 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
300 lhs.hash_join_inner(rhs, validate, nulls_equal)
301 },
302 #[cfg(feature = "dtype-array")]
303 T::Array(_, _) => {
304 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
305 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
306 lhs.hash_join_inner(rhs, validate, nulls_equal)
307 },
308 #[cfg(feature = "dtype-struct")]
309 T::Struct(_) => {
310 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
311 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
312 lhs.hash_join_inner(rhs, validate, nulls_equal)
313 },
314 x if x.is_float() => {
315 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
316 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
317 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
318 group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
319 })
320 },
321 _ => {
322 let lhs = s_self.bit_repr();
323 let rhs = other.bit_repr();
324
325 let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
326 polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
327 };
328
329 use BitRepr as B;
330 match (lhs, rhs) {
331 (B::U8(lhs), B::U8(rhs)) => {
332 group_join_inner::<UInt8Type>(&lhs, &rhs, validate, nulls_equal)
333 },
334 (B::U16(lhs), B::U16(rhs)) => {
335 group_join_inner::<UInt16Type>(&lhs, &rhs, validate, nulls_equal)
336 },
337 (B::U32(lhs), B::U32(rhs)) => {
338 group_join_inner::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
339 },
340 (B::U64(lhs), BitRepr::U64(rhs)) => {
341 group_join_inner::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
342 },
343 #[cfg(feature = "dtype-u128")]
344 (B::U128(lhs), BitRepr::U128(rhs)) => {
345 group_join_inner::<UInt128Type>(&lhs, &rhs, validate, nulls_equal)
346 },
347 _ => {
348 polars_bail!(
349 nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
350 );
351 },
352 }
353 },
354 }
355 }
356
357 fn hash_join_outer(
358 &self,
359 other: &Series,
360 validate: JoinValidation,
361 nulls_equal: bool,
362 ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
363 let s_self = self.as_series();
364 let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
365 validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
366
367 let lhs_dtype = lhs.dtype();
368 let rhs_dtype = rhs.dtype();
369
370 use DataType as T;
371 match lhs_dtype {
372 T::String | T::Binary => {
373 let lhs = lhs.cast(&T::Binary).unwrap();
374 let rhs = rhs.cast(&T::Binary).unwrap();
375 let lhs = lhs.binary().unwrap();
376 let rhs = rhs.binary().unwrap();
377 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
378 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
380 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
381 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
382 },
383 T::BinaryOffset => {
384 let lhs = lhs.binary_offset().unwrap();
385 let rhs = rhs.binary_offset()?;
386 let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
387 let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
389 let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
390 hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
391 },
392 T::List(_) => {
393 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
394 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
395 lhs.hash_join_outer(rhs, validate, nulls_equal)
396 },
397 #[cfg(feature = "dtype-array")]
398 T::Array(_, _) => {
399 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
400 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
401 lhs.hash_join_outer(rhs, validate, nulls_equal)
402 },
403 #[cfg(feature = "dtype-struct")]
404 T::Struct(_) => {
405 let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
406 let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
407 lhs.hash_join_outer(rhs, validate, nulls_equal)
408 },
409 x if x.is_float() => {
410 with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
411 let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
412 let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
413 hash_join_outer(lhs, rhs, validate, nulls_equal)
414 })
415 },
416 _ => {
417 let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
418 polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
419 };
420
421 use BitRepr as B;
422 match (lhs, rhs) {
423 (B::U8(lhs), B::U8(rhs)) => hash_join_outer(&lhs, &rhs, validate, nulls_equal),
424 (B::U16(lhs), B::U16(rhs)) => {
425 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
426 },
427 (B::U32(lhs), B::U32(rhs)) => {
428 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
429 },
430 (B::U64(lhs), B::U64(rhs)) => {
431 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
432 },
433 #[cfg(feature = "dtype-u128")]
434 (B::U128(lhs), B::U128(rhs)) => {
435 hash_join_outer(&lhs, &rhs, validate, nulls_equal)
436 },
437 _ => {
438 polars_bail!(
439 nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
440 );
441 },
442 }
443 },
444 }
445 }
446}
447
// `Series` uses the default method bodies of `SeriesJoin` unchanged; the impl
// only opts the type into the trait.
impl SeriesJoin for Series {}
449
450fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
451where
452 T: PolarsNumericType,
453{
454 splitted
455 .iter()
456 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
457 .collect()
458}
459
460fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
461 cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
462}
463
464fn group_join_inner<T>(
465 left: &ChunkedArray<T>,
466 right: &ChunkedArray<T>,
467 validate: JoinValidation,
468 nulls_equal: bool,
469) -> PolarsResult<(InnerJoinIds, bool)>
470where
471 T: PolarsDataType,
472 for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
473 for<'a> T::Physical<'a>:
474 Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
475 for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
476 Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
477{
478 let n_threads = POOL.current_num_threads();
479 let (a, b, swapped) = det_hash_prone_order!(left, right);
480 let splitted_a = split(a, n_threads);
481 let splitted_b = split(b, n_threads);
482 let splitted_a = get_arrays(&splitted_a);
483 let splitted_b = get_arrays(&splitted_b);
484
485 match (left.null_count(), right.null_count()) {
486 (0, 0) => {
487 let first = &splitted_a[0];
488 if first.as_slice().is_some() {
489 let splitted_a = splitted_a
490 .iter()
491 .map(|arr| arr.as_slice().unwrap())
492 .collect::<Vec<_>>();
493 let splitted_b = splitted_b
494 .iter()
495 .map(|arr| arr.as_slice().unwrap())
496 .collect::<Vec<_>>();
497 Ok((
498 hash_join_tuples_inner(
499 splitted_a,
500 splitted_b,
501 swapped,
502 validate,
503 nulls_equal,
504 0,
505 )?,
506 !swapped,
507 ))
508 } else {
509 Ok((
510 hash_join_tuples_inner(
511 splitted_a,
512 splitted_b,
513 swapped,
514 validate,
515 nulls_equal,
516 0,
517 )?,
518 !swapped,
519 ))
520 }
521 },
522 _ => {
523 let build_null_count = if swapped {
524 left.null_count()
525 } else {
526 right.null_count()
527 };
528 Ok((
529 hash_join_tuples_inner(
530 splitted_a,
531 splitted_b,
532 swapped,
533 validate,
534 nulls_equal,
535 build_null_count,
536 )?,
537 !swapped,
538 ))
539 },
540 }
541}
542
/// Builds the chunked-index mappings for both join sides in parallel.
///
/// A side gets a mapping only when it has more than one chunk; otherwise its
/// slot is `None`. The two sides are computed concurrently via `POOL.join`.
#[cfg(feature = "chunked_ids")]
fn create_mappings(
    chunks_left: &[ArrayRef],
    chunks_right: &[ArrayRef],
    left_len: usize,
    right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    POOL.join(
        || (chunks_left.len() > 1).then(|| create_chunked_index_mapping(chunks_left, left_len)),
        || (chunks_right.len() > 1).then(|| create_chunked_index_mapping(chunks_right, right_len)),
    )
}
568
/// Fallback used when the `chunked_ids` feature is disabled: no chunk-index
/// mappings are built, all arguments are ignored and both sides are `None`.
#[cfg(not(feature = "chunked_ids"))]
fn create_mappings(
    _chunks_left: &[ArrayRef],
    _chunks_right: &[ArrayRef],
    _left_len: usize,
    _right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    (None, None)
}
578
579fn num_group_join_left<T>(
580 left: &ChunkedArray<T>,
581 right: &ChunkedArray<T>,
582 validate: JoinValidation,
583 nulls_equal: bool,
584) -> PolarsResult<LeftJoinIds>
585where
586 T: PolarsNumericType,
587 T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
588 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
589 T::Native: DirtyHash + Copy + ToTotalOrd,
590 <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
591{
592 let n_threads = POOL.current_num_threads();
593 let splitted_a = split(left, n_threads);
594 let splitted_b = split(right, n_threads);
595 match (
596 left.null_count(),
597 right.null_count(),
598 left.chunks().len(),
599 right.chunks().len(),
600 ) {
601 (0, 0, 1, 1) => {
602 let keys_a = chunks_as_slices(&splitted_a);
603 let keys_b = chunks_as_slices(&splitted_b);
604 hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
605 },
606 (0, 0, _, _) => {
607 let keys_a = chunks_as_slices(&splitted_a);
608 let keys_b = chunks_as_slices(&splitted_b);
609
610 let (mapping_left, mapping_right) =
611 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
612 hash_join_tuples_left(
613 keys_a,
614 keys_b,
615 mapping_left.as_deref(),
616 mapping_right.as_deref(),
617 validate,
618 nulls_equal,
619 0,
620 )
621 },
622 _ => {
623 let keys_a = get_arrays(&splitted_a);
624 let keys_b = get_arrays(&splitted_b);
625 let (mapping_left, mapping_right) =
626 create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
627 let build_null_count = right.null_count();
628 hash_join_tuples_left(
629 keys_a,
630 keys_b,
631 mapping_left.as_deref(),
632 mapping_right.as_deref(),
633 validate,
634 nulls_equal,
635 build_null_count,
636 )
637 },
638 }
639}
640
641fn hash_join_outer<T>(
642 ca_in: &ChunkedArray<T>,
643 other: &ChunkedArray<T>,
644 validate: JoinValidation,
645 nulls_equal: bool,
646) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
647where
648 T: PolarsNumericType,
649 T::Native: TotalHash + TotalEq + ToTotalOrd,
650 <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
651{
652 let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
653
654 let n_partitions = _set_partition_size();
655 let splitted_a = split(a, n_partitions);
656 let splitted_b = split(b, n_partitions);
657
658 match (a.null_count(), b.null_count()) {
659 (0, 0) => {
660 let iters_a = splitted_a
661 .iter()
662 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
663 .collect::<Vec<_>>();
664 let iters_b = splitted_b
665 .iter()
666 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
667 .collect::<Vec<_>>();
668 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
669 },
670 _ => {
671 let iters_a = splitted_a
672 .iter()
673 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
674 .collect::<Vec<_>>();
675 let iters_b = splitted_b
676 .iter()
677 .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
678 .collect::<Vec<_>>();
679 hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
680 },
681 }
682}
683
684pub(crate) fn prepare_binary<'a, T>(
685 ca: &'a ChunkedArray<T>,
686 other: &'a ChunkedArray<T>,
687 build_shortest_table: bool,
690) -> (
691 Vec<Vec<BytesHash<'a>>>,
692 Vec<Vec<BytesHash<'a>>>,
693 bool,
694 PlRandomState,
695)
696where
697 T: PolarsDataType,
698 for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
699{
700 let (a, b, swapped) = if build_shortest_table {
701 det_hash_prone_order!(ca, other)
702 } else {
703 (ca, other, false)
704 };
705 let hb = PlRandomState::default();
706 let bh_a = a.to_bytes_hashes(true, hb.clone());
707 let bh_b = b.to_bytes_hashes(true, hb.clone());
708
709 (bh_a, bh_b, swapped, hb)
710}
711
/// Semi (`anti == false`) or anti (`anti == true`) hash join of two numeric
/// chunked arrays.
///
/// Both inputs are split into one part per thread of `POOL`. Null-free inputs
/// are joined on their raw value slices regardless of chunk count; otherwise
/// the chunk arrays are joined.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let parts_left = split(left, n_threads);
    let parts_right = split(right, n_threads);

    let has_nulls = left.null_count() != 0 || right.null_count() != 0;

    if has_nulls {
        let keys_a = get_arrays(&parts_left);
        let keys_b = get_arrays(&parts_right);
        return if anti {
            hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
        };
    }

    // Null-free: the value buffers can be used as keys directly.
    let keys_a = chunks_as_slices(&parts_left);
    let keys_b = chunks_as_slices(&parts_right);
    if anti {
        hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
    } else {
        hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
    }
}