polars_ops/frame/join/
dispatch_left_right.rs

1use super::*;
2use crate::prelude::*;
3
4pub(super) fn left_join_from_series(
5    left: DataFrame,
6    right: &DataFrame,
7    s_left: &Series,
8    s_right: &Series,
9    args: JoinArgs,
10    verbose: bool,
11    drop_names: Option<Vec<PlSmallStr>>,
12) -> PolarsResult<DataFrame> {
13    let (df_left, df_right) = materialize_left_join_from_series(
14        left, right, s_left, s_right, &args, verbose, drop_names,
15    )?;
16    _finish_join(df_left, df_right, args.suffix)
17}
18
19pub(super) fn right_join_from_series(
20    left: &DataFrame,
21    right: DataFrame,
22    s_left: &Series,
23    s_right: &Series,
24    mut args: JoinArgs,
25    verbose: bool,
26    drop_names: Option<Vec<PlSmallStr>>,
27) -> PolarsResult<DataFrame> {
28    // Swap the order of tables to do a right join.
29    args.maintain_order = args.maintain_order.flip();
30    let (df_right, df_left) = materialize_left_join_from_series(
31        right, left, s_right, s_left, &args, verbose, drop_names,
32    )?;
33    _finish_join(df_left, df_right, args.suffix)
34}
35
36pub fn materialize_left_join_from_series(
37    mut left: DataFrame,
38    right_: &DataFrame,
39    s_left: &Series,
40    s_right: &Series,
41    args: &JoinArgs,
42    verbose: bool,
43    drop_names: Option<Vec<PlSmallStr>>,
44) -> PolarsResult<(DataFrame, DataFrame)> {
45    let mut s_left = s_left.clone();
46    // Eagerly limit left if possible.
47    if let Some((offset, len)) = args.slice {
48        if offset == 0 {
49            left = left.slice(0, len);
50            s_left = s_left.slice(0, len);
51        }
52    }
53
54    // Ensure that the chunks are aligned otherwise we go OOB.
55    let mut right = Cow::Borrowed(right_);
56    let mut s_right = s_right.clone();
57    if left.should_rechunk() {
58        left.as_single_chunk_par();
59        s_left = s_left.rechunk();
60    }
61    if right.should_rechunk() {
62        let mut other = right_.clone();
63        other.as_single_chunk_par();
64        right = Cow::Owned(other);
65        s_right = s_right.rechunk();
66    }
67
68    // The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now.
69    let requires_ordering = matches!(
70        args.maintain_order,
71        MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
72    );
73    if requires_ordering {
74        // When ordering we rechunk the series so we don't get ChunkIds as output
75        s_left = s_left.rechunk();
76        s_right = s_right.rechunk();
77    }
78
79    let (left_idx, right_idx) = sort_or_hash_left(
80        &s_left,
81        &s_right,
82        verbose,
83        args.validation,
84        args.nulls_equal,
85    )?;
86
87    let right = if let Some(drop_names) = drop_names {
88        right.drop_many(drop_names)
89    } else {
90        right.drop(s_right.name()).unwrap()
91    };
92    try_raise_keyboard_interrupt();
93
94    #[cfg(feature = "chunked_ids")]
95    match (left_idx, right_idx) {
96        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
97            if requires_ordering {
98                Ok(maintain_order_idx(
99                    &left,
100                    &right,
101                    left_idx.as_slice(),
102                    right_idx.as_slice(),
103                    args,
104                ))
105            } else {
106                Ok(POOL.join(
107                    || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
108                    || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
109                ))
110            }
111        },
112        (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
113            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
114            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
115        )),
116        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
117            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
118            || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
119        )),
120        (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
121            || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
122            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
123        )),
124    }
125
126    #[cfg(not(feature = "chunked_ids"))]
127    if requires_ordering {
128        Ok(maintain_order_idx(
129            &left,
130            &right,
131            left_idx.as_slice(),
132            right_idx.as_slice(),
133            args,
134        ))
135    } else {
136        Ok(POOL.join(
137            || materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
138            || materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
139        ))
140    }
141}
142
143fn maintain_order_idx(
144    left: &DataFrame,
145    other: &DataFrame,
146    left_idx: &[IdxSize],
147    right_idx: &[NullableIdxSize],
148    args: &JoinArgs,
149) -> (DataFrame, DataFrame) {
150    let mut df = {
151        // SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices
152        let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
153        let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
154        DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap()
155    };
156
157    let options = SortMultipleOptions::new()
158        .with_order_descending(false)
159        .with_maintain_order(true);
160
161    let columns = match args.maintain_order {
162        // If the left order is preserved then there are no unsorted right rows
163        // So Left and LeftRight are equal
164        MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
165        MaintainOrderJoin::Right => vec!["b"],
166        MaintainOrderJoin::RightLeft => vec!["b", "a"],
167        _ => unreachable!(),
168    };
169
170    df.sort_in_place(columns, options).unwrap();
171    df.rechunk_mut();
172
173    let join_tuples_left = df
174        .column("a")
175        .unwrap()
176        .as_materialized_series()
177        .idx()
178        .unwrap()
179        .cont_slice()
180        .unwrap();
181
182    let join_tuples_right = df
183        .column("b")
184        .unwrap()
185        .as_materialized_series()
186        .idx()
187        .unwrap()
188        .cont_slice()
189        .unwrap();
190
191    POOL.join(
192        || materialize_left_join_idx_left(left, join_tuples_left, args),
193        || materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
194    )
195}
196
197fn materialize_left_join_idx_left(
198    left: &DataFrame,
199    left_idx: &[IdxSize],
200    args: &JoinArgs,
201) -> DataFrame {
202    let left_idx = if let Some((offset, len)) = args.slice {
203        slice_slice(left_idx, offset, len)
204    } else {
205        left_idx
206    };
207
208    unsafe {
209        left._create_left_df_from_slice(
210            left_idx,
211            true,
212            args.slice.is_some(),
213            matches!(
214                args.maintain_order,
215                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
216            ) || args.how == JoinType::Left
217                && !matches!(
218                    args.maintain_order,
219                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft,
220                ),
221        )
222    }
223}
224
225fn materialize_left_join_idx_right(
226    right: &DataFrame,
227    right_idx: &[NullableIdxSize],
228    args: &JoinArgs,
229) -> DataFrame {
230    let right_idx = if let Some((offset, len)) = args.slice {
231        slice_slice(right_idx, offset, len)
232    } else {
233        right_idx
234    };
235    unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) }
236}
/// Builds the left output frame of the join from chunked ids.
///
/// `left_idx` is narrowed with `slice_slice` when `args.slice` is set, then
/// handed to `create_left_df_chunked` along with a flag telling whether a slice
/// was requested.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let left_idx = args
        .slice
        .map_or(left_idx, |(offset, len)| slice_slice(left_idx, offset, len));

    // SAFETY: assumes every id in `left_idx` addresses an existing chunk and row
    // of `left` — TODO confirm against the contract of `create_left_df_chunked`.
    unsafe { left.create_left_df_chunked(left_idx, true, args.slice.is_some()) }
}
250
/// Builds the right output frame of the join from chunked ids.
///
/// `right_idx` is narrowed with `slice_slice` when `args.slice` is set; the rows
/// are then gathered with `_take_opt_chunked_unchecked_hor_par`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
    right: &DataFrame,
    right_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let right_idx = match args.slice {
        Some((offset, len)) => slice_slice(right_idx, offset, len),
        None => right_idx,
    };

    // SAFETY: assumes every non-null id in `right_idx` addresses an existing
    // chunk and row of `right` — TODO confirm against the callee's contract.
    unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) }
}