1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(
13 series: &Series,
14 options: ExplodeOptions,
15) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16 match series.dtype() {
17 DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18 #[cfg(feature = "dtype-array")]
19 DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20 _ => polars_bail!(opq = explode, series.dtype()),
21 }
22}
23
/// Fully resolved arguments of an unpivot operation.
///
/// Unlike the inputs of [`UnpivotArgsIR::new`], nothing here is optional: `new`
/// substitutes a default for every argument the caller left out.
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnpivotArgsIR {
    /// Column names to unpivot. When not given explicitly, `new` uses every
    /// column name that is not listed in `index`.
    pub on: Vec<PlSmallStr>,
    /// Column names excluded from the default `on` selection.
    /// NOTE(review): presumably the identifier columns kept as-is in the output — confirm.
    pub index: Vec<PlSmallStr>,
    /// Defaults to `"variable"` in `new`.
    /// NOTE(review): presumably the name of the output column holding the
    /// original column names — confirm against the unpivot implementation.
    pub variable_name: PlSmallStr,
    /// Defaults to `"value"` in `new`.
    /// NOTE(review): presumably the name of the output column holding the
    /// unpivoted values — confirm against the unpivot implementation.
    pub value_name: PlSmallStr,
}
33
34impl UnpivotArgsIR {
35 pub fn new(
36 all_column_names: Vec<PlSmallStr>,
37 on: Option<Vec<PlSmallStr>>,
38 index: Vec<PlSmallStr>,
39 value_name: Option<PlSmallStr>,
40 variable_name: Option<PlSmallStr>,
41 ) -> Self {
42 let on = on.unwrap_or_else(|| {
43 let index_set = PlHashSet::from_iter(index.iter().cloned());
45 all_column_names
46 .into_iter()
47 .filter(|s| !index_set.contains(s))
48 .collect()
49 });
50
51 Self {
52 on,
53 index,
54 variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
55 value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
56 }
57 }
58}
59
60impl DataFrame {
61 pub fn explode_impl(
62 &self,
63 mut columns: Vec<Column>,
64 options: ExplodeOptions,
65 ) -> PolarsResult<DataFrame> {
66 polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
67 let mut df = self.clone();
68 if self.shape_has_zero() {
69 for s in &columns {
70 df.with_column(s.as_materialized_series().explode(options)?.into_column())?;
71 }
72 return Ok(df);
73 }
74
75 columns.sort_by_key(|c| self.try_get_column_index(c.name()).unwrap());
76
77 for s in &columns {
79 df = df.drop(s.name().as_str())?;
80 }
81
82 let exploded_columns = POOL.install(|| {
83 columns
84 .par_iter()
85 .map(|c| get_exploded(c.as_materialized_series(), options))
86 .map(|s| s.map(|(s, o)| (Column::from(s), o)))
87 .collect::<PolarsResult<Vec<_>>>()
88 })?;
89
90 fn process_column(
91 original_df: &DataFrame,
92 df: &mut DataFrame,
93 exploded: Column,
94 ) -> PolarsResult<()> {
95 if df.shape() == (0, 0) {
96 unsafe { df.set_height(exploded.len()) };
97 }
98
99 if exploded.len() == df.height() {
100 let col_idx = original_df.try_get_column_index(exploded.name().as_str())?;
101 unsafe { df.columns_mut() }.insert(col_idx, exploded);
102 } else {
103 polars_bail!(
104 ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
105 as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
106 );
107 }
108 Ok(())
109 }
110
111 let check_offsets = || {
112 let first_offsets = exploded_columns[0].1.as_slice();
113 for (_, offsets) in &exploded_columns[1..] {
114 let offsets = offsets.as_slice();
115
116 let offset_l = first_offsets[0];
117 let offset_r = offsets[0];
118 let all_equal_len = first_offsets.len() != offsets.len() || {
119 first_offsets
120 .iter()
121 .zip(offsets.iter())
122 .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
123 };
124
125 polars_ensure!(all_equal_len,
126 ShapeMismatch: "exploded columns must have matching element counts"
127 )
128 }
129 Ok(())
130 };
131 let process_first = || {
132 let validity = columns[0].rechunk_validity();
133 let (exploded, offsets) = &exploded_columns[0];
134
135 let row_idx = offsets_to_indexes(
136 offsets.as_slice(),
137 exploded.len(),
138 options,
139 validity.as_ref(),
140 );
141 let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
142 row_idx.set_sorted_flag(IsSorted::Ascending);
143
144 let mut df = unsafe { df.take_unchecked(&row_idx) };
147 process_column(self, &mut df, exploded.clone())?;
148 PolarsResult::Ok(df)
149 };
150 let (df, result) = POOL.join(process_first, check_offsets);
151 let mut df = df?;
152 result?;
153
154 for (exploded, _) in exploded_columns.into_iter().skip(1) {
155 process_column(self, &mut df, exploded)?
156 }
157
158 Ok(df)
159 }
160 pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
220 where
221 I: IntoIterator<Item = S>,
222 S: AsRef<str>,
223 {
224 let columns = self.select_to_vec(columns)?;
227 self.explode_impl(columns, options)
228 }
229}
230
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Explode options shared by all tests in this module.
    fn explode_opts() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    /// Exploding a list column of three 3-element lists yields nine rows and
    /// repeats the other columns accordingly.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let lists = [
            Series::new(PlSmallStr::from_static("a"), &[1i8, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i8, 1, 1]),
            Series::new(PlSmallStr::from_static("c"), &[2i8, 2, 2]),
        ];
        let foo = Column::new(PlSmallStr::from_static("foo"), &lists);

        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let frame = DataFrame::new_infer_height(vec![foo, col_b, col_c]).unwrap();

        let exploded = frame.explode(["foo"], explode_opts()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Last row of an i32 column, looked up by name.
        let last_i32 = |name: &str| {
            exploded
                .column(name)
                .unwrap()
                .as_materialized_series()
                .i32()
                .unwrap()
                .get(8)
        };
        assert_eq!(last_i32("C"), Some(1));
        assert_eq!(last_i32("B"), Some(3));

        let foo_values = exploded
            .column("foo")
            .unwrap()
            .as_materialized_series()
            .i8()
            .unwrap();
        assert_eq!(foo_values.get(8), Some(2));
    }

    /// An empty list explodes to a single null row, both at the end and in
    /// the middle of the column.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let increasing = Series::new(PlSmallStr::from_static("a"), &[1, 2, 3]);
        let ones = Series::new(PlSmallStr::from_static("b"), &[1, 1, 1]);
        let foo = Column::new(
            PlSmallStr::from_static("foo"),
            &[increasing, ones.clone(), ones.clear()],
        );
        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);

        // Empty list in the last row.
        let frame = DataFrame::new_infer_height(vec![foo, col_b.clone(), col_c.clone()])?;
        let actual = frame.explode(["foo"], explode_opts())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(actual.equals_missing(&expected));

        // Empty list in the middle row.
        let foo = Column::new(
            PlSmallStr::from_static("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let frame = DataFrame::new_infer_height(vec![foo, col_b, col_c])?;
        let actual = frame.explode(["foo"], explode_opts())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(actual.equals_missing(&expected));

        Ok(())
    }

    /// Exploding the only column of a frame returns the flattened values.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let first = Series::new(PlSmallStr::from_static("a"), &[1i32, 2, 3]);
        let second = Series::new(PlSmallStr::from_static("b"), &[1i32, 1, 1]);
        let foo = Column::new(PlSmallStr::from_static("foo"), &[first, second]);
        let frame = DataFrame::new_infer_height(vec![foo])?;

        let exploded = frame.explode(["foo"], explode_opts())?;
        let values = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}