Skip to main content

polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
42pub trait IntoLazy {
43    fn lazy(self) -> LazyFrame;
44}
45
46impl IntoLazy for DataFrame {
47    /// Convert the `DataFrame` into a `LazyFrame`
48    fn lazy(self) -> LazyFrame {
49        let lp = DslBuilder::from_existing_df(self).build();
50        LazyFrame {
51            logical_plan: lp,
52            opt_state: Default::default(),
53            cached_arena: Default::default(),
54        }
55    }
56}
57
58impl IntoLazy for LazyFrame {
59    fn lazy(self) -> LazyFrame {
60        self
61    }
62}
63
64/// Lazy abstraction over an eager `DataFrame`.
65///
66/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
67/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
68#[derive(Clone, Default)]
69#[must_use]
70pub struct LazyFrame {
71    pub logical_plan: DslPlan,
72    pub(crate) opt_state: OptFlags,
73    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
74}
75
76impl From<DslPlan> for LazyFrame {
77    fn from(plan: DslPlan) -> Self {
78        Self {
79            logical_plan: plan,
80            opt_state: OptFlags::default(),
81            cached_arena: Default::default(),
82        }
83    }
84}
85
86impl LazyFrame {
87    pub(crate) fn from_inner(
88        logical_plan: DslPlan,
89        opt_state: OptFlags,
90        cached_arena: Arc<Mutex<Option<CachedArena>>>,
91    ) -> Self {
92        Self {
93            logical_plan,
94            opt_state,
95            cached_arena,
96        }
97    }
98
99    pub(crate) fn get_plan_builder(self) -> DslBuilder {
100        DslBuilder::from(self.logical_plan)
101    }
102
103    fn get_opt_state(&self) -> OptFlags {
104        self.opt_state
105    }
106
107    pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108        LazyFrame {
109            logical_plan,
110            opt_state,
111            cached_arena: Default::default(),
112        }
113    }
114
115    /// Get current optimizations.
116    pub fn get_current_optimizations(&self) -> OptFlags {
117        self.opt_state
118    }
119
120    /// Set allowed optimizations.
121    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122        self.opt_state = opt_state;
123        self
124    }
125
126    /// Turn off all optimizations.
127    pub fn without_optimizations(self) -> Self {
128        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129    }
130
131    /// Toggle projection pushdown optimization.
132    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134        self
135    }
136
137    /// Toggle cluster with columns optimization.
138    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140        self
141    }
142
143    /// Check if operations are order dependent and unset maintaining_order if
144    /// the order would not be observed.
145    pub fn with_check_order(mut self, toggle: bool) -> Self {
146        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147        self
148    }
149
150    /// Toggle predicate pushdown optimization.
151    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153        self
154    }
155
156    /// Toggle type coercion optimization.
157    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159        self
160    }
161
162    /// Toggle type check optimization.
163    pub fn with_type_check(mut self, toggle: bool) -> Self {
164        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165        self
166    }
167
168    /// Toggle expression simplification optimization on or off.
169    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171        self
172    }
173
174    /// Toggle common subplan elimination optimization on or off
175    #[cfg(feature = "cse")]
176    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
177        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
178        self
179    }
180
181    /// Toggle common subexpression elimination optimization on or off
182    #[cfg(feature = "cse")]
183    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
184        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
185        self
186    }
187
188    /// Toggle slice pushdown optimization.
189    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191        self
192    }
193
194    #[cfg(feature = "new_streaming")]
195    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
196        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
197        self
198    }
199
200    pub fn with_gpu(mut self, toggle: bool) -> Self {
201        self.opt_state.set(OptFlags::GPU, toggle);
202        self
203    }
204
205    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
206    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
207        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
208        self
209    }
210
211    /// Run every node eagerly. This turns off multi-node optimizations.
212    pub fn _with_eager(mut self, toggle: bool) -> Self {
213        self.opt_state.set(OptFlags::EAGER, toggle);
214        self
215    }
216
217    /// Return a String describing the naive (un-optimized) logical plan.
218    pub fn describe_plan(&self) -> PolarsResult<String> {
219        Ok(self.clone().to_alp()?.describe())
220    }
221
222    /// Return a String describing the naive (un-optimized) logical plan in tree format.
223    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
224        Ok(self.clone().to_alp()?.describe_tree_format())
225    }
226
227    /// Return a String describing the optimized logical plan.
228    ///
229    /// Returns `Err` if optimizing the logical plan fails.
230    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
231        Ok(self.clone().to_alp_optimized()?.describe())
232    }
233
234    /// Return a String describing the optimized logical plan in tree format.
235    ///
236    /// Returns `Err` if optimizing the logical plan fails.
237    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
238        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
239    }
240
241    /// Return a String describing the logical plan.
242    ///
243    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
244    /// explains the naive, un-optimized plan.
245    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
246        if optimized {
247            self.describe_optimized_plan()
248        } else {
249            self.describe_plan()
250        }
251    }
252
253    /// Add a sort operation to the logical plan.
254    ///
255    /// Sorts the LazyFrame by the column name specified using the provided options.
256    ///
257    /// # Example
258    ///
259    /// Sort DataFrame by 'sepal_width' column:
260    /// ```rust
261    /// # use polars_core::prelude::*;
262    /// # use polars_lazy::prelude::*;
263    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
264    ///     df.lazy().sort(["sepal_width"], Default::default())
265    /// }
266    /// ```
267    /// Sort by a single column with specific order:
268    /// ```
269    /// # use polars_core::prelude::*;
270    /// # use polars_lazy::prelude::*;
271    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
272    ///     df.lazy().sort(
273    ///         ["sepal_width"],
274    ///         SortMultipleOptions::new()
275    ///             .with_order_descending(descending)
276    ///     )
277    /// }
278    /// ```
279    /// Sort by multiple columns with specifying order for each column:
280    /// ```
281    /// # use polars_core::prelude::*;
282    /// # use polars_lazy::prelude::*;
283    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
284    ///     df.lazy().sort(
285    ///         ["sepal_width", "sepal_length"],
286    ///         SortMultipleOptions::new()
287    ///             .with_order_descending_multi([false, true])
288    ///     )
289    /// }
290    /// ```
291    /// See [`SortMultipleOptions`] for more options.
292    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
293        let opt_state = self.get_opt_state();
294        let lp = self
295            .get_plan_builder()
296            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
297            .build();
298        Self::from_logical_plan(lp, opt_state)
299    }
300
301    /// Add a sort operation to the logical plan.
302    ///
303    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
304    /// concrete columns before sorting.
305    ///
306    /// See [`SortMultipleOptions`] for more options.
307    ///
308    /// # Example
309    ///
310    /// ```rust
311    /// use polars_core::prelude::*;
312    /// use polars_lazy::prelude::*;
313    ///
314    /// /// Sort DataFrame by 'sepal_width' column
315    /// fn example(df: DataFrame) -> LazyFrame {
316    ///       df.lazy()
317    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
318    /// }
319    /// ```
320    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
321        self,
322        by_exprs: E,
323        sort_options: SortMultipleOptions,
324    ) -> Self {
325        let by_exprs = by_exprs.as_ref().to_vec();
326        if by_exprs.is_empty() {
327            self
328        } else {
329            let opt_state = self.get_opt_state();
330            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
331            Self::from_logical_plan(lp, opt_state)
332        }
333    }
334
335    pub fn top_k<E: AsRef<[Expr]>>(
336        self,
337        k: IdxSize,
338        by_exprs: E,
339        sort_options: SortMultipleOptions,
340    ) -> Self {
341        // this will optimize to top-k
342        self.sort_by_exprs(
343            by_exprs,
344            sort_options.with_order_reversed().with_nulls_last(true),
345        )
346        .slice(0, k)
347    }
348
349    pub fn bottom_k<E: AsRef<[Expr]>>(
350        self,
351        k: IdxSize,
352        by_exprs: E,
353        sort_options: SortMultipleOptions,
354    ) -> Self {
355        // this will optimize to bottom-k
356        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
357            .slice(0, k)
358    }
359
360    /// Reverse the `DataFrame` from top to bottom.
361    ///
362    /// Row `i` becomes row `number_of_rows - i - 1`.
363    ///
364    /// # Example
365    ///
366    /// ```rust
367    /// use polars_core::prelude::*;
368    /// use polars_lazy::prelude::*;
369    ///
370    /// fn example(df: DataFrame) -> LazyFrame {
371    ///       df.lazy()
372    ///         .reverse()
373    /// }
374    /// ```
375    pub fn reverse(self) -> Self {
376        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
377    }
378
379    /// Rename columns in the DataFrame.
380    ///
381    /// `existing` and `new` are iterables of the same length containing the old and
382    /// corresponding new column names. Renaming happens to all `existing` columns
383    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
384    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
385    /// those columns that are actually found will be renamed (others will be ignored).
386    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
387    where
388        I: IntoIterator<Item = T>,
389        J: IntoIterator<Item = S>,
390        T: AsRef<str>,
391        S: AsRef<str>,
392    {
393        let iter = existing.into_iter();
394        let cap = iter.size_hint().0;
395        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
396        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
397
398        // TODO! should this error if `existing` and `new` have different lengths?
399        // Currently, the longer of the two is truncated.
400        for (existing, new) in iter.zip(new) {
401            let existing = existing.as_ref();
402            let new = new.as_ref();
403            if new != existing {
404                existing_vec.push(existing.into());
405                new_vec.push(new.into());
406            }
407        }
408
409        self.map_private(DslFunction::Rename {
410            existing: existing_vec.into(),
411            new: new_vec.into(),
412            strict,
413        })
414    }
415
416    /// Removes columns from the DataFrame.
417    /// Note that it's better to only select the columns you need
418    /// and let the projection pushdown optimize away the unneeded columns.
419    ///
420    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
421    /// error while materializing the [`LazyFrame`].
422    pub fn drop(self, columns: Selector) -> Self {
423        let opt_state = self.get_opt_state();
424        let lp = self.get_plan_builder().drop(columns).build();
425        Self::from_logical_plan(lp, opt_state)
426    }
427
428    /// Shift the values by a given period and fill the parts that will be empty due to this operation
429    /// with `Nones`.
430    ///
431    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
432    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
433        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
434    }
435
436    /// Shift the values by a given period and fill the parts that will be empty due to this operation
437    /// with the result of the `fill_value` expression.
438    ///
439    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
440    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
441        self.select(vec![
442            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
443        ])
444    }
445
446    /// Fill None values in the DataFrame with an expression.
447    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448        let opt_state = self.get_opt_state();
449        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
450        Self::from_logical_plan(lp, opt_state)
451    }
452
453    /// Fill NaN values in the DataFrame with an expression.
454    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
455        let opt_state = self.get_opt_state();
456        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
457        Self::from_logical_plan(lp, opt_state)
458    }
459
460    /// Caches the result into a new LazyFrame.
461    ///
462    /// This should be used to prevent computations running multiple times.
463    pub fn cache(self) -> Self {
464        let opt_state = self.get_opt_state();
465        let lp = self.get_plan_builder().cache().build();
466        Self::from_logical_plan(lp, opt_state)
467    }
468
469    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
470    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
471        let cast_cols: Vec<Expr> = dtypes
472            .into_iter()
473            .map(|(name, dt)| {
474                let name = PlSmallStr::from_str(name);
475
476                if strict {
477                    col(name).strict_cast(dt)
478                } else {
479                    col(name).cast(dt)
480                }
481            })
482            .collect();
483
484        if cast_cols.is_empty() {
485            self
486        } else {
487            self.with_columns(cast_cols)
488        }
489    }
490
491    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
492    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
493        self.with_columns(vec![if strict {
494            col(PlSmallStr::from_static("*")).strict_cast(dtype)
495        } else {
496            col(PlSmallStr::from_static("*")).cast(dtype)
497        }])
498    }
499
500    pub fn optimize(
501        self,
502        lp_arena: &mut Arena<IR>,
503        expr_arena: &mut Arena<AExpr>,
504    ) -> PolarsResult<Node> {
505        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
506    }
507
508    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
509        let (mut lp_arena, mut expr_arena) = self.get_arenas();
510        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
511
512        Ok(IRPlan::new(node, lp_arena, expr_arena))
513    }
514
515    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
516        let (mut lp_arena, mut expr_arena) = self.get_arenas();
517        let node = to_alp(
518            self.logical_plan,
519            &mut expr_arena,
520            &mut lp_arena,
521            &mut self.opt_state,
522        )?;
523        let plan = IRPlan::new(node, lp_arena, expr_arena);
524        Ok(plan)
525    }
526
527    pub(crate) fn optimize_with_scratch(
528        self,
529        lp_arena: &mut Arena<IR>,
530        expr_arena: &mut Arena<AExpr>,
531        scratch: &mut Vec<Node>,
532    ) -> PolarsResult<Node> {
533        let lp_top = optimize(
534            self.logical_plan,
535            self.opt_state,
536            lp_arena,
537            expr_arena,
538            scratch,
539            apply_scan_predicate_to_scan_ir,
540        )?;
541
542        Ok(lp_top)
543    }
544
545    fn prepare_collect_post_opt<P>(
546        mut self,
547        check_sink: bool,
548        query_start: Option<std::time::Instant>,
549        post_opt: P,
550    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
551    where
552        P: FnOnce(
553            Node,
554            &mut Arena<IR>,
555            &mut Arena<AExpr>,
556            Option<std::time::Duration>,
557        ) -> PolarsResult<()>,
558    {
559        let (mut lp_arena, mut expr_arena) = self.get_arenas();
560
561        let mut scratch = vec![];
562        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
563
564        post_opt(
565            lp_top,
566            &mut lp_arena,
567            &mut expr_arena,
568            // Post optimization callback gets the time since the
569            // query was started as its "base" timepoint.
570            query_start.map(|s| s.elapsed()),
571        )?;
572
573        // sink should be replaced
574        let no_file_sink = if check_sink {
575            !matches!(
576                lp_arena.get(lp_top),
577                IR::Sink {
578                    payload: SinkTypeIR::File { .. },
579                    ..
580                }
581            )
582        } else {
583            true
584        };
585        let physical_plan = create_physical_plan(
586            lp_top,
587            &mut lp_arena,
588            &mut expr_arena,
589            BUILD_STREAMING_EXECUTOR,
590        )?;
591
592        let state = ExecutionState::new();
593        Ok((state, physical_plan, no_file_sink))
594    }
595
596    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
597    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
598    where
599        P: FnOnce(
600            Node,
601            &mut Arena<IR>,
602            &mut Arena<AExpr>,
603            Option<std::time::Duration>,
604        ) -> PolarsResult<()>,
605    {
606        let (mut state, mut physical_plan, _) =
607            self.prepare_collect_post_opt(false, None, post_opt)?;
608        physical_plan.execute(&mut state)
609    }
610
611    #[allow(unused_mut)]
612    fn prepare_collect(
613        self,
614        check_sink: bool,
615        query_start: Option<std::time::Instant>,
616    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
617        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
618    }
619
620    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
621    /// `engine`.
622    ///
623    /// The query is optimized prior to execution.
624    pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
625        let engine = match engine {
626            Engine::Streaming => Engine::Streaming,
627            _ if std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1") => {
628                Engine::Streaming
629            },
630            Engine::Auto => Engine::InMemory,
631            v => v,
632        };
633
634        if engine != Engine::Streaming
635            && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
636        {
637            feature_gated!("new_streaming", {
638                if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
639                    return r;
640                }
641            })
642        }
643        match engine {
644            Engine::Streaming => {
645                feature_gated!("new_streaming", self = self.with_new_streaming(true))
646            },
647            Engine::Gpu => self = self.with_gpu(true),
648            _ => (),
649        }
650
651        let mut ir_plan = self.to_alp_optimized()?;
652
653        ir_plan.ensure_root_node_is_sink();
654
655        match engine {
656            Engine::Streaming => feature_gated!("new_streaming", {
657                polars_stream::run_query(
658                    ir_plan.lp_top,
659                    &mut ir_plan.lp_arena,
660                    &mut ir_plan.expr_arena,
661                )
662            }),
663            Engine::InMemory | Engine::Gpu => {
664                if let IR::SinkMultiple { inputs } = ir_plan.root() {
665                    polars_ensure!(
666                        engine != Engine::Gpu,
667                        InvalidOperation:
668                        "collect_all is not supported for the gpu engine"
669                    );
670
671                    return create_multiple_physical_plans(
672                        inputs.clone().as_slice(),
673                        &mut ir_plan.lp_arena,
674                        &mut ir_plan.expr_arena,
675                        BUILD_STREAMING_EXECUTOR,
676                    )?
677                    .execute()
678                    .map(QueryResult::Multiple);
679                }
680
681                let mut physical_plan = create_physical_plan(
682                    ir_plan.lp_top,
683                    &mut ir_plan.lp_arena,
684                    &mut ir_plan.expr_arena,
685                    BUILD_STREAMING_EXECUTOR,
686                )?;
687                let mut state = ExecutionState::new();
688                physical_plan.execute(&mut state).map(QueryResult::Single)
689            },
690            Engine::Auto => unreachable!(),
691        }
692    }
693
694    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
695        let sink_multiple = LazyFrame {
696            logical_plan: DslPlan::SinkMultiple { inputs: plans },
697            opt_state,
698            cached_arena: Default::default(),
699        };
700        sink_multiple.explain(true)
701    }
702
703    pub fn collect_all_with_engine(
704        plans: Vec<DslPlan>,
705        engine: Engine,
706        opt_state: OptFlags,
707    ) -> PolarsResult<Vec<DataFrame>> {
708        if plans.is_empty() {
709            return Ok(Vec::new());
710        }
711
712        LazyFrame {
713            logical_plan: DslPlan::SinkMultiple { inputs: plans },
714            opt_state,
715            cached_arena: Default::default(),
716        }
717        .collect_with_engine(engine)
718        .map(|r| r.unwrap_multiple())
719    }
720
721    /// Execute all the lazy operations and collect them into a [`DataFrame`].
722    ///
723    /// The query is optimized prior to execution.
724    ///
725    /// # Example
726    ///
727    /// ```rust
728    /// use polars_core::prelude::*;
729    /// use polars_lazy::prelude::*;
730    ///
731    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
732    ///     df.lazy()
733    ///       .group_by([col("foo")])
734    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
735    ///       .collect()
736    /// }
737    /// ```
738    pub fn collect(self) -> PolarsResult<DataFrame> {
739        self.collect_with_engine(Engine::Auto).map(|r| match r {
740            QueryResult::Single(df) => df,
741            // TODO: Should return query results
742            QueryResult::Multiple(_) => DataFrame::empty(),
743        })
744    }
745
746    /// Collect the query in batches.
747    ///
748    /// If lazy is true the query will not start until the first poll (or until
749    /// start is called on CollectBatches).
750    #[cfg(feature = "async")]
751    pub fn collect_batches(
752        self,
753        engine: Engine,
754        maintain_order: bool,
755        chunk_size: Option<NonZeroUsize>,
756        lazy: bool,
757    ) -> PolarsResult<CollectBatches> {
758        let (send, recv) = sync_channel(1);
759        let runner_send = send.clone();
760        let ldf = self.sink_batches(
761            PlanCallback::new(move |df| {
762                // Stop if receiver has closed.
763                let send_result = send.send(Ok(df));
764                Ok(send_result.is_err())
765            }),
766            maintain_order,
767            chunk_size,
768        )?;
769        let runner = move || {
770            // We use a tokio spawn_blocking here as it has a high blocking
771            // thread pool limit.
772
773            polars_core::runtime::ASYNC.spawn_blocking(move || {
774                if let Err(e) = ldf.collect_with_engine(engine) {
775                    runner_send.send(Err(e)).ok();
776                }
777            });
778        };
779
780        let mut collect_batches = CollectBatches {
781            recv,
782            runner: Some(Box::new(runner)),
783        };
784        if !lazy {
785            collect_batches.start();
786        }
787        Ok(collect_batches)
788    }
789
790    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
791    // This version does profiling of the node execution.
792    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
793    where
794        P: FnOnce(
795            Node,
796            &mut Arena<IR>,
797            &mut Arena<AExpr>,
798            Option<std::time::Duration>,
799        ) -> PolarsResult<()>,
800    {
801        let query_start = std::time::Instant::now();
802        let (mut state, mut physical_plan, _) =
803            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
804        state.time_nodes(query_start);
805        let out = physical_plan.execute(&mut state)?;
806        let timer_df = state.finish_timer()?;
807        Ok((out, timer_df))
808    }
809
810    /// Profile a LazyFrame.
811    ///
812    /// This will run the query and return a tuple
813    /// containing the materialized DataFrame and a DataFrame that contains profiling information
814    /// of each node that is executed.
815    ///
816    /// The units of the timings are microseconds.
817    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
818        self._profile_post_opt(|_, _, _, _| Ok(()))
819    }
820
821    pub fn sink_batches(
822        mut self,
823        function: PlanCallback<DataFrame, bool>,
824        maintain_order: bool,
825        chunk_size: Option<NonZeroUsize>,
826    ) -> PolarsResult<Self> {
827        use polars_plan::prelude::sink::CallbackSinkType;
828
829        polars_ensure!(
830            !matches!(self.logical_plan, DslPlan::Sink { .. }),
831            InvalidOperation: "cannot create a sink on top of another sink"
832        );
833
834        self.logical_plan = DslPlan::Sink {
835            input: Arc::new(self.logical_plan),
836            payload: SinkType::Callback(CallbackSinkType {
837                function,
838                maintain_order,
839                chunk_size,
840            }),
841        };
842
843        Ok(self)
844    }
845
846    /// Collect with the streaming engine. Returns `None` if the streaming engine panics with a todo!.
847    #[cfg(feature = "new_streaming")]
848    fn _collect_with_streaming_suppress_todo_panic(
849        mut self,
850    ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
851        self.opt_state |= OptFlags::NEW_STREAMING;
852        let mut ir_plan = match self.to_alp_optimized() {
853            Ok(v) => v,
854            Err(e) => return Some(Err(e)),
855        };
856
857        ir_plan.ensure_root_node_is_sink();
858
859        let f = || {
860            polars_stream::run_query(
861                ir_plan.lp_top,
862                &mut ir_plan.lp_arena,
863                &mut ir_plan.expr_arena,
864            )
865        };
866
867        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
868            Ok(v) => Some(v),
869            Err(e) => {
870                // Fallback to normal engine if error is due to not being implemented
871                // and auto_new_streaming is set, otherwise propagate error.
872                if e.downcast_ref::<&str>()
873                    .is_some_and(|s| s.starts_with("not yet implemented"))
874                {
875                    if polars_core::config::verbose() {
876                        eprintln!(
877                            "caught unimplemented error in new streaming engine, falling back to normal engine"
878                        );
879                    }
880                    None
881                } else {
882                    std::panic::resume_unwind(e)
883                }
884            },
885        }
886    }
887
888    pub fn sink(
889        mut self,
890        sink_type: SinkDestination,
891        file_format: FileWriteFormat,
892        unified_sink_args: UnifiedSinkArgs,
893    ) -> PolarsResult<Self> {
894        polars_ensure!(
895            !matches!(self.logical_plan, DslPlan::Sink { .. }),
896            InvalidOperation: "cannot create a sink on top of another sink"
897        );
898
899        self.logical_plan = DslPlan::Sink {
900            input: Arc::new(self.logical_plan),
901            payload: match sink_type {
902                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
903                    target,
904                    file_format,
905                    unified_sink_args,
906                }),
907                SinkDestination::Partitioned {
908                    base_path,
909                    file_path_provider,
910                    partition_strategy,
911                    max_rows_per_file,
912                    approximate_bytes_per_file,
913                } => SinkType::Partitioned(PartitionedSinkOptions {
914                    base_path,
915                    file_path_provider,
916                    partition_strategy,
917                    file_format,
918                    unified_sink_args,
919                    max_rows_per_file,
920                    approximate_bytes_per_file,
921                }),
922            },
923        };
924        Ok(self)
925    }
926
927    /// Filter frame rows that match a predicate expression.
928    ///
929    /// The expression must yield boolean values (note that rows where the
930    /// predicate resolves to `null` are *not* included in the resulting frame).
931    ///
932    /// # Example
933    ///
934    /// ```rust
935    /// use polars_core::prelude::*;
936    /// use polars_lazy::prelude::*;
937    ///
938    /// fn example(df: DataFrame) -> LazyFrame {
939    ///       df.lazy()
940    ///         .filter(col("sepal_width").is_not_null())
941    ///         .select([col("sepal_width"), col("sepal_length")])
942    /// }
943    /// ```
944    pub fn filter(self, predicate: Expr) -> Self {
945        let opt_state = self.get_opt_state();
946        let lp = self.get_plan_builder().filter(predicate).build();
947        Self::from_logical_plan(lp, opt_state)
948    }
949
950    /// Remove frame rows that match a predicate expression.
951    ///
952    /// The expression must yield boolean values (note that rows where the
953    /// predicate resolves to `null` are *not* removed from the resulting frame).
954    ///
955    /// # Example
956    ///
957    /// ```rust
958    /// use polars_core::prelude::*;
959    /// use polars_lazy::prelude::*;
960    ///
961    /// fn example(df: DataFrame) -> LazyFrame {
962    ///       df.lazy()
963    ///         .remove(col("sepal_width").is_null())
964    ///         .select([col("sepal_width"), col("sepal_length")])
965    /// }
966    /// ```
967    pub fn remove(self, predicate: Expr) -> Self {
968        self.filter(predicate.neq_missing(lit(true)))
969    }
970
971    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
972    ///
973    /// Columns can be selected with [`col`];
974    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
975    ///
976    /// # Example
977    ///
978    /// ```rust
979    /// use polars_core::prelude::*;
980    /// use polars_lazy::prelude::*;
981    ///
982    /// /// This function selects column "foo" and column "bar".
983    /// /// Column "bar" is renamed to "ham".
984    /// fn example(df: DataFrame) -> LazyFrame {
985    ///       df.lazy()
986    ///         .select([col("foo"),
987    ///                   col("bar").alias("ham")])
988    /// }
989    ///
990    /// /// This function selects all columns except "foo"
991    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
992    ///       df.lazy()
993    ///         .select([all().exclude_cols(["foo"]).as_expr()])
994    /// }
995    /// ```
996    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
997        let exprs = exprs.as_ref().to_vec();
998        self.select_impl(
999            exprs,
1000            ProjectionOptions {
1001                run_parallel: true,
1002                duplicate_check: true,
1003                should_broadcast: true,
1004            },
1005        )
1006    }
1007
1008    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1009        let exprs = exprs.as_ref().to_vec();
1010        self.select_impl(
1011            exprs,
1012            ProjectionOptions {
1013                run_parallel: false,
1014                duplicate_check: true,
1015                should_broadcast: true,
1016            },
1017        )
1018    }
1019
1020    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1021        let opt_state = self.get_opt_state();
1022        let lp = self.get_plan_builder().project(exprs, options).build();
1023        Self::from_logical_plan(lp, opt_state)
1024    }
1025
1026    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1027    ///
1028    /// Takes a list of expressions to group on.
1029    ///
1030    /// # Example
1031    ///
1032    /// ```rust
1033    /// use polars_core::prelude::*;
1034    /// use polars_lazy::prelude::*;
1035    ///
1036    /// fn example(df: DataFrame) -> LazyFrame {
1037    ///       df.lazy()
1038    ///        .group_by([col("date")])
1039    ///        .agg([
1040    ///            col("rain").min().alias("min_rain"),
1041    ///            col("rain").sum().alias("sum_rain"),
1042    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1043    ///        ])
1044    /// }
1045    /// ```
1046    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1047        let keys = by
1048            .as_ref()
1049            .iter()
1050            .map(|e| e.clone().into())
1051            .collect::<Vec<_>>();
1052        let opt_state = self.get_opt_state();
1053
1054        #[cfg(feature = "dynamic_group_by")]
1055        {
1056            LazyGroupBy {
1057                logical_plan: self.logical_plan,
1058                opt_state,
1059                keys,
1060                predicates: vec![],
1061                maintain_order: false,
1062                dynamic_options: None,
1063                rolling_options: None,
1064            }
1065        }
1066
1067        #[cfg(not(feature = "dynamic_group_by"))]
1068        {
1069            LazyGroupBy {
1070                logical_plan: self.logical_plan,
1071                opt_state,
1072                keys,
1073                predicates: vec![],
1074                maintain_order: false,
1075            }
1076        }
1077    }
1078
1079    /// Create rolling groups based on a time column.
1080    ///
1081    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1082    ///
1083    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1084    /// individual values and are not of constant intervals. For constant intervals use
1085    /// *group_by_dynamic*
1086    #[cfg(feature = "dynamic_group_by")]
1087    pub fn rolling<E: AsRef<[Expr]>>(
1088        mut self,
1089        index_column: Expr,
1090        group_by: E,
1091        mut options: RollingGroupOptions,
1092    ) -> LazyGroupBy {
1093        if let Expr::Column(name) = index_column {
1094            options.index_column = name;
1095        } else {
1096            let output_field = index_column
1097                .to_field(&self.collect_schema().unwrap())
1098                .unwrap();
1099            return self.with_column(index_column).rolling(
1100                Expr::Column(output_field.name().clone()),
1101                group_by,
1102                options,
1103            );
1104        }
1105        let opt_state = self.get_opt_state();
1106        LazyGroupBy {
1107            logical_plan: self.logical_plan,
1108            opt_state,
1109            predicates: vec![],
1110            keys: group_by.as_ref().to_vec(),
1111            maintain_order: true,
1112            dynamic_options: None,
1113            rolling_options: Some(options),
1114        }
1115    }
1116
1117    /// Group based on a time value (or index value of type Int32, Int64).
1118    ///
1119    /// Time windows are calculated and rows are assigned to windows. Different from a
1120    /// normal group_by is that a row can be member of multiple groups. The time/index
1121    /// window could be seen as a rolling window, with a window size determined by
1122    /// dates/times/values instead of slots in the DataFrame.
1123    ///
1124    /// A window is defined by:
1125    ///
1126    /// - every: interval of the window
1127    /// - period: length of the window
1128    /// - offset: offset of the window
1129    ///
1130    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1131    /// with a ordinary group_by on these keys.
1132    #[cfg(feature = "dynamic_group_by")]
1133    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1134        mut self,
1135        index_column: Expr,
1136        group_by: E,
1137        mut options: DynamicGroupOptions,
1138    ) -> LazyGroupBy {
1139        if let Expr::Column(name) = index_column {
1140            options.index_column = name;
1141        } else {
1142            let output_field = index_column
1143                .to_field(&self.collect_schema().unwrap())
1144                .unwrap();
1145            return self.with_column(index_column).group_by_dynamic(
1146                Expr::Column(output_field.name().clone()),
1147                group_by,
1148                options,
1149            );
1150        }
1151        let opt_state = self.get_opt_state();
1152        LazyGroupBy {
1153            logical_plan: self.logical_plan,
1154            opt_state,
1155            predicates: vec![],
1156            keys: group_by.as_ref().to_vec(),
1157            maintain_order: true,
1158            dynamic_options: Some(options),
1159            rolling_options: None,
1160        }
1161    }
1162
1163    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1164    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1165        let keys = by
1166            .as_ref()
1167            .iter()
1168            .map(|e| e.clone().into())
1169            .collect::<Vec<_>>();
1170        let opt_state = self.get_opt_state();
1171
1172        #[cfg(feature = "dynamic_group_by")]
1173        {
1174            LazyGroupBy {
1175                logical_plan: self.logical_plan,
1176                opt_state,
1177                keys,
1178                predicates: vec![],
1179                maintain_order: true,
1180                dynamic_options: None,
1181                rolling_options: None,
1182            }
1183        }
1184
1185        #[cfg(not(feature = "dynamic_group_by"))]
1186        {
1187            LazyGroupBy {
1188                logical_plan: self.logical_plan,
1189                opt_state,
1190                keys,
1191                predicates: vec![],
1192                maintain_order: true,
1193            }
1194        }
1195    }
1196
1197    /// Left anti join this query with another lazy query.
1198    ///
1199    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1200    /// flexible join logic, see [`join`](LazyFrame::join) or
1201    /// [`join_builder`](LazyFrame::join_builder).
1202    ///
1203    /// # Example
1204    ///
1205    /// ```rust
1206    /// use polars_core::prelude::*;
1207    /// use polars_lazy::prelude::*;
1208    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1209    ///         ldf
1210    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1211    /// }
1212    /// ```
1213    #[cfg(feature = "semi_anti_join")]
1214    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1215        self.join(
1216            other,
1217            [left_on.into()],
1218            [right_on.into()],
1219            JoinArgs::new(JoinType::Anti),
1220        )
1221    }
1222
1223    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1224    #[cfg(feature = "cross_join")]
1225    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1226        self.join(
1227            other,
1228            vec![],
1229            vec![],
1230            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1231        )
1232    }
1233
1234    /// Left outer join this query with another lazy query.
1235    ///
1236    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1237    /// flexible join logic, see [`join`](LazyFrame::join) or
1238    /// [`join_builder`](LazyFrame::join_builder).
1239    ///
1240    /// # Example
1241    ///
1242    /// ```rust
1243    /// use polars_core::prelude::*;
1244    /// use polars_lazy::prelude::*;
1245    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1246    ///         ldf
1247    ///         .left_join(other, col("foo"), col("bar"))
1248    /// }
1249    /// ```
1250    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1251        self.join(
1252            other,
1253            [left_on.into()],
1254            [right_on.into()],
1255            JoinArgs::new(JoinType::Left),
1256        )
1257    }
1258
1259    /// Inner join this query with another lazy query.
1260    ///
1261    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1262    /// flexible join logic, see [`join`](LazyFrame::join) or
1263    /// [`join_builder`](LazyFrame::join_builder).
1264    ///
1265    /// # Example
1266    ///
1267    /// ```rust
1268    /// use polars_core::prelude::*;
1269    /// use polars_lazy::prelude::*;
1270    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1271    ///         ldf
1272    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1273    /// }
1274    /// ```
1275    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1276        self.join(
1277            other,
1278            [left_on.into()],
1279            [right_on.into()],
1280            JoinArgs::new(JoinType::Inner),
1281        )
1282    }
1283
1284    /// Full outer join this query with another lazy query.
1285    ///
1286    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1287    /// flexible join logic, see [`join`](LazyFrame::join) or
1288    /// [`join_builder`](LazyFrame::join_builder).
1289    ///
1290    /// # Example
1291    ///
1292    /// ```rust
1293    /// use polars_core::prelude::*;
1294    /// use polars_lazy::prelude::*;
1295    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1296    ///         ldf
1297    ///         .full_join(other, col("foo"), col("bar"))
1298    /// }
1299    /// ```
1300    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1301        self.join(
1302            other,
1303            [left_on.into()],
1304            [right_on.into()],
1305            JoinArgs::new(JoinType::Full),
1306        )
1307    }
1308
1309    /// Left semi join this query with another lazy query.
1310    ///
1311    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1312    /// flexible join logic, see [`join`](LazyFrame::join) or
1313    /// [`join_builder`](LazyFrame::join_builder).
1314    ///
1315    /// # Example
1316    ///
1317    /// ```rust
1318    /// use polars_core::prelude::*;
1319    /// use polars_lazy::prelude::*;
1320    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1321    ///         ldf
1322    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1323    /// }
1324    /// ```
1325    #[cfg(feature = "semi_anti_join")]
1326    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1327        self.join(
1328            other,
1329            [left_on.into()],
1330            [right_on.into()],
1331            JoinArgs::new(JoinType::Semi),
1332        )
1333    }
1334
1335    /// Generic function to join two LazyFrames.
1336    ///
1337    /// `join` can join on multiple columns, given as two list of expressions, and with a
1338    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1339    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1340    /// over how columns are renamed and parallelization options, use
1341    /// [`join_builder`](LazyFrame::join_builder).
1342    ///
1343    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1344    ///
1345    /// # Example
1346    ///
1347    /// ```rust
1348    /// use polars_core::prelude::*;
1349    /// use polars_lazy::prelude::*;
1350    ///
1351    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1352    ///         ldf
1353    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1354    /// }
1355    /// ```
1356    pub fn join<E: AsRef<[Expr]>>(
1357        self,
1358        other: LazyFrame,
1359        left_on: E,
1360        right_on: E,
1361        args: JoinArgs,
1362    ) -> LazyFrame {
1363        let left_on = left_on.as_ref().to_vec();
1364        let right_on = right_on.as_ref().to_vec();
1365
1366        self._join_impl(other, left_on, right_on, args)
1367    }
1368
1369    fn _join_impl(
1370        self,
1371        other: LazyFrame,
1372        left_on: Vec<Expr>,
1373        right_on: Vec<Expr>,
1374        args: JoinArgs,
1375    ) -> LazyFrame {
1376        let JoinArgs {
1377            how,
1378            validation,
1379            suffix,
1380            slice,
1381            nulls_equal,
1382            coalesce,
1383            maintain_order,
1384            build_side,
1385        } = args;
1386
1387        if slice.is_some() {
1388            panic!("impl error: slice is not handled")
1389        }
1390
1391        let mut builder = self
1392            .join_builder()
1393            .with(other)
1394            .left_on(left_on)
1395            .right_on(right_on)
1396            .how(how)
1397            .validate(validation)
1398            .join_nulls(nulls_equal)
1399            .coalesce(coalesce)
1400            .maintain_order(maintain_order)
1401            .build_side(build_side);
1402
1403        if let Some(suffix) = suffix {
1404            builder = builder.suffix(suffix);
1405        }
1406
1407        // Note: args.slice is set by the optimizer
1408        builder.finish()
1409    }
1410
1411    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1412    ///
1413    /// After the `JoinBuilder` has been created and set up, calling
1414    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1415    /// representing the `join` operation.
1416    pub fn join_builder(self) -> JoinBuilder {
1417        JoinBuilder::new(self)
1418    }
1419
1420    /// Gathers rows from this DataFrame based on the indices in idxs.
1421    ///
1422    /// idxs must only have a single column of indices.
1423    pub fn gather(self, idxs: LazyFrame, null_on_oob: bool) -> LazyFrame {
1424        let opt_state = self.get_opt_state();
1425        let lp = self
1426            .get_plan_builder()
1427            .gather(idxs.logical_plan, null_on_oob)
1428            .build();
1429        Self::from_logical_plan(lp, opt_state)
1430    }
1431
1432    /// Add or replace a column, given as an expression, to a DataFrame.
1433    ///
1434    /// # Example
1435    ///
1436    /// ```rust
1437    /// use polars_core::prelude::*;
1438    /// use polars_lazy::prelude::*;
1439    /// fn add_column(df: DataFrame) -> LazyFrame {
1440    ///     df.lazy()
1441    ///         .with_column(
1442    ///             when(col("sepal_length").lt(lit(5.0)))
1443    ///             .then(lit(10))
1444    ///             .otherwise(lit(1))
1445    ///             .alias("new_column_name"),
1446    ///         )
1447    /// }
1448    /// ```
1449    pub fn with_column(self, expr: Expr) -> LazyFrame {
1450        let opt_state = self.get_opt_state();
1451        let lp = self
1452            .get_plan_builder()
1453            .with_columns(
1454                vec![expr],
1455                ProjectionOptions {
1456                    run_parallel: false,
1457                    duplicate_check: true,
1458                    should_broadcast: true,
1459                },
1460            )
1461            .build();
1462        Self::from_logical_plan(lp, opt_state)
1463    }
1464
1465    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1466    ///
1467    /// # Example
1468    ///
1469    /// ```rust
1470    /// use polars_core::prelude::*;
1471    /// use polars_lazy::prelude::*;
1472    /// fn add_columns(df: DataFrame) -> LazyFrame {
1473    ///     df.lazy()
1474    ///         .with_columns(
1475    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1476    ///          )
1477    /// }
1478    /// ```
1479    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1480        let exprs = exprs.as_ref().to_vec();
1481        self.with_columns_impl(
1482            exprs,
1483            ProjectionOptions {
1484                run_parallel: true,
1485                duplicate_check: true,
1486                should_broadcast: true,
1487            },
1488        )
1489    }
1490
1491    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1492    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1493        let exprs = exprs.as_ref().to_vec();
1494        self.with_columns_impl(
1495            exprs,
1496            ProjectionOptions {
1497                run_parallel: false,
1498                duplicate_check: true,
1499                should_broadcast: true,
1500            },
1501        )
1502    }
1503
1504    /// Match or evolve to a certain schema.
1505    pub fn match_to_schema(
1506        self,
1507        schema: SchemaRef,
1508        per_column: Arc<[MatchToSchemaPerColumn]>,
1509        extra_columns: ExtraColumnsPolicy,
1510    ) -> LazyFrame {
1511        let opt_state = self.get_opt_state();
1512        let lp = self
1513            .get_plan_builder()
1514            .match_to_schema(schema, per_column, extra_columns)
1515            .build();
1516        Self::from_logical_plan(lp, opt_state)
1517    }
1518
1519    pub fn pipe_with_schema(
1520        self,
1521        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1522    ) -> Self {
1523        let opt_state = self.get_opt_state();
1524        let lp = self
1525            .get_plan_builder()
1526            .pipe_with_schema(vec![], callback)
1527            .build();
1528        Self::from_logical_plan(lp, opt_state)
1529    }
1530
1531    pub fn pipe_with_schemas(
1532        self,
1533        others: Vec<LazyFrame>,
1534        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1535    ) -> Self {
1536        let opt_state = self.get_opt_state();
1537        let lp = self
1538            .get_plan_builder()
1539            .pipe_with_schema(
1540                others.into_iter().map(|lf| lf.logical_plan).collect(),
1541                callback,
1542            )
1543            .build();
1544        Self::from_logical_plan(lp, opt_state)
1545    }
1546
1547    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1548        let opt_state = self.get_opt_state();
1549        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1550        Self::from_logical_plan(lp, opt_state)
1551    }
1552
1553    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1554        let contexts = contexts
1555            .as_ref()
1556            .iter()
1557            .map(|lf| lf.logical_plan.clone())
1558            .collect();
1559        let opt_state = self.get_opt_state();
1560        let lp = self.get_plan_builder().with_context(contexts).build();
1561        Self::from_logical_plan(lp, opt_state)
1562    }
1563
1564    /// Aggregate all the columns as their maximum values.
1565    ///
1566    /// Aggregated columns will have the same names as the original columns.
1567    pub fn max(self) -> Self {
1568        self.map_private(DslFunction::Stats(StatsFunction::Max))
1569    }
1570
1571    /// Aggregate all the columns as their minimum values.
1572    ///
1573    /// Aggregated columns will have the same names as the original columns.
1574    pub fn min(self) -> Self {
1575        self.map_private(DslFunction::Stats(StatsFunction::Min))
1576    }
1577
1578    /// Aggregate all the columns as their sum values.
1579    ///
1580    /// Aggregated columns will have the same names as the original columns.
1581    ///
1582    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1583    /// - For integer columns, the ordinary checks for overflow are performed:
1584    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1585    ///   silently wrap.
1586    /// - String columns will sum to None.
1587    pub fn sum(self) -> Self {
1588        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1589    }
1590
1591    /// Aggregate all the columns as their mean values.
1592    ///
1593    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1594    /// - String columns will have a mean of None.
1595    pub fn mean(self) -> Self {
1596        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1597    }
1598
1599    /// Aggregate all the columns as their median values.
1600    ///
1601    /// - Boolean and integer results are converted to `f64`. However, they are still
1602    ///   susceptible to overflow before this conversion occurs.
1603    /// - String columns will sum to None.
1604    pub fn median(self) -> Self {
1605        self.map_private(DslFunction::Stats(StatsFunction::Median))
1606    }
1607
1608    /// Aggregate all the columns as their quantile values.
1609    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1610        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1611            quantile,
1612            method,
1613        }))
1614    }
1615
1616    /// Aggregate all the columns as their standard deviation values.
1617    ///
1618    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1619    /// computing the variance, where `N` is the number of rows.
1620    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1621    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1622    /// > likelihood estimate of the variance for normally distributed variables. The
1623    /// > standard deviation computed in this function is the square root of the estimated
1624    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1625    /// > standard deviation per se.
1626    ///
1627    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1628    pub fn std(self, ddof: u8) -> Self {
1629        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1630    }
1631
1632    /// Aggregate all the columns as their variance values.
1633    ///
1634    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1635    /// computing the variance, where `N` is the number of rows.
1636    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1637    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1638    /// > likelihood estimate of the variance for normally distributed variables.
1639    ///
1640    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1641    pub fn var(self, ddof: u8) -> Self {
1642        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1643    }
1644
1645    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1646    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1647        self.explode_impl(columns, options, false)
1648    }
1649
1650    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1651    fn explode_impl(
1652        self,
1653        columns: Selector,
1654        options: ExplodeOptions,
1655        allow_empty: bool,
1656    ) -> LazyFrame {
1657        let opt_state = self.get_opt_state();
1658        let lp = self
1659            .get_plan_builder()
1660            .explode(columns, options, allow_empty)
1661            .build();
1662        Self::from_logical_plan(lp, opt_state)
1663    }
1664
1665    /// Aggregate all the columns as the sum of their null value count.
1666    pub fn null_count(self) -> LazyFrame {
1667        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1668    }
1669
1670    /// Drop non-unique rows and maintain the order of kept rows.
1671    ///
1672    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1673    /// `None`, all columns are considered.
1674    pub fn unique_stable(
1675        self,
1676        subset: Option<Selector>,
1677        keep_strategy: UniqueKeepStrategy,
1678    ) -> LazyFrame {
1679        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1680        self.unique_stable_generic(subset, keep_strategy)
1681    }
1682
1683    pub fn unique_stable_generic(
1684        self,
1685        subset: Option<Vec<Expr>>,
1686        keep_strategy: UniqueKeepStrategy,
1687    ) -> LazyFrame {
1688        let opt_state = self.get_opt_state();
1689        let options = DistinctOptionsDSL {
1690            subset,
1691            maintain_order: true,
1692            keep_strategy,
1693        };
1694        let lp = self.get_plan_builder().distinct(options).build();
1695        Self::from_logical_plan(lp, opt_state)
1696    }
1697
1698    /// Drop non-unique rows without maintaining the order of kept rows.
1699    ///
1700    /// The order of the kept rows may change; to maintain the original row order, use
1701    /// [`unique_stable`](LazyFrame::unique_stable).
1702    ///
1703    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1704    /// all columns are considered.
1705    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1706        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1707        self.unique_generic(subset, keep_strategy)
1708    }
1709
1710    pub fn unique_generic(
1711        self,
1712        subset: Option<Vec<Expr>>,
1713        keep_strategy: UniqueKeepStrategy,
1714    ) -> LazyFrame {
1715        let opt_state = self.get_opt_state();
1716        let options = DistinctOptionsDSL {
1717            subset,
1718            maintain_order: false,
1719            keep_strategy,
1720        };
1721        let lp = self.get_plan_builder().distinct(options).build();
1722        Self::from_logical_plan(lp, opt_state)
1723    }
1724
1725    /// Drop rows containing one or more NaN values.
1726    ///
1727    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1728    /// floating point columns are considered.
1729    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1730        let opt_state = self.get_opt_state();
1731        let lp = self.get_plan_builder().drop_nans(subset).build();
1732        Self::from_logical_plan(lp, opt_state)
1733    }
1734
1735    /// Drop rows containing one or more None values.
1736    ///
1737    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1738    /// columns are considered.
1739    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1740        let opt_state = self.get_opt_state();
1741        let lp = self.get_plan_builder().drop_nulls(subset).build();
1742        Self::from_logical_plan(lp, opt_state)
1743    }
1744
1745    /// Slice the DataFrame using an offset (starting row) and a length.
1746    ///
1747    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1748    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1749    /// end.
1750    ///
1751    /// If `offset` and `len` are such that the slice extends beyond the end of the
1752    /// DataFrame, the portion between `offset` and the end will be returned. In this
1753    /// case, the number of rows in the returned DataFrame will be less than `len`.
1754    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1755        let opt_state = self.get_opt_state();
1756        let lp = self.get_plan_builder().slice(offset, len).build();
1757        Self::from_logical_plan(lp, opt_state)
1758    }
1759
1760    /// Remove all the rows of the LazyFrame.
1761    pub fn clear(self) -> LazyFrame {
1762        self.slice(0, 0)
1763    }
1764
1765    /// Get the first row.
1766    ///
1767    /// Equivalent to `self.slice(0, 1)`.
1768    pub fn first(self) -> LazyFrame {
1769        self.slice(0, 1)
1770    }
1771
1772    /// Get the last row.
1773    ///
1774    /// Equivalent to `self.slice(-1, 1)`.
1775    pub fn last(self) -> LazyFrame {
1776        self.slice(-1, 1)
1777    }
1778
1779    /// Get the last `n` rows.
1780    ///
1781    /// Equivalent to `self.slice(-(n as i64), n)`.
1782    pub fn tail(self, n: IdxSize) -> LazyFrame {
1783        let neg_tail = -(n as i64);
1784        self.slice(neg_tail, n)
1785    }
1786
1787    #[cfg(feature = "pivot")]
1788    #[expect(clippy::too_many_arguments)]
1789    pub fn pivot(
1790        self,
1791        on: Selector,
1792        on_columns: Arc<DataFrame>,
1793        index: Selector,
1794        values: Selector,
1795        agg: Expr,
1796        maintain_order: bool,
1797        separator: PlSmallStr,
1798        column_naming: PivotColumnNaming,
1799    ) -> LazyFrame {
1800        let opt_state = self.get_opt_state();
1801        let lp = self
1802            .get_plan_builder()
1803            .pivot(
1804                on,
1805                on_columns,
1806                index,
1807                values,
1808                agg,
1809                maintain_order,
1810                separator,
1811                column_naming,
1812            )
1813            .build();
1814        Self::from_logical_plan(lp, opt_state)
1815    }
1816
1817    /// Unpivot the DataFrame from wide to long format.
1818    ///
1819    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1820    #[cfg(feature = "pivot")]
1821    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1822        let opt_state = self.get_opt_state();
1823        let lp = self.get_plan_builder().unpivot(args).build();
1824        Self::from_logical_plan(lp, opt_state)
1825    }
1826
1827    /// Limit the DataFrame to the first `n` rows.
1828    pub fn limit(self, n: IdxSize) -> LazyFrame {
1829        self.slice(0, n)
1830    }
1831
1832    /// Apply a function/closure once the logical plan get executed.
1833    ///
1834    /// The function has access to the whole materialized DataFrame at the time it is
1835    /// called.
1836    ///
1837    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1838    /// with `LazyFrame::with_column` or `with_columns`.
1839    ///
1840    /// ## Warning
1841    /// This can blow up in your face if the schema is changed due to the operation. The
1842    /// optimizer relies on a correct schema.
1843    ///
1844    /// You can toggle certain optimizations off.
1845    pub fn map<F>(
1846        self,
1847        function: F,
1848        optimizations: AllowedOptimizations,
1849        schema: Option<Arc<dyn UdfSchema>>,
1850        name: Option<&'static str>,
1851    ) -> LazyFrame
1852    where
1853        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1854    {
1855        let opt_state = self.get_opt_state();
1856        let lp = self
1857            .get_plan_builder()
1858            .map(
1859                function,
1860                optimizations,
1861                schema,
1862                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1863            )
1864            .build();
1865        Self::from_logical_plan(lp, opt_state)
1866    }
1867
1868    #[cfg(feature = "python")]
1869    pub fn map_python(
1870        self,
1871        function: polars_utils::python_function::PythonFunction,
1872        optimizations: AllowedOptimizations,
1873        schema: Option<SchemaRef>,
1874        validate_output: bool,
1875    ) -> LazyFrame {
1876        let opt_state = self.get_opt_state();
1877        let lp = self
1878            .get_plan_builder()
1879            .map_python(function, optimizations, schema, validate_output)
1880            .build();
1881        Self::from_logical_plan(lp, opt_state)
1882    }
1883
1884    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1885        let opt_state = self.get_opt_state();
1886        let lp = self.get_plan_builder().map_private(function).build();
1887        Self::from_logical_plan(lp, opt_state)
1888    }
1889
1890    /// Add a new column at index 0 that counts the rows.
1891    ///
1892    /// `name` is the name of the new column. `offset` is where to start counting from; if
1893    /// `None`, it is set to `0`.
1894    ///
1895    /// # Warning
1896    /// This can have a negative effect on query performance. This may for instance block
1897    /// predicate pushdown optimization.
1898    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1899    where
1900        S: Into<PlSmallStr>,
1901    {
1902        let name = name.into();
1903
1904        match &self.logical_plan {
1905            v @ DslPlan::Scan {
1906                scan_type,
1907                unified_scan_args,
1908                ..
1909            } if unified_scan_args.row_index.is_none()
1910                && !matches!(
1911                    &**scan_type,
1912                    FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1913                ) =>
1914            {
1915                let DslPlan::Scan {
1916                    sources,
1917                    mut unified_scan_args,
1918                    scan_type,
1919                    cached_ir: _,
1920                } = v.clone()
1921                else {
1922                    unreachable!()
1923                };
1924
1925                unified_scan_args.row_index = Some(RowIndex {
1926                    name,
1927                    offset: offset.unwrap_or(0),
1928                });
1929
1930                DslPlan::Scan {
1931                    sources,
1932                    unified_scan_args,
1933                    scan_type,
1934                    cached_ir: Default::default(),
1935                }
1936                .into()
1937            },
1938            _ => self.map_private(DslFunction::RowIndex { name, offset }),
1939        }
1940    }
1941
1942    /// Return the number of non-null elements for each column.
1943    pub fn count(self) -> LazyFrame {
1944        self.select(vec![col(PlSmallStr::from_static("*")).count()])
1945    }
1946
1947    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
1948    /// inserted as columns.
1949    #[cfg(feature = "dtype-struct")]
1950    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1951        self.map_private(DslFunction::Unnest {
1952            columns: cols,
1953            separator,
1954        })
1955    }
1956
1957    #[cfg(feature = "merge_sorted")]
1958    pub fn merge_sorted<S>(
1959        self,
1960        other: LazyFrame,
1961        key: S,
1962        maintain_order: bool,
1963    ) -> PolarsResult<LazyFrame>
1964    where
1965        S: Into<PlSmallStr>,
1966    {
1967        let key = key.into();
1968
1969        let lp = DslPlan::MergeSorted {
1970            input_left: Arc::new(self.logical_plan),
1971            input_right: Arc::new(other.logical_plan),
1972            key,
1973            maintain_order,
1974        };
1975        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1976    }
1977
1978    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1979        let lp = DslPlan::MapFunction {
1980            input: Arc::new(self.logical_plan),
1981            function: DslFunction::Hint(hint),
1982        };
1983        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1984    }
1985}
1986
1987/// Utility struct for lazy group_by operation.
1988#[derive(Clone)]
1989pub struct LazyGroupBy {
1990    pub logical_plan: DslPlan,
1991    opt_state: OptFlags,
1992    keys: Vec<Expr>,
1993    predicates: Vec<Expr>,
1994    maintain_order: bool,
1995    #[cfg(feature = "dynamic_group_by")]
1996    dynamic_options: Option<DynamicGroupOptions>,
1997    #[cfg(feature = "dynamic_group_by")]
1998    rolling_options: Option<RollingGroupOptions>,
1999}
2000
2001impl From<LazyGroupBy> for LazyFrame {
2002    fn from(lgb: LazyGroupBy) -> Self {
2003        Self {
2004            logical_plan: lgb.logical_plan,
2005            opt_state: lgb.opt_state,
2006            cached_arena: Default::default(),
2007        }
2008    }
2009}
2010
2011impl LazyGroupBy {
2012    /// Filter groups with a predicate after aggregation.
2013    ///
2014    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
2015    /// is evaluated on the groups.
2016    /// This method can be chained in which case all predicates must evaluate to `true` for a
2017    /// group to be kept.
2018    ///
2019    /// # Example
2020    ///
2021    /// ```rust
2022    /// use polars_core::prelude::*;
2023    /// use polars_lazy::prelude::*;
2024    ///
2025    /// fn example(df: DataFrame) -> LazyFrame {
2026    ///       df.lazy()
2027    ///        .group_by_stable([col("date")])
2028    ///        .having(col("rain").sum().gt(lit(10)))
2029    ///        .agg([col("rain").min().alias("min_rain")])
2030    /// }
2031    /// ```
2032    pub fn having(mut self, predicate: Expr) -> Self {
2033        self.predicates.push(predicate);
2034        self
2035    }
2036
2037    /// Group by and aggregate.
2038    ///
2039    /// Select a column with [col] and choose an aggregation.
2040    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2041    ///
2042    /// # Example
2043    ///
2044    /// ```rust
2045    /// use polars_core::prelude::*;
2046    /// use polars_lazy::prelude::*;
2047    ///
2048    /// fn example(df: DataFrame) -> LazyFrame {
2049    ///       df.lazy()
2050    ///        .group_by_stable([col("date")])
2051    ///        .agg([
2052    ///            col("rain").min().alias("min_rain"),
2053    ///            col("rain").sum().alias("sum_rain"),
2054    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2055    ///        ])
2056    /// }
2057    /// ```
2058    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2059        #[cfg(feature = "dynamic_group_by")]
2060        let lp = DslBuilder::from(self.logical_plan)
2061            .group_by(
2062                self.keys,
2063                self.predicates,
2064                aggs,
2065                None,
2066                self.maintain_order,
2067                self.dynamic_options,
2068                self.rolling_options,
2069            )
2070            .build();
2071
2072        #[cfg(not(feature = "dynamic_group_by"))]
2073        let lp = DslBuilder::from(self.logical_plan)
2074            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2075            .build();
2076        LazyFrame::from_logical_plan(lp, self.opt_state)
2077    }
2078
2079    /// Return first n rows of each group
2080    pub fn head(self, n: Option<usize>) -> LazyFrame {
2081        let keys = self
2082            .keys
2083            .iter()
2084            .filter_map(|expr| expr_output_name(expr).ok())
2085            .collect::<Vec<_>>();
2086
2087        self.agg([all().as_expr().head(n)]).explode_impl(
2088            all() - by_name(keys.iter().cloned(), false, false),
2089            ExplodeOptions {
2090                empty_as_null: true,
2091                keep_nulls: true,
2092            },
2093            true,
2094        )
2095    }
2096
2097    /// Return last n rows of each group
2098    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2099        let keys = self
2100            .keys
2101            .iter()
2102            .filter_map(|expr| expr_output_name(expr).ok())
2103            .collect::<Vec<_>>();
2104
2105        self.agg([all().as_expr().tail(n)]).explode_impl(
2106            all() - by_name(keys.iter().cloned(), false, false),
2107            ExplodeOptions {
2108                empty_as_null: true,
2109                keep_nulls: true,
2110            },
2111            true,
2112        )
2113    }
2114
2115    /// Apply a function over the groups as a new DataFrame.
2116    ///
2117    /// **It is not recommended that you use this as materializing the DataFrame is very
2118    /// expensive.**
2119    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2120        if !self.predicates.is_empty() {
2121            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2122        }
2123
2124        #[cfg(feature = "dynamic_group_by")]
2125        let options = GroupbyOptions {
2126            dynamic: self.dynamic_options,
2127            rolling: self.rolling_options,
2128            slice: None,
2129        };
2130
2131        #[cfg(not(feature = "dynamic_group_by"))]
2132        let options = GroupbyOptions { slice: None };
2133
2134        let lp = DslPlan::GroupBy {
2135            input: Arc::new(self.logical_plan),
2136            keys: self.keys,
2137            predicates: vec![],
2138            aggs: vec![],
2139            apply: Some((f, schema)),
2140            maintain_order: self.maintain_order,
2141            options: Arc::new(options),
2142        };
2143        LazyFrame::from_logical_plan(lp, self.opt_state)
2144    }
2145}
2146
2147#[must_use]
2148pub struct JoinBuilder {
2149    lf: LazyFrame,
2150    how: JoinType,
2151    other: Option<LazyFrame>,
2152    left_on: Vec<Expr>,
2153    right_on: Vec<Expr>,
2154    allow_parallel: bool,
2155    force_parallel: bool,
2156    suffix: Option<PlSmallStr>,
2157    validation: JoinValidation,
2158    nulls_equal: bool,
2159    coalesce: JoinCoalesce,
2160    maintain_order: MaintainOrderJoin,
2161    build_side: Option<JoinBuildSide>,
2162}
2163impl JoinBuilder {
2164    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2165    pub fn new(lf: LazyFrame) -> Self {
2166        Self {
2167            lf,
2168            other: None,
2169            how: JoinType::Inner,
2170            left_on: vec![],
2171            right_on: vec![],
2172            allow_parallel: true,
2173            force_parallel: false,
2174            suffix: None,
2175            validation: Default::default(),
2176            nulls_equal: false,
2177            coalesce: Default::default(),
2178            maintain_order: Default::default(),
2179            build_side: None,
2180        }
2181    }
2182
2183    /// The right table in the join.
2184    pub fn with(mut self, other: LazyFrame) -> Self {
2185        self.other = Some(other);
2186        self
2187    }
2188
2189    /// Select the join type.
2190    pub fn how(mut self, how: JoinType) -> Self {
2191        self.how = how;
2192        self
2193    }
2194
2195    pub fn validate(mut self, validation: JoinValidation) -> Self {
2196        self.validation = validation;
2197        self
2198    }
2199
2200    /// The expressions you want to join both tables on.
2201    ///
2202    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2203    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2204        let on = on.as_ref().to_vec();
2205        self.left_on.clone_from(&on);
2206        self.right_on = on;
2207        self
2208    }
2209
2210    /// The expressions you want to join the left table on.
2211    ///
2212    /// The passed expressions must be valid in the left table.
2213    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2214        self.left_on = on.as_ref().to_vec();
2215        self
2216    }
2217
2218    /// The expressions you want to join the right table on.
2219    ///
2220    /// The passed expressions must be valid in the right table.
2221    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2222        self.right_on = on.as_ref().to_vec();
2223        self
2224    }
2225
2226    /// Allow parallel table evaluation.
2227    pub fn allow_parallel(mut self, allow: bool) -> Self {
2228        self.allow_parallel = allow;
2229        self
2230    }
2231
2232    /// Force parallel table evaluation.
2233    pub fn force_parallel(mut self, force: bool) -> Self {
2234        self.force_parallel = force;
2235        self
2236    }
2237
2238    /// Join on null values. By default null values will never produce matches.
2239    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2240        self.nulls_equal = nulls_equal;
2241        self
2242    }
2243
2244    /// Suffix to add duplicate column names in join.
2245    /// Defaults to `"_right"` if this method is never called.
2246    pub fn suffix<S>(mut self, suffix: S) -> Self
2247    where
2248        S: Into<PlSmallStr>,
2249    {
2250        self.suffix = Some(suffix.into());
2251        self
2252    }
2253
2254    /// Whether to coalesce join columns.
2255    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2256        self.coalesce = coalesce;
2257        self
2258    }
2259
2260    /// Whether to preserve the row order.
2261    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2262        self.maintain_order = maintain_order;
2263        self
2264    }
2265
2266    /// Whether to prefer a specific build side.
2267    pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2268        self.build_side = build_side;
2269        self
2270    }
2271
2272    /// Finish builder
2273    pub fn finish(self) -> LazyFrame {
2274        let opt_state = self.lf.opt_state;
2275        let other = self.other.expect("'with' not set in join builder");
2276
2277        let args = JoinArgs {
2278            how: self.how,
2279            validation: self.validation,
2280            suffix: self.suffix,
2281            slice: None,
2282            nulls_equal: self.nulls_equal,
2283            coalesce: self.coalesce,
2284            maintain_order: self.maintain_order,
2285            build_side: self.build_side,
2286        };
2287
2288        let lp = self
2289            .lf
2290            .get_plan_builder()
2291            .join(
2292                other.logical_plan,
2293                self.left_on,
2294                self.right_on,
2295                JoinOptions {
2296                    allow_parallel: self.allow_parallel,
2297                    force_parallel: self.force_parallel,
2298                    args,
2299                }
2300                .into(),
2301            )
2302            .build();
2303        LazyFrame::from_logical_plan(lp, opt_state)
2304    }
2305
2306    // Finish with join predicates
2307    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2308        let opt_state = self.lf.opt_state;
2309        let other = self.other.expect("with not set");
2310
2311        // Decompose `And` conjunctions into their component expressions
2312        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2313            if let Expr::BinaryExpr {
2314                op: Operator::And,
2315                left,
2316                right,
2317            } = predicate
2318            {
2319                decompose_and((*left).clone(), expanded_predicates);
2320                decompose_and((*right).clone(), expanded_predicates);
2321            } else {
2322                expanded_predicates.push(predicate);
2323            }
2324        }
2325        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2326        for predicate in predicates {
2327            decompose_and(predicate, &mut expanded_predicates);
2328        }
2329        let predicates: Vec<Expr> = expanded_predicates;
2330
2331        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2332        #[cfg(feature = "is_between")]
2333        let predicates: Vec<Expr> = {
2334            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2335            for predicate in predicates {
2336                if let Expr::Function {
2337                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2338                    input,
2339                    ..
2340                } = &predicate
2341                {
2342                    if let [expr, lower, upper] = input.as_slice() {
2343                        match closed {
2344                            ClosedInterval::Both => {
2345                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2346                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2347                            },
2348                            ClosedInterval::Right => {
2349                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2350                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2351                            },
2352                            ClosedInterval::Left => {
2353                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2354                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2355                            },
2356                            ClosedInterval::None => {
2357                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2358                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2359                            },
2360                        }
2361                        continue;
2362                    }
2363                }
2364                expanded_predicates.push(predicate);
2365            }
2366            expanded_predicates
2367        };
2368
2369        let args = JoinArgs {
2370            how: self.how,
2371            validation: self.validation,
2372            suffix: self.suffix,
2373            slice: None,
2374            nulls_equal: self.nulls_equal,
2375            coalesce: self.coalesce,
2376            maintain_order: self.maintain_order,
2377            build_side: self.build_side,
2378        };
2379        let options = JoinOptions {
2380            allow_parallel: self.allow_parallel,
2381            force_parallel: self.force_parallel,
2382            args,
2383        };
2384
2385        let lp = DslPlan::Join {
2386            input_left: Arc::new(self.lf.logical_plan),
2387            input_right: Arc::new(other.logical_plan),
2388            left_on: Default::default(),
2389            right_on: Default::default(),
2390            predicates,
2391            options: Arc::from(options),
2392        };
2393
2394        LazyFrame::from_logical_plan(lp, opt_state)
2395    }
2396}
2397
2398pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2399    #[cfg(not(feature = "new_streaming"))]
2400    {
2401        None
2402    }
2403    #[cfg(feature = "new_streaming")]
2404    {
2405        Some(polars_stream::build_streaming_query_executor)
2406    }
2407};
2408
2409pub struct CollectBatches {
2410    recv: Receiver<PolarsResult<DataFrame>>,
2411    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2412}
2413
2414impl CollectBatches {
2415    /// Start running the query, if not already.
2416    pub fn start(&mut self) {
2417        if let Some(runner) = self.runner.take() {
2418            runner()
2419        }
2420    }
2421}
2422
2423impl Iterator for CollectBatches {
2424    type Item = PolarsResult<DataFrame>;
2425
2426    fn next(&mut self) -> Option<Self::Item> {
2427        self.start();
2428        self.recv.recv().ok()
2429    }
2430}