polars_ops/frame/join/hash_join/
single_keys_dispatch.rs

1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15    #[doc(hidden)]
16    fn hash_join_left(
17        &self,
18        other: &Series,
19        validate: JoinValidation,
20        nulls_equal: bool,
21    ) -> PolarsResult<LeftJoinIds> {
22        let s_self = self.as_series();
23        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24        validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26        let lhs_dtype = lhs.dtype();
27        let rhs_dtype = rhs.dtype();
28
29        use DataType as T;
30        match lhs_dtype {
31            T::String | T::Binary => {
32                let lhs = lhs.cast(&T::Binary).unwrap();
33                let rhs = rhs.cast(&T::Binary).unwrap();
34                let lhs = lhs.binary().unwrap();
35                let rhs = rhs.binary().unwrap();
36                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37                let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38                let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39                let build_null_count = other.null_count();
40                hash_join_tuples_left(
41                    lhs,
42                    rhs,
43                    None,
44                    None,
45                    validate,
46                    nulls_equal,
47                    build_null_count,
48                )
49            },
50            T::BinaryOffset => {
51                let lhs = lhs.binary_offset().unwrap();
52                let rhs = rhs.binary_offset().unwrap();
53                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54                // Take slices so that vecs are not copied
55                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57                let build_null_count = other.null_count();
58                hash_join_tuples_left(
59                    lhs,
60                    rhs,
61                    None,
62                    None,
63                    validate,
64                    nulls_equal,
65                    build_null_count,
66                )
67            },
68            T::List(_) => {
69                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71                lhs.hash_join_left(rhs, validate, nulls_equal)
72            },
73            #[cfg(feature = "dtype-array")]
74            T::Array(_, _) => {
75                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77                lhs.hash_join_left(rhs, validate, nulls_equal)
78            },
79            #[cfg(feature = "dtype-struct")]
80            T::Struct(_) => {
81                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83                lhs.hash_join_left(rhs, validate, nulls_equal)
84            },
85            x if x.is_float() => {
86                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89                    num_group_join_left(lhs, rhs, validate, nulls_equal)
90                })
91            },
92            _ => {
93                let lhs = s_self.bit_repr();
94                let rhs = other.bit_repr();
95
96                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97                    polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98                };
99
100                use BitRepr as B;
101                match (lhs, rhs) {
102                    (B::Small(lhs), B::Small(rhs)) => {
103                        // Turbofish: see #17137.
104                        num_group_join_left::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
105                    },
106                    (B::Large(lhs), B::Large(rhs)) => {
107                        // Turbofish: see #17137.
108                        num_group_join_left::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
109                    },
110                    _ => {
111                        polars_bail!(
112                            nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
113                        );
114                    },
115                }
116            },
117        }
118    }
119
120    #[cfg(feature = "semi_anti_join")]
121    fn hash_join_semi_anti(
122        &self,
123        other: &Series,
124        anti: bool,
125        nulls_equal: bool,
126    ) -> PolarsResult<Vec<IdxSize>> {
127        let s_self = self.as_series();
128        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
129
130        let lhs_dtype = lhs.dtype();
131        let rhs_dtype = rhs.dtype();
132
133        use DataType as T;
134        Ok(match lhs_dtype {
135            T::String | T::Binary => {
136                let lhs = lhs.cast(&T::Binary).unwrap();
137                let rhs = rhs.cast(&T::Binary).unwrap();
138                let lhs = lhs.binary().unwrap();
139                let rhs = rhs.binary().unwrap();
140                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
141                // Take slices so that vecs are not copied
142                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
143                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
144                if anti {
145                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
146                } else {
147                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
148                }
149            },
150            T::BinaryOffset => {
151                let lhs = lhs.binary_offset().unwrap();
152                let rhs = rhs.binary_offset().unwrap();
153                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
154                // Take slices so that vecs are not copied
155                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
156                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
157                if anti {
158                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
159                } else {
160                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
161                }
162            },
163            T::List(_) => {
164                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
165                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
166                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
167            },
168            #[cfg(feature = "dtype-array")]
169            T::Array(_, _) => {
170                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
171                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
172                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
173            },
174            #[cfg(feature = "dtype-struct")]
175            T::Struct(_) => {
176                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
177                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
178                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
179            },
180            x if x.is_float() => {
181                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
182                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
183                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
184                    num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
185                })
186            },
187            _ => {
188                let lhs = s_self.bit_repr();
189                let rhs = other.bit_repr();
190
191                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
192                    polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
193                };
194
195                use BitRepr as B;
196                match (lhs, rhs) {
197                    (B::Small(lhs), B::Small(rhs)) => {
198                        // Turbofish: see #17137.
199                        num_group_join_anti_semi::<UInt32Type>(&lhs, &rhs, anti, nulls_equal)
200                    },
201                    (B::Large(lhs), B::Large(rhs)) => {
202                        // Turbofish: see #17137.
203                        num_group_join_anti_semi::<UInt64Type>(&lhs, &rhs, anti, nulls_equal)
204                    },
205                    _ => {
206                        polars_bail!(
207                            nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
208                        );
209                    },
210                }
211            },
212        })
213    }
214
215    // returns the join tuples and whether or not the lhs tuples are sorted
216    fn hash_join_inner(
217        &self,
218        other: &Series,
219        validate: JoinValidation,
220        nulls_equal: bool,
221    ) -> PolarsResult<(InnerJoinIds, bool)> {
222        let s_self = self.as_series();
223        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
224        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
225
226        let lhs_dtype = lhs.dtype();
227        let rhs_dtype = rhs.dtype();
228
229        use DataType as T;
230        match lhs_dtype {
231            T::String | T::Binary => {
232                let lhs = lhs.cast(&T::Binary).unwrap();
233                let rhs = rhs.cast(&T::Binary).unwrap();
234                let lhs = lhs.binary().unwrap();
235                let rhs = rhs.binary().unwrap();
236                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
237                // Take slices so that vecs are not copied
238                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
239                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
240                let build_null_count = if swapped {
241                    s_self.null_count()
242                } else {
243                    other.null_count()
244                };
245                Ok((
246                    hash_join_tuples_inner(
247                        lhs,
248                        rhs,
249                        swapped,
250                        validate,
251                        nulls_equal,
252                        build_null_count,
253                    )?,
254                    !swapped,
255                ))
256            },
257            T::BinaryOffset => {
258                let lhs = lhs.binary_offset().unwrap();
259                let rhs = rhs.binary_offset()?;
260                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
261                // Take slices so that vecs are not copied
262                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
263                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
264                let build_null_count = if swapped {
265                    s_self.null_count()
266                } else {
267                    other.null_count()
268                };
269                Ok((
270                    hash_join_tuples_inner(
271                        lhs,
272                        rhs,
273                        swapped,
274                        validate,
275                        nulls_equal,
276                        build_null_count,
277                    )?,
278                    !swapped,
279                ))
280            },
281            T::List(_) => {
282                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
283                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
284                lhs.hash_join_inner(rhs, validate, nulls_equal)
285            },
286            #[cfg(feature = "dtype-array")]
287            T::Array(_, _) => {
288                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
289                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
290                lhs.hash_join_inner(rhs, validate, nulls_equal)
291            },
292            #[cfg(feature = "dtype-struct")]
293            T::Struct(_) => {
294                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
295                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
296                lhs.hash_join_inner(rhs, validate, nulls_equal)
297            },
298            x if x.is_float() => {
299                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
300                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
301                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
302                    group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
303                })
304            },
305            _ => {
306                let lhs = s_self.bit_repr();
307                let rhs = other.bit_repr();
308
309                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
310                    polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
311                };
312
313                use BitRepr as B;
314                match (lhs, rhs) {
315                    (B::Small(lhs), B::Small(rhs)) => {
316                        // Turbofish: see #17137.
317                        group_join_inner::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
318                    },
319                    (B::Large(lhs), BitRepr::Large(rhs)) => {
320                        // Turbofish: see #17137.
321                        group_join_inner::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
322                    },
323                    _ => {
324                        polars_bail!(
325                            nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
326                        );
327                    },
328                }
329            },
330        }
331    }
332
333    fn hash_join_outer(
334        &self,
335        other: &Series,
336        validate: JoinValidation,
337        nulls_equal: bool,
338    ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
339        let s_self = self.as_series();
340        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
341        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
342
343        let lhs_dtype = lhs.dtype();
344        let rhs_dtype = rhs.dtype();
345
346        use DataType as T;
347        match lhs_dtype {
348            T::String | T::Binary => {
349                let lhs = lhs.cast(&T::Binary).unwrap();
350                let rhs = rhs.cast(&T::Binary).unwrap();
351                let lhs = lhs.binary().unwrap();
352                let rhs = rhs.binary().unwrap();
353                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
354                // Take slices so that vecs are not copied
355                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
356                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
357                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
358            },
359            T::BinaryOffset => {
360                let lhs = lhs.binary_offset().unwrap();
361                let rhs = rhs.binary_offset()?;
362                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
363                // Take slices so that vecs are not copied
364                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
365                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
366                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
367            },
368            T::List(_) => {
369                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
370                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
371                lhs.hash_join_outer(rhs, validate, nulls_equal)
372            },
373            #[cfg(feature = "dtype-array")]
374            T::Array(_, _) => {
375                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
376                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
377                lhs.hash_join_outer(rhs, validate, nulls_equal)
378            },
379            #[cfg(feature = "dtype-struct")]
380            T::Struct(_) => {
381                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
382                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
383                lhs.hash_join_outer(rhs, validate, nulls_equal)
384            },
385            x if x.is_float() => {
386                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
387                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
388                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
389                    hash_join_outer(lhs, rhs, validate, nulls_equal)
390                })
391            },
392            _ => {
393                let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
394                    polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
395                };
396
397                use BitRepr as B;
398                match (lhs, rhs) {
399                    (B::Small(lhs), B::Small(rhs)) => {
400                        // Turbofish: see #17137.
401                        hash_join_outer::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
402                    },
403                    (B::Large(lhs), B::Large(rhs)) => {
404                        // Turbofish: see #17137.
405                        hash_join_outer::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
406                    },
407                    _ => {
408                        polars_bail!(
409                            nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
410                        );
411                    },
412                }
413            },
414        }
415    }
416}
417
// Every `SeriesJoin` method has a provided body, so nothing needs to be overridden here.
impl SeriesJoin for Series {}
419
420fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
421where
422    T: PolarsNumericType,
423{
424    splitted
425        .iter()
426        .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
427        .collect()
428}
429
430fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
431    cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
432}
433
434fn group_join_inner<T>(
435    left: &ChunkedArray<T>,
436    right: &ChunkedArray<T>,
437    validate: JoinValidation,
438    nulls_equal: bool,
439) -> PolarsResult<(InnerJoinIds, bool)>
440where
441    T: PolarsDataType,
442    for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
443    for<'a> T::Physical<'a>:
444        Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
445    for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
446        Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
447{
448    let n_threads = POOL.current_num_threads();
449    let (a, b, swapped) = det_hash_prone_order!(left, right);
450    let splitted_a = split(a, n_threads);
451    let splitted_b = split(b, n_threads);
452    let splitted_a = get_arrays(&splitted_a);
453    let splitted_b = get_arrays(&splitted_b);
454
455    match (left.null_count(), right.null_count()) {
456        (0, 0) => {
457            let first = &splitted_a[0];
458            if first.as_slice().is_some() {
459                let splitted_a = splitted_a
460                    .iter()
461                    .map(|arr| arr.as_slice().unwrap())
462                    .collect::<Vec<_>>();
463                let splitted_b = splitted_b
464                    .iter()
465                    .map(|arr| arr.as_slice().unwrap())
466                    .collect::<Vec<_>>();
467                Ok((
468                    hash_join_tuples_inner(
469                        splitted_a,
470                        splitted_b,
471                        swapped,
472                        validate,
473                        nulls_equal,
474                        0,
475                    )?,
476                    !swapped,
477                ))
478            } else {
479                Ok((
480                    hash_join_tuples_inner(
481                        splitted_a,
482                        splitted_b,
483                        swapped,
484                        validate,
485                        nulls_equal,
486                        0,
487                    )?,
488                    !swapped,
489                ))
490            }
491        },
492        _ => {
493            let build_null_count = if swapped {
494                left.null_count()
495            } else {
496                right.null_count()
497            };
498            Ok((
499                hash_join_tuples_inner(
500                    splitted_a,
501                    splitted_b,
502                    swapped,
503                    validate,
504                    nulls_equal,
505                    build_null_count,
506                )?,
507                !swapped,
508            ))
509        },
510    }
511}
512
513#[cfg(feature = "chunked_ids")]
514fn create_mappings(
515    chunks_left: &[ArrayRef],
516    chunks_right: &[ArrayRef],
517    left_len: usize,
518    right_len: usize,
519) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
520    let mapping_left = || {
521        if chunks_left.len() > 1 {
522            Some(create_chunked_index_mapping(chunks_left, left_len))
523        } else {
524            None
525        }
526    };
527
528    let mapping_right = || {
529        if chunks_right.len() > 1 {
530            Some(create_chunked_index_mapping(chunks_right, right_len))
531        } else {
532            None
533        }
534    };
535
536    POOL.join(mapping_left, mapping_right)
537}
538
539#[cfg(not(feature = "chunked_ids"))]
540fn create_mappings(
541    _chunks_left: &[ArrayRef],
542    _chunks_right: &[ArrayRef],
543    _left_len: usize,
544    _right_len: usize,
545) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
546    (None, None)
547}
548
549fn num_group_join_left<T>(
550    left: &ChunkedArray<T>,
551    right: &ChunkedArray<T>,
552    validate: JoinValidation,
553    nulls_equal: bool,
554) -> PolarsResult<LeftJoinIds>
555where
556    T: PolarsNumericType,
557    T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
558    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
559    T::Native: DirtyHash + Copy + ToTotalOrd,
560    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
561{
562    let n_threads = POOL.current_num_threads();
563    let splitted_a = split(left, n_threads);
564    let splitted_b = split(right, n_threads);
565    match (
566        left.null_count(),
567        right.null_count(),
568        left.chunks().len(),
569        right.chunks().len(),
570    ) {
571        (0, 0, 1, 1) => {
572            let keys_a = chunks_as_slices(&splitted_a);
573            let keys_b = chunks_as_slices(&splitted_b);
574            hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
575        },
576        (0, 0, _, _) => {
577            let keys_a = chunks_as_slices(&splitted_a);
578            let keys_b = chunks_as_slices(&splitted_b);
579
580            let (mapping_left, mapping_right) =
581                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
582            hash_join_tuples_left(
583                keys_a,
584                keys_b,
585                mapping_left.as_deref(),
586                mapping_right.as_deref(),
587                validate,
588                nulls_equal,
589                0,
590            )
591        },
592        _ => {
593            let keys_a = get_arrays(&splitted_a);
594            let keys_b = get_arrays(&splitted_b);
595            let (mapping_left, mapping_right) =
596                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
597            let build_null_count = right.null_count();
598            hash_join_tuples_left(
599                keys_a,
600                keys_b,
601                mapping_left.as_deref(),
602                mapping_right.as_deref(),
603                validate,
604                nulls_equal,
605                build_null_count,
606            )
607        },
608    }
609}
610
611fn hash_join_outer<T>(
612    ca_in: &ChunkedArray<T>,
613    other: &ChunkedArray<T>,
614    validate: JoinValidation,
615    nulls_equal: bool,
616) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
617where
618    T: PolarsNumericType,
619    T::Native: TotalHash + TotalEq + ToTotalOrd,
620    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
621{
622    let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
623
624    let n_partitions = _set_partition_size();
625    let splitted_a = split(a, n_partitions);
626    let splitted_b = split(b, n_partitions);
627
628    match (a.null_count(), b.null_count()) {
629        (0, 0) => {
630            let iters_a = splitted_a
631                .iter()
632                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
633                .collect::<Vec<_>>();
634            let iters_b = splitted_b
635                .iter()
636                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
637                .collect::<Vec<_>>();
638            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
639        },
640        _ => {
641            let iters_a = splitted_a
642                .iter()
643                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
644                .collect::<Vec<_>>();
645            let iters_b = splitted_b
646                .iter()
647                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
648                .collect::<Vec<_>>();
649            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
650        },
651    }
652}
653
654pub(crate) fn prepare_binary<'a, T>(
655    ca: &'a ChunkedArray<T>,
656    other: &'a ChunkedArray<T>,
657    // In inner join and outer join, the shortest relation will be used to create a hash table.
658    // In left join, always use the right side to create.
659    build_shortest_table: bool,
660) -> (
661    Vec<Vec<BytesHash<'a>>>,
662    Vec<Vec<BytesHash<'a>>>,
663    bool,
664    PlRandomState,
665)
666where
667    T: PolarsDataType,
668    for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
669{
670    let (a, b, swapped) = if build_shortest_table {
671        det_hash_prone_order!(ca, other)
672    } else {
673        (ca, other, false)
674    };
675    let hb = PlRandomState::default();
676    let bh_a = a.to_bytes_hashes(true, hb);
677    let bh_b = b.to_bytes_hashes(true, hb);
678
679    (bh_a, bh_b, swapped, hb)
680}
681
/// Semi (`anti == false`) or anti (`anti == true`) hash join of two numeric chunked arrays.
///
/// Both sides are split into one part per thread of `POOL`. Without nulls the join
/// runs on plain value slices, otherwise on the arrays themselves. The number of
/// chunks does not influence which path is taken.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let parts_left = split(left, n_threads);
    let parts_right = split(right, n_threads);

    let no_nulls = left.null_count() == 0 && right.null_count() == 0;
    if no_nulls {
        let keys_left = chunks_as_slices(&parts_left);
        let keys_right = chunks_as_slices(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    } else {
        let keys_left = get_arrays(&parts_left);
        let keys_right = get_arrays(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    }
}