1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use polars_core::prelude::*;
use rayon::prelude::*;

use super::*;
use crate::physical_plan::planner::create_physical_expr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

/// Infer the [`Field`] that results from evaluating `expr` on a column described by `f`.
///
/// The expression is run against a zero-row frame whose single, unnamed column has
/// the inner dtype of `f` when `f.data_type().inner_dtype()` yields one, and `f`'s
/// own dtype otherwise. The dtype of the first output column is then reported under
/// `f`'s name; when `list` is `true` it is wrapped in `DataType::List`.
///
/// If evaluating the expression fails, the returned field has dtype `DataType::Null`.
pub(crate) fn eval_field_to_dtype(f: &Field, expr: &Expr, list: bool) -> Field {
    // Pick the dtype the expression will actually see.
    let input_dtype = match f.data_type().inner_dtype() {
        Some(inner) => inner.clone(),
        None => f.data_type().clone(),
    };

    // Zero-row probe frame: only the resulting schema matters, not any values.
    let probe = Series::new_empty("", &input_dtype).into_frame();
    let run_probe = move || probe.lazy().select([expr.clone()]).collect();

    // With the `python` feature the query is collected inside `allow_threads`.
    #[cfg(feature = "python")]
    let result = {
        use pyo3::Python;
        Python::with_gil(|py| py.allow_threads(run_probe))
    };
    #[cfg(not(feature = "python"))]
    let result = run_probe();

    // Best effort: a failing expression yields a `Null`-typed field instead of an error.
    let Ok(evaluated) = result else {
        return Field::new(f.name(), DataType::Null);
    };

    let out_dtype = evaluated.get_columns()[0].dtype().clone();
    let field_dtype = if list {
        DataType::List(Box::new(out_dtype))
    } else {
        out_dtype
    };
    Field::new(f.name(), field_dtype)
}

pub trait ExprEvalExtension: IntoExpr + Sized {
    /// Run an expression over a sliding window that increases `1` slot every iteration.
    ///
    /// # Warning
    /// This can be really slow as it can have `O(n^2)` complexity. Don't use this for operations
    /// that visit all elements.
    fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
        let this = self.into_expr();
        let expr2 = expr.clone();
        let func = move |mut s: Series| {
            let name = s.name().to_string();
            s.rename("");

            // Ensure we get the new schema.
            let output_field = eval_field_to_dtype(s.field().as_ref(), &expr, false);

            let expr = expr.clone();
            let mut arena = Arena::with_capacity(10);
            let aexpr = to_expr_ir(expr, &mut arena);
            let phys_expr = create_physical_expr(
                &aexpr,
                Context::Default,
                &arena,
                None,
                &mut Default::default(),
            )?;

            let state = ExecutionState::new();

            let finish = |out: Series| {
                polars_ensure!(
                    out.len() <= 1,
                    ComputeError:
                    "expected single value, got a result with length {}, {:?}",
                    out.len(), out,
                );
                Ok(out.get(0).unwrap().into_static().unwrap())
            };

            let avs = if parallel {
                (1..s.len() + 1)
                    .into_par_iter()
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            let df = s.into_frame();
                            let out = phys_expr.evaluate(&df, &state)?;
                            finish(out)
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            } else {
                let mut df_container = DataFrame::empty();
                (1..s.len() + 1)
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            unsafe {
                                df_container.get_columns_mut().push(s);
                                let out = phys_expr.evaluate(&df_container, &state)?;
                                df_container.get_columns_mut().clear();
                                finish(out)
                            }
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            };
            let s = Series::new(&name, avs);

            if s.dtype() != output_field.data_type() {
                s.cast(output_field.data_type()).map(Some)
            } else {
                Ok(Some(s))
            }
        };

        this.apply(
            func,
            GetOutput::map_field(move |f| eval_field_to_dtype(f, &expr2, false)),
        )
        .with_fmt("expanding_eval")
    }
}

// `Expr` picks up `cumulative_eval` through the trait's default method; nothing to override.
impl ExprEvalExtension for Expr {}