polars_lazy/dsl/
list.rs

1use std::borrow::Cow;
2use std::sync::Mutex;
3
4use arrow::array::{Array, ListArray, ValueSize};
5use arrow::legacy::utils::CustomIterTools;
6use polars_core::POOL;
7use polars_core::chunked_array::from_iterator_par::ChunkedCollectParIterExt;
8use polars_core::prelude::*;
9use polars_plan::constants::MAP_LIST_NAME;
10use polars_plan::dsl::*;
11use rayon::prelude::*;
12
13use crate::physical_plan::exotic::prepare_expression_for_context;
14use crate::prelude::*;
15
16pub trait IntoListNameSpace {
17    fn into_list_name_space(self) -> ListNameSpace;
18}
19
20impl IntoListNameSpace for ListNameSpace {
21    fn into_list_name_space(self) -> ListNameSpace {
22        self
23    }
24}
25
26fn offsets_to_groups(offsets: &[i64]) -> Option<GroupPositions> {
27    let mut start = offsets[0];
28    let end = *offsets.last().unwrap();
29    if IdxSize::try_from(end - start).is_err() {
30        return None;
31    }
32    let groups = offsets
33        .iter()
34        .skip(1)
35        .map(|end| {
36            let offset = start as IdxSize;
37            let len = (*end - start) as IdxSize;
38            start = *end;
39            [offset, len]
40        })
41        .collect();
42    Some(
43        GroupsType::Slice {
44            groups,
45            rolling: false,
46        }
47        .into_sliceable(),
48    )
49}
50
51fn run_per_sublist(
52    s: Column,
53    lst: &ListChunked,
54    expr: &Expr,
55    parallel: bool,
56    output_field: Field,
57) -> PolarsResult<Option<Column>> {
58    let phys_expr = prepare_expression_for_context(
59        PlSmallStr::EMPTY,
60        expr,
61        lst.inner_dtype(),
62        Context::Default,
63    )?;
64
65    let state = ExecutionState::new();
66
67    let mut err = None;
68    let mut ca: ListChunked = if parallel {
69        let m_err = Mutex::new(None);
70        let ca: ListChunked = POOL.install(|| {
71            lst.par_iter()
72                .map(|opt_s| {
73                    opt_s.and_then(|s| {
74                        let df = s.into_frame();
75                        let out = phys_expr.evaluate(&df, &state);
76                        match out {
77                            Ok(s) => Some(s.take_materialized_series()),
78                            Err(e) => {
79                                *m_err.lock().unwrap() = Some(e);
80                                None
81                            },
82                        }
83                    })
84                })
85                .collect_ca_with_dtype(PlSmallStr::EMPTY, output_field.dtype.clone())
86        });
87        err = m_err.into_inner().unwrap();
88        ca
89    } else {
90        let mut df_container = DataFrame::empty();
91
92        lst.into_iter()
93            .map(|s| {
94                s.and_then(|s| unsafe {
95                    df_container.with_column_unchecked(s.into_column());
96                    let out = phys_expr.evaluate(&df_container, &state);
97                    df_container.clear_columns();
98                    match out {
99                        Ok(s) => Some(s.take_materialized_series()),
100                        Err(e) => {
101                            err = Some(e);
102                            None
103                        },
104                    }
105                })
106            })
107            .collect_trusted()
108    };
109    if let Some(err) = err {
110        return Err(err);
111    }
112
113    ca.rename(s.name().clone());
114
115    if ca.dtype() != output_field.dtype() {
116        ca.cast(output_field.dtype()).map(Column::from).map(Some)
117    } else {
118        Ok(Some(ca.into_column()))
119    }
120}
121
122fn run_on_group_by_engine(
123    name: PlSmallStr,
124    lst: &ListChunked,
125    expr: &Expr,
126) -> PolarsResult<Option<Column>> {
127    let lst = lst.rechunk();
128    let arr = lst.downcast_as_array();
129    let groups = offsets_to_groups(arr.offsets()).unwrap();
130
131    // List elements in a series.
132    let values = Series::try_from((PlSmallStr::EMPTY, arr.values().clone())).unwrap();
133    let inner_dtype = lst.inner_dtype();
134    // SAFETY:
135    // Invariant in List means values physicals can be cast to inner dtype
136    let values = unsafe { values.from_physical_unchecked(inner_dtype).unwrap() };
137
138    let df_context = values.into_frame();
139    let phys_expr =
140        prepare_expression_for_context(PlSmallStr::EMPTY, expr, inner_dtype, Context::Aggregation)?;
141
142    let state = ExecutionState::new();
143    let mut ac = phys_expr.evaluate_on_groups(&df_context, &groups, &state)?;
144    let out = match ac.agg_state() {
145        AggState::AggregatedScalar(_) => {
146            let out = ac.aggregated();
147            out.as_list().into_column()
148        },
149        _ => ac.aggregated(),
150    };
151    Ok(Some(out.with_name(name).into_column()))
152}
153
154fn run_elementwise_on_values(
155    lst: &ListChunked,
156    expr: &Expr,
157    parallel: bool,
158    output_field: Field,
159) -> PolarsResult<Column> {
160    if lst.chunks().is_empty() {
161        return Ok(Column::new_empty(output_field.name, &output_field.dtype));
162    }
163
164    let phys_expr = prepare_expression_for_context(
165        PlSmallStr::EMPTY,
166        expr,
167        lst.inner_dtype(),
168        Context::Default,
169    )?;
170
171    let lst = lst
172        .trim_lists_to_normalized_offsets()
173        .map_or(Cow::Borrowed(lst), Cow::Owned);
174
175    let output_arrow_dtype = output_field.dtype().clone().to_arrow(CompatLevel::newest());
176    let output_arrow_dtype_physical = output_arrow_dtype.underlying_physical_type();
177
178    let state = ExecutionState::new();
179
180    let apply_to_chunk = |arr: &dyn Array| {
181        let arr: &ListArray<i64> = arr.as_any().downcast_ref().unwrap();
182
183        let values = unsafe {
184            Series::from_chunks_and_dtype_unchecked(
185                PlSmallStr::EMPTY,
186                vec![arr.values().clone()],
187                lst.inner_dtype(),
188            )
189        };
190
191        let df = values.into_frame();
192
193        phys_expr.evaluate(&df, &state).map(|values| {
194            let values = values.take_materialized_series().rechunk().chunks()[0].clone();
195
196            ListArray::<i64>::new(
197                output_arrow_dtype_physical.clone(),
198                arr.offsets().clone(),
199                values,
200                arr.validity().cloned(),
201            )
202            .boxed()
203        })
204    };
205
206    let chunks = if parallel && lst.chunks().len() > 1 {
207        POOL.install(|| {
208            lst.chunks()
209                .into_par_iter()
210                .map(|x| apply_to_chunk(&**x))
211                .collect::<PolarsResult<Vec<Box<dyn Array>>>>()
212        })?
213    } else {
214        lst.chunks()
215            .iter()
216            .map(|x| apply_to_chunk(&**x))
217            .collect::<PolarsResult<Vec<Box<dyn Array>>>>()?
218    };
219
220    Ok(unsafe {
221        ListChunked::from_chunks(output_field.name.clone(), chunks)
222            .cast_unchecked(output_field.dtype())
223            .unwrap()
224    }
225    .into_column())
226}
227
/// Extension trait that adds `list.eval` to anything convertible into a
/// [`ListNameSpace`].
pub trait ListNameSpaceExtension: IntoListNameSpace + Sized {
    /// Run any [`Expr`] on the elements of every list in this column.
    ///
    /// Inside `expr`, the list elements are referred to by an unnamed column
    /// (`element()` / `col("")`). Named columns are rejected when the expression runs.
    ///
    /// For each input column, one of three strategies is chosen:
    /// 1. element-wise evaluation on the flattened values (`run_elementwise_on_values`),
    ///    for non-scalar expressions whose pushdown group is `Pushable`, or `Fallible`
    ///    when the list has no nulls;
    /// 2. the group-by engine (`run_on_group_by_engine`), when the values size fits in
    ///    `IdxSize`, the column has no nulls and `expr` contains no anonymous function
    ///    whose `fmt_str` is `MAP_LIST_NAME`;
    /// 3. evaluation sub-list by sub-list (`run_per_sublist`) otherwise.
    ///
    /// `parallel` is forwarded to strategies 1 and 3. Strategy 2 does not take it.
    ///
    /// # Errors
    /// The returned expression fails at execution time if:
    /// - `expr` casts to `Categorical`/`Enum` (with the `dtype-categorical` feature);
    /// - `expr` references a named column;
    /// - the input cannot be viewed as a list (`c.list()`);
    /// - the chosen strategy fails.
    fn eval(self, expr: Expr, parallel: bool) -> Expr {
        let mut expr_arena = Arena::with_capacity(4);

        // Classify `expr` once, up front:
        // - how far it can be pushed down to the flattened values (`pd_group`);
        // - whether it yields a scalar (`returns_scalar`).
        // If `expr` cannot be converted to an `AExpr`, fall back to the most
        // conservative answer (`Barrier`, scalar).
        let (pd_group, returns_scalar) = to_aexpr(expr.clone(), &mut expr_arena).map_or(
            (ExprPushdownGroup::Barrier, true),
            |node| {
                let mut pd_group = ExprPushdownGroup::Pushable;
                pd_group.update_with_expr_rec(expr_arena.get(node), &expr_arena, None);

                (pd_group, is_scalar_ae(node, &expr_arena))
            },
        );

        let this = self.into_list_name_space();

        // `func` takes ownership of `expr`; the output-type closure needs its own copy.
        let expr2 = expr.clone();
        let func = move |c: Column| {
            // Reject expressions that `list.eval` does not support before touching data.
            for e in expr.into_iter() {
                match e {
                    #[cfg(feature = "dtype-categorical")]
                    Expr::Cast {
                        dtype: DataType::Categorical(_, _) | DataType::Enum(_, _),
                        ..
                    } => {
                        polars_bail!(
                            ComputeError: "casting to categorical not allowed in `list.eval`"
                        )
                    },
                    // Only the unnamed "element" column may be referenced.
                    Expr::Column(name) => {
                        polars_ensure!(
                            name.is_empty(),
                            ComputeError:
                            "named columns are not allowed in `list.eval`; consider using `element` or `col(\"\")`"
                        );
                    },
                    _ => {},
                }
            }

            let lst = c.list()?.clone();

            // Fast returns. The output field is computed first so that even these paths
            // return the same dtype that the `GetOutput` below reports to the planner.
            let output_field = eval_field_to_dtype(lst.ref_field(), &expr, true);
            if lst.is_empty() {
                return Ok(Some(Column::new_empty(
                    c.name().clone(),
                    output_field.dtype(),
                )));
            }
            // Every entry is null: there is nothing to evaluate, only the dtype changes.
            if lst.null_count() == lst.len() {
                return Ok(Some(c.cast(output_field.dtype())?.into_column()));
            }

            // The group-by engine addresses the flattened values with `IdxSize` offsets.
            let fits_idx_size = lst.get_values_size() <= (IdxSize::MAX as usize);
            // If a user passes a return type to `apply`, e.g. `return_dtype=pl.Int64`,
            // this fails as the list builder expects `List<Int64>`, so let's skip that for now.
            // This is a closure, so the expression tree is only walked when the other
            // group-by conditions already hold (it is the last operand of the `&&` chain).
            let is_user_apply = || {
                expr.into_iter().any(|e| matches!(e, Expr::AnonymousFunction { options, .. } if options.fmt_str == MAP_LIST_NAME))
            };

            // NOTE(review): presumably `Fallible` expressions only take the element-wise
            // path when there are no nulls because null sub-lists may still reference
            // values that could make the expression error. Confirm.
            if match pd_group {
                ExprPushdownGroup::Pushable => true,
                ExprPushdownGroup::Fallible => !lst.has_nulls(),
                ExprPushdownGroup::Barrier => false,
            } && !returns_scalar
            {
                run_elementwise_on_values(&lst, &expr, parallel, output_field).map(Some)
            } else if fits_idx_size && c.null_count() == 0 && !is_user_apply() {
                run_on_group_by_engine(c.name().clone(), &lst, &expr)
            } else {
                run_per_sublist(c, &lst, &expr, parallel, output_field)
            }
        };

        this.0
            .map(
                func,
                // Schema resolution uses the same `eval_field_to_dtype` as the runtime paths.
                GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, true))),
            )
            .with_fmt("eval")
    }
}

impl ListNameSpaceExtension for ListNameSpace {}