Skip to main content

polars_ops/frame/join/
mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{
22    _check_asof_columns, _join_asof_dispatch, AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy,
23};
24pub use cross_join::CrossJoin;
25#[cfg(feature = "chunked_ids")]
26use either::Either;
27#[cfg(feature = "chunked_ids")]
28use general::create_chunked_index_mapping;
29pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
30pub use hash_join::*;
31use hashbrown::hash_map::{Entry, RawEntryMut};
32#[cfg(feature = "iejoin")]
33pub use iejoin::{IEJoinOptions, InequalityOperator};
34#[cfg(feature = "merge_sorted")]
35pub use merge_sorted::_merge_sorted_dfs;
36use polars_core::POOL;
37#[allow(unused_imports)]
38use polars_core::chunked_array::ops::row_encode::{
39    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
40};
41use polars_core::datatypes::DataType;
42use polars_core::hashing::_HASHMAP_INIT_SIZE;
43use polars_core::prelude::*;
44pub(super) use polars_core::series::IsSorted;
45use polars_core::utils::slice_offsets;
46#[allow(unused_imports)]
47use polars_core::utils::slice_slice;
48use polars_utils::hashing::BytesHash;
49use rayon::prelude::*;
50
51use self::cross_join::fused_cross_filter;
52use super::IntoDf;
53
54pub trait DataFrameJoinOps: IntoDf {
55    /// Generic join method. Can be used to join on multiple columns.
56    ///
57    /// # Example
58    ///
59    /// ```no_run
60    /// # use polars_core::prelude::*;
61    /// # use polars_ops::prelude::*;
62    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
63    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
64    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
65    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
66    ///
67    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
68    /// None)?;
69    /// assert_eq!(df3.shape(), (3, 3));
70    /// println!("{}", df3);
71    /// # Ok::<(), PolarsError>(())
72    /// ```
73    ///
74    /// Output:
75    ///
76    /// ```text
77    /// shape: (3, 3)
78    /// +--------+----------------------+---------------------+
79    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
80    /// | ---    | ---                  | ---                 |
81    /// | str    | i32                  | i32                 |
82    /// +========+======================+=====================+
83    /// | Apple  | 11                   | 107                 |
84    /// +--------+----------------------+---------------------+
85    /// | Banana | 22                   | 358                 |
86    /// +--------+----------------------+---------------------+
87    /// | Pear   | 12                   | 115                 |
88    /// +--------+----------------------+---------------------+
89    /// ```
90    fn join(
91        &self,
92        other: &DataFrame,
93        left_on: impl IntoIterator<Item = impl AsRef<str>>,
94        right_on: impl IntoIterator<Item = impl AsRef<str>>,
95        args: JoinArgs,
96        options: Option<JoinTypeOptions>,
97    ) -> PolarsResult<DataFrame> {
98        let df_left = self.to_df();
99        let selected_left = df_left.select_to_vec(left_on)?;
100        let selected_right = other.select_to_vec(right_on)?;
101
102        let selected_left = selected_left
103            .into_iter()
104            .map(Column::take_materialized_series)
105            .collect::<Vec<_>>();
106        let selected_right = selected_right
107            .into_iter()
108            .map(Column::take_materialized_series)
109            .collect::<Vec<_>>();
110
111        self._join_impl(
112            other,
113            selected_left,
114            selected_right,
115            args,
116            options,
117            true,
118            false,
119        )
120    }
121
122    #[doc(hidden)]
123    #[allow(clippy::too_many_arguments)]
124    #[allow(unused_mut)]
125    fn _join_impl(
126        &self,
127        other: &DataFrame,
128        mut selected_left: Vec<Series>,
129        mut selected_right: Vec<Series>,
130        mut args: JoinArgs,
131        options: Option<JoinTypeOptions>,
132        _check_rechunk: bool,
133        _verbose: bool,
134    ) -> PolarsResult<DataFrame> {
135        let left_df = self.to_df();
136
137        #[cfg(feature = "cross_join")]
138        if let JoinType::Cross = args.how {
139            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
140                assert!(args.slice.is_none());
141                return fused_cross_filter(
142                    left_df,
143                    other,
144                    args.suffix.clone(),
145                    cross_options,
146                    args.maintain_order,
147                );
148            }
149            return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
150        }
151
152        // Clear literals if a frame is empty. Otherwise we could get an oob
153        fn clear(s: &mut [Series]) {
154            for s in s.iter_mut() {
155                if s.len() == 1 {
156                    *s = s.clear()
157                }
158            }
159        }
160        if left_df.height() == 0 {
161            clear(&mut selected_left);
162        }
163        if other.height() == 0 {
164            clear(&mut selected_right);
165        }
166
167        let should_coalesce = args.should_coalesce();
168        assert_eq!(selected_left.len(), selected_right.len());
169
170        #[cfg(feature = "chunked_ids")]
171        {
172            // a left join create chunked-ids
173            // the others not yet.
174            // TODO! change this to other join types once they support chunked-id joins
175            if _check_rechunk
176                && !(matches!(args.how, JoinType::Left)
177                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
178            {
179                let mut left = Cow::Borrowed(left_df);
180                let mut right = Cow::Borrowed(other);
181                if left_df.should_rechunk() {
182                    if _verbose {
183                        eprintln!(
184                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
185                            args.how,
186                            left_df.width()
187                        );
188                    }
189
190                    let mut tmp_left = left_df.clone();
191                    tmp_left.rechunk_mut_par();
192                    left = Cow::Owned(tmp_left);
193                }
194                if other.should_rechunk() {
195                    if _verbose {
196                        eprintln!(
197                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
198                            args.how,
199                            other.width()
200                        );
201                    }
202                    let mut tmp_right = other.clone();
203                    tmp_right.rechunk_mut_par();
204                    right = Cow::Owned(tmp_right);
205                }
206                return left._join_impl(
207                    &right,
208                    selected_left,
209                    selected_right,
210                    args,
211                    options,
212                    false,
213                    _verbose,
214                );
215            }
216        }
217
218        if let Some((l, r)) = selected_left
219            .iter()
220            .zip(&selected_right)
221            .find(|(l, r)| l.dtype() != r.dtype())
222        {
223            polars_bail!(
224                ComputeError:
225                    "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
226                    l.name(), l.dtype().pretty_format(), r.name(), r.dtype().pretty_format()
227            );
228        };
229
230        #[cfg(feature = "iejoin")]
231        if let JoinType::IEJoin = args.how {
232            let Some(JoinTypeOptions::IEJoin(options)) = options else {
233                unreachable!()
234            };
235            let func = if POOL.current_num_threads() > 1
236                && !left_df.shape_has_zero()
237                && !other.shape_has_zero()
238            {
239                iejoin::iejoin_par
240            } else {
241                iejoin::iejoin
242            };
243            return func(
244                left_df,
245                other,
246                selected_left,
247                selected_right,
248                &options,
249                args.suffix,
250                args.slice,
251            );
252        }
253
254        // Single keys.
255        if selected_left.len() == 1 {
256            let s_left = &selected_left[0];
257            let s_right = &selected_right[0];
258            let drop_names: Option<Vec<PlSmallStr>> =
259                if should_coalesce { None } else { Some(vec![]) };
260            return match args.how {
261                JoinType::Inner => left_df
262                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
263                JoinType::Left => dispatch_left_right::left_join_from_series(
264                    self.to_df().clone(),
265                    other,
266                    s_left,
267                    s_right,
268                    args,
269                    _verbose,
270                    drop_names,
271                ),
272                JoinType::Right => dispatch_left_right::right_join_from_series(
273                    self.to_df(),
274                    other.clone(),
275                    s_left,
276                    s_right,
277                    args,
278                    _verbose,
279                    drop_names,
280                ),
281                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
282                #[cfg(feature = "semi_anti_join")]
283                JoinType::Anti => left_df._semi_anti_join_from_series(
284                    s_left,
285                    s_right,
286                    args.slice,
287                    true,
288                    args.nulls_equal,
289                ),
290                #[cfg(feature = "semi_anti_join")]
291                JoinType::Semi => left_df._semi_anti_join_from_series(
292                    s_left,
293                    s_right,
294                    args.slice,
295                    false,
296                    args.nulls_equal,
297                ),
298                #[cfg(feature = "asof_join")]
299                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
300                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
301                        other,
302                        s_left,
303                        s_right,
304                        left_by,
305                        right_by,
306                        options.strategy,
307                        options.tolerance.map(|v| v.into_value()),
308                        args.suffix.clone(),
309                        args.slice,
310                        should_coalesce,
311                        options.allow_eq,
312                        options.check_sortedness,
313                    ),
314                    (None, None) => left_df._join_asof(
315                        other,
316                        s_left,
317                        s_right,
318                        options.strategy,
319                        options.tolerance.map(|v| v.into_value()),
320                        args.suffix,
321                        args.slice,
322                        should_coalesce,
323                        options.allow_eq,
324                        options.check_sortedness,
325                    ),
326                    _ => {
327                        panic!("expected by arguments on both sides")
328                    },
329                },
330                #[cfg(feature = "iejoin")]
331                JoinType::IEJoin | JoinType::Range => {
332                    unreachable!()
333                },
334                JoinType::Cross => {
335                    unreachable!()
336                },
337            };
338        }
339        let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
340            && matches!(&args.how, JoinType::Inner)
341        {
342            // Fast path for empty inner joins.
343            // Return 2 dummies so that we don't row-encode.
344            let a = Series::full_null("".into(), 0, &DataType::Null);
345            (a.clone(), a)
346        } else {
347            // Row encode the keys.
348            (
349                prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
350                prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
351            )
352        };
353
354        let drop_names = if should_coalesce {
355            if args.how == JoinType::Right {
356                selected_left
357                    .iter()
358                    .map(|s| s.name().clone())
359                    .collect::<Vec<_>>()
360            } else {
361                selected_right
362                    .iter()
363                    .map(|s| s.name().clone())
364                    .collect::<Vec<_>>()
365            }
366        } else {
367            vec![]
368        };
369
370        // Multiple keys.
371        match args.how {
372            #[cfg(feature = "asof_join")]
373            JoinType::AsOf(_) => polars_bail!(
374                ComputeError: "asof join not supported for join on multiple keys"
375            ),
376            #[cfg(feature = "iejoin")]
377            JoinType::IEJoin | JoinType::Range => {
378                unreachable!()
379            },
380            JoinType::Cross => {
381                unreachable!()
382            },
383            JoinType::Full => {
384                let names_left = selected_left
385                    .iter()
386                    .map(|s| s.name().clone())
387                    .collect::<Vec<_>>();
388                args.coalesce = JoinCoalesce::KeepColumns;
389                let suffix = args.suffix.clone();
390                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
391
392                if should_coalesce {
393                    Ok(_coalesce_full_join(
394                        out?,
395                        names_left.as_slice(),
396                        drop_names.as_slice(),
397                        suffix,
398                        left_df,
399                    ))
400                } else {
401                    out
402                }
403            },
404            JoinType::Inner => left_df._inner_join_from_series(
405                other,
406                &lhs_keys,
407                &rhs_keys,
408                args,
409                _verbose,
410                Some(drop_names),
411            ),
412            JoinType::Left => dispatch_left_right::left_join_from_series(
413                left_df.clone(),
414                other,
415                &lhs_keys,
416                &rhs_keys,
417                args,
418                _verbose,
419                Some(drop_names),
420            ),
421            JoinType::Right => dispatch_left_right::right_join_from_series(
422                left_df,
423                other.clone(),
424                &lhs_keys,
425                &rhs_keys,
426                args,
427                _verbose,
428                Some(drop_names),
429            ),
430            #[cfg(feature = "semi_anti_join")]
431            JoinType::Anti | JoinType::Semi => self._join_impl(
432                other,
433                vec![lhs_keys],
434                vec![rhs_keys],
435                args,
436                options,
437                _check_rechunk,
438                _verbose,
439            ),
440        }
441    }
442
443    /// Perform an inner join on two DataFrames.
444    ///
445    /// # Example
446    ///
447    /// ```
448    /// # use polars_core::prelude::*;
449    /// # use polars_ops::prelude::*;
450    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
451    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
452    /// }
453    /// ```
454    fn inner_join(
455        &self,
456        other: &DataFrame,
457        left_on: impl IntoIterator<Item = impl AsRef<str>>,
458        right_on: impl IntoIterator<Item = impl AsRef<str>>,
459    ) -> PolarsResult<DataFrame> {
460        self.join(
461            other,
462            left_on,
463            right_on,
464            JoinArgs::new(JoinType::Inner),
465            None,
466        )
467    }
468
    /// Perform a left outer join on two DataFrames.
    ///
    /// # Example
