polars_core/frame/
explode.rs

1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(
13    series: &Series,
14    options: ExplodeOptions,
15) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16    match series.dtype() {
17        DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18        #[cfg(feature = "dtype-array")]
19        DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20        _ => polars_bail!(opq = explode, series.dtype()),
21    }
22}
23
24/// Arguments for `LazyFrame::unpivot` function
25#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27pub struct UnpivotArgsIR {
28    pub on: Vec<PlSmallStr>,
29    pub index: Vec<PlSmallStr>,
30    pub variable_name: PlSmallStr,
31    pub value_name: PlSmallStr,
32}
33
34impl UnpivotArgsIR {
35    pub fn new(
36        all_column_names: Vec<PlSmallStr>,
37        on: Option<Vec<PlSmallStr>>,
38        index: Vec<PlSmallStr>,
39        value_name: Option<PlSmallStr>,
40        variable_name: Option<PlSmallStr>,
41    ) -> Self {
42        let on = on.unwrap_or_else(|| {
43            // If value vars is empty we take all columns that are not in id_vars.
44            let index_set = PlHashSet::from_iter(index.iter().cloned());
45            all_column_names
46                .into_iter()
47                .filter(|s| !index_set.contains(s))
48                .collect()
49        });
50
51        Self {
52            on,
53            index,
54            variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
55            value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
56        }
57    }
58}
59
60impl DataFrame {
61    pub fn explode_impl(
62        &self,
63        mut columns: Vec<Column>,
64        options: ExplodeOptions,
65    ) -> PolarsResult<DataFrame> {
66        polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
67        let mut df = self.clone();
68        if self.shape_has_zero() {
69            for s in &columns {
70                df.with_column(s.as_materialized_series().explode(options)?.into_column())?;
71            }
72            return Ok(df);
73        }
74
75        columns.sort_by_key(|c| self.try_get_column_index(c.name()).unwrap());
76
77        // first remove all the exploded columns
78        for s in &columns {
79            df = df.drop(s.name().as_str())?;
80        }
81
82        let exploded_columns = POOL.install(|| {
83            columns
84                .par_iter()
85                .map(|c| get_exploded(c.as_materialized_series(), options))
86                .map(|s| s.map(|(s, o)| (Column::from(s), o)))
87                .collect::<PolarsResult<Vec<_>>>()
88        })?;
89
90        fn process_column(
91            original_df: &DataFrame,
92            df: &mut DataFrame,
93            exploded: Column,
94        ) -> PolarsResult<()> {
95            if df.shape() == (0, 0) {
96                unsafe { df.set_height(exploded.len()) };
97            }
98
99            if exploded.len() == df.height() {
100                let col_idx = original_df.try_get_column_index(exploded.name().as_str())?;
101                unsafe { df.columns_mut() }.insert(col_idx, exploded);
102            } else {
103                polars_bail!(
104                    ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
105                    as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
106                );
107            }
108            Ok(())
109        }
110
111        let check_offsets = || {
112            let first_offsets = exploded_columns[0].1.as_slice();
113            for (_, offsets) in &exploded_columns[1..] {
114                let offsets = offsets.as_slice();
115
116                let offset_l = first_offsets[0];
117                let offset_r = offsets[0];
118                let all_equal_len = first_offsets.len() != offsets.len() || {
119                    first_offsets
120                        .iter()
121                        .zip(offsets.iter())
122                        .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
123                };
124
125                polars_ensure!(all_equal_len,
126                    ShapeMismatch: "exploded columns must have matching element counts"
127                )
128            }
129            Ok(())
130        };
131        let process_first = || {
132            let validity = columns[0].rechunk_validity();
133            let (exploded, offsets) = &exploded_columns[0];
134
135            let row_idx = offsets_to_indexes(
136                offsets.as_slice(),
137                exploded.len(),
138                options,
139                validity.as_ref(),
140            );
141            let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
142            row_idx.set_sorted_flag(IsSorted::Ascending);
143
144            // SAFETY:
145            // We just created indices that are in bounds.
146            let mut df = unsafe { df.take_unchecked(&row_idx) };
147            process_column(self, &mut df, exploded.clone())?;
148            PolarsResult::Ok(df)
149        };
150        let (df, result) = POOL.join(process_first, check_offsets);
151        let mut df = df?;
152        result?;
153
154        for (exploded, _) in exploded_columns.into_iter().skip(1) {
155            process_column(self, &mut df, exploded)?
156        }
157
158        Ok(df)
159    }
160    /// Explode `DataFrame` to long format by exploding a column with Lists.
161    ///
162    /// # Example
163    ///
164    /// ```ignore
165    /// # use polars_core::prelude::*;
166    /// let s0 = Series::new("a".into(), &[1i64, 2, 3]);
167    /// let s1 = Series::new("b".into(), &[1i64, 1, 1]);
168    /// let s2 = Series::new("c".into(), &[2i64, 2, 2]);
169    /// let list = Series::new("foo", &[s0, s1, s2]);
170    ///
171    /// let s0 = Series::new("B".into(), [1, 2, 3]);
172    /// let s1 = Series::new("C".into(), [1, 1, 1]);
173    /// let df = DataFrame::new_infer_height(vec![list, s0, s1])?;
174    /// let exploded = df.explode(["foo"])?;
175    ///
176    /// println!("{:?}", df);
177    /// println!("{:?}", exploded);
178    /// # Ok::<(), PolarsError>(())
179    /// ```
180    /// Outputs:
181    ///
182    /// ```text
183    ///  +-------------+-----+-----+
184    ///  | foo         | B   | C   |
185    ///  | ---         | --- | --- |
186    ///  | list [i64]  | i32 | i32 |
187    ///  +=============+=====+=====+
188    ///  | "[1, 2, 3]" | 1   | 1   |
189    ///  +-------------+-----+-----+
190    ///  | "[1, 1, 1]" | 2   | 1   |
191    ///  +-------------+-----+-----+
192    ///  | "[2, 2, 2]" | 3   | 1   |
193    ///  +-------------+-----+-----+
194    ///
195    ///  +-----+-----+-----+
196    ///  | foo | B   | C   |
197    ///  | --- | --- | --- |
198    ///  | i64 | i32 | i32 |
199    ///  +=====+=====+=====+
200    ///  | 1   | 1   | 1   |
201    ///  +-----+-----+-----+
202    ///  | 2   | 1   | 1   |
203    ///  +-----+-----+-----+
204    ///  | 3   | 1   | 1   |
205    ///  +-----+-----+-----+
206    ///  | 1   | 2   | 1   |
207    ///  +-----+-----+-----+
208    ///  | 1   | 2   | 1   |
209    ///  +-----+-----+-----+
210    ///  | 1   | 2   | 1   |
211    ///  +-----+-----+-----+
212    ///  | 2   | 3   | 1   |
213    ///  +-----+-----+-----+
214    ///  | 2   | 3   | 1   |
215    ///  +-----+-----+-----+
216    ///  | 2   | 3   | 1   |
217    ///  +-----+-----+-----+
218    /// ```
219    pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
220    where
221        I: IntoIterator<Item = S>,
222        S: AsRef<str>,
223    {
224        // We need to sort the column by order of original occurrence. Otherwise the insert by index
225        // below will panic
226        let columns = self.select_to_vec(columns)?;
227        self.explode_impl(columns, options)
228    }
229}
230
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Options shared by all tests in this module.
    fn explode_options() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    /// Exploding a list column repeats the other columns once per list element.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let nested = [
            Series::new(PlSmallStr::from_static("a"), &[1i8, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i8, 1, 1]),
            Series::new(PlSmallStr::from_static("c"), &[2i8, 2, 2]),
        ];
        let list = Column::new(PlSmallStr::from_static("foo"), &nested);

        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![list, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"], explode_options()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Value in the last row of an `i32` column.
        let last_i32 = |name: &str| {
            exploded
                .column(name)
                .unwrap()
                .as_materialized_series()
                .i32()
                .unwrap()
                .get(8)
        };
        assert_eq!(last_i32("C"), Some(1));
        assert_eq!(last_i32("B"), Some(3));

        let last_foo = exploded
            .column("foo")
            .unwrap()
            .as_materialized_series()
            .i8()
            .unwrap()
            .get(8);
        assert_eq!(last_foo, Some(2));
    }

    /// Empty lists become a single null row, both at the end and in the middle.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let filled = Series::new(PlSmallStr::from_static("a"), &[1, 2, 3]);
        let ones = Series::new(PlSmallStr::from_static("b"), &[1, 1, 1]);
        let list = Column::new(
            PlSmallStr::from_static("foo"),
            &[filled, ones.clone(), ones.clear()],
        );
        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![list, col_b.clone(), col_c.clone()])?;

        // Empty list in the last row.
        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        // Empty list in the middle row.
        let list = Column::new(
            PlSmallStr::from_static("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let df = DataFrame::new_infer_height(vec![list, col_b, col_c])?;
        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        Ok(())
    }

    /// A frame that only consists of the exploded column works as well.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let nested = [
            Series::new(PlSmallStr::from_static("a"), &[1i32, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i32, 1, 1]),
        ];
        let list = Column::new(PlSmallStr::from_static("foo"), &nested);
        let df = DataFrame::new_infer_height(vec![list])?;

        let exploded = df.explode(["foo"], explode_options())?;
        let values: Vec<i32> = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}
368}