1mod args;
2#[cfg(feature = "asof_join")]
3mod asof;
4mod cross_join;
5mod dispatch_left_right;
6mod general;
7mod hash_join;
8#[cfg(feature = "iejoin")]
9mod iejoin;
10#[cfg(feature = "merge_sorted")]
11mod merge_sorted;
12
13use std::borrow::Cow;
14use std::fmt::{Debug, Display, Formatter};
15use std::hash::Hash;
16
17pub use args::*;
18use arrow::trusted_len::TrustedLen;
19#[cfg(feature = "asof_join")]
20pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
21pub use cross_join::CrossJoin;
22#[cfg(feature = "chunked_ids")]
23use either::Either;
24#[cfg(feature = "chunked_ids")]
25use general::create_chunked_index_mapping;
26pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
27pub use hash_join::*;
28use hashbrown::hash_map::{Entry, RawEntryMut};
29#[cfg(feature = "iejoin")]
30pub use iejoin::{IEJoinOptions, InequalityOperator};
31#[cfg(feature = "merge_sorted")]
32pub use merge_sorted::_merge_sorted_dfs;
33use polars_core::POOL;
34#[allow(unused_imports)]
35use polars_core::chunked_array::ops::row_encode::{
36 encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
37};
38use polars_core::hashing::_HASHMAP_INIT_SIZE;
39use polars_core::prelude::*;
40pub(super) use polars_core::series::IsSorted;
41use polars_core::utils::slice_offsets;
42#[allow(unused_imports)]
43use polars_core::utils::slice_slice;
44use polars_utils::hashing::BytesHash;
45use rayon::prelude::*;
46
47use self::cross_join::fused_cross_filter;
48use super::IntoDf;
49
50pub trait DataFrameJoinOps: IntoDf {
51 fn join(
87 &self,
88 other: &DataFrame,
89 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
90 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
91 args: JoinArgs,
92 options: Option<JoinTypeOptions>,
93 ) -> PolarsResult<DataFrame> {
94 let df_left = self.to_df();
95 let selected_left = df_left.select_columns(left_on)?;
96 let selected_right = other.select_columns(right_on)?;
97
98 let selected_left = selected_left
99 .into_iter()
100 .map(Column::take_materialized_series)
101 .collect::<Vec<_>>();
102 let selected_right = selected_right
103 .into_iter()
104 .map(Column::take_materialized_series)
105 .collect::<Vec<_>>();
106
107 self._join_impl(
108 other,
109 selected_left,
110 selected_right,
111 args,
112 options,
113 true,
114 false,
115 )
116 }
117
118 #[doc(hidden)]
119 #[allow(clippy::too_many_arguments)]
120 #[allow(unused_mut)]
121 fn _join_impl(
122 &self,
123 other: &DataFrame,
124 mut selected_left: Vec<Series>,
125 mut selected_right: Vec<Series>,
126 mut args: JoinArgs,
127 options: Option<JoinTypeOptions>,
128 _check_rechunk: bool,
129 _verbose: bool,
130 ) -> PolarsResult<DataFrame> {
131 let left_df = self.to_df();
132
133 #[cfg(feature = "cross_join")]
134 if let JoinType::Cross = args.how {
135 if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
136 assert!(args.slice.is_none());
137 return fused_cross_filter(
138 left_df,
139 other,
140 args.suffix.clone(),
141 cross_options,
142 args.maintain_order,
143 );
144 }
145 return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
146 }
147
148 fn clear(s: &mut [Series]) {
150 for s in s.iter_mut() {
151 if s.len() == 1 {
152 *s = s.clear()
153 }
154 }
155 }
156 if left_df.is_empty() {
157 clear(&mut selected_left);
158 }
159 if other.is_empty() {
160 clear(&mut selected_right);
161 }
162
163 let should_coalesce = args.should_coalesce();
164 assert_eq!(selected_left.len(), selected_right.len());
165
166 #[cfg(feature = "chunked_ids")]
167 {
168 if _check_rechunk
172 && !(matches!(args.how, JoinType::Left)
173 || std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
174 {
175 let mut left = Cow::Borrowed(left_df);
176 let mut right = Cow::Borrowed(other);
177 if left_df.should_rechunk() {
178 if _verbose {
179 eprintln!(
180 "{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
181 args.how,
182 left_df.width()
183 );
184 }
185
186 let mut tmp_left = left_df.clone();
187 tmp_left.as_single_chunk_par();
188 left = Cow::Owned(tmp_left);
189 }
190 if other.should_rechunk() {
191 if _verbose {
192 eprintln!(
193 "{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
194 args.how,
195 other.width()
196 );
197 }
198 let mut tmp_right = other.clone();
199 tmp_right.as_single_chunk_par();
200 right = Cow::Owned(tmp_right);
201 }
202 return left._join_impl(
203 &right,
204 selected_left,
205 selected_right,
206 args,
207 options,
208 false,
209 _verbose,
210 );
211 }
212 }
213
214 if let Some((l, r)) = selected_left
215 .iter()
216 .zip(&selected_right)
217 .find(|(l, r)| l.dtype() != r.dtype())
218 {
219 polars_bail!(
220 ComputeError:
221 format!(
222 "datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
223 l.name(), l.dtype(), r.name(), r.dtype()
224 )
225 );
226 };
227
228 #[cfg(feature = "iejoin")]
229 if let JoinType::IEJoin = args.how {
230 let Some(JoinTypeOptions::IEJoin(options)) = options else {
231 unreachable!()
232 };
233 let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
234 {
235 iejoin::iejoin_par
236 } else {
237 iejoin::iejoin
238 };
239 return func(
240 left_df,
241 other,
242 selected_left,
243 selected_right,
244 &options,
245 args.suffix,
246 args.slice,
247 );
248 }
249
250 if selected_left.len() == 1 {
252 let s_left = &selected_left[0];
253 let s_right = &selected_right[0];
254 let drop_names: Option<Vec<PlSmallStr>> =
255 if should_coalesce { None } else { Some(vec![]) };
256 return match args.how {
257 JoinType::Inner => left_df
258 ._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
259 JoinType::Left => dispatch_left_right::left_join_from_series(
260 self.to_df().clone(),
261 other,
262 s_left,
263 s_right,
264 args,
265 _verbose,
266 drop_names,
267 ),
268 JoinType::Right => dispatch_left_right::right_join_from_series(
269 self.to_df(),
270 other.clone(),
271 s_left,
272 s_right,
273 args,
274 _verbose,
275 drop_names,
276 ),
277 JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
278 #[cfg(feature = "semi_anti_join")]
279 JoinType::Anti => left_df._semi_anti_join_from_series(
280 s_left,
281 s_right,
282 args.slice,
283 true,
284 args.nulls_equal,
285 ),
286 #[cfg(feature = "semi_anti_join")]
287 JoinType::Semi => left_df._semi_anti_join_from_series(
288 s_left,
289 s_right,
290 args.slice,
291 false,
292 args.nulls_equal,
293 ),
294 #[cfg(feature = "asof_join")]
295 JoinType::AsOf(options) => match (options.left_by, options.right_by) {
296 (Some(left_by), Some(right_by)) => left_df._join_asof_by(
297 other,
298 s_left,
299 s_right,
300 left_by,
301 right_by,
302 options.strategy,
303 options.tolerance.map(|v| v.into_value()),
304 args.suffix.clone(),
305 args.slice,
306 should_coalesce,
307 options.allow_eq,
308 options.check_sortedness,
309 ),
310 (None, None) => left_df._join_asof(
311 other,
312 s_left,
313 s_right,
314 options.strategy,
315 options.tolerance.map(|v| v.into_value()),
316 args.suffix,
317 args.slice,
318 should_coalesce,
319 options.allow_eq,
320 options.check_sortedness,
321 ),
322 _ => {
323 panic!("expected by arguments on both sides")
324 },
325 },
326 #[cfg(feature = "iejoin")]
327 JoinType::IEJoin => {
328 unreachable!()
329 },
330 JoinType::Cross => {
331 unreachable!()
332 },
333 };
334 }
335 let (lhs_keys, rhs_keys) =
336 if (left_df.is_empty() || other.is_empty()) && matches!(&args.how, JoinType::Inner) {
337 let a = Series::full_null("".into(), 0, &DataType::Null);
340 (a.clone(), a)
341 } else {
342 (
344 prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
345 prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
346 )
347 };
348
349 let drop_names = if should_coalesce {
350 if args.how == JoinType::Right {
351 selected_left
352 .iter()
353 .map(|s| s.name().clone())
354 .collect::<Vec<_>>()
355 } else {
356 selected_right
357 .iter()
358 .map(|s| s.name().clone())
359 .collect::<Vec<_>>()
360 }
361 } else {
362 vec![]
363 };
364
365 match args.how {
367 #[cfg(feature = "asof_join")]
368 JoinType::AsOf(_) => polars_bail!(
369 ComputeError: "asof join not supported for join on multiple keys"
370 ),
371 #[cfg(feature = "iejoin")]
372 JoinType::IEJoin => {
373 unreachable!()
374 },
375 JoinType::Cross => {
376 unreachable!()
377 },
378 JoinType::Full => {
379 let names_left = selected_left
380 .iter()
381 .map(|s| s.name().clone())
382 .collect::<Vec<_>>();
383 args.coalesce = JoinCoalesce::KeepColumns;
384 let suffix = args.suffix.clone();
385 let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
386
387 if should_coalesce {
388 Ok(_coalesce_full_join(
389 out?,
390 names_left.as_slice(),
391 drop_names.as_slice(),
392 suffix,
393 left_df,
394 ))
395 } else {
396 out
397 }
398 },
399 JoinType::Inner => left_df._inner_join_from_series(
400 other,
401 &lhs_keys,
402 &rhs_keys,
403 args,
404 _verbose,
405 Some(drop_names),
406 ),
407 JoinType::Left => dispatch_left_right::left_join_from_series(
408 left_df.clone(),
409 other,
410 &lhs_keys,
411 &rhs_keys,
412 args,
413 _verbose,
414 Some(drop_names),
415 ),
416 JoinType::Right => dispatch_left_right::right_join_from_series(
417 left_df,
418 other.clone(),
419 &lhs_keys,
420 &rhs_keys,
421 args,
422 _verbose,
423 Some(drop_names),
424 ),
425 #[cfg(feature = "semi_anti_join")]
426 JoinType::Anti | JoinType::Semi => self._join_impl(
427 other,
428 vec![lhs_keys],
429 vec![rhs_keys],
430 args,
431 options,
432 _check_rechunk,
433 _verbose,
434 ),
435 }
436 }
437
438 fn inner_join(
450 &self,
451 other: &DataFrame,
452 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
453 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
454 ) -> PolarsResult<DataFrame> {
455 self.join(
456 other,
457 left_on,
458 right_on,
459 JoinArgs::new(JoinType::Inner),
460 None,
461 )
462 }
463
464 fn left_join(
500 &self,
501 other: &DataFrame,
502 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
503 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
504 ) -> PolarsResult<DataFrame> {
505 self.join(
506 other,
507 left_on,
508 right_on,
509 JoinArgs::new(JoinType::Left),
510 None,
511 )
512 }
513
514 fn full_join(
525 &self,
526 other: &DataFrame,
527 left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
528 right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
529 ) -> PolarsResult<DataFrame> {
530 self.join(
531 other,
532 left_on,
533 right_on,
534 JoinArgs::new(JoinType::Full),
535 None,
536 )
537 }
538}
539
540trait DataFrameJoinOpsPrivate: IntoDf {
541 fn _inner_join_from_series(
542 &self,
543 other: &DataFrame,
544 s_left: &Series,
545 s_right: &Series,
546 args: JoinArgs,
547 verbose: bool,
548 drop_names: Option<Vec<PlSmallStr>>,
549 ) -> PolarsResult<DataFrame> {
550 let left_df = self.to_df();
551 let ((join_tuples_left, join_tuples_right), sorted) =
552 _sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
553
554 let mut join_tuples_left = &*join_tuples_left;
555 let mut join_tuples_right = &*join_tuples_right;
556
557 if let Some((offset, len)) = args.slice {
558 join_tuples_left = slice_slice(join_tuples_left, offset, len);
559 join_tuples_right = slice_slice(join_tuples_right, offset, len);
560 }
561
562 let other = if let Some(drop_names) = drop_names {
563 other.drop_many(drop_names)
564 } else {
565 other.drop(s_right.name()).unwrap()
566 };
567
568 let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
569 if sorted {
570 left.set_sorted_flag(IsSorted::Ascending);
571 }
572 let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
573
574 let already_left_sorted = sorted
575 && matches!(
576 args.maintain_order,
577 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
578 );
579 try_raise_keyboard_interrupt();
580 let (df_left, df_right) =
581 if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
582 let mut df =
583 DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
584
585 let columns = match args.maintain_order {
586 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
587 MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
588 _ => unreachable!(),
589 };
590
591 let options = SortMultipleOptions::new()
592 .with_order_descending(false)
593 .with_maintain_order(true);
594
595 df.sort_in_place(columns, options)?;
596
597 let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
598 if matches!(
599 args.maintain_order,
600 MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
601 ) {
602 a.set_sorted_flag(IsSorted::Ascending);
603 }
604
605 POOL.join(
606 || unsafe { left_df.take_unchecked(a.idx().unwrap()) },
608 || unsafe { other.take_unchecked(b.idx().unwrap()) },
609 )
610 } else {
611 POOL.join(
612 || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
614 || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
615 )
616 };
617
618 _finish_join(df_left, df_right, args.suffix)
619 }
620}
621
// `DataFrame` uses the default method implementations of both join traits, so the
// impl bodies are intentionally empty.
impl DataFrameJoinOps for DataFrame {}
impl DataFrameJoinOpsPrivate for DataFrame {}
624
625fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
626 let keys = s
627 .iter()
628 .map(|s| {
629 let phys = s.to_physical_repr();
630 match phys.dtype() {
631 DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
632 DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
633 _ => phys.into_owned().into_column(),
634 }
635 })
636 .collect::<Vec<_>>();
637
638 if nulls_equal {
639 encode_rows_vertical_par_unordered(&keys)
640 } else {
641 encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
642 }
643}
644pub fn private_left_join_multiple_keys(
645 a: &DataFrame,
646 b: &DataFrame,
647 nulls_equal: bool,
648) -> PolarsResult<LeftJoinIds> {
649 let a_cols = a
651 .get_columns()
652 .iter()
653 .map(|c| c.as_materialized_series().clone())
654 .collect::<Vec<_>>();
655 let b_cols = b
656 .get_columns()
657 .iter()
658 .map(|c| c.as_materialized_series().clone())
659 .collect::<Vec<_>>();
660
661 let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
662 let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
663 sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
664}