1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4#[cfg(feature = "dtype-categorical")]
5mod checks;
6mod cross_join;
7mod dispatch_left_right;
8mod general;
9mod hash_join;
10#[cfg(feature = "iejoin")]
11mod iejoin;
12#[cfg(feature = "merge_sorted")]
13mod merge_sorted;
14
15use std::borrow::Cow;
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18
19pub use args::*;
20use arrow::trusted_len::TrustedLen;
21#[cfg(feature = "asof_join")]
22pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
23#[cfg(feature = "dtype-categorical")]
24pub(crate) use checks::*;
25pub use cross_join::CrossJoin;
26#[cfg(feature = "chunked_ids")]
27use either::Either;
28#[cfg(feature = "chunked_ids")]
29use general::create_chunked_index_mapping;
30pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
31pub use hash_join::*;
32use hashbrown::hash_map::{Entry, RawEntryMut};
33#[cfg(feature = "iejoin")]
34pub use iejoin::{IEJoinOptions, InequalityOperator};
35#[cfg(feature = "merge_sorted")]
36pub use merge_sorted::_merge_sorted_dfs;
37use polars_core::POOL;
38#[allow(unused_imports)]
39use polars_core::chunked_array::ops::row_encode::{
40 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
41};
42use polars_core::hashing::_HASHMAP_INIT_SIZE;
43use polars_core::prelude::*;
44pub(super) use polars_core::series::IsSorted;
45use polars_core::utils::slice_offsets;
46#[allow(unused_imports)]
47use polars_core::utils::slice_slice;
48use polars_utils::hashing::BytesHash;
49use rayon::prelude::*;
50
51use self::cross_join::fused_cross_filter;
52use super::IntoDf;
53
54pub trait DataFrameJoinOps: IntoDf {
55 fn join(
91 &self,
92 other: &DataFrame,
93 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
94 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
95 args: JoinArgs,
96 options: Option<JoinTypeOptions>,
97 ) -> PolarsResult<DataFrame> {
98 let df_left = self.to_df();
99 let selected_left = df_left.select_columns(left_on)?;
100 let selected_right = other.select_columns(right_on)?;
101
102 let selected_left = selected_left
103 .into_iter()
104 .map(Column::take_materialized_series)
105 .collect::<Vec<_>>();
106 let selected_right = selected_right
107 .into_iter()
108 .map(Column::take_materialized_series)
109 .collect::<Vec<_>>();
110
111 self._join_impl(
112 other,
113 selected_left,
114 selected_right,
115 args,
116 options,
117 true,
118 false,
119 )
120 }
121
122 #[doc(hidden)]
123 #[allow(clippy::too_many_arguments)]
124 #[allow(unused_mut)]
125 fn _join_impl(
126 &self,
127 other: &DataFrame,
128 mut selected_left: Vec<Series>,
129 mut selected_right: Vec<Series>,
130 mut args: JoinArgs,
131 options: Option<JoinTypeOptions>,
132 _check_rechunk: bool,
133 _verbose: bool,
134 ) -> PolarsResult<DataFrame> {
135 let left_df = self.to_df();
136
137 #[cfg(feature = "cross_join")]
138 if let JoinType::Cross = args.how {
139 if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
140 assert!(args.slice.is_none());
141 return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
142 }
143 return left_df.cross_join(other, args.suffix.clone(), args.slice);
144 }
145
146 fn clear(s: &mut [Series]) {
148 for s in s.iter_mut() {
149 if s.len() == 1 {
150 *s = s.clear()
151 }
152 }
153 }
154 if left_df.is_empty() {
155 clear(&mut selected_left);
156 }
157 if other.is_empty() {
158 clear(&mut selected_right);
159 }
160
161 let should_coalesce = args.should_coalesce();
162 assert_eq!(selected_left.len(), selected_right.len());
163
164 #[cfg(feature = "chunked_ids")]
165 {
166 if _check_rechunk
170 && !(matches!(args.how, JoinType::Left)
171 || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
172 {
173 let mut left = Cow::Borrowed(left_df);
174 let mut right = Cow::Borrowed(other);
175 if left_df.should_rechunk() {
176 if _verbose {
177 eprintln!(
178 "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
179 args.how,
180 left_df.width()
181 );
182 }
183
184 let mut tmp_left = left_df.clone();
185 tmp_left.as_single_chunk_par();
186 left = Cow::Owned(tmp_left);
187 }
188 if other.should_rechunk() {
189 if _verbose {
190 eprintln!(
191 "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
192 args.how,
193 other.width()
194 );
195 }
196 let mut tmp_right = other.clone();
197 tmp_right.as_single_chunk_par();
198 right = Cow::Owned(tmp_right);
199 }
200 return left._join_impl(
201 &right,
202 selected_left,
203 selected_right,
204 args,
205 options,
206 false,
207 _verbose,
208 );
209 }
210 }
211
212 if let Some((l, r)) = selected_left
213 .iter()
214 .zip(&selected_right)
215 .find(|(l, r)| l.dtype() != r.dtype())
216 {
217 polars_bail!(
218 ComputeError:
219 format!(
220 "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
221 l.name(), l.dtype(), r.name(), r.dtype()
222 )
223 );
224 };
225
226 #[cfg(feature = "dtype-categorical")]
227 for (l, r) in selected_left.iter_mut().zip(selected_right.iter_mut()) {
228 match _check_categorical_src(l.dtype(), r.dtype()) {
229 Ok(_) => {},
230 Err(_) => {
231 let (ca_left, ca_right) =
232 make_categoricals_compatible(l.categorical()?, r.categorical()?)?;
233 *l = ca_left.into_series().with_name(l.name().clone());
234 *r = ca_right.into_series().with_name(r.name().clone());
235 },
236 }
237 }
238
239 #[cfg(feature = "iejoin")]
240 if let JoinType::IEJoin = args.how {
241 let Some(JoinTypeOptions::IEJoin(options)) = options else {
242 unreachable!()
243 };
244 let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
245 {
246 iejoin::iejoin_par
247 } else {
248 iejoin::iejoin
249 };
250 return func(
251 left_df,
252 other,
253 selected_left,
254 selected_right,
255 &options,
256 args.suffix,
257 args.slice,
258 );
259 }
260
261 if selected_left.len() == 1 {
263 let s_left = &selected_left[0];
264 let s_right = &selected_right[0];
265 let drop_names: Option<Vec<PlSmallStr>> =
266 if should_coalesce { None } else { Some(vec![]) };
267 return match args.how {
268 JoinType::Inner => left_df
269 ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
270 JoinType::Left => dispatch_left_right::left_join_from_series(
271 self.to_df().clone(),
272 other,
273 s_left,
274 s_right,
275 args,
276 _verbose,
277 drop_names,
278 ),
279 JoinType::Right => dispatch_left_right::right_join_from_series(
280 self.to_df(),
281 other.clone(),
282 s_left,
283 s_right,
284 args,
285 _verbose,
286 drop_names,
287 ),
288 JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
289 #[cfg(feature = "semi_anti_join")]
290 JoinType::Anti => left_df._semi_anti_join_from_series(
291 s_left,
292 s_right,
293 args.slice,
294 true,
295 args.nulls_equal,
296 ),
297 #[cfg(feature = "semi_anti_join")]
298 JoinType::Semi => left_df._semi_anti_join_from_series(
299 s_left,
300 s_right,
301 args.slice,
302 false,
303 args.nulls_equal,
304 ),
305 #[cfg(feature = "asof_join")]
306 JoinType::AsOf(options) => match (options.left_by, options.right_by) {
307 (Some(left_by), Some(right_by)) => left_df._join_asof_by(
308 other,
309 s_left,
310 s_right,
311 left_by,
312 right_by,
313 options.strategy,
314 options.tolerance,
315 args.suffix.clone(),
316 args.slice,
317 should_coalesce,
318 options.allow_eq,
319 options.check_sortedness,
320 ),
321 (None, None) => left_df._join_asof(
322 other,
323 s_left,
324 s_right,
325 options.strategy,
326 options.tolerance,
327 args.suffix,
328 args.slice,
329 should_coalesce,
330 options.allow_eq,
331 options.check_sortedness,
332 ),
333 _ => {
334 panic!("expected by arguments on both sides")
335 },
336 },
337 #[cfg(feature = "iejoin")]
338 JoinType::IEJoin => {
339 unreachable!()
340 },
341 JoinType::Cross => {
342 unreachable!()
343 },
344 };
345 }
346 let (lhs_keys, rhs_keys) =
347 if left_df.is_empty() || other.is_empty() && matches!(&args.how, JoinType::Inner) {
348 let a = Series::full_null("".into(), 0, &DataType::Null);
351 (a.clone(), a)
352 } else {
353 (
355 prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
356 prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
357 )
358 };
359
360 let drop_names = if should_coalesce {
361 if args.how == JoinType::Right {
362 selected_left
363 .iter()
364 .map(|s| s.name().clone())
365 .collect::<Vec<_>>()
366 } else {
367 selected_right
368 .iter()
369 .map(|s| s.name().clone())
370 .collect::<Vec<_>>()
371 }
372 } else {
373 vec![]
374 };
375
376 match args.how {
378 #[cfg(feature = "asof_join")]
379 JoinType::AsOf(_) => polars_bail!(
380 ComputeError: "asof join not supported for join on multiple keys"
381 ),
382 #[cfg(feature = "iejoin")]
383 JoinType::IEJoin => {
384 unreachable!()
385 },
386 JoinType::Cross => {
387 unreachable!()
388 },
389 JoinType::Full => {
390 let names_left = selected_left
391 .iter()
392 .map(|s| s.name().clone())
393 .collect::<Vec<_>>();
394 args.coalesce = JoinCoalesce::KeepColumns;
395 let suffix = args.suffix.clone();
396 let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
397
398 if should_coalesce {
399 Ok(_coalesce_full_join(
400 out?,
401 names_left.as_slice(),
402 drop_names.as_slice(),
403 suffix.clone(),
404 left_df,
405 ))
406 } else {
407 out
408 }
409 },
410 JoinType::Inner => left_df._inner_join_from_series(
411 other,
412 &lhs_keys,
413 &rhs_keys,
414 args,
415 _verbose,
416 Some(drop_names),
417 ),
418 JoinType::Left => dispatch_left_right::left_join_from_series(
419 left_df.clone(),
420 other,
421 &lhs_keys,
422 &rhs_keys,
423 args,
424 _verbose,
425 Some(drop_names),
426 ),
427 JoinType::Right => dispatch_left_right::right_join_from_series(
428 left_df,
429 other.clone(),
430 &lhs_keys,
431 &rhs_keys,
432 args,
433 _verbose,
434 Some(drop_names),
435 ),
436 #[cfg(feature = "semi_anti_join")]
437 JoinType::Anti | JoinType::Semi => self._join_impl(
438 other,
439 vec![lhs_keys],
440 vec![rhs_keys],
441 args,
442 options,
443 _check_rechunk,
444 _verbose,
445 ),
446 }
447 }
448
449 fn inner_join(
461 &self,
462 other: &DataFrame,
463 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
464 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
465 ) -> PolarsResult<DataFrame> {
466 self.join(
467 other,
468 left_on,
469 right_on,
470 JoinArgs::new(JoinType::Inner),
471 None,
472 )
473 }
474
475 fn left_join(
511 &self,
512 other: &DataFrame,
513 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
514 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
515 ) -> PolarsResult<DataFrame> {
516 self.join(
517 other,
518 left_on,
519 right_on,
520 JoinArgs::new(JoinType::Left),
521 None,
522 )
523 }
524
525 fn full_join(
536 &self,
537 other: &DataFrame,
538 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
539 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
540 ) -> PolarsResult<DataFrame> {
541 self.join(
542 other,
543 left_on,
544 right_on,
545 JoinArgs::new(JoinType::Full),
546 None,
547 )
548 }
549}
550
551trait DataFrameJoinOpsPrivate: IntoDf {
552 fn _inner_join_from_series(
553 &self,
554 other: &DataFrame,
555 s_left: &Series,
556 s_right: &Series,
557 args: JoinArgs,
558 verbose: bool,
559 drop_names: Option<Vec<PlSmallStr>>,
560 ) -> PolarsResult<DataFrame> {
561 let left_df = self.to_df();
562 #[cfg(feature = "dtype-categorical")]
563 _check_categorical_src(s_left.dtype(), s_right.dtype())?;
564 let ((join_tuples_left, join_tuples_right), sorted) =
565 _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
566
567 let mut join_tuples_left = &*join_tuples_left;
568 let mut join_tuples_right = &*join_tuples_right;
569
570 if let Some((offset, len)) = args.slice {
571 join_tuples_left = slice_slice(join_tuples_left, offset, len);
572 join_tuples_right = slice_slice(join_tuples_right, offset, len);
573 }
574
575 let other = if let Some(drop_names) = drop_names {
576 other.drop_many(drop_names)
577 } else {
578 other.drop(s_right.name()).unwrap()
579 };
580
581 let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
582 if sorted {
583 left.set_sorted_flag(IsSorted::Ascending);
584 }
585 let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
586
587 let already_left_sorted = sorted
588 && matches!(
589 args.maintain_order,
590 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
591 );
592 try_raise_keyboard_interrupt();
593 let (df_left, df_right) =
594 if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
595 let mut df =
596 DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
597
598 let columns = match args.maintain_order {
599 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
600 MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
601 _ => unreachable!(),
602 };
603
604 let options = SortMultipleOptions::new()
605 .with_order_descending(false)
606 .with_maintain_order(true);
607
608 df.sort_in_place(columns, options)?;
609
610 let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
611 if matches!(
612 args.maintain_order,
613 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
614 ) {
615 a.set_sorted_flag(IsSorted::Ascending);
616 }
617
618 POOL.join(
619 || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
621 || unsafe { other.take_unchecked(b.idx().unwrap()) },
622 )
623 } else {
624 POOL.join(
625 || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
627 || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
628 )
629 };
630
631 _finish_join(df_left, df_right, args.suffix.clone())
632 }
633}
634
// `DataFrame` relies entirely on the default method bodies of both join traits,
// so the impl blocks are intentionally empty.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
637
638fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
639 let keys = s
640 .iter()
641 .map(|s| {
642 let phys = s.to_physical_repr();
643 match phys.dtype() {
644 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
645 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
646 _ => phys.into_owned().into_column(),
647 }
648 })
649 .collect::<Vec<_>>();
650
651 if nulls_equal {
652 encode_rows_vertical_par_unordered(&keys)
653 } else {
654 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
655 }
656}
657pub fn private_left_join_multiple_keys(
658 a: &DataFrame,
659 b: &DataFrame,
660 nulls_equal: bool,
661) -> PolarsResult<LeftJoinIds> {
662 let a_cols = a
664 .get_columns()
665 .iter()
666 .map(|c| c.as_materialized_series().clone())
667 .collect::<Vec<_>>();
668 let b_cols = b
669 .get_columns()
670 .iter()
671 .map(|c| c.as_materialized_series().clone())
672 .collect::<Vec<_>>();
673
674 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
675 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
676 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
677}