polars_lazy/physical_plan/streaming/tree.rs

1use std::collections::BTreeSet;
2use std::fmt::Debug;
3
4use polars_plan::prelude::*;
5
6#[derive(Copy, Clone, Debug)]
7pub(super) enum PipelineNode {
8    Sink(Node),
9    Operator(Node),
10    RhsJoin(Node),
11    Union(Node),
12}
13
14impl PipelineNode {
15    pub(super) fn node(self) -> Node {
16        match self {
17            Self::Sink(node) => node,
18            Self::Operator(node) => node,
19            Self::RhsJoin(node) => node,
20            Self::Union(node) => node,
21        }
22    }
23}
24
25/// Represents a pipeline/ branch in a subquery tree
26#[derive(Default, Debug, Clone)]
27pub(super) struct Branch {
28    // During traversal of ALP
29    // we determine the execution order
30    // as traversal order == execution order
31    // we can increment this counter
32    // the individual branches are then flattened
33    // sorted and executed in reversed order
34    // (to traverse from leaves to root)
35    pub(super) execution_id: u32,
36    pub(super) streamable: bool,
37    pub(super) sources: Vec<Node>,
38    // joins seen in whole branch (we count a union as joins with multiple counts)
39    pub(super) join_count: u32,
40    // node is operator/sink
41    pub(super) operators_sinks: Vec<PipelineNode>,
42}
43
44fn sink_node(pl_node: &PipelineNode) -> Option<Node> {
45    match pl_node {
46        PipelineNode::Sink(node) => Some(*node),
47        _ => None,
48    }
49}
50
51impl Branch {
52    pub(super) fn get_final_sink(&self) -> Option<Node> {
53        // this is still in the order of discovery
54        // so the first sink is the final one.
55        self.operators_sinks.iter().find_map(sink_node)
56    }
57    pub(super) fn split(&self) -> Self {
58        Self {
59            execution_id: self.execution_id,
60            streamable: self.streamable,
61            join_count: self.join_count,
62            ..Default::default()
63        }
64    }
65
66    /// this will share the sink
67    pub(super) fn split_from_sink(&self) -> Self {
68        match self
69            .operators_sinks
70            .iter()
71            .rposition(|pl_node| sink_node(pl_node).is_some())
72        {
73            None => self.split(),
74            Some(pos) => Self {
75                execution_id: self.execution_id,
76                streamable: self.streamable,
77                join_count: self.join_count,
78                operators_sinks: self.operators_sinks[pos..].to_vec(),
79                ..Default::default()
80            },
81        }
82    }
83}
84
85/// Represents a subquery tree of pipelines.
86type TreeRef<'a> = &'a [Branch];
87pub(super) type Tree = Vec<Branch>;
88
89/// We validate a tree in order to check if it is eligible for streaming.
90/// It could be that a join branch wasn't added during collection of branches
91/// (because it contained a non-streamable node). This function checks if every join
92/// node has a match.
93pub(super) fn is_valid_tree(tree: TreeRef) -> bool {
94    if tree.is_empty() {
95        return false;
96    };
97    let joins_in_tree = tree.iter().map(|branch| branch.join_count).sum::<u32>();
98    let branches_in_tree = tree.len() as u32;
99
100    // all join branches should be added, if not we skip the tree, as it is invalid
101    if (branches_in_tree - 1) != joins_in_tree {
102        return false;
103    }
104
105    // rhs joins will initially be placeholders
106    let mut left_joins = BTreeSet::new();
107    for branch in tree {
108        for pl_node in &branch.operators_sinks {
109            if !matches!(pl_node, PipelineNode::RhsJoin(_)) {
110                left_joins.insert(pl_node.node().0);
111            }
112        }
113    }
114    for branch in tree {
115        for pl_node in &branch.operators_sinks {
116            // check if every rhs join has a lhs join node
117            if matches!(pl_node, PipelineNode::RhsJoin(_))
118                && !left_joins.contains(&pl_node.node().0)
119            {
120                return false;
121            }
122        }
123    }
124    true
125}
126
127#[cfg(debug_assertions)]
128#[allow(unused)]
129pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<IR>) {
130    // streamable: bool,
131    // sources: Vec<Node>,
132    // // joins seen in whole branch (we count a union as joins with multiple counts)
133    // join_count: IdxSize,
134    // // node is operator/sink
135    // operators_sinks: Vec<(IsSink, IsRhsJoin, Node)>,
136
137    if b.streamable {
138        print!("streamable: ")
139    } else {
140        print!("non-streamable: ")
141    }
142    for src in &b.sources {
143        let lp = lp_arena.get(*src);
144        print!("{}, ", lp.name());
145    }
146    print!("=> ");
147
148    for pl_node in &b.operators_sinks {
149        let lp = lp_arena.get(pl_node.node());
150        if matches!(pl_node, PipelineNode::RhsJoin(_)) {
151            print!("rhs_join_placeholder -> ");
152        } else {
153            print!("{} -> ", lp.name());
154        }
155    }
156    println!();
157}
158
159#[cfg(debug_assertions)]
160#[allow(unused)]
161pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) {
162    if tree.is_empty() {
163        println!("EMPTY TREE");
164        return;
165    }
166    let root = tree
167        .iter()
168        .map(|branch| {
169            let pl_node = branch.operators_sinks.last().unwrap();
170            pl_node.node()
171        })
172        .max_by_key(|root| {
173            // count the children of this root
174            // the branch with the most children is the root of the whole tree
175            lp_arena.iter(*root).count()
176        })
177        .unwrap();
178
179    println!("SUBPLAN ELIGIBLE FOR STREAMING:");
180    println!(
181        "{}\n",
182        IRPlanRef {
183            lp_top: root,
184            lp_arena,
185            expr_arena
186        }
187        .display()
188    );
189
190    println!("PIPELINE TREE:");
191    for (i, branch) in tree.iter().enumerate() {
192        print!("{i}: ");
193        dbg_branch(branch, lp_arena);
194    }
195}