polars_core/frame/group_by/
position.rs

1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::POOL;
10use crate::prelude::*;
11use crate::utils::{NoNull, flatten, slice_slice};
12
13/// Indexes of the groups, the first index is stored separately.
14/// this make sorting fast.
15#[derive(Debug, Clone, PartialEq, Eq, Default)]
16pub struct GroupsIdx {
17    pub(crate) sorted: bool,
18    first: Vec<IdxSize>,
19    all: Vec<IdxVec>,
20}
21
22pub type IdxItem = (IdxSize, IdxVec);
23pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26    fn drop(&mut self) {
27        let v = std::mem::take(&mut self.all);
28        // ~65k took approximately 1ms on local machine, so from that point we drop on other thread
29        // to stop query from being blocked
30        #[cfg(not(target_family = "wasm"))]
31        if v.len() > 1 << 16 {
32            std::thread::spawn(move || drop(v));
33        } else {
34            drop(v);
35        }
36
37        #[cfg(target_family = "wasm")]
38        drop(v);
39    }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43    fn from(v: Vec<IdxItem>) -> Self {
44        v.into_iter().collect()
45    }
46}
47
48impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
49    fn from(v: Vec<Vec<IdxItem>>) -> Self {
50        // single threaded flatten: 10% faster than `iter().flatten().collect()
51        // this is the multi-threaded impl of that
52        let (cap, offsets) = flatten::cap_and_offsets(&v);
53        let mut first = Vec::with_capacity(cap);
54        let first_ptr = first.as_ptr() as usize;
55        let mut all = Vec::with_capacity(cap);
56        let all_ptr = all.as_ptr() as usize;
57
58        POOL.install(|| {
59            v.into_par_iter()
60                .zip(offsets)
61                .for_each(|(mut inner, offset)| {
62                    unsafe {
63                        let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
64                        let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);
65
66                        let inner_ptr = inner.as_mut_ptr();
67                        for i in 0..inner.len() {
68                            let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
69                            std::ptr::write(first.add(i), first_val);
70                            std::ptr::write(all.add(i), vals);
71                        }
72                        // set len to 0 so that the contents will not get dropped
73                        // they are moved to `first` and `all`
74                        inner.set_len(0);
75                    }
76                });
77        });
78        unsafe {
79            all.set_len(cap);
80            first.set_len(cap);
81        }
82        GroupsIdx {
83            sorted: false,
84            first,
85            all,
86        }
87    }
88}
89
90impl GroupsIdx {
91    pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92        Self { sorted, first, all }
93    }
94
95    pub fn sort(&mut self) {
96        if self.sorted {
97            return;
98        }
99        let mut idx = 0;
100        let first = std::mem::take(&mut self.first);
101        // store index and values so that we can sort those
102        let mut idx_vals = first
103            .into_iter()
104            .map(|v| {
105                let out = [idx, v];
106                idx += 1;
107                out
108            })
109            .collect_trusted::<Vec<_>>();
110        idx_vals.sort_unstable_by_key(|v| v[1]);
111
112        let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113        let take_all = || {
114            idx_vals
115                .iter()
116                .map(|v| unsafe {
117                    let idx = v[0] as usize;
118                    std::mem::take(self.all.get_unchecked_mut(idx))
119                })
120                .collect_trusted::<Vec<_>>()
121        };
122        let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123        self.first = first;
124        self.all = all;
125        self.sorted = true
126    }
127    pub fn is_sorted_flag(&self) -> bool {
128        self.sorted
129    }
130
131    pub fn iter(
132        &self,
133    ) -> std::iter::Zip<
134        std::iter::Copied<std::slice::Iter<'_, IdxSize>>,
135        std::slice::Iter<'_, IdxVec>,
136    > {
137        self.into_iter()
138    }
139
140    pub fn all(&self) -> &[IdxVec] {
141        &self.all
142    }
143
144    pub fn first(&self) -> &[IdxSize] {
145        &self.first
146    }
147
148    pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
149        &mut self.first
150    }
151
152    pub(crate) fn len(&self) -> usize {
153        self.first.len()
154    }
155
156    pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem<'_> {
157        let first = *self.first.get_unchecked(index);
158        let all = self.all.get_unchecked(index);
159        (first, all)
160    }
161
162    // Create an 'empty group', containing 1 group of length 0
163    pub fn new_empty() -> Self {
164        Self {
165            sorted: false,
166            first: vec![0],
167            all: vec![vec![].into()],
168        }
169    }
170}
171
172impl FromIterator<IdxItem> for GroupsIdx {
173    fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
174        let (first, all) = iter.into_iter().unzip();
175        GroupsIdx {
176            sorted: false,
177            first,
178            all,
179        }
180    }
181}
182
183impl<'a> IntoIterator for &'a GroupsIdx {
184    type Item = BorrowIdxItem<'a>;
185    type IntoIter = std::iter::Zip<
186        std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
187        std::slice::Iter<'a, IdxVec>,
188    >;
189
190    fn into_iter(self) -> Self::IntoIter {
191        self.first.iter().copied().zip(self.all.iter())
192    }
193}
194
195impl IntoIterator for GroupsIdx {
196    type Item = IdxItem;
197    type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
198
199    fn into_iter(mut self) -> Self::IntoIter {
200        let first = std::mem::take(&mut self.first);
201        let all = std::mem::take(&mut self.all);
202        first.into_iter().zip(all)
203    }
204}
205
206impl FromParallelIterator<IdxItem> for GroupsIdx {
207    fn from_par_iter<I>(par_iter: I) -> Self
208    where
209        I: IntoParallelIterator<Item = IdxItem>,
210    {
211        let (first, all) = par_iter.into_par_iter().unzip();
212        GroupsIdx {
213            sorted: false,
214            first,
215            all,
216        }
217    }
218}
219
220impl<'a> IntoParallelIterator for &'a GroupsIdx {
221    type Iter = rayon::iter::Zip<
222        rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
223        rayon::slice::Iter<'a, IdxVec>,
224    >;
225    type Item = BorrowIdxItem<'a>;
226
227    fn into_par_iter(self) -> Self::Iter {
228        self.first.par_iter().copied().zip(self.all.par_iter())
229    }
230}
231
232impl IntoParallelIterator for GroupsIdx {
233    type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
234    type Item = IdxItem;
235
236    fn into_par_iter(mut self) -> Self::Iter {
237        let first = std::mem::take(&mut self.first);
238        let all = std::mem::take(&mut self.all);
239        first.into_par_iter().zip(all.into_par_iter())
240    }
241}
242
243/// Every group is indicated by an array where the
244///  - first value is an index to the start of the group
245///  - second value is the length of the group
246///
247/// Only used when group values are stored together
248///
249/// This type should have the invariant that it is always sorted in ascending order.
250pub type GroupsSlice = Vec<[IdxSize; 2]>;
251
252#[derive(Debug, Clone, PartialEq, Eq)]
253pub enum GroupsType {
254    Idx(GroupsIdx),
255    /// Slice is always sorted in ascending order.
256    Slice {
257        // the groups slices
258        groups: GroupsSlice,
259        /// Indicates if the groups may overlap, i.e., at least one index MAY be
260        /// included in more than one group slice.
261        overlapping: bool,
262        /// Indicates if the groups are rolling, i.e. for every consecutive group
263        /// slice (offset, len), and given start = offset and end = offset + len,
264        /// then both new_start >= start AND new_end >= end MUST be true.
265        monotonic: bool,
266    },
267}
268
269impl Default for GroupsType {
270    fn default() -> Self {
271        GroupsType::Idx(GroupsIdx::default())
272    }
273}
274
275impl GroupsType {
276    pub fn new_slice(groups: GroupsSlice, overlapping: bool, monotonic: bool) -> Self {
277        #[cfg(debug_assertions)]
278        {
279            fn groups_overlap(groups: &GroupsSlice) -> bool {
280                if groups.len() < 2 {
281                    return false;
282                }
283                let mut groups = groups.clone();
284                groups.sort();
285                let mut prev_end = groups[0][1];
286
287                for g in &groups[1..] {
288                    let start = g[0];
289                    let end = g[1];
290                    if start < prev_end {
291                        return true;
292                    }
293                    if end > prev_end {
294                        prev_end = end;
295                    }
296                }
297                false
298            }
299
300            assert!(overlapping || !groups_overlap(&groups));
301
302            fn groups_are_monotonic(groups: &GroupsSlice) -> bool {
303                if groups.len() < 2 {
304                    return true;
305                }
306
307                let (offset, len) = (groups[0][0], groups[0][1]);
308                let mut prev_start = offset;
309                let mut prev_end = offset + len;
310
311                for g in &groups[1..] {
312                    let start = g[0];
313                    let end = g[0] + g[1];
314
315                    if start < prev_start || end < prev_end {
316                        return false;
317                    }
318
319                    prev_start = start;
320                    prev_end = end;
321                }
322                true
323            }
324
325            assert!(!monotonic || groups_are_monotonic(&groups));
326        }
327
328        Self::Slice {
329            groups,
330            overlapping,
331            monotonic,
332        }
333    }
334
335    pub fn into_idx(self) -> GroupsIdx {
336        match self {
337            GroupsType::Idx(groups) => groups,
338            GroupsType::Slice { groups, .. } => {
339                polars_warn!(
340                    "Had to reallocate groups, missed an optimization opportunity. Please open an issue."
341                );
342                groups
343                    .iter()
344                    .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
345                    .collect()
346            },
347        }
348    }
349
350    pub(crate) fn prepare_list_agg(
351        &self,
352        total_len: usize,
353    ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
354        let mut can_fast_explode = true;
355        match self {
356            GroupsType::Idx(groups) => {
357                let mut list_offset = Vec::with_capacity(self.len() + 1);
358                let mut gather_offsets = Vec::with_capacity(total_len);
359
360                let mut len_so_far = 0i64;
361                list_offset.push(len_so_far);
362
363                for idx in groups {
364                    let idx = idx.1;
365                    gather_offsets.extend_from_slice(idx);
366                    len_so_far += idx.len() as i64;
367                    list_offset.push(len_so_far);
368                    can_fast_explode &= !idx.is_empty();
369                }
370                unsafe {
371                    (
372                        Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
373                        OffsetsBuffer::new_unchecked(list_offset.into()),
374                        can_fast_explode,
375                    )
376                }
377            },
378            GroupsType::Slice { groups, .. } => {
379                let mut list_offset = Vec::with_capacity(self.len() + 1);
380                let mut gather_offsets = Vec::with_capacity(total_len);
381                let mut len_so_far = 0i64;
382                list_offset.push(len_so_far);
383
384                for g in groups {
385                    let len = g[1];
386                    let offset = g[0];
387                    gather_offsets.extend(offset..offset + len);
388
389                    len_so_far += len as i64;
390                    list_offset.push(len_so_far);
391                    can_fast_explode &= len > 0;
392                }
393
394                unsafe {
395                    (
396                        Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
397                        OffsetsBuffer::new_unchecked(list_offset.into()),
398                        can_fast_explode,
399                    )
400                }
401            },
402        }
403    }
404
405    pub fn iter(&self) -> GroupsTypeIter<'_> {
406        GroupsTypeIter::new(self)
407    }
408
409    pub fn sort(&mut self) {
410        match self {
411            GroupsType::Idx(groups) => {
412                if !groups.is_sorted_flag() {
413                    groups.sort()
414                }
415            },
416            GroupsType::Slice { .. } => {
417                // invariant of the type
418            },
419        }
420    }
421
422    pub(crate) fn is_sorted_flag(&self) -> bool {
423        match self {
424            GroupsType::Idx(groups) => groups.is_sorted_flag(),
425            GroupsType::Slice { .. } => true,
426        }
427    }
428
429    pub fn is_overlapping(&self) -> bool {
430        matches!(
431            self,
432            GroupsType::Slice {
433                overlapping: true,
434                ..
435            }
436        )
437    }
438
439    pub fn is_monotonic(&self) -> bool {
440        matches!(
441            self,
442            GroupsType::Slice {
443                monotonic: true,
444                ..
445            }
446        )
447    }
448
449    pub fn take_group_firsts(self) -> Vec<IdxSize> {
450        match self {
451            GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
452            GroupsType::Slice { groups, .. } => {
453                groups.into_iter().map(|[first, _len]| first).collect()
454            },
455        }
456    }
457
458    /// Checks if groups are of equal length. The caller is responsible for
459    /// updating the groups by calling `groups()` prior to calling this method.
460    pub fn check_lengths(self: &GroupsType, other: &GroupsType) -> PolarsResult<()> {
461        if std::ptr::eq(self, other) {
462            return Ok(());
463        }
464        polars_ensure!(self.iter().zip(other.iter()).all(|(a, b)| {
465            a.len() == b.len()
466        }), ShapeMismatch: "expressions must have matching group lengths");
467        Ok(())
468    }
469
470    /// # Safety
471    /// This will not do any bounds checks. The caller must ensure
472    /// all groups have members.
473    pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
474        match self {
475            GroupsType::Idx(groups) => groups
476                .all
477                .iter()
478                .map(|idx| *idx.get_unchecked(idx.len() - 1))
479                .collect(),
480            GroupsType::Slice { groups, .. } => groups
481                .into_iter()
482                .map(|[first, len]| first + len - 1)
483                .collect(),
484        }
485    }
486
487    pub fn par_iter(&self) -> GroupsTypeParIter<'_> {
488        GroupsTypeParIter::new(self)
489    }
490
491    /// Get a reference to the `GroupsIdx`.
492    ///
493    /// # Panic
494    ///
495    /// panics if the groups are a slice.
496    pub fn unwrap_idx(&self) -> &GroupsIdx {
497        match self {
498            GroupsType::Idx(groups) => groups,
499            GroupsType::Slice { .. } => panic!("groups are slices not index"),
500        }
501    }
502
503    /// Get a reference to the `GroupsSlice`.
504    ///
505    /// # Panic
506    ///
507    /// panics if the groups are an idx.
508    pub fn unwrap_slice(&self) -> &GroupsSlice {
509        match self {
510            GroupsType::Slice { groups, .. } => groups,
511            GroupsType::Idx(_) => panic!("groups are index not slices"),
512        }
513    }
514
515    pub fn get(&self, index: usize) -> GroupsIndicator<'_> {
516        match self {
517            GroupsType::Idx(groups) => {
518                let first = groups.first[index];
519                let all = &groups.all[index];
520                GroupsIndicator::Idx((first, all))
521            },
522            GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
523        }
524    }
525
526    /// Get a mutable reference to the `GroupsIdx`.
527    ///
528    /// # Panic
529    ///
530    /// panics if the groups are a slice.
531    pub fn idx_mut(&mut self) -> &mut GroupsIdx {
532        match self {
533            GroupsType::Idx(groups) => groups,
534            GroupsType::Slice { .. } => panic!("groups are slices not index"),
535        }
536    }
537
538    pub fn len(&self) -> usize {
539        match self {
540            GroupsType::Idx(groups) => groups.len(),
541            GroupsType::Slice { groups, .. } => groups.len(),
542        }
543    }
544
545    pub fn is_empty(&self) -> bool {
546        self.len() == 0
547    }
548
549    pub fn group_count(&self) -> IdxCa {
550        match self {
551            GroupsType::Idx(groups) => {
552                let ca: NoNull<IdxCa> = groups
553                    .iter()
554                    .map(|(_first, idx)| idx.len() as IdxSize)
555                    .collect_trusted();
556                ca.into_inner()
557            },
558            GroupsType::Slice { groups, .. } => {
559                let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
560                ca.into_inner()
561            },
562        }
563    }
564    pub fn as_list_chunked(&self) -> ListChunked {
565        match self {
566            GroupsType::Idx(groups) => groups
567                .iter()
568                .map(|(_first, idx)| {
569                    let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
570                    ca.into_inner().into_series()
571                })
572                .collect_trusted(),
573            GroupsType::Slice { groups, .. } => groups
574                .iter()
575                .map(|&[first, len]| {
576                    let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
577                    ca.into_inner().into_series()
578                })
579                .collect_trusted(),
580        }
581    }
582
583    pub fn into_sliceable(self) -> GroupPositions {
584        let len = self.len();
585        slice_groups(Arc::new(self), 0, len)
586    }
587
588    pub fn num_elements(&self) -> usize {
589        match self {
590            GroupsType::Idx(i) => i.all().iter().map(|v| v.len()).sum(),
591            GroupsType::Slice {
592                groups,
593                overlapping: _,
594                monotonic: _,
595            } => groups.iter().map(|[_, l]| *l as usize).sum(),
596        }
597    }
598}
599
600impl From<GroupsIdx> for GroupsType {
601    fn from(groups: GroupsIdx) -> Self {
602        GroupsType::Idx(groups)
603    }
604}
605
606pub enum GroupsIndicator<'a> {
607    Idx(BorrowIdxItem<'a>),
608    Slice([IdxSize; 2]),
609}
610
611impl GroupsIndicator<'_> {
612    pub fn len(&self) -> usize {
613        match self {
614            GroupsIndicator::Idx(g) => g.1.len(),
615            GroupsIndicator::Slice([_, len]) => *len as usize,
616        }
617    }
618    pub fn first(&self) -> IdxSize {
619        match self {
620            GroupsIndicator::Idx(g) => g.0,
621            GroupsIndicator::Slice([first, _]) => *first,
622        }
623    }
624    pub fn is_empty(&self) -> bool {
625        self.len() == 0
626    }
627}
628
629pub struct GroupsTypeIter<'a> {
630    vals: &'a GroupsType,
631    len: usize,
632    idx: usize,
633}
634
635impl<'a> GroupsTypeIter<'a> {
636    fn new(vals: &'a GroupsType) -> Self {
637        let len = vals.len();
638        let idx = 0;
639        GroupsTypeIter { vals, len, idx }
640    }
641}
642
643impl<'a> Iterator for GroupsTypeIter<'a> {
644    type Item = GroupsIndicator<'a>;
645
646    fn nth(&mut self, n: usize) -> Option<Self::Item> {
647        self.idx = self.idx.saturating_add(n);
648        self.next()
649    }
650
651    fn next(&mut self) -> Option<Self::Item> {
652        if self.idx >= self.len {
653            return None;
654        }
655
656        let out = unsafe {
657            match self.vals {
658                GroupsType::Idx(groups) => {
659                    let item = groups.get_unchecked(self.idx);
660                    Some(GroupsIndicator::Idx(item))
661                },
662                GroupsType::Slice { groups, .. } => {
663                    Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
664                },
665            }
666        };
667        self.idx += 1;
668        out
669    }
670}
671
672pub struct GroupsTypeParIter<'a> {
673    vals: &'a GroupsType,
674    len: usize,
675}
676
677impl<'a> GroupsTypeParIter<'a> {
678    fn new(vals: &'a GroupsType) -> Self {
679        let len = vals.len();
680        GroupsTypeParIter { vals, len }
681    }
682}
683
684impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
685    type Item = GroupsIndicator<'a>;
686
687    fn drive_unindexed<C>(self, consumer: C) -> C::Result
688    where
689        C: UnindexedConsumer<Self::Item>,
690    {
691        (0..self.len)
692            .into_par_iter()
693            .map(|i| unsafe {
694                match self.vals {
695                    GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
696                    GroupsType::Slice { groups, .. } => {
697                        GroupsIndicator::Slice(*groups.get_unchecked(i))
698                    },
699                }
700            })
701            .drive_unindexed(consumer)
702    }
703}
704
705#[derive(Debug)]
706pub struct GroupPositions {
707    // SAFETY: sliced is a shallow clone of original
708    // It emulates a shared reference, not an exclusive reference
709    // Its data must not be mutated through direct access
710    sliced: ManuallyDrop<GroupsType>,
711    // Unsliced buffer
712    original: Arc<GroupsType>,
713    offset: i64,
714    len: usize,
715}
716
717impl Clone for GroupPositions {
718    fn clone(&self) -> Self {
719        let sliced = slice_groups_inner(&self.original, self.offset, self.len);
720
721        Self {
722            sliced,
723            original: self.original.clone(),
724            offset: self.offset,
725            len: self.len,
726        }
727    }
728}
729
730impl AsRef<GroupsType> for GroupPositions {
731    fn as_ref(&self) -> &GroupsType {
732        self.sliced.deref()
733    }
734}
735
736impl Deref for GroupPositions {
737    type Target = GroupsType;
738
739    fn deref(&self) -> &Self::Target {
740        self.sliced.deref()
741    }
742}
743
744impl Default for GroupPositions {
745    fn default() -> Self {
746        GroupsType::default().into_sliceable()
747    }
748}
749
750impl GroupPositions {
751    pub fn slice(&self, offset: i64, len: usize) -> Self {
752        let offset = self.offset + offset;
753        slice_groups(
754            self.original.clone(),
755            offset,
756            // invariant that len should be in bounds, so truncate if not
757            if len > self.len { self.len } else { len },
758        )
759    }
760
761    pub fn sort(&mut self) {
762        if !self.as_ref().is_sorted_flag() {
763            let original = Arc::make_mut(&mut self.original);
764            original.sort();
765
766            self.sliced = slice_groups_inner(original, self.offset, self.len);
767        }
768    }
769
770    pub fn unroll(mut self) -> GroupPositions {
771        match self.sliced.deref_mut() {
772            GroupsType::Idx(_) => self,
773            GroupsType::Slice {
774                overlapping: false, ..
775            } => self,
776            GroupsType::Slice { groups, .. } => {
777                // SAFETY: sliced is a shallow partial clone of original.
778                // A new owning Vec is required per GH issue #21859
779                let mut cum_offset = 0 as IdxSize;
780                let groups: Vec<_> = groups
781                    .iter()
782                    .map(|[_, len]| {
783                        let new = [cum_offset, *len];
784                        cum_offset += *len;
785                        new
786                    })
787                    .collect();
788
789                GroupsType::new_slice(groups, false, true).into_sliceable()
790            },
791        }
792    }
793
794    pub fn as_unrolled_slice(&self) -> Option<&GroupsSlice> {
795        match &*self.sliced {
796            GroupsType::Idx(_) => None,
797            GroupsType::Slice {
798                groups: _,
799                overlapping: true,
800                monotonic: _,
801            } => None,
802            GroupsType::Slice {
803                groups,
804                overlapping: false,
805                monotonic: _,
806            } => Some(groups),
807        }
808    }
809}
810
811fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
812    // SAFETY:
813    // we create new `Vec`s from the sliced groups. But we wrap them in ManuallyDrop
814    // so that we never call drop on them.
815    // These groups lifetimes are bounded to the `g`. This must remain valid
816    // for the scope of the aggregation.
817    match g {
818        GroupsType::Idx(groups) => {
819            let first = unsafe {
820                let first = slice_slice(groups.first(), offset, len);
821                let ptr = first.as_ptr() as *mut _;
822                Vec::from_raw_parts(ptr, first.len(), first.len())
823            };
824
825            let all = unsafe {
826                let all = slice_slice(groups.all(), offset, len);
827                let ptr = all.as_ptr() as *mut _;
828                Vec::from_raw_parts(ptr, all.len(), all.len())
829            };
830            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
831                first,
832                all,
833                groups.is_sorted_flag(),
834            )))
835        },
836        GroupsType::Slice {
837            groups,
838            overlapping,
839            monotonic,
840        } => {
841            let groups = unsafe {
842                let groups = slice_slice(groups, offset, len);
843                let ptr = groups.as_ptr() as *mut _;
844                Vec::from_raw_parts(ptr, groups.len(), groups.len())
845            };
846
847            ManuallyDrop::new(GroupsType::new_slice(groups, *overlapping, *monotonic))
848        },
849    }
850}
851
852fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
853    let sliced = slice_groups_inner(g.as_ref(), offset, len);
854
855    GroupPositions {
856        sliced,
857        original: g,
858        offset,
859        len,
860    }
861}