1use arrow::offset::OffsetsBuffer;
2use polars_utils::pl_str::PlSmallStr;
3use rayon::prelude::*;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::POOL;
8use crate::chunked_array::ops::explode::offsets_to_indexes;
9use crate::prelude::*;
10use crate::series::IsSorted;
11
12fn get_exploded(
13 series: &Series,
14 options: ExplodeOptions,
15) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16 match series.dtype() {
17 DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18 #[cfg(feature = "dtype-array")]
19 DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20 _ => polars_bail!(opq = explode, series.dtype()),
21 }
22}
23
/// Plain-data argument bundle for an unpivot operation.
///
/// NOTE(review): the struct is not used anywhere in the visible part of this file;
/// the field descriptions below are inferred from the field names only — confirm
/// against the unpivot implementation before relying on them.
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnpivotArgsIR {
    /// Column names; presumably the columns whose values are unpivoted — TODO confirm.
    pub on: Vec<PlSmallStr>,
    /// Column names; presumably the identifier columns that are kept as-is — TODO confirm.
    pub index: Vec<PlSmallStr>,
    /// Optional name; presumably for the output column holding the original column
    /// names — TODO confirm what is used when this is `None`.
    pub variable_name: Option<PlSmallStr>,
    /// Optional name; presumably for the output column holding the values — TODO
    /// confirm what is used when this is `None`.
    pub value_name: Option<PlSmallStr>,
}
33
34impl DataFrame {
35 pub fn explode_impl(
36 &self,
37 mut columns: Vec<Column>,
38 options: ExplodeOptions,
39 ) -> PolarsResult<DataFrame> {
40 polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
41 let mut df = self.clone();
42 if self.is_empty() {
43 for s in &columns {
44 df.with_column(s.as_materialized_series().explode(options)?)?;
45 }
46 return Ok(df);
47 }
48 columns.sort_by(|sa, sb| {
49 self.check_name_to_idx(sa.name().as_str())
50 .expect("checked above")
51 .partial_cmp(
52 &self
53 .check_name_to_idx(sb.name().as_str())
54 .expect("checked above"),
55 )
56 .expect("cmp usize -> Ordering")
57 });
58
59 for s in &columns {
61 df = df.drop(s.name().as_str())?;
62 }
63
64 let exploded_columns = POOL.install(|| {
65 columns
66 .par_iter()
67 .map(|c| get_exploded(c.as_materialized_series(), options))
68 .map(|s| s.map(|(s, o)| (Column::from(s), o)))
69 .collect::<PolarsResult<Vec<_>>>()
70 })?;
71
72 fn process_column(
73 original_df: &DataFrame,
74 df: &mut DataFrame,
75 exploded: Column,
76 ) -> PolarsResult<()> {
77 if exploded.len() == df.height() || df.width() == 0 {
78 let col_idx = original_df.check_name_to_idx(exploded.name().as_str())?;
79 df.columns.insert(col_idx, exploded);
80 } else {
81 polars_bail!(
82 ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
83 as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
84 );
85 }
86 Ok(())
87 }
88
89 let check_offsets = || {
90 let first_offsets = exploded_columns[0].1.as_slice();
91 for (_, offsets) in &exploded_columns[1..] {
92 let offsets = offsets.as_slice();
93
94 let offset_l = first_offsets[0];
95 let offset_r = offsets[0];
96 let all_equal_len = first_offsets.len() != offsets.len() || {
97 first_offsets
98 .iter()
99 .zip(offsets.iter())
100 .all(|(l, r)| (*l - offset_l) == (*r - offset_r))
101 };
102
103 polars_ensure!(all_equal_len,
104 ShapeMismatch: "exploded columns must have matching element counts"
105 )
106 }
107 Ok(())
108 };
109 let process_first = || {
110 let validity = columns[0].rechunk_validity();
111 let (exploded, offsets) = &exploded_columns[0];
112
113 let row_idx = offsets_to_indexes(
114 offsets.as_slice(),
115 exploded.len(),
116 options,
117 validity.as_ref(),
118 );
119 let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
120 row_idx.set_sorted_flag(IsSorted::Ascending);
121
122 let mut df = unsafe { df.take_unchecked(&row_idx) };
125 process_column(self, &mut df, exploded.clone())?;
126 PolarsResult::Ok(df)
127 };
128 let (df, result) = POOL.join(process_first, check_offsets);
129 let mut df = df?;
130 result?;
131
132 for (exploded, _) in exploded_columns.into_iter().skip(1) {
133 process_column(self, &mut df, exploded)?
134 }
135
136 Ok(df)
137 }
138 pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
198 where
199 I: IntoIterator<Item = S>,
200 S: Into<PlSmallStr>,
201 {
202 let columns = self.select_columns(columns)?;
205 self.explode_impl(columns, options)
206 }
207}
208
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Explode options shared by all tests in this module.
    fn explode_options() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    /// Shorthand for building a name from a static string.
    fn name(s: &'static str) -> PlSmallStr {
        PlSmallStr::from_static(s)
    }

    /// Exploding a list column repeats the other columns for every list element.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let inner_a = Series::new(name("a"), &[1i8, 2, 3]);
        let inner_b = Series::new(name("b"), &[1i8, 1, 1]);
        let inner_c = Series::new(name("c"), &[2i8, 2, 2]);
        let list = Column::new(name("foo"), &[inner_a, inner_b, inner_c]);

        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"], explode_options()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Check the last row of every column.
        let last_c = exploded.column("C").unwrap().as_materialized_series();
        assert_eq!(last_c.i32().unwrap().get(8), Some(1));

        let last_b = exploded.column("B").unwrap().as_materialized_series();
        assert_eq!(last_b.i32().unwrap().get(8), Some(3));

        let last_foo = exploded.column("foo").unwrap().as_materialized_series();
        assert_eq!(last_foo.i8().unwrap().get(8), Some(2));
    }

    /// An empty inner list explodes into a single null row, both at the end and
    /// in the middle of the column.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let inner_a = Series::new(name("a"), &[1, 2, 3]);
        let inner_b = Series::new(name("b"), &[1, 1, 1]);
        let list = Column::new(name("foo"), &[inner_a, inner_b.clone(), inner_b.clear()]);
        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new(vec![list, col_b.clone(), col_c.clone()])?;

        // Empty list in the last row.
        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        // Empty list in the middle row.
        let list = Column::new(
            name("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let df = DataFrame::new(vec![list, col_b, col_c])?;
        let out = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        Ok(())
    }

    /// Exploding works when the list column is the only column of the frame.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let inner_a = Series::new(name("a"), &[1i32, 2, 3]);
        let inner_b = Series::new(name("b"), &[1i32, 1, 1]);
        let list = Column::new(name("foo"), &[inner_a, inner_b]);
        let df = DataFrame::new(vec![list])?;

        let exploded = df.explode(["foo"], explode_options())?;
        let values = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}