1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10pub mod merge_join;
11#[cfg(feature = "merge_sorted")]
12mod merge_sorted;
13
14use std::borrow::Cow;
15use std::fmt::{Debug, Display, Formatter};
16use std::hash::Hash;
17
18pub use args::*;
19use arrow::trusted_len::TrustedLen;
20#[cfg(feature = "asof_join")]
21pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
22pub use cross_join::CrossJoin;
23#[cfg(feature = "chunked_ids")]
24use either::Either;
25#[cfg(feature = "chunked_ids")]
26use general::create_chunked_index_mapping;
27pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
28pub use hash_join::*;
29use hashbrown::hash_map::{Entry, RawEntryMut};
30#[cfg(feature = "iejoin")]
31pub use iejoin::{IEJoinOptions, InequalityOperator};
32#[cfg(feature = "merge_sorted")]
33pub use merge_sorted::_merge_sorted_dfs;
34use polars_core::POOL;
35#[allow(unused_imports)]
36use polars_core::chunked_array::ops::row_encode::{
37 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
38};
39use polars_core::datatypes::DataType;
40use polars_core::hashing::_HASHMAP_INIT_SIZE;
41use polars_core::prelude::*;
42pub(super) use polars_core::series::IsSorted;
43use polars_core::utils::slice_offsets;
44#[allow(unused_imports)]
45use polars_core::utils::slice_slice;
46use polars_utils::hashing::BytesHash;
47use rayon::prelude::*;
48
49use self::cross_join::fused_cross_filter;
50use super::IntoDf;
51
52pub trait DataFrameJoinOps: IntoDf {
53 fn join(
89 &self,
90 other: &DataFrame,
91 left_on: impl IntoIterator<Item = impl AsRef<str>>,
92 right_on: impl IntoIterator<Item = impl AsRef<str>>,
93 args: JoinArgs,
94 options: Option<JoinTypeOptions>,
95 ) -> PolarsResult<DataFrame> {
96 let df_left = self.to_df();
97 let selected_left = df_left.select_to_vec(left_on)?;
98 let selected_right = other.select_to_vec(right_on)?;
99
100 let selected_left = selected_left
101 .into_iter()
102 .map(Column::take_materialized_series)
103 .collect::<Vec<_>>();
104 let selected_right = selected_right
105 .into_iter()
106 .map(Column::take_materialized_series)
107 .collect::<Vec<_>>();
108
109 self._join_impl(
110 other,
111 selected_left,
112 selected_right,
113 args,
114 options,
115 true,
116 false,
117 )
118 }
119
120 #[doc(hidden)]
121 #[allow(clippy::too_many_arguments)]
122 #[allow(unused_mut)]
123 fn _join_impl(
124 &self,
125 other: &DataFrame,
126 mut selected_left: Vec<Series>,
127 mut selected_right: Vec<Series>,
128 mut args: JoinArgs,
129 options: Option<JoinTypeOptions>,
130 _check_rechunk: bool,
131 _verbose: bool,
132 ) -> PolarsResult<DataFrame> {
133 let left_df = self.to_df();
134
135 #[cfg(feature = "cross_join")]
136 if let JoinType::Cross = args.how {
137 if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
138 assert!(args.slice.is_none());
139 return fused_cross_filter(
140 left_df,
141 other,
142 args.suffix.clone(),
143 cross_options,
144 args.maintain_order,
145 );
146 }
147 return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
148 }
149
150 fn clear(s: &mut [Series]) {
152 for s in s.iter_mut() {
153 if s.len() == 1 {
154 *s = s.clear()
155 }
156 }
157 }
158 if left_df.height() == 0 {
159 clear(&mut selected_left);
160 }
161 if other.height() == 0 {
162 clear(&mut selected_right);
163 }
164
165 let should_coalesce = args.should_coalesce();
166 assert_eq!(selected_left.len(), selected_right.len());
167
168 #[cfg(feature = "chunked_ids")]
169 {
170 if _check_rechunk
174 && !(matches!(args.how, JoinType::Left)
175 || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
176 {
177 let mut left = Cow::Borrowed(left_df);
178 let mut right = Cow::Borrowed(other);
179 if left_df.should_rechunk() {
180 if _verbose {
181 eprintln!(
182 "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
183 args.how,
184 left_df.width()
185 );
186 }
187
188 let mut tmp_left = left_df.clone();
189 tmp_left.rechunk_mut_par();
190 left = Cow::Owned(tmp_left);
191 }
192 if other.should_rechunk() {
193 if _verbose {
194 eprintln!(
195 "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
196 args.how,
197 other.width()
198 );
199 }
200 let mut tmp_right = other.clone();
201 tmp_right.rechunk_mut_par();
202 right = Cow::Owned(tmp_right);
203 }
204 return left._join_impl(
205 &right,
206 selected_left,
207 selected_right,
208 args,
209 options,
210 false,
211 _verbose,
212 );
213 }
214 }
215
216 if let Some((l, r)) = selected_left
217 .iter()
218 .zip(&selected_right)
219 .find(|(l, r)| l.dtype() != r.dtype())
220 {
221 polars_bail!(
222 ComputeError:
223 "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
224 l.name(), l.dtype().pretty_format(), r.name(), r.dtype().pretty_format()
225 );
226 };
227
228 #[cfg(feature = "iejoin")]
229 if let JoinType::IEJoin = args.how {
230 let Some(JoinTypeOptions::IEJoin(options)) = options else {
231 unreachable!()
232 };
233 let func = if POOL.current_num_threads() > 1
234 && !left_df.shape_has_zero()
235 && !other.shape_has_zero()
236 {
237 iejoin::iejoin_par
238 } else {
239 iejoin::iejoin
240 };
241 return func(
242 left_df,
243 other,
244 selected_left,
245 selected_right,
246 &options,
247 args.suffix,
248 args.slice,
249 );
250 }
251
252 if selected_left.len() == 1 {
254 let s_left = &selected_left[0];
255 let s_right = &selected_right[0];
256 let drop_names: Option<Vec<PlSmallStr>> =
257 if should_coalesce { None } else { Some(vec![]) };
258 return match args.how {
259 JoinType::Inner => left_df
260 ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
261 JoinType::Left => dispatch_left_right::left_join_from_series(
262 self.to_df().clone(),
263 other,
264 s_left,
265 s_right,
266 args,
267 _verbose,
268 drop_names,
269 ),
270 JoinType::Right => dispatch_left_right::right_join_from_series(
271 self.to_df(),
272 other.clone(),
273 s_left,
274 s_right,
275 args,
276 _verbose,
277 drop_names,
278 ),
279 JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
280 #[cfg(feature = "semi_anti_join")]
281 JoinType::Anti => left_df._semi_anti_join_from_series(
282 s_left,
283 s_right,
284 args.slice,
285 true,
286 args.nulls_equal,
287 ),
288 #[cfg(feature = "semi_anti_join")]
289 JoinType::Semi => left_df._semi_anti_join_from_series(
290 s_left,
291 s_right,
292 args.slice,
293 false,
294 args.nulls_equal,
295 ),
296 #[cfg(feature = "asof_join")]
297 JoinType::AsOf(options) => match (options.left_by, options.right_by) {
298 (Some(left_by), Some(right_by)) => left_df._join_asof_by(
299 other,
300 s_left,
301 s_right,
302 left_by,
303 right_by,
304 options.strategy,
305 options.tolerance.map(|v| v.into_value()),
306 args.suffix.clone(),
307 args.slice,
308 should_coalesce,
309 options.allow_eq,
310 options.check_sortedness,
311 ),
312 (None, None) => left_df._join_asof(
313 other,
314 s_left,
315 s_right,
316 options.strategy,
317 options.tolerance.map(|v| v.into_value()),
318 args.suffix,
319 args.slice,
320 should_coalesce,
321 options.allow_eq,
322 options.check_sortedness,
323 ),
324 _ => {
325 panic!("expected by arguments on both sides")
326 },
327 },
328 #[cfg(feature = "iejoin")]
329 JoinType::IEJoin => {
330 unreachable!()
331 },
332 JoinType::Cross => {
333 unreachable!()
334 },
335 };
336 }
337 let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
338 && matches!(&args.how, JoinType::Inner)
339 {
340 let a = Series::full_null("".into(), 0, &DataType::Null);
343 (a.clone(), a)
344 } else {
345 (
347 prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
348 prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
349 )
350 };
351
352 let drop_names = if should_coalesce {
353 if args.how == JoinType::Right {
354 selected_left
355 .iter()
356 .map(|s| s.name().clone())
357 .collect::<Vec<_>>()
358 } else {
359 selected_right
360 .iter()
361 .map(|s| s.name().clone())
362 .collect::<Vec<_>>()
363 }
364 } else {
365 vec![]
366 };
367
368 match args.how {
370 #[cfg(feature = "asof_join")]
371 JoinType::AsOf(_) => polars_bail!(
372 ComputeError: "asof join not supported for join on multiple keys"
373 ),
374 #[cfg(feature = "iejoin")]
375 JoinType::IEJoin => {
376 unreachable!()
377 },
378 JoinType::Cross => {
379 unreachable!()
380 },
381 JoinType::Full => {
382 let names_left = selected_left
383 .iter()
384 .map(|s| s.name().clone())
385 .collect::<Vec<_>>();
386 args.coalesce = JoinCoalesce::KeepColumns;
387 let suffix = args.suffix.clone();
388 let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
389
390 if should_coalesce {
391 Ok(_coalesce_full_join(
392 out?,
393 names_left.as_slice(),
394 drop_names.as_slice(),
395 suffix,
396 left_df,
397 ))
398 } else {
399 out
400 }
401 },
402 JoinType::Inner => left_df._inner_join_from_series(
403 other,
404 &lhs_keys,
405 &rhs_keys,
406 args,
407 _verbose,
408 Some(drop_names),
409 ),
410 JoinType::Left => dispatch_left_right::left_join_from_series(
411 left_df.clone(),
412 other,
413 &lhs_keys,
414 &rhs_keys,
415 args,
416 _verbose,
417 Some(drop_names),
418 ),
419 JoinType::Right => dispatch_left_right::right_join_from_series(
420 left_df,
421 other.clone(),
422 &lhs_keys,
423 &rhs_keys,
424 args,
425 _verbose,
426 Some(drop_names),
427 ),
428 #[cfg(feature = "semi_anti_join")]
429 JoinType::Anti | JoinType::Semi => self._join_impl(
430 other,
431 vec![lhs_keys],
432 vec![rhs_keys],
433 args,
434 options,
435 _check_rechunk,
436 _verbose,
437 ),
438 }
439 }
440
441 fn inner_join(
453 &self,
454 other: &DataFrame,
455 left_on: impl IntoIterator<Item = impl AsRef<str>>,
456 right_on: impl IntoIterator<Item = impl AsRef<str>>,
457 ) -> PolarsResult<DataFrame> {
458 self.join(
459 other,
460 left_on,
461 right_on,
462 JoinArgs::new(JoinType::Inner),
463 None,
464 )
465 }
466
467 fn left_join(
503 &self,
504 other: &DataFrame,
505 left_on: impl IntoIterator<Item = impl AsRef<str>>,
506 right_on: impl IntoIterator<Item = impl AsRef<str>>,
507 ) -> PolarsResult<DataFrame> {
508 self.join(
509 other,
510 left_on,
511 right_on,
512 JoinArgs::new(JoinType::Left),
513 None,
514 )
515 }
516
517 fn full_join(
528 &self,
529 other: &DataFrame,
530 left_on: impl IntoIterator<Item = impl AsRef<str>>,
531 right_on: impl IntoIterator<Item = impl AsRef<str>>,
532 ) -> PolarsResult<DataFrame> {
533 self.join(
534 other,
535 left_on,
536 right_on,
537 JoinArgs::new(JoinType::Full),
538 None,
539 )
540 }
541}
542
/// Module-private join helpers; not exported alongside [`DataFrameJoinOps`].
trait DataFrameJoinOpsPrivate: IntoDf {
    /// Inner join of `self` with `other` on one key series per side.
    ///
    /// # Arguments
    ///
    /// * `other` - the right-hand frame.
    /// * `s_left` / `s_right` - the key series of each side.
    /// * `args` - join configuration; `validation`, `nulls_equal`, `slice`,
    ///   `maintain_order` and `suffix` are read here.
    /// * `verbose` - forwarded to `_sort_or_hash_inner`.
    /// * `drop_names` - columns to remove from `other` before gathering rows. With `None`,
    ///   the column named like `s_right` is dropped instead.
    ///
    /// # Errors
    ///
    /// Propagates errors from `_sort_or_hash_inner`, from sorting the index pairs and
    /// from `_finish_join`.
    ///
    /// # Panics
    ///
    /// Panics when `drop_names` is `None` and dropping the `s_right` column from `other`
    /// fails (the result is unwrapped).
    fn _inner_join_from_series(
        &self,
        other: &DataFrame,
        s_left: &Series,
        s_right: &Series,
        args: JoinArgs,
        verbose: bool,
        drop_names: Option<Vec<PlSmallStr>>,
    ) -> PolarsResult<DataFrame> {
        let left_df = self.to_df();
        // Matching row indices for both sides, plus a flag that is used below to mark the
        // left indices as ascending.
        let ((join_tuples_left, join_tuples_right), sorted) =
            _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;

        // Shadow the owned index buffers with slices. The owners stay alive until the end
        // of this function, so the slices (and the arrays built on them) remain valid.
        let mut join_tuples_left = &*join_tuples_left;
        let mut join_tuples_right = &*join_tuples_right;

        // Apply the requested slice to the index pairs, before any row is gathered.
        if let Some((offset, len)) = args.slice {
            join_tuples_left = slice_slice(join_tuples_left, offset, len);
            join_tuples_right = slice_slice(join_tuples_right, offset, len);
        }

        let other = if let Some(drop_names) = drop_names {
            other.drop_many(drop_names)
        } else {
            other.drop(s_right.name()).unwrap()
        };

        // NOTE(review): `mmap_slice` presumably wraps the slice without copying and relies
        // on the slice outliving the array - confirm. The backing buffers are bound above
        // and are only dropped when this function returns.
        let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
        if sorted {
            left.set_sorted_flag(IsSorted::Ascending);
        }
        let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };

