polars_lazy/dsl/
functions.rs1use polars_core::prelude::*;
7pub use polars_plan::dsl::functions::*;
8use polars_plan::prelude::UnionArgs;
9use rayon::prelude::*;
10
11use crate::prelude::*;
12
13pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
14 inputs: L,
15 args: UnionArgs,
16) -> PolarsResult<LazyFrame> {
17 let mut inputs = inputs.as_ref().to_vec();
18
19 let lf = std::mem::take(
20 inputs
21 .get_mut(0)
22 .ok_or_else(|| polars_err!(NoData: "empty container given"))?,
23 );
24
25 let opt_state = lf.opt_state;
26 let cached_arenas = lf.cached_arena.clone();
27
28 let mut lps = Vec::with_capacity(inputs.len());
29 lps.push(lf.logical_plan);
30
31 for lf in &mut inputs[1..] {
32 let lp = std::mem::take(&mut lf.logical_plan);
33 lps.push(lp)
34 }
35
36 let lp = DslPlan::Union { inputs: lps, args };
37 Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas))
38}
39
#[cfg(feature = "diagonal_concat")]
/// Concatenates `inputs` with the `diagonal` flag of `args` forced to `true`.
///
/// Every other field of `args` is forwarded unchanged to [`concat_impl`],
/// whose errors (e.g. an empty `inputs`) are returned as-is.
pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
    inputs: L,
    args: UnionArgs,
) -> PolarsResult<LazyFrame> {
    // Override only the `diagonal` flag; keep whatever else the caller set.
    let diagonal_args = UnionArgs {
        diagonal: true,
        ..args
    };
    concat_impl(inputs, diagonal_args)
}
50
51pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
53 inputs: L,
54 args: UnionArgs,
55) -> PolarsResult<LazyFrame> {
56 let lfs = inputs.as_ref();
57 let (opt_state, cached_arena) = lfs
58 .first()
59 .map(|lf| (lf.opt_state, lf.cached_arena.clone()))
60 .ok_or_else(
61 || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
62 )?;
63
64 let options = HConcatOptions {
65 parallel: args.parallel,
66 strict: args.strict,
67 };
68 let lp = DslPlan::HConcat {
69 inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
70 options,
71 };
72 Ok(LazyFrame::from_inner(lp, opt_state, cached_arena))
73}
74
75pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
77 concat_impl(inputs, args)
78}
79
80pub fn collect_all<I>(lfs: I) -> PolarsResult<Vec<DataFrame>>
82where
83 I: IntoParallelIterator<Item = LazyFrame>,
84{
85 let iter = lfs.into_par_iter();
86
87 polars_core::POOL.install(|| iter.map(|lf| lf.collect()).collect())
88}
89
#[cfg(test)]
mod test {
    // The only test below is feature-gated, so without `diagonal_concat`
    // this glob import would otherwise be reported as unused.
    #[allow(unused_imports)]
    use super::*;

    /// Diagonal concatenation of three frames with partially overlapping
    /// columns must yield the union of all columns, with missing values
    /// where a frame did not provide a column.
    #[test]
    #[cfg(feature = "diagonal_concat")]
    fn test_diag_concat_lf() -> PolarsResult<()> {
        let first = df![
            "a" => [1, 2],
            "b" => ["a", "b"]
        ]?;
        let second = df![
            "b" => ["a", "b"],
            "c" => [1, 2]
        ]?;
        let third = df![
            "a" => [5, 7],
            "c" => [1, 2],
            "d" => [1, 2]
        ]?;

        let union_args = UnionArgs {
            rechunk: false,
            parallel: false,
            ..Default::default()
        };
        let lazy_frames = [first.lazy(), second.lazy(), third.lazy()];
        let actual = concat_lf_diagonal(&lazy_frames, union_args)?.collect()?;

        let expected = df![
            "a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
            "b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
            "c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
            "d" => [None, None, None, None, Some(1), Some(2)]
        ]?;

        assert!(actual.equals_missing(&expected));

        Ok(())
    }
}