use arrow::bitmap::utils::get_bit_unchecked;
use polars_utils::total_ord::{ToTotalOrd, TotalHash};
use rayon::prelude::*;
use xxhash_rust::xxh3::xxh3_64_with_seed;
use super::*;
use crate::prelude::*;
use crate::series::implementations::null::NullChunked;
use crate::POOL;
// Odd 64-bit multiplier passed to `folded_multiply` when combining numeric hashes.
// NOTE(review): the value matches the LCG multiplier commonly attributed to Knuth's
// MMIX generator — confirm provenance before documenting it as such.
const MULTIPLE: u64 = 6364136223846793005;
/// Element-wise hashing of a column into a buffer of `u64` hashes.
///
/// Both methods have default bodies that bail out through `polars_bail!(un_impl = ...)`,
/// so an implementor only overrides the operations it supports.
pub trait VecHash {
/// Hashes every element of `self` using `_random_state` and stores the result in `_buf`.
///
/// The implementations in this file clear `buf` first and then write one hash per
/// element. The default body bails with `un_impl = vec_hash`.
fn vec_hash(&self, _random_state: RandomState, _buf: &mut Vec<u64>) -> PolarsResult<()> {
polars_bail!(un_impl = vec_hash);
}
/// Combines the hash of every element of `self` into the hash already stored at the
/// same position of `_hashes`.
///
/// The default body bails with `un_impl = vec_hash_combine`.
fn vec_hash_combine(
&self,
_random_state: RandomState,
_hashes: &mut [u64],
) -> PolarsResult<()> {
polars_bail!(un_impl = vec_hash_combine);
}
}
/// Multiplies `s` by `by` as 128-bit integers and XORs the low 64 bits of the
/// product with its high 64 bits.
pub(crate) const fn folded_multiply(s: u64, by: u64) -> u64 {
    // The product of two 64-bit values always fits in 128 bits, so this cannot overflow.
    let product = (s as u128) * (by as u128);
    // Truncating cast keeps exactly the low 64 bits.
    let low = product as u64;
    let high = (product >> 64) as u64;
    low ^ high
}
/// Returns the hash value used for null slots under `random_state`.
///
/// A fixed constant is hashed once, and the resulting hash is hashed again; the
/// result therefore depends only on `random_state`.
pub(crate) fn get_null_hash_value(random_state: &RandomState) -> u64 {
    random_state.hash_one(random_state.hash_one(3188347919usize))
}
/// Overwrites the hash of every null slot in `buf` with the null hash of `random_state`.
///
/// `buf` is expected to hold the hashes of `chunks` laid out back to back; chunks
/// without nulls are skipped and only advance the running position.
fn insert_null_hash(chunks: &[ArrayRef], random_state: RandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    let mut start = 0;
    for arr in chunks {
        if arr.null_count() > 0 {
            let validity = arr.validity().unwrap();
            let (bytes, bit_offset, _) = validity.as_slice();
            let slots = buf[start..].iter_mut().take(validity.len());
            for (i, slot) in slots.enumerate() {
                // SAFETY: `i < validity.len()`, and `bytes`/`bit_offset` come from the
                // same `as_slice` call. NOTE(review): assumes the second tuple field is
                // the bit offset of element 0 inside `bytes` — confirm in `Bitmap` docs.
                let valid = unsafe { get_bit_unchecked(bytes, i + bit_offset) };
                *slot = if valid { *slot } else { null_h };
            }
        }
        start += arr.len();
    }
}
/// Hashes every element of a numeric `ca` into `buf`, replacing its previous contents.
///
/// All raw values are hashed first through their total-order representation,
/// regardless of validity; null slots are then patched by `insert_null_hash`.
fn numeric_vec_hash<T>(ca: &ChunkedArray<T>, random_state: RandomState, buf: &mut Vec<u64>)
where
    T: PolarsNumericType,
    T::Native: TotalHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
{
    buf.clear();
    buf.reserve(ca.len());
    // NOTE(review): these lint allowances look vestigial (no `unsafe` or transmute is
    // visible in this statement) — confirm before removing them.
    #[allow(unused_unsafe)]
    #[allow(clippy::useless_transmute)]
    for arr in ca.downcast_iter() {
        let values = arr.values().as_slice();
        buf.extend(
            values
                .iter()
                .map(|&v| random_state.hash_one(v.to_total_ord())),
        );
    }
    insert_null_hash(&ca.chunks, random_state, buf)
}
/// Folds the hash of every element of a numeric `ca` into the matching slot of `hashes`.
///
/// Each slot becomes
/// `folded_multiply(value_hash ^ folded_multiply(old, MULTIPLE), MULTIPLE)`, where
/// `value_hash` is the null hash for null slots and the hash of the value's
/// total-order representation otherwise.
fn numeric_vec_hash_combine<T>(ca: &ChunkedArray<T>, random_state: RandomState, hashes: &mut [u64])
where
    T: PolarsNumericType,
    T::Native: TotalHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
{
    let null_h = get_null_hash_value(&random_state);
    // Mixing step shared by the null-free and the nullable path.
    let mix = |value_hash: u64, previous: u64| {
        folded_multiply(value_hash ^ folded_multiply(previous, MULTIPLE), MULTIPLE)
    };
    let mut start = 0;
    for arr in ca.downcast_iter() {
        let values = arr.values().as_slice();
        if arr.null_count() == 0 {
            for (value, slot) in values.iter().zip(&mut hashes[start..]) {
                *slot = mix(random_state.hash_one(value.to_total_ord()), *slot);
            }
        } else {
            let validity = arr.validity().unwrap();
            let (bytes, bit_offset, _) = validity.as_slice();
            let slots = hashes[start..]
                .iter_mut()
                .zip(values)
                .take(validity.len());
            for (i, (slot, value)) in slots.enumerate() {
                // SAFETY: `i < validity.len()`, and `bytes`/`bit_offset` come from the
                // same `as_slice` call. NOTE(review): assumes the second tuple field is
                // the bit offset of element 0 inside `bytes` — confirm in `Bitmap` docs.
                let valid = unsafe { get_bit_unchecked(bytes, i + bit_offset) };
                // The raw value is hashed even for null slots; the result is then discarded.
                let value_hash = random_state.hash_one(value.to_total_ord());
                *slot = mix(if valid { value_hash } else { null_h }, *slot);
            }
        }
        start += arr.len();
    }
}
// Implements `VecHash` for the chunked-array type `$ca` by forwarding both methods to
// the generic helpers `numeric_vec_hash` and `numeric_vec_hash_combine`.
macro_rules! vec_hash_numeric {
($ca:ident) => {
impl VecHash for $ca {
// Forwards to `numeric_vec_hash`, which returns nothing, so this always yields `Ok(())`.
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
numeric_vec_hash(self, random_state, buf);
Ok(())
}
// Forwards to `numeric_vec_hash_combine`, which returns nothing, so this always yields `Ok(())`.
fn vec_hash_combine(
&self,
random_state: RandomState,
hashes: &mut [u64],
) -> PolarsResult<()> {
numeric_vec_hash_combine(self, random_state, hashes);
Ok(())
}
}
};
}
// Instantiate the implementation for the signed, unsigned and floating-point chunked arrays.
vec_hash_numeric!(Int64Chunked);
vec_hash_numeric!(Int32Chunked);
vec_hash_numeric!(Int16Chunked);
vec_hash_numeric!(Int8Chunked);
vec_hash_numeric!(UInt64Chunked);
vec_hash_numeric!(UInt32Chunked);
vec_hash_numeric!(UInt16Chunked);
vec_hash_numeric!(UInt8Chunked);
vec_hash_numeric!(Float64Chunked);
vec_hash_numeric!(Float32Chunked);
// `Int128Chunked` is only covered when the `dtype-decimal` feature is enabled.
#[cfg(feature = "dtype-decimal")]
vec_hash_numeric!(Int128Chunked);
/// String columns are hashed by delegating to the `VecHash` implementation of the
/// value returned by `as_binary()`.
impl VecHash for StringChunked {
    /// Forwards to `vec_hash` on `self.as_binary()` and returns its result.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        let binary = self.as_binary();
        binary.vec_hash(random_state, buf)
    }

    /// Forwards to `vec_hash_combine` on `self.as_binary()` and returns its result.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        let binary = self.as_binary();
        binary.vec_hash_combine(random_state, hashes)
    }
}
/// Appends one hash per element of `arr` to `buf` (the buffer is not cleared).
///
/// Values are hashed with `xxh3_64_with_seed`, seeded with the null hash derived from
/// `random_state`; null elements get the null hash itself.
pub fn _hash_binary_array(arr: &BinaryArray<i64>, random_state: RandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    if arr.null_count() > 0 {
        buf.extend(arr.into_iter().map(|opt_bytes| {
            opt_bytes.map_or(null_h, |bytes| xxh3_64_with_seed(bytes, null_h))
        }));
    } else {
        // Without nulls the validity can be ignored entirely.
        buf.extend(
            arr.values_iter()
                .map(|bytes| xxh3_64_with_seed(bytes, null_h)),
        );
    }
}
/// Appends one hash per element of the binary-view array `arr` to `buf` (the buffer
/// is not cleared).
///
/// Values are hashed with `xxh3_64_with_seed`, seeded with the null hash derived from
/// `random_state`; null elements get the null hash itself.
fn hash_binview_array(arr: &BinaryViewArray, random_state: RandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    if arr.null_count() > 0 {
        buf.extend(arr.into_iter().map(|opt_bytes| {
            opt_bytes.map_or(null_h, |bytes| xxh3_64_with_seed(bytes, null_h))
        }));
    } else {
        // Without nulls the validity can be ignored entirely.
        buf.extend(
            arr.values_iter()
                .map(|bytes| xxh3_64_with_seed(bytes, null_h)),
        );
    }
}
impl VecHash for BinaryChunked {
    /// Replaces the contents of `buf` with one hash per element, chunk by chunk, using
    /// `hash_binview_array`.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        for arr in self.downcast_iter() {
            hash_binview_array(arr, random_state.clone(), buf);
        }
        Ok(())
    }

    /// Combines the hash of every element into the matching slot of `hashes` with
    /// `_boost_hash_combine`; null elements contribute the null hash.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);
        let mut start = 0;
        for arr in self.downcast_iter() {
            if arr.null_count() == 0 {
                for (bytes, slot) in arr.values_iter().zip(&mut hashes[start..]) {
                    *slot = _boost_hash_combine(xxh3_64_with_seed(bytes, null_h), *slot);
                }
            } else {
                let validity = arr.validity().unwrap();
                let (bits, bit_offset, _) = validity.as_slice();
                let slots = hashes[start..]
                    .iter_mut()
                    .zip(arr.values_iter())
                    .take(validity.len());
                for (i, (slot, bytes)) in slots.enumerate() {
                    // SAFETY: `i < validity.len()`, and `bits`/`bit_offset` come from the
                    // same `as_slice` call. NOTE(review): assumes the second tuple field
                    // is the bit offset of element 0 inside `bits` — confirm in `Bitmap` docs.
                    let valid = unsafe { get_bit_unchecked(bits, i + bit_offset) };
                    let value_hash = match valid {
                        true => xxh3_64_with_seed(bytes, null_h),
                        false => null_h,
                    };
                    *slot = _boost_hash_combine(value_hash, *slot);
                }
            }
            start += arr.len();
        }
        Ok(())
    }
}
impl VecHash for BinaryOffsetChunked {
    /// Replaces the contents of `buf` with one hash per element, chunk by chunk, using
    /// `_hash_binary_array`.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        for arr in self.downcast_iter() {
            _hash_binary_array(arr, random_state.clone(), buf);
        }
        Ok(())
    }

    /// Combines the hash of every element into the matching slot of `hashes` with
    /// `_boost_hash_combine`; null elements contribute the null hash.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);
        let mut start = 0;
        for arr in self.downcast_iter() {
            if arr.null_count() == 0 {
                for (bytes, slot) in arr.values_iter().zip(&mut hashes[start..]) {
                    *slot = _boost_hash_combine(xxh3_64_with_seed(bytes, null_h), *slot);
                }
            } else {
                let validity = arr.validity().unwrap();
                let (bits, bit_offset, _) = validity.as_slice();
                let slots = hashes[start..]
                    .iter_mut()
                    .zip(arr.values_iter())
                    .take(validity.len());
                for (i, (slot, bytes)) in slots.enumerate() {
                    // SAFETY: `i < validity.len()`, and `bits`/`bit_offset` come from the
                    // same `as_slice` call. NOTE(review): assumes the second tuple field
                    // is the bit offset of element 0 inside `bits` — confirm in `Bitmap` docs.
                    let valid = unsafe { get_bit_unchecked(bits, i + bit_offset) };
                    let value_hash = match valid {
                        true => xxh3_64_with_seed(bytes, null_h),
                        false => null_h,
                    };
                    *slot = _boost_hash_combine(value_hash, *slot);
                }
            }
            start += arr.len();
        }
        Ok(())
    }
}
impl VecHash for NullChunked {
    /// Replaces the contents of `buf` with `self.len()` copies of the null hash.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        let fill = get_null_hash_value(&random_state);
        buf.clear();
        buf.resize(self.len(), fill);
        Ok(())
    }

    /// Combines the null hash into every slot of `hashes` with `_boost_hash_combine`.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);
        for slot in hashes.iter_mut() {
            *slot = _boost_hash_combine(null_h, *slot);
        }
        Ok(())
    }
}
impl VecHash for BooleanChunked {
    /// Replaces the contents of `buf` with one hash per element.
    ///
    /// Only three distinct hashes can occur (`true`, `false`, null), so they are
    /// computed once up front and selected per element.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        let true_h = random_state.hash_one(true);
        let false_h = random_state.hash_one(false);
        let null_h = get_null_hash_value(&random_state);
        for arr in self.downcast_iter() {
            if arr.null_count() > 0 {
                buf.extend(arr.into_iter().map(|opt_v| {
                    opt_v.map_or(null_h, |v| if v { true_h } else { false_h })
                }));
            } else {
                buf.extend(arr.values_iter().map(|v| match v {
                    true => true_h,
                    false => false_h,
                }));
            }
        }
        Ok(())
    }

    /// Combines the hash of every element into the matching slot of `hashes` with
    /// `_boost_hash_combine`; null elements contribute the null hash.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        let true_h = random_state.hash_one(true);
        let false_h = random_state.hash_one(false);
        let null_h = get_null_hash_value(&random_state);
        let mut start = 0;
        for arr in self.downcast_iter() {
            if arr.null_count() == 0 {
                for (value, slot) in arr.values_iter().zip(&mut hashes[start..]) {
                    let value_hash = if value { true_h } else { false_h };
                    *slot = _boost_hash_combine(value_hash, *slot);
                }
            } else {
                let validity = arr.validity().unwrap();
                let (bits, bit_offset, _) = validity.as_slice();
                let slots = hashes[start..]
                    .iter_mut()
                    .zip(arr.values())
                    .take(validity.len());
                for (i, (slot, value)) in slots.enumerate() {
                    // SAFETY: `i < validity.len()`, and `bits`/`bit_offset` come from the
                    // same `as_slice` call. NOTE(review): assumes the second tuple field
                    // is the bit offset of element 0 inside `bits` — confirm in `Bitmap` docs.
                    let valid = unsafe { get_bit_unchecked(bits, i + bit_offset) };
                    let value_hash = match (valid, value) {
                        (false, _) => null_h,
                        (true, true) => true_h,
                        (true, false) => false_h,
                    };
                    *slot = _boost_hash_combine(value_hash, *slot);
                }
            }
            start += arr.len();
        }
        Ok(())
    }
}
#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Replaces the contents of `buf` with one hash per element.
    ///
    /// Every item yielded by iterating a chunk is passed to `hash_one` as-is; there is
    /// no separate null-free path.
    fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        for arr in self.downcast_iter() {
            buf.extend(arr.into_iter().map(|item| random_state.hash_one(item)));
        }
        Ok(())
    }

    /// Hands `apply_to_slice` a closure that hashes each item and combines it with the
    /// current entry of `hashes` via `_boost_hash_combine`.
    // NOTE(review): presumably `apply_to_slice` writes the closure's result back into
    // `hashes` — confirm against its definition.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) -> PolarsResult<()> {
        self.apply_to_slice(
            |item, current| _boost_hash_combine(random_state.hash_one(item), *current),
            hashes,
        );
        Ok(())
    }
}
/// Computes the row hashes of every `DataFrame` in `keys` in parallel on `POOL`.
///
/// All frames are hashed with clones of the same `RandomState`: the one passed in, or
/// a default-constructed one when `hasher_builder` is `None`. That state is returned
/// together with one `UInt64Chunked` of hashes per frame.
///
/// # Errors
/// Propagates the first error returned by `series_to_hashes`.
pub fn _df_rows_to_hashes_threaded_vertical(
    keys: &[DataFrame],
    hasher_builder: Option<RandomState>,
) -> PolarsResult<(Vec<UInt64Chunked>, RandomState)> {
    let hasher_builder = hasher_builder.unwrap_or_default();
    // Hashes the rows of a single frame into a fresh buffer.
    let hash_frame = |df: &DataFrame| -> PolarsResult<UInt64Chunked> {
        let mut row_hashes = Vec::new();
        series_to_hashes(
            df.get_columns(),
            Some(hasher_builder.clone()),
            &mut row_hashes,
        )?;
        Ok(UInt64Chunked::from_vec("", row_hashes))
    };
    let hashes = POOL.install(|| {
        keys.into_par_iter()
            .map(hash_frame)
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok((hashes, hasher_builder))
}
/// Hashes the rows spanned by `keys` into `hashes`.
///
/// The first series is hashed with `vec_hash`; every following series is folded in
/// with `vec_hash_combine`. Returns the `RandomState` that was used: the one passed
/// in, or a default-constructed one when `build_hasher` is `None`.
///
/// # Errors
/// Propagates any error from `vec_hash` / `vec_hash_combine`.
///
/// # Panics
/// Panics with "at least one key" when `keys` is empty.
pub(crate) fn series_to_hashes(
    keys: &[Series],
    build_hasher: Option<RandomState>,
    hashes: &mut Vec<u64>,
) -> PolarsResult<RandomState> {
    let build_hasher = build_hasher.unwrap_or_default();
    let (first, rest) = keys.split_first().expect("at least one key");
    first.vec_hash(build_hasher.clone(), hashes)?;
    for key in rest {
        key.vec_hash_combine(build_hasher.clone(), hashes)?;
    }
    Ok(build_hasher)
}