        // No extra sort is needed when left order is requested and the left indices are
        // already flagged as sorted.
        let already_left_sorted = sorted
            && matches!(
                args.maintain_order,
                MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
            );
        try_raise_keyboard_interrupt();
        let (df_left, df_right) =
            if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
                // Put both index columns ("a" = left, "b" = right) in a temporary frame so
                // they can be reordered together.
                let mut df = unsafe {
                    DataFrame::new_unchecked_infer_height(vec![
                        left.into_series().into(),
                        right.into_series().into(),
                    ])
                };

                let columns = match args.maintain_order {
                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
                    MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
                    // `None` is excluded by the surrounding condition.
                    _ => unreachable!(),
                };

                // Ascending sort on the chosen index column with `maintain_order` enabled.
                let options = SortMultipleOptions::new()
                    .with_order_descending(false)
                    .with_maintain_order(true);

                df.sort_in_place(columns, options)?;

                let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
                if matches!(
                    args.maintain_order,
                    MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
                ) {
                    // "a" was just sorted ascending, so it can be flagged as such.
                    a.set_sorted_flag(IsSorted::Ascending);
                }

                // Gather the rows of both sides in parallel.
                // NOTE(review): `take_unchecked` presumably requires in-bounds indices; they
                // originate from the join of these two frames - confirm.
                POOL.join(
                    || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
                    || unsafe { other.take_unchecked(b.idx().unwrap()) },
                )
            } else {
                // Same parallel gather, using the index arrays in the order produced by
                // the join.
                POOL.join(
                    || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
                    || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
                )
            };

        // Combine both halves into the result, passing the configured suffix along.
        _finish_join(df_left, df_right, args.suffix)
    }
}
628
// `DataFrame` takes the provided (default) method bodies of both join traits as-is,
// hence the empty impl blocks.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
631
632fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
633 let keys = s
634 .iter()
635 .map(|s| {
636 let phys = s.to_physical_repr();
637 match phys.dtype() {
638 #[cfg(feature = "dtype-f16")]
639 DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
640 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
641 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
642 _ => phys.into_owned().into_column(),
643 }
644 })
645 .collect::<Vec<_>>();
646
647 if nulls_equal {
648 encode_rows_vertical_par_unordered(&keys)
649 } else {
650 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
651 }
652}
653pub fn private_left_join_multiple_keys(
654 a: &DataFrame,
655 b: &DataFrame,
656 nulls_equal: bool,
657) -> PolarsResult<LeftJoinIds> {
658 let a_cols = a
660 .columns()
661 .iter()
662 .map(|c| c.as_materialized_series().clone())
663 .collect::<Vec<_>>();
664 let b_cols = b
665 .columns()
666 .iter()
667 .map(|c| c.as_materialized_series().clone())
668 .collect::<Vec<_>>();
669
670 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
671 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
672 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
673}