polars_lazy/physical_plan/streaming/
construct_pipeline.rs
1use std::cell::RefCell;
2use std::rc::Rc;
3use std::sync::Mutex;
4
5use polars_core::config::verbose;
6use polars_core::prelude::*;
7use polars_expr::{ExpressionConversionState, create_physical_expr};
8use polars_io::predicates::{PhysicalIoExpr, StatsEvaluator};
9use polars_pipe::expressions::PhysicalPipedExpr;
10use polars_pipe::operators::chunks::DataChunk;
11use polars_pipe::pipeline::{
12 CallBacks, PipeLine, create_pipeline, execute_pipeline, get_dummy_operator, get_operator,
13};
14use polars_plan::prelude::expr_ir::ExprIR;
15
16use crate::physical_plan::streaming::tree::{PipelineNode, Tree};
17use crate::prelude::*;
18
19pub struct Wrap(Arc<dyn PhysicalExpr>);
20
21impl PhysicalIoExpr for Wrap {
22 fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series> {
23 let h = PhysicalIoHelper {
24 expr: self.0.clone(),
25 has_window_function: false,
26 };
27 h.evaluate_io(df)
28 }
29 fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
30 self.0.as_stats_evaluator()
31 }
32}
33impl PhysicalPipedExpr for Wrap {
34 fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult<Series> {
35 self.0
36 .evaluate(&chunk.data, state)
37 .map(|c| c.take_materialized_series())
38 }
39 fn field(&self, input_schema: &Schema) -> PolarsResult<Field> {
40 self.0.to_field(input_schema)
41 }
42
43 fn expression(&self) -> Expr {
44 self.0.as_expression().unwrap().clone()
45 }
46}
47
48fn to_physical_piped_expr(
49 expr: &ExprIR,
50 expr_arena: &Arena<AExpr>,
51 schema: &SchemaRef,
52) -> PolarsResult<Arc<dyn PhysicalPipedExpr>> {
53 create_physical_expr(
55 expr,
56 Context::Default,
57 expr_arena,
58 schema,
59 &mut ExpressionConversionState::new(false),
60 )
61 .map(|e| Arc::new(Wrap(e)) as Arc<dyn PhysicalPipedExpr>)
62}
63
64fn jit_insert_slice(
65 node: Node,
66 lp_arena: &mut Arena<IR>,
67 sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
68 operator_offset: usize,
69) {
70 use IR::*;
77 let (offset, len) = match lp_arena.get(node) {
78 Join { options, .. } if options.args.slice.is_some() => {
79 let Some((offset, len)) = options.args.slice else {
80 unreachable!()
81 };
82 (offset, len)
83 },
84 _ => return,
85 };
86
87 let slice_node = lp_arena.add(Slice {
88 input: node,
89 offset,
90 len: len as IdxSize,
91 });
92 sink_nodes.push((operator_offset + 1, slice_node, Rc::new(RefCell::new(1))));
93}
94
95pub(super) fn construct(
96 tree: Tree,
97 lp_arena: &mut Arena<IR>,
98 expr_arena: &mut Arena<AExpr>,
99 fmt: bool,
100) -> PolarsResult<Option<Node>> {
101 use IR::*;
102
103 let mut pipelines = Vec::with_capacity(tree.len());
104 let mut callbacks = CallBacks::new();
105
106 let is_verbose = verbose();
107
108 let mut sink_share_count = PlHashMap::new();
113 let n_branches = tree.len();
114 if n_branches > 1 {
115 for branch in &tree {
116 for op in branch.operators_sinks.iter() {
117 match op {
118 PipelineNode::Sink(sink) => {
119 let count = sink_share_count
120 .entry(sink.0)
121 .or_insert(Rc::new(RefCell::new(0u32)));
122 *count.borrow_mut() += 1;
123 },
124 PipelineNode::RhsJoin(node) => {
125 let _ = callbacks.insert(*node, get_dummy_operator());
126 },
127 _ => {},
128 }
129 }
130 }
131 }
132
133 let mut sink_cache = PlHashMap::new();
136 let mut final_sink = None;
137
138 for branch in tree {
139 if let Some(node) = branch.get_final_sink() {
142 if matches!(lp_arena.get(node), IR::Sink { .. }) {
143 final_sink = Some(node)
144 }
145 }
146 let mut sink_nodes = vec![];
148
149 let mut operators = Vec::with_capacity(branch.operators_sinks.len());
150 let mut operator_nodes = Vec::with_capacity(branch.operators_sinks.len());
151
152 let mut iter = branch.operators_sinks.into_iter().rev();
154
155 for pipeline_node in &mut iter {
156 let operator_offset = operators.len();
157 match pipeline_node {
158 PipelineNode::Sink(node) => {
159 let shared_count = if n_branches > 1 {
160 sink_share_count.get(&node.0).unwrap().clone()
162 } else {
163 Rc::new(RefCell::new(1))
164 };
165 sink_nodes.push((operator_offset, node, shared_count))
166 },
167 PipelineNode::Operator(node) => {
168 operator_nodes.push(node);
169 let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
170 operators.push(op);
171 },
172 PipelineNode::Union(node) => {
173 operator_nodes.push(node);
174 let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
175 operators.push(op);
176 },
177 PipelineNode::RhsJoin(node) => {
178 operator_nodes.push(node);
179 jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
180 let op = callbacks.get(&node).unwrap().clone();
181 operators.push(Box::new(op))
182 },
183 }
184 }
185
186 let pipeline = create_pipeline(
187 &branch.sources,
188 operators,
189 sink_nodes,
190 lp_arena,
191 expr_arena,
192 to_physical_piped_expr,
193 is_verbose,
194 &mut sink_cache,
195 &mut callbacks,
196 )?;
197 pipelines.push(pipeline);
198 }
199
200 let Some(final_sink) = final_sink else {
201 return Ok(None);
202 };
203 let insertion_location = match lp_arena.get(final_sink) {
204 Sink {
208 input,
209 payload: SinkTypeIR::Memory,
210 } => *input,
211 Sink { .. } => final_sink,
214 _ => unreachable!(),
215 };
216 let original_lp = if fmt {
218 let original_lp = IRPlan::new(insertion_location, lp_arena.clone(), expr_arena.clone());
219 Some(original_lp)
220 } else {
221 None
222 };
223
224 let schema = lp_arena
226 .get(insertion_location)
227 .schema(lp_arena)
228 .into_owned();
229 let pipeline_node = get_pipeline_node(lp_arena, pipelines, schema, original_lp);
230 lp_arena.replace(insertion_location, pipeline_node);
231
232 Ok(Some(final_sink))
233}
234
235fn get_pipeline_node(
236 lp_arena: &mut Arena<IR>,
237 mut pipelines: Vec<PipeLine>,
238 schema: SchemaRef,
239 original_lp: Option<IRPlan>,
240) -> IR {
241 let dummy = lp_arena.add(IR::DataFrameScan {
244 df: Arc::new(DataFrame::empty()),
245 schema: Arc::new(Schema::default()),
246 output_schema: None,
247 });
248
249 IR::MapFunction {
250 function: FunctionIR::Pipeline {
251 function: Arc::new(Mutex::new(move |_df: DataFrame| {
252 let state = ExecutionState::new();
253 if state.verbose() {
254 eprintln!("RUN STREAMING PIPELINE");
255 eprintln!("{:?}", &pipelines)
256 }
257 execute_pipeline(state, std::mem::take(&mut pipelines))
258 })),
259 schema,
260 original: original_lp.map(Arc::new),
261 },
262 input: dummy,
263 }
264}