1use arrow::bitmap::utils::get_bit_unchecked;
2use polars_utils::hashing::{_boost_hash_combine, folded_multiply};
3use polars_utils::total_ord::{ToTotalOrd, TotalHash};
4use rayon::prelude::*;
5use xxhash_rust::xxh3::xxh3_64_with_seed;
6
7use super::*;
8use crate::POOL;
9use crate::prelude::*;
10use crate::series::implementations::null::NullChunked;
11
12const MULTIPLE: u64 = 6364136223846793005;
14
15pub trait VecHash {
20 fn vec_hash(&self, _random_state: PlRandomState, _buf: &mut Vec<u64>) -> PolarsResult<()> {
22 polars_bail!(un_impl = vec_hash);
23 }
24
25 fn vec_hash_combine(
26 &self,
27 _random_state: PlRandomState,
28 _hashes: &mut [u64],
29 ) -> PolarsResult<()> {
30 polars_bail!(un_impl = vec_hash_combine);
31 }
32}
33
34pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 {
35 let first = random_state.hash_one(3188347919usize);
38 random_state.hash_one(first)
39}
40
41fn insert_null_hash(chunks: &[ArrayRef], random_state: PlRandomState, buf: &mut Vec<u64>) {
42 let null_h = get_null_hash_value(&random_state);
43 let hashes = buf.as_mut_slice();
44
45 let mut offset = 0;
46 chunks.iter().for_each(|arr| {
47 if arr.null_count() > 0 {
48 let validity = arr.validity().unwrap();
49 let (slice, byte_offset, _) = validity.as_slice();
50 (0..validity.len())
51 .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
52 .zip(&mut hashes[offset..])
53 .for_each(|(valid, h)| {
54 *h = [null_h, *h][valid as usize];
55 })
56 }
57 offset += arr.len();
58 });
59}
60
61fn numeric_vec_hash<T>(ca: &ChunkedArray<T>, random_state: PlRandomState, buf: &mut Vec<u64>)
62where
63 T: PolarsNumericType,
64 T::Native: TotalHash + ToTotalOrd,
65 <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
66{
67 buf.clear();
74 buf.reserve(ca.len());
75
76 #[allow(unused_unsafe)]
77 #[allow(clippy::useless_transmute)]
78 ca.downcast_iter().for_each(|arr| {
79 buf.extend(
80 arr.values()
81 .as_slice()
82 .iter()
83 .copied()
84 .map(|v| random_state.hash_one(v.to_total_ord())),
85 );
86 });
87 insert_null_hash(&ca.chunks, random_state, buf)
88}
89
90fn numeric_vec_hash_combine<T>(
91 ca: &ChunkedArray<T>,
92 random_state: PlRandomState,
93 hashes: &mut [u64],
94) where
95 T: PolarsNumericType,
96 T::Native: TotalHash + ToTotalOrd,
97 <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
98{
99 let null_h = get_null_hash_value(&random_state);
100
101 let mut offset = 0;
102 ca.downcast_iter().for_each(|arr| {
103 match arr.null_count() {
104 0 => arr
105 .values()
106 .as_slice()
107 .iter()
108 .zip(&mut hashes[offset..])
109 .for_each(|(v, h)| {
110 *h = folded_multiply(
112 random_state.hash_one(v.to_total_ord()) ^ folded_multiply(*h, MULTIPLE),
115 MULTIPLE,
116 );
117 }),
118 _ => {
119 let validity = arr.validity().unwrap();
120 let (slice, byte_offset, _) = validity.as_slice();
121 (0..validity.len())
122 .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
123 .zip(&mut hashes[offset..])
124 .zip(arr.values().as_slice())
125 .for_each(|((valid, h), l)| {
126 let lh = random_state.hash_one(l.to_total_ord());
127 let to_hash = [null_h, lh][valid as usize];
128 *h = folded_multiply(to_hash ^ folded_multiply(*h, MULTIPLE), MULTIPLE);
129 });
130 },
131 }
132 offset += arr.len();
133 });
134}
135
136macro_rules! vec_hash_numeric {
137 ($ca:ident) => {
138 impl VecHash for $ca {
139 fn vec_hash(
140 &self,
141 random_state: PlRandomState,
142 buf: &mut Vec<u64>,
143 ) -> PolarsResult<()> {
144 numeric_vec_hash(self, random_state, buf);
145 Ok(())
146 }
147
148 fn vec_hash_combine(
149 &self,
150 random_state: PlRandomState,
151 hashes: &mut [u64],
152 ) -> PolarsResult<()> {
153 numeric_vec_hash_combine(self, random_state, hashes);
154 Ok(())
155 }
156 }
157 };
158}
159
160vec_hash_numeric!(Int64Chunked);
161vec_hash_numeric!(Int32Chunked);
162vec_hash_numeric!(Int16Chunked);
163vec_hash_numeric!(Int8Chunked);
164vec_hash_numeric!(UInt64Chunked);
165vec_hash_numeric!(UInt32Chunked);
166vec_hash_numeric!(UInt16Chunked);
167vec_hash_numeric!(UInt8Chunked);
168vec_hash_numeric!(Float64Chunked);
169vec_hash_numeric!(Float32Chunked);
170#[cfg(any(feature = "dtype-decimal", feature = "dtype-i128"))]
171vec_hash_numeric!(Int128Chunked);
172
173impl VecHash for StringChunked {
174 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
175 self.as_binary().vec_hash(random_state, buf)?;
176 Ok(())
177 }
178
179 fn vec_hash_combine(
180 &self,
181 random_state: PlRandomState,
182 hashes: &mut [u64],
183 ) -> PolarsResult<()> {
184 self.as_binary().vec_hash_combine(random_state, hashes)?;
185 Ok(())
186 }
187}
188
189pub fn _hash_binary_array(arr: &BinaryArray<i64>, random_state: PlRandomState, buf: &mut Vec<u64>) {
191 let null_h = get_null_hash_value(&random_state);
192 if arr.null_count() == 0 {
193 buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
195 } else {
196 buf.extend(arr.into_iter().map(|opt_v| match opt_v {
197 Some(v) => xxh3_64_with_seed(v, null_h),
198 None => null_h,
199 }))
200 }
201}
202
203fn hash_binview_array(arr: &BinaryViewArray, random_state: PlRandomState, buf: &mut Vec<u64>) {
204 let null_h = get_null_hash_value(&random_state);
205 if arr.null_count() == 0 {
206 buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
208 } else {
209 buf.extend(arr.into_iter().map(|opt_v| match opt_v {
210 Some(v) => xxh3_64_with_seed(v, null_h),
211 None => null_h,
212 }))
213 }
214}
215
216impl VecHash for BinaryChunked {
217 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
218 buf.clear();
219 buf.reserve(self.len());
220 self.downcast_iter()
221 .for_each(|arr| hash_binview_array(arr, random_state.clone(), buf));
222 Ok(())
223 }
224
225 fn vec_hash_combine(
226 &self,
227 random_state: PlRandomState,
228 hashes: &mut [u64],
229 ) -> PolarsResult<()> {
230 let null_h = get_null_hash_value(&random_state);
231
232 let mut offset = 0;
233 self.downcast_iter().for_each(|arr| {
234 match arr.null_count() {
235 0 => arr
236 .values_iter()
237 .zip(&mut hashes[offset..])
238 .for_each(|(v, h)| {
239 let l = xxh3_64_with_seed(v, null_h);
240 *h = _boost_hash_combine(l, *h)
241 }),
242 _ => {
243 let validity = arr.validity().unwrap();
244 let (slice, byte_offset, _) = validity.as_slice();
245 (0..validity.len())
246 .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
247 .zip(&mut hashes[offset..])
248 .zip(arr.values_iter())
249 .for_each(|((valid, h), l)| {
250 let l = if valid {
251 xxh3_64_with_seed(l, null_h)
252 } else {
253 null_h
254 };
255 *h = _boost_hash_combine(l, *h)
256 });
257 },
258 }
259 offset += arr.len();
260 });
261 Ok(())
262 }
263}
264
265impl VecHash for BinaryOffsetChunked {
266 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
267 buf.clear();
268 buf.reserve(self.len());
269 self.downcast_iter()
270 .for_each(|arr| _hash_binary_array(arr, random_state.clone(), buf));
271 Ok(())
272 }
273
274 fn vec_hash_combine(
275 &self,
276 random_state: PlRandomState,
277 hashes: &mut [u64],
278 ) -> PolarsResult<()> {
279 let null_h = get_null_hash_value(&random_state);
280
281 let mut offset = 0;
282 self.downcast_iter().for_each(|arr| {
283 match arr.null_count() {
284 0 => arr
285 .values_iter()
286 .zip(&mut hashes[offset..])
287 .for_each(|(v, h)| {
288 let l = xxh3_64_with_seed(v, null_h);
289 *h = _boost_hash_combine(l, *h)
290 }),
291 _ => {
292 let validity = arr.validity().unwrap();
293 let (slice, byte_offset, _) = validity.as_slice();
294 (0..validity.len())
295 .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
296 .zip(&mut hashes[offset..])
297 .zip(arr.values_iter())
298 .for_each(|((valid, h), l)| {
299 let l = if valid {
300 xxh3_64_with_seed(l, null_h)
301 } else {
302 null_h
303 };
304 *h = _boost_hash_combine(l, *h)
305 });
306 },
307 }
308 offset += arr.len();
309 });
310 Ok(())
311 }
312}
313
314impl VecHash for NullChunked {
315 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
316 let null_h = get_null_hash_value(&random_state);
317 buf.clear();
318 buf.resize(self.len(), null_h);
319 Ok(())
320 }
321
322 fn vec_hash_combine(
323 &self,
324 random_state: PlRandomState,
325 hashes: &mut [u64],
326 ) -> PolarsResult<()> {
327 let null_h = get_null_hash_value(&random_state);
328 hashes
329 .iter_mut()
330 .for_each(|h| *h = _boost_hash_combine(null_h, *h));
331 Ok(())
332 }
333}
334impl VecHash for BooleanChunked {
335 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
336 buf.clear();
337 buf.reserve(self.len());
338 let true_h = random_state.hash_one(true);
339 let false_h = random_state.hash_one(false);
340 let null_h = get_null_hash_value(&random_state);
341 self.downcast_iter().for_each(|arr| {
342 if arr.null_count() == 0 {
343 buf.extend(arr.values_iter().map(|v| if v { true_h } else { false_h }))
344 } else {
345 buf.extend(arr.into_iter().map(|opt_v| match opt_v {
346 Some(true) => true_h,
347 Some(false) => false_h,
348 None => null_h,
349 }))
350 }
351 });
352 Ok(())
353 }
354
355 fn vec_hash_combine(
356 &self,
357 random_state: PlRandomState,
358 hashes: &mut [u64],
359 ) -> PolarsResult<()> {
360 let true_h = random_state.hash_one(true);
361 let false_h = random_state.hash_one(false);
362 let null_h = get_null_hash_value(&random_state);
363
364 let mut offset = 0;
365 self.downcast_iter().for_each(|arr| {
366 match arr.null_count() {
367 0 => arr
368 .values_iter()
369 .zip(&mut hashes[offset..])
370 .for_each(|(v, h)| {
371 let l = if v { true_h } else { false_h };
372 *h = _boost_hash_combine(l, *h)
373 }),
374 _ => {
375 let validity = arr.validity().unwrap();
376 let (slice, byte_offset, _) = validity.as_slice();
377 (0..validity.len())
378 .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
379 .zip(&mut hashes[offset..])
380 .zip(arr.values())
381 .for_each(|((valid, h), l)| {
382 let l = if valid {
383 if l { true_h } else { false_h }
384 } else {
385 null_h
386 };
387 *h = _boost_hash_combine(l, *h)
388 });
389 },
390 }
391 offset += arr.len();
392 });
393 Ok(())
394 }
395}
396
397#[cfg(feature = "object")]
398impl<T> VecHash for ObjectChunked<T>
399where
400 T: PolarsObject,
401{
402 fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
403 buf.clear();
410 buf.reserve(self.len());
411
412 self.downcast_iter()
413 .for_each(|arr| buf.extend(arr.into_iter().map(|opt_v| random_state.hash_one(opt_v))));
414
415 Ok(())
416 }
417
418 fn vec_hash_combine(
419 &self,
420 random_state: PlRandomState,
421 hashes: &mut [u64],
422 ) -> PolarsResult<()> {
423 self.apply_to_slice(
424 |opt_v, h| {
425 let hashed = random_state.hash_one(opt_v);
426 _boost_hash_combine(hashed, *h)
427 },
428 hashes,
429 );
430 Ok(())
431 }
432}
433
434pub fn _df_rows_to_hashes_threaded_vertical(
435 keys: &[DataFrame],
436 hasher_builder: Option<PlRandomState>,
437) -> PolarsResult<(Vec<UInt64Chunked>, PlRandomState)> {
438 let hasher_builder = hasher_builder.unwrap_or_default();
439
440 let hashes = POOL.install(|| {
441 keys.into_par_iter()
442 .map(|df| {
443 let hb = hasher_builder.clone();
444 let mut hashes = vec![];
445 columns_to_hashes(df.get_columns(), Some(hb), &mut hashes)?;
446 Ok(UInt64Chunked::from_vec(PlSmallStr::EMPTY, hashes))
447 })
448 .collect::<PolarsResult<Vec<_>>>()
449 })?;
450 Ok((hashes, hasher_builder))
451}
452
453pub fn columns_to_hashes(
454 keys: &[Column],
455 build_hasher: Option<PlRandomState>,
456 hashes: &mut Vec<u64>,
457) -> PolarsResult<PlRandomState> {
458 let build_hasher = build_hasher.unwrap_or_default();
459
460 let mut iter = keys.iter();
461 let first = iter.next().expect("at least one key");
462 first.vec_hash(build_hasher.clone(), hashes)?;
463
464 for keys in iter {
465 keys.vec_hash_combine(build_hasher.clone(), hashes)?;
466 }
467
468 Ok(build_hasher)
469}