1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Index-based groups stored as two parallel vectors.
///
/// For group `i`, `first[i]` is the group's first index and `all[i]` holds all of the
/// group's row indices. NOTE(review): the code in this file assumes `first` and `all`
/// always have the same length.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// `true` once the groups are ordered by their `first` value (set by `GroupsIdx::sort`).
    pub(crate) sorted: bool,
    /// First index of every group.
    first: Vec<IdxSize>,
    /// All row indices of every group.
    all: Vec<IdxVec>,
}
21
/// An owned group: `(first, all)`, i.e. its first index and all of its row indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: `(first, &all)`.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26 fn drop(&mut self) {
27 let v = std::mem::take(&mut self.all);
28 #[cfg(not(target_family = "wasm"))]
31 if v.len() > 1 << 16 {
32 std::thread::spawn(move || drop(v));
33 } else {
34 drop(v);
35 }
36
37 #[cfg(target_family = "wasm")]
38 drop(v);
39 }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43 fn from(v: Vec<IdxItem>) -> Self {
44 v.into_iter().collect()
45 }
46}
47
48impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
49 fn from(v: Vec<Vec<IdxItem>>) -> Self {
50 let (cap, offsets) = flatten::cap_and_offsets(&v);
53 let mut first = Vec::with_capacity(cap);
54 let first_ptr = first.as_ptr() as usize;
55 let mut all = Vec::with_capacity(cap);
56 let all_ptr = all.as_ptr() as usize;
57
58 POOL.install(|| {
59 v.into_par_iter()
60 .zip(offsets)
61 .for_each(|(mut inner, offset)| {
62 unsafe {
63 let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
64 let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);
65
66 let inner_ptr = inner.as_mut_ptr();
67 for i in 0..inner.len() {
68 let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
69 std::ptr::write(first.add(i), first_val);
70 std::ptr::write(all.add(i), vals);
71 }
72 inner.set_len(0);
75 }
76 });
77 });
78 unsafe {
79 all.set_len(cap);
80 first.set_len(cap);
81 }
82 GroupsIdx {
83 sorted: false,
84 first,
85 all,
86 }
87 }
88}
89
90impl GroupsIdx {
91 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92 Self { sorted, first, all }
93 }
94
95 pub fn sort(&mut self) {
96 if self.sorted {
97 return;
98 }
99 let mut idx = 0;
100 let first = std::mem::take(&mut self.first);
101 let mut idx_vals = first
103 .into_iter()
104 .map(|v| {
105 let out = [idx, v];
106 idx += 1;
107 out
108 })
109 .collect_trusted::<Vec<_>>();
110 idx_vals.sort_unstable_by_key(|v| v[1]);
111
112 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113 let take_all = || {
114 idx_vals
115 .iter()
116 .map(|v| unsafe {
117 let idx = v[0] as usize;
118 std::mem::take(self.all.get_unchecked_mut(idx))
119 })
120 .collect_trusted::<Vec<_>>()
121 };
122 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123 self.first = first;
124 self.all = all;
125 self.sorted = true
126 }
127 pub fn is_sorted_flag(&self) -> bool {
128 self.sorted
129 }
130
131 pub fn iter(
132 &self,
133 ) -> std::iter::Zip<
134 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
135 std::slice::Iter<'_, IdxVec>,
136 > {
137 self.into_iter()
138 }
139
140 pub fn all(&self) -> &[IdxVec] {
141 &self.all
142 }
143
144 pub fn first(&self) -> &[IdxSize] {
145 &self.first
146 }
147
148 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
149 &mut self.first
150 }
151
152 pub(crate) fn len(&self) -> usize {
153 self.first.len()
154 }
155
156 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
157 let first = *self.first.get_unchecked(index);
158 let all = self.all.get_unchecked(index);
159 (first, all)
160 }
161
162 pub fn new_empty() -> Self {
164 Self {
165 sorted: false,
166 first: vec![0],
167 all: vec![vec![].into()],
168 }
169 }
170}
171
172impl FromIterator<IdxItem> for GroupsIdx {
173 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
174 let (first, all) = iter.into_iter().unzip();
175 GroupsIdx {
176 sorted: false,
177 first,
178 all,
179 }
180 }
181}
182
183impl<'a> IntoIterator for &'a GroupsIdx {
184 type Item = BorrowIdxItem<'a>;
185 type IntoIter = std::iter::Zip<
186 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
187 std::slice::Iter<'a, IdxVec>,
188 >;
189
190 fn into_iter(self) -> Self::IntoIter {
191 self.first.iter().copied().zip(self.all.iter())
192 }
193}
194
195impl IntoIterator for GroupsIdx {
196 type Item = IdxItem;
197 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
198
199 fn into_iter(mut self) -> Self::IntoIter {
200 let first = std::mem::take(&mut self.first);
201 let all = std::mem::take(&mut self.all);
202 first.into_iter().zip(all)
203 }
204}
205
206impl FromParallelIterator<IdxItem> for GroupsIdx {
207 fn from_par_iter<I>(par_iter: I) -> Self
208 where
209 I: IntoParallelIterator<Item = IdxItem>,
210 {
211 let (first, all) = par_iter.into_par_iter().unzip();
212 GroupsIdx {
213 sorted: false,
214 first,
215 all,
216 }
217 }
218}
219
220impl<'a> IntoParallelIterator for &'a GroupsIdx {
221 type Iter = rayon::iter::Zip<
222 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
223 rayon::slice::Iter<'a, IdxVec>,
224 >;
225 type Item = BorrowIdxItem<'a>;
226
227 fn into_par_iter(self) -> Self::Iter {
228 self.first.par_iter().copied().zip(self.all.par_iter())
229 }
230}
231
232impl IntoParallelIterator for GroupsIdx {
233 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
234 type Item = IdxItem;
235
236 fn into_par_iter(mut self) -> Self::Iter {
237 let first = std::mem::take(&mut self.first);
238 let all = std::mem::take(&mut self.all);
239 first.into_par_iter().zip(all.into_par_iter())
240 }
241}
242
/// Slice groups: one `[first, len]` pair per group, covering rows `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
251
/// Group positions: either explicit per-group row indices or contiguous row slices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Explicit row indices for every group.
    Idx(GroupsIdx),
    /// Each group is the contiguous row range `first..first + len`, stored as `[first, len]`.
    Slice {
        /// One `[first, len]` entry per group.
        groups: GroupsSlice,
        /// Whether the slices may overlap. `GroupPositions::unroll` rewrites overlapping
        /// slices into back-to-back, non-overlapping ones.
        overlapping: bool,
    },
}
266
267impl Default for GroupsType {
268 fn default() -> Self {
269 GroupsType::Idx(GroupsIdx::default())
270 }
271}
272
273impl GroupsType {
274 pub fn into_idx(self) -> GroupsIdx {
275 match self {
276 GroupsType::Idx(groups) => groups,
277 GroupsType::Slice { groups, .. } => {
278 polars_warn!(
279 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
280 );
281 groups
282 .iter()
283 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
284 .collect()
285 },
286 }
287 }
288
289 pub(crate) fn prepare_list_agg(
290 &self,
291 total_len: usize,
292 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
293 let mut can_fast_explode = true;
294 match self {
295 GroupsType::Idx(groups) => {
296 let mut list_offset = Vec::with_capacity(self.len() + 1);
297 let mut gather_offsets = Vec::with_capacity(total_len);
298
299 let mut len_so_far = 0i64;
300 list_offset.push(len_so_far);
301
302 for idx in groups {
303 let idx = idx.1;
304 gather_offsets.extend_from_slice(idx);
305 len_so_far += idx.len() as i64;
306 list_offset.push(len_so_far);
307 can_fast_explode &= !idx.is_empty();
308 }
309 unsafe {
310 (
311 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
312 OffsetsBuffer::new_unchecked(list_offset.into()),
313 can_fast_explode,
314 )
315 }
316 },
317 GroupsType::Slice { groups, .. } => {
318 let mut list_offset = Vec::with_capacity(self.len() + 1);
319 let mut gather_offsets = Vec::with_capacity(total_len);
320 let mut len_so_far = 0i64;
321 list_offset.push(len_so_far);
322
323 for g in groups {
324 let len = g[1];
325 let offset = g[0];
326 gather_offsets.extend(offset..offset + len);
327
328 len_so_far += len as i64;
329 list_offset.push(len_so_far);
330 can_fast_explode &= len > 0;
331 }
332
333 unsafe {
334 (
335 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
336 OffsetsBuffer::new_unchecked(list_offset.into()),
337 can_fast_explode,
338 )
339 }
340 },
341 }
342 }
343
344 pub fn iter(&self) -> GroupsTypeIter<'_> {
345 GroupsTypeIter::new(self)
346 }
347
348 pub fn sort(&mut self) {
349 match self {
350 GroupsType::Idx(groups) => {
351 if !groups.is_sorted_flag() {
352 groups.sort()
353 }
354 },
355 GroupsType::Slice { .. } => {
356 },
358 }
359 }
360
361 pub(crate) fn is_sorted_flag(&self) -> bool {
362 match self {
363 GroupsType::Idx(groups) => groups.is_sorted_flag(),
364 GroupsType::Slice { .. } => true,
365 }
366 }
367
368 pub fn is_overlapping(&self) -> bool {
369 matches!(
370 self,
371 GroupsType::Slice {
372 overlapping: true,
373 ..
374 }
375 )
376 }
377
378 pub fn take_group_firsts(self) -> Vec<IdxSize> {
379 match self {
380 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
381 GroupsType::Slice { groups, .. } => {
382 groups.into_iter().map(|[first, _len]| first).collect()
383 },
384 }
385 }
386
387 pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
390 if std::ptr::eq(self, other) {
391 return Ok(());
392 }
393 polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
394 a.len() == b.len()
395 }), ShapeMismatch: "expressions must have matching group lengths");
396 Ok(())
397 }
398
399 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
403 match self {
404 GroupsType::Idx(groups) => groups
405 .all
406 .iter()
407 .map(|idx| *idx.get_unchecked(idx.len() - 1))
408 .collect(),
409 GroupsType::Slice { groups, .. } => groups
410 .into_iter()
411 .map(|[first, len]| first + len - 1)
412 .collect(),
413 }
414 }
415
416 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
417 GroupsTypeParIter::new(self)
418 }
419
420 pub fn unwrap_idx(&self) -> &GroupsIdx {
426 match self {
427 GroupsType::Idx(groups) => groups,
428 GroupsType::Slice { .. } => panic!("groups are slices not index"),
429 }
430 }
431
432 pub fn unwrap_slice(&self) -> &GroupsSlice {
438 match self {
439 GroupsType::Slice { groups, .. } => groups,
440 GroupsType::Idx(_) => panic!("groups are index not slices"),
441 }
442 }
443
444 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
445 match self {
446 GroupsType::Idx(groups) => {
447 let first = groups.first[index];
448 let all = &groups.all[index];
449 GroupsIndicator::Idx((first, all))
450 },
451 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
452 }
453 }
454
455 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
461 match self {
462 GroupsType::Idx(groups) => groups,
463 GroupsType::Slice { .. } => panic!("groups are slices not index"),
464 }
465 }
466
467 pub fn len(&self) -> usize {
468 match self {
469 GroupsType::Idx(groups) => groups.len(),
470 GroupsType::Slice { groups, .. } => groups.len(),
471 }
472 }
473
474 pub fn is_empty(&self) -> bool {
475 self.len() == 0
476 }
477
478 pub fn group_count(&self) -> IdxCa {
479 match self {
480 GroupsType::Idx(groups) => {
481 let ca: NoNull<IdxCa> = groups
482 .iter()
483 .map(|(_first, idx)| idx.len() as IdxSize)
484 .collect_trusted();
485 ca.into_inner()
486 },
487 GroupsType::Slice { groups, .. } => {
488 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
489 ca.into_inner()
490 },
491 }
492 }
493 pub fn as_list_chunked(&self) -> ListChunked {
494 match self {
495 GroupsType::Idx(groups) => groups
496 .iter()
497 .map(|(_first, idx)| {
498 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
499 ca.into_inner().into_series()
500 })
501 .collect_trusted(),
502 GroupsType::Slice { groups, .. } => groups
503 .iter()
504 .map(|&[first, len]| {
505 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
506 ca.into_inner().into_series()
507 })
508 .collect_trusted(),
509 }
510 }
511
512 pub fn into_sliceable(self) -> GroupPositions {
513 let len = self.len();
514 slice_groups(Arc::new(self), 0, len)
515 }
516}
517
518impl From<GroupsIdx> for GroupsType {
519 fn from(groups: GroupsIdx) -> Self {
520 GroupsType::Idx(groups)
521 }
522}
523
/// A borrowed view of a single group, as yielded by `GroupsType::get` and the group
/// iterators.
pub enum GroupsIndicator<'a> {
    /// `(first, &all)` of an index group.
    Idx(BorrowIdxItem<'a>),
    /// `[first, len]` of a slice group.
    Slice([IdxSize; 2]),
}
528
529impl GroupsIndicator<'_> {
530 pub fn len(&self) -> usize {
531 match self {
532 GroupsIndicator::Idx(g) => g.1.len(),
533 GroupsIndicator::Slice([_, len]) => *len as usize,
534 }
535 }
536 pub fn first(&self) -> IdxSize {
537 match self {
538 GroupsIndicator::Idx(g) => g.0,
539 GroupsIndicator::Slice([first, _]) => *first,
540 }
541 }
542 pub fn is_empty(&self) -> bool {
543 self.len() == 0
544 }
545}
546
/// Sequential iterator over the groups of a `GroupsType`, yielding `GroupsIndicator`s.
pub struct GroupsTypeIter<'a> {
    vals: &'a GroupsType,
    // Number of groups, captured at construction.
    len: usize,
    // Index of the next group to yield.
    idx: usize,
}
552
553impl<'a> GroupsTypeIter<'a> {
554 fn new(vals: &'a GroupsType) -> Self {
555 let len = vals.len();
556 let idx = 0;
557 GroupsTypeIter { vals, len, idx }
558 }
559}
560
561impl<'a> Iterator for GroupsTypeIter<'a> {
562 type Item = GroupsIndicator<'a>;
563
564 fn nth(&mut self, n: usize) -> Option<Self::Item> {
565 self.idx = self.idx.saturating_add(n);
566 self.next()
567 }
568
569 fn next(&mut self) -> Option<Self::Item> {
570 if self.idx >= self.len {
571 return None;
572 }
573
574 let out = unsafe {
575 match self.vals {
576 GroupsType::Idx(groups) => {
577 let item = groups.get_unchecked(self.idx);
578 Some(GroupsIndicator::Idx(item))
579 },
580 GroupsType::Slice { groups, .. } => {
581 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
582 },
583 }
584 };
585 self.idx += 1;
586 out
587 }
588}
589
/// Parallel (rayon) iterator over the groups of a `GroupsType`, yielding
/// `GroupsIndicator`s.
pub struct GroupsTypeParIter<'a> {
    vals: &'a GroupsType,
    // Number of groups, captured at construction.
    len: usize,
}
594
595impl<'a> GroupsTypeParIter<'a> {
596 fn new(vals: &'a GroupsType) -> Self {
597 let len = vals.len();
598 GroupsTypeParIter { vals, len }
599 }
600}
601
602impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
603 type Item = GroupsIndicator<'a>;
604
605 fn drive_unindexed<C>(self, consumer: C) -> C::Result
606 where
607 C: UnindexedConsumer<Self::Item>,
608 {
609 (0..self.len)
610 .into_par_iter()
611 .map(|i| unsafe {
612 match self.vals {
613 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
614 GroupsType::Slice { groups, .. } => {
615 GroupsIndicator::Slice(*groups.get_unchecked(i))
616 },
617 }
618 })
619 .drive_unindexed(consumer)
620 }
621}
622
/// A window (`offset`, `len`) into shared, reference-counted group data.
///
/// `sliced` is a view whose vectors alias the buffers of `original` (built by
/// `slice_groups_inner` via `Vec::from_raw_parts`). It is wrapped in `ManuallyDrop` so the
/// aliased memory is never freed through the view; `original` owns it.
#[derive(Debug)]
pub struct GroupPositions {
    // Non-owning view into `original`; must never be dropped.
    sliced: ManuallyDrop<GroupsType>,
    // Owner of the memory that `sliced` points into; keeps it alive.
    original: Arc<GroupsType>,
    // Offset of the window into `original`, as passed to `slice_slice`.
    // NOTE(review): `i64`, presumably so negative (from-the-end) offsets are allowed — confirm.
    offset: i64,
    // Length of the window, in groups.
    len: usize,
}
634
635impl Clone for GroupPositions {
636 fn clone(&self) -> Self {
637 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
638
639 Self {
640 sliced,
641 original: self.original.clone(),
642 offset: self.offset,
643 len: self.len,
644 }
645 }
646}
647
648impl AsRef<GroupsType> for GroupPositions {
649 fn as_ref(&self) -> &GroupsType {
650 self.sliced.deref()
651 }
652}
653
654impl Deref for GroupPositions {
655 type Target = GroupsType;
656
657 fn deref(&self) -> &Self::Target {
658 self.sliced.deref()
659 }
660}
661
662impl Default for GroupPositions {
663 fn default() -> Self {
664 GroupsType::default().into_sliceable()
665 }
666}
667
668impl GroupPositions {
669 pub fn slice(&self, offset: i64, len: usize) -> Self {
670 let offset = self.offset + offset;
671 slice_groups(
672 self.original.clone(),
673 offset,
674 if len > self.len { self.len } else { len },
676 )
677 }
678
679 pub fn sort(&mut self) {
680 if !self.as_ref().is_sorted_flag() {
681 let original = Arc::make_mut(&mut self.original);
682 original.sort();
683
684 self.sliced = slice_groups_inner(original, self.offset, self.len);
685 }
686 }
687
688 pub fn unroll(mut self) -> GroupPositions {
689 match self.sliced.deref_mut() {
690 GroupsType::Idx(_) => self,
691 GroupsType::Slice {
692 overlapping: false, ..
693 } => self,
694 GroupsType::Slice { groups, .. } => {
695 let mut cum_offset = 0 as IdxSize;
698 let groups: Vec<_> = groups
699 .iter()
700 .map(|[_, len]| {
701 let new = [cum_offset, *len];
702 cum_offset += *len;
703 new
704 })
705 .collect();
706
707 GroupsType::Slice {
708 groups,
709 overlapping: false,
710 }
711 .into_sliceable()
712 },
713 }
714 }
715
716 pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
717 match &*self.sliced {
718 GroupsType::Idx(_) => None,
719 GroupsType::Slice {
720 overlapping: true, ..
721 } => None,
722 GroupsType::Slice {
723 groups,
724 overlapping: false,
725 } => Some(groups),
726 }
727 }
728}
729
/// Build a non-owning view of the groups of `g` restricted to the window `offset`/`len`.
///
/// The returned vectors are created with `Vec::from_raw_parts` over memory that `g` still
/// owns. The result must therefore never be dropped (hence `ManuallyDrop`) and must not
/// be used after `g`'s buffers are freed or replaced. `GroupPositions` keeps `g` alive
/// through an `Arc`.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    // SAFETY (all branches): each `Vec` below aliases a sub-slice of `g`'s buffers, with
    // length == capacity == sub-slice length. It is never dropped, because the whole
    // result is wrapped in `ManuallyDrop`, so nothing is deallocated through it.
    // NOTE(review): soundness also relies on the view never being mutated or grown; within
    // this file it is only exposed through `&GroupsType` (`Deref` / `AsRef`).
    match g {
        GroupsType::Idx(groups) => {
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            // The sorted flag carries over: a contiguous window of sorted groups stays sorted.
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice {
            groups,
            overlapping,
        } => {
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            ManuallyDrop::new(GroupsType::Slice {
                groups,
                overlapping: *overlapping,
            })
        },
    }
}
772
773fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
774 let sliced = slice_groups_inner(g.as_ref(), offset, len);
775
776 GroupPositions {
777 sliced,
778 original: g,
779 offset,
780 len,
781 }
782}