polars_ops/frame/join/
dispatch_left_right.rs

1use super::*;
2use crate::prelude::*;
3
4pub(super) fn left_join_from_series(
5    left: DataFrame,
6    right: &DataFrame,
7    s_left: &Series,
8    s_right: &Series,
9    args: JoinArgs,
10    verbose: bool,
11    drop_names: Option<Vec<PlSmallStr>>,
12) -> PolarsResult<DataFrame> {
13    let (df_left, df_right) = materialize_left_join_from_series(
14        left, right, s_left, s_right, &args, verbose, drop_names,
15    )?;
16    _finish_join(df_left, df_right, args.suffix)
17}
18
19pub(super) fn right_join_from_series(
20    left: &DataFrame,
21    right: DataFrame,
22    s_left: &Series,
23    s_right: &Series,
24    mut args: JoinArgs,
25    verbose: bool,
26    drop_names: Option<Vec<PlSmallStr>>,
27) -> PolarsResult<DataFrame> {
28    // Swap the order of tables to do a right join.
29    args.maintain_order = args.maintain_order.flip();
30    let (df_right, df_left) = materialize_left_join_from_series(
31        right, left, s_right, s_left, &args, verbose, drop_names,
32    )?;
33    _finish_join(df_left, df_right, args.suffix)
34}
35
36pub fn materialize_left_join_from_series(
37    mut left: DataFrame,
38    right_: &DataFrame,
39    s_left: &Series,
40    s_right: &Series,
41    args: &JoinArgs,
42    verbose: bool,
43    drop_names: Option<Vec<PlSmallStr>>,
44) -> PolarsResult<(DataFrame, DataFrame)> {
45    #[cfg(feature = "dtype-categorical")]
46    _check_categorical_src(s_left.dtype(), s_right.dtype())?;
47
48    let mut s_left = s_left.clone();
49    // Eagerly limit left if possible.
50    if let Some((offset, len)) = args.slice {
51        if offset == 0 {
52            left = left.slice(0, len);
53            s_left = s_left.slice(0, len);
54        }
55    }
56
57    // Ensure that the chunks are aligned otherwise we go OOB.
58    let mut right = Cow::Borrowed(right_);
59    let mut s_right = s_right.clone();
60    if left.should_rechunk() {
61        left.as_single_chunk_par();
62        s_left = s_left.rechunk();
63    }
64    if right.should_rechunk() {
65        let mut other = right_.clone();
66        other.as_single_chunk_par();
67        right = Cow::Owned(other);
68        s_right = s_right.rechunk();
69    }
70
71    // The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now.
72    let requires_ordering = matches!(
73        args.maintain_order,
74        MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
75    );
76    if requires_ordering {
77        // When ordering we rechunk the series so we don't get ChunkIds as output
78        s_left = s_left.rechunk();
79        s_right = s_right.rechunk();
80    }
81
82    let (left_idx, right_idx) = sort_or_hash_left(
83        &s_left,
84        &s_right,
85        verbose,
86        args.validation,
87        args.nulls_equal,
88    )?;
89
90    let right = if let Some(drop_names) = drop_names {
91        right.drop_many(drop_names)
92    } else {
93        right.drop(s_right.name()).unwrap()
94    };
95    try_raise_keyboard_interrupt();
96
97    #[cfg(feature = "chunked_ids")]
98    match (left_idx, right_idx) {
99        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
100            if requires_ordering {
101                Ok(maintain_order_idx(
102                    &left,
103                    &right,
104                    left_idx.as_slice(),
105                    right_idx.as_slice(),
106                    args,
107                ))
108            } else {
109                Ok(POOL.join(
110                    || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
111                    || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
112                ))
113            }
114        },
115        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
116            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
117            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
118        )),
119        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
120            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
121            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
122        )),
123        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
124            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
125            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
126        )),
127    }
128
129    #[cfg(not(feature = "chunked_ids"))]
130    if requires_ordering {
131        Ok(maintain_order_idx(
132            &left,
133            &right,
134            left_idx.as_slice(),
135            right_idx.as_slice(),
136            args,
137        ))
138    } else {
139        Ok(POOL.join(
140            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
141            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
142        ))
143    }
144}
145
146fn maintain_order_idx(
147    left: &DataFrame,
148    other: &DataFrame,
149    left_idx: &[IdxSize],
150    right_idx: &[NullableIdxSize],
151    args: &JoinArgs,
152) -> (DataFrame, DataFrame) {
153    let mut df = {
154        // SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices
155        let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
156        let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
157        DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap()
158    };
159
160    let options = SortMultipleOptions::new()
161        .with_order_descending(false)
162        .with_maintain_order(true);
163
164    let columns = match args.maintain_order {
165        // If the left order is preserved then there are no unsorted right rows
166        // So Left and LeftRight are equal
167        MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
168        MaintainOrderJoin::Right => vec!["b"],
169        MaintainOrderJoin::RightLeft => vec!["b", "a"],
170        _ => unreachable!(),
171    };
172
173    df.sort_in_place(columns, options).unwrap();
174    df.rechunk_mut();
175
176    let join_tuples_left = df
177        .column("a")
178        .unwrap()
179        .as_materialized_series()
180        .idx()
181        .unwrap()
182        .cont_slice()
183        .unwrap();
184
185    let join_tuples_right = df
186        .column("b")
187        .unwrap()
188        .as_materialized_series()
189        .idx()
190        .unwrap()
191        .cont_slice()
192        .unwrap();
193
194    POOL.join(
195        || materialize_left_join_idx_left(left, join_tuples_left, args),
196        || materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
197    )
198}
199
200fn materialize_left_join_idx_left(
201    left: &DataFrame,
202    left_idx: &[IdxSize],
203    args: &JoinArgs,
204) -> DataFrame {
205    let left_idx = if let Some((offset, len)) = args.slice {
206        slice_slice(left_idx, offset, len)
207    } else {
208        left_idx
209    };
210
211    unsafe {
212        left._create_left_df_from_slice(
213            left_idx,
214            true,
215            args.slice.is_some(),
216            matches!(
217                args.maintain_order,
218                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
219            ) || args.how == JoinType::Left
220                && !matches!(
221                    args.maintain_order,
222                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft,
223                ),
224        )
225    }
226}
227
228fn materialize_left_join_idx_right(
229    right: &DataFrame,
230    right_idx: &[NullableIdxSize],
231    args: &JoinArgs,
232) -> DataFrame {
233    let right_idx = if let Some((offset, len)) = args.slice {
234        slice_slice(right_idx, offset, len)
235    } else {
236        right_idx
237    };
238    unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) }
239}
/// Builds the left half of the join result from chunk ids.
///
/// `args.slice`, when present, is applied to `left_idx` first; the remaining
/// ids are then handed to `create_left_df_chunked`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let was_sliced = args.slice.is_some();
    let chunk_ids = match args.slice {
        Some((offset, len)) => slice_slice(left_idx, offset, len),
        None => left_idx,
    };
    // NOTE(review): the unsafe contract is not visible here; presumably every
    // chunk id must address an existing row of `left` — confirm.
    unsafe { left.create_left_df_chunked(chunk_ids, true, was_sliced) }
}
253
/// Builds the right half of the join result from chunk ids.
///
/// `args.slice`, when present, is applied to `right_idx` first; the remaining
/// ids are then used for an unchecked chunked take on `right`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
    right: &DataFrame,
    right_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let chunk_ids = match args.slice {
        Some((offset, len)) => slice_slice(right_idx, offset, len),
        None => right_idx,
    };
    // NOTE(review): unchecked take; presumably every non-null chunk id must
    // address an existing row of `right` — confirm.
    unsafe { right._take_opt_chunked_unchecked_hor_par(chunk_ids) }
}
266}