1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Group-by groups stored as explicit row indices.
///
/// `first[i]` and `all[i]` together describe group `i`: `first[i]` is the index recorded
/// as the group's first row and `all[i]` holds every row index of that group. Both
/// vectors are expected to have the same length (one entry per group).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// Whether the groups are ordered by their `first` index (set by `GroupsIdx::sort`).
    pub(crate) sorted: bool,
    /// The first row index of every group.
    first: Vec<IdxSize>,
    /// All row indices of every group.
    all: Vec<IdxVec>,
}
21
/// An owned group: its first row index together with all of its row indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: its first row index together with a reference to all of its row indices.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26 fn drop(&mut self) {
27 let v = std::mem::take(&mut self.all);
28 #[cfg(not(target_family = "wasm"))]
31 if v.len() > 1 << 16 {
32 std::thread::spawn(move || drop(v));
33 } else {
34 drop(v);
35 }
36
37 #[cfg(target_family = "wasm")]
38 drop(v);
39 }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43 fn from(v: Vec<IdxItem>) -> Self {
44 v.into_iter().collect()
45 }
46}
47
impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
    /// Flattens a list of group lists into a single, unsorted `GroupsIdx`.
    ///
    /// The outer order is preserved: the items of `v[0]` come first, then those of `v[1]`,
    /// and so on. Each inner list is moved into place by its own task on the global `POOL`.
    fn from(v: Vec<Vec<IdxItem>>) -> Self {
        // `cap` is the total number of items and `offsets[i]` the position at which `v[i]`
        // starts in the flattened output (presumably the running sum of the inner lengths;
        // see `flatten::cap_and_offsets`).
        let (cap, offsets) = flatten::cap_and_offsets(&v);
        let mut first = Vec::with_capacity(cap);
        // Raw pointers are not `Send`; smuggle the buffer addresses into the parallel
        // closure as plain integers instead.
        let first_ptr = first.as_ptr() as usize;
        let mut all = Vec::with_capacity(cap);
        let all_ptr = all.as_ptr() as usize;

        POOL.install(|| {
            v.into_par_iter()
                .zip(offsets)
                .for_each(|(mut inner, offset)| {
                    // SAFETY: each task writes only to `offset..offset + inner.len()` of the
                    // two output buffers; these ranges are disjoint across tasks and lie
                    // within the `cap` elements reserved above.
                    unsafe {
                        let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
                        let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);

                        let inner_ptr = inner.as_mut_ptr();
                        for i in 0..inner.len() {
                            // Move each item out bitwise and split it over the two buffers.
                            let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
                            std::ptr::write(first.add(i), first_val);
                            std::ptr::write(all.add(i), vals);
                        }
                        // The items have been moved out; zero the length so dropping `inner`
                        // only frees its buffer and does not drop the items a second time.
                        inner.set_len(0);
                    }
                });
        });
        // SAFETY: every slot in `0..cap` of both buffers was initialized by the tasks above.
        unsafe {
            all.set_len(cap);
            first.set_len(cap);
        }
        GroupsIdx {
            sorted: false,
            first,
            all,
        }
    }
}
89
90impl GroupsIdx {
91 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92 Self { sorted, first, all }
93 }
94
95 pub fn sort(&mut self) {
96 if self.sorted {
97 return;
98 }
99 let mut idx = 0;
100 let first = std::mem::take(&mut self.first);
101 let mut idx_vals = first
103 .into_iter()
104 .map(|v| {
105 let out = [idx, v];
106 idx += 1;
107 out
108 })
109 .collect_trusted::<Vec<_>>();
110 idx_vals.sort_unstable_by_key(|v| v[1]);
111
112 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113 let take_all = || {
114 idx_vals
115 .iter()
116 .map(|v| unsafe {
117 let idx = v[0] as usize;
118 std::mem::take(self.all.get_unchecked_mut(idx))
119 })
120 .collect_trusted::<Vec<_>>()
121 };
122 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123 self.first = first;
124 self.all = all;
125 self.sorted = true
126 }
127 pub fn is_sorted_flag(&self) -> bool {
128 self.sorted
129 }
130
131 pub fn iter(
132 &self,
133 ) -> std::iter::Zip<
134 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
135 std::slice::Iter<'_, IdxVec>,
136 > {
137 self.into_iter()
138 }
139
140 pub fn all(&self) -> &[IdxVec] {
141 &self.all
142 }
143
144 pub fn first(&self) -> &[IdxSize] {
145 &self.first
146 }
147
148 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
149 &mut self.first
150 }
151
152 pub(crate) fn len(&self) -> usize {
153 self.first.len()
154 }
155
156 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
157 let first = *self.first.get_unchecked(index);
158 let all = self.all.get_unchecked(index);
159 (first, all)
160 }
161
162 pub fn new_empty() -> Self {
164 Self {
165 sorted: false,
166 first: vec![0],
167 all: vec![vec![].into()],
168 }
169 }
170}
171
172impl FromIterator<IdxItem> for GroupsIdx {
173 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
174 let (first, all) = iter.into_iter().unzip();
175 GroupsIdx {
176 sorted: false,
177 first,
178 all,
179 }
180 }
181}
182
183impl<'a> IntoIterator for &'a GroupsIdx {
184 type Item = BorrowIdxItem<'a>;
185 type IntoIter = std::iter::Zip<
186 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
187 std::slice::Iter<'a, IdxVec>,
188 >;
189
190 fn into_iter(self) -> Self::IntoIter {
191 self.first.iter().copied().zip(self.all.iter())
192 }
193}
194
195impl IntoIterator for GroupsIdx {
196 type Item = IdxItem;
197 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
198
199 fn into_iter(mut self) -> Self::IntoIter {
200 let first = std::mem::take(&mut self.first);
201 let all = std::mem::take(&mut self.all);
202 first.into_iter().zip(all)
203 }
204}
205
206impl FromParallelIterator<IdxItem> for GroupsIdx {
207 fn from_par_iter<I>(par_iter: I) -> Self
208 where
209 I: IntoParallelIterator<Item = IdxItem>,
210 {
211 let (first, all) = par_iter.into_par_iter().unzip();
212 GroupsIdx {
213 sorted: false,
214 first,
215 all,
216 }
217 }
218}
219
220impl<'a> IntoParallelIterator for &'a GroupsIdx {
221 type Iter = rayon::iter::Zip<
222 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
223 rayon::slice::Iter<'a, IdxVec>,
224 >;
225 type Item = BorrowIdxItem<'a>;
226
227 fn into_par_iter(self) -> Self::Iter {
228 self.first.par_iter().copied().zip(self.all.par_iter())
229 }
230}
231
232impl IntoParallelIterator for GroupsIdx {
233 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
234 type Item = IdxItem;
235
236 fn into_par_iter(mut self) -> Self::Iter {
237 let first = std::mem::take(&mut self.first);
238 let all = std::mem::take(&mut self.all);
239 first.into_par_iter().zip(all.into_par_iter())
240 }
241}
242
/// Groups stored as `[first, len]` pairs: group `i` covers rows `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
251
/// The groups of a group-by, in one of two representations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Every group is an explicit list of row indices.
    Idx(GroupsIdx),
    /// Every group is a contiguous range of rows.
    Slice {
        /// The `[first, len]` pair of every group.
        groups: GroupsSlice,
        /// Whether groups may share rows (checked in debug builds by `new_slice`).
        overlapping: bool,
        /// Whether both group starts and group ends never decrease (checked in debug
        /// builds by `new_slice`).
        monotonic: bool,
    },
}
268
269impl Default for GroupsType {
270 fn default() -> Self {
271 GroupsType::Idx(GroupsIdx::default())
272 }
273}
274
275impl GroupsType {
276 pub fn new_slice(groups: GroupsSlice, overlapping: bool, monotonic: bool) -> Self {
277 #[cfg(debug_assertions)]
278 {
279 fn groups_overlap(groups: &GroupsSlice) -> bool {
280 if groups.len() < 2 {
281 return false;
282 }
283 let mut groups = groups.clone();
284 groups.sort();
285 let mut prev_end = groups[0][1];
286
287 for g in &groups[1..] {
288 let start = g[0];
289 let end = g[1];
290 if start < prev_end {
291 return true;
292 }
293 if end > prev_end {
294 prev_end = end;
295 }
296 }
297 false
298 }
299
300 assert!(overlapping || !groups_overlap(&groups));
301
302 fn groups_are_monotonic(groups: &GroupsSlice) -> bool {
303 if groups.len() < 2 {
304 return true;
305 }
306
307 let (offset, len) = (groups[0][0], groups[0][1]);
308 let mut prev_start = offset;
309 let mut prev_end = offset + len;
310
311 for g in &groups[1..] {
312 let start = g[0];
313 let end = g[0] + g[1];
314
315 if start < prev_start || end < prev_end {
316 return false;
317 }
318
319 prev_start = start;
320 prev_end = end;
321 }
322 true
323 }
324
325 assert!(!monotonic || groups_are_monotonic(&groups));
326 }
327
328 Self::Slice {
329 groups,
330 overlapping,
331 monotonic,
332 }
333 }
334
335 pub fn into_idx(self) -> GroupsIdx {
336 match self {
337 GroupsType::Idx(groups) => groups,
338 GroupsType::Slice { groups, .. } => {
339 polars_warn!(
340 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
341 );
342 groups
343 .iter()
344 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
345 .collect()
346 },
347 }
348 }
349
350 pub(crate) fn prepare_list_agg(
351 &self,
352 total_len: usize,
353 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
354 let mut can_fast_explode = true;
355 match self {
356 GroupsType::Idx(groups) => {
357 let mut list_offset = Vec::with_capacity(self.len() + 1);
358 let mut gather_offsets = Vec::with_capacity(total_len);
359
360 let mut len_so_far = 0i64;
361 list_offset.push(len_so_far);
362
363 for idx in groups {
364 let idx = idx.1;
365 gather_offsets.extend_from_slice(idx);
366 len_so_far += idx.len() as i64;
367 list_offset.push(len_so_far);
368 can_fast_explode &= !idx.is_empty();
369 }
370 unsafe {
371 (
372 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
373 OffsetsBuffer::new_unchecked(list_offset.into()),
374 can_fast_explode,
375 )
376 }
377 },
378 GroupsType::Slice { groups, .. } => {
379 let mut list_offset = Vec::with_capacity(self.len() + 1);
380 let mut gather_offsets = Vec::with_capacity(total_len);
381 let mut len_so_far = 0i64;
382 list_offset.push(len_so_far);
383
384 for g in groups {
385 let len = g[1];
386 let offset = g[0];
387 gather_offsets.extend(offset..offset + len);
388
389 len_so_far += len as i64;
390 list_offset.push(len_so_far);
391 can_fast_explode &= len > 0;
392 }
393
394 unsafe {
395 (
396 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
397 OffsetsBuffer::new_unchecked(list_offset.into()),
398 can_fast_explode,
399 )
400 }
401 },
402 }
403 }
404
405 pub fn iter(&self) -> GroupsTypeIter<'_> {
406 GroupsTypeIter::new(self)
407 }
408
409 pub fn sort(&mut self) {
410 match self {
411 GroupsType::Idx(groups) => {
412 if !groups.is_sorted_flag() {
413 groups.sort()
414 }
415 },
416 GroupsType::Slice { .. } => {
417 },
419 }
420 }
421
422 pub(crate) fn is_sorted_flag(&self) -> bool {
423 match self {
424 GroupsType::Idx(groups) => groups.is_sorted_flag(),
425 GroupsType::Slice { .. } => true,
426 }
427 }
428
429 pub fn is_overlapping(&self) -> bool {
430 matches!(
431 self,
432 GroupsType::Slice {
433 overlapping: true,
434 ..
435 }
436 )
437 }
438
439 pub fn is_monotonic(&self) -> bool {
440 matches!(
441 self,
442 GroupsType::Slice {
443 monotonic: true,
444 ..
445 }
446 )
447 }
448
449 pub fn take_group_firsts(self) -> Vec<IdxSize> {
450 match self {
451 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
452 GroupsType::Slice { groups, .. } => {
453 groups.into_iter().map(|[first, _len]| first).collect()
454 },
455 }
456 }
457
458 pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
461 if std::ptr::eq(self, other) {
462 return Ok(());
463 }
464 polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
465 a.len() == b.len()
466 }), ShapeMismatch: "expressions must have matching group lengths");
467 Ok(())
468 }
469
470 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
474 match self {
475 GroupsType::Idx(groups) => groups
476 .all
477 .iter()
478 .map(|idx| *idx.get_unchecked(idx.len() - 1))
479 .collect(),
480 GroupsType::Slice { groups, .. } => groups
481 .into_iter()
482 .map(|[first, len]| first + len - 1)
483 .collect(),
484 }
485 }
486
487 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
488 GroupsTypeParIter::new(self)
489 }
490
491 pub fn unwrap_idx(&self) -> &GroupsIdx {
497 match self {
498 GroupsType::Idx(groups) => groups,
499 GroupsType::Slice { .. } => panic!("groups are slices not index"),
500 }
501 }
502
503 pub fn unwrap_slice(&self) -> &GroupsSlice {
509 match self {
510 GroupsType::Slice { groups, .. } => groups,
511 GroupsType::Idx(_) => panic!("groups are index not slices"),
512 }
513 }
514
515 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
516 match self {
517 GroupsType::Idx(groups) => {
518 let first = groups.first[index];
519 let all = &groups.all[index];
520 GroupsIndicator::Idx((first, all))
521 },
522 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
523 }
524 }
525
526 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
532 match self {
533 GroupsType::Idx(groups) => groups,
534 GroupsType::Slice { .. } => panic!("groups are slices not index"),
535 }
536 }
537
538 pub fn len(&self) -> usize {
539 match self {
540 GroupsType::Idx(groups) => groups.len(),
541 GroupsType::Slice { groups, .. } => groups.len(),
542 }
543 }
544
545 pub fn is_empty(&self) -> bool {
546 self.len() == 0
547 }
548
549 pub fn group_count(&self) -> IdxCa {
550 match self {
551 GroupsType::Idx(groups) => {
552 let ca: NoNull<IdxCa> = groups
553 .iter()
554 .map(|(_first, idx)| idx.len() as IdxSize)
555 .collect_trusted();
556 ca.into_inner()
557 },
558 GroupsType::Slice { groups, .. } => {
559 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
560 ca.into_inner()
561 },
562 }
563 }
564 pub fn as_list_chunked(&self) -> ListChunked {
565 match self {
566 GroupsType::Idx(groups) => groups
567 .iter()
568 .map(|(_first, idx)| {
569 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
570 ca.into_inner().into_series()
571 })
572 .collect_trusted(),
573 GroupsType::Slice { groups, .. } => groups
574 .iter()
575 .map(|&[first, len]| {
576 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
577 ca.into_inner().into_series()
578 })
579 .collect_trusted(),
580 }
581 }
582
583 pub fn into_sliceable(self) -> GroupPositions {
584 let len = self.len();
585 slice_groups(Arc::new(self), 0, len)
586 }
587
588 pub fn num_elements(&self) -> usize {
589 match self {
590 GroupsType::Idx(i) => i.all().iter().map(|v| v.len()).sum(),
591 GroupsType::Slice {
592 groups,
593 overlapping: _,
594 monotonic: _,
595 } => groups.iter().map(|[_, l]| *l as usize).sum(),
596 }
597 }
598}
599
600impl From<GroupsIdx> for GroupsType {
601 fn from(groups: GroupsIdx) -> Self {
602 GroupsType::Idx(groups)
603 }
604}
605
/// A borrowed view of a single group, in either representation.
pub enum GroupsIndicator<'a> {
    /// An index group: its first row index and all of its row indices.
    Idx(BorrowIdxItem<'a>),
    /// A slice group as `[first, len]`.
    Slice([IdxSize; 2]),
}
610
611impl GroupsIndicator<'_> {
612 pub fn len(&self) -> usize {
613 match self {
614 GroupsIndicator::Idx(g) => g.1.len(),
615 GroupsIndicator::Slice([_, len]) => *len as usize,
616 }
617 }
618 pub fn first(&self) -> IdxSize {
619 match self {
620 GroupsIndicator::Idx(g) => g.0,
621 GroupsIndicator::Slice([first, _]) => *first,
622 }
623 }
624 pub fn is_empty(&self) -> bool {
625 self.len() == 0
626 }
627}
628
/// Sequential iterator over the groups of a [`GroupsType`], yielding [`GroupsIndicator`]s.
pub struct GroupsTypeIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, cached at construction.
    len: usize,
    /// Position of the next group to yield.
    idx: usize,
}
634
635impl<'a> GroupsTypeIter<'a> {
636 fn new(vals: &'a GroupsType) -> Self {
637 let len = vals.len();
638 let idx = 0;
639 GroupsTypeIter { vals, len, idx }
640 }
641}
642
643impl<'a> Iterator for GroupsTypeIter<'a> {
644 type Item = GroupsIndicator<'a>;
645
646 fn nth(&mut self, n: usize) -> Option<Self::Item> {
647 self.idx = self.idx.saturating_add(n);
648 self.next()
649 }
650
651 fn next(&mut self) -> Option<Self::Item> {
652 if self.idx >= self.len {
653 return None;
654 }
655
656 let out = unsafe {
657 match self.vals {
658 GroupsType::Idx(groups) => {
659 let item = groups.get_unchecked(self.idx);
660 Some(GroupsIndicator::Idx(item))
661 },
662 GroupsType::Slice { groups, .. } => {
663 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
664 },
665 }
666 };
667 self.idx += 1;
668 out
669 }
670}
671
/// Parallel iterator over the groups of a [`GroupsType`], yielding [`GroupsIndicator`]s.
pub struct GroupsTypeParIter<'a> {
    /// The groups being iterated.
    vals: &'a GroupsType,
    /// Number of groups, cached at construction.
    len: usize,
}
676
677impl<'a> GroupsTypeParIter<'a> {
678 fn new(vals: &'a GroupsType) -> Self {
679 let len = vals.len();
680 GroupsTypeParIter { vals, len }
681 }
682}
683
684impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
685 type Item = GroupsIndicator<'a>;
686
687 fn drive_unindexed<C>(self, consumer: C) -> C::Result
688 where
689 C: UnindexedConsumer<Self::Item>,
690 {
691 (0..self.len)
692 .into_par_iter()
693 .map(|i| unsafe {
694 match self.vals {
695 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
696 GroupsType::Slice { groups, .. } => {
697 GroupsIndicator::Slice(*groups.get_unchecked(i))
698 },
699 }
700 })
701 .drive_unindexed(consumer)
702 }
703}
704
/// A sliceable view over shared [`GroupsType`].
#[derive(Debug)]
pub struct GroupPositions {
    /// The groups of the current view. Its vectors alias the buffers of `original`
    /// (they are built with `Vec::from_raw_parts` in `slice_groups_inner`), so they must
    /// never be dropped; `ManuallyDrop` prevents that.
    sliced: ManuallyDrop<GroupsType>,
    /// The full, owned groups. Keeping this alive keeps the memory aliased by `sliced`
    /// valid.
    original: Arc<GroupsType>,
    /// Offset of the view into `original`, as passed to `slice_slice`.
    offset: i64,
    /// Length of the view, as passed to `slice_slice`.
    len: usize,
}
716
717impl Clone for GroupPositions {
718 fn clone(&self) -> Self {
719 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
720
721 Self {
722 sliced,
723 original: self.original.clone(),
724 offset: self.offset,
725 len: self.len,
726 }
727 }
728}
729
730impl AsRef<GroupsType> for GroupPositions {
731 fn as_ref(&self) -> &GroupsType {
732 self.sliced.deref()
733 }
734}
735
736impl Deref for GroupPositions {
737 type Target = GroupsType;
738
739 fn deref(&self) -> &Self::Target {
740 self.sliced.deref()
741 }
742}
743
744impl Default for GroupPositions {
745 fn default() -> Self {
746 GroupsType::default().into_sliceable()
747 }
748}
749
750impl GroupPositions {
751 pub fn slice(&self, offset: i64, len: usize) -> Self {
752 let offset = self.offset + offset;
753 slice_groups(
754 self.original.clone(),
755 offset,
756 if len > self.len { self.len } else { len },
758 )
759 }
760
761 pub fn sort(&mut self) {
762 if !self.as_ref().is_sorted_flag() {
763 let original = Arc::make_mut(&mut self.original);
764 original.sort();
765
766 self.sliced = slice_groups_inner(original, self.offset, self.len);
767 }
768 }
769
770 pub fn unroll(mut self) -> GroupPositions {
771 match self.sliced.deref_mut() {
772 GroupsType::Idx(_) => self,
773 GroupsType::Slice {
774 overlapping: false, ..
775 } => self,
776 GroupsType::Slice { groups, .. } => {
777 let mut cum_offset = 0 as IdxSize;
780 let groups: Vec<_> = groups
781 .iter()
782 .map(|[_, len]| {
783 let new = [cum_offset, *len];
784 cum_offset += *len;
785 new
786 })
787 .collect();
788
789 GroupsType::new_slice(groups, false, true).into_sliceable()
790 },
791 }
792 }
793
794 pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
795 match &*self.sliced {
796 GroupsType::Idx(_) => None,
797 GroupsType::Slice {
798 groups: _,
799 overlapping: true,
800 monotonic: _,
801 } => None,
802 GroupsType::Slice {
803 groups,
804 overlapping: false,
805 monotonic: _,
806 } => Some(groups),
807 }
808 }
809}
810
/// Builds a non-owning view of `len` groups of `g`, starting at `offset`.
///
/// The returned vectors are created with `Vec::from_raw_parts` over memory owned by `g`.
/// The result is therefore wrapped in `ManuallyDrop` and must never be dropped, grown or
/// mutated, since that would free or reallocate `g`'s buffers. The caller must also keep
/// `g` alive, and must not replace its buffers, for as long as the view is used.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    match g {
        GroupsType::Idx(groups) => {
            // SAFETY (all three blocks below): each `Vec` aliases an in-bounds sub-slice of
            // `g`. It is never dropped (`ManuallyDrop`) or resized, so its `capacity == len`
            // is never used to free or reallocate memory.
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice {
            groups,
            overlapping,
            monotonic,
        } => {
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            // A contiguous subset keeps the overlapping/monotonic properties of the whole.
            ManuallyDrop::new(GroupsType::new_slice(groups, *overlapping, *monotonic))
        },
    }
}
851
852fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
853 let sliced = slice_groups_inner(g.as_ref(), offset, len);
854
855 GroupPositions {
856 sliced,
857 original: g,
858 offset,
859 len,
860 }
861}