use std::borrow::Cow;
use std::sync::Mutex;

use arrow::array::{Array, ListArray, ValueSize};
use arrow::legacy::utils::CustomIterTools;
use polars_core::POOL;
use polars_core::chunked_array::from_iterator_par::ChunkedCollectParIterExt;
use polars_core::prelude::*;
use polars_plan::constants::MAP_LIST_NAME;
use polars_plan::dsl::*;
use rayon::prelude::*;

use crate::physical_plan::exotic::prepare_expression_for_context;
use crate::prelude::*;

pub trait IntoListNameSpace {
    fn into_list_name_space(self) -> ListNameSpace;
}

impl IntoListNameSpace for ListNameSpace {
    fn into_list_name_space(self) -> ListNameSpace {
        self
    }
}

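/// Convert the offsets of a `ListArray` into slice groups for the group-by engine.
/// Returns `None` if the span covered by the offsets does not fit in an `IdxSize`.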
fn offsets_to_groups(offsets: &[i64]) -> Option<GroupPositions> {
    let mut start = offsets[0];
    let end = *offsets.last().unwrap();
    if IdxSize::try_from(end - start).is_err() {
        return None;
    }
    let groups = offsets
        .iter()
        .skip(1)
        .map(|end| {
            let offset = start as IdxSize;
            let len = (*end - start) as IdxSize;
            start = *end;
            [offset, len]
        })
        .collect();
    Some(
        GroupsType::Slice {
            groups,
            rolling: false,
        }
        .into_sliceable(),
    )
}

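/// Evaluate `expr` on every sublist separately: each sublist is materialized as a
/// single-column `DataFrame` and the expression is evaluated on it, optionally in parallel.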
fn run_per_sublist(
    s: Column,
    lst: &ListChunked,
    expr: &Expr,
    parallel: bool,
    output_field: Field,
) -> PolarsResult<Option<Column>> {
    let phys_expr = prepare_expression_for_context(
        PlSmallStr::EMPTY,
        expr,
        lst.inner_dtype(),
        Context::Default,
    )?;

    let state = ExecutionState::new();

    let mut err = None;
    let mut ca: ListChunked = if parallel {
        let m_err = Mutex::new(None);
        let ca: ListChunked = POOL.install(|| {
            lst.par_iter()
                .map(|opt_s| {
                    opt_s.and_then(|s| {
                        let df = s.into_frame();
                        let out = phys_expr.evaluate(&df, &state);
                        match out {
                            Ok(s) => Some(s.take_materialized_series()),
                            Err(e) => {
                                *m_err.lock().unwrap() = Some(e);
                                None
                            },
                        }
                    })
                })
                .collect_ca_with_dtype(PlSmallStr::EMPTY, output_field.dtype.clone())
        });
        err = m_err.into_inner().unwrap();
        ca
    } else {
        let mut df_container = DataFrame::empty();

        lst.into_iter()
            .map(|s| {
                s.and_then(|s| unsafe {
                    // SAFETY: the container is empty here, so the unchecked insert cannot
                    // violate the frame's height invariant; it is cleared after evaluation.
                    df_container.with_column_unchecked(s.into_column());
                    let out = phys_expr.evaluate(&df_container, &state);
                    df_container.clear_columns();
                    match out {
                        Ok(s) => Some(s.take_materialized_series()),
                        Err(e) => {
                            err = Some(e);
                            None
                        },
                    }
                })
            })
            .collect_trusted()
    };
    if let Some(err) = err {
        return Err(err);
    }

    ca.rename(s.name().clone());

    if ca.dtype() != output_field.dtype() {
        ca.cast(output_field.dtype()).map(Column::from).map(Some)
    } else {
        Ok(Some(ca.into_column()))
    }
}

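/// Evaluate `expr` with the group-by engine: the flattened list values become a single
/// column and the list offsets are converted into slice groups, so the expression runs
/// once per group rather than once per materialized sublist.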
fn run_on_group_by_engine(
    name: PlSmallStr,
    lst: &ListChunked,
    expr: &Expr,
) -> PolarsResult<Option<Column>> {
    let lst = lst.rechunk();
    let arr = lst.downcast_as_array();
    let groups = offsets_to_groups(arr.offsets()).unwrap();

    let values = Series::try_from((PlSmallStr::EMPTY, arr.values().clone())).unwrap();
    let inner_dtype = lst.inner_dtype();
    // SAFETY: the list invariant guarantees the values array has the physical
    // representation of the inner dtype.
    let values = unsafe { values.from_physical_unchecked(inner_dtype).unwrap() };

    let df_context = values.into_frame();
    let phys_expr =
        prepare_expression_for_context(PlSmallStr::EMPTY, expr, inner_dtype, Context::Aggregation)?;

    let state = ExecutionState::new();
    let mut ac = phys_expr.evaluate_on_groups(&df_context, &groups, &state)?;
    let out = match ac.agg_state() {
        AggState::AggregatedScalar(_) => {
            let out = ac.aggregated();
            out.as_list().into_column()
        },
        _ => ac.aggregated(),
    };
    Ok(Some(out.with_name(name).into_column()))
}

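/// Apply an elementwise `expr` directly to the flattened values of each chunk and
/// reattach the original list offsets and validity. This skips per-sublist dispatch and
/// is only valid for expressions that map values one-to-one.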
fn run_elementwise_on_values(
    lst: &ListChunked,
    expr: &Expr,
    parallel: bool,
    output_field: Field,
) -> PolarsResult<Column> {
    if lst.chunks().is_empty() {
        return Ok(Column::new_empty(output_field.name, &output_field.dtype));
    }

    let phys_expr = prepare_expression_for_context(
        PlSmallStr::EMPTY,
        expr,
        lst.inner_dtype(),
        Context::Default,
    )?;

    let lst = lst
        .trim_lists_to_normalized_offsets()
        .map_or(Cow::Borrowed(lst), Cow::Owned);

    let output_arrow_dtype = output_field.dtype().clone().to_arrow(CompatLevel::newest());
    let output_arrow_dtype_physical = output_arrow_dtype.underlying_physical_type();

    let state = ExecutionState::new();

    let apply_to_chunk = |arr: &dyn Array| {
        let arr: &ListArray<i64> = arr.as_any().downcast_ref().unwrap();

        // SAFETY: the values array of a list chunk matches the list's inner dtype.
        let values = unsafe {
            Series::from_chunks_and_dtype_unchecked(
                PlSmallStr::EMPTY,
                vec![arr.values().clone()],
                lst.inner_dtype(),
            )
        };

        let df = values.into_frame();

        phys_expr.evaluate(&df, &state).map(|values| {
            let values = values.take_materialized_series().rechunk().chunks()[0].clone();

            ListArray::<i64>::new(
                output_arrow_dtype_physical.clone(),
                arr.offsets().clone(),
                values,
                arr.validity().cloned(),
            )
            .boxed()
        })
    };

    let chunks = if parallel && lst.chunks().len() > 1 {
        POOL.install(|| {
            lst.chunks()
                .into_par_iter()
                .map(|x| apply_to_chunk(&**x))
                .collect::<PolarsResult<Vec<Box<dyn Array>>>>()
        })?
    } else {
        lst.chunks()
            .iter()
            .map(|x| apply_to_chunk(&**x))
            .collect::<PolarsResult<Vec<Box<dyn Array>>>>()?
    };

    // SAFETY: the chunks were built with the physical type of `output_field`, so casting
    // to its logical dtype is sound.
    Ok(unsafe {
        ListChunked::from_chunks(output_field.name.clone(), chunks)
            .cast_unchecked(output_field.dtype())
            .unwrap()
    }
    .into_column())
}

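/// Extension trait adding `eval` to the list namespace: run an expression on every
/// sublist, dispatching to the elementwise, group-by, or per-sublist strategy
/// depending on the shape of the expression and the data.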
pub trait ListNameSpaceExtension: IntoListNameSpace + Sized {
    fn eval(self, expr: Expr, parallel: bool) -> Expr {
        let mut expr_arena = Arena::with_capacity(4);

        let (pd_group, returns_scalar) = to_aexpr(expr.clone(), &mut expr_arena).map_or(
            (ExprPushdownGroup::Barrier, true),
            |node| {
                let mut pd_group = ExprPushdownGroup::Pushable;
                pd_group.update_with_expr_rec(expr_arena.get(node), &expr_arena, None);

                (pd_group, is_scalar_ae(node, &expr_arena))
            },
        );

        let this = self.into_list_name_space();

        let expr2 = expr.clone();
        let func = move |c: Column| {
            for e in expr.into_iter() {
                match e {
                    #[cfg(feature = "dtype-categorical")]
                    Expr::Cast {
                        dtype: DataType::Categorical(_, _) | DataType::Enum(_, _),
                        ..
                    } => {
                        polars_bail!(
                            ComputeError: "casting to categorical not allowed in `list.eval`"
                        )
                    },
                    Expr::Column(name) => {
                        polars_ensure!(
                            name.is_empty(),
                            ComputeError:
                            "named columns are not allowed in `list.eval`; consider using `element` or `col(\"\")`"
                        );
                    },
                    _ => {},
                }
            }

            let lst = c.list()?.clone();

            let output_field = eval_field_to_dtype(lst.ref_field(), &expr, true);
            if lst.is_empty() {
                return Ok(Some(Column::new_empty(
                    c.name().clone(),
                    output_field.dtype(),
                )));
            }
            if lst.null_count() == lst.len() {
                return Ok(Some(c.cast(output_field.dtype())?.into_column()));
            }

            let fits_idx_size = lst.get_values_size() <= (IdxSize::MAX as usize);
            let is_user_apply = || {
                expr.into_iter().any(|e| matches!(e, Expr::AnonymousFunction { options, .. } if options.fmt_str == MAP_LIST_NAME))
            };

            if match pd_group {
                ExprPushdownGroup::Pushable => true,
                ExprPushdownGroup::Fallible => !lst.has_nulls(),
                ExprPushdownGroup::Barrier => false,
            } && !returns_scalar
            {
                run_elementwise_on_values(&lst, &expr, parallel, output_field).map(Some)
            } else if fits_idx_size && c.null_count() == 0 && !is_user_apply() {
                run_on_group_by_engine(c.name().clone(), &lst, &expr)
            } else {
                run_per_sublist(c, &lst, &expr, parallel, output_field)
            }
        };

        this.0
            .map(
                func,
                GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, true))),
            )
            .with_fmt("eval")
    }
}

impl ListNameSpaceExtension for ListNameSpace {}