polars_core/chunked_array/ops/explode_and_offsets.rs

1use arrow::offset::OffsetsBuffer;
2use polars_compute::gather::take_unchecked;
3
4use super::*;
5
6impl ListChunked {
7    fn explode_specialized(
8        &self,
9        values: ArrayRef,
10        offsets: &[i64],
11        offsets_buf: OffsetsBuffer<i64>,
12        options: ExplodeOptions,
13    ) -> (Series, OffsetsBuffer<i64>) {
14        // SAFETY: inner_dtype should be correct
15        let values = unsafe {
16            Series::from_chunks_and_dtype_unchecked(
17                self.name().clone(),
18                vec![values],
19                &self.inner_dtype().to_physical(),
20            )
21        };
22
23        use crate::chunked_array::ops::explode::ExplodeByOffsets;
24
25        let mut values = match values.dtype() {
26            DataType::Boolean => {
27                let t = values.bool().unwrap();
28                ExplodeByOffsets::explode_by_offsets(t, offsets, options).into_series()
29            },
30            DataType::Null => {
31                let t = values.null().unwrap();
32                ExplodeByOffsets::explode_by_offsets(t, offsets, options).into_series()
33            },
34            dtype => {
35                with_match_physical_numeric_polars_type!(dtype, |$T| {
36                    let t: &ChunkedArray<$T> = values.as_ref().as_ref();
37                    ExplodeByOffsets::explode_by_offsets(t, offsets, options).into_series()
38                })
39            },
40        };
41
42        // let mut values = values.explode_by_offsets(offsets);
43        // restore logical type
44        values = unsafe { values.from_physical_unchecked(self.inner_dtype()) }.unwrap();
45
46        (values, offsets_buf)
47    }
48}
49
impl ChunkExplode for ListChunked {
    /// Returns the offsets buffer of the underlying list array.
    ///
    /// The array is rechunked first so that a single chunk's offsets describe
    /// the entire ChunkedArray.
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets = listarr.offsets().clone();
        Ok(offsets)
    }

    /// Explode the list into its flat values, together with the offsets needed
    /// to map exploded rows back to the original list rows.
    fn explode_and_offsets(
        &self,
        options: ExplodeOptions,
    ) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        // A list array's memory layout is actually already 'exploded', so we can just take the
        // values array of the list. And we also return a slice of the offsets. This slice can be
        // used to find the old list layout or indexes to expand a DataFrame in the same manner as
        // the `explode` operation.
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets_buf = listarr.offsets().clone();
        let offsets = listarr.offsets().as_slice();
        let mut values = listarr.values().clone();

        // Fast path: the child array can be reused directly. Only valid when
        // the options do not force inserting rows for null or empty lists.
        let (mut s, offsets) = if ca._can_fast_explode()
            && (!options.keep_nulls || !ca.has_nulls())
            && (!options.empty_as_null || !ca.has_empty_lists())
        {
            // ensure that the value array is sliced
            // as a list only slices its offsets on a slice operation

            // we only do this in fast-explode as for the other
            // branch the offsets must coincide with the values.
            if !offsets.is_empty() {
                let start = offsets[0] as usize;
                let len = offsets[offsets.len() - 1] as usize - start;
                // SAFETY:
                // we are in bounds
                values = unsafe { values.sliced_unchecked(start, len) };
            }
            // SAFETY: inner_dtype should be correct
            (
                unsafe {
                    Series::from_chunks_and_dtype_unchecked(
                        self.name().clone(),
                        vec![values],
                        &self.inner_dtype().to_physical(),
                    )
                },
                offsets_buf,
            )
        } else {
            // during tests
            // test that this code branch is not hit with list arrays that could be fast exploded
            #[cfg(test)]
            {
                let mut last = offsets[0];
                let mut has_empty = false;
                for &o in &offsets[1..] {
                    if o == last {
                        has_empty = true;
                    }
                    last = o;
                }
                if !has_empty && offsets[0] == 0 {
                    panic!("could have fast exploded")
                }
            }
            let (indices, new_offsets) = if listarr.null_count() == 0 {
                // SPECIALIZED path: primitive/bool/null inner types use the
                // dtype-specialized explode kernels instead of a gather.
                let inner_phys = self.inner_dtype().to_physical();
                if inner_phys.is_primitive_numeric() || inner_phys.is_null() || inner_phys.is_bool()
                {
                    return Ok(self.explode_specialized(values, offsets, offsets_buf, options));
                }
                // Use gather: build one index per exploded row, with a null
                // index for empty lists when `empty_as_null` is set.
                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.for_each(|&offset| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;

                        if options.empty_as_null && len == 0 {
                            indices.push_null();
                        } else {
                            indices.extend_trusted_len_values(start..end);
                        }
                        current_offset += len;
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            } else {
                // we have already ensured that validity is not None.
                let validity = listarr.validity().unwrap();

                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.enumerate().for_each(|(i, &offset)| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;
                        // SAFETY: we are within bounds
                        if unsafe { validity.get_bit_unchecked(i) } {
                            // explode expects null value if sublist is empty.
                            if options.empty_as_null && len == 0 {
                                indices.push_null();
                            } else {
                                indices.extend_trusted_len_values(start..end);
                            }
                            current_offset += len;
                        } else if options.keep_nulls {
                            // Null lists emit a null row but do not advance
                            // the offset (their child range has length 0).
                            indices.push_null();
                        }
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            };

            // SAFETY: the indices we generate are in bounds
            let chunk = unsafe { take_unchecked(values.as_ref(), &indices.into()) };
            // SAFETY: inner_dtype should be correct
            let s = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    self.name().clone(),
                    vec![chunk],
                    &self.inner_dtype().to_physical(),
                )
            };
            // SAFETY: monotonically increasing
            let new_offsets = unsafe { OffsetsBuffer::new_unchecked(new_offsets.into()) };
            (s, new_offsets)
        };
        debug_assert_eq!(s.name(), self.name());
        // restore logical type
        s = unsafe { s.from_physical_unchecked(self.inner_dtype()) }.unwrap();

        Ok((s, offsets))
    }
}
202
#[cfg(feature = "dtype-array")]
impl ChunkExplode for ArrayChunked {
    /// Returns synthetic offsets for the fixed-size array: every valid row
    /// spans `width` values; null rows do not advance the offset.
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        // fast-path for non-null array.
        if self.null_count() == 0 {
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };

