// polars_lazy/physical_plan/streaming/construct_pipeline.rs

1use std::cell::RefCell;
2use std::rc::Rc;
3use std::sync::Mutex;
4
5use polars_core::config::verbose;
6use polars_core::prelude::*;
7use polars_expr::{ExpressionConversionState, create_physical_expr};
8use polars_io::predicates::{PhysicalIoExpr, StatsEvaluator};
9use polars_pipe::expressions::PhysicalPipedExpr;
10use polars_pipe::operators::chunks::DataChunk;
11use polars_pipe::pipeline::{
12    CallBacks, PipeLine, create_pipeline, execute_pipeline, get_dummy_operator, get_operator,
13};
14use polars_plan::prelude::expr_ir::ExprIR;
15
16use crate::physical_plan::streaming::tree::{PipelineNode, Tree};
17use crate::prelude::*;
18
/// Newtype adapter around a [`PhysicalExpr`] so one physical expression can
/// serve both as a [`PhysicalIoExpr`] (I/O predicate pushdown) and as a
/// [`PhysicalPipedExpr`] (streaming pipeline expression).
pub struct Wrap(Arc<dyn PhysicalExpr>);
20
impl PhysicalIoExpr for Wrap {
    /// Evaluate the wrapped expression against `df` by delegating to
    /// `PhysicalIoHelper`. Window functions are assumed absent here
    /// (`has_window_function: false`) — NOTE(review): confirm streaming
    /// never routes window expressions through this path.
    fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series> {
        let h = PhysicalIoHelper {
            expr: self.0.clone(),
            has_window_function: false,
        };
        h.evaluate_io(df)
    }
    /// Forward to the inner expression's statistics evaluator, if it has one.
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        self.0.as_stats_evaluator()
    }
}
impl PhysicalPipedExpr for Wrap {
    /// Evaluate the expression on a pipeline chunk and materialize the
    /// resulting column into a `Series`.
    fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult<Series> {
        self.0
            .evaluate(&chunk.data, state)
            .map(|c| c.take_materialized_series())
    }
    /// Resolve the output field (name + dtype) given the input schema.
    fn field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        self.0.to_field(input_schema)
    }

    /// Return the logical `Expr` backing this physical expression.
    /// Panics if the wrapped expression has no `Expr` representation
    /// (`as_expression()` returns `None`).
    fn expression(&self) -> Expr {
        self.0.as_expression().unwrap().clone()
    }
}
47
48fn to_physical_piped_expr(
49    expr: &ExprIR,
50    expr_arena: &Arena<AExpr>,
51    schema: &SchemaRef,
52) -> PolarsResult<Arc<dyn PhysicalPipedExpr>> {
53    // this is a double Arc<dyn> explore if we can create a single of it.
54    create_physical_expr(
55        expr,
56        Context::Default,
57        expr_arena,
58        schema,
59        &mut ExpressionConversionState::new(false),
60    )
61    .map(|e| Arc::new(Wrap(e)) as Arc<dyn PhysicalPipedExpr>)
62}
63
64fn jit_insert_slice(
65    node: Node,
66    lp_arena: &mut Arena<IR>,
67    sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
68    operator_offset: usize,
69) {
70    // if the join has a slice, we add a new slice node
71    // note that we take the offset + 1, because we want to
72    // slice AFTER the join has happened and the join will be an
73    // operator
74    // NOTE: Don't do this for union, that doesn't work.
75    // TODO! Deal with this in the optimizer.
76    use IR::*;
77    let (offset, len) = match lp_arena.get(node) {
78        Join { options, .. } if options.args.slice.is_some() => {
79            let Some((offset, len)) = options.args.slice else {
80                unreachable!()
81            };
82            (offset, len)
83        },
84        _ => return,
85    };
86
87    let slice_node = lp_arena.add(Slice {
88        input: node,
89        offset,
90        len: len as IdxSize,
91    });
92    sink_nodes.push((operator_offset + 1, slice_node, Rc::new(RefCell::new(1))));
93}
94
/// Build executable streaming pipelines from a `Tree` of pipeline branches
/// and splice a `MapFunction` node that runs them into `lp_arena`.
///
/// Returns `Ok(Some(final_sink))` with the node of the plan's final sink,
/// or `Ok(None)` when the tree contains no final sink (the plan is then
/// left untouched). `fmt` controls whether a clone of the original IR plan
/// is kept for formatting/explain output.
pub(super) fn construct(
    tree: Tree,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    fmt: bool,
) -> PolarsResult<Option<Node>> {
    use IR::*;

    let mut pipelines = Vec::with_capacity(tree.len());
    // NOTE(review): callbacks appear to hold placeholder operators for
    // rhs-join nodes that `create_pipeline` wires up later — confirm.
    let mut callbacks = CallBacks::new();

    let is_verbose = verbose();

    // First traverse the branches and nodes to determine how often a sink is
    // shared.
    // This shared count will be used in the pipeline to determine
    // when the sink can be finalized.
    let mut sink_share_count = PlHashMap::new();
    let n_branches = tree.len();
    if n_branches > 1 {
        for branch in &tree {
            for op in branch.operators_sinks.iter() {
                match op {
                    PipelineNode::Sink(sink) => {
                        // Count one occurrence per branch that references
                        // this sink node.
                        let count = sink_share_count
                            .entry(sink.0)
                            .or_insert(Rc::new(RefCell::new(0u32)));
                        *count.borrow_mut() += 1;
                    },
                    PipelineNode::RhsJoin(node) => {
                        // Register a dummy operator as a placeholder for the
                        // probe side of the join; replaced via `callbacks`.
                        let _ = callbacks.insert(*node, get_dummy_operator());
                    },
                    _ => {},
                }
            }
        }
    }

    // Shared sinks are stored in a cache, so that they share state.
    // If the shared sink is already in cache, that one is used.
    let mut sink_cache = PlHashMap::new();
    let mut final_sink = None;

    for branch in tree {
        // The file sink is always to the top of the tree
        // not every branch has a final sink. For instance rhs join branches
        if let Some(node) = branch.get_final_sink() {
            if matches!(lp_arena.get(node), IR::Sink { .. }) {
                final_sink = Some(node)
            }
        }
        // should be reset for every branch
        let mut sink_nodes = vec![];

        let mut operators = Vec::with_capacity(branch.operators_sinks.len());
        let mut operator_nodes = Vec::with_capacity(branch.operators_sinks.len());

        // iterate from leaves upwards
        let mut iter = branch.operators_sinks.into_iter().rev();

        for pipeline_node in &mut iter {
            // Position this node would take in the operator list.
            let operator_offset = operators.len();
            match pipeline_node {
                PipelineNode::Sink(node) => {
                    let shared_count = if n_branches > 1 {
                        // should be here
                        sink_share_count.get(&node.0).unwrap().clone()
                    } else {
                        // Single branch: the sink is trivially unshared.
                        Rc::new(RefCell::new(1))
                    };
                    sink_nodes.push((operator_offset, node, shared_count))
                },
                PipelineNode::Operator(node) => {
                    operator_nodes.push(node);
                    let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
                    operators.push(op);
                },
                PipelineNode::Union(node) => {
                    operator_nodes.push(node);
                    let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
                    operators.push(op);
                },
                PipelineNode::RhsJoin(node) => {
                    operator_nodes.push(node);
                    // A join carrying a slice gets an extra slice sink right
                    // after the join operator.
                    jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
                    // Use the placeholder operator registered above.
                    let op = callbacks.get(&node).unwrap().clone();
                    operators.push(Box::new(op))
                },
            }
        }

        let pipeline = create_pipeline(
            &branch.sources,
            operators,
            sink_nodes,
            lp_arena,
            expr_arena,
            to_physical_piped_expr,
            is_verbose,
            &mut sink_cache,
            &mut callbacks,
        )?;
        pipelines.push(pipeline);
    }

    // No final sink found (e.g. the tree only held rhs-join branches):
    // nothing to splice into the plan.
    let Some(final_sink) = final_sink else {
        return Ok(None);
    };
    let insertion_location = match lp_arena.get(final_sink) {
        // this was inserted only during conversion and does not exist
        // in the original tree, so we take the input, as that's where
        // we connect into the original tree.
        Sink {
            input,
            payload: SinkTypeIR::Memory,
        } => *input,
        // Other sinks were not inserted during conversion,
        // so they are returned as-is
        Sink { .. } => final_sink,
        _ => unreachable!(),
    };
    // keep the original around for formatting purposes
    let original_lp = if fmt {
        let original_lp = IRPlan::new(insertion_location, lp_arena.clone(), expr_arena.clone());
        Some(original_lp)
    } else {
        None
    };

    // Replace the part of the logical plan with a `MapFunction` that will execute the pipeline.
    let schema = lp_arena
        .get(insertion_location)
        .schema(lp_arena)
        .into_owned();
    let pipeline_node = get_pipeline_node(lp_arena, pipelines, schema, original_lp);
    lp_arena.replace(insertion_location, pipeline_node);

    Ok(Some(final_sink))
}
234
235fn get_pipeline_node(
236    lp_arena: &mut Arena<IR>,
237    mut pipelines: Vec<PipeLine>,
238    schema: SchemaRef,
239    original_lp: Option<IRPlan>,
240) -> IR {
241    // create a dummy input as the map function will call the input
242    // so we just create a scan that returns an empty df
243    let dummy = lp_arena.add(IR::DataFrameScan {
244        df: Arc::new(DataFrame::empty()),
245        schema: Arc::new(Schema::default()),
246        output_schema: None,
247    });
248
249    IR::MapFunction {
250        function: FunctionIR::Pipeline {
251            function: Arc::new(Mutex::new(move |_df: DataFrame| {
252                let state = ExecutionState::new();
253                if state.verbose() {
254                    eprintln!("RUN STREAMING PIPELINE");
255                    eprintln!("{:?}", &pipelines)
256                }
257                execute_pipeline(state, std::mem::take(&mut pipelines))
258            })),
259            schema,
260            original: original_lp.map(Arc::new),
261        },
262        input: dummy,
263    }
264}