polars_ops/frame/join/
mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4#[cfg(feature = "dtype-categorical")]
5mod checks;
6mod cross_join;
7mod dispatch_left_right;
8mod general;
9mod hash_join;
10#[cfg(feature = "iejoin")]
11mod iejoin;
12#[cfg(feature = "merge_sorted")]
13mod merge_sorted;
14
15use std::borrow::Cow;
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18
19pub use args::*;
20use arrow::trusted_len::TrustedLen;
21#[cfg(feature = "asof_join")]
22pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
23#[cfg(feature = "dtype-categorical")]
24pub(crate) use checks::*;
25pub use cross_join::CrossJoin;
26#[cfg(feature = "chunked_ids")]
27use either::Either;
28#[cfg(feature = "chunked_ids")]
29use general::create_chunked_index_mapping;
30pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
31pub use hash_join::*;
32use hashbrown::hash_map::{Entry, RawEntryMut};
33#[cfg(feature = "iejoin")]
34pub use iejoin::{IEJoinOptions, InequalityOperator};
35#[cfg(feature = "merge_sorted")]
36pub use merge_sorted::_merge_sorted_dfs;
37use polars_core::POOL;
38#[allow(unused_imports)]
39use polars_core::chunked_array::ops::row_encode::{
40    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
41};
42use polars_core::hashing::_HASHMAP_INIT_SIZE;
43use polars_core::prelude::*;
44pub(super) use polars_core::series::IsSorted;
45use polars_core::utils::slice_offsets;
46#[allow(unused_imports)]
47use polars_core::utils::slice_slice;
48use polars_utils::hashing::BytesHash;
49use rayon::prelude::*;
50
51use self::cross_join::fused_cross_filter;
52use super::IntoDf;
53
54pub trait DataFrameJoinOps: IntoDf {
55    /// Generic join method. Can be used to join on multiple columns.
56    ///
57    /// # Example
58    ///
59    /// ```no_run
60    /// # use polars_core::prelude::*;
61    /// # use polars_ops::prelude::*;
62    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
63    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
64    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
65    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
66    ///
67    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
68    /// None)?;
69    /// assert_eq!(df3.shape(), (3, 3));
70    /// println!("{}", df3);
71    /// # Ok::<(), PolarsError>(())
72    /// ```
73    ///
74    /// Output:
75    ///
76    /// ```text
77    /// shape: (3, 3)
78    /// +--------+----------------------+---------------------+
79    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
80    /// | ---    | ---                  | ---                 |
81    /// | str    | i32                  | i32                 |
82    /// +========+======================+=====================+
83    /// | Apple  | 11                   | 107                 |
84    /// +--------+----------------------+---------------------+
85    /// | Banana | 22                   | 358                 |
86    /// +--------+----------------------+---------------------+
87    /// | Pear   | 12                   | 115                 |
88    /// +--------+----------------------+---------------------+
89    /// ```
90    fn join(
91        &self,
92        other: &DataFrame,
93        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
94        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
95        args: JoinArgs,
96        options: Option<JoinTypeOptions>,
97    ) -> PolarsResult<DataFrame> {
98        let df_left = self.to_df();
99        let selected_left = df_left.select_columns(left_on)?;
100        let selected_right = other.select_columns(right_on)?;
101
102        let selected_left = selected_left
103            .into_iter()
104            .map(Column::take_materialized_series)
105            .collect::<Vec<_>>();
106        let selected_right = selected_right
107            .into_iter()
108            .map(Column::take_materialized_series)
109            .collect::<Vec<_>>();
110
111        self._join_impl(
112            other,
113            selected_left,
114            selected_right,
115            args,
116            options,
117            true,
118            false,
119        )
120    }
121
122    #[doc(hidden)]
123    #[allow(clippy::too_many_arguments)]
124    #[allow(unused_mut)]
125    fn _join_impl(
126        &self,
127        other: &DataFrame,
128        mut selected_left: Vec<Series>,
129        mut selected_right: Vec<Series>,
130        mut args: JoinArgs,
131        options: Option<JoinTypeOptions>,
132        _check_rechunk: bool,
133        _verbose: bool,
134    ) -> PolarsResult<DataFrame> {
135        let left_df = self.to_df();
136
137        #[cfg(feature = "cross_join")]
138        if let JoinType::Cross = args.how {
139            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
140                assert!(args.slice.is_none());
141                return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
142            }
143            return left_df.cross_join(other, args.suffix.clone(), args.slice);
144        }
145
146        // Clear literals if a frame is empty. Otherwise we could get an oob
147        fn clear(s: &mut [Series]) {
148            for s in s.iter_mut() {
149                if s.len() == 1 {
150                    *s = s.clear()
151                }
152            }
153        }
154        if left_df.is_empty() {
155            clear(&mut selected_left);
156        }
157        if other.is_empty() {
158            clear(&mut selected_right);
159        }
160
161        let should_coalesce = args.should_coalesce();
162        assert_eq!(selected_left.len(), selected_right.len());
163
164        #[cfg(feature = "chunked_ids")]
165        {
166            // a left join create chunked-ids
167            // the others not yet.
168            // TODO! change this to other join types once they support chunked-id joins
169            if _check_rechunk
170                && !(matches!(args.how, JoinType::Left)
171                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
172            {
173                let mut left = Cow::Borrowed(left_df);
174                let mut right = Cow::Borrowed(other);
175                if left_df.should_rechunk() {
176                    if _verbose {
177                        eprintln!(
178                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
179                            args.how,
180                            left_df.width()
181                        );
182                    }
183
184                    let mut tmp_left = left_df.clone();
185                    tmp_left.as_single_chunk_par();
186                    left = Cow::Owned(tmp_left);
187                }
188                if other.should_rechunk() {
189                    if _verbose {
190                        eprintln!(
191                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
192                            args.how,
193                            other.width()
194                        );
195                    }
196                    let mut tmp_right = other.clone();
197                    tmp_right.as_single_chunk_par();
198                    right = Cow::Owned(tmp_right);
199                }
200                return left._join_impl(
201                    &right,
202                    selected_left,
203                    selected_right,
204                    args,
205                    options,
206                    false,
207                    _verbose,
208                );
209            }
210        }
211
212        if let Some((l, r)) = selected_left
213            .iter()
214            .zip(&selected_right)
215            .find(|(l, r)| l.dtype() != r.dtype())
216        {
217            polars_bail!(
218                ComputeError:
219                    format!(
220                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
221                        l.name(), l.dtype(), r.name(), r.dtype()
222                    )
223            );
224        };
225
226        #[cfg(feature = "dtype-categorical")]
227        for (l, r) in selected_left.iter_mut().zip(selected_right.iter_mut()) {
228            match _check_categorical_src(l.dtype(), r.dtype()) {
229                Ok(_) => {},
230                Err(_) => {
231                    let (ca_left, ca_right) =
232                        make_categoricals_compatible(l.categorical()?, r.categorical()?)?;
233                    *l = ca_left.into_series().with_name(l.name().clone());
234                    *r = ca_right.into_series().with_name(r.name().clone());
235                },
236            }
237        }
238
239        #[cfg(feature = "iejoin")]
240        if let JoinType::IEJoin = args.how {
241            let Some(JoinTypeOptions::IEJoin(options)) = options else {
242                unreachable!()
243            };
244            let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
245            {
246                iejoin::iejoin_par
247            } else {
248                iejoin::iejoin
249            };
250            return func(
251                left_df,
252                other,
253                selected_left,
254                selected_right,
255                &options,
256                args.suffix,
257                args.slice,
258            );
259        }
260
261        // Single keys.
262        if selected_left.len() == 1 {
263            let s_left = &selected_left[0];
264            let s_right = &selected_right[0];
265            let drop_names: Option<Vec<PlSmallStr>> =
266                if should_coalesce { None } else { Some(vec![]) };
267            return match args.how {
268                JoinType::Inner => left_df
269                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
270                JoinType::Left => dispatch_left_right::left_join_from_series(
271                    self.to_df().clone(),
272                    other,
273                    s_left,
274                    s_right,
275                    args,
276                    _verbose,
277                    drop_names,
278                ),
279                JoinType::Right => dispatch_left_right::right_join_from_series(
280                    self.to_df(),
281                    other.clone(),
282                    s_left,
283                    s_right,
284                    args,
285                    _verbose,
286                    drop_names,
287                ),
288                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
289                #[cfg(feature = "semi_anti_join")]
290                JoinType::Anti => left_df._semi_anti_join_from_series(
291                    s_left,
292                    s_right,
293                    args.slice,
294                    true,
295                    args.nulls_equal,
296                ),
297                #[cfg(feature = "semi_anti_join")]
298                JoinType::Semi => left_df._semi_anti_join_from_series(
299                    s_left,
300                    s_right,
301                    args.slice,
302                    false,
303                    args.nulls_equal,
304                ),
305                #[cfg(feature = "asof_join")]
306                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
307                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
308                        other,
309                        s_left,
310                        s_right,
311                        left_by,
312                        right_by,
313                        options.strategy,
314                        options.tolerance,
315                        args.suffix.clone(),
316                        args.slice,
317                        should_coalesce,
318                        options.allow_eq,
319                        options.check_sortedness,
320                    ),
321                    (None, None) => left_df._join_asof(
322                        other,
323                        s_left,
324                        s_right,
325                        options.strategy,
326                        options.tolerance,
327                        args.suffix,
328                        args.slice,
329                        should_coalesce,
330                        options.allow_eq,
331                        options.check_sortedness,
332                    ),
333                    _ => {
334                        panic!("expected by arguments on both sides")
335                    },
336                },
337                #[cfg(feature = "iejoin")]
338                JoinType::IEJoin => {
339                    unreachable!()
340                },
341                JoinType::Cross => {
342                    unreachable!()
343                },
344            };
345        }
346
347        let lhs_keys = prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series();
348        let rhs_keys = prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series();
349
350        let drop_names = if should_coalesce {
351            if args.how == JoinType::Right {
352                selected_left
353                    .iter()
354                    .map(|s| s.name().clone())
355                    .collect::<Vec<_>>()
356            } else {
357                selected_right
358                    .iter()
359                    .map(|s| s.name().clone())
360                    .collect::<Vec<_>>()
361            }
362        } else {
363            vec![]
364        };
365
366        // Multiple keys.
367        match args.how {
368            #[cfg(feature = "asof_join")]
369            JoinType::AsOf(_) => polars_bail!(
370                ComputeError: "asof join not supported for join on multiple keys"
371            ),
372            #[cfg(feature = "iejoin")]
373            JoinType::IEJoin => {
374                unreachable!()
375            },
376            JoinType::Cross => {
377                unreachable!()
378            },
379            JoinType::Full => {
380                let names_left = selected_left
381                    .iter()
382                    .map(|s| s.name().clone())
383                    .collect::<Vec<_>>();
384                args.coalesce = JoinCoalesce::KeepColumns;
385                let suffix = args.suffix.clone();
386                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
387
388                if should_coalesce {
389                    Ok(_coalesce_full_join(
390                        out?,
391                        names_left.as_slice(),
392                        drop_names.as_slice(),
393                        suffix.clone(),
394                        left_df,
395                    ))
396                } else {
397                    out
398                }
399            },
400            JoinType::Inner => left_df._inner_join_from_series(
401                other,
402                &lhs_keys,
403                &rhs_keys,
404                args,
405                _verbose,
406                Some(drop_names),
407            ),
408            JoinType::Left => dispatch_left_right::left_join_from_series(
409                left_df.clone(),
410                other,
411                &lhs_keys,
412                &rhs_keys,
413                args,
414                _verbose,
415                Some(drop_names),
416            ),
417            JoinType::Right => dispatch_left_right::right_join_from_series(
418                left_df,
419                other.clone(),
420                &lhs_keys,
421                &rhs_keys,
422                args,
423                _verbose,
424                Some(drop_names),
425            ),
426            #[cfg(feature = "semi_anti_join")]
427            JoinType::Anti | JoinType::Semi => self._join_impl(
428                other,
429                vec![lhs_keys],
430                vec![rhs_keys],
431                args,
432                options,
433                _check_rechunk,
434                _verbose,
435            ),
436        }
437    }
438
439    /// Perform an inner join on two DataFrames.
440    ///
441    /// # Example
442    ///
443    /// ```
444    /// # use polars_core::prelude::*;
445    /// # use polars_ops::prelude::*;
446    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
447    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
448    /// }
449    /// ```
450    fn inner_join(
451        &self,
452        other: &DataFrame,
453        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
454        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
455    ) -> PolarsResult<DataFrame> {
456        self.join(
457            other,
458            left_on,
459            right_on,
460            JoinArgs::new(JoinType::Inner),
461            None,
462        )
463    }
464
465    /// Perform a left outer join on two DataFrames
466    /// # Example
467    ///
468    /// ```no_run
469    /// # use polars_core::prelude::*;
470    /// # use polars_ops::prelude::*;
471    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
472    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
473    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
474    ///
475    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
476    /// println!("{:?}", df3);
477    /// # Ok::<(), PolarsError>(())
478    /// ```
479    ///
480    /// Output:
481    ///
482    /// ```text
483    /// shape: (5, 2)
484    /// +-----------------+--------+
485    /// | Wavelength (nm) | Color  |
486    /// | ---             | ---    |
487    /// | f64             | str    |
488    /// +=================+========+
489    /// | 480             | Blue   |
490    /// +-----------------+--------+
491    /// | 650             | Red    |
492    /// +-----------------+--------+
493    /// | 577             | Yellow |
494    /// +-----------------+--------+
495    /// | 1201            | null   |
496    /// +-----------------+--------+
497    /// | 100             | null   |
498    /// +-----------------+--------+
499    /// ```
500    fn left_join(
501        &self,
502        other: &DataFrame,
503        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
504        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
505    ) -> PolarsResult<DataFrame> {
506        self.join(
507            other,
508            left_on,
509            right_on,
510            JoinArgs::new(JoinType::Left),
511            None,
512        )
513    }
514
515    /// Perform a full outer join on two DataFrames
516    /// # Example
517    ///
518    /// ```
519    /// # use polars_core::prelude::*;
520    /// # use polars_ops::prelude::*;
521    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
522    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
523    /// }
524    /// ```
525    fn full_join(
526        &self,
527        other: &DataFrame,
528        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
529        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
530    ) -> PolarsResult<DataFrame> {
531        self.join(
532            other,
533            left_on,
534            right_on,
535            JoinArgs::new(JoinType::Full),
536            None,
537        )
538    }
539}
540
541trait DataFrameJoinOpsPrivate: IntoDf {
542    fn _inner_join_from_series(
543        &self,
544        other: &DataFrame,
545        s_left: &Series,
546        s_right: &Series,
547        args: JoinArgs,
548        verbose: bool,
549        drop_names: Option<Vec<PlSmallStr>>,
550    ) -> PolarsResult<DataFrame> {
551        let left_df = self.to_df();
552        #[cfg(feature = "dtype-categorical")]
553        _check_categorical_src(s_left.dtype(), s_right.dtype())?;
554        let ((join_tuples_left, join_tuples_right), sorted) =
555            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
556
557        let mut join_tuples_left = &*join_tuples_left;
558        let mut join_tuples_right = &*join_tuples_right;
559
560        if let Some((offset, len)) = args.slice {
561            join_tuples_left = slice_slice(join_tuples_left, offset, len);
562            join_tuples_right = slice_slice(join_tuples_right, offset, len);
563        }
564
565        let other = if let Some(drop_names) = drop_names {
566            other.drop_many(drop_names)
567        } else {
568            other.drop(s_right.name()).unwrap()
569        };
570
571        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
572        if sorted {
573            left.set_sorted_flag(IsSorted::Ascending);
574        }
575        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
576
577        let already_left_sorted = sorted
578            && matches!(
579                args.maintain_order,
580                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
581            );
582        try_raise_keyboard_interrupt();
583        let (df_left, df_right) =
584            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
585                let mut df =
586                    DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
587
588                let columns = match args.maintain_order {
589                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
590                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
591                    _ => unreachable!(),
592                };
593
594                let options = SortMultipleOptions::new()
595                    .with_order_descending(false)
596                    .with_maintain_order(true);
597
598                df.sort_in_place(columns, options)?;
599
600                let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
601                if matches!(
602                    args.maintain_order,
603                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
604                ) {
605                    a.set_sorted_flag(IsSorted::Ascending);
606                }
607
608                POOL.join(
609                    // SAFETY: join indices are known to be in bounds
610                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
611                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
612                )
613            } else {
614                POOL.join(
615                    // SAFETY: join indices are known to be in bounds
616                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
617                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
618                )
619            };
620
621        _finish_join(df_left, df_right, args.suffix.clone())
622    }
623}
624
625impl DataFrameJoinOps for DataFrame {}
626impl DataFrameJoinOpsPrivate for DataFrame {}
627
628fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
629    let keys = s
630        .iter()
631        .map(|s| {
632            let phys = s.to_physical_repr();
633            match phys.dtype() {
634                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
635                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
636                _ => phys.into_owned().into_column(),
637            }
638        })
639        .collect::<Vec<_>>();
640
641    if nulls_equal {
642        encode_rows_vertical_par_unordered(&keys)
643    } else {
644        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
645    }
646}
647pub fn private_left_join_multiple_keys(
648    a: &DataFrame,
649    b: &DataFrame,
650    nulls_equal: bool,
651) -> PolarsResult<LeftJoinIds> {
652    // @scalar-opt
653    let a_cols = a
654        .get_columns()
655        .iter()
656        .map(|c| c.as_materialized_series().clone())
657        .collect::<Vec<_>>();
658    let b_cols = b
659        .get_columns()
660        .iter()
661        .map(|c| c.as_materialized_series().clone())
662        .collect::<Vec<_>>();
663
664    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
665    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
666    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
667}