1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::prelude::*;
10use crate::runtime::RAYON;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Index-based groups: for every group its first index plus the list of all its indices.
///
/// The first indices are kept in their own vector, parallel to the per-group index
/// vectors (`first[i]` belongs to `all[i]`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// `true` once the groups have been ordered by ascending first index
    /// (set by `sort_by_first_idx`, or supplied by the caller of `new`).
    pub(crate) sorted_by_first_idx: bool,
    /// First index of every group.
    first: Vec<IdxSize>,
    /// All indices of every group.
    all: Vec<IdxVec>,
}
23
/// An owned group: its first index together with all of its indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: its first index together with a reference to all of its indices.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
26
27impl Drop for GroupsIdx {
28 fn drop(&mut self) {
29 let v = std::mem::take(&mut self.all);
30 #[cfg(not(target_family = "wasm"))]
33 if v.len() > 1 << 16 {
34 std::thread::spawn(move || drop(v));
35 } else {
36 drop(v);
37 }
38
39 #[cfg(target_family = "wasm")]
40 drop(v);
41 }
42}
43
44impl From<Vec<IdxItem>> for GroupsIdx {
45 fn from(v: Vec<IdxItem>) -> Self {
46 v.into_iter().collect()
47 }
48}
49
impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
    /// Flattens a vec of vecs of groups into one `GroupsIdx`, moving the items in parallel.
    ///
    /// The result is flagged as not sorted by first index.
    fn from(v: Vec<Vec<IdxItem>>) -> Self {
        // NOTE(review): `cap_and_offsets` presumably yields the total number of items and,
        // per inner vec, its start position in the flattened output — confirm in `flatten`.
        let (cap, offsets) = flatten::cap_and_offsets(&v);
        // Both outputs are allocated once with the full capacity and never pushed to, so
        // the pointers taken below stay valid while the workers write through them.
        let mut first = Vec::with_capacity(cap);
        // Pointers are carried as `usize` so the closure can be shared between threads.
        let first_ptr = first.as_ptr() as usize;
        let mut all = Vec::with_capacity(cap);
        let all_ptr = all.as_ptr() as usize;

        RAYON.install(|| {
            v.into_par_iter()
                .zip(offsets)
                .for_each(|(mut inner, offset)| {
                    // SAFETY (relies on `offsets`/`cap` describing disjoint, in-capacity
                    // ranges — see the note above): every inner vec writes only to
                    // `offset..offset + inner.len()` of the preallocated buffers.
                    unsafe {
                        let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
                        let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);

                        let inner_ptr = inner.as_mut_ptr();
                        for i in 0..inner.len() {
                            // Bitwise-move the item out of `inner` into the outputs.
                            let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
                            std::ptr::write(first.add(i), first_val);
                            std::ptr::write(all.add(i), vals);
                        }
                        // The items now live in `first`/`all`; truncating the length keeps
                        // `inner` from dropping them a second time when it is freed.
                        inner.set_len(0);
                    }
                });
        });
        // SAFETY: assumes the workers initialised all `cap` slots (see the note on
        // `cap_and_offsets`).
        unsafe {
            all.set_len(cap);
            first.set_len(cap);
        }
        GroupsIdx {
            sorted_by_first_idx: false,
            first,
            all,
        }
    }
}
91
92impl GroupsIdx {
93 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted_by_first_idx: bool) -> Self {
94 Self {
95 sorted_by_first_idx,
96 first,
97 all,
98 }
99 }
100
101 pub fn sort_by_first_idx(&mut self) {
102 if self.sorted_by_first_idx {
103 return;
104 }
105 let mut idx = 0;
106 let first = std::mem::take(&mut self.first);
107 let mut idx_vals = first
109 .into_iter()
110 .map(|v| {
111 let out = [idx, v];
112 idx += 1;
113 out
114 })
115 .collect_trusted::<Vec<_>>();
116 idx_vals.sort_unstable_by_key(|v| v[1]);
117
118 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
119 let take_all = || {
120 idx_vals
121 .iter()
122 .map(|v| unsafe {
123 let idx = v[0] as usize;
124 std::mem::take(self.all.get_unchecked_mut(idx))
125 })
126 .collect_trusted::<Vec<_>>()
127 };
128 let (first, all) = RAYON.install(|| rayon::join(take_first, take_all));
129 self.first = first;
130 self.all = all;
131 self.sorted_by_first_idx = true
132 }
133 pub fn is_sorted_by_first_idx(&self) -> bool {
134 self.sorted_by_first_idx
135 }
136
137 pub fn iter(
138 &self,
139 ) -> std::iter::Zip<
140 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
141 std::slice::Iter<'_, IdxVec>,
142 > {
143 self.into_iter()
144 }
145
146 pub fn all(&self) -> &[IdxVec] {
147 &self.all
148 }
149
150 pub fn first(&self) -> &[IdxSize] {
151 &self.first
152 }
153
154 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
155 &mut self.first
156 }
157
158 pub(crate) fn len(&self) -> usize {
159 self.first.len()
160 }
161
162 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
163 let first = *self.first.get_unchecked(index);
164 let all = self.all.get_unchecked(index);
165 (first, all)
166 }
167
168 pub fn new_empty() -> Self {
170 Self {
171 sorted_by_first_idx: false,
172 first: vec![0],
173 all: vec![vec![].into()],
174 }
175 }
176}
177
178impl FromIterator<IdxItem> for GroupsIdx {
179 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
180 let (first, all) = iter.into_iter().unzip();
181 GroupsIdx {
182 sorted_by_first_idx: false,
183 first,
184 all,
185 }
186 }
187}
188
189impl<'a> IntoIterator for &'a GroupsIdx {
190 type Item = BorrowIdxItem<'a>;
191 type IntoIter = std::iter::Zip<
192 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
193 std::slice::Iter<'a, IdxVec>,
194 >;
195
196 fn into_iter(self) -> Self::IntoIter {
197 self.first.iter().copied().zip(self.all.iter())
198 }
199}
200
201impl IntoIterator for GroupsIdx {
202 type Item = IdxItem;
203 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
204
205 fn into_iter(mut self) -> Self::IntoIter {
206 let first = std::mem::take(&mut self.first);
207 let all = std::mem::take(&mut self.all);
208 first.into_iter().zip(all)
209 }
210}
211
212impl FromParallelIterator<IdxItem> for GroupsIdx {
213 fn from_par_iter<I>(par_iter: I) -> Self
214 where
215 I: IntoParallelIterator<Item = IdxItem>,
216 {
217 let (first, all) = par_iter.into_par_iter().unzip();
218 GroupsIdx {
219 sorted_by_first_idx: false,
220 first,
221 all,
222 }
223 }
224}
225
226impl<'a> IntoParallelIterator for &'a GroupsIdx {
227 type Iter = rayon::iter::Zip<
228 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
229 rayon::slice::Iter<'a, IdxVec>,
230 >;
231 type Item = BorrowIdxItem<'a>;
232
233 fn into_par_iter(self) -> Self::Iter {
234 self.first.par_iter().copied().zip(self.all.par_iter())
235 }
236}
237
238impl IntoParallelIterator for GroupsIdx {
239 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
240 type Item = IdxItem;
241
242 fn into_par_iter(mut self) -> Self::Iter {
243 let first = std::mem::take(&mut self.first);
244 let all = std::mem::take(&mut self.all);
245 first.into_par_iter().zip(all.into_par_iter())
246 }
247}
248
/// Slice-based groups: every entry is `[first, len]`, i.e. the first index of the group
/// and its number of elements.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
258
/// Positions of the groups, either as explicit index lists or as `[first, len]` ranges.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Every group lists its indices explicitly.
    Idx(GroupsIdx),
    /// Every group is a contiguous range of indices.
    Slice {
        /// The `[first, len]` ranges.
        groups: GroupsSlice,
        /// When `false`, debug builds of `new_slice` assert that no two ranges overlap.
        overlapping: bool,
        /// When `true`, debug builds of `new_slice` assert that neither the start nor the
        /// end of the ranges decreases from one group to the next.
        monotonic: bool,
    },
}
274
275impl Default for GroupsType {
276 fn default() -> Self {
277 GroupsType::Idx(GroupsIdx::default())
278 }
279}
280
281impl GroupsType {
282 pub fn new_slice(groups: GroupsSlice, overlapping: bool, monotonic: bool) -> Self {
283 #[cfg(debug_assertions)]
284 {
285 fn groups_overlap(groups: &GroupsSlice) -> bool {
286 if groups.len() < 2 {
287 return false;
288 }
289 let mut groups = groups.clone();
290 groups.sort();
291 let mut prev_end = groups[0][1];
292
293 for g in &groups[1..] {
294 let start = g[0];
295 let end = g[1];
296 if start < prev_end {
297 return true;
298 }
299 if end > prev_end {
300 prev_end = end;
301 }
302 }
303 false
304 }
305
306 assert!(overlapping || !groups_overlap(&groups));
307
308 fn groups_are_monotonic(groups: &GroupsSlice) -> bool {
309 if groups.len() < 2 {
310 return true;
311 }
312
313 let (offset, len) = (groups[0][0], groups[0][1]);
314 let mut prev_start = offset;
315 let mut prev_end = offset + len;
316
317 for g in &groups[1..] {
318 let start = g[0];
319 let end = g[0] + g[1];
320
321 if start < prev_start || end < prev_end {
322 return false;
323 }
324
325 prev_start = start;
326 prev_end = end;
327 }
328 true
329 }
330
331 assert!(!monotonic || groups_are_monotonic(&groups));
332 }
333
334 Self::Slice {
335 groups,
336 overlapping,
337 monotonic,
338 }
339 }
340
341 pub fn into_idx(self) -> GroupsIdx {
342 match self {
343 GroupsType::Idx(groups) => groups,
344 GroupsType::Slice { groups, .. } => {
345 polars_warn!(
346 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
347 );
348 groups
349 .iter()
350 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
351 .collect()
352 },
353 }
354 }
355
356 pub(crate) fn prepare_list_agg(
357 &self,
358 total_len: usize,
359 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
360 let mut can_fast_explode = true;
361 match self {
362 GroupsType::Idx(groups) => {
363 let mut list_offset = Vec::with_capacity(self.len() + 1);
364 let mut gather_offsets = Vec::with_capacity(total_len);
365
366 let mut len_so_far = 0i64;
367 list_offset.push(len_so_far);
368
369 for idx in groups {
370 let idx = idx.1;
371 gather_offsets.extend_from_slice(idx);
372 len_so_far += idx.len() as i64;
373 list_offset.push(len_so_far);
374 can_fast_explode &= !idx.is_empty();
375 }
376 unsafe {
377 (
378 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
379 OffsetsBuffer::new_unchecked(list_offset.into()),
380 can_fast_explode,
381 )
382 }
383 },
384 GroupsType::Slice { groups, .. } => {
385 let mut list_offset = Vec::with_capacity(self.len() + 1);
386 let mut gather_offsets = Vec::with_capacity(total_len);
387 let mut len_so_far = 0i64;
388 list_offset.push(len_so_far);
389
390 for g in groups {
391 let len = g[1];
392 let offset = g[0];
393 gather_offsets.extend(offset..offset + len);
394
395 len_so_far += len as i64;
396 list_offset.push(len_so_far);
397 can_fast_explode &= len > 0;
398 }
399
400 unsafe {
401 (
402 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
403 OffsetsBuffer::new_unchecked(list_offset.into()),
404 can_fast_explode,
405 )
406 }
407 },
408 }
409 }
410
411 pub fn iter(&self) -> GroupsTypeIter<'_> {
412 GroupsTypeIter::new(self)
413 }
414
415 pub fn sort_by_first_idx(&mut self) {
416 match self {
417 GroupsType::Idx(groups) => {
418 if !groups.is_sorted_by_first_idx() {
419 groups.sort_by_first_idx()
420 }
421 },
422 GroupsType::Slice { .. } => {
423 },
425 }
426 }
427
428 pub(crate) fn is_sorted_by_first_idx(&self) -> bool {
429 match self {
430 GroupsType::Idx(groups) => groups.is_sorted_by_first_idx(),
431 GroupsType::Slice { .. } => true,
432 }
433 }
434
435 pub fn is_overlapping(&self) -> bool {
436 matches!(
437 self,
438 GroupsType::Slice {
439 overlapping: true,
440 ..
441 }
442 )
443 }
444
445 pub fn is_monotonic(&self) -> bool {
446 matches!(
447 self,
448 GroupsType::Slice {
449 monotonic: true,
450 ..
451 }
452 )
453 }
454
455 pub fn take_group_firsts(self) -> Vec<IdxSize> {
456 match self {
457 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
458 GroupsType::Slice { groups, .. } => {
459 groups.into_iter().map(|[first, _len]| first).collect()
460 },
461 }
462 }
463
464 pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
467 if std::ptr::eq(self, other) {
468 return Ok(());
469 }
470 polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
471 a.len() == b.len()
472 }), ShapeMismatch: "expressions must have matching group lengths");
473 Ok(())
474 }
475
476 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
480 match self {
481 GroupsType::Idx(groups) => groups
482 .all
483 .iter()
484 .map(|idx| *idx.get_unchecked(idx.len() - 1))
485 .collect(),
486 GroupsType::Slice { groups, .. } => groups
487 .into_iter()
488 .map(|[first, len]| first + len - 1)
489 .collect(),
490 }
491 }
492
493 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
494 GroupsTypeParIter::new(self)
495 }
496
497 pub fn unwrap_idx(&self) -> &GroupsIdx {
503 match self {
504 GroupsType::Idx(groups) => groups,
505 GroupsType::Slice { .. } => panic!("groups are slices not index"),
506 }
507 }
508
509 pub fn unwrap_slice(&self) -> &GroupsSlice {
515 match self {
516 GroupsType::Slice { groups, .. } => groups,
517 GroupsType::Idx(_) => panic!("groups are index not slices"),
518 }
519 }
520
521 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
522 match self {
523 GroupsType::Idx(groups) => {
524 let first = groups.first[index];
525 let all = &groups.all[index];
526 GroupsIndicator::Idx((first, all))
527 },
528 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
529 }
530 }
531
532 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
538 match self {
539 GroupsType::Idx(groups) => groups,
540 GroupsType::Slice { .. } => panic!("groups are slices not index"),
541 }
542 }
543
544 pub fn len(&self) -> usize {
545 match self {
546 GroupsType::Idx(groups) => groups.len(),
547 GroupsType::Slice { groups, .. } => groups.len(),
548 }
549 }
550
551 pub fn is_empty(&self) -> bool {
552 self.len() == 0
553 }
554
555 pub fn group_count(&self) -> IdxCa {
556 match self {
557 GroupsType::Idx(groups) => {
558 let ca: NoNull<IdxCa> = groups
559 .iter()
560 .map(|(_first, idx)| idx.len() as IdxSize)
561 .collect_trusted();
562 ca.into_inner()
563 },
564 GroupsType::Slice { groups, .. } => {
565 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
566 ca.into_inner()
567 },
568 }
569 }
570 pub fn as_list_chunked(&self) -> ListChunked {
571 match self {
572 GroupsType::Idx(groups) => groups
573 .iter()
574 .map(|(_first, idx)| {
575 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
576 ca.into_inner().into_series()
577 })
578 .collect_trusted(),
579 GroupsType::Slice { groups, .. } => groups
580 .iter()
581 .map(|&[first, len]| {
582 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
583 ca.into_inner().into_series()
584 })
585 .collect_trusted(),
586 }
587 }
588
589 pub fn into_sliceable(self) -> GroupPositions {
590 let len = self.len();
591 slice_groups(Arc::new(self), 0, len)
592 }
593
594 pub fn num_elements(&self) -> usize {
595 match self {
596 GroupsType::Idx(i) => i.all().iter().map(|v| v.len()).sum(),
597 GroupsType::Slice {
598 groups,
599 overlapping: _,
600 monotonic: _,
601 } => groups.iter().map(|[_, l]| *l as usize).sum(),
602 }
603 }
604}
605
606impl From<GroupsIdx> for GroupsType {
607 fn from(groups: GroupsIdx) -> Self {
608 GroupsType::Idx(groups)
609 }
610}
611
/// A single group as yielded by the group iterators.
pub enum GroupsIndicator<'a> {
    /// A group from index-based groups: its first index and a reference to all indices.
    Idx(BorrowIdxItem<'a>),
    /// A group from slice-based groups: `[first, len]`.
    Slice([IdxSize; 2]),
}
616
617impl GroupsIndicator<'_> {
618 pub fn len(&self) -> usize {
619 match self {
620 GroupsIndicator::Idx(g) => g.1.len(),
621 GroupsIndicator::Slice([_, len]) => *len as usize,
622 }
623 }
624 pub fn first(&self) -> IdxSize {
625 match self {
626 GroupsIndicator::Idx(g) => g.0,
627 GroupsIndicator::Slice([first, _]) => *first,
628 }
629 }
630 pub fn is_empty(&self) -> bool {
631 self.len() == 0
632 }
633}
634
/// Sequential iterator over the groups of a `GroupsType`.
pub struct GroupsTypeIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, captured when the iterator was created.
    len: usize,
    /// Position of the next group to yield.
    idx: usize,
}
640
641impl<'a> GroupsTypeIter<'a> {
642 fn new(vals: &'a GroupsType) -> Self {
643 let len = vals.len();
644 let idx = 0;
645 GroupsTypeIter { vals, len, idx }
646 }
647}
648
649impl<'a> Iterator for GroupsTypeIter<'a> {
650 type Item = GroupsIndicator<'a>;
651
652 fn nth(&mut self, n: usize) -> Option<Self::Item> {
653 self.idx = self.idx.saturating_add(n);
654 self.next()
655 }
656
657 fn next(&mut self) -> Option<Self::Item> {
658 if self.idx >= self.len {
659 return None;
660 }
661
662 let out = unsafe {
663 match self.vals {
664 GroupsType::Idx(groups) => {
665 let item = groups.get_unchecked(self.idx);
666 Some(GroupsIndicator::Idx(item))
667 },
668 GroupsType::Slice { groups, .. } => {
669 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
670 },
671 }
672 };
673 self.idx += 1;
674 out
675 }
676}
677
/// Parallel iterator over the groups of a `GroupsType`.
pub struct GroupsTypeParIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, captured when the iterator was created.
    len: usize,
}
682
683impl<'a> GroupsTypeParIter<'a> {
684 fn new(vals: &'a GroupsType) -> Self {
685 let len = vals.len();
686 GroupsTypeParIter { vals, len }
687 }
688}
689
690impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
691 type Item = GroupsIndicator<'a>;
692
693 fn drive_unindexed<C>(self, consumer: C) -> C::Result
694 where
695 C: UnindexedConsumer<Self::Item>,
696 {
697 (0..self.len)
698 .into_par_iter()
699 .map(|i| unsafe {
700 match self.vals {
701 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
702 GroupsType::Slice { groups, .. } => {
703 GroupsIndicator::Slice(*groups.get_unchecked(i))
704 },
705 }
706 })
707 .drive_unindexed(consumer)
708 }
709}
710
/// A window over shared groups that can be sliced again without copying the groups.
#[derive(Debug)]
pub struct GroupPositions {
    /// View produced by `slice_groups_inner`. It aliases memory owned by `original`,
    /// which is why it is wrapped in `ManuallyDrop` and never dropped as an owner.
    sliced: ManuallyDrop<GroupsType>,
    /// The complete groups that own the memory `sliced` points into.
    original: Arc<GroupsType>,
    /// Offset into `original` that was used to build `sliced`.
    offset: i64,
    /// Length that was requested when building `sliced`.
    len: usize,
}
722
723impl Clone for GroupPositions {
724 fn clone(&self) -> Self {
725 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
726
727 Self {
728 sliced,
729 original: self.original.clone(),
730 offset: self.offset,
731 len: self.len,
732 }
733 }
734}
735
736impl AsRef<GroupsType> for GroupPositions {
737 fn as_ref(&self) -> &GroupsType {
738 self.sliced.deref()
739 }
740}
741
742impl Deref for GroupPositions {
743 type Target = GroupsType;
744
745 fn deref(&self) -> &Self::Target {
746 self.sliced.deref()
747 }
748}
749
750impl Default for GroupPositions {
751 fn default() -> Self {
752 GroupsType::default().into_sliceable()
753 }
754}
755
756impl GroupPositions {
757 pub fn slice(&self, offset: i64, len: usize) -> Self {
758 let offset = self.offset + offset;
759 slice_groups(self.original.clone(), offset, len)
760 }
761
762 pub fn sort_by_first_idx(&mut self) {
763 if !self.as_ref().is_sorted_by_first_idx() {
764 let original = Arc::make_mut(&mut self.original);
765 original.sort_by_first_idx();
766
767 self.sliced = slice_groups_inner(original, self.offset, self.len);
768 }
769 }
770
771 pub fn unroll(mut self) -> GroupPositions {
772 match self.sliced.deref_mut() {
773 GroupsType::Idx(_) => self,
774 GroupsType::Slice {
775 overlapping: false, ..
776 } => self,
777 GroupsType::Slice { groups, .. } => {
778 let mut cum_offset = 0 as IdxSize;
781 let groups: Vec<_> = groups
782 .iter()
783 .map(|[_, len]| {
784 let new = [cum_offset, *len];
785 cum_offset += *len;
786 new
787 })
788 .collect();
789
790 GroupsType::new_slice(groups, false, true).into_sliceable()
791 },
792 }
793 }
794
795 pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
796 match &*self.sliced {
797 GroupsType::Idx(_) => None,
798 GroupsType::Slice {
799 groups: _,
800 overlapping: true,
801 monotonic: _,
802 } => None,
803 GroupsType::Slice {
804 groups,
805 overlapping: false,
806 monotonic: _,
807 } => Some(groups),
808 }
809 }
810
811 pub fn is_same(&self, other: &Self) -> bool {
813 Arc::ptr_eq(&self.original, &other.original)
814 && self.offset == other.offset
815 && self.len == other.len
816 }
817}
818
/// Builds a `GroupsType` view over a sub-range of `g` that aliases `g`'s buffers.
///
/// The vectors in the result are created with `Vec::from_raw_parts` on memory that is
/// still owned by `g`, with capacity equal to length. The result is wrapped in
/// `ManuallyDrop` so it never frees that memory; it must not be grown, dropped as an
/// owner, or used after `g`'s buffers are freed or replaced.
///
/// NOTE(review): `slice_slice` presumably resolves `offset`/`len` to an in-bounds
/// sub-slice — confirm in `crate::utils`.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    match g {
        GroupsType::Idx(groups) => {
            // SAFETY: the pointer and length come from a live sub-slice of `groups.first()`;
            // the aliasing `Vec` is kept inside `ManuallyDrop` and never deallocates.
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            // SAFETY: same reasoning as above, for the per-group index vectors.
            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            // The sorted flag of the source groups carries over to the view.
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_by_first_idx(),
            )))
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            // SAFETY: same reasoning as above, for the `[first, len]` entries.
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            // Both flags are inherited from the source groups.
            ManuallyDrop::new(GroupsType::new_slice(groups, *overlapping, *monotonic))
        },
    }
}
859
860fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
861 let sliced = slice_groups_inner(g.as_ref(), offset, len);
862
863 GroupPositions {
864 sliced,
865 original: g,
866 offset,
867 len,
868 }
869}