// polars_ops/frame/join/mod.rs

1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10#[cfg(feature = "merge_sorted")]
11mod merge_sorted;
12
13use std::borrow::Cow;
14use std::fmt::{Debug, Display, Formatter};
15use std::hash::Hash;
16
17pub use args::*;
18use arrow::trusted_len::TrustedLen;
19#[cfg(feature = "asof_join")]
20pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
21pub use cross_join::CrossJoin;
22#[cfg(feature = "chunked_ids")]
23use either::Either;
24#[cfg(feature = "chunked_ids")]
25use general::create_chunked_index_mapping;
26pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
27pub use hash_join::*;
28use hashbrown::hash_map::{Entry, RawEntryMut};
29#[cfg(feature = "iejoin")]
30pub use iejoin::{IEJoinOptions, InequalityOperator};
31#[cfg(feature = "merge_sorted")]
32pub use merge_sorted::_merge_sorted_dfs;
33use polars_core::POOL;
34#[allow(unused_imports)]
35use polars_core::chunked_array::ops::row_encode::{
36    encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
37};
38use polars_core::hashing::_HASHMAP_INIT_SIZE;
39use polars_core::prelude::*;
40pub(super) use polars_core::series::IsSorted;
41use polars_core::utils::slice_offsets;
42#[allow(unused_imports)]
43use polars_core::utils::slice_slice;
44use polars_utils::hashing::BytesHash;
45use rayon::prelude::*;
46
47use self::cross_join::fused_cross_filter;
48use super::IntoDf;
49
50pub trait DataFrameJoinOps: IntoDf {
51    /// Generic join method. Can be used to join on multiple columns.
52    ///
53    /// # Example
54    ///
55    /// ```no_run
56    /// # use polars_core::prelude::*;
57    /// # use polars_ops::prelude::*;
58    /// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
59    ///                          "Phosphorus (mg/100g)" => &[11, 22, 12])?;
60    /// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
61    ///                          "Potassium (mg/100g)" => &[107, 358, 115])?;
62    ///
63    /// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
64    /// None)?;
65    /// assert_eq!(df3.shape(), (3, 3));
66    /// println!("{}", df3);
67    /// # Ok::<(), PolarsError>(())
68    /// ```
69    ///
70    /// Output:
71    ///
72    /// ```text
73    /// shape: (3, 3)
74    /// +--------+----------------------+---------------------+
75    /// | Fruit  | Phosphorus (mg/100g) | Potassium (mg/100g) |
76    /// | ---    | ---                  | ---                 |
77    /// | str    | i32                  | i32                 |
78    /// +========+======================+=====================+
79    /// | Apple  | 11                   | 107                 |
80    /// +--------+----------------------+---------------------+
81    /// | Banana | 22                   | 358                 |
82    /// +--------+----------------------+---------------------+
83    /// | Pear   | 12                   | 115                 |
84    /// +--------+----------------------+---------------------+
85    /// ```
86    fn join(
87        &self,
88        other: &DataFrame,
89        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
90        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
91        args: JoinArgs,
92        options: Option<JoinTypeOptions>,
93    ) -> PolarsResult<DataFrame> {
94        let df_left = self.to_df();
95        let selected_left = df_left.select_columns(left_on)?;
96        let selected_right = other.select_columns(right_on)?;
97
98        let selected_left = selected_left
99            .into_iter()
100            .map(Column::take_materialized_series)
101            .collect::<Vec<_>>();
102        let selected_right = selected_right
103            .into_iter()
104            .map(Column::take_materialized_series)
105            .collect::<Vec<_>>();
106
107        self._join_impl(
108            other,
109            selected_left,
110            selected_right,
111            args,
112            options,
113            true,
114            false,
115        )
116    }
117
118    #[doc(hidden)]
119    #[allow(clippy::too_many_arguments)]
120    #[allow(unused_mut)]
121    fn _join_impl(
122        &self,
123        other: &DataFrame,
124        mut selected_left: Vec<Series>,
125        mut selected_right: Vec<Series>,
126        mut args: JoinArgs,
127        options: Option<JoinTypeOptions>,
128        _check_rechunk: bool,
129        _verbose: bool,
130    ) -> PolarsResult<DataFrame> {
131        let left_df = self.to_df();
132
133        #[cfg(feature = "cross_join")]
134        if let JoinType::Cross = args.how {
135            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
136                assert!(args.slice.is_none());
137                return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
138            }
139            return left_df.cross_join(other, args.suffix.clone(), args.slice);
140        }
141
142        // Clear literals if a frame is empty. Otherwise we could get an oob
143        fn clear(s: &mut [Series]) {
144            for s in s.iter_mut() {
145                if s.len() == 1 {
146                    *s = s.clear()
147                }
148            }
149        }
150        if left_df.is_empty() {
151            clear(&mut selected_left);
152        }
153        if other.is_empty() {
154            clear(&mut selected_right);
155        }
156
157        let should_coalesce = args.should_coalesce();
158        assert_eq!(selected_left.len(), selected_right.len());
159
160        #[cfg(feature = "chunked_ids")]
161        {
162            // a left join create chunked-ids
163            // the others not yet.
164            // TODO! change this to other join types once they support chunked-id joins
165            if _check_rechunk
166                && !(matches!(args.how, JoinType::Left)
167                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
168            {
169                let mut left = Cow::Borrowed(left_df);
170                let mut right = Cow::Borrowed(other);
171                if left_df.should_rechunk() {
172                    if _verbose {
173                        eprintln!(
174                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
175                            args.how,
176                            left_df.width()
177                        );
178                    }
179
180                    let mut tmp_left = left_df.clone();
181                    tmp_left.as_single_chunk_par();
182                    left = Cow::Owned(tmp_left);
183                }
184                if other.should_rechunk() {
185                    if _verbose {
186                        eprintln!(
187                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
188                            args.how,
189                            other.width()
190                        );
191                    }
192                    let mut tmp_right = other.clone();
193                    tmp_right.as_single_chunk_par();
194                    right = Cow::Owned(tmp_right);
195                }
196                return left._join_impl(
197                    &right,
198                    selected_left,
199                    selected_right,
200                    args,
201                    options,
202                    false,
203                    _verbose,
204                );
205            }
206        }
207
208        if let Some((l, r)) = selected_left
209            .iter()
210            .zip(&selected_right)
211            .find(|(l, r)| l.dtype() != r.dtype())
212        {
213            polars_bail!(
214                ComputeError:
215                    format!(
216                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
217                        l.name(), l.dtype(), r.name(), r.dtype()
218                    )
219            );
220        };
221
222        #[cfg(feature = "iejoin")]
223        if let JoinType::IEJoin = args.how {
224            let Some(JoinTypeOptions::IEJoin(options)) = options else {
225                unreachable!()
226            };
227            let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
228            {
229                iejoin::iejoin_par
230            } else {
231                iejoin::iejoin
232            };
233            return func(
234                left_df,
235                other,
236                selected_left,
237                selected_right,
238                &options,
239                args.suffix,
240                args.slice,
241            );
242        }
243
244        // Single keys.
245        if selected_left.len() == 1 {
246            let s_left = &selected_left[0];
247            let s_right = &selected_right[0];
248            let drop_names: Option<Vec<PlSmallStr>> =
249                if should_coalesce { None } else { Some(vec![]) };
250            return match args.how {
251                JoinType::Inner => left_df
252                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
253                JoinType::Left => dispatch_left_right::left_join_from_series(
254                    self.to_df().clone(),
255                    other,
256                    s_left,
257                    s_right,
258                    args,
259                    _verbose,
260                    drop_names,
261                ),
262                JoinType::Right => dispatch_left_right::right_join_from_series(
263                    self.to_df(),
264                    other.clone(),
265                    s_left,
266                    s_right,
267                    args,
268                    _verbose,
269                    drop_names,
270                ),
271                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
272                #[cfg(feature = "semi_anti_join")]
273                JoinType::Anti => left_df._semi_anti_join_from_series(
274                    s_left,
275                    s_right,
276                    args.slice,
277                    true,
278                    args.nulls_equal,
279                ),
280                #[cfg(feature = "semi_anti_join")]
281                JoinType::Semi => left_df._semi_anti_join_from_series(
282                    s_left,
283                    s_right,
284                    args.slice,
285                    false,
286                    args.nulls_equal,
287                ),
288                #[cfg(feature = "asof_join")]
289                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
290                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
291                        other,
292                        s_left,
293                        s_right,
294                        left_by,
295                        right_by,
296                        options.strategy,
297                        options.tolerance.map(|v| v.into_value()),
298                        args.suffix.clone(),
299                        args.slice,
300                        should_coalesce,
301                        options.allow_eq,
302                        options.check_sortedness,
303                    ),
304                    (None, None) => left_df._join_asof(
305                        other,
306                        s_left,
307                        s_right,
308                        options.strategy,
309                        options.tolerance.map(|v| v.into_value()),
310                        args.suffix,
311                        args.slice,
312                        should_coalesce,
313                        options.allow_eq,
314                        options.check_sortedness,
315                    ),
316                    _ => {
317                        panic!("expected by arguments on both sides")
318                    },
319                },
320                #[cfg(feature = "iejoin")]
321                JoinType::IEJoin => {
322                    unreachable!()
323                },
324                JoinType::Cross => {
325                    unreachable!()
326                },
327            };
328        }
329        let (lhs_keys, rhs_keys) =
330            if (left_df.is_empty() || other.is_empty()) && matches!(&args.how, JoinType::Inner) {
331                // Fast path for empty inner joins.
332                // Return 2 dummies so that we don't row-encode.
333                let a = Series::full_null("".into(), 0, &DataType::Null);
334                (a.clone(), a)
335            } else {
336                // Row encode the keys.
337                (
338                    prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
339                    prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
340                )
341            };
342
343        let drop_names = if should_coalesce {
344            if args.how == JoinType::Right {
345                selected_left
346                    .iter()
347                    .map(|s| s.name().clone())
348                    .collect::<Vec<_>>()
349            } else {
350                selected_right
351                    .iter()
352                    .map(|s| s.name().clone())
353                    .collect::<Vec<_>>()
354            }
355        } else {
356            vec![]
357        };
358
359        // Multiple keys.
360        match args.how {
361            #[cfg(feature = "asof_join")]
362            JoinType::AsOf(_) => polars_bail!(
363                ComputeError: "asof join not supported for join on multiple keys"
364            ),
365            #[cfg(feature = "iejoin")]
366            JoinType::IEJoin => {
367                unreachable!()
368            },
369            JoinType::Cross => {
370                unreachable!()
371            },
372            JoinType::Full => {
373                let names_left = selected_left
374                    .iter()
375                    .map(|s| s.name().clone())
376                    .collect::<Vec<_>>();
377                args.coalesce = JoinCoalesce::KeepColumns;
378                let suffix = args.suffix.clone();
379                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
380
381                if should_coalesce {
382                    Ok(_coalesce_full_join(
383                        out?,
384                        names_left.as_slice(),
385                        drop_names.as_slice(),
386                        suffix,
387                        left_df,
388                    ))
389                } else {
390                    out
391                }
392            },
393            JoinType::Inner => left_df._inner_join_from_series(
394                other,
395                &lhs_keys,
396                &rhs_keys,
397                args,
398                _verbose,
399                Some(drop_names),
400            ),
401            JoinType::Left => dispatch_left_right::left_join_from_series(
402                left_df.clone(),
403                other,
404                &lhs_keys,
405                &rhs_keys,
406                args,
407                _verbose,
408                Some(drop_names),
409            ),
410            JoinType::Right => dispatch_left_right::right_join_from_series(
411                left_df,
412                other.clone(),
413                &lhs_keys,
414                &rhs_keys,
415                args,
416                _verbose,
417                Some(drop_names),
418            ),
419            #[cfg(feature = "semi_anti_join")]
420            JoinType::Anti | JoinType::Semi => self._join_impl(
421                other,
422                vec![lhs_keys],
423                vec![rhs_keys],
424                args,
425                options,
426                _check_rechunk,
427                _verbose,
428            ),
429        }
430    }
431
432    /// Perform an inner join on two DataFrames.
433    ///
434    /// # Example
435    ///
436    /// ```
437    /// # use polars_core::prelude::*;
438    /// # use polars_ops::prelude::*;
439    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
440    ///     left.inner_join(right, ["join_column_left"], ["join_column_right"])
441    /// }
442    /// ```
443    fn inner_join(
444        &self,
445        other: &DataFrame,
446        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
447        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
448    ) -> PolarsResult<DataFrame> {
449        self.join(
450            other,
451            left_on,
452            right_on,
453            JoinArgs::new(JoinType::Inner),
454            None,
455        )
456    }
457
458    /// Perform a left outer join on two DataFrames
459    /// # Example
460    ///
461    /// ```no_run
462    /// # use polars_core::prelude::*;
463    /// # use polars_ops::prelude::*;
464    /// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
465    /// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
466    ///                          "Wavelength nm" => &[480.0, 577.0, 650.0])?;
467    ///
468    /// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
469    /// println!("{:?}", df3);
470    /// # Ok::<(), PolarsError>(())
471    /// ```
472    ///
473    /// Output:
474    ///
475    /// ```text
476    /// shape: (5, 2)
477    /// +-----------------+--------+
478    /// | Wavelength (nm) | Color  |
479    /// | ---             | ---    |
480    /// | f64             | str    |
481    /// +=================+========+
482    /// | 480             | Blue   |
483    /// +-----------------+--------+
484    /// | 650             | Red    |
485    /// +-----------------+--------+
486    /// | 577             | Yellow |
487    /// +-----------------+--------+
488    /// | 1201            | null   |
489    /// +-----------------+--------+
490    /// | 100             | null   |
491    /// +-----------------+--------+
492    /// ```
493    fn left_join(
494        &self,
495        other: &DataFrame,
496        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
497        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
498    ) -> PolarsResult<DataFrame> {
499        self.join(
500            other,
501            left_on,
502            right_on,
503            JoinArgs::new(JoinType::Left),
504            None,
505        )
506    }
507
508    /// Perform a full outer join on two DataFrames
509    /// # Example
510    ///
511    /// ```
512    /// # use polars_core::prelude::*;
513    /// # use polars_ops::prelude::*;
514    /// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
515    ///     left.full_join(right, ["join_column_left"], ["join_column_right"])
516    /// }
517    /// ```
518    fn full_join(
519        &self,
520        other: &DataFrame,
521        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
522        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
523    ) -> PolarsResult<DataFrame> {
524        self.join(
525            other,
526            left_on,
527            right_on,
528            JoinArgs::new(JoinType::Full),
529            None,
530        )
531    }
532}
533
534trait DataFrameJoinOpsPrivate: IntoDf {
535    fn _inner_join_from_series(
536        &self,
537        other: &DataFrame,
538        s_left: &Series,
539        s_right: &Series,
540        args: JoinArgs,
541        verbose: bool,
542        drop_names: Option<Vec<PlSmallStr>>,
543    ) -> PolarsResult<DataFrame> {
544        let left_df = self.to_df();
545        let ((join_tuples_left, join_tuples_right), sorted) =
546            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
547
548        let mut join_tuples_left = &*join_tuples_left;
549        let mut join_tuples_right = &*join_tuples_right;
550
551        if let Some((offset, len)) = args.slice {
552            join_tuples_left = slice_slice(join_tuples_left, offset, len);
553            join_tuples_right = slice_slice(join_tuples_right, offset, len);
554        }
555
556        let other = if let Some(drop_names) = drop_names {
557            other.drop_many(drop_names)
558        } else {
559            other.drop(s_right.name()).unwrap()
560        };
561
562        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
563        if sorted {
564            left.set_sorted_flag(IsSorted::Ascending);
565        }
566        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
567
568        let already_left_sorted = sorted
569            && matches!(
570                args.maintain_order,
571                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
572            );
573        try_raise_keyboard_interrupt();
574        let (df_left, df_right) =
575            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
576                let mut df =
577                    DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
578
579                let columns = match args.maintain_order {
580                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
581                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
582                    _ => unreachable!(),
583                };
584
585                let options = SortMultipleOptions::new()
586                    .with_order_descending(false)
587                    .with_maintain_order(true);
588
589                df.sort_in_place(columns, options)?;
590
591                let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
592                if matches!(
593                    args.maintain_order,
594                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
595                ) {
596                    a.set_sorted_flag(IsSorted::Ascending);
597                }
598
599                POOL.join(
600                    // SAFETY: join indices are known to be in bounds
601                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
602                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
603                )
604            } else {
605                POOL.join(
606                    // SAFETY: join indices are known to be in bounds
607                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
608                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
609                )
610            };
611
612        _finish_join(df_left, df_right, args.suffix)
613    }
614}
615
616impl DataFrameJoinOps for DataFrame {}
617impl DataFrameJoinOpsPrivate for DataFrame {}
618
619fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
620    let keys = s
621        .iter()
622        .map(|s| {
623            let phys = s.to_physical_repr();
624            match phys.dtype() {
625                DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
626                DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
627                _ => phys.into_owned().into_column(),
628            }
629        })
630        .collect::<Vec<_>>();
631
632    if nulls_equal {
633        encode_rows_vertical_par_unordered(&keys)
634    } else {
635        encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
636    }
637}
638pub fn private_left_join_multiple_keys(
639    a: &DataFrame,
640    b: &DataFrame,
641    nulls_equal: bool,
642) -> PolarsResult<LeftJoinIds> {
643    // @scalar-opt
644    let a_cols = a
645        .get_columns()
646        .iter()
647        .map(|c| c.as_materialized_series().clone())
648        .collect::<Vec<_>>();
649    let b_cols = b
650        .get_columns()
651        .iter()
652        .map(|c| c.as_materialized_series().clone())
653        .collect::<Vec<_>>();
654
655    let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
656    let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
657    sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
658}