// polars_ops/frame/join/mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4#[cfg(feature = "dtype-categorical")]
5mod checks;
6mod cross_join;
7mod dispatch_left_right;
8mod general;
9mod hash_join;
10#[cfg(feature = "iejoin")]
11mod iejoin;
12#[cfg(feature = "merge_sorted")]
13mod merge_sorted;
14
15use std::borrow::Cow;
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18
19pub use args::*;
20use arrow::trusted_len::TrustedLen;
21#[cfg(feature = "asof_join")]
22pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
23#[cfg(feature = "dtype-categorical")]
24pub(crate) use checks::*;
25pub use cross_join::CrossJoin;
26#[cfg(feature = "chunked_ids")]
27use either::Either;
28#[cfg(feature = "chunked_ids")]
29use general::create_chunked_index_mapping;
30pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
31pub use hash_join::*;
32use hashbrown::hash_map::{Entry, RawEntryMut};
33#[cfg(feature = "iejoin")]
34pub use iejoin::{IEJoinOptions, InequalityOperator};
35#[cfg(feature = "merge_sorted")]
36pub use merge_sorted::_merge_sorted_dfs;
37use polars_core::POOL;
38#[allow(unused_imports)]
39use polars_core::chunked_array::ops::row_encode::{
40    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
41};
42use polars_core::hashing::_HASHMAP_INIT_SIZE;
43use polars_core::prelude::*;
44pub(super) use polars_core::series::IsSorted;
45use polars_core::utils::slice_offsets;
46#[allow(unused_imports)]
47use polars_core::utils::slice_slice;
48use polars_utils::hashing::BytesHash;
49use rayon::prelude::*;
50
51use self::cross_join::fused_cross_filter;
52use super::IntoDf;
53
54pub trait DataFrameJoinOps: IntoDf {
55    /// Generic join method. Can be used to join on multiple columns.
56    ///
57    /// # Example
58    ///
59    /// ```no_run
60    /// # use polars_core::prelude::*;
61    /// # use polars_ops::prelude::*;
62    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
63    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
64    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
65    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
66    ///
67    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
68    /// None)?;
69    /// assert_eq!(df3.shape(), (3, 3));
70    /// println!("{}", df3);
71    /// # Ok::<(), PolarsError>(())
72    /// ```
73    ///
74    /// Output:
75    ///
76    /// ```text
77    /// shape: (3, 3)
78    /// +--------+----------------------+---------------------+
79    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
80    /// | ---    | ---                  | ---                 |
81    /// | str    | i32                  | i32                 |
82    /// +========+======================+=====================+
83    /// | Apple  | 11                   | 107                 |
84    /// +--------+----------------------+---------------------+
85    /// | Banana | 22                   | 358                 |
86    /// +--------+----------------------+---------------------+
87    /// | Pear   | 12                   | 115                 |
88    /// +--------+----------------------+---------------------+
89    /// ```
90    fn join(
91        &self,
92        other: &DataFrame,
93        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
94        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
95        args: JoinArgs,
96        options: Option<JoinTypeOptions>,
97    ) -> PolarsResult<DataFrame> {
98        let df_left = self.to_df();
99        let selected_left = df_left.select_columns(left_on)?;
100        let selected_right = other.select_columns(right_on)?;
101
102        let selected_left = selected_left
103            .into_iter()
104            .map(Column::take_materialized_series)
105            .collect::<Vec<_>>();
106        let selected_right = selected_right
107            .into_iter()
108            .map(Column::take_materialized_series)
109            .collect::<Vec<_>>();
110
111        self._join_impl(
112            other,
113            selected_left,
114            selected_right,
115            args,
116            options,
117            true,
118            false,
119        )
120    }
121
122    #[doc(hidden)]
123    #[allow(clippy::too_many_arguments)]
124    #[allow(unused_mut)]
125    fn _join_impl(
126        &self,
127        other: &DataFrame,
128        mut selected_left: Vec<Series>,
129        mut selected_right: Vec<Series>,
130        mut args: JoinArgs,
131        options: Option<JoinTypeOptions>,
132        _check_rechunk: bool,
133        _verbose: bool,
134    ) -> PolarsResult<DataFrame> {
135        let left_df = self.to_df();
136
137        #[cfg(feature = "cross_join")]
138        if let JoinType::Cross = args.how {
139            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
140                assert!(args.slice.is_none());
141                return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
142            }
143            return left_df.cross_join(other, args.suffix.clone(), args.slice);
144        }
145
146        // Clear literals if a frame is empty. Otherwise we could get an oob
147        fn clear(s: &mut [Series]) {
148            for s in s.iter_mut() {
149                if s.len() == 1 {
150                    *s = s.clear()
151                }
152            }
153        }
154        if left_df.is_empty() {
155            clear(&mut selected_left);
156        }
157        if other.is_empty() {
158            clear(&mut selected_right);
159        }
160
161        let should_coalesce = args.should_coalesce();
162        assert_eq!(selected_left.len(), selected_right.len());
163
164        #[cfg(feature = "chunked_ids")]
165        {
166            // a left join create chunked-ids
167            // the others not yet.
168            // TODO! change this to other join types once they support chunked-id joins
169            if _check_rechunk
170                && !(matches!(args.how, JoinType::Left)
171                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
172            {
173                let mut left = Cow::Borrowed(left_df);
174                let mut right = Cow::Borrowed(other);
175                if left_df.should_rechunk() {
176                    if _verbose {
177                        eprintln!(
178                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
179                            args.how,
180                            left_df.width()
181                        );
182                    }
183
184                    let mut tmp_left = left_df.clone();
185                    tmp_left.as_single_chunk_par();
186                    left = Cow::Owned(tmp_left);
187                }
188                if other.should_rechunk() {
189                    if _verbose {
190                        eprintln!(
191                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
192                            args.how,
193                            other.width()
194                        );
195                    }
196                    let mut tmp_right = other.clone();
197                    tmp_right.as_single_chunk_par();
198                    right = Cow::Owned(tmp_right);
199                }
200                return left._join_impl(
201                    &right,
202                    selected_left,
203                    selected_right,
204                    args,
205                    options,
206                    false,
207                    _verbose,
208                );
209            }
210        }
211
212        if let Some((l, r)) = selected_left
213            .iter()
214            .zip(&selected_right)
215            .find(|(l, r)| l.dtype() != r.dtype())
216        {
217            polars_bail!(
218                ComputeError:
219                    format!(
220                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
221                        l.name(), l.dtype(), r.name(), r.dtype()
222                    )
223            );
224        };
225
226        #[cfg(feature = "dtype-categorical")]
227        for (l, r) in selected_left.iter_mut().zip(selected_right.iter_mut()) {
228            match _check_categorical_src(l.dtype(), r.dtype()) {
229                Ok(_) => {},
230                Err(_) => {
231                    let (ca_left, ca_right) =
232                        make_categoricals_compatible(l.categorical()?, r.categorical()?)?;
233                    *l = ca_left.into_series().with_name(l.name().clone());
234                    *r = ca_right.into_series().with_name(r.name().clone());
235                },
236            }
237        }
238
239        #[cfg(feature = "iejoin")]
240        if let JoinType::IEJoin = args.how {
241            let Some(JoinTypeOptions::IEJoin(options)) = options else {
242                unreachable!()
243            };
244            let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
245            {
246                iejoin::iejoin_par
247            } else {
248                iejoin::iejoin
249            };
250            return func(
251                left_df,
252                other,
253                selected_left,
254                selected_right,
255                &options,
256                args.suffix,
257                args.slice,
258            );
259        }
260
261        // Single keys.
262        if selected_left.len() == 1 {
263            let s_left = &selected_left[0];
264            let s_right = &selected_right[0];
265            let drop_names: Option<Vec<PlSmallStr>> =
266                if should_coalesce { None } else { Some(vec![]) };
267            return match args.how {
268                JoinType::Inner => left_df
269                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
270                JoinType::Left => dispatch_left_right::left_join_from_series(
271                    self.to_df().clone(),
272                    other,
273                    s_left,
274                    s_right,
275                    args,
276                    _verbose,
277                    drop_names,
278                ),
279                JoinType::Right => dispatch_left_right::right_join_from_series(
280                    self.to_df(),
281                    other.clone(),
282                    s_left,
283                    s_right,
284                    args,
285                    _verbose,
286                    drop_names,
287                ),
288                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
289                #[cfg(feature = "semi_anti_join")]
290                JoinType::Anti => left_df._semi_anti_join_from_series(
291                    s_left,
292                    s_right,
293                    args.slice,
294                    true,
295                    args.nulls_equal,
296                ),
297                #[cfg(feature = "semi_anti_join")]
298                JoinType::Semi => left_df._semi_anti_join_from_series(
299                    s_left,
300                    s_right,
301                    args.slice,
302                    false,
303                    args.nulls_equal,
304                ),
305                #[cfg(feature = "asof_join")]
306                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
307                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
308                        other,
309                        s_left,
310                        s_right,
311                        left_by,
312                        right_by,
313                        options.strategy,
314                        options.tolerance,
315                        args.suffix.clone(),
316                        args.slice,
317                        should_coalesce,
318                        options.allow_eq,
319                        options.check_sortedness,
320                    ),
321                    (None, None) => left_df._join_asof(
322                        other,
323                        s_left,
324                        s_right,
325                        options.strategy,
326                        options.tolerance,
327                        args.suffix,
328                        args.slice,
329                        should_coalesce,
330                        options.allow_eq,
331                        options.check_sortedness,
332                    ),
333                    _ => {
334                        panic!("expected by arguments on both sides")
335                    },
336                },
337                #[cfg(feature = "iejoin")]
338                JoinType::IEJoin => {
339                    unreachable!()
340                },
341                JoinType::Cross => {
342                    unreachable!()
343                },
344            };
345        }
346        let (lhs_keys, rhs_keys) =
347            if left_df.is_empty() || other.is_empty() && matches!(&args.how, JoinType::Inner) {
348                // Fast path for empty inner joins.
349                // Return 2 dummies so that we don't row-encode.
350                let a = Series::full_null("".into(), 0, &DataType::Null);
351                (a.clone(), a)
352            } else {
353                // Row encode the keys.
354                (
355                    prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
356                    prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
357                )
358            };
359
360        let drop_names = if should_coalesce {
361            if args.how == JoinType::Right {
362                selected_left
363                    .iter()
364                    .map(|s| s.name().clone())
365                    .collect::<Vec<_>>()
366            } else {
367                selected_right
368                    .iter()
369                    .map(|s| s.name().clone())
370                    .collect::<Vec<_>>()
371            }
372        } else {
373            vec![]
374        };
375
376        // Multiple keys.
377        match args.how {
378            #[cfg(feature = "asof_join")]
379            JoinType::AsOf(_) => polars_bail!(
380                ComputeError: "asof join not supported for join on multiple keys"
381            ),
382            #[cfg(feature = "iejoin")]
383            JoinType::IEJoin => {
384                unreachable!()
385            },
386            JoinType::Cross => {
387                unreachable!()
388            },
389            JoinType::Full => {
390                let names_left = selected_left
391                    .iter()
392                    .map(|s| s.name().clone())
393                    .collect::<Vec<_>>();
394                args.coalesce = JoinCoalesce::KeepColumns;
395                let suffix = args.suffix.clone();
396                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
397
398                if should_coalesce {
399                    Ok(_coalesce_full_join(
400                        out?,
401                        names_left.as_slice(),
402                        drop_names.as_slice(),
403                        suffix.clone(),
404                        left_df,
405                    ))
406                } else {
407                    out
408                }
409            },
410            JoinType::Inner => left_df._inner_join_from_series(
411                other,
412                &lhs_keys,
413                &rhs_keys,
414                args,
415                _verbose,
416                Some(drop_names),
417            ),
418            JoinType::Left => dispatch_left_right::left_join_from_series(
419                left_df.clone(),
420                other,
421                &lhs_keys,
422                &rhs_keys,
423                args,
424                _verbose,
425                Some(drop_names),
426            ),
427            JoinType::Right => dispatch_left_right::right_join_from_series(
428                left_df,
429                other.clone(),
430                &lhs_keys,
431                &rhs_keys,
432                args,
433                _verbose,
434                Some(drop_names),
435            ),
436            #[cfg(feature = "semi_anti_join")]
437            JoinType::Anti | JoinType::Semi => self._join_impl(
438                other,
439                vec![lhs_keys],
440                vec![rhs_keys],
441                args,
442                options,
443                _check_rechunk,
444                _verbose,
445            ),
446        }
447    }
448
449    /// Perform an inner join on two DataFrames.
450    ///
451    /// # Example
452    ///
453    /// ```
454    /// # use polars_core::prelude::*;
455    /// # use polars_ops::prelude::*;
456    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
457    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
458    /// }
459    /// ```
460    fn inner_join(
461        &self,
462        other: &DataFrame,
463        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
464        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
465    ) -> PolarsResult<DataFrame> {
466        self.join(
467            other,
468            left_on,
469            right_on,
470            JoinArgs::new(JoinType::Inner),
471            None,
472        )
473    }
474
475    /// Perform a left outer join on two DataFrames
476    /// # Example
477    ///
478    /// ```no_run
479    /// # use polars_core::prelude::*;
480    /// # use polars_ops::prelude::*;
481    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
482    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
483    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
484    ///
485    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
486    /// println!("{:?}", df3);
487    /// # Ok::<(), PolarsError>(())
488    /// ```
489    ///
490    /// Output:
491    ///
492    /// ```text
493    /// shape: (5, 2)
494    /// +-----------------+--------+
495    /// | Wavelength (nm) | Color  |
496    /// | ---             | ---    |
497    /// | f64             | str    |
498    /// +=================+========+
499    /// | 480             | Blue   |
500    /// +-----------------+--------+
501    /// | 650             | Red    |
502    /// +-----------------+--------+
503    /// | 577             | Yellow |
504    /// +-----------------+--------+
505    /// | 1201            | null   |
506    /// +-----------------+--------+
507    /// | 100             | null   |
508    /// +-----------------+--------+
509    /// ```
510    fn left_join(
511        &self,
512        other: &DataFrame,
513        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
514        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
515    ) -> PolarsResult<DataFrame> {
516        self.join(
517            other,
518            left_on,
519            right_on,
520            JoinArgs::new(JoinType::Left),
521            None,
522        )
523    }
524
525    /// Perform a full outer join on two DataFrames
526    /// # Example
527    ///
528    /// ```
529    /// # use polars_core::prelude::*;
530    /// # use polars_ops::prelude::*;
531    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
532    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
533    /// }
534    /// ```
535    fn full_join(
536        &self,
537        other: &DataFrame,
538        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
539        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
540    ) -> PolarsResult<DataFrame> {
541        self.join(
542            other,
543            left_on,
544            right_on,
545            JoinArgs::new(JoinType::Full),
546            None,
547        )
548    }
549}
550
551trait DataFrameJoinOpsPrivate: IntoDf {
552    fn _inner_join_from_series(
553        &self,
554        other: &DataFrame,
555        s_left: &Series,
556        s_right: &Series,
557        args: JoinArgs,
558        verbose: bool,
559        drop_names: Option<Vec<PlSmallStr>>,
560    ) -> PolarsResult<DataFrame> {
561        let left_df = self.to_df();
562        #[cfg(feature = "dtype-categorical")]
563        _check_categorical_src(s_left.dtype(), s_right.dtype())?;
564        let ((join_tuples_left, join_tuples_right), sorted) =
565            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
566
567        let mut join_tuples_left = &*join_tuples_left;
568        let mut join_tuples_right = &*join_tuples_right;
569
570        if let Some((offset, len)) = args.slice {
571            join_tuples_left = slice_slice(join_tuples_left, offset, len);
572            join_tuples_right = slice_slice(join_tuples_right, offset, len);
573        }
574
575        let other = if let Some(drop_names) = drop_names {
576            other.drop_many(drop_names)
577        } else {
578            other.drop(s_right.name()).unwrap()
579        };
580
581        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
582        if sorted {
583            left.set_sorted_flag(IsSorted::Ascending);
584        }
585        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
586
587        let already_left_sorted = sorted
588            && matches!(
589                args.maintain_order,
590                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
591            );
592        try_raise_keyboard_interrupt();
593        let (df_left, df_right) =
594            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
595                let mut df =
596                    DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
597
598                let columns = match args.maintain_order {
599                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
600                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
601                    _ => unreachable!(),
602                };
603
604                let options = SortMultipleOptions::new()
605                    .with_order_descending(false)
606                    .with_maintain_order(true);
607
608                df.sort_in_place(columns, options)?;
609
610                let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
611                if matches!(
612                    args.maintain_order,
613                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
614                ) {
615                    a.set_sorted_flag(IsSorted::Ascending);
616                }
617
618                POOL.join(
619                    // SAFETY: join indices are known to be in bounds
620                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
621                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
622                )
623            } else {
624                POOL.join(
625                    // SAFETY: join indices are known to be in bounds
626                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
627                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
628                )
629            };
630
631        _finish_join(df_left, df_right, args.suffix.clone())
632    }
633}
634
635impl DataFrameJoinOps for DataFrame {}
636impl DataFrameJoinOpsPrivate for DataFrame {}
637
638fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
639    let keys = s
640        .iter()
641        .map(|s| {
642            let phys = s.to_physical_repr();
643            match phys.dtype() {
644                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
645                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
646                _ => phys.into_owned().into_column(),
647            }
648        })
649        .collect::<Vec<_>>();
650
651    if nulls_equal {
652        encode_rows_vertical_par_unordered(&keys)
653    } else {
654        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
655    }
656}
657pub fn private_left_join_multiple_keys(
658    a: &DataFrame,
659    b: &DataFrame,
660    nulls_equal: bool,
661) -> PolarsResult<LeftJoinIds> {
662    // @scalar-opt
663    let a_cols = a
664        .get_columns()
665        .iter()
666        .map(|c| c.as_materialized_series().clone())
667        .collect::<Vec<_>>();
668    let b_cols = b
669        .get_columns()
670        .iter()
671        .map(|c| c.as_materialized_series().clone())
672        .collect::<Vec<_>>();
673
674    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
675    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
676    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
677}