1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
22pub use cross_join::CrossJoin;
23#[cfg(feature = "chunked_ids")]
24use either::Either;
25#[cfg(feature = "chunked_ids")]
26use general::create_chunked_index_mapping;
27pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
28pub use hash_join::*;
29use hashbrown::hash_map::{Entry, RawEntryMut};
30#[cfg(feature = "iejoin")]
31pub use iejoin::{IEJoinOptions, InequalityOperator};
32#[cfg(feature = "merge_sorted")]
33pub use merge_sorted::_merge_sorted_dfs;
34use polars_core::POOL;
35#[allow(unused_imports)]
36use polars_core::chunked_array::ops::row_encode::{
37 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
38};
39use polars_core::hashing::_HASHMAP_INIT_SIZE;
40use polars_core::prelude::*;
41pub(super) use polars_core::series::IsSorted;
42use polars_core::utils::slice_offsets;
43#[allow(unused_imports)]
44use polars_core::utils::slice_slice;
45use polars_utils::hashing::BytesHash;
46use rayon::prelude::*;
47
48use self::cross_join::fused_cross_filter;
49use super::IntoDf;
50
51pub trait DataFrameJoinOps: IntoDf {
52 fn join(
88 &self,
89 other: &DataFrame,
90 left_on: impl IntoIterator<Item = impl AsRef<str>>,
91 right_on: impl IntoIterator<Item = impl AsRef<str>>,
92 args: JoinArgs,
93 options: Option<JoinTypeOptions>,
94 ) -> PolarsResult<DataFrame> {
95 let df_left = self.to_df();
96 let selected_left = df_left.select_to_vec(left_on)?;
97 let selected_right = other.select_to_vec(right_on)?;
98
99 let selected_left = selected_left
100 .into_iter()
101 .map(Column::take_materialized_series)
102 .collect::<Vec<_>>();
103 let selected_right = selected_right
104 .into_iter()
105 .map(Column::take_materialized_series)
106 .collect::<Vec<_>>();
107
108 self._join_impl(
109 other,
110 selected_left,
111 selected_right,
112 args,
113 options,
114 true,
115 false,
116 )
117 }
118
119 #[doc(hidden)]
120 #[allow(clippy::too_many_arguments)]
121 #[allow(unused_mut)]
122 fn _join_impl(
123 &self,
124 other: &DataFrame,
125 mut selected_left: Vec<Series>,
126 mut selected_right: Vec<Series>,
127 mut args: JoinArgs,
128 options: Option<JoinTypeOptions>,
129 _check_rechunk: bool,
130 _verbose: bool,
131 ) -> PolarsResult<DataFrame> {
132 let left_df = self.to_df();
133
134 #[cfg(feature = "cross_join")]
135 if let JoinType::Cross = args.how {
136 if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
137 assert!(args.slice.is_none());
138 return fused_cross_filter(
139 left_df,
140 other,
141 args.suffix.clone(),
142 cross_options,
143 args.maintain_order,
144 );
145 }
146 return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
147 }
148
149 fn clear(s: &mut [Series]) {
151 for s in s.iter_mut() {
152 if s.len() == 1 {
153 *s = s.clear()
154 }
155 }
156 }
157 if left_df.height() == 0 {
158 clear(&mut selected_left);
159 }
160 if other.height() == 0 {
161 clear(&mut selected_right);
162 }
163
164 let should_coalesce = args.should_coalesce();
165 assert_eq!(selected_left.len(), selected_right.len());
166
167 #[cfg(feature = "chunked_ids")]
168 {
169 if _check_rechunk
173 && !(matches!(args.how, JoinType::Left)
174 || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
175 {
176 let mut left = Cow::Borrowed(left_df);
177 let mut right = Cow::Borrowed(other);
178 if left_df.should_rechunk() {
179 if _verbose {
180 eprintln!(
181 "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
182 args.how,
183 left_df.width()
184 );
185 }
186
187 let mut tmp_left = left_df.clone();
188 tmp_left.rechunk_mut_par();
189 left = Cow::Owned(tmp_left);
190 }
191 if other.should_rechunk() {
192 if _verbose {
193 eprintln!(
194 "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
195 args.how,
196 other.width()
197 );
198 }
199 let mut tmp_right = other.clone();
200 tmp_right.rechunk_mut_par();
201 right = Cow::Owned(tmp_right);
202 }
203 return left._join_impl(
204 &right,
205 selected_left,
206 selected_right,
207 args,
208 options,
209 false,
210 _verbose,
211 );
212 }
213 }
214
215 if let Some((l, r)) = selected_left
216 .iter()
217 .zip(&selected_right)
218 .find(|(l, r)| l.dtype() != r.dtype())
219 {
220 polars_bail!(
221 ComputeError:
222 format!(
223 "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
224 l.name(), l.dtype(), r.name(), r.dtype()
225 )
226 );
227 };
228
229 #[cfg(feature = "iejoin")]
230 if let JoinType::IEJoin = args.how {
231 let Some(JoinTypeOptions::IEJoin(options)) = options else {
232 unreachable!()
233 };
234 let func = if POOL.current_num_threads() > 1
235 && !left_df.shape_has_zero()
236 && !other.shape_has_zero()
237 {
238 iejoin::iejoin_par
239 } else {
240 iejoin::iejoin
241 };
242 return func(
243 left_df,
244 other,
245 selected_left,
246 selected_right,
247 &options,
248 args.suffix,
249 args.slice,
250 );
251 }
252
253 if selected_left.len() == 1 {
255 let s_left = &selected_left[0];
256 let s_right = &selected_right[0];
257 let drop_names: Option<Vec<PlSmallStr>> =
258 if should_coalesce { None } else { Some(vec![]) };
259 return match args.how {
260 JoinType::Inner => left_df
261 ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
262 JoinType::Left => dispatch_left_right::left_join_from_series(
263 self.to_df().clone(),
264 other,
265 s_left,
266 s_right,
267 args,
268 _verbose,
269 drop_names,
270 ),
271 JoinType::Right => dispatch_left_right::right_join_from_series(
272 self.to_df(),
273 other.clone(),
274 s_left,
275 s_right,
276 args,
277 _verbose,
278 drop_names,
279 ),
280 JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
281 #[cfg(feature = "semi_anti_join")]
282 JoinType::Anti => left_df._semi_anti_join_from_series(
283 s_left,
284 s_right,
285 args.slice,
286 true,
287 args.nulls_equal,
288 ),
289 #[cfg(feature = "semi_anti_join")]
290 JoinType::Semi => left_df._semi_anti_join_from_series(
291 s_left,
292 s_right,
293 args.slice,
294 false,
295 args.nulls_equal,
296 ),
297 #[cfg(feature = "asof_join")]
298 JoinType::AsOf(options) => match (options.left_by, options.right_by) {
299 (Some(left_by), Some(right_by)) => left_df._join_asof_by(
300 other,
301 s_left,
302 s_right,
303 left_by,
304 right_by,
305 options.strategy,
306 options.tolerance.map(|v| v.into_value()),
307 args.suffix.clone(),
308 args.slice,
309 should_coalesce,
310 options.allow_eq,
311 options.check_sortedness,
312 ),
313 (None, None) => left_df._join_asof(
314 other,
315 s_left,
316 s_right,
317 options.strategy,
318 options.tolerance.map(|v| v.into_value()),
319 args.suffix,
320 args.slice,
321 should_coalesce,
322 options.allow_eq,
323 options.check_sortedness,
324 ),
325 _ => {
326 panic!("expected by arguments on both sides")
327 },
328 },
329 #[cfg(feature = "iejoin")]
330 JoinType::IEJoin => {
331 unreachable!()
332 },
333 JoinType::Cross => {
334 unreachable!()
335 },
336 };
337 }
338 let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
339 && matches!(&args.how, JoinType::Inner)
340 {
341 let a = Series::full_null("".into(), 0, &DataType::Null);
344 (a.clone(), a)
345 } else {
346 (
348 prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
349 prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
350 )
351 };
352
353 let drop_names = if should_coalesce {
354 if args.how == JoinType::Right {
355 selected_left
356 .iter()
357 .map(|s| s.name().clone())
358 .collect::<Vec<_>>()
359 } else {
360 selected_right
361 .iter()
362 .map(|s| s.name().clone())
363 .collect::<Vec<_>>()
364 }
365 } else {
366 vec![]
367 };
368
369 match args.how {
371 #[cfg(feature = "asof_join")]
372 JoinType::AsOf(_) => polars_bail!(
373 ComputeError: "asof join not supported for join on multiple keys"
374 ),
375 #[cfg(feature = "iejoin")]
376 JoinType::IEJoin => {
377 unreachable!()
378 },
379 JoinType::Cross => {
380 unreachable!()
381 },
382 JoinType::Full => {
383 let names_left = selected_left
384 .iter()
385 .map(|s| s.name().clone())
386 .collect::<Vec<_>>();
387 args.coalesce = JoinCoalesce::KeepColumns;
388 let suffix = args.suffix.clone();
389 let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
390
391 if should_coalesce {
392 Ok(_coalesce_full_join(
393 out?,
394 names_left.as_slice(),
395 drop_names.as_slice(),
396 suffix,
397 left_df,
398 ))
399 } else {
400 out
401 }
402 },
403 JoinType::Inner => left_df._inner_join_from_series(
404 other,
405 &lhs_keys,
406 &rhs_keys,
407 args,
408 _verbose,
409 Some(drop_names),
410 ),
411 JoinType::Left => dispatch_left_right::left_join_from_series(
412 left_df.clone(),
413 other,
414 &lhs_keys,
415 &rhs_keys,
416 args,
417 _verbose,
418 Some(drop_names),
419 ),
420 JoinType::Right => dispatch_left_right::right_join_from_series(
421 left_df,
422 other.clone(),
423 &lhs_keys,
424 &rhs_keys,
425 args,
426 _verbose,
427 Some(drop_names),
428 ),
429 #[cfg(feature = "semi_anti_join")]
430 JoinType::Anti | JoinType::Semi => self._join_impl(
431 other,
432 vec![lhs_keys],
433 vec![rhs_keys],
434 args,
435 options,
436 _check_rechunk,
437 _verbose,
438 ),
439 }
440 }
441
442 fn inner_join(
454 &self,
455 other: &DataFrame,
456 left_on: impl IntoIterator<Item = impl AsRef<str>>,
457 right_on: impl IntoIterator<Item = impl AsRef<str>>,
458 ) -> PolarsResult<DataFrame> {
459 self.join(
460 other,
461 left_on,
462 right_on,
463 JoinArgs::new(JoinType::Inner),
464 None,
465 )
466 }
467
468 fn left_join(
504 &self,
505 other: &DataFrame,
506 left_on: impl IntoIterator<Item = impl AsRef<str>>,
507 right_on: impl IntoIterator<Item = impl AsRef<str>>,
508 ) -> PolarsResult<DataFrame> {
509 self.join(
510 other,
511 left_on,
512 right_on,
513 JoinArgs::new(JoinType::Left),
514 None,
515 )
516 }
517
518 fn full_join(
529 &self,
530 other: &DataFrame,
531 left_on: impl IntoIterator<Item = impl AsRef<str>>,
532 right_on: impl IntoIterator<Item = impl AsRef<str>>,
533 ) -> PolarsResult<DataFrame> {
534 self.join(
535 other,
536 left_on,
537 right_on,
538 JoinArgs::new(JoinType::Full),
539 None,
540 )
541 }
542}
543
544trait DataFrameJoinOpsPrivate: IntoDf {
545 fn _inner_join_from_series(
546 &self,
547 other: &DataFrame,
548 s_left: &Series,
549 s_right: &Series,
550 args: JoinArgs,
551 verbose: bool,
552 drop_names: Option<Vec<PlSmallStr>>,
553 ) -> PolarsResult<DataFrame> {
554 let left_df = self.to_df();
555 let ((join_tuples_left, join_tuples_right), sorted) =
556 _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
557
558 let mut join_tuples_left = &*join_tuples_left;
559 let mut join_tuples_right = &*join_tuples_right;
560
561 if let Some((offset, len)) = args.slice {
562 join_tuples_left = slice_slice(join_tuples_left, offset, len);
563 join_tuples_right = slice_slice(join_tuples_right, offset, len);
564 }
565
566 let other = if let Some(drop_names) = drop_names {
567 other.drop_many(drop_names)
568 } else {
569 other.drop(s_right.name()).unwrap()
570 };
571
572 let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
573 if sorted {
574 left.set_sorted_flag(IsSorted::Ascending);
575 }
576 let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
577
578 let already_left_sorted = sorted
579 && matches!(
580 args.maintain_order,
581 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
582 );
583 try_raise_keyboard_interrupt();
584 let (df_left, df_right) =
585 if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
586 let mut df = unsafe {
587 DataFrame::new_unchecked_infer_height(vec![
588 left.into_series().into(),
589 right.into_series().into(),
590 ])
591 };
592
593 let columns = match args.maintain_order {
594 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
595 MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
596 _ => unreachable!(),
597 };
598
599 let options = SortMultipleOptions::new()
600 .with_order_descending(false)
601 .with_maintain_order(true);
602
603 df.sort_in_place(columns, options)?;
604
605 let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
606 if matches!(
607 args.maintain_order,
608 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
609 ) {
610 a.set_sorted_flag(IsSorted::Ascending);
611 }
612
613 POOL.join(
614 || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
616 || unsafe { other.take_unchecked(b.idx().unwrap()) },
617 )
618 } else {
619 POOL.join(
620 || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
622 || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
623 )
624 };
625
626 _finish_join(df_left, df_right, args.suffix)
627 }
628}
629
630impl DataFrameJoinOps for DataFrame {}
631impl DataFrameJoinOpsPrivate for DataFrame {}
632
633fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
634 let keys = s
635 .iter()
636 .map(|s| {
637 let phys = s.to_physical_repr();
638 match phys.dtype() {
639 #[cfg(feature = "dtype-f16")]
640 DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
641 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
642 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
643 _ => phys.into_owned().into_column(),
644 }
645 })
646 .collect::<Vec<_>>();
647
648 if nulls_equal {
649 encode_rows_vertical_par_unordered(&keys)
650 } else {
651 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
652 }
653}
654pub fn private_left_join_multiple_keys(
655 a: &DataFrame,
656 b: &DataFrame,
657 nulls_equal: bool,
658) -> PolarsResult<LeftJoinIds> {
659 let a_cols = a
661 .columns()
662 .iter()
663 .map(|c| c.as_materialized_series().clone())
664 .collect::<Vec<_>>();
665 let b_cols = b
666 .columns()
667 .iter()
668 .map(|c| c.as_materialized_series().clone())
669 .collect::<Vec<_>>();
670
671 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
672 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
673 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
674}