// polars_ops/frame/join/hash_join/mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::POOL;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14pub(super) use single_keys::*;
15pub use single_keys_dispatch::SeriesJoin;
16#[cfg(feature = "asof_join")]
17pub(super) use single_keys_dispatch::prepare_binary;
18use single_keys_inner::*;
19use single_keys_left::*;
20use single_keys_outer::*;
21#[cfg(feature = "semi_anti_join")]
22use single_keys_semi_anti::*;
23pub(crate) use sort_merge::*;
24
25pub use super::*;
26#[cfg(feature = "chunked_ids")]
27use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
28
29pub fn default_join_ids() -> ChunkJoinOptIds {
30    #[cfg(feature = "chunked_ids")]
31    {
32        Either::Left(vec![])
33    }
34    #[cfg(not(feature = "chunked_ids"))]
35    {
36        vec![]
37    }
38}
39
/// Orders the two sides of a hash join by length.
///
/// Expands to a tuple `(first, second, swapped)`:
/// - if `$self.len() > $other.len()` the tuple is `($self, $other, false)`;
/// - otherwise (equal lengths included) it is `($other, $self, true)`.
///
/// `second` is therefore never longer than `first`; it is the relation meant to be hashed.
/// Both arguments must have the same type and provide `len()`.
///
/// Each argument expression appears twice in the expansion (once in the comparison and once in
/// the selected tuple), so pass plain bindings or otherwise side-effect-free expressions.
macro_rules! det_hash_prone_order {
    ($self:expr, $other:expr) => {{
        // The shortest relation will be used to create a hash table.
        if $self.len() > $other.len() {
            ($self, $other, false)
        } else {
            ($other, $self, true)
        }
    }};
}
50
51#[cfg(feature = "performant")]
52use arrow::legacy::conversion::primitive_to_vec;
53pub(super) use det_hash_prone_order;
54
55pub trait JoinDispatch: IntoDf {
56    /// # Safety
57    /// Join tuples must be in bounds
58    #[cfg(feature = "chunked_ids")]
59    unsafe fn create_left_df_chunked(
60        &self,
61        chunk_ids: &[ChunkId],
62        left_join: bool,
63        was_sliced: bool,
64    ) -> DataFrame {
65        let df_self = self.to_df();
66
67        let left_join_no_duplicate_matches =
68            left_join && !was_sliced && chunk_ids.len() == df_self.height();
69
70        if left_join_no_duplicate_matches {
71            df_self.clone()
72        } else {
73            // left join keys are in ascending order
74            let sorted = if left_join {
75                IsSorted::Ascending
76            } else {
77                IsSorted::Not
78            };
79            df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
80        }
81    }
82
83    /// # Safety
84    /// Join tuples must be in bounds
85    unsafe fn _create_left_df_from_slice(
86        &self,
87        join_tuples: &[IdxSize],
88        left_join: bool,
89        was_sliced: bool,
90        sorted_tuple_idx: bool,
91    ) -> DataFrame {
92        let df_self = self.to_df();
93
94        let left_join_no_duplicate_matches =
95            sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
96
97        if left_join_no_duplicate_matches {
98            df_self.clone()
99        } else {
100            let sorted = if sorted_tuple_idx {
101                IsSorted::Ascending
102            } else {
103                IsSorted::Not
104            };
105
106            df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
107        }
108    }
109
110    #[cfg(feature = "semi_anti_join")]
111    /// # Safety
112    /// `idx` must be in bounds
113    unsafe fn _finish_anti_semi_join(
114        &self,
115        mut idx: &[IdxSize],
116        slice: Option<(i64, usize)>,
117    ) -> DataFrame {
118        let ca_self = self.to_df();
119        if let Some((offset, len)) = slice {
120            idx = slice_slice(idx, offset, len);
121        }
122        // idx from anti-semi join should always be sorted
123        ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
124    }
125
126    #[cfg(feature = "semi_anti_join")]
127    fn _semi_anti_join_from_series(
128        &self,
129        s_left: &Series,
130        s_right: &Series,
131        slice: Option<(i64, usize)>,
132        anti: bool,
133        nulls_equal: bool,
134    ) -> PolarsResult<DataFrame> {
135        let ca_self = self.to_df();
136
137        let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
138        // SAFETY:
139        // indices are in bounds
140        Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
141    }
142    fn _full_join_from_series(
143        &self,
144        other: &DataFrame,
145        s_left: &Series,
146        s_right: &Series,
147        args: JoinArgs,
148    ) -> PolarsResult<DataFrame> {
149        let df_self = self.to_df();
150
151        // Get the indexes of the joined relations
152        let (mut join_idx_l, mut join_idx_r) =
153            s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
154
155        try_raise_keyboard_interrupt();
156        if let Some((offset, len)) = args.slice {
157            let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
158            join_idx_l.slice(offset, len);
159            join_idx_r.slice(offset, len);
160        }
161        let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
162        let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
163
164        let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
165            let mut df = DataFrame::new(vec![
166                idx_ca_l.into_series().into(),
167                idx_ca_r.into_series().into(),
168            ])?;
169
170            let options = SortMultipleOptions::new()
171                .with_order_descending(false)
172                .with_maintain_order(true)
173                .with_nulls_last(true);
174
175            let columns = match args.maintain_order {
176                MaintainOrderJoin::Left => vec!["a"],
177                MaintainOrderJoin::LeftRight => vec!["a", "b"],
178                MaintainOrderJoin::Right => vec!["b"],
179                MaintainOrderJoin::RightLeft => vec!["b", "a"],
180                _ => unreachable!(),
181            };
182
183            df.sort_in_place(columns, options)?;
184
185            let join_tuples_left = df.column("a").unwrap().idx().unwrap();
186            let join_tuples_right = df.column("b").unwrap().idx().unwrap();
187            POOL.join(
188                || unsafe { df_self.take_unchecked(join_tuples_left) },
189                || unsafe { other.take_unchecked(join_tuples_right) },
190            )
191        } else {
192            POOL.join(
193                || unsafe { df_self.take_unchecked(&idx_ca_l) },
194                || unsafe { other.take_unchecked(&idx_ca_r) },
195            )
196        };
197
198        let coalesce = args.coalesce.coalesce(&JoinType::Full);
199        let out = _finish_join(df_left, df_right, args.suffix.clone());
200        if coalesce {
201            Ok(_coalesce_full_join(
202                out?,
203                &[s_left.name().clone()],
204                &[s_right.name().clone()],
205                args.suffix,
206                df_self,
207            ))
208        } else {
209            out
210        }
211    }
212}
213
214impl JoinDispatch for DataFrame {}