polars_core/frame/
explode.rs

1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(
13    series: &Series,
14    options: ExplodeOptions,
15) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16    match series.dtype() {
17        DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18        #[cfg(feature = "dtype-array")]
19        DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20        _ => polars_bail!(opq = explode, series.dtype()),
21    }
22}
23
24/// Arguments for `LazyFrame::unpivot` function
25#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27pub struct UnpivotArgsIR {
28    pub on: Vec<PlSmallStr>,
29    pub index: Vec<PlSmallStr>,
30    pub variable_name: PlSmallStr,
31    pub value_name: PlSmallStr,
32}
33
34impl UnpivotArgsIR {
35    pub fn new(
36        all_column_names: Vec<PlSmallStr>,
37        on: Option<Vec<PlSmallStr>>,
38        index: Vec<PlSmallStr>,
39        value_name: Option<PlSmallStr>,
40        variable_name: Option<PlSmallStr>,
41    ) -> Self {
42        let on = on.unwrap_or_else(|| {
43            // If value vars is empty we take all columns that are not in id_vars.
44            let index_set = PlHashSet::from_iter(index.iter().cloned());
45            all_column_names
46                .into_iter()
47                .filter(|s| !index_set.contains(s))
48                .collect()
49        });
50
51        Self {
52            on,
53            index,
54            variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
55            value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
56        }
57    }
58}
59
60impl DataFrame {
61    pub fn explode_impl(
62        &self,
63        mut columns: Vec<Column>,
64        options: ExplodeOptions,
65    ) -> PolarsResult<DataFrame> {
66        polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
67        let mut df = self.clone();
68        if self.is_empty() {
69            for s in &columns {
70                df.with_column(s.as_materialized_series().explode(options)?)?;
71            }
72            return Ok(df);
73        }
74        columns.sort_by(|sa, sb| {
75            self.check_name_to_idx(sa.name().as_str())
76                .expect("checked above")
77                .partial_cmp(
78                    &self
79                        .check_name_to_idx(sb.name().as_str())
80                        .expect("checked above"),
81                )
82                .expect("cmp usize -> Ordering")
83        });
84
85        // first remove all the exploded columns
86        for s in &columns {
87            df = df.drop(s.name().as_str())?;
88        }
89
90        let exploded_columns = POOL.install(|| {
91            columns
92                .par_iter()
93                .map(|c| get_exploded(c.as_materialized_series(), options))
94                .map(|s| s.map(|(s, o)| (Column::from(s), o)))
95                .collect::<PolarsResult<Vec<_>>>()
96        })?;
97
98        fn process_column(
99            original_df: &DataFrame,
100            df: &mut DataFrame,
101            exploded: Column,
102        ) -> PolarsResult<()> {
103            if exploded.len() == df.height() || df.width() == 0 {
104                let col_idx = original_df.check_name_to_idx(exploded.name().as_str())?;
105                df.columns.insert(col_idx, exploded);
106            } else {
107                polars_bail!(
108                    ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
109                    as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
110                );
111            }
112            Ok(())
113        }
114
115        let check_offsets = || {
116            let first_offsets = exploded_columns[0].1.as_slice();
117            for (_, offsets) in &exploded_columns[1..] {
118                let offsets = offsets.as_slice();
119
120                let offset_l = first_offsets[0];
121                let offset_r = offsets[0];
122                let all_equal_len = first_offsets.len() != offsets.len() || {
123                    first_offsets
124                        .iter()
125                        .zip(offsets.iter())
126                        .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
127                };
128
129                polars_ensure!(all_equal_len,
130                    ShapeMismatch: "exploded columns must have matching element counts"
131                )
132            }
133            Ok(())
134        };
135        let process_first = || {
136            let validity = columns[0].rechunk_validity();
137            let (exploded, offsets) = &exploded_columns[0];
138
139            let row_idx = offsets_to_indexes(
140                offsets.as_slice(),
141                exploded.len(),
142                options,
143                validity.as_ref(),
144            );
145            let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
146            row_idx.set_sorted_flag(IsSorted::Ascending);
147
148            // SAFETY:
149            // We just created indices that are in bounds.
150            let mut df = unsafe { df.take_unchecked(&row_idx) };
151            process_column(self, &mut df, exploded.clone())?;
152            PolarsResult::Ok(df)
153        };
154        let (df, result) = POOL.join(process_first, check_offsets);
155        let mut df = df?;
156        result?;
157
158        for (exploded, _) in exploded_columns.into_iter().skip(1) {
159            process_column(self, &mut df, exploded)?
160        }
161
162        Ok(df)
163    }
164    /// Explode `DataFrame` to long format by exploding a column with Lists.
165    ///
166    /// # Example
167    ///
168    /// ```ignore
169    /// # use polars_core::prelude::*;
170    /// let s0 = Series::new("a".into(), &[1i64, 2, 3]);
171    /// let s1 = Series::new("b".into(), &[1i64, 1, 1]);
172    /// let s2 = Series::new("c".into(), &[2i64, 2, 2]);
173    /// let list = Series::new("foo", &[s0, s1, s2]);
174    ///
175    /// let s0 = Series::new("B".into(), [1, 2, 3]);
176    /// let s1 = Series::new("C".into(), [1, 1, 1]);
177    /// let df = DataFrame::new(vec![list, s0, s1])?;
178    /// let exploded = df.explode(["foo"])?;
179    ///
180    /// println!("{:?}", df);
181    /// println!("{:?}", exploded);
182    /// # Ok::<(), PolarsError>(())
183    /// ```
184    /// Outputs:
185    ///
186    /// ```text
187    ///  +-------------+-----+-----+
188    ///  | foo         | B   | C   |
189    ///  | ---         | --- | --- |
190    ///  | list [i64]  | i32 | i32 |
191    ///  +=============+=====+=====+
192    ///  | "[1, 2, 3]" | 1   | 1   |
193    ///  +-------------+-----+-----+
194    ///  | "[1, 1, 1]" | 2   | 1   |
195    ///  +-------------+-----+-----+
196    ///  | "[2, 2, 2]" | 3   | 1   |
197    ///  +-------------+-----+-----+
198    ///
199    ///  +-----+-----+-----+
200    ///  | foo | B   | C   |
201    ///  | --- | --- | --- |
202    ///  | i64 | i32 | i32 |
203    ///  +=====+=====+=====+
204    ///  | 1   | 1   | 1   |
205    ///  +-----+-----+-----+
206    ///  | 2   | 1   | 1   |
207    ///  +-----+-----+-----+
208    ///  | 3   | 1   | 1   |
209    ///  +-----+-----+-----+
210    ///  | 1   | 2   | 1   |
211    ///  +-----+-----+-----+
212    ///  | 1   | 2   | 1   |
213    ///  +-----+-----+-----+
214    ///  | 1   | 2   | 1   |
215    ///  +-----+-----+-----+
216    ///  | 2   | 3   | 1   |
217    ///  +-----+-----+-----+
218    ///  | 2   | 3   | 1   |
219    ///  +-----+-----+-----+
220    ///  | 2   | 3   | 1   |
221    ///  +-----+-----+-----+
222    /// ```
223    pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
224    where
225        I: IntoIterator<Item = S>,
226        S: Into<PlSmallStr>,
227    {
228        // We need to sort the column by order of original occurrence. Otherwise the insert by index
229        // below will panic
230        let columns = self.select_columns(columns)?;
231        self.explode_impl(columns, options)
232    }
233}
234
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Shorthand for a static column/series name.
    fn name(s: &'static str) -> PlSmallStr {
        PlSmallStr::from_static(s)
    }

    /// The explode options shared by every test in this module.
    fn explode_options() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    /// Exploding one list column of three 3-element lists yields nine rows and
    /// repeats the other columns accordingly.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let inner = [
            Series::new(name("a"), &[1i8, 2, 3]),
            Series::new(name("b"), &[1i8, 1, 1]),
            Series::new(name("c"), &[2i8, 2, 2]),
        ];
        let list = Column::new(name("foo"), &inner);

        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b, col_c]).unwrap();
        let exploded = df.explode(["foo"], explode_options()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Value of the last row of an `i32` column.
        let last_i32 = |col: &str| {
            exploded
                .column(col)
                .unwrap()
                .as_materialized_series()
                .i32()
                .unwrap()
                .get(8)
        };
        assert_eq!(last_i32("C"), Some(1));
        assert_eq!(last_i32("B"), Some(3));

        let last_foo = exploded
            .column("foo")
            .unwrap()
            .as_materialized_series()
            .i8()
            .unwrap()
            .get(8);
        assert_eq!(last_foo, Some(2));
    }

    /// An empty list explodes to a single null row, both at the end and in the
    /// middle of the column.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let first = Series::new(name("a"), &[1, 2, 3]);
        let second = Series::new(name("b"), &[1, 1, 1]);
        let list = Column::new(name("foo"), &[first, second.clone(), second.clear()]);
        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b.clone(), col_c.clone()])?;

        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;

        assert!(out.equals_missing(&expected));

        // Same check with the empty list in the middle.
        let inner = [
            col_b.as_materialized_series().clone(),
            col_c.as_materialized_series().clear(),
            col_c.as_materialized_series().clone(),
        ];
        let list = Column::new(name("foo"), inner);
        let df = DataFrame::new(vec![list, col_b, col_c])?;
        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;

        assert!(out.equals_missing(&expected));
        Ok(())
    }

    /// Exploding works when the list column is the only column in the frame.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let inner = [
            Series::new(name("a"), &[1i32, 2, 3]),
            Series::new(name("b"), &[1i32, 1, 1]),
        ];
        let list = Column::new(name("foo"), &inner);
        let df = DataFrame::new(vec![list])?;

        let exploded = df.explode(["foo"], explode_options())?;
        let values = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}