polars_lazy/dsl/
list.rs

1use std::sync::Mutex;
2
3use arrow::array::ValueSize;
4use arrow::legacy::utils::CustomIterTools;
5use polars_core::chunked_array::from_iterator_par::ChunkedCollectParIterExt;
6use polars_core::prelude::*;
7use polars_plan::constants::MAP_LIST_NAME;
8use polars_plan::dsl::*;
9use rayon::prelude::*;
10
11use crate::physical_plan::exotic::prepare_expression_for_context;
12use crate::prelude::*;
13
/// Conversion into a [`ListNameSpace`].
///
/// [`ListNameSpaceExtension`] is only available on types implementing this
/// trait, as it is declared as one of its supertraits.
pub trait IntoListNameSpace {
    /// Consume `self` and return the corresponding [`ListNameSpace`].
    fn into_list_name_space(self) -> ListNameSpace;
}
17
18impl IntoListNameSpace for ListNameSpace {
19    fn into_list_name_space(self) -> ListNameSpace {
20        self
21    }
22}
23
24fn offsets_to_groups(offsets: &[i64]) -> Option<GroupPositions> {
25    let mut start = offsets[0];
26    let end = *offsets.last().unwrap();
27    if IdxSize::try_from(end - start).is_err() {
28        return None;
29    }
30    let groups = offsets
31        .iter()
32        .skip(1)
33        .map(|end| {
34            let offset = start as IdxSize;
35            let len = (*end - start) as IdxSize;
36            start = *end;
37            [offset, len]
38        })
39        .collect();
40    Some(
41        GroupsType::Slice {
42            groups,
43            rolling: false,
44        }
45        .into_sliceable(),
46    )
47}
48
49fn run_per_sublist(
50    s: Column,
51    lst: &ListChunked,
52    expr: &Expr,
53    parallel: bool,
54    output_field: Field,
55) -> PolarsResult<Option<Column>> {
56    let phys_expr = prepare_expression_for_context(
57        PlSmallStr::EMPTY,
58        expr,
59        lst.inner_dtype(),
60        Context::Default,
61    )?;
62
63    let state = ExecutionState::new();
64
65    let mut err = None;
66    let mut ca: ListChunked = if parallel {
67        let m_err = Mutex::new(None);
68        let ca: ListChunked = lst
69            .par_iter()
70            .map(|opt_s| {
71                opt_s.and_then(|s| {
72                    let df = s.into_frame();
73                    let out = phys_expr.evaluate(&df, &state);
74                    match out {
75                        Ok(s) => Some(s.take_materialized_series()),
76                        Err(e) => {
77                            *m_err.lock().unwrap() = Some(e);
78                            None
79                        },
80                    }
81                })
82            })
83            .collect_ca_with_dtype(PlSmallStr::EMPTY, output_field.dtype.clone());
84        err = m_err.into_inner().unwrap();
85        ca
86    } else {
87        let mut df_container = DataFrame::empty();
88
89        lst.into_iter()
90            .map(|s| {
91                s.and_then(|s| unsafe {
92                    df_container.with_column_unchecked(s.into_column());
93                    let out = phys_expr.evaluate(&df_container, &state);
94                    df_container.clear_columns();
95                    match out {
96                        Ok(s) => Some(s.take_materialized_series()),
97                        Err(e) => {
98                            err = Some(e);
99                            None
100                        },
101                    }
102                })
103            })
104            .collect_trusted()
105    };
106    if let Some(err) = err {
107        return Err(err);
108    }
109
110    ca.rename(s.name().clone());
111
112    if ca.dtype() != output_field.dtype() {
113        ca.cast(output_field.dtype()).map(Column::from).map(Some)
114    } else {
115        Ok(Some(ca.into_column()))
116    }
117}
118
119fn run_on_group_by_engine(
120    name: PlSmallStr,
121    lst: &ListChunked,
122    expr: &Expr,
123) -> PolarsResult<Option<Column>> {
124    let lst = lst.rechunk();
125    let arr = lst.downcast_as_array();
126    let groups = offsets_to_groups(arr.offsets()).unwrap();
127
128    // List elements in a series.
129    let values = Series::try_from((PlSmallStr::EMPTY, arr.values().clone())).unwrap();
130    let inner_dtype = lst.inner_dtype();
131    // SAFETY:
132    // Invariant in List means values physicals can be cast to inner dtype
133    let values = unsafe { values.from_physical_unchecked(inner_dtype).unwrap() };
134
135    let df_context = values.into_frame();
136    let phys_expr =
137        prepare_expression_for_context(PlSmallStr::EMPTY, expr, inner_dtype, Context::Aggregation)?;
138
139    let state = ExecutionState::new();
140    let mut ac = phys_expr.evaluate_on_groups(&df_context, &groups, &state)?;
141    let out = match ac.agg_state() {
142        AggState::AggregatedScalar(_) => {
143            let out = ac.aggregated();
144            out.as_list().into_column()
145        },
146        _ => ac.aggregated(),
147    };
148    Ok(Some(out.with_name(name).into_column()))
149}
150
151pub trait ListNameSpaceExtension: IntoListNameSpace + Sized {
152    /// Run any [`Expr`] on these lists elements
153    fn eval(self, expr: Expr, parallel: bool) -> Expr {
154        let this = self.into_list_name_space();
155
156        let expr2 = expr.clone();
157        let func = move |c: Column| {
158            for e in expr.into_iter() {
159                match e {
160                    #[cfg(feature = "dtype-categorical")]
161                    Expr::Cast {
162                        dtype: DataType::Categorical(_, _) | DataType::Enum(_, _),
163                        ..
164                    } => {
165                        polars_bail!(
166                            ComputeError: "casting to categorical not allowed in `list.eval`"
167                        )
168                    },
169                    Expr::Column(name) => {
170                        polars_ensure!(
171                            name.is_empty(),
172                            ComputeError:
173                            "named columns are not allowed in `list.eval`; consider using `element` or `col(\"\")`"
174                        );
175                    },
176                    _ => {},
177                }
178            }
179            let lst = c.list()?.clone();
180
181            // # fast returns
182            // ensure we get the new schema
183            let output_field = eval_field_to_dtype(lst.ref_field(), &expr, true);
184            if lst.is_empty() {
185                return Ok(Some(Column::new_empty(
186                    c.name().clone(),
187                    output_field.dtype(),
188                )));
189            }
190            if lst.null_count() == lst.len() {
191                return Ok(Some(c.cast(output_field.dtype())?.into_column()));
192            }
193
194            let fits_idx_size = lst.get_values_size() <= (IdxSize::MAX as usize);
195            // If a users passes a return type to `apply`, e.g. `return_dtype=pl.Int64`,
196            // this fails as the list builder expects `List<Int64>`, so let's skip that for now.
197            let is_user_apply = || {
198                expr.into_iter().any(|e| matches!(e, Expr::AnonymousFunction { options, .. } if options.fmt_str == MAP_LIST_NAME))
199            };
200
201            if fits_idx_size && c.null_count() == 0 && !is_user_apply() {
202                run_on_group_by_engine(c.name().clone(), &lst, &expr)
203            } else {
204                run_per_sublist(c, &lst, &expr, parallel, output_field)
205            }
206        };
207
208        this.0
209            .map(
210                func,
211                GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, true))),
212            )
213            .with_fmt("eval")
214    }
215}
216
// `ListNameSpace` uses the provided `eval` implementation as-is.
impl ListNameSpaceExtension for ListNameSpace {}