1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
/// Index-based groups.
///
/// Group `i` is described by `first[i]` (its first index) and `all[i]` (all of its indices).
/// The two vectors are expected to have the same length; the rest of this module zips and
/// indexes them pairwise.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    /// Whether the groups are ordered by ascending first index (set by `GroupsIdx::sort`).
    pub(crate) sorted: bool,
    // First index of every group.
    first: Vec<IdxSize>,
    // All indices of every group.
    all: Vec<IdxVec>,
}
21
/// An owned index group: its first index together with all of its indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed index group: its first index together with a reference to all of its indices.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26 fn drop(&mut self) {
27 let v = std::mem::take(&mut self.all);
28 #[cfg(not(target_family = "wasm"))]
31 if v.len() > 1 << 16 {
32 std::thread::spawn(move || drop(v));
33 } else {
34 drop(v);
35 }
36
37 #[cfg(target_family = "wasm")]
38 drop(v);
39 }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43 fn from(v: Vec<IdxItem>) -> Self {
44 v.into_iter().collect()
45 }
46}
47
impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
    /// Flattens chunks of `(first, all)` items into a single `GroupsIdx`.
    ///
    /// The items of every chunk are moved into pre-allocated output buffers in parallel on
    /// `POOL`. The resulting groups are not flagged as sorted.
    fn from(v: Vec<Vec<IdxItem>>) -> Self {
        // NOTE(review): `cap` is assumed to be the total number of items across all chunks and
        // `offsets` the start position of each chunk in the flattened output — confirm against
        // `flatten::cap_and_offsets`.
        let (cap, offsets) = flatten::cap_and_offsets(&v);
        let mut first = Vec::with_capacity(cap);
        // Raw pointers are not `Send`/`Sync`, so they are passed into the parallel closure as
        // plain `usize` addresses.
        let first_ptr = first.as_ptr() as usize;
        let mut all = Vec::with_capacity(cap);
        let all_ptr = all.as_ptr() as usize;

        POOL.install(|| {
            v.into_par_iter()
                .zip(offsets)
                .for_each(|(mut inner, offset)| {
                    // SAFETY: (given the offsets above) every chunk writes only to its own
                    // disjoint range `offset..offset + inner.len()` of the buffers allocated
                    // with capacity `cap`, so the writes neither overlap nor go out of bounds.
                    unsafe {
                        let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
                        let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);

                        let inner_ptr = inner.as_mut_ptr();
                        for i in 0..inner.len() {
                            // Move the item out of `inner` bitwise and into the output slots.
                            let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
                            std::ptr::write(first.add(i), first_val);
                            std::ptr::write(all.add(i), vals);
                        }
                        // The items have been moved out above; forget them so that dropping
                        // `inner` only frees its buffer and does not drop them a second time.
                        inner.set_len(0);
                    }
                });
        });
        // SAFETY: the parallel loop above initialized every slot in `0..cap` (assuming the
        // chunk ranges exactly cover it).
        unsafe {
            all.set_len(cap);
            first.set_len(cap);
        }
        GroupsIdx {
            sorted: false,
            first,
            all,
        }
    }
}
89
90impl GroupsIdx {
91 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92 Self { sorted, first, all }
93 }
94
95 pub fn sort(&mut self) {
96 if self.sorted {
97 return;
98 }
99 let mut idx = 0;
100 let first = std::mem::take(&mut self.first);
101 let mut idx_vals = first
103 .into_iter()
104 .map(|v| {
105 let out = [idx, v];
106 idx += 1;
107 out
108 })
109 .collect_trusted::<Vec<_>>();
110 idx_vals.sort_unstable_by_key(|v| v[1]);
111
112 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113 let take_all = || {
114 idx_vals
115 .iter()
116 .map(|v| unsafe {
117 let idx = v[0] as usize;
118 std::mem::take(self.all.get_unchecked_mut(idx))
119 })
120 .collect_trusted::<Vec<_>>()
121 };
122 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123 self.first = first;
124 self.all = all;
125 self.sorted = true
126 }
127 pub fn is_sorted_flag(&self) -> bool {
128 self.sorted
129 }
130
131 pub fn iter(
132 &self,
133 ) -> std::iter::Zip<
134 std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
135 std::slice::Iter<'_, IdxVec>,
136 > {
137 self.into_iter()
138 }
139
140 pub fn all(&self) -> &[IdxVec] {
141 &self.all
142 }
143
144 pub fn first(&self) -> &[IdxSize] {
145 &self.first
146 }
147
148 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
149 &mut self.first
150 }
151
152 pub(crate) fn len(&self) -> usize {
153 self.first.len()
154 }
155
156 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
157 let first = *self.first.get_unchecked(index);
158 let all = self.all.get_unchecked(index);
159 (first, all)
160 }
161
162 pub fn new_empty() -> Self {
164 Self {
165 sorted: false,
166 first: vec![0],
167 all: vec![vec![].into()],
168 }
169 }
170}
171
172impl FromIterator<IdxItem> for GroupsIdx {
173 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
174 let (first, all) = iter.into_iter().unzip();
175 GroupsIdx {
176 sorted: false,
177 first,
178 all,
179 }
180 }
181}
182
183impl<'a> IntoIterator for &'a GroupsIdx {
184 type Item = BorrowIdxItem<'a>;
185 type IntoIter = std::iter::Zip<
186 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
187 std::slice::Iter<'a, IdxVec>,
188 >;
189
190 fn into_iter(self) -> Self::IntoIter {
191 self.first.iter().copied().zip(self.all.iter())
192 }
193}
194
195impl IntoIterator for GroupsIdx {
196 type Item = IdxItem;
197 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
198
199 fn into_iter(mut self) -> Self::IntoIter {
200 let first = std::mem::take(&mut self.first);
201 let all = std::mem::take(&mut self.all);
202 first.into_iter().zip(all)
203 }
204}
205
206impl FromParallelIterator<IdxItem> for GroupsIdx {
207 fn from_par_iter<I>(par_iter: I) -> Self
208 where
209 I: IntoParallelIterator<Item = IdxItem>,
210 {
211 let (first, all) = par_iter.into_par_iter().unzip();
212 GroupsIdx {
213 sorted: false,
214 first,
215 all,
216 }
217 }
218}
219
220impl<'a> IntoParallelIterator for &'a GroupsIdx {
221 type Iter = rayon::iter::Zip<
222 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
223 rayon::slice::Iter<'a, IdxVec>,
224 >;
225 type Item = BorrowIdxItem<'a>;
226
227 fn into_par_iter(self) -> Self::Iter {
228 self.first.par_iter().copied().zip(self.all.par_iter())
229 }
230}
231
232impl IntoParallelIterator for GroupsIdx {
233 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
234 type Item = IdxItem;
235
236 fn into_par_iter(mut self) -> Self::Iter {
237 let first = std::mem::take(&mut self.first);
238 let all = std::mem::take(&mut self.all);
239 first.into_par_iter().zip(all.into_par_iter())
240 }
241}
242
/// Slice-based groups: one `[first, len]` pair per group, describing the contiguous index
/// range `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
251
/// The groups of a group-by, stored either as explicit index lists or as contiguous slices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Every group is an explicit list of indices.
    Idx(GroupsIdx),
    /// Every group is the contiguous index range `first..first + len`.
    Slice {
        /// One `[first, len]` pair per group.
        groups: GroupsSlice,
        /// NOTE(review): presumably marks rolling-window groups, whose slices may overlap;
        /// `GroupPositions::unroll` re-lays such groups out back-to-back.
        rolling: bool,
    },
}
263
264impl Default for GroupsType {
265 fn default() -> Self {
266 GroupsType::Idx(GroupsIdx::default())
267 }
268}
269
270impl GroupsType {
271 pub fn into_idx(self) -> GroupsIdx {
272 match self {
273 GroupsType::Idx(groups) => groups,
274 GroupsType::Slice { groups, .. } => {
275 polars_warn!(
276 "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
277 );
278 groups
279 .iter()
280 .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
281 .collect()
282 },
283 }
284 }
285
286 pub(crate) fn prepare_list_agg(
287 &self,
288 total_len: usize,
289 ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
290 let mut can_fast_explode = true;
291 match self {
292 GroupsType::Idx(groups) => {
293 let mut list_offset = Vec::with_capacity(self.len() + 1);
294 let mut gather_offsets = Vec::with_capacity(total_len);
295
296 let mut len_so_far = 0i64;
297 list_offset.push(len_so_far);
298
299 for idx in groups {
300 let idx = idx.1;
301 gather_offsets.extend_from_slice(idx);
302 len_so_far += idx.len() as i64;
303 list_offset.push(len_so_far);
304 can_fast_explode &= !idx.is_empty();
305 }
306 unsafe {
307 (
308 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
309 OffsetsBuffer::new_unchecked(list_offset.into()),
310 can_fast_explode,
311 )
312 }
313 },
314 GroupsType::Slice { groups, .. } => {
315 let mut list_offset = Vec::with_capacity(self.len() + 1);
316 let mut gather_offsets = Vec::with_capacity(total_len);
317 let mut len_so_far = 0i64;
318 list_offset.push(len_so_far);
319
320 for g in groups {
321 let len = g[1];
322 let offset = g[0];
323 gather_offsets.extend(offset..offset + len);
324
325 len_so_far += len as i64;
326 list_offset.push(len_so_far);
327 can_fast_explode &= len > 0;
328 }
329
330 unsafe {
331 (
332 Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
333 OffsetsBuffer::new_unchecked(list_offset.into()),
334 can_fast_explode,
335 )
336 }
337 },
338 }
339 }
340
341 pub fn iter(&self) -> GroupsTypeIter<'_> {
342 GroupsTypeIter::new(self)
343 }
344
345 pub fn sort(&mut self) {
346 match self {
347 GroupsType::Idx(groups) => {
348 if !groups.is_sorted_flag() {
349 groups.sort()
350 }
351 },
352 GroupsType::Slice { .. } => {
353 },
355 }
356 }
357
358 pub(crate) fn is_sorted_flag(&self) -> bool {
359 match self {
360 GroupsType::Idx(groups) => groups.is_sorted_flag(),
361 GroupsType::Slice { .. } => true,
362 }
363 }
364
365 pub fn take_group_firsts(self) -> Vec<IdxSize> {
366 match self {
367 GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
368 GroupsType::Slice { groups, .. } => {
369 groups.into_iter().map(|[first, _len]| first).collect()
370 },
371 }
372 }
373
374 pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
378 match self {
379 GroupsType::Idx(groups) => groups
380 .all
381 .iter()
382 .map(|idx| *idx.get_unchecked(idx.len() - 1))
383 .collect(),
384 GroupsType::Slice { groups, .. } => groups
385 .into_iter()
386 .map(|[first, len]| first + len - 1)
387 .collect(),
388 }
389 }
390
391 pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
392 GroupsTypeParIter::new(self)
393 }
394
395 pub fn unwrap_idx(&self) -> &GroupsIdx {
401 match self {
402 GroupsType::Idx(groups) => groups,
403 GroupsType::Slice { .. } => panic!("groups are slices not index"),
404 }
405 }
406
407 pub fn unwrap_slice(&self) -> &GroupsSlice {
413 match self {
414 GroupsType::Slice { groups, .. } => groups,
415 GroupsType::Idx(_) => panic!("groups are index not slices"),
416 }
417 }
418
419 pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
420 match self {
421 GroupsType::Idx(groups) => {
422 let first = groups.first[index];
423 let all = &groups.all[index];
424 GroupsIndicator::Idx((first, all))
425 },
426 GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
427 }
428 }
429
430 pub fn idx_mut(&mut self) -> &mut GroupsIdx {
436 match self {
437 GroupsType::Idx(groups) => groups,
438 GroupsType::Slice { .. } => panic!("groups are slices not index"),
439 }
440 }
441
442 pub fn len(&self) -> usize {
443 match self {
444 GroupsType::Idx(groups) => groups.len(),
445 GroupsType::Slice { groups, .. } => groups.len(),
446 }
447 }
448
449 pub fn is_empty(&self) -> bool {
450 self.len() == 0
451 }
452
453 pub fn group_count(&self) -> IdxCa {
454 match self {
455 GroupsType::Idx(groups) => {
456 let ca: NoNull<IdxCa> = groups
457 .iter()
458 .map(|(_first, idx)| idx.len() as IdxSize)
459 .collect_trusted();
460 ca.into_inner()
461 },
462 GroupsType::Slice { groups, .. } => {
463 let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
464 ca.into_inner()
465 },
466 }
467 }
468 pub fn as_list_chunked(&self) -> ListChunked {
469 match self {
470 GroupsType::Idx(groups) => groups
471 .iter()
472 .map(|(_first, idx)| {
473 let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
474 ca.into_inner().into_series()
475 })
476 .collect_trusted(),
477 GroupsType::Slice { groups, .. } => groups
478 .iter()
479 .map(|&[first, len]| {
480 let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
481 ca.into_inner().into_series()
482 })
483 .collect_trusted(),
484 }
485 }
486
487 pub fn into_sliceable(self) -> GroupPositions {
488 let len = self.len();
489 slice_groups(Arc::new(self), 0, len)
490 }
491}
492
493impl From<GroupsIdx> for GroupsType {
494 fn from(groups: GroupsIdx) -> Self {
495 GroupsType::Idx(groups)
496 }
497}
498
/// A single group borrowed from a `GroupsType`.
pub enum GroupsIndicator<'a> {
    /// An index group: `(first index, all indices)`.
    Idx(BorrowIdxItem<'a>),
    /// A slice group: `[first, len]`.
    Slice([IdxSize; 2]),
}
503
504impl GroupsIndicator<'_> {
505 pub fn len(&self) -> usize {
506 match self {
507 GroupsIndicator::Idx(g) => g.1.len(),
508 GroupsIndicator::Slice([_, len]) => *len as usize,
509 }
510 }
511 pub fn first(&self) -> IdxSize {
512 match self {
513 GroupsIndicator::Idx(g) => g.0,
514 GroupsIndicator::Slice([first, _]) => *first,
515 }
516 }
517 pub fn is_empty(&self) -> bool {
518 self.len() == 0
519 }
520}
521
/// Sequential iterator over the groups of a `GroupsType`.
pub struct GroupsTypeIter<'a> {
    // The groups being iterated.
    vals: &'a GroupsType,
    // Number of groups in `vals`, captured at construction.
    len: usize,
    // Position of the next group to yield.
    idx: usize,
}
527
528impl<'a> GroupsTypeIter<'a> {
529 fn new(vals: &'a GroupsType) -> Self {
530 let len = vals.len();
531 let idx = 0;
532 GroupsTypeIter { vals, len, idx }
533 }
534}
535
536impl<'a> Iterator for GroupsTypeIter<'a> {
537 type Item = GroupsIndicator<'a>;
538
539 fn nth(&mut self, n: usize) -> Option<Self::Item> {
540 self.idx = self.idx.saturating_add(n);
541 self.next()
542 }
543
544 fn next(&mut self) -> Option<Self::Item> {
545 if self.idx >= self.len {
546 return None;
547 }
548
549 let out = unsafe {
550 match self.vals {
551 GroupsType::Idx(groups) => {
552 let item = groups.get_unchecked(self.idx);
553 Some(GroupsIndicator::Idx(item))
554 },
555 GroupsType::Slice { groups, .. } => {
556 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
557 },
558 }
559 };
560 self.idx += 1;
561 out
562 }
563}
564
/// Parallel iterator over the groups of a `GroupsType`.
pub struct GroupsTypeParIter<'a> {
    // The groups being iterated.
    vals: &'a GroupsType,
    // Number of groups in `vals`, captured at construction.
    len: usize,
}
569
570impl<'a> GroupsTypeParIter<'a> {
571 fn new(vals: &'a GroupsType) -> Self {
572 let len = vals.len();
573 GroupsTypeParIter { vals, len }
574 }
575}
576
577impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
578 type Item = GroupsIndicator<'a>;
579
580 fn drive_unindexed<C>(self, consumer: C) -> C::Result
581 where
582 C: UnindexedConsumer<Self::Item>,
583 {
584 (0..self.len)
585 .into_par_iter()
586 .map(|i| unsafe {
587 match self.vals {
588 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
589 GroupsType::Slice { groups, .. } => {
590 GroupsIndicator::Slice(*groups.get_unchecked(i))
591 },
592 }
593 })
594 .drive_unindexed(consumer)
595 }
596}
597
/// A (possibly sliced) view over shared groups.
///
/// `sliced` is built by `slice_groups_inner` with `Vec::from_raw_parts` over `original`'s
/// buffers, so it aliases them: it is wrapped in `ManuallyDrop` and never dropped, while the
/// `Arc` in `original` keeps those buffers alive.
#[derive(Debug)]
pub struct GroupPositions {
    // Non-owning view into `original`'s buffers; must never be dropped.
    sliced: ManuallyDrop<GroupsType>,
    // The shared groups that `sliced` points into.
    original: Arc<GroupsType>,
    // Offset the view was requested with, relative to `original` (may be negative).
    offset: i64,
    // Length the view was requested with (the actual view may be shorter after clamping).
    len: usize,
}
609
610impl Clone for GroupPositions {
611 fn clone(&self) -> Self {
612 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
613
614 Self {
615 sliced,
616 original: self.original.clone(),
617 offset: self.offset,
618 len: self.len,
619 }
620 }
621}
622
623impl PartialEq for GroupPositions {
624 fn eq(&self, other: &Self) -> bool {
625 self.offset == other.offset && self.len == other.len && self.sliced == other.sliced
626 }
627}
628
629impl AsRef<GroupsType> for GroupPositions {
630 fn as_ref(&self) -> &GroupsType {
631 self.sliced.deref()
632 }
633}
634
635impl Deref for GroupPositions {
636 type Target = GroupsType;
637
638 fn deref(&self) -> &Self::Target {
639 self.sliced.deref()
640 }
641}
642
643impl Default for GroupPositions {
644 fn default() -> Self {
645 GroupsType::default().into_sliceable()
646 }
647}
648
649impl GroupPositions {
650 pub fn slice(&self, offset: i64, len: usize) -> Self {
651 let offset = self.offset + offset;
652 slice_groups(
653 self.original.clone(),
654 offset,
655 if len > self.len { self.len } else { len },
657 )
658 }
659
660 pub fn sort(&mut self) {
661 if !self.as_ref().is_sorted_flag() {
662 let original = Arc::make_mut(&mut self.original);
663 original.sort();
664
665 self.sliced = slice_groups_inner(original, self.offset, self.len);
666 }
667 }
668
669 pub fn unroll(mut self) -> GroupPositions {
670 match self.sliced.deref_mut() {
671 GroupsType::Idx(_) => self,
672 GroupsType::Slice { rolling: false, .. } => self,
673 GroupsType::Slice { groups, .. } => {
674 let mut cum_offset = 0 as IdxSize;
677 let groups: Vec<_> = groups
678 .iter()
679 .map(|[_, len]| {
680 let new = [cum_offset, *len];
681 cum_offset += *len;
682 new
683 })
684 .collect();
685
686 GroupsType::Slice {
687 groups,
688 rolling: false,
689 }
690 .into_sliceable()
691 },
692 }
693 }
694}
695
/// Builds a view of `len` groups of `g`, starting at `offset`, without copying them.
///
/// The offset/length are resolved by `slice_slice`. NOTE(review): it presumably supports
/// negative offsets and clamps out-of-range requests — confirm.
///
/// The returned vectors are created with `Vec::from_raw_parts` directly over sub-slices of
/// `g`'s buffers, so they alias `g`'s data. The result is therefore wrapped in `ManuallyDrop`:
/// - it must never be dropped or reallocated;
/// - it must not be used after `g`'s buffers are freed or replaced.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    match g {
        GroupsType::Idx(groups) => {
            // SAFETY: non-owning alias of a sub-slice of `groups.first()` with
            // capacity == length; it is never dropped or grown (see function docs).
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            // SAFETY: as above, aliasing a sub-slice of `groups.all()`.
            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            // The view inherits the sorted flag of the groups it was sliced from.
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice { groups, rolling } => {
            // SAFETY: non-owning alias of a sub-slice of `groups`; never dropped or grown.
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            ManuallyDrop::new(GroupsType::Slice {
                groups,
                rolling: *rolling,
            })
        },
    }
}
735
736fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
737 let sliced = slice_groups_inner(g.as_ref(), offset, len);
738
739 GroupPositions {
740 sliced,
741 original: g,
742 offset,
743 len,
744 }
745}