1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Index-based groups: for every group, its first row index and the full list
/// of its row indices.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// Whether the groups are known to be ordered by ascending first index
    /// (set by `GroupsIdx::sort`, or passed in through `GroupsIdx::new`).
    pub(crate) sorted: bool,
    /// First row index of every group. Code in this type (e.g. `sort`,
    /// `get_unchecked`) expects it to have the same length as `all`.
    first: Vec<IdxSize>,
    /// Row indices of every group, parallel to `first`.
    all: Vec<IdxVec>,
}
21
/// An owned group: its first row index and all of its row indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: its first row index and a reference to its row indices.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26 fn drop(&mut self) {
27 let v = std::mem::take(&mut self.all);
28 #[cfg(not(target_family = "wasm"))]
31 if v.len() > 1 << 16 {
32 std::thread::spawn(move || drop(v));
33 } else {
34 drop(v);
35 }
36
37 #[cfg(target_family = "wasm")]
38 drop(v);
39 }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43 fn from(v: Vec<IdxItem>) -> Self {
44 v.into_iter().collect()
45 }
46}
47
48impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
49 fn from(v: Vec<Vec<IdxItem>>) -> Self {
50 let (cap, offsets) = flatten::cap_and_offsets(&v);
53 let mut first = Vec::with_capacity(cap);
54 let first_ptr = first.as_ptr() as usize;
55 let mut all = Vec::with_capacity(cap);
56 let all_ptr = all.as_ptr() as usize;
57
58 POOL.install(|| {
59 v.into_par_iter()
60 .zip(offsets)
61 .for_each(|(mut inner, offset)| {
62 unsafe {
63 let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
64 let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);
65
66 let inner_ptr = inner.as_mut_ptr();
67 for i in 0..inner.len() {
68 let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
69 std::ptr::write(first.add(i), first_val);
70 std::ptr::write(all.add(i), vals);
71 }
72 inner.set_len(0);
75 }
76 });
77 });
78 unsafe {
79 all.set_len(cap);
80 first.set_len(cap);
81 }
82 GroupsIdx {
83 sorted: false,
84 first,
85 all,
86 }
87 }
88}
89
90impl GroupsIdx {
91 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92 Self { sorted, first, all }
93 }
94
95 pub fn sort(&mut self) {
96 if self.sorted {
97 return;
98 }
99 let mut idx = 0;
100 let first = std::mem::take(&mut self.first);
101 let mut idx_vals = first
103 .into_iter()
104 .map(|v| {
105 let out = [idx, v];
106 idx += 1;
107 out
108 })
109 .collect_trusted::<Vec<_>>();
110 idx_vals.sort_unstable_by_key(|v| v[1]);
111
112 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113 let take_all = || {
114 idx_vals
115 .iter()
116 .map(|v| unsafe {
117 let idx = v[0] as usize;
118 std::mem::take(self.all.get_unchecked_mut(idx))
119 })
120 .collect_trusted::<Vec<_>>()
121 };
122 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123 self.first = first;
124 self.all = all;
125 self.sorted = true
126 }
127 pub fn is_sorted_flag(&self) -> bool {
128 self.sorted
129 }
130
131 pub fn iter(
132 &self,
133 ) -> std::iter::Zip<
134 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
135 std::slice::Iter<'_, IdxVec>,
136 > {
137 self.into_iter()
138 }
139
140 pub fn all(&self) -> &[IdxVec] {
141 &self.all
142 }
143
144 pub fn first(&self) -> &[IdxSize] {
145 &self.first
146 }
147
148 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
149 &mut self.first
150 }
151
152 pub(crate) fn len(&self) -> usize {
153 self.first.len()
154 }
155
156 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
157 let first = *self.first.get_unchecked(index);
158 let all = self.all.get_unchecked(index);
159 (first, all)
160 }
161
162 pub fn new_empty() -> Self {
164 Self {
165 sorted: false,
166 first: vec![0],
167 all: vec![vec![].into()],
168 }
169 }
170}
171
172impl FromIterator<IdxItem> for GroupsIdx {
173 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
174 let (first, all) = iter.into_iter().unzip();
175 GroupsIdx {
176 sorted: false,
177 first,
178 all,
179 }
180 }
181}
182
183impl<'a> IntoIterator for &'a GroupsIdx {
184 type Item = BorrowIdxItem<'a>;
185 type IntoIter = std::iter::Zip<
186 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
187 std::slice::Iter<'a, IdxVec>,
188 >;
189
190 fn into_iter(self) -> Self::IntoIter {
191 self.first.iter().copied().zip(self.all.iter())
192 }
193}
194
195impl IntoIterator for GroupsIdx {
196 type Item = IdxItem;
197 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
198
199 fn into_iter(mut self) -> Self::IntoIter {
200 let first = std::mem::take(&mut self.first);
201 let all = std::mem::take(&mut self.all);
202 first.into_iter().zip(all)
203 }
204}
205
206impl FromParallelIterator<IdxItem> for GroupsIdx {
207 fn from_par_iter<I>(par_iter: I) -> Self
208 where
209 I: IntoParallelIterator<Item = IdxItem>,
210 {
211 let (first, all) = par_iter.into_par_iter().unzip();
212 GroupsIdx {
213 sorted: false,
214 first,
215 all,
216 }
217 }
218}
219
220impl<'a> IntoParallelIterator for &'a GroupsIdx {
221 type Iter = rayon::iter::Zip<
222 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
223 rayon::slice::Iter<'a, IdxVec>,
224 >;
225 type Item = BorrowIdxItem<'a>;
226
227 fn into_par_iter(self) -> Self::Iter {
228 self.first.par_iter().copied().zip(self.all.par_iter())
229 }
230}
231
232impl IntoParallelIterator for GroupsIdx {
233 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
234 type Item = IdxItem;
235
236 fn into_par_iter(mut self) -> Self::Iter {
237 let first = std::mem::take(&mut self.first);
238 let all = std::mem::take(&mut self.all);
239 first.into_par_iter().zip(all.into_par_iter())
240 }
241}
242
/// Slice-based groups as `[first, len]` pairs; a group covers rows
/// `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
251
/// Group positions, either as explicit per-group index lists or as
/// contiguous row slices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Each group stores its first row index and the full list of its row indices.
    Idx(GroupsIdx),
    /// Each group is a contiguous run of rows.
    Slice {
        /// One `[first, len]` pair per group.
        groups: GroupsSlice,
        /// Whether the slices may overlap each other. `GroupPositions::unroll`
        /// rebuilds overlapping slices as consecutive ones starting at 0.
        overlapping: bool,
    },
}
266
267impl Default for GroupsType {
268 fn default() -> Self {
269 GroupsType::Idx(GroupsIdx::default())
270 }
271}
272
273impl GroupsType {
274 pub fn into_idx(self) -> GroupsIdx {
275 match self {
276 GroupsType::Idx(groups) => groups,
277 GroupsType::Slice { groups, .. } => {
278 polars_warn!(
279 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
280 );
281 groups
282 .iter()
283 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
284 .collect()
285 },
286 }
287 }
288
289 pub(crate) fn prepare_list_agg(
290 &self,
291 total_len: usize,
292 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
293 let mut can_fast_explode = true;
294 match self {
295 GroupsType::Idx(groups) => {
296 let mut list_offset = Vec::with_capacity(self.len() + 1);
297 let mut gather_offsets = Vec::with_capacity(total_len);
298
299 let mut len_so_far = 0i64;
300 list_offset.push(len_so_far);
301
302 for idx in groups {
303 let idx = idx.1;
304 gather_offsets.extend_from_slice(idx);
305 len_so_far += idx.len() as i64;
306 list_offset.push(len_so_far);
307 can_fast_explode &= !idx.is_empty();
308 }
309 unsafe {
310 (
311 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
312 OffsetsBuffer::new_unchecked(list_offset.into()),
313 can_fast_explode,
314 )
315 }
316 },
317 GroupsType::Slice { groups, .. } => {
318 let mut list_offset = Vec::with_capacity(self.len() + 1);
319 let mut gather_offsets = Vec::with_capacity(total_len);
320 let mut len_so_far = 0i64;
321 list_offset.push(len_so_far);
322
323 for g in groups {
324 let len = g[1];
325 let offset = g[0];
326 gather_offsets.extend(offset..offset + len);
327
328 len_so_far += len as i64;
329 list_offset.push(len_so_far);
330 can_fast_explode &= len > 0;
331 }
332
333 unsafe {
334 (
335 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
336 OffsetsBuffer::new_unchecked(list_offset.into()),
337 can_fast_explode,
338 )
339 }
340 },
341 }
342 }
343
344 pub fn iter(&self) -> GroupsTypeIter<'_> {
345 GroupsTypeIter::new(self)
346 }
347
348 pub fn sort(&mut self) {
349 match self {
350 GroupsType::Idx(groups) => {
351 if !groups.is_sorted_flag() {
352 groups.sort()
353 }
354 },
355 GroupsType::Slice { .. } => {
356 },
358 }
359 }
360
361 pub(crate) fn is_sorted_flag(&self) -> bool {
362 match self {
363 GroupsType::Idx(groups) => groups.is_sorted_flag(),
364 GroupsType::Slice { .. } => true,
365 }
366 }
367
368 pub fn is_overlapping(&self) -> bool {
369 matches!(
370 self,
371 GroupsType::Slice {
372 overlapping: true,
373 ..
374 }
375 )
376 }
377
378 pub fn take_group_firsts(self) -> Vec<IdxSize> {
379 match self {
380 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
381 GroupsType::Slice { groups, .. } => {
382 groups.into_iter().map(|[first, _len]| first).collect()
383 },
384 }
385 }
386
387 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
391 match self {
392 GroupsType::Idx(groups) => groups
393 .all
394 .iter()
395 .map(|idx| *idx.get_unchecked(idx.len() - 1))
396 .collect(),
397 GroupsType::Slice { groups, .. } => groups
398 .into_iter()
399 .map(|[first, len]| first + len - 1)
400 .collect(),
401 }
402 }
403
404 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
405 GroupsTypeParIter::new(self)
406 }
407
408 pub fn unwrap_idx(&self) -> &GroupsIdx {
414 match self {
415 GroupsType::Idx(groups) => groups,
416 GroupsType::Slice { .. } => panic!("groups are slices not index"),
417 }
418 }
419
420 pub fn unwrap_slice(&self) -> &GroupsSlice {
426 match self {
427 GroupsType::Slice { groups, .. } => groups,
428 GroupsType::Idx(_) => panic!("groups are index not slices"),
429 }
430 }
431
432 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
433 match self {
434 GroupsType::Idx(groups) => {
435 let first = groups.first[index];
436 let all = &groups.all[index];
437 GroupsIndicator::Idx((first, all))
438 },
439 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
440 }
441 }
442
443 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
449 match self {
450 GroupsType::Idx(groups) => groups,
451 GroupsType::Slice { .. } => panic!("groups are slices not index"),
452 }
453 }
454
455 pub fn len(&self) -> usize {
456 match self {
457 GroupsType::Idx(groups) => groups.len(),
458 GroupsType::Slice { groups, .. } => groups.len(),
459 }
460 }
461
462 pub fn is_empty(&self) -> bool {
463 self.len() == 0
464 }
465
466 pub fn group_count(&self) -> IdxCa {
467 match self {
468 GroupsType::Idx(groups) => {
469 let ca: NoNull<IdxCa> = groups
470 .iter()
471 .map(|(_first, idx)| idx.len() as IdxSize)
472 .collect_trusted();
473 ca.into_inner()
474 },
475 GroupsType::Slice { groups, .. } => {
476 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
477 ca.into_inner()
478 },
479 }
480 }
481 pub fn as_list_chunked(&self) -> ListChunked {
482 match self {
483 GroupsType::Idx(groups) => groups
484 .iter()
485 .map(|(_first, idx)| {
486 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
487 ca.into_inner().into_series()
488 })
489 .collect_trusted(),
490 GroupsType::Slice { groups, .. } => groups
491 .iter()
492 .map(|&[first, len]| {
493 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
494 ca.into_inner().into_series()
495 })
496 .collect_trusted(),
497 }
498 }
499
500 pub fn into_sliceable(self) -> GroupPositions {
501 let len = self.len();
502 slice_groups(Arc::new(self), 0, len)
503 }
504
505 pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
506 if std::ptr::eq(self, other) {
507 return Ok(());
508 }
509 polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
510 a.len() == b.len()
511 }), ComputeError: "expressions must have matching group lengths");
512 Ok(())
513 }
514}
515
516impl From<GroupsIdx> for GroupsType {
517 fn from(groups: GroupsIdx) -> Self {
518 GroupsType::Idx(groups)
519 }
520}
521
/// A borrowed view on a single group, as returned by `GroupsType::get` and the
/// group iterators.
pub enum GroupsIndicator<'a> {
    /// `(first, &indices)` of an index-based group.
    Idx(BorrowIdxItem<'a>),
    /// `[first, len]` of a slice-based group.
    Slice([IdxSize; 2]),
}
526
527impl GroupsIndicator<'_> {
528 pub fn len(&self) -> usize {
529 match self {
530 GroupsIndicator::Idx(g) => g.1.len(),
531 GroupsIndicator::Slice([_, len]) => *len as usize,
532 }
533 }
534 pub fn first(&self) -> IdxSize {
535 match self {
536 GroupsIndicator::Idx(g) => g.0,
537 GroupsIndicator::Slice([first, _]) => *first,
538 }
539 }
540 pub fn is_empty(&self) -> bool {
541 self.len() == 0
542 }
543}
544
/// Sequential iterator over the groups of a [`GroupsType`].
pub struct GroupsTypeIter<'a> {
    vals: &'a GroupsType,
    // Number of groups in `vals`, captured at construction.
    len: usize,
    // Position of the next group to yield; may exceed `len` after `nth`.
    idx: usize,
}
550
551impl<'a> GroupsTypeIter<'a> {
552 fn new(vals: &'a GroupsType) -> Self {
553 let len = vals.len();
554 let idx = 0;
555 GroupsTypeIter { vals, len, idx }
556 }
557}
558
559impl<'a> Iterator for GroupsTypeIter<'a> {
560 type Item = GroupsIndicator<'a>;
561
562 fn nth(&mut self, n: usize) -> Option<Self::Item> {
563 self.idx = self.idx.saturating_add(n);
564 self.next()
565 }
566
567 fn next(&mut self) -> Option<Self::Item> {
568 if self.idx >= self.len {
569 return None;
570 }
571
572 let out = unsafe {
573 match self.vals {
574 GroupsType::Idx(groups) => {
575 let item = groups.get_unchecked(self.idx);
576 Some(GroupsIndicator::Idx(item))
577 },
578 GroupsType::Slice { groups, .. } => {
579 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
580 },
581 }
582 };
583 self.idx += 1;
584 out
585 }
586}
587
/// Parallel iterator over the groups of a [`GroupsType`].
pub struct GroupsTypeParIter<'a> {
    vals: &'a GroupsType,
    // Number of groups in `vals`, captured at construction.
    len: usize,
}
592
593impl<'a> GroupsTypeParIter<'a> {
594 fn new(vals: &'a GroupsType) -> Self {
595 let len = vals.len();
596 GroupsTypeParIter { vals, len }
597 }
598}
599
600impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
601 type Item = GroupsIndicator<'a>;
602
603 fn drive_unindexed<C>(self, consumer: C) -> C::Result
604 where
605 C: UnindexedConsumer<Self::Item>,
606 {
607 (0..self.len)
608 .into_par_iter()
609 .map(|i| unsafe {
610 match self.vals {
611 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
612 GroupsType::Slice { groups, .. } => {
613 GroupsIndicator::Slice(*groups.get_unchecked(i))
614 },
615 }
616 })
617 .drive_unindexed(consumer)
618 }
619}
620
/// A (possibly sliced) view over a shared [`GroupsType`].
#[derive(Debug)]
pub struct GroupPositions {
    /// The groups visible through this view. Its vectors are built by
    /// `slice_groups_inner` over memory owned by `original`, so they must never
    /// be freed; `ManuallyDrop` prevents that.
    sliced: ManuallyDrop<GroupsType>,
    /// The groups `sliced` was cut from; keeps the aliased memory alive.
    original: Arc<GroupsType>,
    /// Offset that was passed to `slice_groups_inner` to build `sliced`.
    offset: i64,
    /// Length that was passed to `slice_groups_inner` to build `sliced`.
    len: usize,
}
632
633impl Clone for GroupPositions {
634 fn clone(&self) -> Self {
635 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
636
637 Self {
638 sliced,
639 original: self.original.clone(),
640 offset: self.offset,
641 len: self.len,
642 }
643 }
644}
645
646impl PartialEq for GroupPositions {
647 fn eq(&self, other: &Self) -> bool {
648 self.offset == other.offset && self.len == other.len && self.sliced == other.sliced
649 }
650}
651
652impl AsRef<GroupsType> for GroupPositions {
653 fn as_ref(&self) -> &GroupsType {
654 self.sliced.deref()
655 }
656}
657
658impl Deref for GroupPositions {
659 type Target = GroupsType;
660
661 fn deref(&self) -> &Self::Target {
662 self.sliced.deref()
663 }
664}
665
666impl Default for GroupPositions {
667 fn default() -> Self {
668 GroupsType::default().into_sliceable()
669 }
670}
671
672impl GroupPositions {
673 pub fn slice(&self, offset: i64, len: usize) -> Self {
674 let offset = self.offset + offset;
675 slice_groups(
676 self.original.clone(),
677 offset,
678 if len > self.len { self.len } else { len },
680 )
681 }
682
683 pub fn sort(&mut self) {
684 if !self.as_ref().is_sorted_flag() {
685 let original = Arc::make_mut(&mut self.original);
686 original.sort();
687
688 self.sliced = slice_groups_inner(original, self.offset, self.len);
689 }
690 }
691
692 pub fn unroll(mut self) -> GroupPositions {
693 match self.sliced.deref_mut() {
694 GroupsType::Idx(_) => self,
695 GroupsType::Slice {
696 overlapping: false, ..
697 } => self,
698 GroupsType::Slice { groups, .. } => {
699 let mut cum_offset = 0 as IdxSize;
702 let groups: Vec<_> = groups
703 .iter()
704 .map(|[_, len]| {
705 let new = [cum_offset, *len];
706 cum_offset += *len;
707 new
708 })
709 .collect();
710
711 GroupsType::Slice {
712 groups,
713 overlapping: false,
714 }
715 .into_sliceable()
716 },
717 }
718 }
719
720 pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
721 match &*self.sliced {
722 GroupsType::Idx(_) => None,
723 GroupsType::Slice {
724 overlapping: true, ..
725 } => None,
726 GroupsType::Slice {
727 groups,
728 overlapping: false,
729 } => Some(groups),
730 }
731 }
732}
733
/// Builds a view of the groups in `g` selected by `offset`/`len` (resolved by
/// `slice_slice`) without copying: the returned vectors point directly into
/// `g`'s buffers.
///
/// The result is wrapped in `ManuallyDrop` because those vectors do not own
/// their memory; dropping them would free memory that still belongs to `g`.
/// The caller must keep `g` alive (and unmodified) for as long as the view is used.
///
/// NOTE(review): `Vec::from_raw_parts` formally requires a pointer to the start
/// of an allocation with exactly that capacity; a sub-slice pointer does not
/// satisfy this. Soundness here relies on the `Vec`s never being dropped, grown
/// or reallocated — TODO confirm nothing mutates the view in place.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    match g {
        GroupsType::Idx(groups) => {
            // Non-owning `Vec` over the selected first indices.
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            // Non-owning `Vec` over the selected index lists.
            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            // The sorted flag is inherited from the source groups.
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice {
            groups,
            overlapping,
        } => {
            // Non-owning `Vec` over the selected `[first, len]` pairs.
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            ManuallyDrop::new(GroupsType::Slice {
                groups,
                overlapping: *overlapping,
            })
        },
    }
}
776
777fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
778 let sliced = slice_groups_inner(g.as_ref(), offset, len);
779
780 GroupPositions {
781 sliced,
782 original: g,
783 offset,
784 len,
785 }
786}