Skip to main content

polars_ops/frame/join/
dispatch_left_right.rs

1use polars_core::utils::Container;
2
3use super::*;
4use crate::prelude::*;
5
6pub(super) fn left_join_from_series(
7    left: DataFrame,
8    right: &DataFrame,
9    s_left: &Series,
10    s_right: &Series,
11    args: JoinArgs,
12    verbose: bool,
13    drop_names: Option<Vec<PlSmallStr>>,
14) -> PolarsResult<DataFrame> {
15    let (df_left, df_right) = materialize_left_join_from_series(
16        left, right, s_left, s_right, &args, verbose, drop_names,
17    )?;
18    _finish_join(df_left, df_right, args.suffix)
19}
20
21pub(super) fn right_join_from_series(
22    left: &DataFrame,
23    right: DataFrame,
24    s_left: &Series,
25    s_right: &Series,
26    mut args: JoinArgs,
27    verbose: bool,
28    drop_names: Option<Vec<PlSmallStr>>,
29) -> PolarsResult<DataFrame> {
30    // Swap the order of tables to do a right join.
31    args.maintain_order = args.maintain_order.flip();
32    let (df_right, df_left) = materialize_left_join_from_series(
33        right, left, s_right, s_left, &args, verbose, drop_names,
34    )?;
35    _finish_join(df_left, df_right, args.suffix)
36}
37
38pub fn materialize_left_join_from_series(
39    mut left: DataFrame,
40    right_: &DataFrame,
41    s_left: &Series,
42    s_right: &Series,
43    args: &JoinArgs,
44    verbose: bool,
45    drop_names: Option<Vec<PlSmallStr>>,
46) -> PolarsResult<(DataFrame, DataFrame)> {
47    let mut s_left = s_left.clone();
48    // Eagerly limit left if possible.
49    if let Some((offset, len)) = args.slice {
50        if offset == 0 {
51            left = left.slice(0, len);
52            s_left = s_left.slice(0, len);
53        }
54    }
55
56    // Ensure that the chunks are aligned otherwise we go OOB.
57    let requires_ordering = matches!(
58        args.maintain_order,
59        MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
60    );
61
62    let mut right = Cow::Borrowed(right_);
63    let mut s_right = s_right.clone();
64    if left.should_rechunk() || requires_ordering || left.n_chunks() != s_left.n_chunks() {
65        left.rechunk_mut_par();
66        s_left = s_left.rechunk();
67    }
68    if right.should_rechunk() || requires_ordering || right.n_chunks() != s_right.n_chunks() {
69        let mut other = right_.clone();
70        other.rechunk_mut_par();
71        right = Cow::Owned(other);
72        s_right = s_right.rechunk();
73    }
74
75    let (left_idx, right_idx) = sort_or_hash_left(
76        &s_left,
77        &s_right,
78        verbose,
79        args.validation,
80        args.nulls_equal,
81    )?;
82
83    let right = if let Some(drop_names) = drop_names {
84        right.drop_many(drop_names)
85    } else {
86        right.drop(s_right.name()).unwrap()
87    };
88    try_raise_keyboard_interrupt();
89
90    #[cfg(feature = "chunked_ids")]
91    match (left_idx, right_idx) {
92        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
93            if requires_ordering {
94                Ok(maintain_order_idx(
95                    &left,
96                    &right,
97                    left_idx.as_slice(),
98                    right_idx.as_slice(),
99                    args,
100                ))
101            } else {
102                Ok(RAYON.join(
103                    || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
104                    || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
105                ))
106            }
107        },
108        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(RAYON.join(
109            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
110            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
111        )),
112        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(RAYON.join(
113            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
114            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
115        )),
116        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(RAYON.join(
117            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
118            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
119        )),
120    }
121
122    #[cfg(not(feature = "chunked_ids"))]
123    if requires_ordering {
124        Ok(maintain_order_idx(
125            &left,
126            &right,
127            left_idx.as_slice(),
128            right_idx.as_slice(),
129            args,
130        ))
131    } else {
132        Ok(RAYON.join(
133            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
134            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
135        ))
136    }
137}
138
139fn maintain_order_idx(
140    left: &DataFrame,
141    other: &DataFrame,
142    left_idx: &[IdxSize],
143    right_idx: &[NullableIdxSize],
144    args: &JoinArgs,
145) -> (DataFrame, DataFrame) {
146    let mut df = {
147        // SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices
148        let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
149        let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
150        unsafe {
151            DataFrame::new_unchecked(
152                left_idx.len(),
153                vec![left.into_series().into(), right.into_series().into()],
154            )
155        }
156    };
157
158    let options = SortMultipleOptions::new()
159        .with_order_descending(false)
160        .with_maintain_order(true);
161
162    let columns = match args.maintain_order {
163        // If the left order is preserved then there are no unsorted right rows
164        // So Left and LeftRight are equal
165        MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
166        MaintainOrderJoin::Right => vec!["b"],
167        MaintainOrderJoin::RightLeft => vec!["b", "a"],
168        _ => unreachable!(),
169    };
170
171    df.sort_in_place(columns, options).unwrap();
172    df.rechunk_mut();
173
174    let join_tuples_left = df
175        .column("a")
176        .unwrap()
177        .as_materialized_series()
178        .idx()
179        .unwrap()
180        .cont_slice()
181        .unwrap();
182
183    let join_tuples_right = df
184        .column("b")
185        .unwrap()
186        .as_materialized_series()
187        .idx()
188        .unwrap()
189        .cont_slice()
190        .unwrap();
191
192    RAYON.join(
193        || materialize_left_join_idx_left(left, join_tuples_left, args),
194        || materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
195    )
196}
197
198fn materialize_left_join_idx_left(
199    left: &DataFrame,
200    left_idx: &[IdxSize],
201    args: &JoinArgs,
202) -> DataFrame {
203    let left_idx = if let Some((offset, len)) = args.slice {
204        slice_slice(left_idx, offset, len)
205    } else {
206        left_idx
207    };
208
209    unsafe {
210        left._create_left_df_from_slice(
211            left_idx,
212            true,
213            args.slice.is_some(),
214            matches!(
215                args.maintain_order,
216                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
217            ) || args.how == JoinType::Left
218                && !matches!(
219                    args.maintain_order,
220                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft,
221                ),
222        )
223    }
224}
225
226fn materialize_left_join_idx_right(
227    right: &DataFrame,
228    right_idx: &[NullableIdxSize],
229    args: &JoinArgs,
230) -> DataFrame {
231    let right_idx = if let Some((offset, len)) = args.slice {
232        slice_slice(right_idx, offset, len)
233    } else {
234        right_idx
235    };
236    unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) }
237}
/// Builds the left half of the join result from chunked row ids.
///
/// `args.slice`, when present, is first applied to `left_idx` via
/// `slice_slice`; the remaining ids are handed to `create_left_df_chunked`
/// on `left`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let slice_requested = args.slice.is_some();
    let selected = args
        .slice
        .map_or(left_idx, |(offset, len)| slice_slice(left_idx, offset, len));

    // SAFETY: NOTE(review) relies on every chunk id in `left_idx` addressing a valid
    // row of `left`, which the join that produced the ids is expected to guarantee — confirm.
    unsafe { left.create_left_df_chunked(selected, true, slice_requested) }
}
251
/// Builds the right half of the join result from chunked row ids.
///
/// `args.slice`, when present, is first applied to `right_idx` via
/// `slice_slice`; the remaining ids are used to gather rows from `right`
/// through `_take_opt_chunked_unchecked_hor_par`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
    right: &DataFrame,
    right_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let selected = args
        .slice
        .map_or(right_idx, |(offset, len)| slice_slice(right_idx, offset, len));

    // SAFETY: NOTE(review) relies on every non-null chunk id in `right_idx` addressing a
    // valid row of `right`, which the join that produced the ids is expected to
    // guarantee — confirm.
    unsafe { right._take_opt_chunked_unchecked_hor_par(selected) }
}
264}