pub(super) mod single_keys;
mod single_keys_dispatch;
mod single_keys_inner;
mod single_keys_left;
mod single_keys_outer;
#[cfg(feature = "semi_anti_join")]
mod single_keys_semi_anti;
pub(super) mod sort_merge;
use arrow::array::ArrayRef;
use polars_core::utils::_set_partition_size;
use polars_core::POOL;
use polars_utils::index::ChunkId;
pub(super) use single_keys::*;
#[cfg(feature = "asof_join")]
pub(super) use single_keys_dispatch::prepare_bytes;
pub use single_keys_dispatch::SeriesJoin;
use single_keys_inner::*;
use single_keys_left::*;
use single_keys_outer::*;
#[cfg(feature = "semi_anti_join")]
use single_keys_semi_anti::*;
pub(crate) use sort_merge::*;
pub use super::*;
#[cfg(feature = "chunked_ids")]
use crate::chunked_array::gather::chunked::DfTake;
pub fn default_join_ids() -> ChunkJoinOptIds {
#[cfg(feature = "chunked_ids")]
{
Either::Left(vec![])
}
#[cfg(not(feature = "chunked_ids"))]
{
vec![]
}
}
macro_rules! det_hash_prone_order {
($self:expr, $other:expr) => {{
if $self.len() > $other.len() {
($self, $other, false)
} else {
($other, $self, true)
}
}};
}
#[cfg(feature = "performant")]
use arrow::legacy::conversion::primitive_to_vec;
pub(super) use det_hash_prone_order;
pub trait JoinDispatch: IntoDf {
#[cfg(feature = "chunked_ids")]
unsafe fn create_left_df_chunked(&self, chunk_ids: &[ChunkId], left_join: bool) -> DataFrame {
let df_self = self.to_df();
if left_join && chunk_ids.len() == df_self.height() {
df_self.clone()
} else {
let sorted = if left_join {
IsSorted::Ascending
} else {
IsSorted::Not
};
df_self._take_chunked_unchecked(chunk_ids, sorted)
}
}
unsafe fn _create_left_df_from_slice(
&self,
join_tuples: &[IdxSize],
left_join: bool,
sorted_tuple_idx: bool,
) -> DataFrame {
let df_self = self.to_df();
if left_join && join_tuples.len() == df_self.height() {
df_self.clone()
} else {
let sorted = if left_join || sorted_tuple_idx {
IsSorted::Ascending
} else {
IsSorted::Not
};
df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
}
}
#[cfg(not(feature = "chunked_ids"))]
fn _finish_left_join(
&self,
ids: LeftJoinIds,
other: &DataFrame,
args: JoinArgs,
) -> PolarsResult<DataFrame> {
let ca_self = self.to_df();
let (left_idx, right_idx) = ids;
let materialize_left =
|| unsafe { ca_self._create_left_df_from_slice(&left_idx, true, true) };
let materialize_right = || {
let right_idx = &*right_idx;
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) }
};
let (df_left, df_right) = POOL.join(materialize_left, materialize_right);
_finish_join(df_left, df_right, args.suffix.as_deref())
}
#[cfg(feature = "chunked_ids")]
fn _finish_left_join(
&self,
ids: LeftJoinIds,
other: &DataFrame,
args: JoinArgs,
) -> PolarsResult<DataFrame> {
let ca_self = self.to_df();
let suffix = &args.suffix;
let (left_idx, right_idx) = ids;
let materialize_left = || match left_idx {
ChunkJoinIds::Left(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
ca_self._create_left_df_from_slice(left_idx, true, true)
},
ChunkJoinIds::Right(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
ca_self.create_left_df_chunked(left_idx, true)
},
};
let materialize_right = || match right_idx {
ChunkJoinOptIds::Left(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx))
},
ChunkJoinOptIds::Right(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
other._take_opt_chunked_unchecked(right_idx)
},
};
let (df_left, df_right) = POOL.join(materialize_left, materialize_right);
_finish_join(df_left, df_right, suffix.as_deref())
}
fn _left_join_from_series(
&self,
other: &DataFrame,
s_left: &Series,
s_right: &Series,
args: JoinArgs,
verbose: bool,
drop_names: Option<&[&str]>,
) -> PolarsResult<DataFrame> {
let df_self = self.to_df();
#[cfg(feature = "dtype-categorical")]
_check_categorical_src(s_left.dtype(), s_right.dtype())?;
let mut left = df_self.clone();
let mut s_left = s_left.clone();
if let Some((offset, len)) = args.slice {
if offset == 0 {
left = left.slice(0, len);
s_left = s_left.slice(0, len);
}
}
let mut right = Cow::Borrowed(other);
let mut s_right = s_right.clone();
if left.should_rechunk() {
left.as_single_chunk_par();
s_left = s_left.rechunk();
}
if right.should_rechunk() {
let mut other = other.clone();
other.as_single_chunk_par();
right = Cow::Owned(other);
s_right = s_right.rechunk();
}
let ids = sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?;
let right = if let Some(drop_names) = drop_names {
right.drop_many(drop_names)
} else {
right.drop(s_right.name()).unwrap()
};
left._finish_left_join(ids, &right, args)
}
#[cfg(feature = "semi_anti_join")]
unsafe fn _finish_anti_semi_join(
&self,
mut idx: &[IdxSize],
slice: Option<(i64, usize)>,
) -> DataFrame {
let ca_self = self.to_df();
if let Some((offset, len)) = slice {
idx = slice_slice(idx, offset, len);
}
ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
}
#[cfg(feature = "semi_anti_join")]
fn _semi_anti_join_from_series(
&self,
s_left: &Series,
s_right: &Series,
slice: Option<(i64, usize)>,
anti: bool,
join_nulls: bool,
) -> PolarsResult<DataFrame> {
let ca_self = self.to_df();
#[cfg(feature = "dtype-categorical")]
_check_categorical_src(s_left.dtype(), s_right.dtype())?;
let idx = s_left.hash_join_semi_anti(s_right, anti, join_nulls);
Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
}
fn _full_join_from_series(
&self,
other: &DataFrame,
s_left: &Series,
s_right: &Series,
args: JoinArgs,
) -> PolarsResult<DataFrame> {
let df_self = self.to_df();
#[cfg(feature = "dtype-categorical")]
_check_categorical_src(s_left.dtype(), s_right.dtype())?;
let (mut join_idx_l, mut join_idx_r) =
s_left.hash_join_outer(s_right, args.validation, args.join_nulls)?;
if let Some((offset, len)) = args.slice {
let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
join_idx_l.slice(offset, len);
join_idx_r.slice(offset, len);
}
let idx_ca_l = IdxCa::with_chunk("", join_idx_l);
let idx_ca_r = IdxCa::with_chunk("", join_idx_r);
let (df_left, df_right) = POOL.join(
|| unsafe { df_self.take_unchecked(&idx_ca_l) },
|| unsafe { other.take_unchecked(&idx_ca_r) },
);
let coalesce = args.coalesce.coalesce(&JoinType::Full);
let out = _finish_join(df_left, df_right, args.suffix.as_deref());
if coalesce {
Ok(_coalesce_full_join(
out?,
&[s_left.name()],
&[s_right.name()],
args.suffix.as_deref(),
df_self,
))
} else {
out
}
}
}
impl JoinDispatch for DataFrame {}