polars_lazy/dsl/
eval.rs

1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::{ExpressionConversionState, create_physical_expr};
4use rayon::prelude::*;
5
6use super::*;
7use crate::prelude::*;
8
9pub(crate) fn eval_field_to_dtype(f: &Field, expr: &Expr, list: bool) -> Field {
10    // Dummy df to determine output dtype.
11    let dtype = f
12        .dtype()
13        .inner_dtype()
14        .cloned()
15        .unwrap_or_else(|| f.dtype().clone());
16
17    let df = Series::new_empty(PlSmallStr::EMPTY, &dtype).into_frame();
18
19    #[cfg(feature = "python")]
20    let out = {
21        use pyo3::Python;
22        Python::with_gil(|py| py.allow_threads(|| df.lazy().select([expr.clone()]).collect()))
23    };
24    #[cfg(not(feature = "python"))]
25    let out = { df.lazy().select([expr.clone()]).collect() };
26
27    match out {
28        Ok(out) => {
29            let dtype = out.get_columns()[0].dtype();
30            if list {
31                Field::new(f.name().clone(), DataType::List(Box::new(dtype.clone())))
32            } else {
33                Field::new(f.name().clone(), dtype.clone())
34            }
35        },
36        Err(_) => Field::new(f.name().clone(), DataType::Null),
37    }
38}
39
40pub trait ExprEvalExtension: IntoExpr + Sized {
41    /// Run an expression over a sliding window that increases `1` slot every iteration.
42    ///
43    /// # Warning
44    /// This can be really slow as it can have `O(n^2)` complexity. Don't use this for operations
45    /// that visit all elements.
46    fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
47        let this = self.into_expr();
48        let expr2 = expr.clone();
49        let func = move |mut c: Column| {
50            let name = c.name().clone();
51            c.rename(PlSmallStr::EMPTY);
52
53            // Ensure we get the new schema.
54            let output_field = eval_field_to_dtype(c.field().as_ref(), &expr, false);
55            let schema = Arc::new(Schema::from_iter(std::iter::once(output_field.clone())));
56
57            let expr = expr.clone();
58            let mut arena = Arena::with_capacity(10);
59            let aexpr = to_expr_ir(expr, &mut arena)?;
60            let phys_expr = create_physical_expr(
61                &aexpr,
62                Context::Default,
63                &arena,
64                &schema,
65                &mut ExpressionConversionState::new(true, 0),
66            )?;
67
68            let state = ExecutionState::new();
69
70            let finish = |out: Column| {
71                polars_ensure!(
72                    out.len() <= 1,
73                    ComputeError:
74                    "expected single value, got a result with length {}, {:?}",
75                    out.len(), out,
76                );
77                Ok(out.get(0).unwrap().into_static())
78            };
79
80            let avs = if parallel {
81                POOL.install(|| {
82                    (1..c.len() + 1)
83                        .into_par_iter()
84                        .map(|len| {
85                            let c = c.slice(0, len);
86                            if (len - c.null_count()) >= min_periods {
87                                let df = c.clone().into_frame();
88                                let out = phys_expr.evaluate(&df, &state)?.into_column();
89                                finish(out)
90                            } else {
91                                Ok(AnyValue::Null)
92                            }
93                        })
94                        .collect::<PolarsResult<Vec<_>>>()
95                })?
96            } else {
97                let mut df_container = DataFrame::empty();
98                (1..c.len() + 1)
99                    .map(|len| {
100                        let c = c.slice(0, len);
101                        if (len - c.null_count()) >= min_periods {
102                            unsafe {
103                                df_container.with_column_unchecked(c.into_column());
104                                let out = phys_expr.evaluate(&df_container, &state)?.into_column();
105                                df_container.clear_columns();
106                                finish(out)
107                            }
108                        } else {
109                            Ok(AnyValue::Null)
110                        }
111                    })
112                    .collect::<PolarsResult<Vec<_>>>()?
113            };
114            let c = Column::new(name, avs);
115
116            if c.dtype() != output_field.dtype() {
117                c.cast(output_field.dtype()).map(Some)
118            } else {
119                Ok(Some(c))
120            }
121        };
122
123        this.apply(
124            func,
125            GetOutput::map_field(move |f| Ok(eval_field_to_dtype(f, &expr2, false))),
126        )
127        .with_fmt("expanding_eval")
128    }
129}
130
131impl ExprEvalExtension for Expr {}