polars_ops/frame/join/hash_join/
single_keys_dispatch.rs

1use arrow::array::PrimitiveArray;
2use polars_core::chunked_array::ops::row_encode::encode_rows_unordered;
3use polars_core::series::BitRepr;
4use polars_core::utils::split;
5use polars_core::with_match_physical_float_polars_type;
6use polars_utils::aliases::PlRandomState;
7use polars_utils::hashing::DirtyHash;
8use polars_utils::nulls::IsNull;
9use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};
10
11use super::*;
12use crate::series::SeriesSealed;
13
14pub trait SeriesJoin: SeriesSealed + Sized {
15    #[doc(hidden)]
16    fn hash_join_left(
17        &self,
18        other: &Series,
19        validate: JoinValidation,
20        nulls_equal: bool,
21    ) -> PolarsResult<LeftJoinIds> {
22        let s_self = self.as_series();
23        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
24        validate.validate_probe(&lhs, &rhs, false, nulls_equal)?;
25
26        let lhs_dtype = lhs.dtype();
27        let rhs_dtype = rhs.dtype();
28
29        use DataType as T;
30        match lhs_dtype {
31            T::String | T::Binary => {
32                let lhs = lhs.cast(&T::Binary).unwrap();
33                let rhs = rhs.cast(&T::Binary).unwrap();
34                let lhs = lhs.binary().unwrap();
35                let rhs = rhs.binary().unwrap();
36                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
37                let lhs = lhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
38                let rhs = rhs.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
39                let build_null_count = other.null_count();
40                hash_join_tuples_left(
41                    lhs,
42                    rhs,
43                    None,
44                    None,
45                    validate,
46                    nulls_equal,
47                    build_null_count,
48                )
49            },
50            T::BinaryOffset => {
51                let lhs = lhs.binary_offset().unwrap();
52                let rhs = rhs.binary_offset().unwrap();
53                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
54                // Take slices so that vecs are not copied
55                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
56                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
57                let build_null_count = other.null_count();
58                hash_join_tuples_left(
59                    lhs,
60                    rhs,
61                    None,
62                    None,
63                    validate,
64                    nulls_equal,
65                    build_null_count,
66                )
67            },
68            T::List(_) => {
69                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
70                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
71                lhs.hash_join_left(rhs, validate, nulls_equal)
72            },
73            #[cfg(feature = "dtype-array")]
74            T::Array(_, _) => {
75                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
76                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
77                lhs.hash_join_left(rhs, validate, nulls_equal)
78            },
79            #[cfg(feature = "dtype-struct")]
80            T::Struct(_) => {
81                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
82                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
83                lhs.hash_join_left(rhs, validate, nulls_equal)
84            },
85            x if x.is_float() => {
86                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
87                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
88                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
89                    num_group_join_left(lhs, rhs, validate, nulls_equal)
90                })
91            },
92            _ => {
93                let lhs = s_self.bit_repr();
94                let rhs = other.bit_repr();
95
96                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
97                    polars_bail!(nyi = "Hash Left Join between {lhs_dtype} and {rhs_dtype}");
98                };
99
100                use BitRepr as B;
101                match (lhs, rhs) {
102                    (B::U8(lhs), B::U8(rhs)) => {
103                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
104                    },
105                    (B::U16(lhs), B::U16(rhs)) => {
106                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
107                    },
108                    (B::U32(lhs), B::U32(rhs)) => {
109                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
110                    },
111                    (B::U64(lhs), B::U64(rhs)) => {
112                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
113                    },
114                    #[cfg(feature = "dtype-u128")]
115                    (B::U128(lhs), B::U128(rhs)) => {
116                        num_group_join_left(&lhs, &rhs, validate, nulls_equal)
117                    },
118                    _ => {
119                        polars_bail!(
120                            nyi = "Mismatch bit repr Hash Left Join between {lhs_dtype} and {rhs_dtype}",
121                        );
122                    },
123                }
124            },
125        }
126    }
127
128    #[cfg(feature = "semi_anti_join")]
129    fn hash_join_semi_anti(
130        &self,
131        other: &Series,
132        anti: bool,
133        nulls_equal: bool,
134    ) -> PolarsResult<Vec<IdxSize>> {
135        let s_self = self.as_series();
136        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
137
138        let lhs_dtype = lhs.dtype();
139        let rhs_dtype = rhs.dtype();
140
141        use DataType as T;
142        Ok(match lhs_dtype {
143            T::String | T::Binary => {
144                let lhs = lhs.cast(&T::Binary).unwrap();
145                let rhs = rhs.cast(&T::Binary).unwrap();
146                let lhs = lhs.binary().unwrap();
147                let rhs = rhs.binary().unwrap();
148                let (lhs, rhs, _, _) = prepare_binary::<BinaryType>(lhs, rhs, false);
149                // Take slices so that vecs are not copied
150                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
151                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
152                if anti {
153                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
154                } else {
155                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
156                }
157            },
158            T::BinaryOffset => {
159                let lhs = lhs.binary_offset().unwrap();
160                let rhs = rhs.binary_offset().unwrap();
161                let (lhs, rhs, _, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, false);
162                // Take slices so that vecs are not copied
163                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
164                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
165                if anti {
166                    hash_join_tuples_left_anti(lhs, rhs, nulls_equal)
167                } else {
168                    hash_join_tuples_left_semi(lhs, rhs, nulls_equal)
169                }
170            },
171            T::List(_) => {
172                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
173                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
174                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
175            },
176            #[cfg(feature = "dtype-array")]
177            T::Array(_, _) => {
178                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
179                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
180                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
181            },
182            #[cfg(feature = "dtype-struct")]
183            T::Struct(_) => {
184                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
185                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
186                lhs.hash_join_semi_anti(rhs, anti, nulls_equal)?
187            },
188            x if x.is_float() => {
189                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
190                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
191                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
192                    num_group_join_anti_semi(lhs, rhs, anti, nulls_equal)
193                })
194            },
195            _ => {
196                let lhs = s_self.bit_repr();
197                let rhs = other.bit_repr();
198
199                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
200                    polars_bail!(nyi = "Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}");
201                };
202
203                use BitRepr as B;
204                match (lhs, rhs) {
205                    (B::U8(lhs), B::U8(rhs)) => {
206                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
207                    },
208                    (B::U16(lhs), B::U16(rhs)) => {
209                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
210                    },
211                    (B::U32(lhs), B::U32(rhs)) => {
212                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
213                    },
214                    (B::U64(lhs), B::U64(rhs)) => {
215                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
216                    },
217                    #[cfg(feature = "dtype-u128")]
218                    (B::U128(lhs), B::U128(rhs)) => {
219                        num_group_join_anti_semi(&lhs, &rhs, anti, nulls_equal)
220                    },
221                    _ => {
222                        polars_bail!(
223                            nyi = "Mismatch bit repr Hash Semi-Anti Join between {lhs_dtype} and {rhs_dtype}",
224                        );
225                    },
226                }
227            },
228        })
229    }
230
231    // returns the join tuples and whether or not the lhs tuples are sorted
232    fn hash_join_inner(
233        &self,
234        other: &Series,
235        validate: JoinValidation,
236        nulls_equal: bool,
237    ) -> PolarsResult<(InnerJoinIds, bool)> {
238        let s_self = self.as_series();
239        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
240        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
241
242        let lhs_dtype = lhs.dtype();
243        let rhs_dtype = rhs.dtype();
244
245        use DataType as T;
246        match lhs_dtype {
247            T::String | T::Binary => {
248                let lhs = lhs.cast(&T::Binary).unwrap();
249                let rhs = rhs.cast(&T::Binary).unwrap();
250                let lhs = lhs.binary().unwrap();
251                let rhs = rhs.binary().unwrap();
252                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
253                // Take slices so that vecs are not copied
254                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
255                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
256                let build_null_count = if swapped {
257                    s_self.null_count()
258                } else {
259                    other.null_count()
260                };
261                Ok((
262                    hash_join_tuples_inner(
263                        lhs,
264                        rhs,
265                        swapped,
266                        validate,
267                        nulls_equal,
268                        build_null_count,
269                    )?,
270                    !swapped,
271                ))
272            },
273            T::BinaryOffset => {
274                let lhs = lhs.binary_offset().unwrap();
275                let rhs = rhs.binary_offset()?;
276                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
277                // Take slices so that vecs are not copied
278                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
279                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
280                let build_null_count = if swapped {
281                    s_self.null_count()
282                } else {
283                    other.null_count()
284                };
285                Ok((
286                    hash_join_tuples_inner(
287                        lhs,
288                        rhs,
289                        swapped,
290                        validate,
291                        nulls_equal,
292                        build_null_count,
293                    )?,
294                    !swapped,
295                ))
296            },
297            T::List(_) => {
298                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
299                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
300                lhs.hash_join_inner(rhs, validate, nulls_equal)
301            },
302            #[cfg(feature = "dtype-array")]
303            T::Array(_, _) => {
304                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
305                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
306                lhs.hash_join_inner(rhs, validate, nulls_equal)
307            },
308            #[cfg(feature = "dtype-struct")]
309            T::Struct(_) => {
310                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
311                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
312                lhs.hash_join_inner(rhs, validate, nulls_equal)
313            },
314            x if x.is_float() => {
315                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
316                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
317                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
318                    group_join_inner::<$T>(lhs, rhs, validate, nulls_equal)
319                })
320            },
321            _ => {
322                let lhs = s_self.bit_repr();
323                let rhs = other.bit_repr();
324
325                let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
326                    polars_bail!(nyi = "Hash Inner Join between {lhs_dtype} and {rhs_dtype}");
327                };
328
329                use BitRepr as B;
330                match (lhs, rhs) {
331                    (B::U8(lhs), B::U8(rhs)) => {
332                        group_join_inner::<UInt8Type>(&lhs, &rhs, validate, nulls_equal)
333                    },
334                    (B::U16(lhs), B::U16(rhs)) => {
335                        group_join_inner::<UInt16Type>(&lhs, &rhs, validate, nulls_equal)
336                    },
337                    (B::U32(lhs), B::U32(rhs)) => {
338                        group_join_inner::<UInt32Type>(&lhs, &rhs, validate, nulls_equal)
339                    },
340                    (B::U64(lhs), BitRepr::U64(rhs)) => {
341                        group_join_inner::<UInt64Type>(&lhs, &rhs, validate, nulls_equal)
342                    },
343                    #[cfg(feature = "dtype-u128")]
344                    (B::U128(lhs), BitRepr::U128(rhs)) => {
345                        group_join_inner::<UInt128Type>(&lhs, &rhs, validate, nulls_equal)
346                    },
347                    _ => {
348                        polars_bail!(
349                            nyi = "Mismatch bit repr Hash Inner Join between {lhs_dtype} and {rhs_dtype}"
350                        );
351                    },
352                }
353            },
354        }
355    }
356
357    fn hash_join_outer(
358        &self,
359        other: &Series,
360        validate: JoinValidation,
361        nulls_equal: bool,
362    ) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)> {
363        let s_self = self.as_series();
364        let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr());
365        validate.validate_probe(&lhs, &rhs, true, nulls_equal)?;
366
367        let lhs_dtype = lhs.dtype();
368        let rhs_dtype = rhs.dtype();
369
370        use DataType as T;
371        match lhs_dtype {
372            T::String | T::Binary => {
373                let lhs = lhs.cast(&T::Binary).unwrap();
374                let rhs = rhs.cast(&T::Binary).unwrap();
375                let lhs = lhs.binary().unwrap();
376                let rhs = rhs.binary().unwrap();
377                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryType>(lhs, rhs, true);
378                // Take slices so that vecs are not copied
379                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
380                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
381                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
382            },
383            T::BinaryOffset => {
384                let lhs = lhs.binary_offset().unwrap();
385                let rhs = rhs.binary_offset()?;
386                let (lhs, rhs, swapped, _) = prepare_binary::<BinaryOffsetType>(lhs, rhs, true);
387                // Take slices so that vecs are not copied
388                let lhs = lhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
389                let rhs = rhs.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
390                hash_join_tuples_outer(lhs, rhs, swapped, validate, nulls_equal)
391            },
392            T::List(_) => {
393                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
394                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
395                lhs.hash_join_outer(rhs, validate, nulls_equal)
396            },
397            #[cfg(feature = "dtype-array")]
398            T::Array(_, _) => {
399                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
400                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
401                lhs.hash_join_outer(rhs, validate, nulls_equal)
402            },
403            #[cfg(feature = "dtype-struct")]
404            T::Struct(_) => {
405                let lhs = &encode_rows_unordered(&[lhs.into_owned().into()])?.into_series();
406                let rhs = &encode_rows_unordered(&[rhs.into_owned().into()])?.into_series();
407                lhs.hash_join_outer(rhs, validate, nulls_equal)
408            },
409            x if x.is_float() => {
410                with_match_physical_float_polars_type!(lhs.dtype(), |$T| {
411                    let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
412                    let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();
413                    hash_join_outer(lhs, rhs, validate, nulls_equal)
414                })
415            },
416            _ => {
417                let (Some(lhs), Some(rhs)) = (s_self.bit_repr(), other.bit_repr()) else {
418                    polars_bail!(nyi = "Hash Join Outer between {lhs_dtype} and {rhs_dtype}");
419                };
420
421                use BitRepr as B;
422                match (lhs, rhs) {
423                    (B::U8(lhs), B::U8(rhs)) => hash_join_outer(&lhs, &rhs, validate, nulls_equal),
424                    (B::U16(lhs), B::U16(rhs)) => {
425                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
426                    },
427                    (B::U32(lhs), B::U32(rhs)) => {
428                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
429                    },
430                    (B::U64(lhs), B::U64(rhs)) => {
431                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
432                    },
433                    #[cfg(feature = "dtype-u128")]
434                    (B::U128(lhs), B::U128(rhs)) => {
435                        hash_join_outer(&lhs, &rhs, validate, nulls_equal)
436                    },
437                    _ => {
438                        polars_bail!(
439                            nyi = "Mismatch bit repr Hash Join Outer between {lhs_dtype} and {rhs_dtype}"
440                        );
441                    },
442                }
443            },
444        }
445    }
446}
447
448impl SeriesJoin for Series {}
449
450fn chunks_as_slices<T>(splitted: &[ChunkedArray<T>]) -> Vec<&[T::Native]>
451where
452    T: PolarsNumericType,
453{
454    splitted
455        .iter()
456        .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
457        .collect()
458}
459
460fn get_arrays<T: PolarsDataType>(cas: &[ChunkedArray<T>]) -> Vec<&T::Array> {
461    cas.iter().flat_map(|arr| arr.downcast_iter()).collect()
462}
463
464fn group_join_inner<T>(
465    left: &ChunkedArray<T>,
466    right: &ChunkedArray<T>,
467    validate: JoinValidation,
468    nulls_equal: bool,
469) -> PolarsResult<(InnerJoinIds, bool)>
470where
471    T: PolarsDataType,
472    for<'a> &'a T::Array: IntoIterator<Item = Option<&'a T::Physical<'a>>>,
473    for<'a> T::Physical<'a>:
474        Send + Sync + Copy + TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
475    for<'a> <T::Physical<'a> as ToTotalOrd>::TotalOrdItem:
476        Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
477{
478    let n_threads = POOL.current_num_threads();
479    let (a, b, swapped) = det_hash_prone_order!(left, right);
480    let splitted_a = split(a, n_threads);
481    let splitted_b = split(b, n_threads);
482    let splitted_a = get_arrays(&splitted_a);
483    let splitted_b = get_arrays(&splitted_b);
484
485    match (left.null_count(), right.null_count()) {
486        (0, 0) => {
487            let first = &splitted_a[0];
488            if first.as_slice().is_some() {
489                let splitted_a = splitted_a
490                    .iter()
491                    .map(|arr| arr.as_slice().unwrap())
492                    .collect::<Vec<_>>();
493                let splitted_b = splitted_b
494                    .iter()
495                    .map(|arr| arr.as_slice().unwrap())
496                    .collect::<Vec<_>>();
497                Ok((
498                    hash_join_tuples_inner(
499                        splitted_a,
500                        splitted_b,
501                        swapped,
502                        validate,
503                        nulls_equal,
504                        0,
505                    )?,
506                    !swapped,
507                ))
508            } else {
509                Ok((
510                    hash_join_tuples_inner(
511                        splitted_a,
512                        splitted_b,
513                        swapped,
514                        validate,
515                        nulls_equal,
516                        0,
517                    )?,
518                    !swapped,
519                ))
520            }
521        },
522        _ => {
523            let build_null_count = if swapped {
524                left.null_count()
525            } else {
526                right.null_count()
527            };
528            Ok((
529                hash_join_tuples_inner(
530                    splitted_a,
531                    splitted_b,
532                    swapped,
533                    validate,
534                    nulls_equal,
535                    build_null_count,
536                )?,
537                !swapped,
538            ))
539        },
540    }
541}
542
543#[cfg(feature = "chunked_ids")]
544fn create_mappings(
545    chunks_left: &[ArrayRef],
546    chunks_right: &[ArrayRef],
547    left_len: usize,
548    right_len: usize,
549) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
550    let mapping_left = || {
551        if chunks_left.len() > 1 {
552            Some(create_chunked_index_mapping(chunks_left, left_len))
553        } else {
554            None
555        }
556    };
557
558    let mapping_right = || {
559        if chunks_right.len() > 1 {
560            Some(create_chunked_index_mapping(chunks_right, right_len))
561        } else {
562            None
563        }
564    };
565
566    POOL.join(mapping_left, mapping_right)
567}
568
569#[cfg(not(feature = "chunked_ids"))]
570fn create_mappings(
571    _chunks_left: &[ArrayRef],
572    _chunks_right: &[ArrayRef],
573    _left_len: usize,
574    _right_len: usize,
575) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
576    (None, None)
577}
578
579fn num_group_join_left<T>(
580    left: &ChunkedArray<T>,
581    right: &ChunkedArray<T>,
582    validate: JoinValidation,
583    nulls_equal: bool,
584) -> PolarsResult<LeftJoinIds>
585where
586    T: PolarsNumericType,
587    T::Native: TotalHash + TotalEq + DirtyHash + IsNull + ToTotalOrd,
588    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
589    T::Native: DirtyHash + Copy + ToTotalOrd,
590    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
591{
592    let n_threads = POOL.current_num_threads();
593    let splitted_a = split(left, n_threads);
594    let splitted_b = split(right, n_threads);
595    match (
596        left.null_count(),
597        right.null_count(),
598        left.chunks().len(),
599        right.chunks().len(),
600    ) {
601        (0, 0, 1, 1) => {
602            let keys_a = chunks_as_slices(&splitted_a);
603            let keys_b = chunks_as_slices(&splitted_b);
604            hash_join_tuples_left(keys_a, keys_b, None, None, validate, nulls_equal, 0)
605        },
606        (0, 0, _, _) => {
607            let keys_a = chunks_as_slices(&splitted_a);
608            let keys_b = chunks_as_slices(&splitted_b);
609
610            let (mapping_left, mapping_right) =
611                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
612            hash_join_tuples_left(
613                keys_a,
614                keys_b,
615                mapping_left.as_deref(),
616                mapping_right.as_deref(),
617                validate,
618                nulls_equal,
619                0,
620            )
621        },
622        _ => {
623            let keys_a = get_arrays(&splitted_a);
624            let keys_b = get_arrays(&splitted_b);
625            let (mapping_left, mapping_right) =
626                create_mappings(left.chunks(), right.chunks(), left.len(), right.len());
627            let build_null_count = right.null_count();
628            hash_join_tuples_left(
629                keys_a,
630                keys_b,
631                mapping_left.as_deref(),
632                mapping_right.as_deref(),
633                validate,
634                nulls_equal,
635                build_null_count,
636            )
637        },
638    }
639}
640
641fn hash_join_outer<T>(
642    ca_in: &ChunkedArray<T>,
643    other: &ChunkedArray<T>,
644    validate: JoinValidation,
645    nulls_equal: bool,
646) -> PolarsResult<(PrimitiveArray<IdxSize>, PrimitiveArray<IdxSize>)>
647where
648    T: PolarsNumericType,
649    T::Native: TotalHash + TotalEq + ToTotalOrd,
650    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + IsNull,
651{
652    let (a, b, swapped) = det_hash_prone_order!(ca_in, other);
653
654    let n_partitions = _set_partition_size();
655    let splitted_a = split(a, n_partitions);
656    let splitted_b = split(b, n_partitions);
657
658    match (a.null_count(), b.null_count()) {
659        (0, 0) => {
660            let iters_a = splitted_a
661                .iter()
662                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
663                .collect::<Vec<_>>();
664            let iters_b = splitted_b
665                .iter()
666                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.values().as_slice()))
667                .collect::<Vec<_>>();
668            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
669        },
670        _ => {
671            let iters_a = splitted_a
672                .iter()
673                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
674                .collect::<Vec<_>>();
675            let iters_b = splitted_b
676                .iter()
677                .flat_map(|ca| ca.downcast_iter().map(|arr| arr.iter()))
678                .collect::<Vec<_>>();
679            hash_join_tuples_outer(iters_a, iters_b, swapped, validate, nulls_equal)
680        },
681    }
682}
683
684pub(crate) fn prepare_binary<'a, T>(
685    ca: &'a ChunkedArray<T>,
686    other: &'a ChunkedArray<T>,
687    // In inner join and outer join, the shortest relation will be used to create a hash table.
688    // In left join, always use the right side to create.
689    build_shortest_table: bool,
690) -> (
691    Vec<Vec<BytesHash<'a>>>,
692    Vec<Vec<BytesHash<'a>>>,
693    bool,
694    PlRandomState,
695)
696where
697    T: PolarsDataType,
698    for<'b> <T::Array as StaticArray>::ValueT<'b>: AsRef<[u8]>,
699{
700    let (a, b, swapped) = if build_shortest_table {
701        det_hash_prone_order!(ca, other)
702    } else {
703        (ca, other, false)
704    };
705    let hb = PlRandomState::default();
706    let bh_a = a.to_bytes_hashes(true, hb.clone());
707    let bh_b = b.to_bytes_hashes(true, hb.clone());
708
709    (bh_a, bh_b, swapped, hb)
710}
711
/// Semi (`anti == false`) or anti (`anti == true`) hash join of two numeric
/// chunked arrays, returning the selected row indices of `left`.
///
/// Both sides are split into one piece per pool thread. Null-free inputs are
/// joined on their value slices regardless of chunk count; otherwise the
/// arrays themselves are joined.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
    nulls_equal: bool,
) -> Vec<IdxSize>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,
    <Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
    let n_threads = POOL.current_num_threads();
    let splitted_a = split(left, n_threads);
    let splitted_b = split(right, n_threads);

    let null_free = left.null_count() == 0 && right.null_count() == 0;
    if null_free {
        let keys_a = chunks_as_slices(&splitted_a);
        let keys_b = chunks_as_slices(&splitted_b);
        if anti {
            hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
        }
    } else {
        let keys_a = get_arrays(&splitted_a);
        let keys_b = get_arrays(&splitted_b);
        if anti {
            hash_join_tuples_left_anti(keys_a, keys_b, nulls_equal)
        } else {
            hash_join_tuples_left_semi(keys_a, keys_b, nulls_equal)
        }
    }
}