1use std::sync::Mutex;
2
3use arrow::array::ValueSize;
4use arrow::legacy::utils::CustomIterTools;
5use polars_core::chunked_array::from_iterator_par::ChunkedCollectParIterExt;
6use polars_core::prelude::*;
7use polars_plan::constants::MAP_LIST_NAME;
8use polars_plan::dsl::*;
9use rayon::prelude::*;
10
11use crate::physical_plan::exotic::prepare_expression_for_context;
12use crate::prelude::*;
13
14pub trait IntoListNameSpace {
15 fn into_list_name_space(self) -> ListNameSpace;
16}
17
18impl IntoListNameSpace for ListNameSpace {
19 fn into_list_name_space(self) -> ListNameSpace {
20 self
21 }
22}
23
24fn offsets_to_groups(offsets: &[i64]) -> Option<GroupPositions> {
25 let mut start = offsets[0];
26 let end = *offsets.last().unwrap();
27 if IdxSize::try_from(end - start).is_err() {
28 return None;
29 }
30 let groups = offsets
31 .iter()
32 .skip(1)
33 .map(|end| {
34 let offset = start as IdxSize;
35 let len = (*end - start) as IdxSize;
36 start = *end;
37 [offset, len]
38 })
39 .collect();
40 Some(
41 GroupsType::Slice {
42 groups,
43 rolling: false,
44 }
45 .into_sliceable(),
46 )
47}
48
/// Evaluates `expr` separately on every sub-list of `lst` and collects the results into a
/// new list column.
///
/// Each non-null sub-list is wrapped in a single-column frame, and `expr` is evaluated on it
/// in `Context::Default`. A null sub-list yields a null row. When `parallel` is true the
/// sub-lists are evaluated through rayon's `par_iter`; otherwise they are evaluated
/// sequentially.
///
/// * `s` - input column. Only its name is read here; the result is renamed to it.
/// * `lst` - the list values to evaluate. At the call site in this file this is `s.list()`.
/// * `expr` - the per-sub-list expression, prepared against `lst.inner_dtype()`.
/// * `output_field` - the expected output field. The result is cast to its dtype when the
///   collected dtype differs.
///
/// # Errors
/// Returns an error if `expr` cannot be prepared, or if evaluating it fails on any sub-list.
/// Evaluation does not stop at the first failure. When several sub-lists fail, only the last
/// error recorded is returned.
fn run_per_sublist(
    s: Column,
    lst: &ListChunked,
    expr: &Expr,
    parallel: bool,
    output_field: Field,
) -> PolarsResult<Option<Column>> {
    // Prepared once and reused for every sub-list.
    let phys_expr = prepare_expression_for_context(
        PlSmallStr::EMPTY,
        expr,
        lst.inner_dtype(),
        Context::Default,
    )?;

    let state = ExecutionState::new();

    // Evaluation error (if any). It is surfaced only after all sub-lists have been processed.
    let mut err = None;
    let mut ca: ListChunked = if parallel {
        // The parallel `map` closure cannot assign to `err` directly, so failures are
        // recorded through a mutex and moved into `err` after collecting.
        let m_err = Mutex::new(None);
        let ca: ListChunked = lst
            .par_iter()
            .map(|opt_s| {
                opt_s.and_then(|s| {
                    let df = s.into_frame();
                    let out = phys_expr.evaluate(&df, &state);
                    match out {
                        Ok(s) => Some(s.take_materialized_series()),
                        Err(e) => {
                            // Record the error and emit a null row for this sub-list.
                            *m_err.lock().unwrap() = Some(e);
                            None
                        },
                    }
                })
            })
            .collect_ca_with_dtype(PlSmallStr::EMPTY, output_field.dtype.clone());
        err = m_err.into_inner().unwrap();
        ca
    } else {
        // One frame is reused for every sub-list instead of building a new frame per row.
        let mut df_container = DataFrame::empty();

        lst.into_iter()
            .map(|s| {
                // SAFETY (review): `with_column_unchecked` bypasses the frame's checks. The
                // container is cleared after every evaluation, so it never holds more than
                // the current sub-list's column. Presumably that is the invariant the
                // unchecked call relies on; confirm against its documentation.
                s.and_then(|s| unsafe {
                    df_container.with_column_unchecked(s.into_column());
                    let out = phys_expr.evaluate(&df_container, &state);
                    df_container.clear_columns();
                    match out {
                        Ok(s) => Some(s.take_materialized_series()),
                        Err(e) => {
                            // Record the error and emit a null row for this sub-list.
                            err = Some(e);
                            None
                        },
                    }
                })
            })
            .collect_trusted()
    };
    if let Some(err) = err {
        return Err(err);
    }

    ca.rename(s.name().clone());

    // The sequential path collects without an explicit dtype, so the collected dtype can
    // differ from the expected output dtype. Cast when it does.
    if ca.dtype() != output_field.dtype() {
        ca.cast(output_field.dtype()).map(Column::from).map(Some)
    } else {
        Ok(Some(ca.into_column()))
    }
}
118
119fn run_on_group_by_engine(
120 name: PlSmallStr,
121 lst: &ListChunked,
122 expr: &Expr,
123) -> PolarsResult<Option<Column>> {
124 let lst = lst.rechunk();
125 let arr = lst.downcast_as_array();
126 let groups = offsets_to_groups(arr.offsets()).unwrap();
127
128 let values = Series::try_from((PlSmallStr::EMPTY, arr.values().clone())).unwrap();
130 let inner_dtype = lst.inner_dtype();
131 let values = unsafe { values.from_physical_unchecked(inner_dtype).unwrap() };
134
135 let df_context = values.into_frame();
136 let phys_expr =
137 prepare_expression_for_context(PlSmallStr::EMPTY, expr, inner_dtype, Context::Aggregation)?;
138
139 let state = ExecutionState::new();
140 let mut ac = phys_expr.evaluate_on_groups(&df_context, &groups, &state)?;
141 let out = match ac.agg_state() {
142 AggState::AggregatedScalar(_) => {
143 let out = ac.aggregated();
144 out.as_list().into_column()
145 },
146 _ => ac.aggregated(),
147 };
148 Ok(Some(out.with_name(name).into_column()))
149}
150
151pub trait ListNameSpaceExtension: IntoListNameSpace + Sized {
152 fn eval(self, expr: Expr, parallel: bool) -> Expr {
154 let this = self.into_list_name_space();
155
156 let expr2 = expr.clone();
157 let func = move |c: Column| {
158 for e in expr.into_iter() {
159 match e {
160 #[cfg(feature = "dtype-categorical")]
161 Expr::Cast {
162 dtype: DataType::Categorical(_, _) | DataType::Enum(_, _),
163 ..
164 } => {
165 polars_bail!(
166 ComputeError: "casting to categorical not allowed in `list.eval`"
167 )
168 },
169 Expr::Column(name) => {
170 polars_ensure!(
171 name.is_empty(),
172 ComputeError:
173 "named columns are not allowed in `list.eval`; consider using `element` or `col(\"\")`"
174 );
175 },
176 _ => {},
177 }
178 }
179 let lst = c.list()?.clone();
180
181 let output_field = eval_field_to_dtype(lst.ref_field(), &expr, true);
184 if lst.is_empty() {
185 return Ok(Some(Column::new_empty(
186 c.name().clone(),
187 output_field.dtype(),
188 )));
189 }
190 if lst.null_count() == lst.len() {
191 return Ok(Some(c.cast(output_field.dtype())?.into_column()));
192 }
193
194 let fits_idx_size = lst.get_values_size() <= (IdxSize::MAX as usize);
195 let is_user_apply = || {
198 expr.into_iter().any(|e| matches!(e, Expr::AnonymousFunction { options, .. } if options.fmt_str == MAP_LIST_NAME))
199 };
200
201 if fits_idx_size && c.null_count() == 0 && !is_user_apply() {
202 run_on_group_by_engine(c.name().clone(), &lst, &expr)
203 } else {
204 run_per_sublist(c, &lst, &expr, parallel, output_field)
205 }
206 };
207
208 this.0
209 .map(
210 func,
211 GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, true))),
212 )
213 .with_fmt("eval")
214 }
215}
216
217impl ListNameSpaceExtension for ListNameSpace {}