polars_core/hashing/
vector_hasher.rs

1use std::hash::BuildHasher;
2
3use arrow::bitmap::utils::get_bit_unchecked;
4use polars_utils::aliases::PlSeedableRandomStateQuality;
5use polars_utils::hashing::{_boost_hash_combine, folded_multiply};
6use polars_utils::total_ord::{ToTotalOrd, TotalHash};
7use rayon::prelude::*;
8use xxhash_rust::xxh3::xxh3_64_with_seed;
9
10use super::*;
11use crate::POOL;
12use crate::prelude::*;
13use crate::series::implementations::null::NullChunked;
14
15// See: https://github.com/tkaitchuck/aHash/blob/f9acd508bd89e7c5b2877a9510098100f9018d64/src/operations.rs#L4
16const MULTIPLE: u64 = 6364136223846793005;
17
18// Read more:
19//  https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
20//  http://myeyesareblind.com/2017/02/06/Combine-hash-values/
21
22pub trait VecHash {
23    /// Compute the hash for all values in the array.
24    fn vec_hash(
25        &self,
26        _random_state: PlSeedableRandomStateQuality,
27        _buf: &mut Vec<u64>,
28    ) -> PolarsResult<()>;
29
30    fn vec_hash_combine(
31        &self,
32        _random_state: PlSeedableRandomStateQuality,
33        _hashes: &mut [u64],
34    ) -> PolarsResult<()>;
35}
36
37pub(crate) fn get_null_hash_value(random_state: &PlSeedableRandomStateQuality) -> u64 {
38    // we just start with a large prime number and hash that twice
39    // to get a constant hash value for null/None
40    let first = random_state.hash_one(3188347919usize);
41    random_state.hash_one(first)
42}
43
44fn insert_null_hash(
45    chunks: &[ArrayRef],
46    random_state: PlSeedableRandomStateQuality,
47    buf: &mut Vec<u64>,
48) {
49    let null_h = get_null_hash_value(&random_state);
50    let hashes = buf.as_mut_slice();
51
52    let mut offset = 0;
53    chunks.iter().for_each(|arr| {
54        if arr.null_count() > 0 {
55            let validity = arr.validity().unwrap();
56            let (slice, byte_offset, _) = validity.as_slice();
57            (0..validity.len())
58                .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
59                .zip(&mut hashes[offset..])
60                .for_each(|(valid, h)| {
61                    *h = [null_h, *h][valid as usize];
62                })
63        }
64        offset += arr.len();
65    });
66}
67
68fn numeric_vec_hash<T>(
69    ca: &ChunkedArray<T>,
70    random_state: PlSeedableRandomStateQuality,
71    buf: &mut Vec<u64>,
72) where
73    T: PolarsNumericType,
74    T::Native: TotalHash + ToTotalOrd,
75    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
76{
77    // Note that we don't use the no null branch! This can break in unexpected ways.
78    // for instance with threading we split an array in n_threads, this may lead to
79    // splits that have no nulls and splits that have nulls. Then one array is hashed with
80    // Option<T> and the other array with T.
81    // Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
82    // the only deterministic seed.
83    buf.clear();
84    buf.reserve(ca.len());
85
86    #[allow(unused_unsafe)]
87    #[allow(clippy::useless_transmute)]
88    ca.downcast_iter().for_each(|arr| {
89        buf.extend(
90            arr.values()
91                .as_slice()
92                .iter()
93                .copied()
94                .map(|v| random_state.hash_one(v.to_total_ord())),
95        );
96    });
97    insert_null_hash(&ca.chunks, random_state, buf)
98}
99
100fn numeric_vec_hash_combine<T>(
101    ca: &ChunkedArray<T>,
102    random_state: PlSeedableRandomStateQuality,
103    hashes: &mut [u64],
104) where
105    T: PolarsNumericType,
106    T::Native: TotalHash + ToTotalOrd,
107    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
108{
109    let null_h = get_null_hash_value(&random_state);
110
111    let mut offset = 0;
112    ca.downcast_iter().for_each(|arr| {
113        match arr.null_count() {
114            0 => arr
115                .values()
116                .as_slice()
117                .iter()
118                .zip(&mut hashes[offset..])
119                .for_each(|(v, h)| {
120                    // Inlined from ahash. This ensures we combine with the previous state.
121                    *h = folded_multiply(
122                        // Be careful not to xor the hash directly with the existing hash,
123                        // it would lead to 0-hashes for 2 columns containing equal values.
124                        random_state.hash_one(v.to_total_ord()) ^ folded_multiply(*h, MULTIPLE),
125                        MULTIPLE,
126                    );
127                }),
128            _ => {
129                let validity = arr.validity().unwrap();
130                let (slice, byte_offset, _) = validity.as_slice();
131                (0..validity.len())
132                    .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
133                    .zip(&mut hashes[offset..])
134                    .zip(arr.values().as_slice())
135                    .for_each(|((valid, h), l)| {
136                        let lh = random_state.hash_one(l.to_total_ord());
137                        let to_hash = [null_h, lh][valid as usize];
138                        *h = folded_multiply(to_hash ^ folded_multiply(*h, MULTIPLE), MULTIPLE);
139                    });
140            },
141        }
142        offset += arr.len();
143    });
144}
145
// Implements `VecHash` for a numeric chunked-array type by forwarding to
// `numeric_vec_hash` and `numeric_vec_hash_combine`. Both helpers are
// infallible, so the generated methods always return `Ok(())`.
macro_rules! vec_hash_numeric {
    ($chunked:ident) => {
        impl VecHash for $chunked {
            fn vec_hash(
                &self,
                random_state: PlSeedableRandomStateQuality,
                buf: &mut Vec<u64>,
            ) -> PolarsResult<()> {
                numeric_vec_hash(self, random_state, buf);
                Ok(())
            }

            fn vec_hash_combine(
                &self,
                random_state: PlSeedableRandomStateQuality,
                hashes: &mut [u64],
            ) -> PolarsResult<()> {
                numeric_vec_hash_combine(self, random_state, hashes);
                Ok(())
            }
        }
    };
}
169
170vec_hash_numeric!(Int64Chunked);
171vec_hash_numeric!(Int32Chunked);
172vec_hash_numeric!(Int16Chunked);
173vec_hash_numeric!(Int8Chunked);
174vec_hash_numeric!(UInt64Chunked);
175vec_hash_numeric!(UInt32Chunked);
176vec_hash_numeric!(UInt16Chunked);
177vec_hash_numeric!(UInt8Chunked);
178vec_hash_numeric!(Float64Chunked);
179vec_hash_numeric!(Float32Chunked);
180#[cfg(feature = "dtype-u128")]
181vec_hash_numeric!(UInt128Chunked);
182#[cfg(any(feature = "dtype-decimal", feature = "dtype-i128"))]
183vec_hash_numeric!(Int128Chunked);
184
185impl VecHash for StringChunked {
186    fn vec_hash(
187        &self,
188        random_state: PlSeedableRandomStateQuality,
189        buf: &mut Vec<u64>,
190    ) -> PolarsResult<()> {
191        self.as_binary().vec_hash(random_state, buf)?;
192        Ok(())
193    }
194
195    fn vec_hash_combine(
196        &self,
197        random_state: PlSeedableRandomStateQuality,
198        hashes: &mut [u64],
199    ) -> PolarsResult<()> {
200        self.as_binary().vec_hash_combine(random_state, hashes)?;
201        Ok(())
202    }
203}
204
205// used in polars-pipe
206pub fn _hash_binary_array(
207    arr: &BinaryArray<i64>,
208    random_state: PlSeedableRandomStateQuality,
209    buf: &mut Vec<u64>,
210) {
211    let null_h = get_null_hash_value(&random_state);
212    if arr.null_count() == 0 {
213        // use the null_hash as seed to get a hash determined by `random_state` that is passed
214        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
215    } else {
216        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
217            Some(v) => xxh3_64_with_seed(v, null_h),
218            None => null_h,
219        }))
220    }
221}
222
223fn hash_binview_array(
224    arr: &BinaryViewArray,
225    random_state: PlSeedableRandomStateQuality,
226    buf: &mut Vec<u64>,
227) {
228    let null_h = get_null_hash_value(&random_state);
229    if arr.null_count() == 0 {
230        // use the null_hash as seed to get a hash determined by `random_state` that is passed
231        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
232    } else {
233        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
234            Some(v) => xxh3_64_with_seed(v, null_h),
235            None => null_h,
236        }))
237    }
238}
239
240impl VecHash for BinaryChunked {
241    fn vec_hash(
242        &self,
243        random_state: PlSeedableRandomStateQuality,
244        buf: &mut Vec<u64>,
245    ) -> PolarsResult<()> {
246        buf.clear();
247        buf.reserve(self.len());
248        self.downcast_iter()
249            .for_each(|arr| hash_binview_array(arr, random_state, buf));
250        Ok(())
251    }
252
253    fn vec_hash_combine(
254        &self,
255        random_state: PlSeedableRandomStateQuality,
256        hashes: &mut [u64],
257    ) -> PolarsResult<()> {
258        let null_h = get_null_hash_value(&random_state);
259
260        let mut offset = 0;
261        self.downcast_iter().for_each(|arr| {
262            match arr.null_count() {
263                0 => arr
264                    .values_iter()
265                    .zip(&mut hashes[offset..])
266                    .for_each(|(v, h)| {
267                        let l = xxh3_64_with_seed(v, null_h);
268                        *h = _boost_hash_combine(l, *h)
269                    }),
270                _ => {
271                    let validity = arr.validity().unwrap();
272                    let (slice, byte_offset, _) = validity.as_slice();
273                    (0..validity.len())
274                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
275                        .zip(&mut hashes[offset..])
276                        .zip(arr.values_iter())
277                        .for_each(|((valid, h), l)| {
278                            let l = if valid {
279                                xxh3_64_with_seed(l, null_h)
280                            } else {
281                                null_h
282                            };
283                            *h = _boost_hash_combine(l, *h)
284                        });
285                },
286            }
287            offset += arr.len();
288        });
289        Ok(())
290    }
291}
292
293impl VecHash for BinaryOffsetChunked {
294    fn vec_hash(
295        &self,
296        random_state: PlSeedableRandomStateQuality,
297        buf: &mut Vec<u64>,
298    ) -> PolarsResult<()> {
299        buf.clear();
300        buf.reserve(self.len());
301        self.downcast_iter()
302            .for_each(|arr| _hash_binary_array(arr, random_state, buf));
303        Ok(())
304    }
305
306    fn vec_hash_combine(
307        &self,
308        random_state: PlSeedableRandomStateQuality,
309        hashes: &mut [u64],
310    ) -> PolarsResult<()> {
311        let null_h = get_null_hash_value(&random_state);
312
313        let mut offset = 0;
314        self.downcast_iter().for_each(|arr| {
315            match arr.null_count() {
316                0 => arr
317                    .values_iter()
318                    .zip(&mut hashes[offset..])
319                    .for_each(|(v, h)| {
320                        let l = xxh3_64_with_seed(v, null_h);
321                        *h = _boost_hash_combine(l, *h)
322                    }),
323                _ => {
324                    let validity = arr.validity().unwrap();
325                    let (slice, byte_offset, _) = validity.as_slice();
326                    (0..validity.len())
327                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
328                        .zip(&mut hashes[offset..])
329                        .zip(arr.values_iter())
330                        .for_each(|((valid, h), l)| {
331                            let l = if valid {
332                                xxh3_64_with_seed(l, null_h)
333                            } else {
334                                null_h
335                            };
336                            *h = _boost_hash_combine(l, *h)
337                        });
338                },
339            }
340            offset += arr.len();
341        });
342        Ok(())
343    }
344}
345
346impl VecHash for NullChunked {
347    fn vec_hash(
348        &self,
349        random_state: PlSeedableRandomStateQuality,
350        buf: &mut Vec<u64>,
351    ) -> PolarsResult<()> {
352        let null_h = get_null_hash_value(&random_state);
353        buf.clear();
354        buf.resize(self.len(), null_h);
355        Ok(())
356    }
357
358    fn vec_hash_combine(
359        &self,
360        random_state: PlSeedableRandomStateQuality,
361        hashes: &mut [u64],
362    ) -> PolarsResult<()> {
363        let null_h = get_null_hash_value(&random_state);
364        hashes
365            .iter_mut()
366            .for_each(|h| *h = _boost_hash_combine(null_h, *h));
367        Ok(())
368    }
369}
370impl VecHash for BooleanChunked {
371    fn vec_hash(
372        &self,
373        random_state: PlSeedableRandomStateQuality,
374        buf: &mut Vec<u64>,
375    ) -> PolarsResult<()> {
376        buf.clear();
377        buf.reserve(self.len());
378        let true_h = random_state.hash_one(true);
379        let false_h = random_state.hash_one(false);
380        let null_h = get_null_hash_value(&random_state);
381        self.downcast_iter().for_each(|arr| {
382            if arr.null_count() == 0 {
383                buf.extend(arr.values_iter().map(|v| if v { true_h } else { false_h }))
384            } else {
385                buf.extend(arr.into_iter().map(|opt_v| match opt_v {
386                    Some(true) => true_h,
387                    Some(false) => false_h,
388                    None => null_h,
389                }))
390            }
391        });
392        Ok(())
393    }
394
395    fn vec_hash_combine(
396        &self,
397        random_state: PlSeedableRandomStateQuality,
398        hashes: &mut [u64],
399    ) -> PolarsResult<()> {
400        let true_h = random_state.hash_one(true);
401        let false_h = random_state.hash_one(false);
402        let null_h = get_null_hash_value(&random_state);
403
404        let mut offset = 0;
405        self.downcast_iter().for_each(|arr| {
406            match arr.null_count() {
407                0 => arr
408                    .values_iter()
409                    .zip(&mut hashes[offset..])
410                    .for_each(|(v, h)| {
411                        let l = if v { true_h } else { false_h };
412                        *h = _boost_hash_combine(l, *h)
413                    }),
414                _ => {
415                    let validity = arr.validity().unwrap();
416                    let (slice, byte_offset, _) = validity.as_slice();
417                    (0..validity.len())
418                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
419                        .zip(&mut hashes[offset..])
420                        .zip(arr.values())
421                        .for_each(|((valid, h), l)| {
422                            let l = if valid {
423                                if l { true_h } else { false_h }
424                            } else {
425                                null_h
426                            };
427                            *h = _boost_hash_combine(l, *h)
428                        });
429                },
430            }
431            offset += arr.len();
432        });
433        Ok(())
434    }
435}
436
#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    fn vec_hash(
        &self,
        random_state: PlSeedableRandomStateQuality,
        buf: &mut Vec<u64>,
    ) -> PolarsResult<()> {
        // There is deliberately no separate fast path for chunks without nulls.
        // With threading an array may be split into parts, some with nulls and
        // some without; hashing one part as `T` and another as `Option<T>`
        // would make their hashes incomparable. Every element is therefore
        // hashed as an `Option`, so `random_state` is the only thing that
        // determines the hash of a given value.
        buf.clear();
        buf.reserve(self.len());

        for arr in self.downcast_iter() {
            buf.extend(arr.into_iter().map(|opt_v| random_state.hash_one(opt_v)));
        }

        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlSeedableRandomStateQuality,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        // Each optional element is hashed and combined with the hash already
        // stored for its row; `apply_to_slice` pairs elements with `hashes`.
        self.apply_to_slice(
            |opt_v, h| _boost_hash_combine(random_state.hash_one(opt_v), *h),
            hashes,
        );
        Ok(())
    }
}
477
478pub fn _df_rows_to_hashes_threaded_vertical(
479    keys: &[DataFrame],
480    build_hasher: Option<PlSeedableRandomStateQuality>,
481) -> PolarsResult<(Vec<UInt64Chunked>, PlSeedableRandomStateQuality)> {
482    let build_hasher = build_hasher.unwrap_or_default();
483
484    let hashes = POOL.install(|| {
485        keys.into_par_iter()
486            .map(|df| {
487                let hb = build_hasher;
488                let mut hashes = vec![];
489                columns_to_hashes(df.get_columns(), Some(hb), &mut hashes)?;
490                Ok(UInt64Chunked::from_vec(PlSmallStr::EMPTY, hashes))
491            })
492            .collect::<PolarsResult<Vec<_>>>()
493    })?;
494    Ok((hashes, build_hasher))
495}
496
497pub fn columns_to_hashes(
498    keys: &[Column],
499    build_hasher: Option<PlSeedableRandomStateQuality>,
500    hashes: &mut Vec<u64>,
501) -> PolarsResult<PlSeedableRandomStateQuality> {
502    let build_hasher = build_hasher.unwrap_or_default();
503
504    let mut iter = keys.iter();
505    let first = iter.next().expect("at least one key");
506    first.vec_hash(build_hasher, hashes)?;
507
508    for keys in iter {
509        keys.vec_hash_combine(build_hasher, hashes)?;
510    }
511
512    Ok(build_hasher)
513}