use std::mem::ManuallyDrop;
use std::ops::Deref;
use arrow::offset::OffsetsBuffer;
use polars_utils::idx_vec::IdxVec;
use polars_utils::sync::SyncPtr;
use rayon::iter::plumbing::UnindexedConsumer;
use rayon::prelude::*;
use crate::prelude::*;
use crate::utils::{flatten, slice_slice, NoNull};
use crate::POOL;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
pub(crate) sorted: bool,
first: Vec<IdxSize>,
all: Vec<IdxVec>,
}
pub type IdxItem = (IdxSize, IdxVec);
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
impl Drop for GroupsIdx {
fn drop(&mut self) {
let v = std::mem::take(&mut self.all);
#[cfg(not(target_family = "wasm"))]
if v.len() > 1 << 16 {
std::thread::spawn(move || drop(v));
} else {
drop(v);
}
#[cfg(target_family = "wasm")]
drop(v);
}
}
impl From<Vec<IdxItem>> for GroupsIdx {
fn from(v: Vec<IdxItem>) -> Self {
v.into_iter().collect()
}
}
impl From<Vec<(Vec<IdxSize>, Vec<IdxVec>)>> for GroupsIdx {
fn from(v: Vec<(Vec<IdxSize>, Vec<IdxVec>)>) -> Self {
let cap = v.iter().map(|v| v.0.len()).sum::<usize>();
let offsets = v
.iter()
.scan(0_usize, |acc, v| {
let out = *acc;
*acc += v.0.len();
Some(out)
})
.collect::<Vec<_>>();
let mut global_first = Vec::with_capacity(cap);
let global_first_ptr = unsafe { SyncPtr::new(global_first.as_mut_ptr()) };
let mut global_all = Vec::with_capacity(cap);
let global_all_ptr = unsafe { SyncPtr::new(global_all.as_mut_ptr()) };
POOL.install(|| {
v.into_par_iter().zip(offsets).for_each(
|((local_first_vals, mut local_all_vals), offset)| unsafe {
let global_first: *mut IdxSize = global_first_ptr.get();
let global_all: *mut IdxVec = global_all_ptr.get();
let global_first = global_first.add(offset);
let global_all = global_all.add(offset);
std::ptr::copy_nonoverlapping(
local_first_vals.as_ptr(),
global_first,
local_first_vals.len(),
);
std::ptr::copy_nonoverlapping(
local_all_vals.as_ptr(),
global_all,
local_all_vals.len(),
);
local_all_vals.set_len(0);
},
);
});
unsafe {
global_all.set_len(cap);
global_first.set_len(cap);
}
GroupsIdx {
sorted: false,
first: global_first,
all: global_all,
}
}
}
impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
fn from(v: Vec<Vec<IdxItem>>) -> Self {
let (cap, offsets) = flatten::cap_and_offsets(&v);
let mut first = Vec::with_capacity(cap);
let first_ptr = first.as_ptr() as usize;
let mut all = Vec::with_capacity(cap);
let all_ptr = all.as_ptr() as usize;
POOL.install(|| {
v.into_par_iter()
.zip(offsets)
.for_each(|(mut inner, offset)| {
unsafe {
let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);
let inner_ptr = inner.as_mut_ptr();
for i in 0..inner.len() {
let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
std::ptr::write(first.add(i), first_val);
std::ptr::write(all.add(i), vals);
}
inner.set_len(0);
}
});
});
unsafe {
all.set_len(cap);
first.set_len(cap);
}
GroupsIdx {
sorted: false,
first,
all,
}
}
}
impl GroupsIdx {
pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
Self { sorted, first, all }
}
pub fn sort(&mut self) {
let mut idx = 0;
let first = std::mem::take(&mut self.first);
let mut idx_vals = first
.into_iter()
.map(|v| {
let out = [idx, v];
idx += 1;
out
})
.collect_trusted::<Vec<_>>();
idx_vals.sort_unstable_by_key(|v| v[1]);
let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
let take_all = || {
idx_vals
.iter()
.map(|v| unsafe {
let idx = v[0] as usize;
std::mem::take(self.all.get_unchecked_mut(idx))
})
.collect_trusted::<Vec<_>>()
};
let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
self.first = first;
self.all = all;
self.sorted = true
}
pub fn is_sorted_flag(&self) -> bool {
self.sorted
}
pub fn iter(
&self,
) -> std::iter::Zip<std::iter::Copied<std::slice::Iter<IdxSize>>, std::slice::Iter<IdxVec>>
{
self.into_iter()
}
pub fn all(&self) -> &[IdxVec] {
&self.all
}
pub fn first(&self) -> &[IdxSize] {
&self.first
}
pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
&mut self.first
}
pub(crate) fn len(&self) -> usize {
self.first.len()
}
pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem {
let first = *self.first.get_unchecked(index);
let all = self.all.get_unchecked(index);
(first, all)
}
}
impl FromIterator<IdxItem> for GroupsIdx {
fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
let (first, all) = iter.into_iter().unzip();
GroupsIdx {
sorted: false,
first,
all,
}
}
}
impl<'a> IntoIterator for &'a GroupsIdx {
type Item = BorrowIdxItem<'a>;
type IntoIter = std::iter::Zip<
std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
std::slice::Iter<'a, IdxVec>,
>;
fn into_iter(self) -> Self::IntoIter {
self.first.iter().copied().zip(self.all.iter())
}
}
impl IntoIterator for GroupsIdx {
type Item = IdxItem;
type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
fn into_iter(mut self) -> Self::IntoIter {
let first = std::mem::take(&mut self.first);
let all = std::mem::take(&mut self.all);
first.into_iter().zip(all)
}
}
impl FromParallelIterator<IdxItem> for GroupsIdx {
fn from_par_iter<I>(par_iter: I) -> Self
where
I: IntoParallelIterator<Item = IdxItem>,
{
let (first, all) = par_iter.into_par_iter().unzip();
GroupsIdx {
sorted: false,
first,
all,
}
}
}
impl<'a> IntoParallelIterator for &'a GroupsIdx {
type Iter = rayon::iter::Zip<
rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
rayon::slice::Iter<'a, IdxVec>,
>;
type Item = BorrowIdxItem<'a>;
fn into_par_iter(self) -> Self::Iter {
self.first.par_iter().copied().zip(self.all.par_iter())
}
}
impl IntoParallelIterator for GroupsIdx {
type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
type Item = IdxItem;
fn into_par_iter(mut self) -> Self::Iter {
let first = std::mem::take(&mut self.first);
let all = std::mem::take(&mut self.all);
first.into_par_iter().zip(all.into_par_iter())
}
}
pub type GroupsSlice = Vec<[IdxSize; 2]>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsProxy {
Idx(GroupsIdx),
Slice {
groups: GroupsSlice,
rolling: bool,
},
}
impl Default for GroupsProxy {
fn default() -> Self {
GroupsProxy::Idx(GroupsIdx::default())
}
}
impl GroupsProxy {
pub fn into_idx(self) -> GroupsIdx {
match self {
GroupsProxy::Idx(groups) => groups,
GroupsProxy::Slice { groups, .. } => {
polars_warn!("Had to reallocate groups, missed an optimization opportunity. Please open an issue.");
groups
.iter()
.map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
.collect()
},
}
}
pub(crate) fn prepare_list_agg(
&self,
total_len: usize,
) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
let mut can_fast_explode = true;
match self {
GroupsProxy::Idx(groups) => {
let mut list_offset = Vec::with_capacity(self.len() + 1);
let mut gather_offsets = Vec::with_capacity(total_len);
let mut len_so_far = 0i64;
list_offset.push(len_so_far);
for idx in groups {
let idx = idx.1;
gather_offsets.extend_from_slice(idx);
len_so_far += idx.len() as i64;
list_offset.push(len_so_far);
can_fast_explode &= !idx.is_empty();
}
unsafe {
(
Some(IdxCa::from_vec("", gather_offsets)),
OffsetsBuffer::new_unchecked(list_offset.into()),
can_fast_explode,
)
}
},
GroupsProxy::Slice { groups, .. } => {
let mut list_offset = Vec::with_capacity(self.len() + 1);
let mut gather_offsets = Vec::with_capacity(total_len);
let mut len_so_far = 0i64;
list_offset.push(len_so_far);
for g in groups {
let len = g[1];
let offset = g[0];
gather_offsets.extend(offset..offset + len);
len_so_far += len as i64;
list_offset.push(len_so_far);
can_fast_explode &= len > 0;
}
unsafe {
(
Some(IdxCa::from_vec("", gather_offsets)),
OffsetsBuffer::new_unchecked(list_offset.into()),
can_fast_explode,
)
}
},
}
}
pub fn iter(&self) -> GroupsProxyIter {
GroupsProxyIter::new(self)
}
pub fn sort(&mut self) {
match self {
GroupsProxy::Idx(groups) => {
if !groups.is_sorted_flag() {
groups.sort()
}
},
GroupsProxy::Slice { .. } => {
},
}
}
pub(crate) fn is_sorted_flag(&self) -> bool {
match self {
GroupsProxy::Idx(groups) => groups.is_sorted_flag(),
GroupsProxy::Slice { .. } => true,
}
}
pub fn take_group_firsts(self) -> Vec<IdxSize> {
match self {
GroupsProxy::Idx(mut groups) => std::mem::take(&mut groups.first),
GroupsProxy::Slice { groups, .. } => {
groups.into_iter().map(|[first, _len]| first).collect()
},
}
}
pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
match self {
GroupsProxy::Idx(groups) => groups
.all
.iter()
.map(|idx| *idx.get_unchecked(idx.len() - 1))
.collect(),
GroupsProxy::Slice { groups, .. } => groups
.into_iter()
.map(|[first, len]| first + len - 1)
.collect(),
}
}
pub fn par_iter(&self) -> GroupsProxyParIter {
GroupsProxyParIter::new(self)
}
pub fn unwrap_idx(&self) -> &GroupsIdx {
match self {
GroupsProxy::Idx(groups) => groups,
GroupsProxy::Slice { .. } => panic!("groups are slices not index"),
}
}
pub fn unwrap_slice(&self) -> &GroupsSlice {
match self {
GroupsProxy::Slice { groups, .. } => groups,
GroupsProxy::Idx(_) => panic!("groups are index not slices"),
}
}
pub fn get(&self, index: usize) -> GroupsIndicator {
match self {
GroupsProxy::Idx(groups) => {
let first = groups.first[index];
let all = &groups.all[index];
GroupsIndicator::Idx((first, all))
},
GroupsProxy::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
}
}
pub fn idx_mut(&mut self) -> &mut GroupsIdx {
match self {
GroupsProxy::Idx(groups) => groups,
GroupsProxy::Slice { .. } => panic!("groups are slices not index"),
}
}
pub fn len(&self) -> usize {
match self {
GroupsProxy::Idx(groups) => groups.len(),
GroupsProxy::Slice { groups, .. } => groups.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn group_count(&self) -> IdxCa {
match self {
GroupsProxy::Idx(groups) => {
let ca: NoNull<IdxCa> = groups
.iter()
.map(|(_first, idx)| idx.len() as IdxSize)
.collect_trusted();
ca.into_inner()
},
GroupsProxy::Slice { groups, .. } => {
let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
ca.into_inner()
},
}
}
pub fn as_list_chunked(&self) -> ListChunked {
match self {
GroupsProxy::Idx(groups) => groups
.iter()
.map(|(_first, idx)| {
let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
ca.into_inner().into_series()
})
.collect_trusted(),
GroupsProxy::Slice { groups, .. } => groups
.iter()
.map(|&[first, len]| {
let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
ca.into_inner().into_series()
})
.collect_trusted(),
}
}
pub fn unroll(self) -> GroupsProxy {
match self {
GroupsProxy::Idx(_) => self,
GroupsProxy::Slice { rolling: false, .. } => self,
GroupsProxy::Slice { mut groups, .. } => {
let mut offset = 0 as IdxSize;
for g in groups.iter_mut() {
g[0] = offset;
offset += g[1];
}
GroupsProxy::Slice {
groups,
rolling: false,
}
},
}
}
pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups {
let sliced = match self {
GroupsProxy::Idx(groups) => {
let first = unsafe {
let first = slice_slice(groups.first(), offset, len);
let ptr = first.as_ptr() as *mut _;
Vec::from_raw_parts(ptr, first.len(), first.len())
};
let all = unsafe {
let all = slice_slice(groups.all(), offset, len);
let ptr = all.as_ptr() as *mut _;
Vec::from_raw_parts(ptr, all.len(), all.len())
};
ManuallyDrop::new(GroupsProxy::Idx(GroupsIdx::new(
first,
all,
groups.is_sorted_flag(),
)))
},
GroupsProxy::Slice { groups, rolling } => {
let groups = unsafe {
let groups = slice_slice(groups, offset, len);
let ptr = groups.as_ptr() as *mut _;
Vec::from_raw_parts(ptr, groups.len(), groups.len())
};
ManuallyDrop::new(GroupsProxy::Slice {
groups,
rolling: *rolling,
})
},
};
SlicedGroups {
sliced,
borrowed: self,
}
}
}
impl From<GroupsIdx> for GroupsProxy {
fn from(groups: GroupsIdx) -> Self {
GroupsProxy::Idx(groups)
}
}
pub enum GroupsIndicator<'a> {
Idx(BorrowIdxItem<'a>),
Slice([IdxSize; 2]),
}
impl<'a> GroupsIndicator<'a> {
pub fn len(&self) -> usize {
match self {
GroupsIndicator::Idx(g) => g.1.len(),
GroupsIndicator::Slice([_, len]) => *len as usize,
}
}
pub fn first(&self) -> IdxSize {
match self {
GroupsIndicator::Idx(g) => g.0,
GroupsIndicator::Slice([first, _]) => *first,
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub struct GroupsProxyIter<'a> {
vals: &'a GroupsProxy,
len: usize,
idx: usize,
}
impl<'a> GroupsProxyIter<'a> {
fn new(vals: &'a GroupsProxy) -> Self {
let len = vals.len();
let idx = 0;
GroupsProxyIter { vals, len, idx }
}
}
impl<'a> Iterator for GroupsProxyIter<'a> {
type Item = GroupsIndicator<'a>;
fn nth(&mut self, n: usize) -> Option<Self::Item> {
self.idx = self.idx.saturating_add(n);
self.next()
}
fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.len {
return None;
}
let out = unsafe {
match self.vals {
GroupsProxy::Idx(groups) => {
let item = groups.get_unchecked(self.idx);
Some(GroupsIndicator::Idx(item))
},
GroupsProxy::Slice { groups, .. } => {
Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
},
}
};
self.idx += 1;
out
}
}
pub struct GroupsProxyParIter<'a> {
vals: &'a GroupsProxy,
len: usize,
}
impl<'a> GroupsProxyParIter<'a> {
fn new(vals: &'a GroupsProxy) -> Self {
let len = vals.len();
GroupsProxyParIter { vals, len }
}
}
impl<'a> ParallelIterator for GroupsProxyParIter<'a> {
type Item = GroupsIndicator<'a>;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
(0..self.len)
.into_par_iter()
.map(|i| unsafe {
match self.vals {
GroupsProxy::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
GroupsProxy::Slice { groups, .. } => {
GroupsIndicator::Slice(*groups.get_unchecked(i))
},
}
})
.drive_unindexed(consumer)
}
}
pub struct SlicedGroups<'a> {
sliced: ManuallyDrop<GroupsProxy>,
#[allow(dead_code)]
borrowed: &'a GroupsProxy,
}
impl Deref for SlicedGroups<'_> {
type Target = GroupsProxy;
fn deref(&self) -> &Self::Target {
self.sliced.deref()
}
}