polars_ops/frame/join/hash_join/
mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::POOL;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14pub(super) use single_keys::*;
15pub use single_keys_dispatch::SeriesJoin;
16#[cfg(feature = "asof_join")]
17pub(super) use single_keys_dispatch::prepare_binary;
18use single_keys_inner::*;
19use single_keys_left::*;
20use single_keys_outer::*;
21#[cfg(feature = "semi_anti_join")]
22use single_keys_semi_anti::*;
23pub(crate) use sort_merge::*;
24
25pub use super::*;
26#[cfg(feature = "chunked_ids")]
27use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
28
29pub fn default_join_ids() -> ChunkJoinOptIds {
30    #[cfg(feature = "chunked_ids")]
31    {
32        Either::Left(vec![])
33    }
34    #[cfg(not(feature = "chunked_ids"))]
35    {
36        vec![]
37    }
38}
39
/// Orders two relations by length and reports whether they were exchanged.
///
/// Expands to a tuple `(longer, shorter, swapped)`:
/// * if `$self` is strictly longer than `$other`, the inputs keep their order and
///   `swapped` is `false`;
/// * otherwise (including equal lengths) the inputs are exchanged and `swapped` is `true`.
///
/// Both arguments must have the same type and expose a `len()` method.
macro_rules! det_hash_prone_order {
    ($self:expr, $other:expr) => {{
        // The shortest relation will be used to create a hash table, so it always ends up
        // in the second slot of the tuple.
        if $self.len() <= $other.len() {
            ($other, $self, true)
        } else {
            ($self, $other, false)
        }
    }};
}
50
51#[cfg(feature = "performant")]
52use arrow::legacy::conversion::primitive_to_vec;
53pub(super) use det_hash_prone_order;
54
55pub trait JoinDispatch: IntoDf {
56    /// # Safety
57    /// Join tuples must be in bounds
58    #[cfg(feature = "chunked_ids")]
59    unsafe fn create_left_df_chunked(
60        &self,
61        chunk_ids: &[ChunkId],
62        left_join: bool,
63        was_sliced: bool,
64    ) -> DataFrame {
65        let df_self = self.to_df();
66
67        let left_join_no_duplicate_matches =
68            left_join && !was_sliced && chunk_ids.len() == df_self.height();
69
70        if left_join_no_duplicate_matches {
71            df_self.clone()
72        } else {
73            // left join keys are in ascending order
74            let sorted = if left_join {
75                IsSorted::Ascending
76            } else {
77                IsSorted::Not
78            };
79            df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
80        }
81    }
82
83    /// # Safety
84    /// Join tuples must be in bounds
85    unsafe fn _create_left_df_from_slice(
86        &self,
87        join_tuples: &[IdxSize],
88        left_join: bool,
89        was_sliced: bool,
90        sorted_tuple_idx: bool,
91    ) -> DataFrame {
92        let df_self = self.to_df();
93
94        let left_join_no_duplicate_matches =
95            sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
96
97        if left_join_no_duplicate_matches {
98            df_self.clone()
99        } else {
100            let sorted = if sorted_tuple_idx {
101                IsSorted::Ascending
102            } else {
103                IsSorted::Not
104            };
105
106            df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
107        }
108    }
109
110    #[cfg(feature = "semi_anti_join")]
111    /// # Safety
112    /// `idx` must be in bounds
113    unsafe fn _finish_anti_semi_join(
114        &self,
115        mut idx: &[IdxSize],
116        slice: Option<(i64, usize)>,
117    ) -> DataFrame {
118        let ca_self = self.to_df();
119        if let Some((offset, len)) = slice {
120            idx = slice_slice(idx, offset, len);
121        }
122        // idx from anti-semi join should always be sorted
123        ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
124    }
125
126    #[cfg(feature = "semi_anti_join")]
127    fn _semi_anti_join_from_series(
128        &self,
129        s_left: &Series,
130        s_right: &Series,
131        slice: Option<(i64, usize)>,
132        anti: bool,
133        nulls_equal: bool,
134    ) -> PolarsResult<DataFrame> {
135        let ca_self = self.to_df();
136        #[cfg(feature = "dtype-categorical")]
137        _check_categorical_src(s_left.dtype(), s_right.dtype())?;
138
139        let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
140        // SAFETY:
141        // indices are in bounds
142        Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
143    }
144    fn _full_join_from_series(
145        &self,
146        other: &DataFrame,
147        s_left: &Series,
148        s_right: &Series,
149        args: JoinArgs,
150    ) -> PolarsResult<DataFrame> {
151        let df_self = self.to_df();
152        #[cfg(feature = "dtype-categorical")]
153        _check_categorical_src(s_left.dtype(), s_right.dtype())?;
154
155        // Get the indexes of the joined relations
156        let (mut join_idx_l, mut join_idx_r) =
157            s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
158
159        try_raise_keyboard_interrupt();
160        if let Some((offset, len)) = args.slice {
161            let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
162            join_idx_l.slice(offset, len);
163            join_idx_r.slice(offset, len);
164        }
165        let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
166        let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
167
168        let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
169            let mut df = DataFrame::new(vec![
170                idx_ca_l.into_series().into(),
171                idx_ca_r.into_series().into(),
172            ])?;
173
174            let options = SortMultipleOptions::new()
175                .with_order_descending(false)
176                .with_maintain_order(true)
177                .with_nulls_last(true);
178
179            let columns = match args.maintain_order {
180                MaintainOrderJoin::Left => vec!["a"],
181                MaintainOrderJoin::LeftRight => vec!["a", "b"],
182                MaintainOrderJoin::Right => vec!["b"],
183                MaintainOrderJoin::RightLeft => vec!["b", "a"],
184                _ => unreachable!(),
185            };
186
187            df.sort_in_place(columns, options)?;
188
189            let join_tuples_left = df.column("a").unwrap().idx().unwrap();
190            let join_tuples_right = df.column("b").unwrap().idx().unwrap();
191            POOL.join(
192                || unsafe { df_self.take_unchecked(join_tuples_left) },
193                || unsafe { other.take_unchecked(join_tuples_right) },
194            )
195        } else {
196            POOL.join(
197                || unsafe { df_self.take_unchecked(&idx_ca_l) },
198                || unsafe { other.take_unchecked(&idx_ca_r) },
199            )
200        };
201
202        let coalesce = args.coalesce.coalesce(&JoinType::Full);
203        let out = _finish_join(df_left, df_right, args.suffix.clone());
204        if coalesce {
205            Ok(_coalesce_full_join(
206                out?,
207                &[s_left.name().clone()],
208                &[s_right.name().clone()],
209                args.suffix.clone(),
210                df_self,
211            ))
212        } else {
213            out
214        }
215    }
216}
217
218impl JoinDispatch for DataFrame {}