1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Index-based group positions.
///
/// Group `i` is described by `first[i]` together with `all[i]`; the two
/// vectors are expected to have the same length (the constructors do not
/// check this). Keeping the leading value in its own vector lets
/// [`GroupsIdx::sort`] order the groups by comparing plain integers.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// `true` once the groups are ordered by their `first` value.
    pub(crate) sorted: bool,
    /// One value per group — presumably the first row index of that group.
    first: Vec<IdxSize>,
    /// One index vector per group — presumably every row index in the group.
    all: Vec<IdxVec>,
}
23
/// An owned group: its `first` value paired with its index vector.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: its `first` value (copied) paired with a reference to
/// its index vector.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
26
27impl Drop for GroupsIdx {
28 fn drop(&mut self) {
29 let v = std::mem::take(&mut self.all);
30 #[cfg(not(target_family = "wasm"))]
33 if v.len() > 1 << 16 {
34 std::thread::spawn(move || drop(v));
35 } else {
36 drop(v);
37 }
38
39 #[cfg(target_family = "wasm")]
40 drop(v);
41 }
42}
43
44impl From<Vec<IdxItem>> for GroupsIdx {
45 fn from(v: Vec<IdxItem>) -> Self {
46 v.into_iter().collect()
47 }
48}
49
impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
    /// Flattens nested `(first, all)` items into one `GroupsIdx`, moving the
    /// items of each inner vector in parallel on `POOL`.
    ///
    /// The result is flagged as unsorted.
    fn from(v: Vec<Vec<IdxItem>>) -> Self {
        // presumably `cap` is the total item count and `offsets` holds the start
        // position of every inner vector in the flattened output — confirm
        // against `flatten::cap_and_offsets`.
        let (cap, offsets) = flatten::cap_and_offsets(&v);
        let mut first = Vec::with_capacity(cap);
        // The buffer addresses are carried as plain integers so the parallel
        // closure below can capture them.
        let first_ptr = first.as_ptr() as usize;
        let mut all = Vec::with_capacity(cap);
        let all_ptr = all.as_ptr() as usize;

        POOL.install(|| {
            v.into_par_iter()
                .zip(offsets)
                .for_each(|(mut inner, offset)| {
                    // SAFETY (relies on `cap_and_offsets`): every task writes to
                    // `offset..offset + inner.len()`, which must be disjoint
                    // between tasks and lie within the `cap` reserved slots.
                    unsafe {
                        let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
                        let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);

                        let inner_ptr = inner.as_mut_ptr();
                        for i in 0..inner.len() {
                            // Bitwise-move the item out of `inner` into the two outputs.
                            let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
                            std::ptr::write(first.add(i), first_val);
                            std::ptr::write(all.add(i), vals);
                        }
                        // The items now live in `first`/`all`; a length of zero keeps
                        // `inner` from dropping them a second time.
                        inner.set_len(0);
                    }
                });
        });
        // SAFETY: assumes the tasks above initialised all `cap` slots of both buffers.
        unsafe {
            all.set_len(cap);
            first.set_len(cap);
        }
        GroupsIdx {
            sorted: false,
            first,
            all,
        }
    }
}
91
92impl GroupsIdx {
93 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
94 Self { sorted, first, all }
95 }
96
97 pub fn sort(&mut self) {
98 if self.sorted {
99 return;
100 }
101 let mut idx = 0;
102 let first = std::mem::take(&mut self.first);
103 let mut idx_vals = first
105 .into_iter()
106 .map(|v| {
107 let out = [idx, v];
108 idx += 1;
109 out
110 })
111 .collect_trusted::<Vec<_>>();
112 idx_vals.sort_unstable_by_key(|v| v[1]);
113
114 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
115 let take_all = || {
116 idx_vals
117 .iter()
118 .map(|v| unsafe {
119 let idx = v[0] as usize;
120 std::mem::take(self.all.get_unchecked_mut(idx))
121 })
122 .collect_trusted::<Vec<_>>()
123 };
124 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
125 self.first = first;
126 self.all = all;
127 self.sorted = true
128 }
129 pub fn is_sorted_flag(&self) -> bool {
130 self.sorted
131 }
132
133 pub fn iter(
134 &self,
135 ) -> std::iter::Zip<
136 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
137 std::slice::Iter<'_, IdxVec>,
138 > {
139 self.into_iter()
140 }
141
142 pub fn all(&self) -> &[IdxVec] {
143 &self.all
144 }
145
146 pub fn first(&self) -> &[IdxSize] {
147 &self.first
148 }
149
150 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
151 &mut self.first
152 }
153
154 pub(crate) fn len(&self) -> usize {
155 self.first.len()
156 }
157
158 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
159 let first = *self.first.get_unchecked(index);
160 let all = self.all.get_unchecked(index);
161 (first, all)
162 }
163
164 pub fn new_empty() -> Self {
166 Self {
167 sorted: false,
168 first: vec![0],
169 all: vec![vec![].into()],
170 }
171 }
172}
173
174impl FromIterator<IdxItem> for GroupsIdx {
175 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
176 let (first, all) = iter.into_iter().unzip();
177 GroupsIdx {
178 sorted: false,
179 first,
180 all,
181 }
182 }
183}
184
185impl<'a> IntoIterator for &'a GroupsIdx {
186 type Item = BorrowIdxItem<'a>;
187 type IntoIter = std::iter::Zip<
188 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
189 std::slice::Iter<'a, IdxVec>,
190 >;
191
192 fn into_iter(self) -> Self::IntoIter {
193 self.first.iter().copied().zip(self.all.iter())
194 }
195}
196
197impl IntoIterator for GroupsIdx {
198 type Item = IdxItem;
199 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
200
201 fn into_iter(mut self) -> Self::IntoIter {
202 let first = std::mem::take(&mut self.first);
203 let all = std::mem::take(&mut self.all);
204 first.into_iter().zip(all)
205 }
206}
207
208impl FromParallelIterator<IdxItem> for GroupsIdx {
209 fn from_par_iter<I>(par_iter: I) -> Self
210 where
211 I: IntoParallelIterator<Item = IdxItem>,
212 {
213 let (first, all) = par_iter.into_par_iter().unzip();
214 GroupsIdx {
215 sorted: false,
216 first,
217 all,
218 }
219 }
220}
221
222impl<'a> IntoParallelIterator for &'a GroupsIdx {
223 type Iter = rayon::iter::Zip<
224 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
225 rayon::slice::Iter<'a, IdxVec>,
226 >;
227 type Item = BorrowIdxItem<'a>;
228
229 fn into_par_iter(self) -> Self::Iter {
230 self.first.par_iter().copied().zip(self.all.par_iter())
231 }
232}
233
234impl IntoParallelIterator for GroupsIdx {
235 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
236 type Item = IdxItem;
237
238 fn into_par_iter(mut self) -> Self::Iter {
239 let first = std::mem::take(&mut self.first);
240 let all = std::mem::take(&mut self.all);
241 first.into_par_iter().zip(all.into_par_iter())
242 }
243}
244
/// Slice-based groups: every entry is `[first, len]`, i.e. the group covers
/// the rows `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
253
/// Group positions, stored either as explicit row indices or as contiguous
/// `[first, len]` ranges.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Every group lists its row indices explicitly.
    Idx(GroupsIdx),
    /// Every group is a contiguous `[first, len]` range.
    Slice {
        /// The `[first, len]` ranges, one per group.
        groups: GroupsSlice,
        /// Whether ranges may share rows. `GroupsType::new_slice` checks this
        /// flag against the data in debug builds only.
        overlapping: bool,
        /// Whether both the start and the end of the ranges never decrease
        /// from one group to the next. `GroupsType::new_slice` checks this
        /// flag against the data in debug builds only.
        monotonic: bool,
    },
}
270
271impl Default for GroupsType {
272 fn default() -> Self {
273 GroupsType::Idx(GroupsIdx::default())
274 }
275}
276
277impl GroupsType {
278 pub fn new_slice(groups: GroupsSlice, overlapping: bool, monotonic: bool) -> Self {
279 #[cfg(debug_assertions)]
280 {
281 fn groups_overlap(groups: &GroupsSlice) -> bool {
282 if groups.len() < 2 {
283 return false;
284 }
285 let mut groups = groups.clone();
286 groups.sort();
287 let mut prev_end = groups[0][1];
288
289 for g in &groups[1..] {
290 let start = g[0];
291 let end = g[1];
292 if start < prev_end {
293 return true;
294 }
295 if end > prev_end {
296 prev_end = end;
297 }
298 }
299 false
300 }
301
302 assert!(overlapping || !groups_overlap(&groups));
303
304 fn groups_are_monotonic(groups: &GroupsSlice) -> bool {
305 if groups.len() < 2 {
306 return true;
307 }
308
309 let (offset, len) = (groups[0][0], groups[0][1]);
310 let mut prev_start = offset;
311 let mut prev_end = offset + len;
312
313 for g in &groups[1..] {
314 let start = g[0];
315 let end = g[0] + g[1];
316
317 if start < prev_start || end < prev_end {
318 return false;
319 }
320
321 prev_start = start;
322 prev_end = end;
323 }
324 true
325 }
326
327 assert!(!monotonic || groups_are_monotonic(&groups));
328 }
329
330 Self::Slice {
331 groups,
332 overlapping,
333 monotonic,
334 }
335 }
336
337 pub fn into_idx(self) -> GroupsIdx {
338 match self {
339 GroupsType::Idx(groups) => groups,
340 GroupsType::Slice { groups, .. } => {
341 polars_warn!(
342 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
343 );
344 groups
345 .iter()
346 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
347 .collect()
348 },
349 }
350 }
351
352 pub(crate) fn prepare_list_agg(
353 &self,
354 total_len: usize,
355 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
356 let mut can_fast_explode = true;
357 match self {
358 GroupsType::Idx(groups) => {
359 let mut list_offset = Vec::with_capacity(self.len() + 1);
360 let mut gather_offsets = Vec::with_capacity(total_len);
361
362 let mut len_so_far = 0i64;
363 list_offset.push(len_so_far);
364
365 for idx in groups {
366 let idx = idx.1;
367 gather_offsets.extend_from_slice(idx);
368 len_so_far += idx.len() as i64;
369 list_offset.push(len_so_far);
370 can_fast_explode &= !idx.is_empty();
371 }
372 unsafe {
373 (
374 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
375 OffsetsBuffer::new_unchecked(list_offset.into()),
376 can_fast_explode,
377 )
378 }
379 },
380 GroupsType::Slice { groups, .. } => {
381 let mut list_offset = Vec::with_capacity(self.len() + 1);
382 let mut gather_offsets = Vec::with_capacity(total_len);
383 let mut len_so_far = 0i64;
384 list_offset.push(len_so_far);
385
386 for g in groups {
387 let len = g[1];
388 let offset = g[0];
389 gather_offsets.extend(offset..offset + len);
390
391 len_so_far += len as i64;
392 list_offset.push(len_so_far);
393 can_fast_explode &= len > 0;
394 }
395
396 unsafe {
397 (
398 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
399 OffsetsBuffer::new_unchecked(list_offset.into()),
400 can_fast_explode,
401 )
402 }
403 },
404 }
405 }
406
407 pub fn iter(&self) -> GroupsTypeIter<'_> {
408 GroupsTypeIter::new(self)
409 }
410
411 pub fn sort(&mut self) {
412 match self {
413 GroupsType::Idx(groups) => {
414 if !groups.is_sorted_flag() {
415 groups.sort()
416 }
417 },
418 GroupsType::Slice { .. } => {
419 },
421 }
422 }
423
424 pub(crate) fn is_sorted_flag(&self) -> bool {
425 match self {
426 GroupsType::Idx(groups) => groups.is_sorted_flag(),
427 GroupsType::Slice { .. } => true,
428 }
429 }
430
431 pub fn is_overlapping(&self) -> bool {
432 matches!(
433 self,
434 GroupsType::Slice {
435 overlapping: true,
436 ..
437 }
438 )
439 }
440
441 pub fn is_monotonic(&self) -> bool {
442 matches!(
443 self,
444 GroupsType::Slice {
445 monotonic: true,
446 ..
447 }
448 )
449 }
450
451 pub fn take_group_firsts(self) -> Vec<IdxSize> {
452 match self {
453 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
454 GroupsType::Slice { groups, .. } => {
455 groups.into_iter().map(|[first, _len]| first).collect()
456 },
457 }
458 }
459
460 pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
463 if std::ptr::eq(self, other) {
464 return Ok(());
465 }
466 polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
467 a.len() == b.len()
468 }), ShapeMismatch: "expressions must have matching group lengths");
469 Ok(())
470 }
471
472 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
476 match self {
477 GroupsType::Idx(groups) => groups
478 .all
479 .iter()
480 .map(|idx| *idx.get_unchecked(idx.len() - 1))
481 .collect(),
482 GroupsType::Slice { groups, .. } => groups
483 .into_iter()
484 .map(|[first, len]| first + len - 1)
485 .collect(),
486 }
487 }
488
489 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
490 GroupsTypeParIter::new(self)
491 }
492
493 pub fn unwrap_idx(&self) -> &GroupsIdx {
499 match self {
500 GroupsType::Idx(groups) => groups,
501 GroupsType::Slice { .. } => panic!("groups are slices not index"),
502 }
503 }
504
505 pub fn unwrap_slice(&self) -> &GroupsSlice {
511 match self {
512 GroupsType::Slice { groups, .. } => groups,
513 GroupsType::Idx(_) => panic!("groups are index not slices"),
514 }
515 }
516
517 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
518 match self {
519 GroupsType::Idx(groups) => {
520 let first = groups.first[index];
521 let all = &groups.all[index];
522 GroupsIndicator::Idx((first, all))
523 },
524 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
525 }
526 }
527
528 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
534 match self {
535 GroupsType::Idx(groups) => groups,
536 GroupsType::Slice { .. } => panic!("groups are slices not index"),
537 }
538 }
539
540 pub fn len(&self) -> usize {
541 match self {
542 GroupsType::Idx(groups) => groups.len(),
543 GroupsType::Slice { groups, .. } => groups.len(),
544 }
545 }
546
547 pub fn is_empty(&self) -> bool {
548 self.len() == 0
549 }
550
551 pub fn group_count(&self) -> IdxCa {
552 match self {
553 GroupsType::Idx(groups) => {
554 let ca: NoNull<IdxCa> = groups
555 .iter()
556 .map(|(_first, idx)| idx.len() as IdxSize)
557 .collect_trusted();
558 ca.into_inner()
559 },
560 GroupsType::Slice { groups, .. } => {
561 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
562 ca.into_inner()
563 },
564 }
565 }
566 pub fn as_list_chunked(&self) -> ListChunked {
567 match self {
568 GroupsType::Idx(groups) => groups
569 .iter()
570 .map(|(_first, idx)| {
571 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
572 ca.into_inner().into_series()
573 })
574 .collect_trusted(),
575 GroupsType::Slice { groups, .. } => groups
576 .iter()
577 .map(|&[first, len]| {
578 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
579 ca.into_inner().into_series()
580 })
581 .collect_trusted(),
582 }
583 }
584
585 pub fn into_sliceable(self) -> GroupPositions {
586 let len = self.len();
587 slice_groups(Arc::new(self), 0, len)
588 }
589
590 pub fn num_elements(&self) -> usize {
591 match self {
592 GroupsType::Idx(i) => i.all().iter().map(|v| v.len()).sum(),
593 GroupsType::Slice {
594 groups,
595 overlapping: _,
596 monotonic: _,
597 } => groups.iter().map(|[_, l]| *l as usize).sum(),
598 }
599 }
600}
601
602impl From<GroupsIdx> for GroupsType {
603 fn from(groups: GroupsIdx) -> Self {
604 GroupsType::Idx(groups)
605 }
606}
607
/// A single group, as yielded when iterating over a `GroupsType`.
pub enum GroupsIndicator<'a> {
    /// A group from index-based groups: its `first` value and a reference to
    /// its index vector.
    Idx(BorrowIdxItem<'a>),
    /// A group from slice-based groups: a `[first, len]` range.
    Slice([IdxSize; 2]),
}
612
613impl GroupsIndicator<'_> {
614 pub fn len(&self) -> usize {
615 match self {
616 GroupsIndicator::Idx(g) => g.1.len(),
617 GroupsIndicator::Slice([_, len]) => *len as usize,
618 }
619 }
620 pub fn first(&self) -> IdxSize {
621 match self {
622 GroupsIndicator::Idx(g) => g.0,
623 GroupsIndicator::Slice([first, _]) => *first,
624 }
625 }
626 pub fn is_empty(&self) -> bool {
627 self.len() == 0
628 }
629}
630
/// Sequential iterator over the groups of a `GroupsType`.
pub struct GroupsTypeIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, captured when the iterator was created.
    len: usize,
    /// Position of the next group to yield.
    idx: usize,
}
636
637impl<'a> GroupsTypeIter<'a> {
638 fn new(vals: &'a GroupsType) -> Self {
639 let len = vals.len();
640 let idx = 0;
641 GroupsTypeIter { vals, len, idx }
642 }
643}
644
645impl<'a> Iterator for GroupsTypeIter<'a> {
646 type Item = GroupsIndicator<'a>;
647
648 fn nth(&mut self, n: usize) -> Option<Self::Item> {
649 self.idx = self.idx.saturating_add(n);
650 self.next()
651 }
652
653 fn next(&mut self) -> Option<Self::Item> {
654 if self.idx >= self.len {
655 return None;
656 }
657
658 let out = unsafe {
659 match self.vals {
660 GroupsType::Idx(groups) => {
661 let item = groups.get_unchecked(self.idx);
662 Some(GroupsIndicator::Idx(item))
663 },
664 GroupsType::Slice { groups, .. } => {
665 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
666 },
667 }
668 };
669 self.idx += 1;
670 out
671 }
672}
673
/// Parallel iterator over the groups of a `GroupsType`.
pub struct GroupsTypeParIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, captured when the iterator was created.
    len: usize,
}
678
679impl<'a> GroupsTypeParIter<'a> {
680 fn new(vals: &'a GroupsType) -> Self {
681 let len = vals.len();
682 GroupsTypeParIter { vals, len }
683 }
684}
685
686impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
687 type Item = GroupsIndicator<'a>;
688
689 fn drive_unindexed<C>(self, consumer: C) -> C::Result
690 where
691 C: UnindexedConsumer<Self::Item>,
692 {
693 (0..self.len)
694 .into_par_iter()
695 .map(|i| unsafe {
696 match self.vals {
697 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
698 GroupsType::Slice { groups, .. } => {
699 GroupsIndicator::Slice(*groups.get_unchecked(i))
700 },
701 }
702 })
703 .drive_unindexed(consumer)
704 }
705}
706
/// A window (`offset`, `len`) over shared, reference-counted groups.
#[derive(Debug)]
pub struct GroupPositions {
    /// View of the window, produced by `slice_groups_inner`. Its vectors are
    /// built with `Vec::from_raw_parts` over the buffers of `original`, so it
    /// is wrapped in `ManuallyDrop` and never dropped.
    sliced: ManuallyDrop<GroupsType>,
    /// The complete, unsliced groups that own the buffers.
    original: Arc<GroupsType>,
    /// Offset of the window, as passed to `slice_slice` on `original`.
    offset: i64,
    /// Requested length of the window.
    len: usize,
}
718
719impl Clone for GroupPositions {
720 fn clone(&self) -> Self {
721 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
722
723 Self {
724 sliced,
725 original: self.original.clone(),
726 offset: self.offset,
727 len: self.len,
728 }
729 }
730}
731
732impl AsRef<GroupsType> for GroupPositions {
733 fn as_ref(&self) -> &GroupsType {
734 self.sliced.deref()
735 }
736}
737
738impl Deref for GroupPositions {
739 type Target = GroupsType;
740
741 fn deref(&self) -> &Self::Target {
742 self.sliced.deref()
743 }
744}
745
746impl Default for GroupPositions {
747 fn default() -> Self {
748 GroupsType::default().into_sliceable()
749 }
750}
751
752impl GroupPositions {
753 pub fn slice(&self, offset: i64, len: usize) -> Self {
754 let offset = self.offset + offset;
755 slice_groups(self.original.clone(), offset, len)
756 }
757
758 pub fn sort(&mut self) {
759 if !self.as_ref().is_sorted_flag() {
760 let original = Arc::make_mut(&mut self.original);
761 original.sort();
762
763 self.sliced = slice_groups_inner(original, self.offset, self.len);
764 }
765 }
766
767 pub fn unroll(mut self) -> GroupPositions {
768 match self.sliced.deref_mut() {
769 GroupsType::Idx(_) => self,
770 GroupsType::Slice {
771 overlapping: false, ..
772 } => self,
773 GroupsType::Slice { groups, .. } => {
774 let mut cum_offset = 0 as IdxSize;
777 let groups: Vec<_> = groups
778 .iter()
779 .map(|[_, len]| {
780 let new = [cum_offset, *len];
781 cum_offset += *len;
782 new
783 })
784 .collect();
785
786 GroupsType::new_slice(groups, false, true).into_sliceable()
787 },
788 }
789 }
790
791 pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
792 match &*self.sliced {
793 GroupsType::Idx(_) => None,
794 GroupsType::Slice {
795 groups: _,
796 overlapping: true,
797 monotonic: _,
798 } => None,
799 GroupsType::Slice {
800 groups,
801 overlapping: false,
802 monotonic: _,
803 } => Some(groups),
804 }
805 }
806
807 pub fn is_same(&self, other: &Self) -> bool {
809 Arc::ptr_eq(&self.original, &other.original)
810 && self.offset == other.offset
811 && self.len == other.len
812 }
813}
814
/// Builds a view of `g` restricted to the window given by `offset` and `len`
/// (resolved by `slice_slice`) without copying any group data.
///
/// The returned `GroupsType` contains `Vec`s created with
/// `Vec::from_raw_parts` directly over the buffers owned by `g`. It is
/// wrapped in `ManuallyDrop` so that those buffers are never freed through
/// the view. The caller must keep `g` alive, and its buffers in place, for
/// as long as the view is used.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    match g {
        GroupsType::Idx(groups) => {
            // Alias the selected part of `first`; length and capacity are both
            // set to the window length.
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            // Alias the selected part of `all` in the same way.
            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            // Alias the selected `[first, len]` ranges.
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            // The flags are carried over from the unsliced groups.
            ManuallyDrop::new(GroupsType::new_slice(groups, *overlapping, *monotonic))
        },
    }
}
855
856fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
857 let sliced = slice_groups_inner(g.as_ref(), offset, len);
858
859 GroupPositions {
860 sliced,
861 original: g,
862 offset,
863 len,
864 }
865}