1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use polars_core::prelude::*;
use rayon::prelude::*;

use super::*;
use crate::physical_plan::planner::{create_physical_expr, ExpressionConversionState};
use crate::prelude::*;

/// Determine the output [`Field`] of `expr` when it is evaluated on the elements of `f`.
///
/// The dtype is found by running `expr` on an empty, single-column frame whose column has the
/// element dtype of `f`: the inner dtype if `f.data_type()` has one, otherwise the dtype itself.
/// The returned field keeps the name of `f`. When `list` is true the resulting dtype is wrapped
/// in `DataType::List`. If evaluating the probe fails, a `DataType::Null` field is returned
/// rather than an error.
pub(crate) fn eval_field_to_dtype(f: &Field, expr: &Expr, list: bool) -> Field {
    let input_dtype = f.data_type();
    let element_dtype = match input_dtype.inner_dtype() {
        Some(inner) => inner.clone(),
        None => input_dtype.clone(),
    };
    // Zero-row frame: only its schema matters for the probe.
    let probe = Series::new_empty("", &element_dtype).into_frame();

    // With the `python` feature the probe query is collected inside `allow_threads`,
    // i.e. with the GIL released.
    #[cfg(feature = "python")]
    let collected = {
        use pyo3::Python;
        Python::with_gil(|py| py.allow_threads(|| probe.lazy().select([expr.clone()]).collect()))
    };
    #[cfg(not(feature = "python"))]
    let collected = probe.lazy().select([expr.clone()]).collect();

    let out_dtype = match collected {
        // A failing probe is reported as a `Null` field instead of an error.
        Err(_) => return Field::new(f.name(), DataType::Null),
        Ok(frame) => frame.get_columns()[0].dtype().clone(),
    };
    let out_dtype = if list {
        DataType::List(Box::new(out_dtype))
    } else {
        out_dtype
    };
    Field::new(f.name(), out_dtype)
}

pub trait ExprEvalExtension: IntoExpr + Sized {
    /// Run an expression over a sliding window that increases `1` slot every iteration.
    ///
    /// # Warning
    /// This can be really slow as it can have `O(n^2)` complexity. Don't use this for operations
    /// that visit all elements.
    fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
        let this = self.into_expr();
        let expr2 = expr.clone();
        let func = move |mut s: Series| {
            let name = s.name().to_string();
            s.rename("");

            // Ensure we get the new schema.
            let output_field = eval_field_to_dtype(s.field().as_ref(), &expr, false);

            let expr = expr.clone();
            let mut arena = Arena::with_capacity(10);
            let aexpr = to_expr_ir(expr, &mut arena);
            let phys_expr = create_physical_expr(
                &aexpr,
                Context::Default,
                &arena,
                None,
                &mut ExpressionConversionState::new(true, 0),
            )?;

            let state = ExecutionState::new();

            let finish = |out: Series| {
                polars_ensure!(
                    out.len() <= 1,
                    ComputeError:
                    "expected single value, got a result with length {}, {:?}",
                    out.len(), out,
                );
                Ok(out.get(0).unwrap().into_static().unwrap())
            };

            let avs = if parallel {
                (1..s.len() + 1)
                    .into_par_iter()
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            let df = s.into_frame();
                            let out = phys_expr.evaluate(&df, &state)?;
                            finish(out)
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            } else {
                let mut df_container = DataFrame::empty();
                (1..s.len() + 1)
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            unsafe {
                                df_container.get_columns_mut().push(s);
                                let out = phys_expr.evaluate(&df_container, &state)?;
                                df_container.get_columns_mut().clear();
                                finish(out)
                            }
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            };
            let s = Series::new(&name, avs);

            if s.dtype() != output_field.data_type() {
                s.cast(output_field.data_type()).map(Some)
            } else {
                Ok(Some(s))
            }
        };

        this.apply(
            func,
            GetOutput::map_field(move |f| eval_field_to_dtype(f, &expr2, false)),
        )
        .with_fmt("expanding_eval")
    }
}

impl ExprEvalExtension for Expr {}