1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::{ExpressionConversionState, create_physical_expr};
4use rayon::prelude::*;
5
6use super::*;
7use crate::prelude::*;
8
9pub(crate) fn eval_field_to_dtype(f: &Field, expr: &Expr, list: bool) -> Field {
10 let dtype = f
12 .dtype()
13 .inner_dtype()
14 .cloned()
15 .unwrap_or_else(|| f.dtype().clone());
16
17 let df = Series::new_empty(PlSmallStr::EMPTY, &dtype).into_frame();
18
19 #[cfg(feature = "python")]
20 let out = {
21 use pyo3::Python;
22 Python::with_gil(|py| py.allow_threads(|| df.lazy().select([expr.clone()]).collect()))
23 };
24 #[cfg(not(feature = "python"))]
25 let out = { df.lazy().select([expr.clone()]).collect() };
26
27 match out {
28 Ok(out) => {
29 let dtype = out.get_columns()[0].dtype();
30 if list {
31 Field::new(f.name().clone(), DataType::List(Box::new(dtype.clone())))
32 } else {
33 Field::new(f.name().clone(), dtype.clone())
34 }
35 },
36 Err(_) => Field::new(f.name().clone(), DataType::Null),
37 }
38}
39
40pub trait ExprEvalExtension: IntoExpr + Sized {
41 fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
47 let this = self.into_expr();
48 let expr2 = expr.clone();
49 let func = move |mut c: Column| {
50 let name = c.name().clone();
51 c.rename(PlSmallStr::EMPTY);
52
53 let output_field = eval_field_to_dtype(c.field().as_ref(), &expr, false);
55 let schema = Arc::new(Schema::from_iter(std::iter::once(output_field.clone())));
56
57 let expr = expr.clone();
58 let mut arena = Arena::with_capacity(10);
59 let aexpr = to_expr_ir(expr, &mut arena)?;
60 let phys_expr = create_physical_expr(
61 &aexpr,
62 Context::Default,
63 &arena,
64 &schema,
65 &mut ExpressionConversionState::new(true, 0),
66 )?;
67
68 let state = ExecutionState::new();
69
70 let finish = |out: Column| {
71 polars_ensure!(
72 out.len() <= 1,
73 ComputeError:
74 "expected single value, got a result with length {}, {:?}",
75 out.len(), out,
76 );
77 Ok(out.get(0).unwrap().into_static())
78 };
79
80 let avs = if parallel {
81 POOL.install(|| {
82 (1..c.len() + 1)
83 .into_par_iter()
84 .map(|len| {
85 let c = c.slice(0, len);
86 if (len - c.null_count()) >= min_periods {
87 let df = c.clone().into_frame();
88 let out = phys_expr.evaluate(&df, &state)?.into_column();
89 finish(out)
90 } else {
91 Ok(AnyValue::Null)
92 }
93 })
94 .collect::<PolarsResult<Vec<_>>>()
95 })?
96 } else {
97 let mut df_container = DataFrame::empty();
98 (1..c.len() + 1)
99 .map(|len| {
100 let c = c.slice(0, len);
101 if (len - c.null_count()) >= min_periods {
102 unsafe {
103 df_container.with_column_unchecked(c.into_column());
104 let out = phys_expr.evaluate(&df_container, &state)?.into_column();
105 df_container.clear_columns();
106 finish(out)
107 }
108 } else {
109 Ok(AnyValue::Null)
110 }
111 })
112 .collect::<PolarsResult<Vec<_>>>()?
113 };
114 let c = Column::new(name, avs);
115
116 if c.dtype() != output_field.dtype() {
117 c.cast(output_field.dtype()).map(Some)
118 } else {
119 Ok(Some(c))
120 }
121 };
122
123 this.apply(
124 func,
125 GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, false))),
126 )
127 .with_fmt("expanding_eval")
128 }
129}
130
131impl ExprEvalExtension for Expr {}