polars_core/hashing/
vector_hasher.rs

1use std::hash::BuildHasher;
2
3use arrow::bitmap::utils::get_bit_unchecked;
4use polars_utils::aliases::PlSeedableRandomStateQuality;
5use polars_utils::hashing::{_boost_hash_combine, folded_multiply};
6use polars_utils::total_ord::{ToTotalOrd, TotalHash};
7use rayon::prelude::*;
8use xxhash_rust::xxh3::xxh3_64_with_seed;
9
10use super::*;
11use crate::POOL;
12use crate::prelude::*;
13use crate::series::implementations::null::NullChunked;
14
// Multiplier passed to `folded_multiply` when a new column hash is mixed into an
// existing row hash (see `numeric_vec_hash_combine`).
// See: https://github.com/tkaitchuck/aHash/blob/f9acd508bd89e7c5b2877a9510098100f9018d64/src/operations.rs#L4
const MULTIPLE: u64 = 6_364_136_223_846_793_005;
17
18// Read more:
19//  https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
20//  http://myeyesareblind.com/2017/02/06/Combine-hash-values/
21
22pub trait VecHash {
23    /// Compute the hash for all values in the array.
24    fn vec_hash(
25        &self,
26        _random_state: PlSeedableRandomStateQuality,
27        _buf: &mut Vec<u64>,
28    ) -> PolarsResult<()> {
29        polars_bail!(un_impl = vec_hash);
30    }
31
32    fn vec_hash_combine(
33        &self,
34        _random_state: PlSeedableRandomStateQuality,
35        _hashes: &mut [u64],
36    ) -> PolarsResult<()> {
37        polars_bail!(un_impl = vec_hash_combine);
38    }
39}
40
41pub(crate) fn get_null_hash_value(random_state: &PlSeedableRandomStateQuality) -> u64 {
42    // we just start with a large prime number and hash that twice
43    // to get a constant hash value for null/None
44    let first = random_state.hash_one(3188347919usize);
45    random_state.hash_one(first)
46}
47
48fn insert_null_hash(
49    chunks: &[ArrayRef],
50    random_state: PlSeedableRandomStateQuality,
51    buf: &mut Vec<u64>,
52) {
53    let null_h = get_null_hash_value(&random_state);
54    let hashes = buf.as_mut_slice();
55
56    let mut offset = 0;
57    chunks.iter().for_each(|arr| {
58        if arr.null_count() > 0 {
59            let validity = arr.validity().unwrap();
60            let (slice, byte_offset, _) = validity.as_slice();
61            (0..validity.len())
62                .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
63                .zip(&mut hashes[offset..])
64                .for_each(|(valid, h)| {
65                    *h = [null_h, *h][valid as usize];
66                })
67        }
68        offset += arr.len();
69    });
70}
71
72fn numeric_vec_hash<T>(
73    ca: &ChunkedArray<T>,
74    random_state: PlSeedableRandomStateQuality,
75    buf: &mut Vec<u64>,
76) where
77    T: PolarsNumericType,
78    T::Native: TotalHash + ToTotalOrd,
79    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
80{
81    // Note that we don't use the no null branch! This can break in unexpected ways.
82    // for instance with threading we split an array in n_threads, this may lead to
83    // splits that have no nulls and splits that have nulls. Then one array is hashed with
84    // Option<T> and the other array with T.
85    // Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
86    // the only deterministic seed.
87    buf.clear();
88    buf.reserve(ca.len());
89
90    #[allow(unused_unsafe)]
91    #[allow(clippy::useless_transmute)]
92    ca.downcast_iter().for_each(|arr| {
93        buf.extend(
94            arr.values()
95                .as_slice()
96                .iter()
97                .copied()
98                .map(|v| random_state.hash_one(v.to_total_ord())),
99        );
100    });
101    insert_null_hash(&ca.chunks, random_state, buf)
102}
103
104fn numeric_vec_hash_combine<T>(
105    ca: &ChunkedArray<T>,
106    random_state: PlSeedableRandomStateQuality,
107    hashes: &mut [u64],
108) where
109    T: PolarsNumericType,
110    T::Native: TotalHash + ToTotalOrd,
111    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
112{
113    let null_h = get_null_hash_value(&random_state);
114
115    let mut offset = 0;
116    ca.downcast_iter().for_each(|arr| {
117        match arr.null_count() {
118            0 => arr
119                .values()
120                .as_slice()
121                .iter()
122                .zip(&mut hashes[offset..])
123                .for_each(|(v, h)| {
124                    // Inlined from ahash. This ensures we combine with the previous state.
125                    *h = folded_multiply(
126                        // Be careful not to xor the hash directly with the existing hash,
127                        // it would lead to 0-hashes for 2 columns containing equal values.
128                        random_state.hash_one(v.to_total_ord()) ^ folded_multiply(*h, MULTIPLE),
129                        MULTIPLE,
130                    );
131                }),
132            _ => {
133                let validity = arr.validity().unwrap();
134                let (slice, byte_offset, _) = validity.as_slice();
135                (0..validity.len())
136                    .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
137                    .zip(&mut hashes[offset..])
138                    .zip(arr.values().as_slice())
139                    .for_each(|((valid, h), l)| {
140                        let lh = random_state.hash_one(l.to_total_ord());
141                        let to_hash = [null_h, lh][valid as usize];
142                        *h = folded_multiply(to_hash ^ folded_multiply(*h, MULTIPLE), MULTIPLE);
143                    });
144            },
145        }
146        offset += arr.len();
147    });
148}
149
150macro_rules! vec_hash_numeric {
151    ($ca:ident) => {
152        impl VecHash for $ca {
153            fn vec_hash(
154                &self,
155                random_state: PlSeedableRandomStateQuality,
156                buf: &mut Vec<u64>,
157            ) -> PolarsResult<()> {
158                numeric_vec_hash(self, random_state, buf);
159                Ok(())
160            }
161
162            fn vec_hash_combine(
163                &self,
164                random_state: PlSeedableRandomStateQuality,
165                hashes: &mut [u64],
166            ) -> PolarsResult<()> {
167                numeric_vec_hash_combine(self, random_state, hashes);
168                Ok(())
169            }
170        }
171    };
172}
173
174vec_hash_numeric!(Int64Chunked);
175vec_hash_numeric!(Int32Chunked);
176vec_hash_numeric!(Int16Chunked);
177vec_hash_numeric!(Int8Chunked);
178vec_hash_numeric!(UInt64Chunked);
179vec_hash_numeric!(UInt32Chunked);
180vec_hash_numeric!(UInt16Chunked);
181vec_hash_numeric!(UInt8Chunked);
182vec_hash_numeric!(Float64Chunked);
183vec_hash_numeric!(Float32Chunked);
184#[cfg(any(feature = "dtype-decimal", feature = "dtype-i128"))]
185vec_hash_numeric!(Int128Chunked);
186
187impl VecHash for StringChunked {
188    fn vec_hash(
189        &self,
190        random_state: PlSeedableRandomStateQuality,
191        buf: &mut Vec<u64>,
192    ) -> PolarsResult<()> {
193        self.as_binary().vec_hash(random_state, buf)?;
194        Ok(())
195    }
196
197    fn vec_hash_combine(
198        &self,
199        random_state: PlSeedableRandomStateQuality,
200        hashes: &mut [u64],
201    ) -> PolarsResult<()> {
202        self.as_binary().vec_hash_combine(random_state, hashes)?;
203        Ok(())
204    }
205}
206
207// used in polars-pipe
208pub fn _hash_binary_array(
209    arr: &BinaryArray<i64>,
210    random_state: PlSeedableRandomStateQuality,
211    buf: &mut Vec<u64>,
212) {
213    let null_h = get_null_hash_value(&random_state);
214    if arr.null_count() == 0 {
215        // use the null_hash as seed to get a hash determined by `random_state` that is passed
216        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
217    } else {
218        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
219            Some(v) => xxh3_64_with_seed(v, null_h),
220            None => null_h,
221        }))
222    }
223}
224
225fn hash_binview_array(
226    arr: &BinaryViewArray,
227    random_state: PlSeedableRandomStateQuality,
228    buf: &mut Vec<u64>,
229) {
230    let null_h = get_null_hash_value(&random_state);
231    if arr.null_count() == 0 {
232        // use the null_hash as seed to get a hash determined by `random_state` that is passed
233        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
234    } else {
235        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
236            Some(v) => xxh3_64_with_seed(v, null_h),
237            None => null_h,
238        }))
239    }
240}
241
242impl VecHash for BinaryChunked {
243    fn vec_hash(
244        &self,
245        random_state: PlSeedableRandomStateQuality,
246        buf: &mut Vec<u64>,
247    ) -> PolarsResult<()> {
248        buf.clear();
249        buf.reserve(self.len());
250        self.downcast_iter()
251            .for_each(|arr| hash_binview_array(arr, random_state, buf));
252        Ok(())
253    }
254
255    fn vec_hash_combine(
256        &self,
257        random_state: PlSeedableRandomStateQuality,
258        hashes: &mut [u64],
259    ) -> PolarsResult<()> {
260        let null_h = get_null_hash_value(&random_state);
261
262        let mut offset = 0;
263        self.downcast_iter().for_each(|arr| {
264            match arr.null_count() {
265                0 => arr
266                    .values_iter()
267                    .zip(&mut hashes[offset..])
268                    .for_each(|(v, h)| {
269                        let l = xxh3_64_with_seed(v, null_h);
270                        *h = _boost_hash_combine(l, *h)
271                    }),
272                _ => {
273                    let validity = arr.validity().unwrap();
274                    let (slice, byte_offset, _) = validity.as_slice();
275                    (0..validity.len())
276                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
277                        .zip(&mut hashes[offset..])
278                        .zip(arr.values_iter())
279                        .for_each(|((valid, h), l)| {
280                            let l = if valid {
281                                xxh3_64_with_seed(l, null_h)
282                            } else {
283                                null_h
284                            };
285                            *h = _boost_hash_combine(l, *h)
286                        });
287                },
288            }
289            offset += arr.len();
290        });
291        Ok(())
292    }
293}
294
295impl VecHash for BinaryOffsetChunked {
296    fn vec_hash(
297        &self,
298        random_state: PlSeedableRandomStateQuality,
299        buf: &mut Vec<u64>,
300    ) -> PolarsResult<()> {
301        buf.clear();
302        buf.reserve(self.len());
303        self.downcast_iter()
304            .for_each(|arr| _hash_binary_array(arr, random_state, buf));
305        Ok(())
306    }
307
308    fn vec_hash_combine(
309        &self,
310        random_state: PlSeedableRandomStateQuality,
311        hashes: &mut [u64],
312    ) -> PolarsResult<()> {
313        let null_h = get_null_hash_value(&random_state);
314
315        let mut offset = 0;
316        self.downcast_iter().for_each(|arr| {
317            match arr.null_count() {
318                0 => arr
319                    .values_iter()
320                    .zip(&mut hashes[offset..])
321                    .for_each(|(v, h)| {
322                        let l = xxh3_64_with_seed(v, null_h);
323                        *h = _boost_hash_combine(l, *h)
324                    }),
325                _ => {
326                    let validity = arr.validity().unwrap();
327                    let (slice, byte_offset, _) = validity.as_slice();
328                    (0..validity.len())
329                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
330                        .zip(&mut hashes[offset..])
331                        .zip(arr.values_iter())
332                        .for_each(|((valid, h), l)| {
333                            let l = if valid {
334                                xxh3_64_with_seed(l, null_h)
335                            } else {
336                                null_h
337                            };
338                            *h = _boost_hash_combine(l, *h)
339                        });
340                },
341            }
342            offset += arr.len();
343        });
344        Ok(())
345    }
346}
347
348impl VecHash for NullChunked {
349    fn vec_hash(
350        &self,
351        random_state: PlSeedableRandomStateQuality,
352        buf: &mut Vec<u64>,
353    ) -> PolarsResult<()> {
354        let null_h = get_null_hash_value(&random_state);
355        buf.clear();
356        buf.resize(self.len(), null_h);
357        Ok(())
358    }
359
360    fn vec_hash_combine(
361        &self,
362        random_state: PlSeedableRandomStateQuality,
363        hashes: &mut [u64],
364    ) -> PolarsResult<()> {
365        let null_h = get_null_hash_value(&random_state);
366        hashes
367            .iter_mut()
368            .for_each(|h| *h = _boost_hash_combine(null_h, *h));
369        Ok(())
370    }
371}
372impl VecHash for BooleanChunked {
373    fn vec_hash(
374        &self,
375        random_state: PlSeedableRandomStateQuality,
376        buf: &mut Vec<u64>,
377    ) -> PolarsResult<()> {
378        buf.clear();
379        buf.reserve(self.len());
380        let true_h = random_state.hash_one(true);
381        let false_h = random_state.hash_one(false);
382        let null_h = get_null_hash_value(&random_state);
383        self.downcast_iter().for_each(|arr| {
384            if arr.null_count() == 0 {
385                buf.extend(arr.values_iter().map(|v| if v { true_h } else { false_h }))
386            } else {
387                buf.extend(arr.into_iter().map(|opt_v| match opt_v {
388                    Some(true) => true_h,
389                    Some(false) => false_h,
390                    None => null_h,
391                }))
392            }
393        });
394        Ok(())
395    }
396
397    fn vec_hash_combine(
398        &self,
399        random_state: PlSeedableRandomStateQuality,
400        hashes: &mut [u64],
401    ) -> PolarsResult<()> {
402        let true_h = random_state.hash_one(true);
403        let false_h = random_state.hash_one(false);
404        let null_h = get_null_hash_value(&random_state);
405
406        let mut offset = 0;
407        self.downcast_iter().for_each(|arr| {
408            match arr.null_count() {
409                0 => arr
410                    .values_iter()
411                    .zip(&mut hashes[offset..])
412                    .for_each(|(v, h)| {
413                        let l = if v { true_h } else { false_h };
414                        *h = _boost_hash_combine(l, *h)
415                    }),
416                _ => {
417                    let validity = arr.validity().unwrap();
418                    let (slice, byte_offset, _) = validity.as_slice();
419                    (0..validity.len())
420                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
421                        .zip(&mut hashes[offset..])
422                        .zip(arr.values())
423                        .for_each(|((valid, h), l)| {
424                            let l = if valid {
425                                if l { true_h } else { false_h }
426                            } else {
427                                null_h
428                            };
429                            *h = _boost_hash_combine(l, *h)
430                        });
431                },
432            }
433            offset += arr.len();
434        });
435        Ok(())
436    }
437}
438
#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Clear `buf` and fill it with one hash per row.
    fn vec_hash(
        &self,
        random_state: PlSeedableRandomStateQuality,
        buf: &mut Vec<u64>,
    ) -> PolarsResult<()> {
        // There is deliberately no "no nulls" branch. With threading an array may be
        // split in n_threads parts, some with nulls and some without; hashing one part
        // as `Option<T>` and another as `T` would make the hashes incomparable.
        // Every row is hashed as an `Option`, so `random_state` is the only thing
        // that determines the result.
        buf.clear();
        buf.reserve(self.len());

        for arr in self.downcast_iter() {
            buf.extend(arr.into_iter().map(|opt_v| random_state.hash_one(opt_v)));
        }

        Ok(())
    }

    /// Combine the hash of every row (hashed as an `Option`) into the entry of
    /// `hashes` at the same position, via `apply_to_slice`.
    fn vec_hash_combine(
        &self,
        random_state: PlSeedableRandomStateQuality,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.apply_to_slice(
            |opt_v, h| _boost_hash_combine(random_state.hash_one(opt_v), *h),
            hashes,
        );
        Ok(())
    }
}
479
480pub fn _df_rows_to_hashes_threaded_vertical(
481    keys: &[DataFrame],
482    build_hasher: Option<PlSeedableRandomStateQuality>,
483) -> PolarsResult<(Vec<UInt64Chunked>, PlSeedableRandomStateQuality)> {
484    let build_hasher = build_hasher.unwrap_or_default();
485
486    let hashes = POOL.install(|| {
487        keys.into_par_iter()
488            .map(|df| {
489                let hb = build_hasher;
490                let mut hashes = vec![];
491                columns_to_hashes(df.get_columns(), Some(hb), &mut hashes)?;
492                Ok(UInt64Chunked::from_vec(PlSmallStr::EMPTY, hashes))
493            })
494            .collect::<PolarsResult<Vec<_>>>()
495    })?;
496    Ok((hashes, build_hasher))
497}
498
499pub fn columns_to_hashes(
500    keys: &[Column],
501    build_hasher: Option<PlSeedableRandomStateQuality>,
502    hashes: &mut Vec<u64>,
503) -> PolarsResult<PlSeedableRandomStateQuality> {
504    let build_hasher = build_hasher.unwrap_or_default();
505
506    let mut iter = keys.iter();
507    let first = iter.next().expect("at least one key");
508    first.vec_hash(build_hasher, hashes)?;
509
510    for keys in iter {
511        keys.vec_hash_combine(build_hasher, hashes)?;
512    }
513
514    Ok(build_hasher)
515}