1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{
22 _check_asof_columns, _join_asof_dispatch, AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy,
23};
24pub use cross_join::CrossJoin;
25#[cfg(feature = "chunked_ids")]
26use either::Either;
27#[cfg(feature = "chunked_ids")]
28use general::create_chunked_index_mapping;
29pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
30pub use hash_join::*;
31use hashbrown::hash_map::{Entry, RawEntryMut};
32#[cfg(feature = "iejoin")]
33pub use iejoin::{IEJoinOptions, InequalityOperator};
34#[cfg(feature = "merge_sorted")]
35pub use merge_sorted::_merge_sorted_dfs;
36use polars_core::POOL;
37#[allow(unused_imports)]
38use polars_core::chunked_array::ops::row_encode::{
39 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
40};
41use polars_core::datatypes::DataType;
42use polars_core::hashing::_HASHMAP_INIT_SIZE;
43use polars_core::prelude::*;
44pub(super) use polars_core::series::IsSorted;
45use polars_core::utils::slice_offsets;
46#[allow(unused_imports)]
47use polars_core::utils::slice_slice;
48use polars_utils::hashing::BytesHash;
49use rayon::prelude::*;
50
51use self::cross_join::fused_cross_filter;
52use super::IntoDf;
53
54pub trait DataFrameJoinOps: IntoDf {
55 fn join(
91 &self,
92 other: &DataFrame,
93 left_on: impl IntoIterator<Item = impl AsRef<str>>,
94 right_on: impl IntoIterator<Item = impl AsRef<str>>,
95 args: JoinArgs,
96 options: Option<JoinTypeOptions>,
97 ) -> PolarsResult<DataFrame> {
98 let df_left = self.to_df();
99 let selected_left = df_left.select_to_vec(left_on)?;
100 let selected_right = other.select_to_vec(right_on)?;
101
102 let selected_left = selected_left
103 .into_iter()
104 .map(Column::take_materialized_series)
105 .collect::<Vec<_>>();
106 let selected_right = selected_right
107 .into_iter()
108 .map(Column::take_materialized_series)
109 .collect::<Vec<_>>();
110
111 self._join_impl(
112 other,
113 selected_left,
114 selected_right,
115 args,
116 options,
117 true,
118 false,
119 )
120 }
121
122 #[doc(hidden)]
123 #[allow(clippy::too_many_arguments)]
124 #[allow(unused_mut)]
125 fn _join_impl(
126 &self,
127 other: &DataFrame,
128 mut selected_left: Vec<Series>,
129 mut selected_right: Vec<Series>,
130 mut args: JoinArgs,
131 options: Option<JoinTypeOptions>,
132 _check_rechunk: bool,
133 _verbose: bool,
134 ) -> PolarsResult<DataFrame> {
135 let left_df = self.to_df();
136
137 #[cfg(feature = "cross_join")]
138 if let JoinType::Cross = args.how {
139 if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
140 assert!(args.slice.is_none());
141 return fused_cross_filter(
142 left_df,
143 other,
144 args.suffix.clone(),
145 cross_options,
146 args.maintain_order,
147 );
148 }
149 return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
150 }
151
152 fn clear(s: &mut [Series]) {
154 for s in s.iter_mut() {
155 if s.len() == 1 {
156 *s = s.clear()
157 }
158 }
159 }
160 if left_df.height() == 0 {
161 clear(&mut selected_left);
162 }
163 if other.height() == 0 {
164 clear(&mut selected_right);
165 }
166
167 let should_coalesce = args.should_coalesce();
168 assert_eq!(selected_left.len(), selected_right.len());
169
170 #[cfg(feature = "chunked_ids")]
171 {
172 if _check_rechunk
176 && !(matches!(args.how, JoinType::Left)
177 || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
178 {
179 let mut left = Cow::Borrowed(left_df);
180 let mut right = Cow::Borrowed(other);
181 if left_df.should_rechunk() {
182 if _verbose {
183 eprintln!(
184 "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
185 args.how,
186 left_df.width()
187 );
188 }
189
190 let mut tmp_left = left_df.clone();
191 tmp_left.rechunk_mut_par();
192 left = Cow::Owned(tmp_left);
193 }
194 if other.should_rechunk() {
195 if _verbose {
196 eprintln!(
197 "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
198 args.how,
199 other.width()
200 );
201 }
202 let mut tmp_right = other.clone();
203 tmp_right.rechunk_mut_par();
204 right = Cow::Owned(tmp_right);
205 }
206 return left._join_impl(
207 &right,
208 selected_left,
209 selected_right,
210 args,
211 options,
212 false,
213 _verbose,
214 );
215 }
216 }
217
218 if let Some((l, r)) = selected_left
219 .iter()
220 .zip(&selected_right)
221 .find(|(l, r)| l.dtype() != r.dtype())
222 {
223 polars_bail!(
224 ComputeError:
225 "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
226 l.name(), l.dtype().pretty_format(), r.name(), r.dtype().pretty_format()
227 );
228 };
229
230 #[cfg(feature = "iejoin")]
231 if let JoinType::IEJoin = args.how {
232 let Some(JoinTypeOptions::IEJoin(options)) = options else {
233 unreachable!()
234 };
235 let func = if POOL.current_num_threads() > 1
236 && !left_df.shape_has_zero()
237 && !other.shape_has_zero()
238 {
239 iejoin::iejoin_par
240 } else {
241 iejoin::iejoin
242 };
243 return func(
244 left_df,
245 other,
246 selected_left,
247 selected_right,
248 &options,
249 args.suffix,
250 args.slice,
251 );
252 }
253
254 if selected_left.len() == 1 {
256 let s_left = &selected_left[0];
257 let s_right = &selected_right[0];
258 let drop_names: Option<Vec<PlSmallStr>> =
259 if should_coalesce { None } else { Some(vec![]) };
260 return match args.how {
261 JoinType::Inner => left_df
262 ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
263 JoinType::Left => dispatch_left_right::left_join_from_series(
264 self.to_df().clone(),
265 other,
266 s_left,
267 s_right,
268 args,
269 _verbose,
270 drop_names,
271 ),
272 JoinType::Right => dispatch_left_right::right_join_from_series(
273 self.to_df(),
274 other.clone(),
275 s_left,
276 s_right,
277 args,
278 _verbose,
279 drop_names,
280 ),
281 JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
282 #[cfg(feature = "semi_anti_join")]
283 JoinType::Anti => left_df._semi_anti_join_from_series(
284 s_left,
285 s_right,
286 args.slice,
287 true,
288 args.nulls_equal,
289 ),
290 #[cfg(feature = "semi_anti_join")]
291 JoinType::Semi => left_df._semi_anti_join_from_series(
292 s_left,
293 s_right,
294 args.slice,
295 false,
296 args.nulls_equal,
297 ),
298 #[cfg(feature = "asof_join")]
299 JoinType::AsOf(options) => match (options.left_by, options.right_by) {
300 (Some(left_by), Some(right_by)) => left_df._join_asof_by(
301 other,
302 s_left,
303 s_right,
304 left_by,
305 right_by,
306 options.strategy,
307 options.tolerance.map(|v| v.into_value()),
308 args.suffix.clone(),
309 args.slice,
310 should_coalesce,
311 options.allow_eq,
312 options.check_sortedness,
313 ),
314 (None, None) => left_df._join_asof(
315 other,
316 s_left,
317 s_right,
318 options.strategy,
319 options.tolerance.map(|v| v.into_value()),
320 args.suffix,
321 args.slice,
322 should_coalesce,
323 options.allow_eq,
324 options.check_sortedness,
325 ),
326 _ => {
327 panic!("expected by arguments on both sides")
328 },
329 },
330 #[cfg(feature = "iejoin")]
331 JoinType::IEJoin | JoinType::Range => {
332 unreachable!()
333 },
334 JoinType::Cross => {
335 unreachable!()
336 },
337 };
338 }
339 let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
340 && matches!(&args.how, JoinType::Inner)
341 {
342 let a = Series::full_null("".into(), 0, &DataType::Null);
345 (a.clone(), a)
346 } else {
347 (
349 prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
350 prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
351 )
352 };
353
354 let drop_names = if should_coalesce {
355 if args.how == JoinType::Right {
356 selected_left
357 .iter()
358 .map(|s| s.name().clone())
359 .collect::<Vec<_>>()
360 } else {
361 selected_right
362 .iter()
363 .map(|s| s.name().clone())
364 .collect::<Vec<_>>()
365 }
366 } else {
367 vec![]
368 };
369
370 match args.how {
372 #[cfg(feature = "asof_join")]
373 JoinType::AsOf(_) => polars_bail!(
374 ComputeError: "asof join not supported for join on multiple keys"
375 ),
376 #[cfg(feature = "iejoin")]
377 JoinType::IEJoin | JoinType::Range => {
378 unreachable!()
379 },
380 JoinType::Cross => {
381 unreachable!()
382 },
383 JoinType::Full => {
384 let names_left = selected_left
385 .iter()
386 .map(|s| s.name().clone())
387 .collect::<Vec<_>>();
388 args.coalesce = JoinCoalesce::KeepColumns;
389 let suffix = args.suffix.clone();
390 let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
391
392 if should_coalesce {
393 Ok(_coalesce_full_join(
394 out?,
395 names_left.as_slice(),
396 drop_names.as_slice(),
397 suffix,
398 left_df,
399 ))
400 } else {
401 out
402 }
403 },
404 JoinType::Inner => left_df._inner_join_from_series(
405 other,
406 &lhs_keys,
407 &rhs_keys,
408 args,
409 _verbose,
410 Some(drop_names),
411 ),
412 JoinType::Left => dispatch_left_right::left_join_from_series(
413 left_df.clone(),
414 other,
415 &lhs_keys,
416 &rhs_keys,
417 args,
418 _verbose,
419 Some(drop_names),
420 ),
421 JoinType::Right => dispatch_left_right::right_join_from_series(
422 left_df,
423 other.clone(),
424 &lhs_keys,
425 &rhs_keys,
426 args,
427 _verbose,
428 Some(drop_names),
429 ),
430 #[cfg(feature = "semi_anti_join")]
431 JoinType::Anti | JoinType::Semi => self._join_impl(
432 other,
433 vec![lhs_keys],
434 vec![rhs_keys],
435 args,
436 options,
437 _check_rechunk,
438 _verbose,
439 ),
440 }
441 }
442
443 fn inner_join(
455 &self,
456 other: &DataFrame,
457 left_on: impl IntoIterator<Item = impl AsRef<str>>,
458 right_on: impl IntoIterator<Item = impl AsRef<str>>,
459 ) -> PolarsResult<DataFrame> {
460 self.join(
461 other,
462 left_on,
463 right_on,
464 JoinArgs::new(JoinType::Inner),
465 None,
466 )
467 }
468
469 fn left_join(
505 &self,
506 other: &DataFrame,
507 left_on: impl IntoIterator<Item = impl AsRef<str>>,
508 right_on: impl IntoIterator<Item = impl AsRef<str>>,
509 ) -> PolarsResult<DataFrame> {
510 self.join(
511 other,
512 left_on,
513 right_on,
514 JoinArgs::new(JoinType::Left),
515 None,
516 )
517 }
518
519 fn full_join(
530 &self,
531 other: &DataFrame,
532 left_on: impl IntoIterator<Item = impl AsRef<str>>,
533 right_on: impl IntoIterator<Item = impl AsRef<str>>,
534 ) -> PolarsResult<DataFrame> {
535 self.join(
536 other,
537 left_on,
538 right_on,
539 JoinArgs::new(JoinType::Full),
540 None,
541 )
542 }
543}
544
545trait DataFrameJoinOpsPrivate: IntoDf {
546 fn _inner_join_from_series(
547 &self,
548 other: &DataFrame,
549 s_left: &Series,
550 s_right: &Series,
551 args: JoinArgs,
552 verbose: bool,
553 drop_names: Option<Vec<PlSmallStr>>,
554 ) -> PolarsResult<DataFrame> {
555 let left_df = self.to_df();
556 let ((join_tuples_left, join_tuples_right), sorted) =
557 _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
558
559 let mut join_tuples_left = &*join_tuples_left;
560 let mut join_tuples_right = &*join_tuples_right;
561
562 if let Some((offset, len)) = args.slice {
563 join_tuples_left = slice_slice(join_tuples_left, offset, len);
564 join_tuples_right = slice_slice(join_tuples_right, offset, len);
565 }
566
567 let other = if let Some(drop_names) = drop_names {
568 other.drop_many(drop_names)
569 } else {
570 other.drop(s_right.name()).unwrap()
571 };
572
573 let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
574 if sorted {
575 left.set_sorted_flag(IsSorted::Ascending);
576 }
577 let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
578
579 let already_left_sorted = sorted
580 && matches!(
581 args.maintain_order,
582 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
583 );
584 try_raise_keyboard_interrupt();
585 let (df_left, df_right) =
586 if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
587 let mut df = unsafe {
588 DataFrame::new_unchecked_infer_height(vec![
589 left.into_series().into(),
590 right.into_series().into(),
591 ])
592 };
593
594 let columns = match args.maintain_order {
595 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
596 MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
597 _ => unreachable!(),
598 };
599
600 let options = SortMultipleOptions::new()
601 .with_order_descending(false)
602 .with_maintain_order(true);
603
604 df.sort_in_place(columns, options)?;
605
606 let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
607 if matches!(
608 args.maintain_order,
609 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
610 ) {
611 a.set_sorted_flag(IsSorted::Ascending);
612 }
613
614 POOL.join(
615 || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
617 || unsafe { other.take_unchecked(b.idx().unwrap()) },
618 )
619 } else {
620 POOL.join(
621 || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
623 || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
624 )
625 };
626
627 _finish_join(df_left, df_right, args.suffix)
628 }
629}
630
// `DataFrame` overrides nothing: both impls are empty, so it uses the default
// method bodies of the public and the module-private join traits.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
633
634fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
635 let keys = s
636 .iter()
637 .map(|s| {
638 let phys = s.to_physical_repr();
639 match phys.dtype() {
640 #[cfg(feature = "dtype-f16")]
641 DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
642 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
643 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
644 _ => phys.into_owned().into_column(),
645 }
646 })
647 .collect::<Vec<_>>();
648
649 if nulls_equal {
650 encode_rows_vertical_par_unordered(&keys)
651 } else {
652 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
653 }
654}
655pub fn private_left_join_multiple_keys(
656 a: &DataFrame,
657 b: &DataFrame,
658 nulls_equal: bool,
659) -> PolarsResult<LeftJoinIds> {
660 let a_cols = a
662 .columns()
663 .iter()
664 .map(|c| c.as_materialized_series().clone())
665 .collect::<Vec<_>>();
666 let b_cols = b
667 .columns()
668 .iter()
669 .map(|c| c.as_materialized_series().clone())
670 .collect::<Vec<_>>();
671
672 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
673 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
674 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
675}