polars_lazy/physical_plan/streaming/
tree.rs
1use std::collections::BTreeSet;
2use std::fmt::Debug;
3
4use polars_plan::prelude::*;
5
6#[derive(Copy, Clone, Debug)]
7pub(super) enum PipelineNode {
8 Sink(Node),
9 Operator(Node),
10 RhsJoin(Node),
11 Union(Node),
12}
13
14impl PipelineNode {
15 pub(super) fn node(self) -> Node {
16 match self {
17 Self::Sink(node) => node,
18 Self::Operator(node) => node,
19 Self::RhsJoin(node) => node,
20 Self::Union(node) => node,
21 }
22 }
23}
24
25#[derive(Default, Debug, Clone)]
27pub(super) struct Branch {
28 pub(super) execution_id: u32,
36 pub(super) streamable: bool,
37 pub(super) sources: Vec<Node>,
38 pub(super) join_count: u32,
40 pub(super) operators_sinks: Vec<PipelineNode>,
42}
43
44fn sink_node(pl_node: &PipelineNode) -> Option<Node> {
45 match pl_node {
46 PipelineNode::Sink(node) => Some(*node),
47 _ => None,
48 }
49}
50
51impl Branch {
52 pub(super) fn get_final_sink(&self) -> Option<Node> {
53 self.operators_sinks.iter().find_map(sink_node)
56 }
57 pub(super) fn split(&self) -> Self {
58 Self {
59 execution_id: self.execution_id,
60 streamable: self.streamable,
61 join_count: self.join_count,
62 ..Default::default()
63 }
64 }
65
66 pub(super) fn split_from_sink(&self) -> Self {
68 match self
69 .operators_sinks
70 .iter()
71 .rposition(|pl_node| sink_node(pl_node).is_some())
72 {
73 None => self.split(),
74 Some(pos) => Self {
75 execution_id: self.execution_id,
76 streamable: self.streamable,
77 join_count: self.join_count,
78 operators_sinks: self.operators_sinks[pos..].to_vec(),
79 ..Default::default()
80 },
81 }
82 }
83}
84
85type TreeRef<'a> = &'a [Branch];
87pub(super) type Tree = Vec<Branch>;
88
89pub(super) fn is_valid_tree(tree: TreeRef) -> bool {
94 if tree.is_empty() {
95 return false;
96 };
97 let joins_in_tree = tree.iter().map(|branch| branch.join_count).sum::<u32>();
98 let branches_in_tree = tree.len() as u32;
99
100 if (branches_in_tree - 1) != joins_in_tree {
102 return false;
103 }
104
105 let mut left_joins = BTreeSet::new();
107 for branch in tree {
108 for pl_node in &branch.operators_sinks {
109 if !matches!(pl_node, PipelineNode::RhsJoin(_)) {
110 left_joins.insert(pl_node.node().0);
111 }
112 }
113 }
114 for branch in tree {
115 for pl_node in &branch.operators_sinks {
116 if matches!(pl_node, PipelineNode::RhsJoin(_))
118 && !left_joins.contains(&pl_node.node().0)
119 {
120 return false;
121 }
122 }
123 }
124 true
125}
126
127#[cfg(debug_assertions)]
128#[allow(unused)]
129pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<IR>) {
130 if b.streamable {
138 print!("streamable: ")
139 } else {
140 print!("non-streamable: ")
141 }
142 for src in &b.sources {
143 let lp = lp_arena.get(*src);
144 print!("{}, ", lp.name());
145 }
146 print!("=> ");
147
148 for pl_node in &b.operators_sinks {
149 let lp = lp_arena.get(pl_node.node());
150 if matches!(pl_node, PipelineNode::RhsJoin(_)) {
151 print!("rhs_join_placeholder -> ");
152 } else {
153 print!("{} -> ", lp.name());
154 }
155 }
156 println!();
157}
158
159#[cfg(debug_assertions)]
160#[allow(unused)]
161pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) {
162 if tree.is_empty() {
163 println!("EMPTY TREE");
164 return;
165 }
166 let root = tree
167 .iter()
168 .map(|branch| {
169 let pl_node = branch.operators_sinks.last().unwrap();
170 pl_node.node()
171 })
172 .max_by_key(|root| {
173 lp_arena.iter(*root).count()
176 })
177 .unwrap();
178
179 println!("SUBPLAN ELIGIBLE FOR STREAMING:");
180 println!(
181 "{}\n",
182 IRPlanRef {
183 lp_top: root,
184 lp_arena,
185 expr_arena
186 }
187 .display()
188 );
189
190 println!("PIPELINE TREE:");
191 for (i, branch) in tree.iter().enumerate() {
192 print!("{i}: ");
193 dbg_branch(branch, lp_arena);
194 }
195}