polars_lazy/dsl/
functions.rs

//! # Functions
//!
//! Functions on multiple expressions.
//!
5
6use polars_core::prelude::*;
7pub use polars_plan::dsl::functions::*;
8use polars_plan::prelude::UnionArgs;
9use rayon::prelude::*;
10
11use crate::prelude::*;
12
13pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
14    inputs: L,
15    args: UnionArgs,
16) -> PolarsResult<LazyFrame> {
17    let mut inputs = inputs.as_ref().to_vec();
18
19    let lf = std::mem::take(
20        inputs
21            .get_mut(0)
22            .ok_or_else(|| polars_err!(NoData: "empty container given"))?,
23    );
24
25    let opt_state = lf.opt_state;
26    let cached_arenas = lf.cached_arena.clone();
27
28    let mut lps = Vec::with_capacity(inputs.len());
29    lps.push(lf.logical_plan);
30
31    for lf in &mut inputs[1..] {
32        let lp = std::mem::take(&mut lf.logical_plan);
33        lps.push(lp)
34    }
35
36    let lp = DslPlan::Union { inputs: lps, args };
37    Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas))
38}
39
/// Concat [`LazyFrame`]s diagonally.
///
/// Sets `args.diagonal` to `true` (overriding whatever value was passed in)
/// and then runs the same implementation that [`concat()`] uses.
///
/// # Errors
///
/// Propagates any error from the underlying concat implementation.
#[cfg(feature = "diagonal_concat")]
pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
    inputs: L,
    args: UnionArgs,
) -> PolarsResult<LazyFrame> {
    let diagonal_args = UnionArgs {
        diagonal: true,
        ..args
    };
    concat_impl(inputs, diagonal_args)
}
50
51/// Concat [LazyFrame]s horizontally.
52pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
53    inputs: L,
54    args: UnionArgs,
55) -> PolarsResult<LazyFrame> {
56    let lfs = inputs.as_ref();
57    let (opt_state, cached_arena) = lfs
58        .first()
59        .map(|lf| (lf.opt_state, lf.cached_arena.clone()))
60        .ok_or_else(
61            || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
62        )?;
63
64    let options = HConcatOptions {
65        parallel: args.parallel,
66        strict: args.strict,
67    };
68    let lp = DslPlan::HConcat {
69        inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
70        options,
71    };
72    Ok(LazyFrame::from_inner(lp, opt_state, cached_arena))
73}
74
75/// Concat multiple [`LazyFrame`]s vertically.
76pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
77    concat_impl(inputs, args)
78}
79
80/// Collect all [`LazyFrame`] computations.
81pub fn collect_all<I>(lfs: I) -> PolarsResult<Vec<DataFrame>>
82where
83    I: IntoParallelIterator<Item = LazyFrame>,
84{
85    let iter = lfs.into_par_iter();
86
87    polars_core::POOL.install(|| iter.map(|lf| lf.collect()).collect())
88}
89
#[cfg(test)]
mod test {
    // The glob import is only needed by the `diagonal_concat`-gated test, so
    // it is unused when that feature is off.
    #[allow(unused_imports)]
    use super::*;

    /// Diagonal concat of three frames with partially overlapping columns
    /// yields the union of all columns, with nulls where a frame lacks one.
    #[cfg(feature = "diagonal_concat")]
    #[test]
    fn test_diag_concat_lf() -> PolarsResult<()> {
        let left = df![
            "a" => [1, 2],
            "b" => ["a", "b"]
        ]?;
        let middle = df![
            "b" => ["a", "b"],
            "c" => [1, 2]
        ]?;
        let right = df![
            "a" => [5, 7],
            "c" => [1, 2],
            "d" => [1, 2]
        ]?;

        let args = UnionArgs {
            rechunk: false,
            parallel: false,
            ..Default::default()
        };
        let frames = [left.lazy(), middle.lazy(), right.lazy()];

        let lazy_out = concat_lf_diagonal(&frames, args)?;
        let out = lazy_out.collect()?;

        let expected = df![
            "a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
            "b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
            "c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
            "d" => [None, None, None, None, Some(1), Some(2)]
        ]?;

        assert!(out.equals_missing(&expected));

        Ok(())
    }
}