1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10#[cfg(feature = "merge_sorted")]
11mod merge_sorted;
12
13use std::borrow::Cow;
14use std::fmt::{Debug, Display, Formatter};
15use std::hash::Hash;
16
17pub use args::*;
18use arrow::trusted_len::TrustedLen;
19#[cfg(feature = "asof_join")]
20pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
21pub use cross_join::CrossJoin;
22#[cfg(feature = "chunked_ids")]
23use either::Either;
24#[cfg(feature = "chunked_ids")]
25use general::create_chunked_index_mapping;
26pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
27pub use hash_join::*;
28use hashbrown::hash_map::{Entry, RawEntryMut};
29#[cfg(feature = "iejoin")]
30pub use iejoin::{IEJoinOptions, InequalityOperator};
31#[cfg(feature = "merge_sorted")]
32pub use merge_sorted::_merge_sorted_dfs;
33use polars_core::POOL;
34#[allow(unused_imports)]
35use polars_core::chunked_array::ops::row_encode::{
36 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
37};
38use polars_core::hashing::_HASHMAP_INIT_SIZE;
39use polars_core::prelude::*;
40pub(super) use polars_core::series::IsSorted;
41use polars_core::utils::slice_offsets;
42#[allow(unused_imports)]
43use polars_core::utils::slice_slice;
44use polars_utils::hashing::BytesHash;
45use rayon::prelude::*;
46
47use self::cross_join::fused_cross_filter;
48use super::IntoDf;
49
/// Join operations for any type that can be viewed as a [`DataFrame`] through [`IntoDf`].
///
/// The public entry points (`join`, `inner_join`, `left_join`, `full_join`) resolve the key
/// columns by name and delegate to `_join_impl`, which dispatches on `JoinArgs::how`.
pub trait DataFrameJoinOps: IntoDf {
    /// Join `self` (left) with `other` (right) on the named key columns.
    ///
    /// `left_on` is selected from the left frame and `right_on` from `other`. Both
    /// selections are materialized into `Series` and passed to `_join_impl` with the
    /// rechunk check enabled and verbose output disabled.
    ///
    /// # Errors
    /// Propagates any error returned by `select_columns` on either frame and by
    /// `_join_impl`.
    fn join(
        &self,
        other: &DataFrame,
        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
        args: JoinArgs,
        options: Option<JoinTypeOptions>,
    ) -> PolarsResult<DataFrame> {
        let df_left = self.to_df();
        let selected_left = df_left.select_columns(left_on)?;
        let selected_right = other.select_columns(right_on)?;

        // `_join_impl` works on `Series`, so convert the selected `Column`s.
        let selected_left = selected_left
            .into_iter()
            .map(Column::take_materialized_series)
            .collect::<Vec<_>>();
        let selected_right = selected_right
            .into_iter()
            .map(Column::take_materialized_series)
            .collect::<Vec<_>>();

        self._join_impl(
            other,
            selected_left,
            selected_right,
            args,
            options,
            // `_check_rechunk`
            true,
            // `_verbose`
            false,
        )
    }

    /// Shared implementation behind every join; dispatches on `args.how`.
    ///
    /// `selected_left` and `selected_right` are the already-evaluated key columns, paired
    /// by position. `_check_rechunk` enables the rechunk pass (only compiled with the
    /// `chunked_ids` feature). `_verbose` enables the rechunk diagnostics on stderr and is
    /// forwarded to the inner/left/right join routines.
    ///
    /// # Errors
    /// Returns a `ComputeError` when a pair of keys has different dtypes, or when an as-of
    /// join is requested with more than one key. Errors from the individual join routines
    /// and from the multi-key encoding are propagated.
    ///
    /// # Panics
    /// - if `JoinTypeOptions::Cross` options are combined with a slice,
    /// - if the two key lists have different lengths,
    /// - if `JoinType::IEJoin` is requested without `JoinTypeOptions::IEJoin` options,
    /// - if an as-of join supplies only one of `left_by` / `right_by`,
    /// - if `JoinType::Cross` was not handled by the `cross_join`-gated early return.
    #[doc(hidden)]
    #[allow(clippy::too_many_arguments)]
    #[allow(unused_mut)]
    fn _join_impl(
        &self,
        other: &DataFrame,
        mut selected_left: Vec<Series>,
        mut selected_right: Vec<Series>,
        mut args: JoinArgs,
        options: Option<JoinTypeOptions>,
        _check_rechunk: bool,
        _verbose: bool,
    ) -> PolarsResult<DataFrame> {
        let left_df = self.to_df();

        // Cross joins return here and never reach the key handling below.
        #[cfg(feature = "cross_join")]
        if let JoinType::Cross = args.how {
            if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
                // The fused cross+filter call is not given `args.slice`, so none may be set.
                assert!(args.slice.is_none());
                return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
            }
            return left_df.cross_join(other, args.suffix.clone(), args.slice);
        }

