polars_ops/frame/join/hash_join/
single_keys_dispatch.rs

1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15    #[doc(hidden)]
16    fn hash_join_left(
17        &self,
18        other: &Series,
19        validate: JoinValidation,
20        nulls_equal: bool,
21    ) -> PolarsResult<LeftJoinIds> {
22        let s_self = self.as_series();
23        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24        validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26        let lhs_dtype = lhs.dtype();
27        let rhs_dtype = rhs.dtype();
28
29        use DataType as T;
30        match lhs_dtype {
31            T::String | T::Binary => {
32                let lhs = lhs.cast(&T::Binary).unwrap();
33                let rhs = rhs.cast(&T::Binary).unwrap();
34                let lhs = lhs.binary().unwrap();
35                let rhs = rhs.binary().unwrap();
36                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37                let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38                let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39                let build_null_count = other.null_count();
40                hash_join_tuples_left(
41                    lhs,
42                    rhs,
43                    None,
44                    None,
45                    validate,
46                    nulls_equal,
47                    build_null_count,
48                )
49            },
50            T::BinaryOffset => {
51                let lhs = lhs.binary_offset().unwrap();
52                let rhs = rhs.binary_offset().unwrap();
53                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54                // Take slices so that vecs are not copied
55                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57                let build_null_count = other.null_count();
58                hash_join_tuples_left(
59                    lhs,
60                    rhs,
61                    None,
62                    None,
63                    validate,
64                    nulls_equal,
65                    build_null_count,
66                )
67            },
68            T::List(_) => {
69                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71                lhs.hash_join_left(rhs, validate, nulls_equal)
72            },
73            #[cfg(feature = "dtype-array")]
74            T::Array(_, _) => {
75                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77                lhs.hash_join_left(rhs, validate, nulls_equal)
78            },
79            #[cfg(feature = "dtype-struct")]
80            T::Struct(_) => {
81                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83                lhs.hash_join_left(rhs, validate, nulls_equal)
84            },
85            x if x.is_float() => {
86                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89                    num_group_join_left(lhs, rhs, validate, nulls_equal)
90                })
91            },
92            _ => {
93                let lhs = s_self.bit_repr();
94                let rhs = other.bit_repr();
95
96                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97                    polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98                };
99
100                use BitRepr as B;
101                match (lhs, rhs) {
102                    (B::U8(lhs), B::U8(rhs)) => {
103                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
104                    },
105                    (B::U16(lhs), B::U16(rhs)) => {
106                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
107                    },
108                    (B::U32(lhs), B::U32(rhs)) => {
109                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
110                    },
111                    (B::U64(lhs), B::U64(rhs)) => {
112                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
113                    },
114                    #[cfg(feature = "dtype-u128")]
115                    (B::U128(lhs), B::U128(rhs)) => {
116                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
117                    },
118                    _ => {
119                        polars_bail!(
120                            nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
121                        );
122                    },
123                }
124            },
125        }
126    }
127
128    #[cfg(feature = "semi_anti_join")]
129    fn hash_join_semi_anti(
130        &self,
131        other: &Series,
132        anti: bool,
133        nulls_equal: bool,
134    ) -> PolarsResult<Vec<IdxSize>> {
135        let s_self = self.as_series();
136        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
137
138        let lhs_dtype = lhs.dtype();
139        let rhs_dtype = rhs.dtype();
140
141        use DataType as T;
142        Ok(match lhs_dtype {
143            T::String | T::Binary => {
144                let lhs = lhs.cast(&T::Binary).unwrap();
145                let rhs = rhs.cast(&T::Binary).unwrap();
146                let lhs = lhs.binary().unwrap();
147                let rhs = rhs.binary().unwrap();
148                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
149                // Take slices so that vecs are not copied
150                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
151                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
152                if anti {
153                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
154                } else {
155                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
156                }
157            },
158            T::BinaryOffset => {
159                let lhs = lhs.binary_offset().unwrap();
160                let rhs = rhs.binary_offset().unwrap();
161                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
162                // Take slices so that vecs are not copied
163                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
164                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
165                if anti {
166                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
167                } else {
168                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
169                }
170            },
171            T::List(_) => {
172                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
173                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
174                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
175            },
176            #[cfg(feature = "dtype-array")]
177            T::Array(_, _) => {
178                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
179                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
180                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
181            },
182            #[cfg(feature = "dtype-struct")]
183            T::Struct(_) => {
184                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
185                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
186                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
187            },
188            x if x.is_float() => {
189                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
190                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
191                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
192                    num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
193                })
194            },
195            _ => {
196                let lhs = s_self.bit_repr();
197                let rhs = other.bit_repr();
198
199                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
200                    polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
201                };
202
203                use BitRepr as B;
204                match (lhs, rhs) {
205                    (B::U8(lhs), B::U8(rhs)) => {
206                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
207                    },
208                    (B::U16(lhs), B::U16(rhs)) => {
209                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
210                    },
211                    (B::U32(lhs), B::U32(rhs)) => {
212                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
213                    },
214                    (B::U64(lhs), B::U64(rhs)) => {
215                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
216                    },
217                    #[cfg(feature = "dtype-u128")]
218                    (B::U128(lhs), B::U128(rhs)) => {
219                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
220                    },
221                    _ => {
222                        polars_bail!(
223                            nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
224                        );
225                    },
226                }
227            },
228        })
229    }
230
231    // returns the join tuples and whether or not the lhs tuples are sorted
232    fn hash_join_inner(
233        &self,
234        other: &Series,
235        validate: JoinValidation,
236        nulls_equal: bool,
237    ) -> PolarsResult<(InnerJoinIds, bool)> {
238        let s_self = self.as_series();
239        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
240        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
241
242        let lhs_dtype = lhs.dtype();
243        let rhs_dtype = rhs.dtype();
244
245        use DataType as T;
246        match lhs_dtype {
247            T::String | T::Binary => {
248                let lhs = lhs.cast(&T::Binary).unwrap();
249                let rhs = rhs.cast(&T::Binary).unwrap();
250                let lhs = lhs.binary().unwrap();
251                let rhs = rhs.binary().unwrap();
252                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
253                // Take slices so that vecs are not copied
254                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
255                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
256                let build_null_count = if swapped {
257                    s_self.null_count()
258                } else {
259                    other.null_count()
260                };
261                Ok((
262                    hash_join_tuples_inner(
263                        lhs,
264                        rhs,
265                        swapped,
266                        validate,
267                        nulls_equal,
268                        build_null_count,
269                    )?,
270                    !swapped,
271                ))
272            },
273            T::BinaryOffset => {
274                let lhs = lhs.binary_offset().unwrap();
275                let rhs = rhs.binary_offset()?;
276                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
277                // Take slices so that vecs are not copied
278                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
279                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
280                let build_null_count = if swapped {
281                    s_self.null_count()
282                } else {
283                    other.null_count()
284                };
285                Ok((
286                    hash_join_tuples_inner(
287                        lhs,
288                        rhs,
289                        swapped,
290                        validate,
291                        nulls_equal,
292                        build_null_count,
293                    )?,
294                    !swapped,
295                ))
296            },
297            T::List(_) => {
298                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
299                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
300                lhs.hash_join_inner(rhs, validate, nulls_equal)
301            },
302            #[cfg(feature = "dtype-array")]
303            T::Array(_, _) => {
304                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
305                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
306                lhs.hash_join_inner(rhs, validate, nulls_equal)
307            },
308            #[cfg(feature = "dtype-struct")]
309            T::Struct(_) => {
310                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
311                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
312                lhs.hash_join_inner(rhs, validate, nulls_equal)
313            },
314            x if x.is_float() => {
315                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
316                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
317                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
318                    group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
319                })
320            },
321            _ => {
322                let lhs = s_self.bit_repr();
323                let rhs = other.bit_repr();
324
325                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
326                    polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
327                };
328
329                use BitRepr as B;
330                match (lhs, rhs) {
331                    (B::U8(lhs), B::U8(rhs)) => group_join_inner(&lhs, &rhs, validate, nulls_equal),
332                    (B::U16(lhs), B::U16(rhs)) => {
333                        group_join_inner(&lhs, &rhs, validate, nulls_equal)
334                    },
335                    (B::U32(lhs), B::U32(rhs)) => {
336                        group_join_inner(&lhs, &rhs, validate, nulls_equal)
337                    },
338                    (B::U64(lhs), BitRepr::U64(rhs)) => {
339                        group_join_inner(&lhs, &rhs, validate, nulls_equal)
340                    },
341                    #[cfg(feature = "dtype-u128")]
342                    (B::U128(lhs), BitRepr::U128(rhs)) => {
343                        group_join_inner(&lhs, &rhs, validate, nulls_equal)
344                    },
345                    _ => {
346                        polars_bail!(
347                            nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
348                        );
349                    },
350                }
351            },
352        }
353    }
354
355    fn hash_join_outer(
356        &self,
357        other: &Series,
358        validate: JoinValidation,
359        nulls_equal: bool,
360    ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
361        let s_self = self.as_series();
362        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
363        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
364
365        let lhs_dtype = lhs.dtype();
366        let rhs_dtype = rhs.dtype();
367
368        use DataType as T;
369        match lhs_dtype {
370            T::String | T::Binary => {
371                let lhs = lhs.cast(&T::Binary).unwrap();
372                let rhs = rhs.cast(&T::Binary).unwrap();
373                let lhs = lhs.binary().unwrap();
374                let rhs = rhs.binary().unwrap();
375                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
376                // Take slices so that vecs are not copied
377                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
378                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
379                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
380            },
381            T::BinaryOffset => {
382                let lhs = lhs.binary_offset().unwrap();
383                let rhs = rhs.binary_offset()?;
384                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
385                // Take slices so that vecs are not copied
386                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
387                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
388                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
389            },
390            T::List(_) => {
391                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
392                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
393                lhs.hash_join_outer(rhs, validate, nulls_equal)
394            },
395            #[cfg(feature = "dtype-array")]
396            T::Array(_, _) => {
397                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
398                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
399                lhs.hash_join_outer(rhs, validate, nulls_equal)
400            },
401            #[cfg(feature = "dtype-struct")]
402            T::Struct(_) => {
403                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
404                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
405                lhs.hash_join_outer(rhs, validate, nulls_equal)
406            },
407            x if x.is_float() => {
408                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
409                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
410                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
411                    hash_join_outer(lhs, rhs, validate, nulls_equal)
412                })
413            },
414            _ => {
415                let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
416                    polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
417                };
418
419                use BitRepr as B;
420                match (lhs, rhs) {
421                    (B::U8(lhs), B::U8(rhs)) => hash_join_outer(&lhs, &rhs, validate, nulls_equal),
422                    (B::U16(lhs), B::U16(rhs)) => {
423                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
424                    },
425                    (B::U32(lhs), B::U32(rhs)) => {
426                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
427                    },
428                    (B::U64(lhs), B::U64(rhs)) => {
429                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
430                    },
431                    #[cfg(feature = "dtype-u128")]
432                    (B::U128(lhs), B::U128(rhs)) => {
433                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
434                    },
435                    _ => {
436                        polars_bail!(
437                            nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
438                        );
439                    },
440                }
441            },
442        }
443    }
444}
445
// `Series` relies entirely on the default method bodies of `SeriesJoin`.
impl SeriesJoin for Series {}
447
448fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
449where
450    T: PolarsNumericType,
451{
452    splitted
453        .iter()
454        .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
455        .collect()
456}
457
458fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
459    cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
460}
461
462fn group_join_inner<T>(
463    left: &ChunkedArray<T>,
464    right: &ChunkedArray<T>,
465    validate: JoinValidation,
466    nulls_equal: bool,
467) -> PolarsResult<(InnerJoinIds, bool)>
468where
469    T: PolarsDataType,
470    for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
471    for<'a> T::Physical<'a>:
472        Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
473    for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
474        Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
475{
476    let n_threads = POOL.current_num_threads();
477    let (a, b, swapped) = det_hash_prone_order!(left, right);
478    let splitted_a = split(a, n_threads);
479    let splitted_b = split(b, n_threads);
480    let splitted_a = get_arrays(&splitted_a);
481    let splitted_b = get_arrays(&splitted_b);
482
483    match (left.null_count(), right.null_count()) {
484        (0, 0) => {
485            let first = &splitted_a[0];
486            if first.as_slice().is_some() {
487                let splitted_a = splitted_a
488                    .iter()
489                    .map(|arr| arr.as_slice().unwrap())
490                    .collect::<Vec<_>>();
491                let splitted_b = splitted_b
492                    .iter()
493                    .map(|arr| arr.as_slice().unwrap())
494                    .collect::<Vec<_>>();
495                Ok((
496                    hash_join_tuples_inner(
497                        splitted_a,
498                        splitted_b,
499                        swapped,
500                        validate,
501                        nulls_equal,
502                        0,
503                    )?,
504                    !swapped,
505                ))
506            } else {
507                Ok((
508                    hash_join_tuples_inner(
509                        splitted_a,
510                        splitted_b,
511                        swapped,
512                        validate,
513                        nulls_equal,
514                        0,
515                    )?,
516                    !swapped,
517                ))
518            }
519        },
520        _ => {
521            let build_null_count = if swapped {
522                left.null_count()
523            } else {
524                right.null_count()
525            };
526            Ok((
527                hash_join_tuples_inner(
528                    splitted_a,
529                    splitted_b,
530                    swapped,
531                    validate,
532                    nulls_equal,
533                    build_null_count,
534                )?,
535                !swapped,
536            ))
537        },
538    }
539}
540
541#[cfg(feature = "chunked_ids")]
542fn create_mappings(
543    chunks_left: &[ArrayRef],
544    chunks_right: &[ArrayRef],
545    left_len: usize,
546    right_len: usize,
547) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
548    let mapping_left = || {
549        if chunks_left.len() > 1 {
550            Some(create_chunked_index_mapping(chunks_left, left_len))
551        } else {
552            None
553        }
554    };
555
556    let mapping_right = || {
557        if chunks_right.len() > 1 {
558            Some(create_chunked_index_mapping(chunks_right, right_len))
559        } else {
560            None
561        }
562    };
563
564    POOL.join(mapping_left, mapping_right)
565}
566
567#[cfg(not(feature = "chunked_ids"))]
568fn create_mappings(
569    _chunks_left: &[ArrayRef],
570    _chunks_right: &[ArrayRef],
571    _left_len: usize,
572    _right_len: usize,
573) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
574    (None, None)
575}
576
577fn num_group_join_left<T>(
578    left: &ChunkedArray<T>,
579    right: &ChunkedArray<T>,
580    validate: JoinValidation,
581    nulls_equal: bool,
582) -> PolarsResult<LeftJoinIds>
583where
584    T: PolarsNumericType,
585    T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
586    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
587    T::Native: DirtyHash + Copy + ToTotalOrd,
588    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
589{
590    let n_threads = POOL.current_num_threads();
591    let splitted_a = split(left, n_threads);
592    let splitted_b = split(right, n_threads);
593    match (
594        left.null_count(),
595        right.null_count(),
596        left.chunks().len(),
597        right.chunks().len(),
598    ) {
599        (0, 0, 1, 1) => {
600            let keys_a = chunks_as_slices(&splitted_a);
601            let keys_b = chunks_as_slices(&splitted_b);
602            hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
603        },
604        (0, 0, _, _) => {
605            let keys_a = chunks_as_slices(&splitted_a);
606            let keys_b = chunks_as_slices(&splitted_b);
607
608            let (mapping_left, mapping_right) =
609                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
610            hash_join_tuples_left(
611                keys_a,
612                keys_b,
613                mapping_left.as_deref(),
614                mapping_right.as_deref(),
615                validate,
616                nulls_equal,
617                0,
618            )
619        },
620        _ => {
621            let keys_a = get_arrays(&splitted_a);
622            let keys_b = get_arrays(&splitted_b);
623            let (mapping_left, mapping_right) =
624                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
625            let build_null_count = right.null_count();
626            hash_join_tuples_left(
627                keys_a,
628                keys_b,
629                mapping_left.as_deref(),
630                mapping_right.as_deref(),
631                validate,
632                nulls_equal,
633                build_null_count,
634            )
635        },
636    }
637}
638
639fn hash_join_outer<T>(
640    ca_in: &ChunkedArray<T>,
641    other: &ChunkedArray<T>,
642    validate: JoinValidation,
643    nulls_equal: bool,
644) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
645where
646    T: PolarsNumericType,
647    T::Native: TotalHash + TotalEq + ToTotalOrd,
648    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
649{
650    let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
651
652    let n_partitions = _set_partition_size();
653    let splitted_a = split(a, n_partitions);
654    let splitted_b = split(b, n_partitions);
655
656    match (a.null_count(), b.null_count()) {
657        (0, 0) => {
658            let iters_a = splitted_a
659                .iter()
660                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
661                .collect::<Vec<_>>();
662            let iters_b = splitted_b
663                .iter()
664                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
665                .collect::<Vec<_>>();
666            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
667        },
668        _ => {
669            let iters_a = splitted_a
670                .iter()
671                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
672                .collect::<Vec<_>>();
673            let iters_b = splitted_b
674                .iter()
675                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
676                .collect::<Vec<_>>();
677            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
678        },
679    }
680}
681
682pub(crate) fn prepare_binary<'a, T>(
683    ca: &'a ChunkedArray<T>,
684    other: &'a ChunkedArray<T>,
685    // In inner join and outer join, the shortest relation will be used to create a hash table.
686    // In left join, always use the right side to create.
687    build_shortest_table: bool,
688) -> (
689    Vec<Vec<BytesHash<'a>>>,
690    Vec<Vec<BytesHash<'a>>>,
691    bool,
692    PlRandomState,
693)
694where
695    T: PolarsDataType,
696    for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
697{
698    let (a, b, swapped) = if build_shortest_table {
699        det_hash_prone_order!(ca, other)
700    } else {
701        (ca, other, false)
702    };
703    let hb = PlRandomState::default();
704    let bh_a = a.to_bytes_hashes(true, hb);
705    let bh_b = b.to_bytes_hashes(true, hb);
706
707    (bh_a, bh_b, swapped, hb)
708}
709
/// Runs a semi join (`anti == false`) or anti join (`anti == true`) on two
/// numeric key columns and returns the selected left row indices.
///
/// Both sides are split into one part per thread of `POOL`. Without nulls
/// the join runs on the value slices of each chunk, regardless of the number
/// of chunks; otherwise it runs on the arrays themselves.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let parts_left = split(left, n_threads);
    let parts_right = split(right, n_threads);

    if left.null_count() == 0 && right.null_count() == 0 {
        let keys_left = chunks_as_slices(&parts_left);
        let keys_right = chunks_as_slices(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    } else {
        let keys_left = get_arrays(&parts_left);
        let keys_right = get_arrays(&parts_right);
        if anti {
            hash_join_tuples_left_anti(keys_left, keys_right, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_left, keys_right, nulls_equal)
        }
    }
}