polars_ops/frame/join/
mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
22pub use cross_join::CrossJoin;
23#[cfg(feature = "chunked_ids")]
24use either::Either;
25#[cfg(feature = "chunked_ids")]
26use general::create_chunked_index_mapping;
27pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
28pub use hash_join::*;
29use hashbrown::hash_map::{Entry, RawEntryMut};
30#[cfg(feature = "iejoin")]
31pub use iejoin::{IEJoinOptions, InequalityOperator};
32#[cfg(feature = "merge_sorted")]
33pub use merge_sorted::_merge_sorted_dfs;
34use polars_core::POOL;
35#[allow(unused_imports)]
36use polars_core::chunked_array::ops::row_encode::{
37    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
38};
39use polars_core::hashing::_HASHMAP_INIT_SIZE;
40use polars_core::prelude::*;
41pub(super) use polars_core::series::IsSorted;
42use polars_core::utils::slice_offsets;
43#[allow(unused_imports)]
44use polars_core::utils::slice_slice;
45use polars_utils::hashing::BytesHash;
46use rayon::prelude::*;
47
48use self::cross_join::fused_cross_filter;
49use super::IntoDf;
50
51pub trait DataFrameJoinOps: IntoDf {
52    /// Generic join method. Can be used to join on multiple columns.
53    ///
54    /// # Example
55    ///
56    /// ```no_run
57    /// # use polars_core::prelude::*;
58    /// # use polars_ops::prelude::*;
59    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
60    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
61    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
62    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
63    ///
64    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
65    /// None)?;
66    /// assert_eq!(df3.shape(), (3, 3));
67    /// println!("{}", df3);
68    /// # Ok::<(), PolarsError>(())
69    /// ```
70    ///
71    /// Output:
72    ///
73    /// ```text
74    /// shape: (3, 3)
75    /// +--------+----------------------+---------------------+
76    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
77    /// | ---    | ---                  | ---                 |
78    /// | str    | i32                  | i32                 |
79    /// +========+======================+=====================+
80    /// | Apple  | 11                   | 107                 |
81    /// +--------+----------------------+---------------------+
82    /// | Banana | 22                   | 358                 |
83    /// +--------+----------------------+---------------------+
84    /// | Pear   | 12                   | 115                 |
85    /// +--------+----------------------+---------------------+
86    /// ```
87    fn join(
88        &self,
89        other: &DataFrame,
90        left_on: impl IntoIterator<Item = impl AsRef<str>>,
91        right_on: impl IntoIterator<Item = impl AsRef<str>>,
92        args: JoinArgs,
93        options: Option<JoinTypeOptions>,
94    ) -> PolarsResult<DataFrame> {
95        let df_left = self.to_df();
96        let selected_left = df_left.select_to_vec(left_on)?;
97        let selected_right = other.select_to_vec(right_on)?;
98
99        let selected_left = selected_left
100            .into_iter()
101            .map(Column::take_materialized_series)
102            .collect::<Vec<_>>();
103        let selected_right = selected_right
104            .into_iter()
105            .map(Column::take_materialized_series)
106            .collect::<Vec<_>>();
107
108        self._join_impl(
109            other,
110            selected_left,
111            selected_right,
112            args,
113            options,
114            true,
115            false,
116        )
117    }
118
119    #[doc(hidden)]
120    #[allow(clippy::too_many_arguments)]
121    #[allow(unused_mut)]
122    fn _join_impl(
123        &self,
124        other: &DataFrame,
125        mut selected_left: Vec<Series>,
126        mut selected_right: Vec<Series>,
127        mut args: JoinArgs,
128        options: Option<JoinTypeOptions>,
129        _check_rechunk: bool,
130        _verbose: bool,
131    ) -> PolarsResult<DataFrame> {
132        let left_df = self.to_df();
133
134        #[cfg(feature = "cross_join")]
135        if let JoinType::Cross = args.how {
136            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
137                assert!(args.slice.is_none());
138                return fused_cross_filter(
139                    left_df,
140                    other,
141                    args.suffix.clone(),
142                    cross_options,
143                    args.maintain_order,
144                );
145            }
146            return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
147        }
148
149        // Clear literals if a frame is empty. Otherwise we could get an oob
150        fn clear(s: &mut [Series]) {
151            for s in s.iter_mut() {
152                if s.len() == 1 {
153                    *s = s.clear()
154                }
155            }
156        }
157        if left_df.height() == 0 {
158            clear(&mut selected_left);
159        }
160        if other.height() == 0 {
161            clear(&mut selected_right);
162        }
163
164        let should_coalesce = args.should_coalesce();
165        assert_eq!(selected_left.len(), selected_right.len());
166
167        #[cfg(feature = "chunked_ids")]
168        {
169            // a left join create chunked-ids
170            // the others not yet.
171            // TODO! change this to other join types once they support chunked-id joins
172            if _check_rechunk
173                && !(matches!(args.how, JoinType::Left)
174                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
175            {
176                let mut left = Cow::Borrowed(left_df);
177                let mut right = Cow::Borrowed(other);
178                if left_df.should_rechunk() {
179                    if _verbose {
180                        eprintln!(
181                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
182                            args.how,
183                            left_df.width()
184                        );
185                    }
186
187                    let mut tmp_left = left_df.clone();
188                    tmp_left.rechunk_mut_par();
189                    left = Cow::Owned(tmp_left);
190                }
191                if other.should_rechunk() {
192                    if _verbose {
193                        eprintln!(
194                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
195                            args.how,
196                            other.width()
197                        );
198                    }
199                    let mut tmp_right = other.clone();
200                    tmp_right.rechunk_mut_par();
201                    right = Cow::Owned(tmp_right);
202                }
203                return left._join_impl(
204                    &right,
205                    selected_left,
206                    selected_right,
207                    args,
208                    options,
209                    false,
210                    _verbose,
211                );
212            }
213        }
214
215        if let Some((l, r)) = selected_left
216            .iter()
217            .zip(&selected_right)
218            .find(|(l, r)| l.dtype() != r.dtype())
219        {
220            polars_bail!(
221                ComputeError:
222                    format!(
223                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
224                        l.name(), l.dtype(), r.name(), r.dtype()
225                    )
226            );
227        };
228
229        #[cfg(feature = "iejoin")]
230        if let JoinType::IEJoin = args.how {
231            let Some(JoinTypeOptions::IEJoin(options)) = options else {
232                unreachable!()
233            };
234            let func = if POOL.current_num_threads() > 1
235                && !left_df.shape_has_zero()
236                && !other.shape_has_zero()
237            {
238                iejoin::iejoin_par
239            } else {
240                iejoin::iejoin
241            };
242            return func(
243                left_df,
244                other,
245                selected_left,
246                selected_right,
247                &options,
248                args.suffix,
249                args.slice,
250            );
251        }
252
253        // Single keys.
254        if selected_left.len() == 1 {
255            let s_left = &selected_left[0];
256            let s_right = &selected_right[0];
257            let drop_names: Option<Vec<PlSmallStr>> =
258                if should_coalesce { None } else { Some(vec![]) };
259            return match args.how {
260                JoinType::Inner => left_df
261                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
262                JoinType::Left => dispatch_left_right::left_join_from_series(
263                    self.to_df().clone(),
264                    other,
265                    s_left,
266                    s_right,
267                    args,
268                    _verbose,
269                    drop_names,
270                ),
271                JoinType::Right => dispatch_left_right::right_join_from_series(
272                    self.to_df(),
273                    other.clone(),
274                    s_left,
275                    s_right,
276                    args,
277                    _verbose,
278                    drop_names,
279                ),
280                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
281                #[cfg(feature = "semi_anti_join")]
282                JoinType::Anti => left_df._semi_anti_join_from_series(
283                    s_left,
284                    s_right,
285                    args.slice,
286                    true,
287                    args.nulls_equal,
288                ),
289                #[cfg(feature = "semi_anti_join")]
290                JoinType::Semi => left_df._semi_anti_join_from_series(
291                    s_left,
292                    s_right,
293                    args.slice,
294                    false,
295                    args.nulls_equal,
296                ),
297                #[cfg(feature = "asof_join")]
298                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
299                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
300                        other,
301                        s_left,
302                        s_right,
303                        left_by,
304                        right_by,
305                        options.strategy,
306                        options.tolerance.map(|v| v.into_value()),
307                        args.suffix.clone(),
308                        args.slice,
309                        should_coalesce,
310                        options.allow_eq,
311                        options.check_sortedness,
312                    ),
313                    (None, None) => left_df._join_asof(
314                        other,
315                        s_left,
316                        s_right,
317                        options.strategy,
318                        options.tolerance.map(|v| v.into_value()),
319                        args.suffix,
320                        args.slice,
321                        should_coalesce,
322                        options.allow_eq,
323                        options.check_sortedness,
324                    ),
325                    _ => {
326                        panic!("expected by arguments on both sides")
327                    },
328                },
329                #[cfg(feature = "iejoin")]
330                JoinType::IEJoin => {
331                    unreachable!()
332                },
333                JoinType::Cross => {
334                    unreachable!()
335                },
336            };
337        }
338        let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
339            && matches!(&args.how, JoinType::Inner)
340        {
341            // Fast path for empty inner joins.
342            // Return 2 dummies so that we don't row-encode.
343            let a = Series::full_null("".into(), 0, &DataType::Null);
344            (a.clone(), a)
345        } else {
346            // Row encode the keys.
347            (
348                prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
349                prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
350            )
351        };
352
353        let drop_names = if should_coalesce {
354            if args.how == JoinType::Right {
355                selected_left
356                    .iter()
357                    .map(|s| s.name().clone())
358                    .collect::<Vec<_>>()
359            } else {
360                selected_right
361                    .iter()
362                    .map(|s| s.name().clone())
363                    .collect::<Vec<_>>()
364            }
365        } else {
366            vec![]
367        };
368
369        // Multiple keys.
370        match args.how {
371            #[cfg(feature = "asof_join")]
372            JoinType::AsOf(_) => polars_bail!(
373                ComputeError: "asof join not supported for join on multiple keys"
374            ),
375            #[cfg(feature = "iejoin")]
376            JoinType::IEJoin => {
377                unreachable!()
378            },
379            JoinType::Cross => {
380                unreachable!()
381            },
382            JoinType::Full => {
383                let names_left = selected_left
384                    .iter()
385                    .map(|s| s.name().clone())
386                    .collect::<Vec<_>>();
387                args.coalesce = JoinCoalesce::KeepColumns;
388                let suffix = args.suffix.clone();
389                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
390
391                if should_coalesce {
392                    Ok(_coalesce_full_join(
393                        out?,
394                        names_left.as_slice(),
395                        drop_names.as_slice(),
396                        suffix,
397                        left_df,
398                    ))
399                } else {
400                    out
401                }
402            },
403            JoinType::Inner => left_df._inner_join_from_series(
404                other,
405                &lhs_keys,
406                &rhs_keys,
407                args,
408                _verbose,
409                Some(drop_names),
410            ),
411            JoinType::Left => dispatch_left_right::left_join_from_series(
412                left_df.clone(),
413                other,
414                &lhs_keys,
415                &rhs_keys,
416                args,
417                _verbose,
418                Some(drop_names),
419            ),
420            JoinType::Right => dispatch_left_right::right_join_from_series(
421                left_df,
422                other.clone(),
423                &lhs_keys,
424                &rhs_keys,
425                args,
426                _verbose,
427                Some(drop_names),
428            ),
429            #[cfg(feature = "semi_anti_join")]
430            JoinType::Anti | JoinType::Semi => self._join_impl(
431                other,
432                vec![lhs_keys],
433                vec![rhs_keys],
434                args,
435                options,
436                _check_rechunk,
437                _verbose,
438            ),
439        }
440    }
441
442    /// Perform an inner join on two DataFrames.
443    ///
444    /// # Example
445    ///
446    /// ```
447    /// # use polars_core::prelude::*;
448    /// # use polars_ops::prelude::*;
449    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
450    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
451    /// }
452    /// ```
453    fn inner_join(
454        &self,
455        other: &DataFrame,
456        left_on: impl IntoIterator<Item = impl AsRef<str>>,
457        right_on: impl IntoIterator<Item = impl AsRef<str>>,
458    ) -> PolarsResult<DataFrame> {
459        self.join(
460            other,
461            left_on,
462            right_on,
463            JoinArgs::new(JoinType::Inner),
464            None,
465        )
466    }
467
468    /// Perform a left outer join on two DataFrames
469    /// # Example
470    ///
471    /// ```no_run
472    /// # use polars_core::prelude::*;
473    /// # use polars_ops::prelude::*;
474    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
475    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
476    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
477    ///
478    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
479    /// println!("{:?}", df3);
480    /// # Ok::<(), PolarsError>(())
481    /// ```
482    ///
483    /// Output:
484    ///
485    /// ```text
486    /// shape: (5, 2)
487    /// +-----------------+--------+
488    /// | Wavelength (nm) | Color  |
489    /// | ---             | ---    |
490    /// | f64             | str    |
491    /// +=================+========+
492    /// | 480             | Blue   |
493    /// +-----------------+--------+
494    /// | 650             | Red    |
495    /// +-----------------+--------+
496    /// | 577             | Yellow |
497    /// +-----------------+--------+
498    /// | 1201            | null   |
499    /// +-----------------+--------+
500    /// | 100             | null   |
501    /// +-----------------+--------+
502    /// ```
503    fn left_join(
504        &self,
505        other: &DataFrame,
506        left_on: impl IntoIterator<Item = impl AsRef<str>>,
507        right_on: impl IntoIterator<Item = impl AsRef<str>>,
508    ) -> PolarsResult<DataFrame> {
509        self.join(
510            other,
511            left_on,
512            right_on,
513            JoinArgs::new(JoinType::Left),
514            None,
515        )
516    }
517
518    /// Perform a full outer join on two DataFrames
519    /// # Example
520    ///
521    /// ```
522    /// # use polars_core::prelude::*;
523    /// # use polars_ops::prelude::*;
524    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
525    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
526    /// }
527    /// ```
528    fn full_join(
529        &self,
530        other: &DataFrame,
531        left_on: impl IntoIterator<Item = impl AsRef<str>>,
532        right_on: impl IntoIterator<Item = impl AsRef<str>>,
533    ) -> PolarsResult<DataFrame> {
534        self.join(
535            other,
536            left_on,
537            right_on,
538            JoinArgs::new(JoinType::Full),
539            None,
540        )
541    }
542}
543
544trait DataFrameJoinOpsPrivate: IntoDf {
545    fn _inner_join_from_series(
546        &self,
547        other: &DataFrame,
548        s_left: &Series,
549        s_right: &Series,
550        args: JoinArgs,
551        verbose: bool,
552        drop_names: Option<Vec<PlSmallStr>>,
553    ) -> PolarsResult<DataFrame> {
554        let left_df = self.to_df();
555        let ((join_tuples_left, join_tuples_right), sorted) =
556            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
557
558        let mut join_tuples_left = &*join_tuples_left;
559        let mut join_tuples_right = &*join_tuples_right;
560
561        if let Some((offset, len)) = args.slice {
562            join_tuples_left = slice_slice(join_tuples_left, offset, len);
563            join_tuples_right = slice_slice(join_tuples_right, offset, len);
564        }
565
566        let other = if let Some(drop_names) = drop_names {
567            other.drop_many(drop_names)
568        } else {
569            other.drop(s_right.name()).unwrap()
570        };
571
572        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
573        if sorted {
574            left.set_sorted_flag(IsSorted::Ascending);
575        }
576        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
577
578        let already_left_sorted = sorted
579            && matches!(
580                args.maintain_order,
581                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
582            );
583        try_raise_keyboard_interrupt();
584        let (df_left, df_right) =
585            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
586                let mut df = unsafe {
587                    DataFrame::new_unchecked_infer_height(vec![
588                        left.into_series().into(),
589                        right.into_series().into(),
590                    ])
591                };
592
593                let columns = match args.maintain_order {
594                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
595                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
596                    _ => unreachable!(),
597                };
598
599                let options = SortMultipleOptions::new()
600                    .with_order_descending(false)
601                    .with_maintain_order(true);
602
603                df.sort_in_place(columns, options)?;
604
605                let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
606                if matches!(
607                    args.maintain_order,
608                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
609                ) {
610                    a.set_sorted_flag(IsSorted::Ascending);
611                }
612
613                POOL.join(
614                    // SAFETY: join indices are known to be in bounds
615                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
616                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
617                )
618            } else {
619                POOL.join(
620                    // SAFETY: join indices are known to be in bounds
621                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
622                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
623                )
624            };
625
626        _finish_join(df_left, df_right, args.suffix)
627    }
628}
629
// `DataFrame` gets both join traits with only their default method implementations.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
632
633fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
634    let keys = s
635        .iter()
636        .map(|s| {
637            let phys = s.to_physical_repr();
638            match phys.dtype() {
639                #[cfg(feature = "dtype-f16")]
640                DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
641                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
642                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
643                _ => phys.into_owned().into_column(),
644            }
645        })
646        .collect::<Vec<_>>();
647
648    if nulls_equal {
649        encode_rows_vertical_par_unordered(&keys)
650    } else {
651        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
652    }
653}
654pub fn private_left_join_multiple_keys(
655    a: &DataFrame,
656    b: &DataFrame,
657    nulls_equal: bool,
658) -> PolarsResult<LeftJoinIds> {
659    // @scalar-opt
660    let a_cols = a
661        .columns()
662        .iter()
663        .map(|c| c.as_materialized_series().clone())
664        .collect::<Vec<_>>();
665    let b_cols = b
666        .columns()
667        .iter()
668        .map(|c| c.as_materialized_series().clone())
669        .collect::<Vec<_>>();
670
671    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
672    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
673    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
674}