polars_core/hashing/vector_hasher.rs

use arrow::bitmap::utils::get_bit_unchecked;
use polars_utils::hashing::{_boost_hash_combine, folded_multiply};
use polars_utils::total_ord::{ToTotalOrd, TotalHash};
use rayon::prelude::*;
use xxhash_rust::xxh3::xxh3_64_with_seed;

use super::*;
use crate::POOL;
use crate::prelude::*;
use crate::series::implementations::null::NullChunked;

// See: https://github.com/tkaitchuck/aHash/blob/f9acd508bd89e7c5b2877a9510098100f9018d64/src/operations.rs#L4
const MULTIPLE: u64 = 6364136223846793005;

// Read more:
//  https://www.cockroachlabs.com/blog/vectorized-hash-joiner/
//  http://myeyesareblind.com/2017/02/06/Combine-hash-values/

pub trait VecHash {
    /// Compute the hash for all values in the array.
    fn vec_hash(&self, _random_state: PlRandomState, _buf: &mut Vec<u64>) -> PolarsResult<()> {
        polars_bail!(un_impl = vec_hash);
    }

    /// Combine the hash of all values in the array with the hashes already in `_hashes`.
    fn vec_hash_combine(
        &self,
        _random_state: PlRandomState,
        _hashes: &mut [u64],
    ) -> PolarsResult<()> {
        polars_bail!(un_impl = vec_hash_combine);
    }
}
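
// A minimal sketch of the intended calling convention for `VecHash`: `vec_hash`
// fills the buffer for the first key column and `vec_hash_combine` folds every
// following key into that buffer in place (mirroring `columns_to_hashes` below).
// `Int32Chunked::from_vec` is assumed to be available here; this is an
// illustrative test, not part of the original module.
#[cfg(test)]
mod vec_hash_contract_sketch {
    use super::*;

    #[test]
    fn hash_then_combine() -> PolarsResult<()> {
        let a = Int32Chunked::from_vec(PlSmallStr::EMPTY, vec![1, 2, 3]);
        let b = Int32Chunked::from_vec(PlSmallStr::EMPTY, vec![10, 20, 30]);

        let random_state = PlRandomState::default();
        let mut hashes = Vec::new();

        // First key: fill the buffer.
        a.vec_hash(random_state.clone(), &mut hashes)?;
        let after_first = hashes.clone();

        // Every further key: combine into the existing buffer, length unchanged.
        b.vec_hash_combine(random_state, &mut hashes)?;
        assert_eq!(hashes.len(), after_first.len());
        assert_eq!(hashes.len(), 3);
        Ok(())
    }
}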

pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 {
    // Start with a large prime number and hash it twice to get a constant
    // hash value for null/None under the given random_state.
    let first = random_state.hash_one(3188347919usize);
    random_state.hash_one(first)
}
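
// A small sketch of the property the callers below rely on: the null hash is a
// pure function of the `random_state`, so every column hashed with the same state
// (or a clone of it) encodes nulls identically. Uses only items defined in this
// file; the test itself is illustrative and not part of the original module.
#[cfg(test)]
mod null_hash_sketch {
    use super::*;

    #[test]
    fn null_hash_is_deterministic_per_state() {
        let random_state = PlRandomState::default();
        assert_eq!(
            get_null_hash_value(&random_state),
            get_null_hash_value(&random_state)
        );
        assert_eq!(
            get_null_hash_value(&random_state),
            get_null_hash_value(&random_state.clone())
        );
    }
}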

// Overwrite the hash slots of null entries with the null hash, chunk by chunk.
fn insert_null_hash(chunks: &[ArrayRef], random_state: PlRandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    let hashes = buf.as_mut_slice();

    let mut offset = 0;
    chunks.iter().for_each(|arr| {
        if arr.null_count() > 0 {
            let validity = arr.validity().unwrap();
            let (slice, byte_offset, _) = validity.as_slice();
            (0..validity.len())
                .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
                .zip(&mut hashes[offset..])
                .for_each(|(valid, h)| {
                    *h = [null_h, *h][valid as usize];
                })
        }
        offset += arr.len();
    });
}

fn numeric_vec_hash<T>(ca: &ChunkedArray<T>, random_state: PlRandomState, buf: &mut Vec<u64>)
where
    T: PolarsNumericType,
    T::Native: TotalHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
{
    // Note that we don't take a no-null fast path here; that can break in unexpected
    // ways. For instance, with threading we split an array into n_threads pieces, which
    // may yield splits without nulls and splits with nulls. One piece would then be
    // hashed as Option<T> and the other as T, and the resulting hashes could not be
    // compared. By always hashing as Option<T>, the random_state is the only
    // deterministic seed.
    buf.clear();
    buf.reserve(ca.len());

    #[allow(unused_unsafe)]
    #[allow(clippy::useless_transmute)]
    ca.downcast_iter().for_each(|arr| {
        buf.extend(
            arr.values()
                .as_slice()
                .iter()
                .copied()
                .map(|v| random_state.hash_one(v.to_total_ord())),
        );
    });
    insert_null_hash(&ca.chunks, random_state, buf)
}

fn numeric_vec_hash_combine<T>(
    ca: &ChunkedArray<T>,
    random_state: PlRandomState,
    hashes: &mut [u64],
) where
    T: PolarsNumericType,
    T::Native: TotalHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Hash,
{
    let null_h = get_null_hash_value(&random_state);

    let mut offset = 0;
    ca.downcast_iter().for_each(|arr| {
        match arr.null_count() {
            0 => arr
                .values()
                .as_slice()
                .iter()
                .zip(&mut hashes[offset..])
                .for_each(|(v, h)| {
                    // Inlined from ahash. This ensures we combine with the previous state.
                    *h = folded_multiply(
                        // Be careful not to xor the new hash directly with the existing
                        // hash; that would produce 0-hashes when two columns contain
                        // equal values.
                        random_state.hash_one(v.to_total_ord()) ^ folded_multiply(*h, MULTIPLE),
                        MULTIPLE,
                    );
                }),
            _ => {
                let validity = arr.validity().unwrap();
                let (slice, byte_offset, _) = validity.as_slice();
                (0..validity.len())
                    .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
                    .zip(&mut hashes[offset..])
                    .zip(arr.values().as_slice())
                    .for_each(|((valid, h), l)| {
                        let lh = random_state.hash_one(l.to_total_ord());
                        let to_hash = [null_h, lh][valid as usize];
                        *h = folded_multiply(to_hash ^ folded_multiply(*h, MULTIPLE), MULTIPLE);
                    });
            },
        }
        offset += arr.len();
    });
}
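
// A worked sketch of the combine step above, using only helpers already imported in
// this file. It demonstrates the pitfall the inline comment warns about: xor-ing a
// fresh column hash straight into the running hash cancels out whenever two key
// columns hold equal values (h ^ h == 0, so every such row collides), whereas the
// folded-multiply mixing of the running state avoids that bit-for-bit cancellation.
// Illustrative test only, not part of the original module.
#[cfg(test)]
mod combine_step_sketch {
    use super::*;

    #[test]
    fn naive_xor_collapses_equal_columns() {
        let random_state = PlRandomState::default();

        for v in [1u64, 2, 3] {
            // Pretend two key columns hold the same value `v` in this row.
            let col_hash = random_state.hash_one(v);

            // Naive combine: the second column erases the first, every such row -> 0.
            assert_eq!(col_hash ^ col_hash, 0);

            // Combine used above: the running hash is passed through folded_multiply
            // before the xor, so equal columns no longer cancel bit-for-bit.
            let combined =
                folded_multiply(col_hash ^ folded_multiply(col_hash, MULTIPLE), MULTIPLE);
            // The step is deterministic for a fixed random_state and input.
            assert_eq!(
                combined,
                folded_multiply(col_hash ^ folded_multiply(col_hash, MULTIPLE), MULTIPLE)
            );
        }
    }
}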

macro_rules! vec_hash_numeric {
    ($ca:ident) => {
        impl VecHash for $ca {
            fn vec_hash(
                &self,
                random_state: PlRandomState,
                buf: &mut Vec<u64>,
            ) -> PolarsResult<()> {
                numeric_vec_hash(self, random_state, buf);
                Ok(())
            }

            fn vec_hash_combine(
                &self,
                random_state: PlRandomState,
                hashes: &mut [u64],
            ) -> PolarsResult<()> {
                numeric_vec_hash_combine(self, random_state, hashes);
                Ok(())
            }
        }
    };
}

vec_hash_numeric!(Int64Chunked);
vec_hash_numeric!(Int32Chunked);
vec_hash_numeric!(Int16Chunked);
vec_hash_numeric!(Int8Chunked);
vec_hash_numeric!(UInt64Chunked);
vec_hash_numeric!(UInt32Chunked);
vec_hash_numeric!(UInt16Chunked);
vec_hash_numeric!(UInt8Chunked);
vec_hash_numeric!(Float64Chunked);
vec_hash_numeric!(Float32Chunked);
#[cfg(any(feature = "dtype-decimal", feature = "dtype-i128"))]
vec_hash_numeric!(Int128Chunked);

impl VecHash for StringChunked {
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        self.as_binary().vec_hash(random_state, buf)?;
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.as_binary().vec_hash_combine(random_state, hashes)?;
        Ok(())
    }
}
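
// A minimal sketch of the delegation above: a string column hashes exactly like its
// binary reinterpretation, because both methods forward to `as_binary()`.
// `StringChunked::from_slice` is assumed to be available here; illustrative test
// only, not part of the original module.
#[cfg(test)]
mod string_binary_parity_sketch {
    use super::*;

    #[test]
    fn string_hashes_match_binary_hashes() -> PolarsResult<()> {
        let s = StringChunked::from_slice(PlSmallStr::EMPTY, &["a", "bb", "ccc"]);
        let random_state = PlRandomState::default();

        let (mut h_str, mut h_bin) = (Vec::new(), Vec::new());
        s.vec_hash(random_state.clone(), &mut h_str)?;
        s.as_binary().vec_hash(random_state, &mut h_bin)?;

        assert_eq!(h_str, h_bin);
        Ok(())
    }
}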

// Used in polars-pipe.
pub fn _hash_binary_array(arr: &BinaryArray<i64>, random_state: PlRandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    if arr.null_count() == 0 {
        // Use the null hash as the xxh3 seed so the result is determined by the
        // `random_state` that was passed in.
        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
    } else {
        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
            Some(v) => xxh3_64_with_seed(v, null_h),
            None => null_h,
        }))
    }
}

fn hash_binview_array(arr: &BinaryViewArray, random_state: PlRandomState, buf: &mut Vec<u64>) {
    let null_h = get_null_hash_value(&random_state);
    if arr.null_count() == 0 {
        // Use the null hash as the xxh3 seed so the result is determined by the
        // `random_state` that was passed in.
        buf.extend(arr.values_iter().map(|v| xxh3_64_with_seed(v, null_h)))
    } else {
        buf.extend(arr.into_iter().map(|opt_v| match opt_v {
            Some(v) => xxh3_64_with_seed(v, null_h),
            None => null_h,
        }))
    }
}

impl VecHash for BinaryChunked {
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        self.downcast_iter()
            .for_each(|arr| hash_binview_array(arr, random_state.clone(), buf));
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);

        let mut offset = 0;
        self.downcast_iter().for_each(|arr| {
            match arr.null_count() {
                0 => arr
                    .values_iter()
                    .zip(&mut hashes[offset..])
                    .for_each(|(v, h)| {
                        let l = xxh3_64_with_seed(v, null_h);
                        *h = _boost_hash_combine(l, *h)
                    }),
                _ => {
                    let validity = arr.validity().unwrap();
                    let (slice, byte_offset, _) = validity.as_slice();
                    (0..validity.len())
                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
                        .zip(&mut hashes[offset..])
                        .zip(arr.values_iter())
                        .for_each(|((valid, h), l)| {
                            let l = if valid {
                                xxh3_64_with_seed(l, null_h)
                            } else {
                                null_h
                            };
                            *h = _boost_hash_combine(l, *h)
                        });
                },
            }
            offset += arr.len();
        });
        Ok(())
    }
}

impl VecHash for BinaryOffsetChunked {
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        self.downcast_iter()
            .for_each(|arr| _hash_binary_array(arr, random_state.clone(), buf));
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);

        let mut offset = 0;
        self.downcast_iter().for_each(|arr| {
            match arr.null_count() {
                0 => arr
                    .values_iter()
                    .zip(&mut hashes[offset..])
                    .for_each(|(v, h)| {
                        let l = xxh3_64_with_seed(v, null_h);
                        *h = _boost_hash_combine(l, *h)
                    }),
                _ => {
                    let validity = arr.validity().unwrap();
                    let (slice, byte_offset, _) = validity.as_slice();
                    (0..validity.len())
                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
                        .zip(&mut hashes[offset..])
                        .zip(arr.values_iter())
                        .for_each(|((valid, h), l)| {
                            let l = if valid {
                                xxh3_64_with_seed(l, null_h)
                            } else {
                                null_h
                            };
                            *h = _boost_hash_combine(l, *h)
                        });
                },
            }
            offset += arr.len();
        });
        Ok(())
    }
}

impl VecHash for NullChunked {
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);
        buf.clear();
        buf.resize(self.len(), null_h);
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        let null_h = get_null_hash_value(&random_state);
        hashes
            .iter_mut()
            .for_each(|h| *h = _boost_hash_combine(null_h, *h));
        Ok(())
    }
}

impl VecHash for BooleanChunked {
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        buf.clear();
        buf.reserve(self.len());
        let true_h = random_state.hash_one(true);
        let false_h = random_state.hash_one(false);
        let null_h = get_null_hash_value(&random_state);
        self.downcast_iter().for_each(|arr| {
            if arr.null_count() == 0 {
                buf.extend(arr.values_iter().map(|v| if v { true_h } else { false_h }))
            } else {
                buf.extend(arr.into_iter().map(|opt_v| match opt_v {
                    Some(true) => true_h,
                    Some(false) => false_h,
                    None => null_h,
                }))
            }
        });
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        let true_h = random_state.hash_one(true);
        let false_h = random_state.hash_one(false);
        let null_h = get_null_hash_value(&random_state);

        let mut offset = 0;
        self.downcast_iter().for_each(|arr| {
            match arr.null_count() {
                0 => arr
                    .values_iter()
                    .zip(&mut hashes[offset..])
                    .for_each(|(v, h)| {
                        let l = if v { true_h } else { false_h };
                        *h = _boost_hash_combine(l, *h)
                    }),
                _ => {
                    let validity = arr.validity().unwrap();
                    let (slice, byte_offset, _) = validity.as_slice();
                    (0..validity.len())
                        .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
                        .zip(&mut hashes[offset..])
                        .zip(arr.values())
                        .for_each(|((valid, h), l)| {
                            let l = if valid {
                                if l { true_h } else { false_h }
                            } else {
                                null_h
                            };
                            *h = _boost_hash_combine(l, *h)
                        });
                },
            }
            offset += arr.len();
        });
        Ok(())
    }
}
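
// A minimal sketch of the boolean scheme above: every value maps to one of two hashes
// precomputed from the random_state. `BooleanChunked::from_slice` is assumed to be
// available here; the expected values are recomputed with `hash_one` exactly as the
// implementation does. Illustrative test only, not part of the original module.
#[cfg(test)]
mod boolean_hash_sketch {
    use super::*;

    #[test]
    fn booleans_map_to_precomputed_hashes() -> PolarsResult<()> {
        let ca = BooleanChunked::from_slice(PlSmallStr::EMPTY, &[true, false, true]);
        let random_state = PlRandomState::default();

        let mut buf = Vec::new();
        ca.vec_hash(random_state.clone(), &mut buf)?;

        let (true_h, false_h) = (random_state.hash_one(true), random_state.hash_one(false));
        assert_eq!(buf, vec![true_h, false_h, true_h]);
        Ok(())
    }
}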

#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    fn vec_hash(&self, random_state: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
        // Note that we don't take a no-null fast path here; that can break in unexpected
        // ways. For instance, with threading we split an array into n_threads pieces, which
        // may yield splits without nulls and splits with nulls. One piece would then be
        // hashed as Option<T> and the other as T, and the resulting hashes could not be
        // compared. By always hashing as Option<T>, the random_state is the only
        // deterministic seed.
        buf.clear();
        buf.reserve(self.len());

        self.downcast_iter()
            .for_each(|arr| buf.extend(arr.into_iter().map(|opt_v| random_state.hash_one(opt_v))));

        Ok(())
    }

    fn vec_hash_combine(
        &self,
        random_state: PlRandomState,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.apply_to_slice(
            |opt_v, h| {
                let hashed = random_state.hash_one(opt_v);
                _boost_hash_combine(hashed, *h)
            },
            hashes,
        );
        Ok(())
    }
}

/// Hash the rows of each `DataFrame` in parallel, producing one `UInt64Chunked` of
/// row hashes per frame, all built with the same hasher.
pub fn _df_rows_to_hashes_threaded_vertical(
    keys: &[DataFrame],
    hasher_builder: Option<PlRandomState>,
) -> PolarsResult<(Vec<UInt64Chunked>, PlRandomState)> {
    let hasher_builder = hasher_builder.unwrap_or_default();

    let hashes = POOL.install(|| {
        keys.into_par_iter()
            .map(|df| {
                let hb = hasher_builder.clone();
                let mut hashes = vec![];
                columns_to_hashes(df.get_columns(), Some(hb), &mut hashes)?;
                Ok(UInt64Chunked::from_vec(PlSmallStr::EMPTY, hashes))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok((hashes, hasher_builder))
}

/// Hash a set of key columns into row hashes: the first column fills `hashes`,
/// every following column is combined into it in place.
pub fn columns_to_hashes(
    keys: &[Column],
    build_hasher: Option<PlRandomState>,
    hashes: &mut Vec<u64>,
) -> PolarsResult<PlRandomState> {
    let build_hasher = build_hasher.unwrap_or_default();

    let mut iter = keys.iter();
    let first = iter.next().expect("at least one key");
    first.vec_hash(build_hasher.clone(), hashes)?;

    for key in iter {
        key.vec_hash_combine(build_hasher.clone(), hashes)?;
    }

    Ok(build_hasher)
}
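
// A hedged usage sketch for `columns_to_hashes`: hash a two-column key set and check
// that the result is deterministic for a given hasher. `Column::new` and
// `PlSmallStr::from_static` are assumed constructors here; everything else comes from
// this file. Illustrative test only, not part of the original module.
#[cfg(test)]
mod columns_to_hashes_sketch {
    use super::*;

    #[test]
    fn two_key_columns_hash_deterministically() -> PolarsResult<()> {
        let a = Column::new(PlSmallStr::from_static("a"), [1i64, 2, 3]);
        let b = Column::new(PlSmallStr::from_static("b"), [10i64, 20, 30]);
        let keys = [a, b];

        let random_state = PlRandomState::default();
        let (mut h1, mut h2) = (Vec::new(), Vec::new());
        columns_to_hashes(&keys, Some(random_state.clone()), &mut h1)?;
        columns_to_hashes(&keys, Some(random_state), &mut h2)?;

        assert_eq!(h1.len(), 3);
        // Same keys + same hasher -> same per-row hashes.
        assert_eq!(h1, h2);
        Ok(())
    }
}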