// polars_core/frame/group_by/into_groups.rs
use arrow::legacy::kernels::sort_partition::{
    create_clean_partitions, partition_to_groups, partition_to_groups_amortized_varsize,
};
use polars_error::signals::try_raise_keyboard_interrupt;
use polars_utils::total_ord::{ToTotalOrd, TotalHash};

use super::*;
use crate::chunked_array::cast::CastOptions;
use crate::chunked_array::ops::row_encode::_get_rows_encoded_ca_unordered;
use crate::config::verbose;
use crate::series::BitRepr;
use crate::utils::Container;
use crate::utils::flatten::flatten_par;

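/// Create the index structure (`GroupsType`) that maps each distinct key to the
/// rows belonging to that group; the entry point of every group_by operation.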
pub trait IntoGroupsType {
    fn group_tuples(&self, _multithreaded: bool, _sorted: bool) -> PolarsResult<GroupsType> {
        unimplemented!()
    }
}

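// Only parallelize grouping when there is enough data and more than one thread.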
fn group_multithreaded<T: PolarsDataType>(ca: &ChunkedArray<T>) -> bool {
    ca.len() > 1000 && POOL.current_num_threads() > 1
}

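// Group numeric keys by hashing: hash-partitioned and parallel over the value
// slices for large inputs, single-threaded otherwise, with a dedicated fast
// path when there are no nulls.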
fn num_groups_proxy<T>(ca: &ChunkedArray<T>, multithreaded: bool, sorted: bool) -> GroupsType
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash,
{
    if multithreaded && group_multithreaded(ca) {
        let n_partitions = _set_partition_size();

        if ca.null_count() == 0 {
            // No nulls: group directly on the raw value slices.
            let keys = ca
                .downcast_iter()
                .map(|arr| arr.values().as_slice())
                .collect::<Vec<_>>();
            group_by_threaded_slice(keys, n_partitions, sorted)
        } else {
            // Nulls present: group on `Option<T::Native>` iterators instead.
            let keys = ca
                .downcast_iter()
                .map(|arr| arr.iter().map(|o| o.copied()))
                .collect::<Vec<_>>();
            group_by_threaded_iter(&keys, n_partitions, sorted)
        }
    } else if !ca.has_nulls() {
        group_by(ca.into_no_null_iter(), sorted)
    } else {
        group_by(ca.iter(), sorted)
    }
}

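// Fast path for keys that carry a sortedness flag: derive the groups by
// partitioning the (rechunked) values instead of hashing them.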
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
    T::Native: NumCast,
{
    fn create_groups_from_sorted(&self, multithreaded: bool) -> GroupsSlice {
        if verbose() {
            eprintln!("group_by keys are sorted; running sorted key fast path");
        }
        let arr = self.downcast_iter().next().unwrap();
        if arr.is_empty() {
            return GroupsSlice::default();
        }
        let mut values = arr.values().as_slice();
        let null_count = arr.null_count();
        let length = values.len();

        // All nulls: a single group spanning the whole array.
        if null_count == length {
            return vec![[0, length as IdxSize]];
        }

        let mut nulls_first = false;
        if null_count > 0 {
            nulls_first = arr.get(0).is_none()
        }

        // Slice the nulls off; they are accounted for as their own group below.
        if nulls_first {
            values = &values[null_count..];
        } else {
            values = &values[..length - null_count];
        };

        let n_threads = POOL.current_num_threads();
        if multithreaded && n_threads > 1 {
            let parts =
                create_clean_partitions(values, n_threads, self.is_sorted_descending_flag());
            let n_parts = parts.len();

            let first_ptr = &values[0] as *const T::Native as usize;
            let groups = parts.par_iter().enumerate().map(|(i, part)| {
                // Compute this partition's offset into `values` from the
                // pointer distance to the first value.
                let first_ptr = first_ptr as *const T::Native;

                let part_first_ptr = &part[0] as *const T::Native;
                let mut offset = unsafe { part_first_ptr.offset_from(first_ptr) } as IdxSize;

                // The first (respectively last) partition also owns the null group.
                if nulls_first && i == 0 {
                    partition_to_groups(part, null_count as IdxSize, true, offset)
                } else if !nulls_first && i == n_parts - 1 {
                    partition_to_groups(part, null_count as IdxSize, false, offset)
                } else {
                    if nulls_first {
                        offset += null_count as IdxSize;
                    };

                    partition_to_groups(part, 0, false, offset)
                }
            });
            let groups = POOL.install(|| groups.collect::<Vec<_>>());
            flatten_par(&groups)
        } else {
            partition_to_groups(values, null_count as IdxSize, nulls_first, 0)
        }
    }
}

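// Categoricals group on their physical (integer) representation.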
#[cfg(all(feature = "dtype-categorical", feature = "performant"))]
impl<T: PolarsCategoricalType> IntoGroupsType for CategoricalChunked<T>
where
    ChunkedArray<T::PolarsPhysical>: IntoGroupsType,
{
    fn group_tuples(&self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        self.phys.group_tuples(multithreaded, sorted)
    }
}

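// Numeric keys: take the sorted fast path when a sortedness flag is set;
// otherwise group on the value's bit representation so integer types of the
// same width share grouping kernels. Floats are grouped on their own types so
// float-specific total-order equality applies.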
impl<T> IntoGroupsType for ChunkedArray<T>
where
    T: PolarsNumericType,
    T::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,
    <T::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash,
{
    fn group_tuples(&self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        if self.is_sorted_ascending_flag() || self.is_sorted_descending_flag() {
            let groups = self.rechunk().create_groups_from_sorted(multithreaded);
            return Ok(GroupsType::new_slice(groups, false, true));
        }

        let out = match self.dtype() {
            #[cfg(feature = "dtype-f16")]
            DataType::Float16 => {
                // SAFETY: the dtype check guarantees `T` is `Float16Type`.
                let ca: &Float16Chunked = unsafe {
                    &*(self as *const ChunkedArray<T> as *const ChunkedArray<Float16Type>)
                };
                num_groups_proxy(ca, multithreaded, sorted)
            },
            DataType::Float32 => {
                // SAFETY: the dtype check guarantees `T` is `Float32Type`.
                let ca: &Float32Chunked = unsafe {
                    &*(self as *const ChunkedArray<T> as *const ChunkedArray<Float32Type>)
                };
                num_groups_proxy(ca, multithreaded, sorted)
            },
            DataType::Float64 => {
                // SAFETY: the dtype check guarantees `T` is `Float64Type`.
                let ca: &Float64Chunked = unsafe {
                    &*(self as *const ChunkedArray<T> as *const ChunkedArray<Float64Type>)
                };
                num_groups_proxy(ca, multithreaded, sorted)
            },
            _ => match self.to_bit_repr() {
                BitRepr::U8(ca) => num_groups_proxy(&ca, multithreaded, sorted),
                BitRepr::U16(ca) => num_groups_proxy(&ca, multithreaded, sorted),
                BitRepr::U32(ca) => num_groups_proxy(&ca, multithreaded, sorted),
                BitRepr::U64(ca) => num_groups_proxy(&ca, multithreaded, sorted),
                #[cfg(feature = "dtype-u128")]
                BitRepr::U128(ca) => num_groups_proxy(&ca, multithreaded, sorted),
            },
        };
        try_raise_keyboard_interrupt();
        Ok(out)
    }
}

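// Booleans are grouped through an integer cast: u8 with the `performant`
// feature, u32 otherwise.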
impl IntoGroupsType for BooleanChunked {
    fn group_tuples(&self, mut multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        multithreaded &= POOL.current_num_threads() > 1;

        #[cfg(feature = "performant")]
        {
            let ca = self
                .cast_with_options(&DataType::UInt8, CastOptions::Overflowing)
                .unwrap();
            let ca = ca.u8().unwrap();
            ca.group_tuples(multithreaded, sorted)
        }
        #[cfg(not(feature = "performant"))]
        {
            let ca = self
                .cast_with_options(&DataType::UInt32, CastOptions::Overflowing)
                .unwrap();
            let ca = ca.u32().unwrap();
            ca.group_tuples(multithreaded, sorted)
        }
    }
}

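// Strings group as their binary representation.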
impl IntoGroupsType for StringChunked {
    #[allow(clippy::needless_lifetimes)]
    fn group_tuples<'a>(&'a self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        self.as_binary().group_tuples(multithreaded, sorted)
    }
}

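// Binary keys: sorted, null-free, single-chunk data takes a partitioning fast
// path; everything else groups on precomputed byte hashes.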
impl IntoGroupsType for BinaryChunked {
    #[allow(clippy::needless_lifetimes)]
    fn group_tuples<'a>(
        &'a self,
        mut multithreaded: bool,
        sorted: bool,
    ) -> PolarsResult<GroupsType> {
        if self.is_sorted_any() && !self.has_nulls() && self.n_chunks() == 1 {
            let arr = self.downcast_get(0).unwrap();
            let values = arr.values_iter();
            // Rough capacity guess: assume an average group size of ~30.
            let mut out = Vec::with_capacity(values.len() / 30);
            partition_to_groups_amortized_varsize(values, arr.len() as _, 0, false, 0, &mut out);
            return Ok(GroupsType::new_slice(out, false, true));
        }

        multithreaded &= POOL.current_num_threads() > 1;
        let bh = self.to_bytes_hashes(multithreaded, Default::default());

        let out = if multithreaded {
            let n_partitions = bh.len();
            let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
            group_by_threaded_slice(bh, n_partitions, sorted)
        } else {
            group_by(bh[0].iter(), sorted)
        };
        try_raise_keyboard_interrupt();
        Ok(out)
    }
}

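// Like `BinaryChunked`, plus a sorted path that walks multiple chunks and
// emits a new group at every change of key value.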
impl IntoGroupsType for BinaryOffsetChunked {
    #[allow(clippy::needless_lifetimes)]
    fn group_tuples<'a>(
        &'a self,
        mut multithreaded: bool,
        sorted: bool,
    ) -> PolarsResult<GroupsType> {
        if self.is_sorted_any() && !self.has_nulls() && self.n_chunks() == 1 {
            let arr = self.downcast_get(0).unwrap();
            let values = arr.values_iter();
            // Rough capacity guess: assume an average group size of ~30.
            let mut out = Vec::with_capacity(values.len() / 30);
            partition_to_groups_amortized_varsize(values, arr.len() as _, 0, false, 0, &mut out);
            return Ok(GroupsType::new_slice(out, false, true));
        } else if self.is_sorted_any() {
            // Sorted but chunked (or containing nulls): scan linearly and cut
            // a group at every change of key value.
            let mut groups = Vec::new();

            // Index of the first non-empty chunk; all-empty input means no groups.
            let Some(first_nonempty) =
                self.chunks().iter().position(|k| !k.as_ref().is_empty())
            else {
                return Ok(GroupsType::new_slice(groups, false, true));
            };

            let mut start_idx = 0;
            let mut i = 1;
            // The first value seeds the running group, so skip one element of
            // the first non-empty chunk only.
            let mut skip = 1;
            let mut start_value = self.downcast_chunks().get(first_nonempty).unwrap().get(0);

            for keys in self.downcast_iter().skip(first_nonempty) {
                if keys.has_nulls() {
                    for k in keys.iter().skip(skip) {
                        if k != start_value {
                            groups.push([start_idx, i - start_idx]);
                            start_idx = i;
                            start_value = k;
                        }
                        i += 1;
                    }
                } else {
                    for k in keys.values_iter().skip(skip) {
                        if Some(k) != start_value {
                            groups.push([start_idx, i - start_idx]);
                            start_idx = i;
                            start_value = Some(k);
                        }
                        i += 1;
                    }
                }
                skip = 0;
            }

            // Close the final group.
            groups.push([start_idx, i - start_idx]);
            return Ok(GroupsType::new_slice(groups, false, true));
        }

        multithreaded &= POOL.current_num_threads() > 1;
        let bh = self.to_bytes_hashes(multithreaded, Default::default());

        let out = if multithreaded {
            let n_partitions = bh.len();
            let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
            group_by_threaded_slice(bh, n_partitions, sorted)
        } else {
            group_by(bh[0].iter(), sorted)
        };
        Ok(out)
    }
}

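// Nested lists are row-encoded into binary keys first, then grouped as binary.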
impl IntoGroupsType for ListChunked {
    #[allow(clippy::needless_lifetimes)]
    #[allow(unused_variables)]
    fn group_tuples<'a>(
        &'a self,
        mut multithreaded: bool,
        sorted: bool,
    ) -> PolarsResult<GroupsType> {
        multithreaded &= POOL.current_num_threads() > 1;
        let by = &[self.clone().into_column()];
        let ca = if multithreaded {
            encode_rows_vertical_par_unordered(by).unwrap()
        } else {
            _get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, by).unwrap()
        };

        ca.group_tuples(multithreaded, sorted)
    }
}

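// Fixed-size arrays are row-encoded first, just like lists.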
#[cfg(feature = "dtype-array")]
impl IntoGroupsType for ArrayChunked {
    #[allow(clippy::needless_lifetimes)]
    #[allow(unused_variables)]
    fn group_tuples<'a>(
        &'a self,
        mut multithreaded: bool,
        sorted: bool,
    ) -> PolarsResult<GroupsType> {
        multithreaded &= POOL.current_num_threads() > 1;
        let by = &[self.clone().into_column()];
        let ca = if multithreaded {
            encode_rows_vertical_par_unordered(by).unwrap()
        } else {
            _get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, by).unwrap()
        };
        ca.group_tuples(multithreaded, sorted)
    }
}

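// Objects ignore the `multithreaded` flag and are grouped single-threaded.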
#[cfg(feature = "object")]
impl<T> IntoGroupsType for ObjectChunked<T>
where
    T: PolarsObject,
{
    fn group_tuples(&self, _multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        Ok(group_by(self.into_iter(), sorted))
    }
}