polars_core/hashing/
vector_hasher.rs

1use std::hash::BuildHasher;
2
3use arrow::bitmap::utils::get_bit_unchecked;
4use polars_utils::aliases::PlSeedableRandomStateQuality;
5use polars_utils::hashing::{_boost_hash_combine, folded_multiply};
6use polars_utils::total_ord::{ToTotalOrd, TotalHash};
7use rayon::prelude::*;
8use xxhash_rust::xxh3::xxh3_64_with_seed;
9
10use super::*;
11use crate::POOL;
12use crate::prelude::*;
13use crate::series::implementations::null::NullChunked;
14
// Multiplier passed to `folded_multiply` when hashes are mixed in this file.
// See: https://github.com/tkaitchuck/aHash/blob/f9acd508bd89e7c5b2877a9510098100f9018d64/src/operations.rs#L4
const MULTIPLE: u64 = 6_364_136_223_846_793_005;
17
18// Read more:
19//  https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
20//  http://myeyesareblind.com/2017/02/06/Combine-hash-values/
21
22pub trait VecHash {
23    /// Compute the hash for all values in the array.
24    fn vec_hash(
25        &self,
26        _random_state: PlSeedableRandomStateQuality,
27        _buf: &mut Vec<u64>,
28    ) -> PolarsResult<()>;
29
30    fn vec_hash_combine(
31        &self,
32        _random_state: PlSeedableRandomStateQuality,
33        _hashes: &mut [u64],
34    ) -> PolarsResult<()>;
35}
36
37pub(crate) fn get_null_hash_value(random_state: &PlSeedableRandomStateQuality) -> u64 {
38    // we just start with a large prime number and hash that twice
39    // to get a constant hash value for null/None
40    let first = random_state.hash_one(3188347919usize);
41    random_state.hash_one(first)
42}
43
44fn insert_null_hash(
45    chunks: &[ArrayRef],
46    random_state: PlSeedableRandomStateQuality,
47    buf: &mut Vec<u64>,
48) {
49    let null_h = get_null_hash_value(&random_state);
50    let hashes = buf.as_mut_slice();
51
52    let mut offset = 0;
53    chunks.iter().for_each(|arr| {
54        if arr.null_count() > 0 {
55            let validity = arr.validity().unwrap();
56            let (slice, byte_offset, _) = validity.as_slice();
57            (0..validity.len())
58                .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
59                .zip(&mut hashes[offset..])
60                .for_each(|(valid, h)| {
61                    *h = [null_h, *h][valid as usize];
62                })
63        }
64        offset += arr.len();
65    });
66}
67
68fn numeric_vec_hash<T>(
69    ca: &ChunkedArray<T>,
70    random_state: PlSeedableRandomStateQuality,
71    buf: &mut Vec<u64>,
72) where
73    T: PolarsNumericType,
74    T::Native: TotalHash + ToTotalOrd,
75    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
76{
77    // Note that we don't use the no null branch! This can break in unexpected ways.
78    // for instance with threading we split an array in n_threads, this may lead to
79    // splits that have no nulls and splits that have nulls. Then one array is hashed with
80    // Option<T> and the other array with T.
81    // Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
82    // the only deterministic seed.
83    buf.clear();
84    buf.reserve(ca.len());
85
86    #[allow(unused_unsafe)]
87    #[allow(clippy::useless_transmute)]
88    ca.downcast_iter().for_each(|arr| {
89        buf.extend(
90            arr.values()
91                .as_slice()
92                .iter()
93                .copied()
94                .map(|v| random_state.hash_one(v.to_total_ord())),
95        );
96    });
97    insert_null_hash(&ca.chunks, random_state, buf)
98}
99
100fn numeric_vec_hash_combine<T>(
101    ca: &ChunkedArray<T>,
102    random_state: PlSeedableRandomStateQuality,
103    hashes: &mut [u64],
104) where
105    T: PolarsNumericType,
106    T::Native: TotalHash + ToTotalOrd,
107    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
108{
109    let null_h = get_null_hash_value(&random_state);
110
111    let mut offset = 0;
112    ca.downcast_iter().for_each(|arr| {
113        match arr.null_count() {
114            0 => arr
115                .values()
116                .as_slice()
117                .iter()
118                .zip(&mut hashes[offset..])
119                .for_each(|(v, h)| {
120                    // Inlined from ahash. This ensures we combine with the previous state.
121                    *h = folded_multiply(
122                        // Be careful not to xor the hash directly with the existing hash,
123                        // it would lead to 0-hashes for 2 columns containing equal values.
124                        random_state.hash_one(v.to_total_ord()) ^ folded_multiply(*h, MULTIPLE),
125                        MULTIPLE,
126                    );
127                }),
128            _ => {
129                let validity = arr.validity().unwrap();
130                let (slice, byte_offset, _) = validity.as_slice();
131                (0..validity.len())
132                    .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
133                    .zip(&mut hashes[offset..])
134                    .zip(arr.values().as_slice())
135                    .for_each(|((valid, h), l)| {
136                        let lh = random_state.hash_one(l.to_total_ord());
137                        let to_hash = [null_h, lh][valid as usize];
138                        *h = folded_multiply(to_hash ^ folded_multiply(*h, MULTIPLE), MULTIPLE);
139                    });
140            },
141        }
142        offset += arr.len();
143    });
144}
145
146macro_rules! vec_hash_numeric {
147    ($ca:ident) => {
148        impl VecHash for $ca {
149            fn vec_hash(
150                &self,
151                random_state: PlSeedableRandomStateQuality,
152                buf: &mut Vec<u64>,
153            ) -> PolarsResult<()> {
154                numeric_vec_hash(self, random_state, buf);
155                Ok(())
156            }
157
158            fn vec_hash_combine(
159                &self,
160                random_state: PlSeedableRandomStateQuality,
161                hashes: &mut [u64],
162            ) -> PolarsResult<()> {
163                numeric_vec_hash_combine(self, random_state, hashes);
164                Ok(())
165            }
166        }
167    };
168}
169
170vec_hash_numeric!(Int64Chunked);
171vec_hash_numeric!(Int32Chunked);
172vec_hash_numeric!(Int16Chunked);
173vec_hash_numeric!(Int8Chunked);
174vec_hash_numeric!(UInt64Chunked);
175vec_hash_numeric!(UInt32Chunked);
176vec_hash_numeric!(UInt16Chunked);
177vec_hash_numeric!(UInt8Chunked);
178vec_hash_numeric!(Float64Chunked);
179vec_hash_numeric!(Float32Chunked);
180#[cfg(any(feature = "dtype-decimal", feature = "dtype-i128"))]
181vec_hash_numeric!(Int128Chunked);
182
183impl VecHash for StringChunked {
184    fn vec_hash(
185        &self,
186        random_state: PlSeedableRandomStateQuality,
187        buf: &mut Vec<u64>,
188    ) -> PolarsResult<()> {
189        self.as_binary().vec_hash(random_state, buf)?;
190        Ok(())
191    }
192
193    fn vec_hash_combine(
194        &self,
195        random_state: PlSeedableRandomStateQuality,
196        hashes: &mut [u64],
197    ) -> PolarsResult<()> {
198        self.as_binary().vec_hash_combine(random_state, hashes)?;
199        Ok(())
200    }
201}
202
203// used in polars-pipe
204pub fn _hash_binary_array(
205    arr: &BinaryArray<i64>,
206    random_state: PlSeedableRandomStateQuality,
207    buf: &mut Vec<u64>,
208) {
209    let null_h = get_null_hash_value(&random_state);
210    if arr.null_count() == 0 {
211        // use the null_hash as seed to get a hash determined by `random_state` that is passed
212        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
213    } else {
214        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
215            Some(v) => xxh3_64_with_seed(v, null_h),
216            None => null_h,
217        }))
218    }
219}
220
221fn hash_binview_array(
222    arr: &BinaryViewArray,
223    random_state: PlSeedableRandomStateQuality,
224    buf: &mut Vec<u64>,
225) {
226    let null_h = get_null_hash_value(&random_state);
227    if arr.null_count() == 0 {
228        // use the null_hash as seed to get a hash determined by `random_state` that is passed
229        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
230    } else {
231        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
232            Some(v) => xxh3_64_with_seed(v, null_h),
233            None => null_h,
234        }))
235    }
236}
237
238impl VecHash for BinaryChunked {
239    fn vec_hash(
240        &self,
241        random_state: PlSeedableRandomStateQuality,
242        buf: &mut Vec<u64>,
243    ) -> PolarsResult<()> {
244        buf.clear();
245        buf.reserve(self.len());
246        self.downcast_iter()
247            .for_each(|arr| hash_binview_array(arr, random_state, buf));
248        Ok(())
249    }
250
251    fn vec_hash_combine(
252        &self,
253        random_state: PlSeedableRandomStateQuality,
254        hashes: &mut [u64],
255    ) -> PolarsResult<()> {
256        let null_h = get_null_hash_value(&random_state);
257
258        let mut offset = 0;
259        self.downcast_iter().for_each(|arr| {
260            match arr.null_count() {
261                0 => arr
262                    .values_iter()
263                    .zip(&mut hashes[offset..])
264                    .for_each(|(v, h)| {
265                        let l = xxh3_64_with_seed(v, null_h);
266                        *h = _boost_hash_combine(l, *h)
267                    }),
268                _ => {
269                    let validity = arr.validity().unwrap();
270                    let (slice, byte_offset, _) = validity.as_slice();
271                    (0..validity.len())
272                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
273                        .zip(&mut hashes[offset..])
274                        .zip(arr.values_iter())
275                        .for_each(|((valid, h), l)| {
276                            let l = if valid {
277                                xxh3_64_with_seed(l, null_h)
278                            } else {
279                                null_h
280                            };
281                            *h = _boost_hash_combine(l, *h)
282                        });
283                },
284            }
285            offset += arr.len();
286        });
287        Ok(())
288    }
289}
290
291impl VecHash for BinaryOffsetChunked {
292    fn vec_hash(
293        &self,
294        random_state: PlSeedableRandomStateQuality,
295        buf: &mut Vec<u64>,
296    ) -> PolarsResult<()> {
297        buf.clear();
298        buf.reserve(self.len());
299        self.downcast_iter()
300            .for_each(|arr| _hash_binary_array(arr, random_state, buf));
301        Ok(())
302    }
303
304    fn vec_hash_combine(
305        &self,
306        random_state: PlSeedableRandomStateQuality,
307        hashes: &mut [u64],
308    ) -> PolarsResult<()> {
309        let null_h = get_null_hash_value(&random_state);
310
311        let mut offset = 0;
312        self.downcast_iter().for_each(|arr| {
313            match arr.null_count() {
314                0 => arr
315                    .values_iter()
316                    .zip(&mut hashes[offset..])
317                    .for_each(|(v, h)| {
318                        let l = xxh3_64_with_seed(v, null_h);
319                        *h = _boost_hash_combine(l, *h)
320                    }),
321                _ => {
322                    let validity = arr.validity().unwrap();
323                    let (slice, byte_offset, _) = validity.as_slice();
324                    (0..validity.len())
325                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
326                        .zip(&mut hashes[offset..])
327                        .zip(arr.values_iter())
328                        .for_each(|((valid, h), l)| {
329                            let l = if valid {
330                                xxh3_64_with_seed(l, null_h)
331                            } else {
332                                null_h
333                            };
334                            *h = _boost_hash_combine(l, *h)
335                        });
336                },
337            }
338            offset += arr.len();
339        });
340        Ok(())
341    }
342}
343
344impl VecHash for NullChunked {
345    fn vec_hash(
346        &self,
347        random_state: PlSeedableRandomStateQuality,
348        buf: &mut Vec<u64>,
349    ) -> PolarsResult<()> {
350        let null_h = get_null_hash_value(&random_state);
351        buf.clear();
352        buf.resize(self.len(), null_h);
353        Ok(())
354    }
355
356    fn vec_hash_combine(
357        &self,
358        random_state: PlSeedableRandomStateQuality,
359        hashes: &mut [u64],
360    ) -> PolarsResult<()> {
361        let null_h = get_null_hash_value(&random_state);
362        hashes
363            .iter_mut()
364            .for_each(|h| *h = _boost_hash_combine(null_h, *h));
365        Ok(())
366    }
367}
368impl VecHash for BooleanChunked {
369    fn vec_hash(
370        &self,
371        random_state: PlSeedableRandomStateQuality,
372        buf: &mut Vec<u64>,
373    ) -> PolarsResult<()> {
374        buf.clear();
375        buf.reserve(self.len());
376        let true_h = random_state.hash_one(true);
377        let false_h = random_state.hash_one(false);
378        let null_h = get_null_hash_value(&random_state);
379        self.downcast_iter().for_each(|arr| {
380            if arr.null_count() == 0 {
381                buf.extend(arr.values_iter().map(|v| if v { true_h } else { false_h }))
382            } else {
383                buf.extend(arr.into_iter().map(|opt_v| match opt_v {
384                    Some(true) => true_h,
385                    Some(false) => false_h,
386                    None => null_h,
387                }))
388            }
389        });
390        Ok(())
391    }
392
393    fn vec_hash_combine(
394        &self,
395        random_state: PlSeedableRandomStateQuality,
396        hashes: &mut [u64],
397    ) -> PolarsResult<()> {
398        let true_h = random_state.hash_one(true);
399        let false_h = random_state.hash_one(false);
400        let null_h = get_null_hash_value(&random_state);
401
402        let mut offset = 0;
403        self.downcast_iter().for_each(|arr| {
404            match arr.null_count() {
405                0 => arr
406                    .values_iter()
407                    .zip(&mut hashes[offset..])
408                    .for_each(|(v, h)| {
409                        let l = if v { true_h } else { false_h };
410                        *h = _boost_hash_combine(l, *h)
411                    }),
412                _ => {
413                    let validity = arr.validity().unwrap();
414                    let (slice, byte_offset, _) = validity.as_slice();
415                    (0..validity.len())
416                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
417                        .zip(&mut hashes[offset..])
418                        .zip(arr.values())
419                        .for_each(|((valid, h), l)| {
420                            let l = if valid {
421                                if l { true_h } else { false_h }
422                            } else {
423                                null_h
424                            };
425                            *h = _boost_hash_combine(l, *h)
426                        });
427                },
428            }
429            offset += arr.len();
430        });
431        Ok(())
432    }
433}
434
#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Replace the contents of `buf` with one hash per slot, hashing each slot as an optional
    /// value.
    fn vec_hash(
        &self,
        random_state: PlSeedableRandomStateQuality,
        buf: &mut Vec<u64>,
    ) -> PolarsResult<()> {
        // Note that we don't use the no null branch! This can break in unexpected ways.
        // For instance with threading we split an array in n_threads, this may lead to
        // splits that have no nulls and splits that have nulls. Then one array would be hashed
        // with Option<T> and the other array with T, meaning that they cannot be compared.
        // By always hashing on Option<T> the random_state is the only deterministic seed.
        buf.clear();
        buf.reserve(self.len());

        for arr in self.downcast_iter() {
            buf.extend(arr.into_iter().map(|slot| random_state.hash_one(slot)));
        }

        Ok(())
    }

    /// Combine the hash of every optional value with the matching entry of `hashes`, via
    /// `apply_to_slice`.
    fn vec_hash_combine(
        &self,
        random_state: PlSeedableRandomStateQuality,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.apply_to_slice(
            |slot, existing| _boost_hash_combine(random_state.hash_one(slot), *existing),
            hashes,
        );
        Ok(())
    }
}
475
476pub fn _df_rows_to_hashes_threaded_vertical(
477    keys: &[DataFrame],
478    build_hasher: Option<PlSeedableRandomStateQuality>,
479) -> PolarsResult<(Vec<UInt64Chunked>, PlSeedableRandomStateQuality)> {
480    let build_hasher = build_hasher.unwrap_or_default();
481
482    let hashes = POOL.install(|| {
483        keys.into_par_iter()
484            .map(|df| {
485                let hb = build_hasher;
486                let mut hashes = vec![];
487                columns_to_hashes(df.get_columns(), Some(hb), &mut hashes)?;
488                Ok(UInt64Chunked::from_vec(PlSmallStr::EMPTY, hashes))
489            })
490            .collect::<PolarsResult<Vec<_>>>()
491    })?;
492    Ok((hashes, build_hasher))
493}
494
495pub fn columns_to_hashes(
496    keys: &[Column],
497    build_hasher: Option<PlSeedableRandomStateQuality>,
498    hashes: &mut Vec<u64>,
499) -> PolarsResult<PlSeedableRandomStateQuality> {
500    let build_hasher = build_hasher.unwrap_or_default();
501
502    let mut iter = keys.iter();
503    let first = iter.next().expect("at least one key");
504    first.vec_hash(build_hasher, hashes)?;
505
506    for keys in iter {
507        keys.vec_hash_combine(build_hasher, hashes)?;
508    }
509
510    Ok(build_hasher)
511}