polars_core/chunked_array/ops/explode_and_offsets.rs
use arrow::offset::OffsetsBuffer;
use polars_compute::gather::take_unchecked;

use super::*;

impl ListChunked {
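    /// Fast path for primitive-numeric, boolean and null inner dtypes: explode the
    /// physical values directly via `ExplodeByOffsets` instead of building gather
    /// indices.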
    fn specialized(
        &self,
        values: ArrayRef,
        offsets: &[i64],
        offsets_buf: OffsetsBuffer<i64>,
    ) -> (Series, OffsetsBuffer<i64>) {
        // SAFETY: inner_dtype should be correct
        let values = unsafe {
            Series::from_chunks_and_dtype_unchecked(
                self.name().clone(),
                vec![values],
                &self.inner_dtype().to_physical(),
            )
        };

        use crate::chunked_array::ops::explode::ExplodeByOffsets;
        let mut values = match values.dtype() {
            DataType::Boolean => {
                let t = values.bool().unwrap();
                ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
            },
            DataType::Null => {
                let t = values.null().unwrap();
                ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
            },
            dtype => {
                with_match_physical_numeric_polars_type!(dtype, |$T| {
                    let t: &ChunkedArray<$T> = values.as_ref().as_ref();
                    ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
                })
            },
        };
        // Restore the logical type.
        values = unsafe { values.from_physical_unchecked(self.inner_dtype()) }.unwrap();
        (values, offsets_buf)
    }
}

impl ChunkExplode for ListChunked {
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets = listarr.offsets().clone();
        Ok(offsets)
    }

    fn explode_and_offsets(&self) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        // A list array's memory layout is effectively already 'exploded', so we can just
        // take the values array of the list. We also return a slice of the offsets, which
        // can be used to recover the old list layout, or as indices to expand the other
        // columns of a DataFrame in the same manner as the `explode` operation.
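        // For example (illustrative data, not from the original source): the list column
        // [[1, 2], [3], [4, 5]] stores values [1, 2, 3, 4, 5] with offsets [0, 2, 3, 5],
        // so taking the values array already yields the exploded column.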
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets_buf = listarr.offsets().clone();
        let offsets = listarr.offsets().as_slice();
        let mut values = listarr.values().clone();

        let (mut s, offsets) = if ca._can_fast_explode() {
            // Ensure that the values array is sliced, as a list only slices its offsets on
            // a slice operation. We only do this in the fast-explode branch, as in the
            // other branch the offsets must coincide with the values.
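            // E.g. (illustrative) slicing [[1, 2], [3]] to its last row keeps the full
            // values buffer [1, 2, 3] but offsets [2, 3]; slicing the values realigns them.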
            if !offsets.is_empty() {
                let start = offsets[0] as usize;
                let len = offsets[offsets.len() - 1] as usize - start;
                // SAFETY: we are in bounds
                values = unsafe { values.sliced_unchecked(start, len) };
            }
            // SAFETY: inner_dtype should be correct
            (
                unsafe {
                    Series::from_chunks_and_dtype_unchecked(
                        self.name().clone(),
                        vec![values],
                        &self.inner_dtype().to_physical(),
                    )
                },
                offsets_buf,
            )
        } else {
            // During tests, assert that this branch is not hit with list arrays that
            // could have been fast-exploded.
            #[cfg(test)]
            {
                let mut last = offsets[0];
                let mut has_empty = false;
                for &o in &offsets[1..] {
                    if o == last {
                        has_empty = true;
                    }
                    last = o;
                }
                if !has_empty && offsets[0] == 0 {
                    panic!("could have fast exploded")
                }
            }

            let (indices, new_offsets) = if listarr.null_count() == 0 {
                // SPECIALIZED path.
                let inner_phys = self.inner_dtype().to_physical();
                if inner_phys.is_primitive_numeric() || inner_phys.is_null() || inner_phys.is_bool()
                {
                    return Ok(self.specialized(values, offsets, offsets_buf));
                }

                // Use gather.
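                // E.g. (illustrative) offsets [1, 3, 6] from a sliced array yield gather
                // indices [1, 2, 3, 4, 5] and new_offsets [0, 2, 5]; an empty sublist
                // contributes a single null index and no offset length.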
                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.for_each(|&offset| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;
                        if len == 0 {
                            indices.push_null();
                        } else {
                            indices.extend_trusted_len_values(start..end);
                        }
                        current_offset += len;
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            } else {
                // We have already ensured that the validity is not `None`.
                let validity = listarr.validity().unwrap();
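                // E.g. (illustrative) offsets [0, 2, 4] with validity [true, false] yield
                // indices [0, 1, null] and new_offsets [0, 2, 2]: a null row adds one null
                // index but no offset length.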
                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.enumerate().for_each(|(i, &offset)| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;
                        // SAFETY: we are within bounds
                        if unsafe { validity.get_bit_unchecked(i) } {
                            // Explode expects a null value if the sublist is empty.
                            if len == 0 {
                                indices.push_null();
                            } else {
                                indices.extend_trusted_len_values(start..end);
                            }
                            current_offset += len;
                        } else {
                            indices.push_null();
                        }
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            };

            // SAFETY: the indices we generate are in bounds
            let chunk = unsafe { take_unchecked(values.as_ref(), &indices.into()) };
            // SAFETY: inner_dtype should be correct
            let s = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    self.name().clone(),
                    vec![chunk],
                    &self.inner_dtype().to_physical(),
                )
            };
            // SAFETY: monotonically increasing
            let new_offsets = unsafe { OffsetsBuffer::new_unchecked(new_offsets.into()) };
            (s, new_offsets)
        };

        debug_assert_eq!(s.name(), self.name());
        // Restore the logical type.
        s = unsafe { s.from_physical_unchecked(self.inner_dtype()) }.unwrap();
        Ok((s, offsets))
    }
}

#[cfg(feature = "dtype-array")]
impl ChunkExplode for ArrayChunked {
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        // Fast path for a non-null array: offsets are just multiples of the width.
        if self.null_count() == 0 {
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
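            // E.g. (illustrative) width 3 and len 2 give offsets [0, 3, 6].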
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
            return Ok(offsets);
        }

        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // We have already ensured that the validity is not `None`.
        let validity = arr.validity().unwrap();
        let width = arr.size();
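        // E.g. (illustrative) width 2 with validity [true, false, true] gives offsets
        // [0, 2, 2, 4]: null rows contribute zero width.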
        let mut current_offset = 0i64;
        let offsets = (0..=arr.len())
            .map(|i| {
                if i == 0 {
                    return current_offset;
                }
                // SAFETY: we are within bounds
                if unsafe { validity.get_bit_unchecked(i - 1) } {
                    current_offset += width as i64
                }
                current_offset
            })
            .collect::<Vec<_>>();
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
        Ok(offsets)
    }

    fn explode_and_offsets(&self) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // Fast path for a non-null array: the values buffer is already the exploded column.
        if arr.null_count() == 0 {
            let s = Series::try_from((self.name().clone(), arr.values().clone()))
                .unwrap()
                .cast(ca.inner_dtype())?;
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
            return Ok((s, offsets));
        }

        // We have already ensured that the validity is not `None`.
        let validity = arr.validity().unwrap();
        let values = arr.values();
        let width = arr.size();
        let mut indices = MutablePrimitiveArray::<IdxSize>::with_capacity(
            values.len() - arr.null_count() * (width - 1),
        );
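        // Capacity note (illustrative): each null row yields one null index instead of
        // `width` indices, hence the `width - 1` saving per null above; e.g. width 4
        // with 2 nulls over 12 values gives 12 - 2 * 3 = 6 indices.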
        let mut offsets = Vec::with_capacity(arr.len() + 1);
        let mut current_offset = 0i64;
        offsets.push(current_offset);
        (0..arr.len()).for_each(|i| {
            // SAFETY: we are within bounds
            if unsafe { validity.get_bit_unchecked(i) } {
                let start = (i * width) as IdxSize;
                let end = start + width as IdxSize;
                indices.extend_trusted_len_values(start..end);
                current_offset += width as i64;
            } else {
                indices.push_null();
            }
            offsets.push(current_offset);
        });

        // SAFETY: the indices we generate are in bounds
        let chunk = unsafe { take_unchecked(&**values, &indices.into()) };
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
        Ok((
            // SAFETY: inner_dtype should be correct
            unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    ca.name().clone(),
                    vec![chunk],
                    ca.inner_dtype(),
                )
            },
            offsets,
        ))
    }
}
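
// A minimal, std-only sketch (hypothetical test module, not part of the original
// file) of the offsets -> gather-indices mapping used by `explode_and_offsets`:
// non-empty sublists expand to their element indices, empty sublists to a single
// null row.
#[cfg(test)]
mod explode_offsets_sketch {
    // Plain-`Vec` mirror of the null-free index construction above, with
    // `Vec<Option<u32>>` standing in for `MutablePrimitiveArray<IdxSize>`.
    fn offsets_to_gather_indices(offsets: &[i64]) -> (Vec<Option<u32>>, Vec<i64>) {
        let mut indices = Vec::new();
        let mut new_offsets = Vec::with_capacity(offsets.len());
        if let Some((&first, rest)) = offsets.split_first() {
            let mut current = 0i64;
            new_offsets.push(current);
            let mut previous = first;
            for &offset in rest {
                let len = offset - previous;
                if len == 0 {
                    // An empty sublist becomes one null row.
                    indices.push(None);
                } else {
                    indices.extend((previous as u32..offset as u32).map(Some));
                }
                current += len;
                previous = offset;
                new_offsets.push(current);
            }
        }
        (indices, new_offsets)
    }

    #[test]
    fn maps_offsets_to_indices() {
        // A sliced array: offsets need not start at 0; the empty sublist [3, 3)
        // produces a single null index and no offset length.
        let (indices, new_offsets) = offsets_to_gather_indices(&[1, 3, 3, 6]);
        assert_eq!(
            indices,
            vec![Some(1), Some(2), None, Some(3), Some(4), Some(5)]
        );
        assert_eq!(new_offsets, vec![0, 2, 2, 5]);
    }
}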