polars_core/chunked_array/
binary.rs1use std::hash::BuildHasher;
2
3use polars_utils::aliases::PlRandomState;
4use polars_utils::hashing::BytesHash;
5use rayon::prelude::*;
6
7use crate::POOL;
8use crate::prelude::*;
9use crate::utils::{_set_partition_size, _split_offsets};
10
11#[inline]
12fn fill_bytes_hashes<'a, T>(
13 ca: &'a ChunkedArray<T>,
14 null_h: u64,
15 hb: PlRandomState,
16) -> Vec<BytesHash<'a>>
17where
18 T: PolarsDataType,
19 <<T as PolarsDataType>::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
20{
21 let mut byte_hashes = Vec::with_capacity(ca.len());
22 for arr in ca.downcast_iter() {
23 for opt_b in arr.iter() {
24 let opt_b = opt_b.as_ref().map(|v| v.as_ref());
25 let opt_b = unsafe { std::mem::transmute::<Option<&[u8]>, Option<&'a [u8]>>(opt_b) };
28 let hash = match opt_b {
29 Some(s) => hb.hash_one(s),
30 None => null_h,
31 };
32 byte_hashes.push(BytesHash::new(opt_b, hash))
33 }
34 }
35 byte_hashes
36}
37
38impl<T> ChunkedArray<T>
39where
40 T: PolarsDataType,
41 for<'a> <T::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
42{
43 #[allow(clippy::needless_lifetimes)]
44 pub fn to_bytes_hashes<'a>(
45 &'a self,
46 mut multithreaded: bool,
47 hb: PlRandomState,
48 ) -> Vec<Vec<BytesHash<'a>>> {
49 multithreaded &= POOL.current_num_threads() > 1;
50 let null_h = hb.hash_one(0xde259df92c607d49_u64);
51
52 if multithreaded {
53 let n_partitions = _set_partition_size();
54
55 let split = _split_offsets(self.len(), n_partitions);
56
57 POOL.install(|| {
58 split
59 .into_par_iter()
60 .map(|(offset, len)| {
61 let ca = self.slice(offset as i64, len);
62 let byte_hashes = fill_bytes_hashes(&ca, null_h, hb);
63
64 unsafe {
67 std::mem::transmute::<Vec<BytesHash<'_>>, Vec<BytesHash<'a>>>(
68 byte_hashes,
69 )
70 }
71 })
72 .collect::<Vec<_>>()
73 })
74 } else {
75 vec![fill_bytes_hashes(self, null_h, hb)]
76 }
77 }
78}