polars_lazy/dsl/
functions.rs1use polars_core::prelude::*;
7pub use polars_plan::dsl::functions::*;
8use polars_plan::prelude::UnionArgs;
9use rayon::prelude::*;
10
11use crate::prelude::*;
12
13pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
14 inputs: L,
15 args: UnionArgs,
16) -> PolarsResult<LazyFrame> {
17 let mut inputs = inputs.as_ref().to_vec();
18
19 let lf = std::mem::take(
20 inputs
21 .get_mut(0)
22 .ok_or_else(|| polars_err!(NoData: "empty container given"))?,
23 );
24
25 let opt_state = lf.opt_state;
26 let cached_arenas = lf.cached_arena.clone();
27
28 let mut lps = Vec::with_capacity(inputs.len());
29 lps.push(lf.logical_plan);
30
31 for lf in &mut inputs[1..] {
32 let lp = std::mem::take(&mut lf.logical_plan);
33 lps.push(lp)
34 }
35
36 let lp = DslPlan::Union { inputs: lps, args };
37 Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas))
38}
39
40#[cfg(feature = "diagonal_concat")]
41pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
44 inputs: L,
45 mut args: UnionArgs,
46) -> PolarsResult<LazyFrame> {
47 args.diagonal = true;
48 concat_impl(inputs, args)
49}
50
51pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
53 inputs: L,
54 args: UnionArgs,
55) -> PolarsResult<LazyFrame> {
56 let lfs = inputs.as_ref();
57 let (opt_state, cached_arena) = lfs
58 .first()
59 .map(|lf| (lf.opt_state, lf.cached_arena.clone()))
60 .ok_or_else(
61 || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
62 )?;
63
64 let options = HConcatOptions {
65 parallel: args.parallel,
66 };
67 let lp = DslPlan::HConcat {
68 inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
69 options,
70 };
71 Ok(LazyFrame::from_inner(lp, opt_state, cached_arena))
72}
73
74pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
76 concat_impl(inputs, args)
77}
78
79pub fn collect_all<I>(lfs: I) -> PolarsResult<Vec<DataFrame>>
81where
82 I: IntoParallelIterator<Item = LazyFrame>,
83{
84 let iter = lfs.into_par_iter();
85
86 polars_core::POOL.install(|| iter.map(|lf| lf.collect()).collect())
87}
88
89#[cfg(test)]
90mod test {
91 #[allow(unused_imports)]
93 use super::*;
94
95 #[test]
96 #[cfg(feature = "diagonal_concat")]
97 fn test_diag_concat_lf() -> PolarsResult<()> {
98 let a = df![
99 "a" => [1, 2],
100 "b" => ["a", "b"]
101 ]?;
102
103 let b = df![
104 "b" => ["a", "b"],
105 "c" => [1, 2]
106 ]?;
107
108 let c = df![
109 "a" => [5, 7],
110 "c" => [1, 2],
111 "d" => [1, 2]
112 ]?;
113
114 let out = concat_lf_diagonal(
115 &[a.lazy(), b.lazy(), c.lazy()],
116 UnionArgs {
117 rechunk: false,
118 parallel: false,
119 ..Default::default()
120 },
121 )?
122 .collect()?;
123
124 let expected = df![
125 "a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
126 "b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
127 "c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
128 "d" => [None, None, None, None, Some(1), Some(2)]
129 ]?;
130
131 assert!(out.equals_missing(&expected));
132
133 Ok(())
134 }
135}