Skip to main content

polars_ops/frame/join/
mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
22pub use cross_join::CrossJoin;
23#[cfg(feature = "chunked_ids")]
24use either::Either;
25#[cfg(feature = "chunked_ids")]
26use general::create_chunked_index_mapping;
27pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
28pub use hash_join::*;
29use hashbrown::hash_map::{Entry, RawEntryMut};
30#[cfg(feature = "iejoin")]
31pub use iejoin::{IEJoinOptions, InequalityOperator};
32#[cfg(feature = "merge_sorted")]
33pub use merge_sorted::_merge_sorted_dfs;
34use polars_core::POOL;
35#[allow(unused_imports)]
36use polars_core::chunked_array::ops::row_encode::{
37    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
38};
39use polars_core::datatypes::DataType;
40use polars_core::hashing::_HASHMAP_INIT_SIZE;
41use polars_core::prelude::*;
42pub(super) use polars_core::series::IsSorted;
43use polars_core::utils::slice_offsets;
44#[allow(unused_imports)]
45use polars_core::utils::slice_slice;
46use polars_utils::hashing::BytesHash;
47use rayon::prelude::*;
48
49use self::cross_join::fused_cross_filter;
50use super::IntoDf;
51
52pub trait DataFrameJoinOps: IntoDf {
53    /// Generic join method. Can be used to join on multiple columns.
54    ///
55    /// # Example
56    ///
57    /// ```no_run
58    /// # use polars_core::prelude::*;
59    /// # use polars_ops::prelude::*;
60    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
61    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
62    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
63    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
64    ///
65    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
66    /// None)?;
67    /// assert_eq!(df3.shape(), (3, 3));
68    /// println!("{}", df3);
69    /// # Ok::<(), PolarsError>(())
70    /// ```
71    ///
72    /// Output:
73    ///
74    /// ```text
75    /// shape: (3, 3)
76    /// +--------+----------------------+---------------------+
77    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
78    /// | ---    | ---                  | ---                 |
79    /// | str    | i32                  | i32                 |
80    /// +========+======================+=====================+
81    /// | Apple  | 11                   | 107                 |
82    /// +--------+----------------------+---------------------+
83    /// | Banana | 22                   | 358                 |
84    /// +--------+----------------------+---------------------+
85    /// | Pear   | 12                   | 115                 |
86    /// +--------+----------------------+---------------------+
87    /// ```
88    fn join(
89        &self,
90        other: &DataFrame,
91        left_on: impl IntoIterator<Item = impl AsRef<str>>,
92        right_on: impl IntoIterator<Item = impl AsRef<str>>,
93        args: JoinArgs,
94        options: Option<JoinTypeOptions>,
95    ) -> PolarsResult<DataFrame> {
96        let df_left = self.to_df();
97        let selected_left = df_left.select_to_vec(left_on)?;
98        let selected_right = other.select_to_vec(right_on)?;
99
100        let selected_left = selected_left
101            .into_iter()
102            .map(Column::take_materialized_series)
103            .collect::<Vec<_>>();
104        let selected_right = selected_right
105            .into_iter()
106            .map(Column::take_materialized_series)
107            .collect::<Vec<_>>();
108
109        self._join_impl(
110            other,
111            selected_left,
112            selected_right,
113            args,
114            options,
115            true,
116            false,
117        )
118    }
119
120    #[doc(hidden)]
121    #[allow(clippy::too_many_arguments)]
122    #[allow(unused_mut)]
123    fn _join_impl(
124        &self,
125        other: &DataFrame,
126        mut selected_left: Vec<Series>,
127        mut selected_right: Vec<Series>,
128        mut args: JoinArgs,
129        options: Option<JoinTypeOptions>,
130        _check_rechunk: bool,
131        _verbose: bool,
132    ) -> PolarsResult<DataFrame> {
133        let left_df = self.to_df();
134
135        #[cfg(feature = "cross_join")]
136        if let JoinType::Cross = args.how {
137            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
138                assert!(args.slice.is_none());
139                return fused_cross_filter(
140                    left_df,
141                    other,
142                    args.suffix.clone(),
143                    cross_options,
144                    args.maintain_order,
145                );
146            }
147            return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
148        }
149
150        // Clear literals if a frame is empty. Otherwise we could get an oob
151        fn clear(s: &mut [Series]) {
152            for s in s.iter_mut() {
153                if s.len() == 1 {
154                    *s = s.clear()
155                }
156            }
157        }
158        if left_df.height() == 0 {
159            clear(&mut selected_left);
160        }
161        if other.height() == 0 {
162            clear(&mut selected_right);
163        }
164
165        let should_coalesce = args.should_coalesce();
166        assert_eq!(selected_left.len(), selected_right.len());
167
168        #[cfg(feature = "chunked_ids")]
169        {
170            // a left join create chunked-ids
171            // the others not yet.
172            // TODO! change this to other join types once they support chunked-id joins
173            if _check_rechunk
174                && !(matches!(args.how, JoinType::Left)
175                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
176            {
177                let mut left = Cow::Borrowed(left_df);
178                let mut right = Cow::Borrowed(other);
179                if left_df.should_rechunk() {
180                    if _verbose {
181                        eprintln!(
182                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
183                            args.how,
184                            left_df.width()
185                        );
186                    }
187
188                    let mut tmp_left = left_df.clone();
189                    tmp_left.rechunk_mut_par();
190                    left = Cow::Owned(tmp_left);
191                }
192                if other.should_rechunk() {
193                    if _verbose {
194                        eprintln!(
195                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
196                            args.how,
197                            other.width()
198                        );
199                    }
200                    let mut tmp_right = other.clone();
201                    tmp_right.rechunk_mut_par();
202                    right = Cow::Owned(tmp_right);
203                }
204                return left._join_impl(
205                    &right,
206                    selected_left,
207                    selected_right,
208                    args,
209                    options,
210                    false,
211                    _verbose,
212                );
213            }
214        }
215
216        if let Some((l, r)) = selected_left
217            .iter()
218            .zip(&selected_right)
219            .find(|(l, r)| l.dtype() != r.dtype())
220        {
221            polars_bail!(
222                ComputeError:
223                    "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
224                    l.name(), l.dtype().pretty_format(), r.name(), r.dtype().pretty_format()
225            );
226        };
227
228        #[cfg(feature = "iejoin")]
229        if let JoinType::IEJoin = args.how {
230            let Some(JoinTypeOptions::IEJoin(options)) = options else {
231                unreachable!()
232            };
233            let func = if POOL.current_num_threads() > 1
234                && !left_df.shape_has_zero()
235                && !other.shape_has_zero()
236            {
237                iejoin::iejoin_par
238            } else {
239                iejoin::iejoin
240            };
241            return func(
242                left_df,
243                other,
244                selected_left,
245                selected_right,
246                &options,
247                args.suffix,
248                args.slice,
249            );
250        }
251
252        // Single keys.
253        if selected_left.len() == 1 {
254            let s_left = &selected_left[0];
255            let s_right = &selected_right[0];
256            let drop_names: Option<Vec<PlSmallStr>> =
257                if should_coalesce { None } else { Some(vec![]) };
258            return match args.how {
259                JoinType::Inner => left_df
260                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
261                JoinType::Left => dispatch_left_right::left_join_from_series(
262                    self.to_df().clone(),
263                    other,
264                    s_left,
265                    s_right,
266                    args,
267                    _verbose,
268                    drop_names,
269                ),
270                JoinType::Right => dispatch_left_right::right_join_from_series(
271                    self.to_df(),
272                    other.clone(),
273                    s_left,
274                    s_right,
275                    args,
276                    _verbose,
277                    drop_names,
278                ),
279                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
280                #[cfg(feature = "semi_anti_join")]
281                JoinType::Anti => left_df._semi_anti_join_from_series(
282                    s_left,
283                    s_right,
284                    args.slice,
285                    true,
286                    args.nulls_equal,
287                ),
288                #[cfg(feature = "semi_anti_join")]
289                JoinType::Semi => left_df._semi_anti_join_from_series(
290                    s_left,
291                    s_right,
292                    args.slice,
293                    false,
294                    args.nulls_equal,
295                ),
296                #[cfg(feature = "asof_join")]
297                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
298                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
299                        other,
300                        s_left,
301                        s_right,
302                        left_by,
303                        right_by,
304                        options.strategy,
305                        options.tolerance.map(|v| v.into_value()),
306                        args.suffix.clone(),
307                        args.slice,
308                        should_coalesce,
309                        options.allow_eq,
310                        options.check_sortedness,
311                    ),
312                    (None, None) => left_df._join_asof(
313                        other,
314                        s_left,
315                        s_right,
316                        options.strategy,
317                        options.tolerance.map(|v| v.into_value()),
318                        args.suffix,
319                        args.slice,
320                        should_coalesce,
321                        options.allow_eq,
322                        options.check_sortedness,
323                    ),
324                    _ => {
325                        panic!("expected by arguments on both sides")
326                    },
327                },
328                #[cfg(feature = "iejoin")]
329                JoinType::IEJoin => {
330                    unreachable!()
331                },
332                JoinType::Cross => {
333                    unreachable!()
334                },
335            };
336        }
337        let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
338            && matches!(&args.how, JoinType::Inner)
339        {
340            // Fast path for empty inner joins.
341            // Return 2 dummies so that we don't row-encode.
342            let a = Series::full_null("".into(), 0, &DataType::Null);
343            (a.clone(), a)
344        } else {
345            // Row encode the keys.
346            (
347                prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
348                prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
349            )
350        };
351
352        let drop_names = if should_coalesce {
353            if args.how == JoinType::Right {
354                selected_left
355                    .iter()
356                    .map(|s| s.name().clone())
357                    .collect::<Vec<_>>()
358            } else {
359                selected_right
360                    .iter()
361                    .map(|s| s.name().clone())
362                    .collect::<Vec<_>>()
363            }
364        } else {
365            vec![]
366        };
367
368        // Multiple keys.
369        match args.how {
370            #[cfg(feature = "asof_join")]
371            JoinType::AsOf(_) => polars_bail!(
372                ComputeError: "asof join not supported for join on multiple keys"
373            ),
374            #[cfg(feature = "iejoin")]
375            JoinType::IEJoin => {
376                unreachable!()
377            },
378            JoinType::Cross => {
379                unreachable!()
380            },
381            JoinType::Full => {
382                let names_left = selected_left
383                    .iter()
384                    .map(|s| s.name().clone())
385                    .collect::<Vec<_>>();
386                args.coalesce = JoinCoalesce::KeepColumns;
387                let suffix = args.suffix.clone();
388                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
389
390                if should_coalesce {
391                    Ok(_coalesce_full_join(
392                        out?,
393                        names_left.as_slice(),
394                        drop_names.as_slice(),
395                        suffix,
396                        left_df,
397                    ))
398                } else {
399                    out
400                }
401            },
402            JoinType::Inner => left_df._inner_join_from_series(
403                other,
404                &lhs_keys,
405                &rhs_keys,
406                args,
407                _verbose,
408                Some(drop_names),
409            ),
410            JoinType::Left => dispatch_left_right::left_join_from_series(
411                left_df.clone(),
412                other,
413                &lhs_keys,
414                &rhs_keys,
415                args,
416                _verbose,
417                Some(drop_names),
418            ),
419            JoinType::Right => dispatch_left_right::right_join_from_series(
420                left_df,
421                other.clone(),
422                &lhs_keys,
423                &rhs_keys,
424                args,
425                _verbose,
426                Some(drop_names),
427            ),
428            #[cfg(feature = "semi_anti_join")]
429            JoinType::Anti | JoinType::Semi => self._join_impl(
430                other,
431                vec![lhs_keys],
432                vec![rhs_keys],
433                args,
434                options,
435                _check_rechunk,
436                _verbose,
437            ),
438        }
439    }
440
441    /// Perform an inner join on two DataFrames.
442    ///
443    /// # Example
444    ///
445    /// ```
446    /// # use polars_core::prelude::*;
447    /// # use polars_ops::prelude::*;
448    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
449    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
450    /// }
451    /// ```
452    fn inner_join(
453        &self,
454        other: &DataFrame,
455        left_on: impl IntoIterator<Item = impl AsRef<str>>,
456        right_on: impl IntoIterator<Item = impl AsRef<str>>,
457    ) -> PolarsResult<DataFrame> {
458        self.join(
459            other,
460            left_on,
461            right_on,
462            JoinArgs::new(JoinType::Inner),
463            None,
464        )
465    }
466
467    /// Perform a left outer join on two DataFrames
468    /// # Example
469    ///
470    /// ```no_run
471    /// # use polars_core::prelude::*;
472    /// # use polars_ops::prelude::*;
473    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
474    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
475    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
476    ///
477    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
478    /// println!("{:?}", df3);
479    /// # Ok::<(), PolarsError>(())
480    /// ```
481    ///
482    /// Output:
483    ///
484    /// ```text
485    /// shape: (5, 2)
486    /// +-----------------+--------+
487    /// | Wavelength (nm) | Color  |
488    /// | ---             | ---    |
489    /// | f64             | str    |
490    /// +=================+========+
491    /// | 480             | Blue   |
492    /// +-----------------+--------+
493    /// | 650             | Red    |
494    /// +-----------------+--------+
495    /// | 577             | Yellow |
496    /// +-----------------+--------+
497    /// | 1201            | null   |
498    /// +-----------------+--------+
499    /// | 100             | null   |
500    /// +-----------------+--------+
501    /// ```
502    fn left_join(
503        &self,
504        other: &DataFrame,
505        left_on: impl IntoIterator<Item = impl AsRef<str>>,
506        right_on: impl IntoIterator<Item = impl AsRef<str>>,
507    ) -> PolarsResult<DataFrame> {
508        self.join(
509            other,
510            left_on,
511            right_on,
512            JoinArgs::new(JoinType::Left),
513            None,
514        )
515    }
516
517    /// Perform a full outer join on two DataFrames
518    /// # Example
519    ///
520    /// ```
521    /// # use polars_core::prelude::*;
522    /// # use polars_ops::prelude::*;
523    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
524    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
525    /// }
526    /// ```
527    fn full_join(
528        &self,
529        other: &DataFrame,
530        left_on: impl IntoIterator<Item = impl AsRef<str>>,
531        right_on: impl IntoIterator<Item = impl AsRef<str>>,
532    ) -> PolarsResult<DataFrame> {
533        self.join(
534            other,
535            left_on,
536            right_on,
537            JoinArgs::new(JoinType::Full),
538            None,
539        )
540    }
541}
542
543trait DataFrameJoinOpsPrivate: IntoDf {
544    fn _inner_join_from_series(
545        &self,
546        other: &DataFrame,
547        s_left: &Series,
548        s_right: &Series,
549        args: JoinArgs,
550        verbose: bool,
551        drop_names: Option<Vec<PlSmallStr>>,
552    ) -> PolarsResult<DataFrame> {
553        let left_df = self.to_df();
554        let ((join_tuples_left, join_tuples_right), sorted) =
555            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
556
557        let mut join_tuples_left = &*join_tuples_left;
558        let mut join_tuples_right = &*join_tuples_right;
559
560        if let Some((offset, len)) = args.slice {
561            join_tuples_left = slice_slice(join_tuples_left, offset, len);
562            join_tuples_right = slice_slice(join_tuples_right, offset, len);
563        }
564
565        let other = if let Some(drop_names) = drop_names {
566            other.drop_many(drop_names)
567        } else {
568            other.drop(s_right.name()).unwrap()
569        };
570
571        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
572        if sorted {
573            left.set_sorted_flag(IsSorted::Ascending);
574        }
575        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
576
577        let already_left_sorted = sorted
578            && matches!(
579                args.maintain_order,
580                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
581            );
582        try_raise_keyboard_interrupt();
583        let (df_left, df_right) =
584            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
585                let mut df = unsafe {
586                    DataFrame::new_unchecked_infer_height(vec![
587                        left.into_series().into(),
588                        right.into_series().into(),
589                    ])
590                };
591
592                let columns = match args.maintain_order {
593                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
594                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
595                    _ => unreachable!(),
596                };
597
598                let options = SortMultipleOptions::new()
599                    .with_order_descending(false)
600                    .with_maintain_order(true);
601
602                df.sort_in_place(columns, options)?;
603
604                let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
605                if matches!(
606                    args.maintain_order,
607                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
608                ) {
609                    a.set_sorted_flag(IsSorted::Ascending);
610                }
611
612                POOL.join(
613                    // SAFETY: join indices are known to be in bounds
614                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
615                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
616                )
617            } else {
618                POOL.join(
619                    // SAFETY: join indices are known to be in bounds
620                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
621                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
622                )
623            };
624
625        _finish_join(df_left, df_right, args.suffix)
626    }
627}
628
629impl DataFrameJoinOps for DataFrame {}
630impl DataFrameJoinOpsPrivate for DataFrame {}
631
632fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
633    let keys = s
634        .iter()
635        .map(|s| {
636            let phys = s.to_physical_repr();
637            match phys.dtype() {
638                #[cfg(feature = "dtype-f16")]
639                DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
640                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
641                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
642                _ => phys.into_owned().into_column(),
643            }
644        })
645        .collect::<Vec<_>>();
646
647    if nulls_equal {
648        encode_rows_vertical_par_unordered(&keys)
649    } else {
650        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
651    }
652}
653pub fn private_left_join_multiple_keys(
654    a: &DataFrame,
655    b: &DataFrame,
656    nulls_equal: bool,
657) -> PolarsResult<LeftJoinIds> {
658    // @scalar-opt
659    let a_cols = a
660        .columns()
661        .iter()
662        .map(|c| c.as_materialized_series().clone())
663        .collect::<Vec<_>>();
664    let b_cols = b
665        .columns()
666        .iter()
667        .map(|c| c.as_materialized_series().clone())
668        .collect::<Vec<_>>();
669
670    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
671    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
672    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
673}