471    ///
472    /// ```no_run
473    /// # use polars_core::prelude::*;
474    /// # use polars_ops::prelude::*;
475    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
476    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
477    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
478    ///
479    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
480    /// println!("{:?}", df3);
481    /// # Ok::<(), PolarsError>(())
482    /// ```
483    ///
484    /// Output:
485    ///
486    /// ```text
487    /// shape: (5, 2)
488    /// +-----------------+--------+
489    /// | Wavelength (nm) | Color  |
490    /// | ---             | ---    |
491    /// | f64             | str    |
492    /// +=================+========+
493    /// | 480             | Blue   |
494    /// +-----------------+--------+
495    /// | 650             | Red    |
496    /// +-----------------+--------+
497    /// | 577             | Yellow |
498    /// +-----------------+--------+
499    /// | 1201            | null   |
500    /// +-----------------+--------+
501    /// | 100             | null   |
502    /// +-----------------+--------+
503    /// ```
504    fn left_join(
505        &self,
506        other: &DataFrame,
507        left_on: impl IntoIterator<Item = impl AsRef<str>>,
508        right_on: impl IntoIterator<Item = impl AsRef<str>>,
509    ) -> PolarsResult<DataFrame> {
510        self.join(
511            other,
512            left_on,
513            right_on,
514            JoinArgs::new(JoinType::Left),
515            None,
516        )
517    }
518
    /// Perform a full outer join on two DataFrames.
    ///
    /// # Example