            return Ok(offsets);
        }

        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // we have already ensured that validity is not None.
        let validity = arr.validity().unwrap();
        let width = arr.size();

        let mut current_offset = 0i64;
        let offsets = (0..=arr.len())
            .map(|i| {
                if i == 0 {
                    return current_offset;
                }
                // SAFETY: we are within bounds
                if unsafe { validity.get_bit_unchecked(i - 1) } {
                    current_offset += width as i64
                }
                current_offset
            })
            .collect::<Vec<_>>();
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
        Ok(offsets)
    }

    /// Explode the fixed-size array into its flat values plus offsets.
    fn explode_and_offsets(
        &self,
        options: ExplodeOptions,
    ) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        // Zero-width arrays have no values at all; the result is only null
        // rows, produced for empty rows (empty_as_null) and/or null rows
        // (keep_nulls).
        if self.width() == 0 {
            let mut num_nulls = 0;
            if options.empty_as_null {
                num_nulls += self.len() - self.null_count();
            }
            if options.keep_nulls {
                num_nulls += self.null_count();
            }
            let offsets = (0..num_nulls as i64 + 1).collect::<Vec<i64>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
            let s = Column::new_scalar(
                self.name().clone(),
                Scalar::null(self.inner_dtype().clone()),
                num_nulls,
            )
            .take_materialized_series();

            return Ok((s, offsets));
        }

        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // fast-path for non-null array: the child array *is* the exploded
        // result, so no gather is needed.
        if arr.null_count() == 0 {
            // SAFETY: inner_dtype should be correct
            let s = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    self.name().clone(),
                    vec![arr.values().clone()],
                    ca.inner_dtype(),
                )
            };
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
            return Ok((s, offsets));
        }

        // we have already ensured that validity is not None.
        let validity = arr.validity().unwrap();
        let values = arr.values();
        let width = arr.size();

        // Capacity: each null row contributes at most 1 index instead of
        // `width`, hence values.len() - null_count * (width - 1).
        let mut indices = MutablePrimitiveArray::<IdxSize>::with_capacity(
            values.len() - arr.null_count() * (width - 1),
        );
        let mut offsets = Vec::with_capacity(arr.len() + 1);
        let mut current_offset = 0i64;
        offsets.push(current_offset);
        (0..arr.len()).for_each(|i| {
            // SAFETY: we are within bounds
            if unsafe { validity.get_bit_unchecked(i) } {
                let start = (i * width) as IdxSize;
                let end = start + width as IdxSize;
                indices.extend_trusted_len_values(start..end);
                current_offset += width as i64;
            } else if options.keep_nulls {
                // Null rows emit a single null row without advancing the offset.
                indices.push_null();
            }
            offsets.push(current_offset);
        });

        // SAFETY: the indices we generate are in bounds
        let chunk = unsafe { take_unchecked(&**values, &indices.into()) };
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };

        Ok((
            // SAFETY: inner_dtype should be correct
            unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    ca.name().clone(),
                    vec![chunk],
                    ca.inner_dtype(),
                )
            },
            offsets,
        ))
    }
}
334}