// polars_ops/frame/join/mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10#[cfg(feature = "merge_sorted")]
11mod merge_sorted;
12
13use std::borrow::Cow;
14use std::fmt::{Debug, Display, Formatter};
15use std::hash::Hash;
16
17pub use args::*;
18use arrow::trusted_len::TrustedLen;
19#[cfg(feature = "asof_join")]
20pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
21pub use cross_join::CrossJoin;
22#[cfg(feature = "chunked_ids")]
23use either::Either;
24#[cfg(feature = "chunked_ids")]
25use general::create_chunked_index_mapping;
26pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
27pub use hash_join::*;
28use hashbrown::hash_map::{Entry, RawEntryMut};
29#[cfg(feature = "iejoin")]
30pub use iejoin::{IEJoinOptions, InequalityOperator};
31#[cfg(feature = "merge_sorted")]
32pub use merge_sorted::_merge_sorted_dfs;
33use polars_core::POOL;
34#[allow(unused_imports)]
35use polars_core::chunked_array::ops::row_encode::{
36    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
37};
38use polars_core::hashing::_HASHMAP_INIT_SIZE;
39use polars_core::prelude::*;
40pub(super) use polars_core::series::IsSorted;
41use polars_core::utils::slice_offsets;
42#[allow(unused_imports)]
43use polars_core::utils::slice_slice;
44use polars_utils::hashing::BytesHash;
45use rayon::prelude::*;
46
47use self::cross_join::fused_cross_filter;
48use super::IntoDf;
49
50pub trait DataFrameJoinOps: IntoDf {
51    /// Generic join method. Can be used to join on multiple columns.
52    ///
53    /// # Example
54    ///
55    /// ```no_run
56    /// # use polars_core::prelude::*;
57    /// # use polars_ops::prelude::*;
58    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
59    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
60    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
61    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
62    ///
63    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
64    /// None)?;
65    /// assert_eq!(df3.shape(), (3, 3));
66    /// println!("{}", df3);
67    /// # Ok::<(), PolarsError>(())
68    /// ```
69    ///
70    /// Output:
71    ///
72    /// ```text
73    /// shape: (3, 3)
74    /// +--------+----------------------+---------------------+
75    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
76    /// | ---    | ---                  | ---                 |
77    /// | str    | i32                  | i32                 |
78    /// +========+======================+=====================+
79    /// | Apple  | 11                   | 107                 |
80    /// +--------+----------------------+---------------------+
81    /// | Banana | 22                   | 358                 |
82    /// +--------+----------------------+---------------------+
83    /// | Pear   | 12                   | 115                 |
84    /// +--------+----------------------+---------------------+
85    /// ```
86    fn join(
87        &self,
88        other: &DataFrame,
89        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
90        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
91        args: JoinArgs,
92        options: Option<JoinTypeOptions>,
93    ) -> PolarsResult<DataFrame> {
94        let df_left = self.to_df();
95        let selected_left = df_left.select_columns(left_on)?;
96        let selected_right = other.select_columns(right_on)?;
97
98        let selected_left = selected_left
99            .into_iter()
100            .map(Column::take_materialized_series)
101            .collect::<Vec<_>>();
102        let selected_right = selected_right
103            .into_iter()
104            .map(Column::take_materialized_series)
105            .collect::<Vec<_>>();
106
107        self._join_impl(
108            other,
109            selected_left,
110            selected_right,
111            args,
112            options,
113            true,
114            false,
115        )
116    }
117
118    #[doc(hidden)]
119    #[allow(clippy::too_many_arguments)]
120    #[allow(unused_mut)]
121    fn _join_impl(
122        &self,
123        other: &DataFrame,
124        mut selected_left: Vec<Series>,
125        mut selected_right: Vec<Series>,
126        mut args: JoinArgs,
127        options: Option<JoinTypeOptions>,
128        _check_rechunk: bool,
129        _verbose: bool,
130    ) -> PolarsResult<DataFrame> {
131        let left_df = self.to_df();
132
133        #[cfg(feature = "cross_join")]
134        if let JoinType::Cross = args.how {
135            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
136                assert!(args.slice.is_none());
137                return fused_cross_filter(
138                    left_df,
139                    other,
140                    args.suffix.clone(),
141                    cross_options,
142                    args.maintain_order,
143                );
144            }
145            return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
146        }
147
148        // Clear literals if a frame is empty. Otherwise we could get an oob
149        fn clear(s: &mut [Series]) {
150            for s in s.iter_mut() {
151                if s.len() == 1 {
152                    *s = s.clear()
153                }
154            }
155        }
156        if left_df.is_empty() {
157            clear(&mut selected_left);
158        }
159        if other.is_empty() {
160            clear(&mut selected_right);
161        }
162
163        let should_coalesce = args.should_coalesce();
164        assert_eq!(selected_left.len(), selected_right.len());
165
166        #[cfg(feature = "chunked_ids")]
167        {
168            // a left join create chunked-ids
169            // the others not yet.
170            // TODO! change this to other join types once they support chunked-id joins
171            if _check_rechunk
172                && !(matches!(args.how, JoinType::Left)
173                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
174            {
175                let mut left = Cow::Borrowed(left_df);
176                let mut right = Cow::Borrowed(other);
177                if left_df.should_rechunk() {
178                    if _verbose {
179                        eprintln!(
180                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
181                            args.how,
182                            left_df.width()
183                        );
184                    }
185
186                    let mut tmp_left = left_df.clone();
187                    tmp_left.as_single_chunk_par();
188                    left = Cow::Owned(tmp_left);
189                }
190                if other.should_rechunk() {
191                    if _verbose {
192                        eprintln!(
193                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
194                            args.how,
195                            other.width()
196                        );
197                    }
198                    let mut tmp_right = other.clone();
199                    tmp_right.as_single_chunk_par();
200                    right = Cow::Owned(tmp_right);
201                }
202                return left._join_impl(
203                    &right,
204                    selected_left,
205                    selected_right,
206                    args,
207                    options,
208                    false,
209                    _verbose,
210                );
211            }
212        }
213
214        if let Some((l, r)) = selected_left
215            .iter()
216            .zip(&selected_right)
217            .find(|(l, r)| l.dtype() != r.dtype())
218        {
219            polars_bail!(
220                ComputeError:
221                    format!(
222                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
223                        l.name(), l.dtype(), r.name(), r.dtype()
224                    )
225            );
226        };
227
228        #[cfg(feature = "iejoin")]
229        if let JoinType::IEJoin = args.how {
230            let Some(JoinTypeOptions::IEJoin(options)) = options else {
231                unreachable!()
232            };
233            let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
234            {
235                iejoin::iejoin_par
236            } else {
237                iejoin::iejoin
238            };
239            return func(
240                left_df,
241                other,
242                selected_left,
243                selected_right,
244                &options,
245                args.suffix,
246                args.slice,
247            );
248        }
249
250        // Single keys.
251        if selected_left.len() == 1 {
252            let s_left = &selected_left[0];
253            let s_right = &selected_right[0];
254            let drop_names: Option<Vec<PlSmallStr>> =
255                if should_coalesce { None } else { Some(vec![]) };
256            return match args.how {
257                JoinType::Inner => left_df
258                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
259                JoinType::Left => dispatch_left_right::left_join_from_series(
260                    self.to_df().clone(),
261                    other,
262                    s_left,
263                    s_right,
264                    args,
265                    _verbose,
266                    drop_names,
267                ),
268                JoinType::Right => dispatch_left_right::right_join_from_series(
269                    self.to_df(),
270                    other.clone(),
271                    s_left,
272                    s_right,
273                    args,
274                    _verbose,
275                    drop_names,
276                ),
277                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
278                #[cfg(feature = "semi_anti_join")]
279                JoinType::Anti => left_df._semi_anti_join_from_series(
280                    s_left,
281                    s_right,
282                    args.slice,
283                    true,
284                    args.nulls_equal,
285                ),
286                #[cfg(feature = "semi_anti_join")]
287                JoinType::Semi => left_df._semi_anti_join_from_series(
288                    s_left,
289                    s_right,
290                    args.slice,
291                    false,
292                    args.nulls_equal,
293                ),
294                #[cfg(feature = "asof_join")]
295                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
296                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
297                        other,
298                        s_left,
299                        s_right,
300                        left_by,
301                        right_by,
302                        options.strategy,
303                        options.tolerance.map(|v| v.into_value()),
304                        args.suffix.clone(),
305                        args.slice,
306                        should_coalesce,
307                        options.allow_eq,
308                        options.check_sortedness,
309                    ),
310                    (None, None) => left_df._join_asof(
311                        other,
312                        s_left,
313                        s_right,
314                        options.strategy,
315                        options.tolerance.map(|v| v.into_value()),
316                        args.suffix,
317                        args.slice,
318                        should_coalesce,
319                        options.allow_eq,
320                        options.check_sortedness,
321                    ),
322                    _ => {
323                        panic!("expected by arguments on both sides")
324                    },
325                },
326                #[cfg(feature = "iejoin")]
327                JoinType::IEJoin => {
328                    unreachable!()
329                },
330                JoinType::Cross => {
331                    unreachable!()
332                },
333            };
334        }
335        let (lhs_keys, rhs_keys) =
336            if (left_df.is_empty() || other.is_empty()) && matches!(&args.how, JoinType::Inner) {
337                // Fast path for empty inner joins.
338                // Return 2 dummies so that we don't row-encode.
339                let a = Series::full_null("".into(), 0, &DataType::Null);
340                (a.clone(), a)
341            } else {
342                // Row encode the keys.
343                (
344                    prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
345                    prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
346                )
347            };
348
349        let drop_names = if should_coalesce {
350            if args.how == JoinType::Right {
351                selected_left
352                    .iter()
353                    .map(|s| s.name().clone())
354                    .collect::<Vec<_>>()
355            } else {
356                selected_right
357                    .iter()
358                    .map(|s| s.name().clone())
359                    .collect::<Vec<_>>()
360            }
361        } else {
362            vec![]
363        };
364
365        // Multiple keys.
366        match args.how {
367            #[cfg(feature = "asof_join")]
368            JoinType::AsOf(_) => polars_bail!(
369                ComputeError: "asof join not supported for join on multiple keys"
370            ),
371            #[cfg(feature = "iejoin")]
372            JoinType::IEJoin => {
373                unreachable!()
374            },
375            JoinType::Cross => {
376                unreachable!()
377            },
378            JoinType::Full => {
379                let names_left = selected_left
380                    .iter()
381                    .map(|s| s.name().clone())
382                    .collect::<Vec<_>>();
383                args.coalesce = JoinCoalesce::KeepColumns;
384                let suffix = args.suffix.clone();
385                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
386
387                if should_coalesce {
388                    Ok(_coalesce_full_join(
389                        out?,
390                        names_left.as_slice(),
391                        drop_names.as_slice(),
392                        suffix,
393                        left_df,
394                    ))
395                } else {
396                    out
397                }
398            },
399            JoinType::Inner => left_df._inner_join_from_series(
400                other,
401                &lhs_keys,
402                &rhs_keys,
403                args,
404                _verbose,
405                Some(drop_names),
406            ),
407            JoinType::Left => dispatch_left_right::left_join_from_series(
408                left_df.clone(),
409                other,
410                &lhs_keys,
411                &rhs_keys,
412                args,
413                _verbose,
414                Some(drop_names),
415            ),
416            JoinType::Right => dispatch_left_right::right_join_from_series(
417                left_df,
418                other.clone(),
419                &lhs_keys,
420                &rhs_keys,
421                args,
422                _verbose,
423                Some(drop_names),
424            ),
425            #[cfg(feature = "semi_anti_join")]
426            JoinType::Anti | JoinType::Semi => self._join_impl(
427                other,
428                vec![lhs_keys],
429                vec![rhs_keys],
430                args,
431                options,
432                _check_rechunk,
433                _verbose,
434            ),
435        }
436    }
437
438    /// Perform an inner join on two DataFrames.
439    ///
440    /// # Example
441    ///
442    /// ```
443    /// # use polars_core::prelude::*;
444    /// # use polars_ops::prelude::*;
445    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
446    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
447    /// }
448    /// ```
449    fn inner_join(
450        &self,
451        other: &DataFrame,
452        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
453        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
454    ) -> PolarsResult<DataFrame> {
455        self.join(
456            other,
457            left_on,
458            right_on,
459            JoinArgs::new(JoinType::Inner),
460            None,
461        )
462    }
463
464    /// Perform a left outer join on two DataFrames
465    /// # Example
466    ///
467    /// ```no_run
468    /// # use polars_core::prelude::*;
469    /// # use polars_ops::prelude::*;
470    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
471    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
472    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
473    ///
474    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
475    /// println!("{:?}", df3);
476    /// # Ok::<(), PolarsError>(())
477    /// ```
478    ///
479    /// Output:
480    ///
481    /// ```text
482    /// shape: (5, 2)
483    /// +-----------------+--------+
484    /// | Wavelength (nm) | Color  |
485    /// | ---             | ---    |
486    /// | f64             | str    |
487    /// +=================+========+
488    /// | 480             | Blue   |
489    /// +-----------------+--------+
490    /// | 650             | Red    |
491    /// +-----------------+--------+
492    /// | 577             | Yellow |
493    /// +-----------------+--------+
494    /// | 1201            | null   |
495    /// +-----------------+--------+
496    /// | 100             | null   |
497    /// +-----------------+--------+
498    /// ```
499    fn left_join(
500        &self,
501        other: &DataFrame,
502        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
503        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
504    ) -> PolarsResult<DataFrame> {
505        self.join(
506            other,
507            left_on,
508            right_on,
509            JoinArgs::new(JoinType::Left),
510            None,
511        )
512    }
513
514    /// Perform a full outer join on two DataFrames
515    /// # Example
516    ///
517    /// ```
518    /// # use polars_core::prelude::*;
519    /// # use polars_ops::prelude::*;
520    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
521    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
522    /// }
523    /// ```
524    fn full_join(
525        &self,
526        other: &DataFrame,
527        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
528        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
529    ) -> PolarsResult<DataFrame> {
530        self.join(
531            other,
532            left_on,
533            right_on,
534            JoinArgs::new(JoinType::Full),
535            None,
536        )
537    }
538}
539
540trait DataFrameJoinOpsPrivate: IntoDf {
541    fn _inner_join_from_series(
542        &self,
543        other: &DataFrame,
544        s_left: &Series,
545        s_right: &Series,
546        args: JoinArgs,
547        verbose: bool,
548        drop_names: Option<Vec<PlSmallStr>>,
549    ) -> PolarsResult<DataFrame> {
550        let left_df = self.to_df();
551        let ((join_tuples_left, join_tuples_right), sorted) =
552            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
553
554        let mut join_tuples_left = &*join_tuples_left;
555        let mut join_tuples_right = &*join_tuples_right;
556
557        if let Some((offset, len)) = args.slice {
558            join_tuples_left = slice_slice(join_tuples_left, offset, len);
559            join_tuples_right = slice_slice(join_tuples_right, offset, len);
560        }
561
562        let other = if let Some(drop_names) = drop_names {
563            other.drop_many(drop_names)
564        } else {
565            other.drop(s_right.name()).unwrap()
566        };
567
568        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
569        if sorted {
570            left.set_sorted_flag(IsSorted::Ascending);
571        }
572        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
573
574        let already_left_sorted = sorted
575            && matches!(
576                args.maintain_order,
577                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
578            );
579        try_raise_keyboard_interrupt();
580        let (df_left, df_right) =
581            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
582                let mut df =
583                    DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
584
585                let columns = match args.maintain_order {
586                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
587                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
588                    _ => unreachable!(),
589                };
590
591                let options = SortMultipleOptions::new()
592                    .with_order_descending(false)
593                    .with_maintain_order(true);
594
595                df.sort_in_place(columns, options)?;
596
597                let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
598                if matches!(
599                    args.maintain_order,
600                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
601                ) {
602                    a.set_sorted_flag(IsSorted::Ascending);
603                }
604
605                POOL.join(
606                    // SAFETY: join indices are known to be in bounds
607                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
608                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
609                )
610            } else {
611                POOL.join(
612                    // SAFETY: join indices are known to be in bounds
613                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
614                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
615                )
616            };
617
618        _finish_join(df_left, df_right, args.suffix)
619    }
620}
621
// `DataFrame` gets both join traits entirely through their default method bodies.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
624
625fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
626    let keys = s
627        .iter()
628        .map(|s| {
629            let phys = s.to_physical_repr();
630            match phys.dtype() {
631                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
632                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
633                _ => phys.into_owned().into_column(),
634            }
635        })
636        .collect::<Vec<_>>();
637
638    if nulls_equal {
639        encode_rows_vertical_par_unordered(&keys)
640    } else {
641        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
642    }
643}
644pub fn private_left_join_multiple_keys(
645    a: &DataFrame,
646    b: &DataFrame,
647    nulls_equal: bool,
648) -> PolarsResult<LeftJoinIds> {
649    // @scalar-opt
650    let a_cols = a
651        .get_columns()
652        .iter()
653        .map(|c| c.as_materialized_series().clone())
654        .collect::<Vec<_>>();
655    let b_cols = b
656        .get_columns()
657        .iter()
658        .map(|c| c.as_materialized_series().clone())
659        .collect::<Vec<_>>();
660
661    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
662    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
663    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
664}