1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(series: &Series) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
13 match series.dtype() {
14 DataType::List(_) => series.list().unwrap().explode_and_offsets(),
15 #[cfg(feature = "dtype-array")]
16 DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(),
17 _ => polars_bail!(opq = explode, series.dtype()),
18 }
19}
20
21#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
23#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
24pub struct UnpivotArgsIR {
25 pub on: Vec<PlSmallStr>,
26 pub index: Vec<PlSmallStr>,
27 pub variable_name: Option<PlSmallStr>,
28 pub value_name: Option<PlSmallStr>,
29}
30
31impl DataFrame {
32 pub fn explode_impl(&self, mut columns: Vec<Column>) -> PolarsResult<DataFrame> {
33 polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
34 let mut df = self.clone();
35 if self.is_empty() {
36 for s in &columns {
37 df.with_column(s.as_materialized_series().explode()?)?;
38 }
39 return Ok(df);
40 }
41 columns.sort_by(|sa, sb| {
42 self.check_name_to_idx(sa.name().as_str())
43 .expect("checked above")
44 .partial_cmp(
45 &self
46 .check_name_to_idx(sb.name().as_str())
47 .expect("checked above"),
48 )
49 .expect("cmp usize -> Ordering")
50 });
51
52 for s in &columns {
54 df = df.drop(s.name().as_str())?;
55 }
56
57 let exploded_columns = POOL.install(|| {
58 columns
59 .par_iter()
60 .map(Column::as_materialized_series)
61 .map(get_exploded)
62 .map(|s| s.map(|(s, o)| (Column::from(s), o)))
63 .collect::<PolarsResult<Vec<_>>>()
64 })?;
65
66 fn process_column(
67 original_df: &DataFrame,
68 df: &mut DataFrame,
69 exploded: Column,
70 ) -> PolarsResult<()> {
71 if exploded.len() == df.height() || df.width() == 0 {
72 let col_idx = original_df.check_name_to_idx(exploded.name().as_str())?;
73 df.columns.insert(col_idx, exploded);
74 } else {
75 polars_bail!(
76 ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
77 as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
78 );
79 }
80 Ok(())
81 }
82
83 let check_offsets = || {
84 let first_offsets = exploded_columns[0].1.as_slice();
85 for (_, offsets) in &exploded_columns[1..] {
86 let offsets = offsets.as_slice();
87
88 let offset_l = first_offsets[0];
89 let offset_r = offsets[0];
90 let all_equal_len = first_offsets.len() != offsets.len() || {
91 first_offsets
92 .iter()
93 .zip(offsets.iter())
94 .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
95 };
96
97 polars_ensure!(all_equal_len,
98 ShapeMismatch: "exploded columns must have matching element counts"
99 )
100 }
101 Ok(())
102 };
103 let process_first = || {
104 let (exploded, offsets) = &exploded_columns[0];
105
106 let row_idx = offsets_to_indexes(offsets.as_slice(), exploded.len());
107 let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
108 row_idx.set_sorted_flag(IsSorted::Ascending);
109
110 let mut df = unsafe { df.take_unchecked(&row_idx) };
113 process_column(self, &mut df, exploded.clone())?;
114 PolarsResult::Ok(df)
115 };
116 let (df, result) = POOL.join(process_first, check_offsets);
117 let mut df = df?;
118 result?;
119
120 for (exploded, _) in exploded_columns.into_iter().skip(1) {
121 process_column(self, &mut df, exploded)?
122 }
123
124 Ok(df)
125 }
126 pub fn explode<I, S>(&self, columns: I) -> PolarsResult<DataFrame>
186 where
187 I: IntoIterator<Item = S>,
188 S: Into<PlSmallStr>,
189 {
190 let columns = self.select_columns(columns)?;
193 self.explode_impl(columns)
194 }
195}
196
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Exploding a list column repeats the other columns once per list element.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        /// Fetches the materialized series backing column `name` of `df`.
        fn series_of<'a>(df: &'a DataFrame, name: &str) -> &'a Series {
            df.column(name).unwrap().as_materialized_series()
        }

        let inner = [
            Series::new(PlSmallStr::from_static("a"), &[1i8, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i8, 1, 1]),
            Series::new(PlSmallStr::from_static("c"), &[2i8, 2, 2]),
        ];
        let list = Column::new(PlSmallStr::from_static("foo"), &inner);

        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"]).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Check the last row of every column.
        assert_eq!(series_of(&exploded, "C").i32().unwrap().get(8), Some(1));
        assert_eq!(series_of(&exploded, "B").i32().unwrap().get(8), Some(3));
        assert_eq!(series_of(&exploded, "foo").i8().unwrap().get(8), Some(2));
    }

    /// An empty list explodes into a single null row, wherever it sits.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let ascending = Series::new(PlSmallStr::from_static("a"), &[1, 2, 3]);
        let ones = Series::new(PlSmallStr::from_static("b"), &[1, 1, 1]);
        let empty = ones.clear();
        let list = Column::new(PlSmallStr::from_static("foo"), &[ascending, ones, empty]);

        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);

        // Empty list in the last row.
        let df = DataFrame::new(vec![list, col_b.clone(), col_c.clone()])?;
        let out = df.explode(["foo"])?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        // Empty list in the middle row.
        let list = Column::new(
            PlSmallStr::from_static("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let df = DataFrame::new(vec![list, col_b, col_c])?;
        let out = df.explode(["foo"])?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        Ok(())
    }

    /// Exploding works when the list column is the only column in the frame.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let lists = [
            Series::new(PlSmallStr::from_static("a"), &[1i32, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i32, 1, 1]),
        ];
        let df = DataFrame::new(vec![Column::new(PlSmallStr::from_static("foo"), &lists)])?;

        let exploded = df.explode(["foo"])?;
        let values: Vec<i32> = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}