polars_lazy/dsl/
functions.rs

1//! # Functions
2//!
3//! Function on multiple expressions.
4//!
5
6use polars_core::prelude::*;
7pub use polars_plan::dsl::functions::*;
8use polars_plan::prelude::UnionArgs;
9use rayon::prelude::*;
10
11use crate::prelude::*;
12
13pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
14    inputs: L,
15    args: UnionArgs,
16) -> PolarsResult<LazyFrame> {
17    let mut inputs = inputs.as_ref().to_vec();
18
19    let lf = std::mem::take(
20        inputs
21            .get_mut(0)
22            .ok_or_else(|| polars_err!(NoData: "empty container given"))?,
23    );
24
25    let opt_state = lf.opt_state;
26    let cached_arenas = lf.cached_arena.clone();
27
28    let mut lps = Vec::with_capacity(inputs.len());
29    lps.push(lf.logical_plan);
30
31    for lf in &mut inputs[1..] {
32        let lp = std::mem::take(&mut lf.logical_plan);
33        lps.push(lp)
34    }
35
36    let lp = DslPlan::Union { inputs: lps, args };
37    Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas))
38}
39
40#[cfg(feature = "diagonal_concat")]
41/// Concat [LazyFrame]s diagonally.
42/// Calls [`concat`][concat()] internally.
43pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
44    inputs: L,
45    mut args: UnionArgs,
46) -> PolarsResult<LazyFrame> {
47    args.diagonal = true;
48    concat_impl(inputs, args)
49}
50
51/// Concat [LazyFrame]s horizontally.
52pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
53    inputs: L,
54    args: UnionArgs,
55) -> PolarsResult<LazyFrame> {
56    let lfs = inputs.as_ref();
57    let (opt_state, cached_arena) = lfs
58        .first()
59        .map(|lf| (lf.opt_state, lf.cached_arena.clone()))
60        .ok_or_else(
61            || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
62        )?;
63
64    let options = HConcatOptions {
65        parallel: args.parallel,
66    };
67    let lp = DslPlan::HConcat {
68        inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
69        options,
70    };
71    Ok(LazyFrame::from_inner(lp, opt_state, cached_arena))
72}
73
74/// Concat multiple [`LazyFrame`]s vertically.
75pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
76    concat_impl(inputs, args)
77}
78
79/// Collect all [`LazyFrame`] computations.
80pub fn collect_all<I>(lfs: I) -> PolarsResult<Vec<DataFrame>>
81where
82    I: IntoParallelIterator<Item = LazyFrame>,
83{
84    let iter = lfs.into_par_iter();
85
86    polars_core::POOL.install(|| iter.map(|lf| lf.collect()).collect())
87}
88
89#[cfg(test)]
90mod test {
91    // used only if feature="diagonal_concat"
92    #[allow(unused_imports)]
93    use super::*;
94
95    #[test]
96    #[cfg(feature = "diagonal_concat")]
97    fn test_diag_concat_lf() -> PolarsResult<()> {
98        let a = df![
99            "a" => [1, 2],
100            "b" => ["a", "b"]
101        ]?;
102
103        let b = df![
104            "b" => ["a", "b"],
105            "c" => [1, 2]
106        ]?;
107
108        let c = df![
109            "a" => [5, 7],
110            "c" => [1, 2],
111            "d" => [1, 2]
112        ]?;
113
114        let out = concat_lf_diagonal(
115            &[a.lazy(), b.lazy(), c.lazy()],
116            UnionArgs {
117                rechunk: false,
118                parallel: false,
119                ..Default::default()
120            },
121        )?
122        .collect()?;
123
124        let expected = df![
125            "a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
126            "b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
127            "c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
128            "d" => [None, None, None, None, Some(1), Some(2)]
129        ]?;
130
131        assert!(out.equals_missing(&expected));
132
133        Ok(())
134    }
135}