        // Replace every length-1 key by the result of `Series::clear`.
        // NOTE(review): length-1 keys are presumably broadcast literals that would not
        // line up with an empty frame — confirm.
        fn clear(s: &mut [Series]) {
            for s in s.iter_mut() {
                if s.len() == 1 {
                    *s = s.clear()
                }
            }
        }
        if left_df.is_empty() {
            clear(&mut selected_left);
        }
        if other.is_empty() {
            clear(&mut selected_right);
        }

        let should_coalesce = args.should_coalesce();
        // Keys are matched pairwise, so both sides must supply the same number of them.
        assert_eq!(selected_left.len(), selected_right.len());

        #[cfg(feature = "chunked_ids")]
        {
            // Left joins, and runs with `POLARS_NO_CHUNKED_JOIN` set, skip this pass.
            if _check_rechunk
                && !(matches!(args.how, JoinType::Left)
                    || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
            {
                // Borrow by default; only frames reporting `should_rechunk()` are cloned
                // and rechunked.
                let mut left = Cow::Borrowed(left_df);
                let mut right = Cow::Borrowed(other);
                if left_df.should_rechunk() {
                    if _verbose {
                        eprintln!(
                            "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
                            args.how,
                            left_df.width()
                        );
                    }

                    let mut tmp_left = left_df.clone();
                    tmp_left.as_single_chunk_par();
                    left = Cow::Owned(tmp_left);
                }
                if other.should_rechunk() {
                    if _verbose {
                        eprintln!(
                            "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
                            args.how,
                            other.width()
                        );
                    }
                    let mut tmp_right = other.clone();
                    tmp_right.as_single_chunk_par();
                    right = Cow::Owned(tmp_right);
                }
                // Re-enter with `_check_rechunk = false` so this pass is not repeated.
                return left._join_impl(
                    &right,
                    selected_left,
                    selected_right,
                    args,
                    options,
                    false,
                    _verbose,
                );
            }
        }

        // Reject the first key pair whose dtypes differ.
        if let Some((l, r)) = selected_left
            .iter()
            .zip(&selected_right)
            .find(|(l, r)| l.dtype() != r.dtype())
        {
            polars_bail!(
                ComputeError:
                    format!(
                        "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
                        l.name(), l.dtype(), r.name(), r.dtype()
                    )
            );
        };

        #[cfg(feature = "iejoin")]
        if let JoinType::IEJoin = args.how {
            // An IEJoin without its options is treated as a caller bug.
            let Some(JoinTypeOptions::IEJoin(options)) = options else {
                unreachable!()
            };
            // Pick `iejoin_par` only with more than one pool thread and two non-empty
            // inputs; otherwise use `iejoin`.
            let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
            {
                iejoin::iejoin_par
            } else {
                iejoin::iejoin
            };
            return func(
                left_df,
                other,
                selected_left,
                selected_right,
                &options,
                args.suffix,
                args.slice,
            );
        }

        // Single key: hand the key series to the join routines directly.
        if selected_left.len() == 1 {
            let s_left = &selected_left[0];
            let s_right = &selected_right[0];
            // `None` when coalescing, otherwise an explicit empty list (drop nothing).
            // For the inner join, `None` results in the right key being dropped by name.
            let drop_names: Option<Vec<PlSmallStr>> =
                if should_coalesce { None } else { Some(vec![]) };
            return match args.how {
                JoinType::Inner => left_df
                    ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
                JoinType::Left => dispatch_left_right::left_join_from_series(
                    self.to_df().clone(),
                    other,
                    s_left,
                    s_right,
                    args,
                    _verbose,
                    drop_names,
                ),
                JoinType::Right => dispatch_left_right::right_join_from_series(
                    self.to_df(),
                    other.clone(),
                    s_left,
                    s_right,
                    args,
                    _verbose,
                    drop_names,
                ),
                JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
                // Anti and Semi share one routine and differ only in the boolean flag
                // (`true` for Anti, `false` for Semi).
                #[cfg(feature = "semi_anti_join")]
                JoinType::Anti => left_df._semi_anti_join_from_series(
                    s_left,
                    s_right,
                    args.slice,
                    true,
                    args.nulls_equal,
                ),
                #[cfg(feature = "semi_anti_join")]
                JoinType::Semi => left_df._semi_anti_join_from_series(
                    s_left,
                    s_right,
                    args.slice,
                    false,
                    args.nulls_equal,
                ),
                // As-of joins need `left_by` and `right_by` either both set or both unset.
                #[cfg(feature = "asof_join")]
                JoinType::AsOf(options) => match (options.left_by, options.right_by) {
                    (Some(left_by), Some(right_by)) => left_df._join_asof_by(
                        other,
                        s_left,
                        s_right,
                        left_by,
                        right_by,
                        options.strategy,
                        options.tolerance.map(|v| v.into_value()),
                        args.suffix.clone(),
                        args.slice,
                        should_coalesce,
                        options.allow_eq,
                        options.check_sortedness,
                    ),
                    (None, None) => left_df._join_asof(
                        other,
                        s_left,
                        s_right,
                        options.strategy,
                        options.tolerance.map(|v| v.into_value()),
                        args.suffix,
                        args.slice,
                        should_coalesce,
                        options.allow_eq,
                        options.check_sortedness,
                    ),
                    _ => {
                        panic!("expected by arguments on both sides")
                    },
                },
                // IEJoin returned earlier in this function.
                #[cfg(feature = "iejoin")]
                JoinType::IEJoin => {
                    unreachable!()
                },
                // Cross is expected to have returned at the top of this function.
                JoinType::Cross => {
                    unreachable!()
                },
            };
        }
        // Multiple keys: combine each side's keys into one key series.
        let (lhs_keys, rhs_keys) =
            if (left_df.is_empty() || other.is_empty()) && matches!(&args.how, JoinType::Inner) {
                // Inner join with an empty side: use two empty `Null` series as dummy keys
                // instead of encoding the real ones.
                let a = Series::full_null("".into(), 0, &DataType::Null);
                (a.clone(), a)
            } else {
                (
                    // Encode every key list into a single series via `prepare_keys_multiple`.
                    prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
                    prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
                )
            };

        // When coalescing, collect the key names of one side: the left keys for a right
        // join, the right keys for every other join type. Otherwise the list is empty.
        let drop_names = if should_coalesce {
            if args.how == JoinType::Right {
                selected_left
                    .iter()
                    .map(|s| s.name().clone())
                    .collect::<Vec<_>>()
            } else {
                selected_right
                    .iter()
                    .map(|s| s.name().clone())
                    .collect::<Vec<_>>()
            }
        } else {
            vec![]
        };

        match args.how {
            #[cfg(feature = "asof_join")]
            JoinType::AsOf(_) => polars_bail!(
                ComputeError: "asof join not supported for join on multiple keys"
            ),
            // IEJoin returned earlier in this function.
            #[cfg(feature = "iejoin")]
            JoinType::IEJoin => {
                unreachable!()
            },
            // Cross is expected to have returned at the top of this function.
            JoinType::Cross => {
                unreachable!()
            },
            JoinType::Full => {
                let names_left = selected_left
                    .iter()
                    .map(|s| s.name().clone())
                    .collect::<Vec<_>>();
                // Keep all columns in the join itself; coalescing, if requested, is done
                // afterwards by `_coalesce_full_join`.
                args.coalesce = JoinCoalesce::KeepColumns;
                let suffix = args.suffix.clone();
                let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);

                if should_coalesce {
                    Ok(_coalesce_full_join(
                        out?,
                        names_left.as_slice(),
                        drop_names.as_slice(),
                        suffix,
                        left_df,
                    ))
                } else {
                    out
                }
            },
            JoinType::Inner => left_df._inner_join_from_series(
                other,
                &lhs_keys,
                &rhs_keys,
                args,
                _verbose,
                Some(drop_names),
            ),
            JoinType::Left => dispatch_left_right::left_join_from_series(
                left_df.clone(),
                other,
                &lhs_keys,
                &rhs_keys,
                args,
                _verbose,
                Some(drop_names),
            ),
            JoinType::Right => dispatch_left_right::right_join_from_series(
                left_df,
                other.clone(),
                &lhs_keys,
                &rhs_keys,
                args,
                _verbose,
                Some(drop_names),
            ),
            // Re-enter with the encoded keys as a single key pair, which is then handled
            // by the single-key branch above.
            #[cfg(feature = "semi_anti_join")]
            JoinType::Anti | JoinType::Semi => self._join_impl(
                other,
                vec![lhs_keys],
                vec![rhs_keys],
                args,
                options,
                _check_rechunk,
                _verbose,
            ),
        }
    }

    /// Inner join `self` with `other` on the named columns.
    ///
    /// Calls [`Self::join`] with `JoinArgs::new(JoinType::Inner)` and no join-type options.
    fn inner_join(
        &self,
        other: &DataFrame,
        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
    ) -> PolarsResult<DataFrame> {
        self.join(
            other,
            left_on,
            right_on,
            JoinArgs::new(JoinType::Inner),
            None,
        )
    }

    /// Left join `self` with `other` on the named columns.
    ///
    /// Calls [`Self::join`] with `JoinArgs::new(JoinType::Left)` and no join-type options.
    fn left_join(
        &self,
        other: &DataFrame,
        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
    ) -> PolarsResult<DataFrame> {
        self.join(
            other,
            left_on,
            right_on,
            JoinArgs::new(JoinType::Left),
            None,
        )
    }

    /// Full join `self` with `other` on the named columns.
    ///
    /// Calls [`Self::join`] with `JoinArgs::new(JoinType::Full)` and no join-type options.
    fn full_join(
        &self,
        other: &DataFrame,
        left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
        right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
    ) -> PolarsResult<DataFrame> {
        self.join(
            other,
            left_on,
            right_on,
            JoinArgs::new(JoinType::Full),
            None,
        )
    }
}
533
534trait DataFrameJoinOpsPrivate: IntoDf {
535 fn _inner_join_from_series(
536 &self,
537 other: &DataFrame,
538 s_left: &Series,
539 s_right: &Series,
540 args: JoinArgs,
541 verbose: bool,
542 drop_names: Option<Vec<PlSmallStr>>,
543 ) -> PolarsResult<DataFrame> {
544 let left_df = self.to_df();
545 let ((join_tuples_left, join_tuples_right), sorted) =
546 _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
547
548 let mut join_tuples_left = &*join_tuples_left;
549 let mut join_tuples_right = &*join_tuples_right;
550
551 if let Some((offset, len)) = args.slice {
552 join_tuples_left = slice_slice(join_tuples_left, offset, len);
553 join_tuples_right = slice_slice(join_tuples_right, offset, len);
554 }
555
556 let other = if let Some(drop_names) = drop_names {
557 other.drop_many(drop_names)
558 } else {
559 other.drop(s_right.name()).unwrap()
560 };
561
562 let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
563 if sorted {
564 left.set_sorted_flag(IsSorted::Ascending);
565 }
566 let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
567
568 let already_left_sorted = sorted
569 && matches!(
570 args.maintain_order,
571 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
572 );
573 try_raise_keyboard_interrupt();
574 let (df_left, df_right) =
575 if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
576 let mut df =
577 DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
578
579 let columns = match args.maintain_order {
580 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
581 MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
582 _ => unreachable!(),
583 };
584
585 let options = SortMultipleOptions::new()
586 .with_order_descending(false)
587 .with_maintain_order(true);
588
589 df.sort_in_place(columns, options)?;
590
591 let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
592 if matches!(
593 args.maintain_order,
594 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
595 ) {
596 a.set_sorted_flag(IsSorted::Ascending);
597 }
598
599 POOL.join(
600 || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
602 || unsafe { other.take_unchecked(b.idx().unwrap()) },
603 )
604 } else {
605 POOL.join(
606 || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
608 || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
609 )
610 };
611
612 _finish_join(df_left, df_right, args.suffix)
613 }
614}
615
// `DataFrame` implements both join traits with empty `impl` blocks, so it uses the
// method bodies provided by the traits themselves.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
618
619fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
620 let keys = s
621 .iter()
622 .map(|s| {
623 let phys = s.to_physical_repr();
624 match phys.dtype() {
625 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
626 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
627 _ => phys.into_owned().into_column(),
628 }
629 })
630 .collect::<Vec<_>>();
631
632 if nulls_equal {
633 encode_rows_vertical_par_unordered(&keys)
634 } else {
635 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
636 }
637}
638pub fn private_left_join_multiple_keys(
639 a: &DataFrame,
640 b: &DataFrame,
641 nulls_equal: bool,
642) -> PolarsResult<LeftJoinIds> {
643 let a_cols = a
645 .get_columns()
646 .iter()
647 .map(|c| c.as_materialized_series().clone())
648 .collect::<Vec<_>>();
649 let b_cols = b
650 .get_columns()
651 .iter()
652 .map(|c| c.as_materialized_series().clone())
653 .collect::<Vec<_>>();
654
655 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
656 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
657 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
658}