521    ///
522    /// ```
523    /// # use polars_core::prelude::*;
524    /// # use polars_ops::prelude::*;
525    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
526    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
527    /// }
528    /// ```
529    fn full_join(
530        &self,
531        other: &DataFrame,
532        left_on: impl IntoIterator<Item = impl AsRef<str>>,
533        right_on: impl IntoIterator<Item = impl AsRef<str>>,
534    ) -> PolarsResult<DataFrame> {
535        self.join(
536            other,
537            left_on,
538            right_on,
539            JoinArgs::new(JoinType::Full),
540            None,
541        )
542    }
543}
544
545trait DataFrameJoinOpsPrivate: IntoDf {
546    fn _inner_join_from_series(
547        &self,
548        other: &DataFrame,
549        s_left: &Series,
550        s_right: &Series,
551        args: JoinArgs,
552        verbose: bool,
553        drop_names: Option<Vec<PlSmallStr>>,
554    ) -> PolarsResult<DataFrame> {
555        let left_df = self.to_df();
556        let ((join_tuples_left, join_tuples_right), sorted) =
557            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
558
559        let mut join_tuples_left = &*join_tuples_left;
560        let mut join_tuples_right = &*join_tuples_right;
561
562        if let Some((offset, len)) = args.slice {
563            join_tuples_left = slice_slice(join_tuples_left, offset, len);
564            join_tuples_right = slice_slice(join_tuples_right, offset, len);
565        }
566
567        let other = if let Some(drop_names) = drop_names {
568            other.drop_many(drop_names)
569        } else {
570            other.drop(s_right.name()).unwrap()
571        };
572
573        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
574        if sorted {
575            left.set_sorted_flag(IsSorted::Ascending);
576        }
577        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
578
579        let already_left_sorted = sorted
580            && matches!(
581                args.maintain_order,
582                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
583            );
584        try_raise_keyboard_interrupt();
585        let (df_left, df_right) =
586            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
587                let mut df = unsafe {
588                    DataFrame::new_unchecked_infer_height(vec![
589                        left.into_series().into(),
590                        right.into_series().into(),
591                    ])
592                };
593
594                let columns = match args.maintain_order {
595                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
596                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
597                    _ => unreachable!(),
598                };
599
600                let options = SortMultipleOptions::new()
601                    .with_order_descending(false)
602                    .with_maintain_order(true);
603
604                df.sort_in_place(columns, options)?;
605
606                let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
607                if matches!(
608                    args.maintain_order,
609                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
610                ) {
611                    a.set_sorted_flag(IsSorted::Ascending);
612                }
613
614                POOL.join(
615                    // SAFETY: join indices are known to be in bounds
616                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
617                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
618                )
619            } else {
620                POOL.join(
621                    // SAFETY: join indices are known to be in bounds
622                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
623                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
624                )
625            };
626
627        _finish_join(df_left, df_right, args.suffix)
628    }
629}
630
631impl DataFrameJoinOps for DataFrame {}
632impl DataFrameJoinOpsPrivate for DataFrame {}
633
634fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
635    let keys = s
636        .iter()
637        .map(|s| {
638            let phys = s.to_physical_repr();
639            match phys.dtype() {
640                #[cfg(feature = "dtype-f16")]
641                DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
642                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
643                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
644                _ => phys.into_owned().into_column(),
645            }
646        })
647        .collect::<Vec<_>>();
648
649    if nulls_equal {
650        encode_rows_vertical_par_unordered(&keys)
651    } else {
652        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
653    }
654}
655pub fn private_left_join_multiple_keys(
656    a: &DataFrame,
657    b: &DataFrame,
658    nulls_equal: bool,
659) -> PolarsResult<LeftJoinIds> {
660    // @scalar-opt
661    let a_cols = a
662        .columns()
663        .iter()
664        .map(|c| c.as_materialized_series().clone())
665        .collect::<Vec<_>>();
666    let b_cols = b
667        .columns()
668        .iter()
669        .map(|c| c.as_materialized_series().clone())
670        .collect::<Vec<_>>();
671
672    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
673    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
674    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
675}