Skip to main content

polars_ops/frame/join/hash_join/
mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2pub(super) mod single_keys;
3mod single_keys_dispatch;
4mod single_keys_inner;
5mod single_keys_left;
6mod single_keys_outer;
7#[cfg(feature = "semi_anti_join")]
8mod single_keys_semi_anti;
9pub(super) mod sort_merge;
10use arrow::array::ArrayRef;
11use polars_core::runtime::RAYON;
12use polars_core::utils::_set_partition_size;
13use polars_utils::index::ChunkId;
14use polars_utils::unique_column_name;
15pub(super) use single_keys::*;
16pub use single_keys_dispatch::SeriesJoin;
17#[cfg(feature = "asof_join")]
18pub(super) use single_keys_dispatch::prepare_binary;
19use single_keys_inner::*;
20use single_keys_left::*;
21use single_keys_outer::*;
22#[cfg(feature = "semi_anti_join")]
23use single_keys_semi_anti::*;
24pub(crate) use sort_merge::*;
25
26pub use super::*;
27#[cfg(feature = "chunked_ids")]
28use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
29
30pub fn default_join_ids() -> ChunkJoinOptIds {
31    #[cfg(feature = "chunked_ids")]
32    {
33        Either::Left(vec![])
34    }
35    #[cfg(not(feature = "chunked_ids"))]
36    {
37        vec![]
38    }
39}
40
/// Orders two relations by length and reports whether they were swapped.
///
/// Expands to a 3-tuple `(first, second, swapped)`:
/// - if the first argument is strictly longer, the arguments are returned in their
///   original order and `swapped` is `false`;
/// - otherwise (shorter or equal length) the arguments are returned in reverse order and
///   `swapped` is `true`.
///
/// As a result `second` is never longer than `first`. Note that each argument expression
/// is expanded more than once.
macro_rules! det_hash_prone_order {
    ($lhs:expr, $rhs:expr) => {{
        // The shortest relation will be used to create a hash table.
        if $lhs.len() <= $rhs.len() {
            ($rhs, $lhs, true)
        } else {
            ($lhs, $rhs, false)
        }
    }};
}
51
52#[cfg(feature = "performant")]
53use arrow::legacy::conversion::primitive_to_vec;
54pub(super) use det_hash_prone_order;
55
56pub trait JoinDispatch: IntoDf {
57    /// # Safety
58    /// Join tuples must be in bounds
59    #[cfg(feature = "chunked_ids")]
60    unsafe fn create_left_df_chunked(
61        &self,
62        chunk_ids: &[ChunkId],
63        left_join: bool,
64        was_sliced: bool,
65    ) -> DataFrame {
66        let df_self = self.to_df();
67
68        let left_join_no_duplicate_matches =
69            left_join && !was_sliced && chunk_ids.len() == df_self.height();
70
71        if left_join_no_duplicate_matches {
72            df_self.clone()
73        } else {
74            // left join keys are in ascending order
75            let sorted = if left_join {
76                IsSorted::Ascending
77            } else {
78                IsSorted::Not
79            };
80            df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
81        }
82    }
83
84    /// # Safety
85    /// Join tuples must be in bounds
86    unsafe fn _create_left_df_from_slice(
87        &self,
88        join_tuples: &[IdxSize],
89        left_join: bool,
90        was_sliced: bool,
91        sorted_tuple_idx: bool,
92    ) -> DataFrame {
93        let df_self = self.to_df();
94
95        let left_join_no_duplicate_matches =
96            sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
97
98        if left_join_no_duplicate_matches {
99            df_self.clone()
100        } else {
101            let sorted = if sorted_tuple_idx {
102                IsSorted::Ascending
103            } else {
104                IsSorted::Not
105            };
106
107            df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
108        }
109    }
110
111    #[cfg(feature = "semi_anti_join")]
112    /// # Safety
113    /// `idx` must be in bounds
114    unsafe fn _finish_anti_semi_join(
115        &self,
116        mut idx: &[IdxSize],
117        slice: Option<(i64, usize)>,
118    ) -> DataFrame {
119        let ca_self = self.to_df();
120        if let Some((offset, len)) = slice {
121            idx = slice_slice(idx, offset, len);
122        }
123        // idx from anti-semi join should always be sorted
124        ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
125    }
126
127    #[cfg(feature = "semi_anti_join")]
128    fn _semi_anti_join_from_series(
129        &self,
130        s_left: &Series,
131        s_right: &Series,
132        slice: Option<(i64, usize)>,
133        anti: bool,
134        nulls_equal: bool,
135    ) -> PolarsResult<DataFrame> {
136        let ca_self = self.to_df();
137
138        let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
139        // SAFETY:
140        // indices are in bounds
141        Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
142    }
143    fn _full_join_from_series(
144        &self,
145        other: &DataFrame,
146        s_left: &Series,
147        s_right: &Series,
148        args: JoinArgs,
149    ) -> PolarsResult<DataFrame> {
150        let df_self = self.to_df();
151
152        // Get the indexes of the joined relations
153        let (mut join_idx_l, mut join_idx_r) =
154            s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
155
156        try_raise_keyboard_interrupt();
157        if let Some((offset, len)) = args.slice {
158            let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
159            join_idx_l.slice(offset, len);
160            join_idx_r.slice(offset, len);
161        }
162        let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
163        let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
164
165        let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
166            let mut df = unsafe {
167                DataFrame::new_unchecked_infer_height(vec![
168                    idx_ca_l.into_series().into(),
169                    idx_ca_r.into_series().into(),
170                ])
171            };
172
173            let options = SortMultipleOptions::new()
174                .with_order_descending(false)
175                .with_maintain_order(true)
176                .with_nulls_last(true);
177
178            let columns = match args.maintain_order {
179                MaintainOrderJoin::Left => vec!["a"],
180                MaintainOrderJoin::LeftRight => vec!["a", "b"],
181                MaintainOrderJoin::Right => vec!["b"],
182                MaintainOrderJoin::RightLeft => vec!["b", "a"],
183                _ => unreachable!(),
184            };
185
186            df.sort_in_place(columns, options)?;
187
188            let join_tuples_left = df.column("a").unwrap().idx().unwrap();
189            let join_tuples_right = df.column("b").unwrap().idx().unwrap();
190            RAYON.join(
191                || unsafe { df_self.take_unchecked(join_tuples_left) },
192                || unsafe { other.take_unchecked(join_tuples_right) },
193            )
194        } else {
195            RAYON.join(
196                || unsafe { df_self.take_unchecked(&idx_ca_l) },
197                || unsafe { other.take_unchecked(&idx_ca_r) },
198            )
199        };
200
201        let coalesce = args.coalesce.coalesce(&JoinType::Full);
202        if coalesce {
203            let tmp_right_name = unique_column_name();
204            let mut df_right = df_right;
205            df_right.rename(s_right.name().as_str(), tmp_right_name.clone())?;
206            let out = _finish_join(df_left, df_right, args.suffix.clone())?;
207            Ok(_coalesce_full_join(
208                out,
209                &[s_left.name().clone()],
210                &[tmp_right_name],
211                args.suffix,
212                df_self,
213            ))
214        } else {
215            _finish_join(df_left, df_right, args.suffix.clone())
216        }
217    }
218}
219
// `DataFrame` opts into `JoinDispatch` with an empty impl body, so it uses every
// default method of the trait unchanged.
220impl JoinDispatch for DataFrame {}