1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(
13 series: &Series,
14 options: ExplodeOptions,
15) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16 match series.dtype() {
17 DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18 #[cfg(feature = "dtype-array")]
19 DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20 _ => polars_bail!(opq = explode, series.dtype()),
21 }
22}
23
24#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
26#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27pub struct UnpivotArgsIR {
28 pub on: Vec<PlSmallStr>,
29 pub index: Vec<PlSmallStr>,
30 pub variable_name: PlSmallStr,
31 pub value_name: PlSmallStr,
32}
33
34impl UnpivotArgsIR {
35 pub fn new(
36 all_column_names: Vec<PlSmallStr>,
37 on: Option<Vec<PlSmallStr>>,
38 index: Vec<PlSmallStr>,
39 value_name: Option<PlSmallStr>,
40 variable_name: Option<PlSmallStr>,
41 ) -> Self {
42 let on = on.unwrap_or_else(|| {
43 let index_set = PlHashSet::from_iter(index.iter().cloned());
45 all_column_names
46 .into_iter()
47 .filter(|s| !index_set.contains(s))
48 .collect()
49 });
50
51 Self {
52 on,
53 index,
54 variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
55 value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
56 }
57 }
58}
59
60impl DataFrame {
61 pub fn explode_impl(
62 &self,
63 mut columns: Vec<Column>,
64 options: ExplodeOptions,
65 ) -> PolarsResult<DataFrame> {
66 polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
67 let mut df = self.clone();
68 if self.is_empty() {
69 for s in &columns {
70 df.with_column(s.as_materialized_series().explode(options)?)?;
71 }
72 return Ok(df);
73 }
74 columns.sort_by(|sa, sb| {
75 self.check_name_to_idx(sa.name().as_str())
76 .expect("checked above")
77 .partial_cmp(
78 &self
79 .check_name_to_idx(sb.name().as_str())
80 .expect("checked above"),
81 )
82 .expect("cmp usize -> Ordering")
83 });
84
85 for s in &columns {
87 df = df.drop(s.name().as_str())?;
88 }
89
90 let exploded_columns = POOL.install(|| {
91 columns
92 .par_iter()
93 .map(|c| get_exploded(c.as_materialized_series(), options))
94 .map(|s| s.map(|(s, o)| (Column::from(s), o)))
95 .collect::<PolarsResult<Vec<_>>>()
96 })?;
97
98 fn process_column(
99 original_df: &DataFrame,
100 df: &mut DataFrame,
101 exploded: Column,
102 ) -> PolarsResult<()> {
103 if exploded.len() == df.height() || df.width() == 0 {
104 let col_idx = original_df.check_name_to_idx(exploded.name().as_str())?;
105 df.columns.insert(col_idx, exploded);
106 } else {
107 polars_bail!(
108 ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
109 as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
110 );
111 }
112 Ok(())
113 }
114
115 let check_offsets = || {
116 let first_offsets = exploded_columns[0].1.as_slice();
117 for (_, offsets) in &exploded_columns[1..] {
118 let offsets = offsets.as_slice();
119
120 let offset_l = first_offsets[0];
121 let offset_r = offsets[0];
122 let all_equal_len = first_offsets.len() != offsets.len() || {
123 first_offsets
124 .iter()
125 .zip(offsets.iter())
126 .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
127 };
128
129 polars_ensure!(all_equal_len,
130 ShapeMismatch: "exploded columns must have matching element counts"
131 )
132 }
133 Ok(())
134 };
135 let process_first = || {
136 let validity = columns[0].rechunk_validity();
137 let (exploded, offsets) = &exploded_columns[0];
138
139 let row_idx = offsets_to_indexes(
140 offsets.as_slice(),
141 exploded.len(),
142 options,
143 validity.as_ref(),
144 );
145 let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
146 row_idx.set_sorted_flag(IsSorted::Ascending);
147
148 let mut df = unsafe { df.take_unchecked(&row_idx) };
151 process_column(self, &mut df, exploded.clone())?;
152 PolarsResult::Ok(df)
153 };
154 let (df, result) = POOL.join(process_first, check_offsets);
155 let mut df = df?;
156 result?;
157
158 for (exploded, _) in exploded_columns.into_iter().skip(1) {
159 process_column(self, &mut df, exploded)?
160 }
161
162 Ok(df)
163 }
164 pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
224 where
225 I: IntoIterator<Item = S>,
226 S: Into<PlSmallStr>,
227 {
228 let columns = self.select_columns(columns)?;
231 self.explode_impl(columns, options)
232 }
233}
234
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Options used by every test here: both `empty_as_null` and `keep_nulls`
    /// are switched on.
    fn explode_options() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let rows = [
            Series::new(PlSmallStr::from_static("a"), &[1i8, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i8, 1, 1]),
            Series::new(PlSmallStr::from_static("c"), &[2i8, 2, 2]),
        ];
        let list = Column::new(PlSmallStr::from_static("foo"), &rows);

        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"], explode_options()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Inspect the last row of every column.
        let last: usize = 8;

        let c_values = exploded.column("C").unwrap().as_materialized_series();
        assert_eq!(c_values.i32().unwrap().get(last), Some(1));

        let b_values = exploded.column("B").unwrap().as_materialized_series();
        assert_eq!(b_values.i32().unwrap().get(last), Some(3));

        let foo_values = exploded.column("foo").unwrap().as_materialized_series();
        assert_eq!(foo_values.i8().unwrap().get(last), Some(2));
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        // The empty list sits in the last row.
        let full = Series::new(PlSmallStr::from_static("a"), &[1, 2, 3]);
        let ones = Series::new(PlSmallStr::from_static("b"), &[1, 1, 1]);
        let empty = ones.clear();
        let list = Column::new(PlSmallStr::from_static("foo"), &[full, ones, empty]);
        let col_b = Column::new(PlSmallStr::from_static("B"), [1, 2, 3]);
        let col_c = Column::new(PlSmallStr::from_static("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b.clone(), col_c.clone()])?;

        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        // The empty list sits in the middle row.
        let list = Column::new(
            PlSmallStr::from_static("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let df = DataFrame::new(vec![list, col_b, col_c])?;

        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        Ok(())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let rows = [
            Series::new(PlSmallStr::from_static("a"), &[1i32, 2, 3]),
            Series::new(PlSmallStr::from_static("b"), &[1i32, 1, 1]),
        ];
        let list = Column::new(PlSmallStr::from_static("foo"), &rows);
        let df = DataFrame::new(vec![list])?;

        let exploded = df.explode(["foo"], explode_options())?;
        let values = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}