Skip to main content

polars_lazy/frame/mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
/// Conversion of a value into a [`LazyFrame`].
///
/// This module implements it for [`DataFrame`] (wrapping the frame in a plan built with
/// `DslBuilder::from_existing_df`) and for [`LazyFrame`] itself (identity).
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
45
46impl IntoLazy for DataFrame {
47    /// Convert the `DataFrame` into a `LazyFrame`
48    fn lazy(self) -> LazyFrame {
49        let lp = DslBuilder::from_existing_df(self).build();
50        LazyFrame {
51            logical_plan: lp,
52            opt_state: Default::default(),
53            cached_arena: Default::default(),
54        }
55    }
56}
57
58impl IntoLazy for LazyFrame {
59    fn lazy(self) -> LazyFrame {
60        self
61    }
62}
63
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan built up by the chained method calls (not yet optimized).
    pub logical_plan: DslPlan,
    /// Optimization flags passed to the optimizer when the plan is optimized or collected.
    pub(crate) opt_state: OptFlags,
    /// Shared arena cache. The derived `Clone` clones the `Arc`, so clones of a `LazyFrame`
    /// share the same cache. NOTE(review): populated/consumed through `get_arenas`, which is
    /// defined outside this view — confirm its exact semantics there.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
75
76impl From<DslPlan> for LazyFrame {
77    fn from(plan: DslPlan) -> Self {
78        Self {
79            logical_plan: plan,
80            opt_state: OptFlags::default(),
81            cached_arena: Default::default(),
82        }
83    }
84}
85
86impl LazyFrame {
87    pub(crate) fn from_inner(
88        logical_plan: DslPlan,
89        opt_state: OptFlags,
90        cached_arena: Arc<Mutex<Option<CachedArena>>>,
91    ) -> Self {
92        Self {
93            logical_plan,
94            opt_state,
95            cached_arena,
96        }
97    }
98
99    pub(crate) fn get_plan_builder(self) -> DslBuilder {
100        DslBuilder::from(self.logical_plan)
101    }
102
103    fn get_opt_state(&self) -> OptFlags {
104        self.opt_state
105    }
106
107    pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108        LazyFrame {
109            logical_plan,
110            opt_state,
111            cached_arena: Default::default(),
112        }
113    }
114
115    /// Get current optimizations.
116    pub fn get_current_optimizations(&self) -> OptFlags {
117        self.opt_state
118    }
119
120    /// Set allowed optimizations.
121    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122        self.opt_state = opt_state;
123        self
124    }
125
126    /// Turn off all optimizations.
127    pub fn without_optimizations(self) -> Self {
128        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129    }
130
131    /// Toggle projection pushdown optimization.
132    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134        self
135    }
136
137    /// Toggle cluster with columns optimization.
138    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140        self
141    }
142
143    /// Check if operations are order dependent and unset maintaining_order if
144    /// the order would not be observed.
145    pub fn with_check_order(mut self, toggle: bool) -> Self {
146        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147        self
148    }
149
150    /// Toggle predicate pushdown optimization.
151    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153        self
154    }
155
156    /// Toggle type coercion optimization.
157    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159        self
160    }
161
162    /// Toggle type check optimization.
163    pub fn with_type_check(mut self, toggle: bool) -> Self {
164        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165        self
166    }
167
168    /// Toggle expression simplification optimization on or off.
169    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171        self
172    }
173
174    /// Toggle common subplan elimination optimization on or off
175    #[cfg(feature = "cse")]
176    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
177        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
178        self
179    }
180
181    /// Toggle common subexpression elimination optimization on or off
182    #[cfg(feature = "cse")]
183    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
184        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
185        self
186    }
187
188    /// Toggle slice pushdown optimization.
189    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191        self
192    }
193
194    #[cfg(feature = "new_streaming")]
195    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
196        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
197        self
198    }
199
200    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
201    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
202        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
203        self
204    }
205
206    /// Run every node eagerly. This turns off multi-node optimizations.
207    pub fn _with_eager(mut self, toggle: bool) -> Self {
208        self.opt_state.set(OptFlags::EAGER, toggle);
209        self
210    }
211
212    /// Return a String describing the naive (un-optimized) logical plan.
213    pub fn describe_plan(&self) -> PolarsResult<String> {
214        Ok(self.clone().to_alp()?.describe())
215    }
216
217    /// Return a String describing the naive (un-optimized) logical plan in tree format.
218    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
219        Ok(self.clone().to_alp()?.describe_tree_format())
220    }
221
222    /// Return a String describing the optimized logical plan.
223    ///
224    /// Returns `Err` if optimizing the logical plan fails.
225    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
226        Ok(self.clone().to_alp_optimized()?.describe())
227    }
228
229    /// Return a String describing the optimized logical plan in tree format.
230    ///
231    /// Returns `Err` if optimizing the logical plan fails.
232    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
233        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
234    }
235
236    /// Return a String describing the logical plan.
237    ///
238    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
239    /// explains the naive, un-optimized plan.
240    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
241        if optimized {
242            self.describe_optimized_plan()
243        } else {
244            self.describe_plan()
245        }
246    }
247
248    /// Add a sort operation to the logical plan.
249    ///
250    /// Sorts the LazyFrame by the column name specified using the provided options.
251    ///
252    /// # Example
253    ///
254    /// Sort DataFrame by 'sepal_width' column:
255    /// ```rust
256    /// # use polars_core::prelude::*;
257    /// # use polars_lazy::prelude::*;
258    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
259    ///     df.lazy().sort(["sepal_width"], Default::default())
260    /// }
261    /// ```
262    /// Sort by a single column with specific order:
263    /// ```
264    /// # use polars_core::prelude::*;
265    /// # use polars_lazy::prelude::*;
266    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
267    ///     df.lazy().sort(
268    ///         ["sepal_width"],
269    ///         SortMultipleOptions::new()
270    ///             .with_order_descending(descending)
271    ///     )
272    /// }
273    /// ```
274    /// Sort by multiple columns with specifying order for each column:
275    /// ```
276    /// # use polars_core::prelude::*;
277    /// # use polars_lazy::prelude::*;
278    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
279    ///     df.lazy().sort(
280    ///         ["sepal_width", "sepal_length"],
281    ///         SortMultipleOptions::new()
282    ///             .with_order_descending_multi([false, true])
283    ///     )
284    /// }
285    /// ```
286    /// See [`SortMultipleOptions`] for more options.
287    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
288        let opt_state = self.get_opt_state();
289        let lp = self
290            .get_plan_builder()
291            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
292            .build();
293        Self::from_logical_plan(lp, opt_state)
294    }
295
296    /// Add a sort operation to the logical plan.
297    ///
298    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
299    /// concrete columns before sorting.
300    ///
301    /// See [`SortMultipleOptions`] for more options.
302    ///
303    /// # Example
304    ///
305    /// ```rust
306    /// use polars_core::prelude::*;
307    /// use polars_lazy::prelude::*;
308    ///
309    /// /// Sort DataFrame by 'sepal_width' column
310    /// fn example(df: DataFrame) -> LazyFrame {
311    ///       df.lazy()
312    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
313    /// }
314    /// ```
315    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
316        self,
317        by_exprs: E,
318        sort_options: SortMultipleOptions,
319    ) -> Self {
320        let by_exprs = by_exprs.as_ref().to_vec();
321        if by_exprs.is_empty() {
322            self
323        } else {
324            let opt_state = self.get_opt_state();
325            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
326            Self::from_logical_plan(lp, opt_state)
327        }
328    }
329
330    pub fn top_k<E: AsRef<[Expr]>>(
331        self,
332        k: IdxSize,
333        by_exprs: E,
334        sort_options: SortMultipleOptions,
335    ) -> Self {
336        // this will optimize to top-k
337        self.sort_by_exprs(
338            by_exprs,
339            sort_options.with_order_reversed().with_nulls_last(true),
340        )
341        .slice(0, k)
342    }
343
344    pub fn bottom_k<E: AsRef<[Expr]>>(
345        self,
346        k: IdxSize,
347        by_exprs: E,
348        sort_options: SortMultipleOptions,
349    ) -> Self {
350        // this will optimize to bottom-k
351        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
352            .slice(0, k)
353    }
354
355    /// Reverse the `DataFrame` from top to bottom.
356    ///
357    /// Row `i` becomes row `number_of_rows - i - 1`.
358    ///
359    /// # Example
360    ///
361    /// ```rust
362    /// use polars_core::prelude::*;
363    /// use polars_lazy::prelude::*;
364    ///
365    /// fn example(df: DataFrame) -> LazyFrame {
366    ///       df.lazy()
367    ///         .reverse()
368    /// }
369    /// ```
370    pub fn reverse(self) -> Self {
371        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
372    }
373
374    /// Rename columns in the DataFrame.
375    ///
376    /// `existing` and `new` are iterables of the same length containing the old and
377    /// corresponding new column names. Renaming happens to all `existing` columns
378    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
379    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
380    /// those columns that are actually found will be renamed (others will be ignored).
381    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
382    where
383        I: IntoIterator<Item = T>,
384        J: IntoIterator<Item = S>,
385        T: AsRef<str>,
386        S: AsRef<str>,
387    {
388        let iter = existing.into_iter();
389        let cap = iter.size_hint().0;
390        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
392
393        // TODO! should this error if `existing` and `new` have different lengths?
394        // Currently, the longer of the two is truncated.
395        for (existing, new) in iter.zip(new) {
396            let existing = existing.as_ref();
397            let new = new.as_ref();
398            if new != existing {
399                existing_vec.push(existing.into());
400                new_vec.push(new.into());
401            }
402        }
403
404        self.map_private(DslFunction::Rename {
405            existing: existing_vec.into(),
406            new: new_vec.into(),
407            strict,
408        })
409    }
410
411    /// Removes columns from the DataFrame.
412    /// Note that it's better to only select the columns you need
413    /// and let the projection pushdown optimize away the unneeded columns.
414    ///
415    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
416    /// error while materializing the [`LazyFrame`].
417    pub fn drop(self, columns: Selector) -> Self {
418        let opt_state = self.get_opt_state();
419        let lp = self.get_plan_builder().drop(columns).build();
420        Self::from_logical_plan(lp, opt_state)
421    }
422
423    /// Shift the values by a given period and fill the parts that will be empty due to this operation
424    /// with `Nones`.
425    ///
426    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
427    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
428        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
429    }
430
431    /// Shift the values by a given period and fill the parts that will be empty due to this operation
432    /// with the result of the `fill_value` expression.
433    ///
434    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
435    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
436        self.select(vec![
437            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
438        ])
439    }
440
441    /// Fill None values in the DataFrame with an expression.
442    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
443        let opt_state = self.get_opt_state();
444        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
445        Self::from_logical_plan(lp, opt_state)
446    }
447
448    /// Fill NaN values in the DataFrame with an expression.
449    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
450        let opt_state = self.get_opt_state();
451        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
452        Self::from_logical_plan(lp, opt_state)
453    }
454
455    /// Caches the result into a new LazyFrame.
456    ///
457    /// This should be used to prevent computations running multiple times.
458    pub fn cache(self) -> Self {
459        let opt_state = self.get_opt_state();
460        let lp = self.get_plan_builder().cache().build();
461        Self::from_logical_plan(lp, opt_state)
462    }
463
464    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
465    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
466        let cast_cols: Vec<Expr> = dtypes
467            .into_iter()
468            .map(|(name, dt)| {
469                let name = PlSmallStr::from_str(name);
470
471                if strict {
472                    col(name).strict_cast(dt)
473                } else {
474                    col(name).cast(dt)
475                }
476            })
477            .collect();
478
479        if cast_cols.is_empty() {
480            self
481        } else {
482            self.with_columns(cast_cols)
483        }
484    }
485
486    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
487    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
488        self.with_columns(vec![if strict {
489            col(PlSmallStr::from_static("*")).strict_cast(dtype)
490        } else {
491            col(PlSmallStr::from_static("*")).cast(dtype)
492        }])
493    }
494
495    pub fn optimize(
496        self,
497        lp_arena: &mut Arena<IR>,
498        expr_arena: &mut Arena<AExpr>,
499    ) -> PolarsResult<Node> {
500        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
501    }
502
503    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
504        let (mut lp_arena, mut expr_arena) = self.get_arenas();
505        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
506
507        Ok(IRPlan::new(node, lp_arena, expr_arena))
508    }
509
510    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
511        let (mut lp_arena, mut expr_arena) = self.get_arenas();
512        let node = to_alp(
513            self.logical_plan,
514            &mut expr_arena,
515            &mut lp_arena,
516            &mut self.opt_state,
517        )?;
518        let plan = IRPlan::new(node, lp_arena, expr_arena);
519        Ok(plan)
520    }
521
522    pub(crate) fn optimize_with_scratch(
523        self,
524        lp_arena: &mut Arena<IR>,
525        expr_arena: &mut Arena<AExpr>,
526        scratch: &mut Vec<Node>,
527    ) -> PolarsResult<Node> {
528        let lp_top = optimize(
529            self.logical_plan,
530            self.opt_state,
531            lp_arena,
532            expr_arena,
533            scratch,
534            apply_scan_predicate_to_scan_ir,
535        )?;
536
537        Ok(lp_top)
538    }
539
540    fn prepare_collect_post_opt<P>(
541        mut self,
542        check_sink: bool,
543        query_start: Option<std::time::Instant>,
544        post_opt: P,
545    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
546    where
547        P: FnOnce(
548            Node,
549            &mut Arena<IR>,
550            &mut Arena<AExpr>,
551            Option<std::time::Duration>,
552        ) -> PolarsResult<()>,
553    {
554        let (mut lp_arena, mut expr_arena) = self.get_arenas();
555
556        let mut scratch = vec![];
557        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
558
559        post_opt(
560            lp_top,
561            &mut lp_arena,
562            &mut expr_arena,
563            // Post optimization callback gets the time since the
564            // query was started as its "base" timepoint.
565            query_start.map(|s| s.elapsed()),
566        )?;
567
568        // sink should be replaced
569        let no_file_sink = if check_sink {
570            !matches!(
571                lp_arena.get(lp_top),
572                IR::Sink {
573                    payload: SinkTypeIR::File { .. },
574                    ..
575                }
576            )
577        } else {
578            true
579        };
580        let physical_plan = create_physical_plan(
581            lp_top,
582            &mut lp_arena,
583            &mut expr_arena,
584            BUILD_STREAMING_EXECUTOR,
585        )?;
586
587        let state = ExecutionState::new();
588        Ok((state, physical_plan, no_file_sink))
589    }
590
591    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
592    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
593    where
594        P: FnOnce(
595            Node,
596            &mut Arena<IR>,
597            &mut Arena<AExpr>,
598            Option<std::time::Duration>,
599        ) -> PolarsResult<()>,
600    {
601        let (mut state, mut physical_plan, _) =
602            self.prepare_collect_post_opt(false, None, post_opt)?;
603        physical_plan.execute(&mut state)
604    }
605
606    #[allow(unused_mut)]
607    fn prepare_collect(
608        self,
609        check_sink: bool,
610        query_start: Option<std::time::Instant>,
611    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
612        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
613    }
614
615    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
616    /// `engine`.
617    ///
618    /// The query is optimized prior to execution.
619    pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
620        let engine = match engine {
621            Engine::Streaming => Engine::Streaming,
622            _ if std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1") => {
623                Engine::Streaming
624            },
625            Engine::Auto => Engine::InMemory,
626            v => v,
627        };
628
629        if engine != Engine::Streaming
630            && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
631        {
632            feature_gated!("new_streaming", {
633                if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
634                    return r;
635                }
636            })
637        }
638
639        if let Engine::Streaming = engine {
640            feature_gated!("new_streaming", self = self.with_new_streaming(true))
641        }
642
643        let mut ir_plan = self.to_alp_optimized()?;
644
645        ir_plan.ensure_root_node_is_sink();
646
647        match engine {
648            Engine::Streaming => feature_gated!("new_streaming", {
649                polars_stream::run_query(
650                    ir_plan.lp_top,
651                    &mut ir_plan.lp_arena,
652                    &mut ir_plan.expr_arena,
653                )
654            }),
655            Engine::InMemory | Engine::Gpu => {
656                if let IR::SinkMultiple { inputs } = ir_plan.root() {
657                    polars_ensure!(
658                        engine != Engine::Gpu,
659                        InvalidOperation:
660                        "collect_all is not supported for the gpu engine"
661                    );
662
663                    return create_multiple_physical_plans(
664                        inputs.clone().as_slice(),
665                        &mut ir_plan.lp_arena,
666                        &mut ir_plan.expr_arena,
667                        BUILD_STREAMING_EXECUTOR,
668                    )?
669                    .execute()
670                    .map(QueryResult::Multiple);
671                }
672
673                let mut physical_plan = create_physical_plan(
674                    ir_plan.lp_top,
675                    &mut ir_plan.lp_arena,
676                    &mut ir_plan.expr_arena,
677                    BUILD_STREAMING_EXECUTOR,
678                )?;
679                let mut state = ExecutionState::new();
680                physical_plan.execute(&mut state).map(QueryResult::Single)
681            },
682            Engine::Auto => unreachable!(),
683        }
684    }
685
686    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
687        let sink_multiple = LazyFrame {
688            logical_plan: DslPlan::SinkMultiple { inputs: plans },
689            opt_state,
690            cached_arena: Default::default(),
691        };
692        sink_multiple.explain(true)
693    }
694
695    pub fn collect_all_with_engine(
696        plans: Vec<DslPlan>,
697        engine: Engine,
698        opt_state: OptFlags,
699    ) -> PolarsResult<Vec<DataFrame>> {
700        if plans.is_empty() {
701            return Ok(Vec::new());
702        }
703
704        LazyFrame {
705            logical_plan: DslPlan::SinkMultiple { inputs: plans },
706            opt_state,
707            cached_arena: Default::default(),
708        }
709        .collect_with_engine(engine)
710        .map(|r| r.unwrap_multiple())
711    }
712
713    /// Execute all the lazy operations and collect them into a [`DataFrame`].
714    ///
715    /// The query is optimized prior to execution.
716    ///
717    /// # Example
718    ///
719    /// ```rust
720    /// use polars_core::prelude::*;
721    /// use polars_lazy::prelude::*;
722    ///
723    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
724    ///     df.lazy()
725    ///       .group_by([col("foo")])
726    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
727    ///       .collect()
728    /// }
729    /// ```
730    pub fn collect(self) -> PolarsResult<DataFrame> {
731        self.collect_with_engine(Engine::Auto).map(|r| match r {
732            QueryResult::Single(df) => df,
733            // TODO: Should return query results
734            QueryResult::Multiple(_) => DataFrame::empty(),
735        })
736    }
737
738    /// Collect the query in batches.
739    ///
740    /// If lazy is true the query will not start until the first poll (or until
741    /// start is called on CollectBatches).
742    #[cfg(feature = "async")]
743    pub fn collect_batches(
744        self,
745        engine: Engine,
746        maintain_order: bool,
747        chunk_size: Option<NonZeroUsize>,
748        lazy: bool,
749    ) -> PolarsResult<CollectBatches> {
750        let (send, recv) = sync_channel(1);
751        let runner_send = send.clone();
752        let ldf = self.sink_batches(
753            PlanCallback::new(move |df| {
754                // Stop if receiver has closed.
755                let send_result = send.send(Ok(df));
756                Ok(send_result.is_err())
757            }),
758            maintain_order,
759            chunk_size,
760        )?;
761        let runner = move || {
762            // We use a tokio spawn_blocking here as it has a high blocking
763            // thread pool limit.
764            polars_io::pl_async::get_runtime().spawn_blocking(move || {
765                if let Err(e) = ldf.collect_with_engine(engine) {
766                    runner_send.send(Err(e)).ok();
767                }
768            });
769        };
770
771        let mut collect_batches = CollectBatches {
772            recv,
773            runner: Some(Box::new(runner)),
774        };
775        if !lazy {
776            collect_batches.start();
777        }
778        Ok(collect_batches)
779    }
780
781    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
782    // This version does profiling of the node execution.
783    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
784    where
785        P: FnOnce(
786            Node,
787            &mut Arena<IR>,
788            &mut Arena<AExpr>,
789            Option<std::time::Duration>,
790        ) -> PolarsResult<()>,
791    {
792        let query_start = std::time::Instant::now();
793        let (mut state, mut physical_plan, _) =
794            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
795        state.time_nodes(query_start);
796        let out = physical_plan.execute(&mut state)?;
797        let timer_df = state.finish_timer()?;
798        Ok((out, timer_df))
799    }
800
801    /// Profile a LazyFrame.
802    ///
803    /// This will run the query and return a tuple
804    /// containing the materialized DataFrame and a DataFrame that contains profiling information
805    /// of each node that is executed.
806    ///
807    /// The units of the timings are microseconds.
808    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
809        self._profile_post_opt(|_, _, _, _| Ok(()))
810    }
811
812    pub fn sink_batches(
813        mut self,
814        function: PlanCallback<DataFrame, bool>,
815        maintain_order: bool,
816        chunk_size: Option<NonZeroUsize>,
817    ) -> PolarsResult<Self> {
818        use polars_plan::prelude::sink::CallbackSinkType;
819
820        polars_ensure!(
821            !matches!(self.logical_plan, DslPlan::Sink { .. }),
822            InvalidOperation: "cannot create a sink on top of another sink"
823        );
824
825        self.logical_plan = DslPlan::Sink {
826            input: Arc::new(self.logical_plan),
827            payload: SinkType::Callback(CallbackSinkType {
828                function,
829                maintain_order,
830                chunk_size,
831            }),
832        };
833
834        Ok(self)
835    }
836
837    /// Collect with the streaming engine. Returns `None` if the streaming engine panics with a todo!.
838    #[cfg(feature = "new_streaming")]
839    fn _collect_with_streaming_suppress_todo_panic(
840        mut self,
841    ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
842        self.opt_state |= OptFlags::NEW_STREAMING;
843        let mut ir_plan = match self.to_alp_optimized() {
844            Ok(v) => v,
845            Err(e) => return Some(Err(e)),
846        };
847
848        ir_plan.ensure_root_node_is_sink();
849
850        let f = || {
851            polars_stream::run_query(
852                ir_plan.lp_top,
853                &mut ir_plan.lp_arena,
854                &mut ir_plan.expr_arena,
855            )
856        };
857
858        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
859            Ok(v) => Some(v),
860            Err(e) => {
861                // Fallback to normal engine if error is due to not being implemented
862                // and auto_new_streaming is set, otherwise propagate error.
863                if e.downcast_ref::<&str>()
864                    .is_some_and(|s| s.starts_with("not yet implemented"))
865                {
866                    if polars_core::config::verbose() {
867                        eprintln!(
868                            "caught unimplemented error in new streaming engine, falling back to normal engine"
869                        );
870                    }
871                    None
872                } else {
873                    std::panic::resume_unwind(e)
874                }
875            },
876        }
877    }
878
879    pub fn sink(
880        mut self,
881        sink_type: SinkDestination,
882        file_format: FileWriteFormat,
883        unified_sink_args: UnifiedSinkArgs,
884    ) -> PolarsResult<Self> {
885        polars_ensure!(
886            !matches!(self.logical_plan, DslPlan::Sink { .. }),
887            InvalidOperation: "cannot create a sink on top of another sink"
888        );
889
890        self.logical_plan = DslPlan::Sink {
891            input: Arc::new(self.logical_plan),
892            payload: match sink_type {
893                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
894                    target,
895                    file_format,
896                    unified_sink_args,
897                }),
898                SinkDestination::Partitioned {
899                    base_path,
900                    file_path_provider,
901                    partition_strategy,
902                    max_rows_per_file,
903                    approximate_bytes_per_file,
904                } => SinkType::Partitioned(PartitionedSinkOptions {
905                    base_path,
906                    file_path_provider,
907                    partition_strategy,
908                    file_format,
909                    unified_sink_args,
910                    max_rows_per_file,
911                    approximate_bytes_per_file,
912                }),
913            },
914        };
915        Ok(self)
916    }
917
918    /// Filter frame rows that match a predicate expression.
919    ///
920    /// The expression must yield boolean values (note that rows where the
921    /// predicate resolves to `null` are *not* included in the resulting frame).
922    ///
923    /// # Example
924    ///
925    /// ```rust
926    /// use polars_core::prelude::*;
927    /// use polars_lazy::prelude::*;
928    ///
929    /// fn example(df: DataFrame) -> LazyFrame {
930    ///       df.lazy()
931    ///         .filter(col("sepal_width").is_not_null())
932    ///         .select([col("sepal_width"), col("sepal_length")])
933    /// }
934    /// ```
935    pub fn filter(self, predicate: Expr) -> Self {
936        let opt_state = self.get_opt_state();
937        let lp = self.get_plan_builder().filter(predicate).build();
938        Self::from_logical_plan(lp, opt_state)
939    }
940
941    /// Remove frame rows that match a predicate expression.
942    ///
943    /// The expression must yield boolean values (note that rows where the
944    /// predicate resolves to `null` are *not* removed from the resulting frame).
945    ///
946    /// # Example
947    ///
948    /// ```rust
949    /// use polars_core::prelude::*;
950    /// use polars_lazy::prelude::*;
951    ///
952    /// fn example(df: DataFrame) -> LazyFrame {
953    ///       df.lazy()
954    ///         .remove(col("sepal_width").is_null())
955    ///         .select([col("sepal_width"), col("sepal_length")])
956    /// }
957    /// ```
958    pub fn remove(self, predicate: Expr) -> Self {
959        self.filter(predicate.neq_missing(lit(true)))
960    }
961
962    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
963    ///
964    /// Columns can be selected with [`col`];
965    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
966    ///
967    /// # Example
968    ///
969    /// ```rust
970    /// use polars_core::prelude::*;
971    /// use polars_lazy::prelude::*;
972    ///
973    /// /// This function selects column "foo" and column "bar".
974    /// /// Column "bar" is renamed to "ham".
975    /// fn example(df: DataFrame) -> LazyFrame {
976    ///       df.lazy()
977    ///         .select([col("foo"),
978    ///                   col("bar").alias("ham")])
979    /// }
980    ///
981    /// /// This function selects all columns except "foo"
982    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
983    ///       df.lazy()
984    ///         .select([all().exclude_cols(["foo"]).as_expr()])
985    /// }
986    /// ```
987    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
988        let exprs = exprs.as_ref().to_vec();
989        self.select_impl(
990            exprs,
991            ProjectionOptions {
992                run_parallel: true,
993                duplicate_check: true,
994                should_broadcast: true,
995            },
996        )
997    }
998
999    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1000        let exprs = exprs.as_ref().to_vec();
1001        self.select_impl(
1002            exprs,
1003            ProjectionOptions {
1004                run_parallel: false,
1005                duplicate_check: true,
1006                should_broadcast: true,
1007            },
1008        )
1009    }
1010
1011    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1012        let opt_state = self.get_opt_state();
1013        let lp = self.get_plan_builder().project(exprs, options).build();
1014        Self::from_logical_plan(lp, opt_state)
1015    }
1016
1017    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1018    ///
1019    /// Takes a list of expressions to group on.
1020    ///
1021    /// # Example
1022    ///
1023    /// ```rust
1024    /// use polars_core::prelude::*;
1025    /// use polars_lazy::prelude::*;
1026    ///
1027    /// fn example(df: DataFrame) -> LazyFrame {
1028    ///       df.lazy()
1029    ///        .group_by([col("date")])
1030    ///        .agg([
1031    ///            col("rain").min().alias("min_rain"),
1032    ///            col("rain").sum().alias("sum_rain"),
1033    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1034    ///        ])
1035    /// }
1036    /// ```
1037    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1038        let keys = by
1039            .as_ref()
1040            .iter()
1041            .map(|e| e.clone().into())
1042            .collect::<Vec<_>>();
1043        let opt_state = self.get_opt_state();
1044
1045        #[cfg(feature = "dynamic_group_by")]
1046        {
1047            LazyGroupBy {
1048                logical_plan: self.logical_plan,
1049                opt_state,
1050                keys,
1051                predicates: vec![],
1052                maintain_order: false,
1053                dynamic_options: None,
1054                rolling_options: None,
1055            }
1056        }
1057
1058        #[cfg(not(feature = "dynamic_group_by"))]
1059        {
1060            LazyGroupBy {
1061                logical_plan: self.logical_plan,
1062                opt_state,
1063                keys,
1064                predicates: vec![],
1065                maintain_order: false,
1066            }
1067        }
1068    }
1069
1070    /// Create rolling groups based on a time column.
1071    ///
1072    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1073    ///
1074    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1075    /// individual values and are not of constant intervals. For constant intervals use
1076    /// *group_by_dynamic*
1077    #[cfg(feature = "dynamic_group_by")]
1078    pub fn rolling<E: AsRef<[Expr]>>(
1079        mut self,
1080        index_column: Expr,
1081        group_by: E,
1082        mut options: RollingGroupOptions,
1083    ) -> LazyGroupBy {
1084        if let Expr::Column(name) = index_column {
1085            options.index_column = name;
1086        } else {
1087            let output_field = index_column
1088                .to_field(&self.collect_schema().unwrap())
1089                .unwrap();
1090            return self.with_column(index_column).rolling(
1091                Expr::Column(output_field.name().clone()),
1092                group_by,
1093                options,
1094            );
1095        }
1096        let opt_state = self.get_opt_state();
1097        LazyGroupBy {
1098            logical_plan: self.logical_plan,
1099            opt_state,
1100            predicates: vec![],
1101            keys: group_by.as_ref().to_vec(),
1102            maintain_order: true,
1103            dynamic_options: None,
1104            rolling_options: Some(options),
1105        }
1106    }
1107
1108    /// Group based on a time value (or index value of type Int32, Int64).
1109    ///
1110    /// Time windows are calculated and rows are assigned to windows. Different from a
1111    /// normal group_by is that a row can be member of multiple groups. The time/index
1112    /// window could be seen as a rolling window, with a window size determined by
1113    /// dates/times/values instead of slots in the DataFrame.
1114    ///
1115    /// A window is defined by:
1116    ///
1117    /// - every: interval of the window
1118    /// - period: length of the window
1119    /// - offset: offset of the window
1120    ///
1121    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1122    /// with a ordinary group_by on these keys.
1123    #[cfg(feature = "dynamic_group_by")]
1124    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1125        mut self,
1126        index_column: Expr,
1127        group_by: E,
1128        mut options: DynamicGroupOptions,
1129    ) -> LazyGroupBy {
1130        if let Expr::Column(name) = index_column {
1131            options.index_column = name;
1132        } else {
1133            let output_field = index_column
1134                .to_field(&self.collect_schema().unwrap())
1135                .unwrap();
1136            return self.with_column(index_column).group_by_dynamic(
1137                Expr::Column(output_field.name().clone()),
1138                group_by,
1139                options,
1140            );
1141        }
1142        let opt_state = self.get_opt_state();
1143        LazyGroupBy {
1144            logical_plan: self.logical_plan,
1145            opt_state,
1146            predicates: vec![],
1147            keys: group_by.as_ref().to_vec(),
1148            maintain_order: true,
1149            dynamic_options: Some(options),
1150            rolling_options: None,
1151        }
1152    }
1153
1154    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1155    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1156        let keys = by
1157            .as_ref()
1158            .iter()
1159            .map(|e| e.clone().into())
1160            .collect::<Vec<_>>();
1161        let opt_state = self.get_opt_state();
1162
1163        #[cfg(feature = "dynamic_group_by")]
1164        {
1165            LazyGroupBy {
1166                logical_plan: self.logical_plan,
1167                opt_state,
1168                keys,
1169                predicates: vec![],
1170                maintain_order: true,
1171                dynamic_options: None,
1172                rolling_options: None,
1173            }
1174        }
1175
1176        #[cfg(not(feature = "dynamic_group_by"))]
1177        {
1178            LazyGroupBy {
1179                logical_plan: self.logical_plan,
1180                opt_state,
1181                keys,
1182                predicates: vec![],
1183                maintain_order: true,
1184            }
1185        }
1186    }
1187
1188    /// Left anti join this query with another lazy query.
1189    ///
1190    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1191    /// flexible join logic, see [`join`](LazyFrame::join) or
1192    /// [`join_builder`](LazyFrame::join_builder).
1193    ///
1194    /// # Example
1195    ///
1196    /// ```rust
1197    /// use polars_core::prelude::*;
1198    /// use polars_lazy::prelude::*;
1199    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1200    ///         ldf
1201    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1202    /// }
1203    /// ```
1204    #[cfg(feature = "semi_anti_join")]
1205    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1206        self.join(
1207            other,
1208            [left_on.into()],
1209            [right_on.into()],
1210            JoinArgs::new(JoinType::Anti),
1211        )
1212    }
1213
1214    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1215    #[cfg(feature = "cross_join")]
1216    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1217        self.join(
1218            other,
1219            vec![],
1220            vec![],
1221            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1222        )
1223    }
1224
1225    /// Left outer join this query with another lazy query.
1226    ///
1227    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1228    /// flexible join logic, see [`join`](LazyFrame::join) or
1229    /// [`join_builder`](LazyFrame::join_builder).
1230    ///
1231    /// # Example
1232    ///
1233    /// ```rust
1234    /// use polars_core::prelude::*;
1235    /// use polars_lazy::prelude::*;
1236    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1237    ///         ldf
1238    ///         .left_join(other, col("foo"), col("bar"))
1239    /// }
1240    /// ```
1241    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1242        self.join(
1243            other,
1244            [left_on.into()],
1245            [right_on.into()],
1246            JoinArgs::new(JoinType::Left),
1247        )
1248    }
1249
1250    /// Inner join this query with another lazy query.
1251    ///
1252    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1253    /// flexible join logic, see [`join`](LazyFrame::join) or
1254    /// [`join_builder`](LazyFrame::join_builder).
1255    ///
1256    /// # Example
1257    ///
1258    /// ```rust
1259    /// use polars_core::prelude::*;
1260    /// use polars_lazy::prelude::*;
1261    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1262    ///         ldf
1263    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1264    /// }
1265    /// ```
1266    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1267        self.join(
1268            other,
1269            [left_on.into()],
1270            [right_on.into()],
1271            JoinArgs::new(JoinType::Inner),
1272        )
1273    }
1274
1275    /// Full outer join this query with another lazy query.
1276    ///
1277    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1278    /// flexible join logic, see [`join`](LazyFrame::join) or
1279    /// [`join_builder`](LazyFrame::join_builder).
1280    ///
1281    /// # Example
1282    ///
1283    /// ```rust
1284    /// use polars_core::prelude::*;
1285    /// use polars_lazy::prelude::*;
1286    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1287    ///         ldf
1288    ///         .full_join(other, col("foo"), col("bar"))
1289    /// }
1290    /// ```
1291    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1292        self.join(
1293            other,
1294            [left_on.into()],
1295            [right_on.into()],
1296            JoinArgs::new(JoinType::Full),
1297        )
1298    }
1299
1300    /// Left semi join this query with another lazy query.
1301    ///
1302    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1303    /// flexible join logic, see [`join`](LazyFrame::join) or
1304    /// [`join_builder`](LazyFrame::join_builder).
1305    ///
1306    /// # Example
1307    ///
1308    /// ```rust
1309    /// use polars_core::prelude::*;
1310    /// use polars_lazy::prelude::*;
1311    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1312    ///         ldf
1313    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1314    /// }
1315    /// ```
1316    #[cfg(feature = "semi_anti_join")]
1317    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1318        self.join(
1319            other,
1320            [left_on.into()],
1321            [right_on.into()],
1322            JoinArgs::new(JoinType::Semi),
1323        )
1324    }
1325
1326    /// Generic function to join two LazyFrames.
1327    ///
1328    /// `join` can join on multiple columns, given as two list of expressions, and with a
1329    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1330    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1331    /// over how columns are renamed and parallelization options, use
1332    /// [`join_builder`](LazyFrame::join_builder).
1333    ///
1334    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1335    ///
1336    /// # Example
1337    ///
1338    /// ```rust
1339    /// use polars_core::prelude::*;
1340    /// use polars_lazy::prelude::*;
1341    ///
1342    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1343    ///         ldf
1344    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1345    /// }
1346    /// ```
1347    pub fn join<E: AsRef<[Expr]>>(
1348        self,
1349        other: LazyFrame,
1350        left_on: E,
1351        right_on: E,
1352        args: JoinArgs,
1353    ) -> LazyFrame {
1354        let left_on = left_on.as_ref().to_vec();
1355        let right_on = right_on.as_ref().to_vec();
1356
1357        self._join_impl(other, left_on, right_on, args)
1358    }
1359
1360    fn _join_impl(
1361        self,
1362        other: LazyFrame,
1363        left_on: Vec<Expr>,
1364        right_on: Vec<Expr>,
1365        args: JoinArgs,
1366    ) -> LazyFrame {
1367        let JoinArgs {
1368            how,
1369            validation,
1370            suffix,
1371            slice,
1372            nulls_equal,
1373            coalesce,
1374            maintain_order,
1375            build_side,
1376        } = args;
1377
1378        if slice.is_some() {
1379            panic!("impl error: slice is not handled")
1380        }
1381
1382        let mut builder = self
1383            .join_builder()
1384            .with(other)
1385            .left_on(left_on)
1386            .right_on(right_on)
1387            .how(how)
1388            .validate(validation)
1389            .join_nulls(nulls_equal)
1390            .coalesce(coalesce)
1391            .maintain_order(maintain_order)
1392            .build_side(build_side);
1393
1394        if let Some(suffix) = suffix {
1395            builder = builder.suffix(suffix);
1396        }
1397
1398        // Note: args.slice is set by the optimizer
1399        builder.finish()
1400    }
1401
1402    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1403    ///
1404    /// After the `JoinBuilder` has been created and set up, calling
1405    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1406    /// representing the `join` operation.
1407    pub fn join_builder(self) -> JoinBuilder {
1408        JoinBuilder::new(self)
1409    }
1410
1411    /// Add or replace a column, given as an expression, to a DataFrame.
1412    ///
1413    /// # Example
1414    ///
1415    /// ```rust
1416    /// use polars_core::prelude::*;
1417    /// use polars_lazy::prelude::*;
1418    /// fn add_column(df: DataFrame) -> LazyFrame {
1419    ///     df.lazy()
1420    ///         .with_column(
1421    ///             when(col("sepal_length").lt(lit(5.0)))
1422    ///             .then(lit(10))
1423    ///             .otherwise(lit(1))
1424    ///             .alias("new_column_name"),
1425    ///         )
1426    /// }
1427    /// ```
1428    pub fn with_column(self, expr: Expr) -> LazyFrame {
1429        let opt_state = self.get_opt_state();
1430        let lp = self
1431            .get_plan_builder()
1432            .with_columns(
1433                vec![expr],
1434                ProjectionOptions {
1435                    run_parallel: false,
1436                    duplicate_check: true,
1437                    should_broadcast: true,
1438                },
1439            )
1440            .build();
1441        Self::from_logical_plan(lp, opt_state)
1442    }
1443
1444    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1445    ///
1446    /// # Example
1447    ///
1448    /// ```rust
1449    /// use polars_core::prelude::*;
1450    /// use polars_lazy::prelude::*;
1451    /// fn add_columns(df: DataFrame) -> LazyFrame {
1452    ///     df.lazy()
1453    ///         .with_columns(
1454    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1455    ///          )
1456    /// }
1457    /// ```
1458    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1459        let exprs = exprs.as_ref().to_vec();
1460        self.with_columns_impl(
1461            exprs,
1462            ProjectionOptions {
1463                run_parallel: true,
1464                duplicate_check: true,
1465                should_broadcast: true,
1466            },
1467        )
1468    }
1469
1470    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1471    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1472        let exprs = exprs.as_ref().to_vec();
1473        self.with_columns_impl(
1474            exprs,
1475            ProjectionOptions {
1476                run_parallel: false,
1477                duplicate_check: true,
1478                should_broadcast: true,
1479            },
1480        )
1481    }
1482
1483    /// Match or evolve to a certain schema.
1484    pub fn match_to_schema(
1485        self,
1486        schema: SchemaRef,
1487        per_column: Arc<[MatchToSchemaPerColumn]>,
1488        extra_columns: ExtraColumnsPolicy,
1489    ) -> LazyFrame {
1490        let opt_state = self.get_opt_state();
1491        let lp = self
1492            .get_plan_builder()
1493            .match_to_schema(schema, per_column, extra_columns)
1494            .build();
1495        Self::from_logical_plan(lp, opt_state)
1496    }
1497
1498    pub fn pipe_with_schema(
1499        self,
1500        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1501    ) -> Self {
1502        let opt_state = self.get_opt_state();
1503        let lp = self
1504            .get_plan_builder()
1505            .pipe_with_schema(vec![], callback)
1506            .build();
1507        Self::from_logical_plan(lp, opt_state)
1508    }
1509
1510    pub fn pipe_with_schemas(
1511        self,
1512        others: Vec<LazyFrame>,
1513        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1514    ) -> Self {
1515        let opt_state = self.get_opt_state();
1516        let lp = self
1517            .get_plan_builder()
1518            .pipe_with_schema(
1519                others.into_iter().map(|lf| lf.logical_plan).collect(),
1520                callback,
1521            )
1522            .build();
1523        Self::from_logical_plan(lp, opt_state)
1524    }
1525
1526    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1527        let opt_state = self.get_opt_state();
1528        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1529        Self::from_logical_plan(lp, opt_state)
1530    }
1531
1532    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1533        let contexts = contexts
1534            .as_ref()
1535            .iter()
1536            .map(|lf| lf.logical_plan.clone())
1537            .collect();
1538        let opt_state = self.get_opt_state();
1539        let lp = self.get_plan_builder().with_context(contexts).build();
1540        Self::from_logical_plan(lp, opt_state)
1541    }
1542
1543    /// Aggregate all the columns as their maximum values.
1544    ///
1545    /// Aggregated columns will have the same names as the original columns.
1546    pub fn max(self) -> Self {
1547        self.map_private(DslFunction::Stats(StatsFunction::Max))
1548    }
1549
1550    /// Aggregate all the columns as their minimum values.
1551    ///
1552    /// Aggregated columns will have the same names as the original columns.
1553    pub fn min(self) -> Self {
1554        self.map_private(DslFunction::Stats(StatsFunction::Min))
1555    }
1556
1557    /// Aggregate all the columns as their sum values.
1558    ///
1559    /// Aggregated columns will have the same names as the original columns.
1560    ///
1561    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1562    /// - For integer columns, the ordinary checks for overflow are performed:
1563    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1564    ///   silently wrap.
1565    /// - String columns will sum to None.
1566    pub fn sum(self) -> Self {
1567        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1568    }
1569
1570    /// Aggregate all the columns as their mean values.
1571    ///
1572    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1573    /// - String columns will have a mean of None.
1574    pub fn mean(self) -> Self {
1575        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1576    }
1577
1578    /// Aggregate all the columns as their median values.
1579    ///
1580    /// - Boolean and integer results are converted to `f64`. However, they are still
1581    ///   susceptible to overflow before this conversion occurs.
1582    /// - String columns will sum to None.
1583    pub fn median(self) -> Self {
1584        self.map_private(DslFunction::Stats(StatsFunction::Median))
1585    }
1586
1587    /// Aggregate all the columns as their quantile values.
1588    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1589        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1590            quantile,
1591            method,
1592        }))
1593    }
1594
1595    /// Aggregate all the columns as their standard deviation values.
1596    ///
1597    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1598    /// computing the variance, where `N` is the number of rows.
1599    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1600    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1601    /// > likelihood estimate of the variance for normally distributed variables. The
1602    /// > standard deviation computed in this function is the square root of the estimated
1603    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1604    /// > standard deviation per se.
1605    ///
1606    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1607    pub fn std(self, ddof: u8) -> Self {
1608        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1609    }
1610
1611    /// Aggregate all the columns as their variance values.
1612    ///
1613    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1614    /// computing the variance, where `N` is the number of rows.
1615    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1616    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1617    /// > likelihood estimate of the variance for normally distributed variables.
1618    ///
1619    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1620    pub fn var(self, ddof: u8) -> Self {
1621        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1622    }
1623
1624    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1625    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1626        self.explode_impl(columns, options, false)
1627    }
1628
1629    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1630    fn explode_impl(
1631        self,
1632        columns: Selector,
1633        options: ExplodeOptions,
1634        allow_empty: bool,
1635    ) -> LazyFrame {
1636        let opt_state = self.get_opt_state();
1637        let lp = self
1638            .get_plan_builder()
1639            .explode(columns, options, allow_empty)
1640            .build();
1641        Self::from_logical_plan(lp, opt_state)
1642    }
1643
1644    /// Aggregate all the columns as the sum of their null value count.
1645    pub fn null_count(self) -> LazyFrame {
1646        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1647    }
1648
1649    /// Drop non-unique rows and maintain the order of kept rows.
1650    ///
1651    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1652    /// `None`, all columns are considered.
1653    pub fn unique_stable(
1654        self,
1655        subset: Option<Selector>,
1656        keep_strategy: UniqueKeepStrategy,
1657    ) -> LazyFrame {
1658        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1659        self.unique_stable_generic(subset, keep_strategy)
1660    }
1661
1662    pub fn unique_stable_generic(
1663        self,
1664        subset: Option<Vec<Expr>>,
1665        keep_strategy: UniqueKeepStrategy,
1666    ) -> LazyFrame {
1667        let opt_state = self.get_opt_state();
1668        let options = DistinctOptionsDSL {
1669            subset,
1670            maintain_order: true,
1671            keep_strategy,
1672        };
1673        let lp = self.get_plan_builder().distinct(options).build();
1674        Self::from_logical_plan(lp, opt_state)
1675    }
1676
1677    /// Drop non-unique rows without maintaining the order of kept rows.
1678    ///
1679    /// The order of the kept rows may change; to maintain the original row order, use
1680    /// [`unique_stable`](LazyFrame::unique_stable).
1681    ///
1682    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1683    /// all columns are considered.
1684    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1685        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1686        self.unique_generic(subset, keep_strategy)
1687    }
1688
1689    pub fn unique_generic(
1690        self,
1691        subset: Option<Vec<Expr>>,
1692        keep_strategy: UniqueKeepStrategy,
1693    ) -> LazyFrame {
1694        let opt_state = self.get_opt_state();
1695        let options = DistinctOptionsDSL {
1696            subset,
1697            maintain_order: false,
1698            keep_strategy,
1699        };
1700        let lp = self.get_plan_builder().distinct(options).build();
1701        Self::from_logical_plan(lp, opt_state)
1702    }
1703
1704    /// Drop rows containing one or more NaN values.
1705    ///
1706    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1707    /// floating point columns are considered.
1708    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1709        let opt_state = self.get_opt_state();
1710        let lp = self.get_plan_builder().drop_nans(subset).build();
1711        Self::from_logical_plan(lp, opt_state)
1712    }
1713
1714    /// Drop rows containing one or more None values.
1715    ///
1716    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1717    /// columns are considered.
1718    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1719        let opt_state = self.get_opt_state();
1720        let lp = self.get_plan_builder().drop_nulls(subset).build();
1721        Self::from_logical_plan(lp, opt_state)
1722    }
1723
1724    /// Slice the DataFrame using an offset (starting row) and a length.
1725    ///
1726    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1727    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1728    /// end.
1729    ///
1730    /// If `offset` and `len` are such that the slice extends beyond the end of the
1731    /// DataFrame, the portion between `offset` and the end will be returned. In this
1732    /// case, the number of rows in the returned DataFrame will be less than `len`.
1733    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1734        let opt_state = self.get_opt_state();
1735        let lp = self.get_plan_builder().slice(offset, len).build();
1736        Self::from_logical_plan(lp, opt_state)
1737    }
1738
1739    /// Remove all the rows of the LazyFrame.
1740    pub fn clear(self) -> LazyFrame {
1741        self.slice(0, 0)
1742    }
1743
1744    /// Get the first row.
1745    ///
1746    /// Equivalent to `self.slice(0, 1)`.
1747    pub fn first(self) -> LazyFrame {
1748        self.slice(0, 1)
1749    }
1750
1751    /// Get the last row.
1752    ///
1753    /// Equivalent to `self.slice(-1, 1)`.
1754    pub fn last(self) -> LazyFrame {
1755        self.slice(-1, 1)
1756    }
1757
1758    /// Get the last `n` rows.
1759    ///
1760    /// Equivalent to `self.slice(-(n as i64), n)`.
1761    pub fn tail(self, n: IdxSize) -> LazyFrame {
1762        let neg_tail = -(n as i64);
1763        self.slice(neg_tail, n)
1764    }
1765
/// Add a pivot of this frame to the plan.
///
/// Every argument is forwarded unchanged, in order, to the plan builder's `pivot`. The
/// optimization flags of `self` carry over to the returned frame.
#[cfg(feature = "pivot")]
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
    column_naming: PivotColumnNaming,
) -> LazyFrame {
    let flags = self.get_opt_state();
    let builder = self.get_plan_builder();
    let plan = builder
        .pivot(
            on,
            on_columns,
            index,
            values,
            agg,
            maintain_order,
            separator,
            column_naming,
        )
        .build();
    Self::from_logical_plan(plan, flags)
}
1795
/// Unpivot the DataFrame from wide to long format.
///
/// See [`UnpivotArgsDSL`] for information on how to unpivot a DataFrame.
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let lp = self.get_plan_builder().unpivot(args).build();
    Self::from_logical_plan(lp, opt_state)
}
1805
1806    /// Limit the DataFrame to the first `n` rows.
1807    pub fn limit(self, n: IdxSize) -> LazyFrame {
1808        self.slice(0, n)
1809    }
1810
1811    /// Apply a function/closure once the logical plan get executed.
1812    ///
1813    /// The function has access to the whole materialized DataFrame at the time it is
1814    /// called.
1815    ///
1816    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1817    /// with `LazyFrame::with_column` or `with_columns`.
1818    ///
1819    /// ## Warning
1820    /// This can blow up in your face if the schema is changed due to the operation. The
1821    /// optimizer relies on a correct schema.
1822    ///
1823    /// You can toggle certain optimizations off.
1824    pub fn map<F>(
1825        self,
1826        function: F,
1827        optimizations: AllowedOptimizations,
1828        schema: Option<Arc<dyn UdfSchema>>,
1829        name: Option<&'static str>,
1830    ) -> LazyFrame
1831    where
1832        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1833    {
1834        let opt_state = self.get_opt_state();
1835        let lp = self
1836            .get_plan_builder()
1837            .map(
1838                function,
1839                optimizations,
1840                schema,
1841                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1842            )
1843            .build();
1844        Self::from_logical_plan(lp, opt_state)
1845    }
1846
/// Python counterpart of [`LazyFrame::map`], taking a `PythonFunction` instead of a Rust
/// closure.
///
/// All arguments are forwarded unchanged to the plan builder's `map_python`. The
/// optimization flags of `self` carry over to the returned frame.
#[cfg(feature = "python")]
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let flags = self.get_opt_state();
    let builder = self.get_plan_builder();
    let plan = builder
        .map_python(function, optimizations, schema, validate_output)
        .build();
    Self::from_logical_plan(plan, flags)
}
1862
1863    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1864        let opt_state = self.get_opt_state();
1865        let lp = self.get_plan_builder().map_private(function).build();
1866        Self::from_logical_plan(lp, opt_state)
1867    }
1868
1869    /// Add a new column at index 0 that counts the rows.
1870    ///
1871    /// `name` is the name of the new column. `offset` is where to start counting from; if
1872    /// `None`, it is set to `0`.
1873    ///
1874    /// # Warning
1875    /// This can have a negative effect on query performance. This may for instance block
1876    /// predicate pushdown optimization.
1877    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1878    where
1879        S: Into<PlSmallStr>,
1880    {
1881        let name = name.into();
1882
1883        match &self.logical_plan {
1884            v @ DslPlan::Scan {
1885                scan_type,
1886                unified_scan_args,
1887                ..
1888            } if unified_scan_args.row_index.is_none()
1889                && !matches!(
1890                    &**scan_type,
1891                    FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1892                ) =>
1893            {
1894                let DslPlan::Scan {
1895                    sources,
1896                    mut unified_scan_args,
1897                    scan_type,
1898                    cached_ir: _,
1899                } = v.clone()
1900                else {
1901                    unreachable!()
1902                };
1903
1904                unified_scan_args.row_index = Some(RowIndex {
1905                    name,
1906                    offset: offset.unwrap_or(0),
1907                });
1908
1909                DslPlan::Scan {
1910                    sources,
1911                    unified_scan_args,
1912                    scan_type,
1913                    cached_ir: Default::default(),
1914                }
1915                .into()
1916            },
1917            _ => self.map_private(DslFunction::RowIndex { name, offset }),
1918        }
1919    }
1920
1921    /// Return the number of non-null elements for each column.
1922    pub fn count(self) -> LazyFrame {
1923        self.select(vec![col(PlSmallStr::from_static("*")).count()])
1924    }
1925
/// Unnest the `Struct` columns chosen by `cols`, inserting each struct field as its own
/// column.
///
/// `separator` is passed through unchanged to the internal unnest function.
#[cfg(feature = "dtype-struct")]
pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
    let function = DslFunction::Unnest {
        columns: cols,
        separator,
    };
    self.map_private(function)
}
1935
/// Build a `DslPlan::MergeSorted` node that combines `self` and `other` on `key`.
///
/// The inputs are presumably expected to be sorted by `key` already; nothing is checked
/// here. The optimization flags of `self` are kept. This currently always returns `Ok`.
#[cfg(feature = "merge_sorted")]
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let key: PlSmallStr = key.into();
    let left = Arc::new(self.logical_plan);
    let right = Arc::new(other.logical_plan);

    let plan = DslPlan::MergeSorted {
        input_left: left,
        input_right: right,
        key,
    };
    Ok(Self::from_logical_plan(plan, self.opt_state))
}
1950
1951    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1952        let lp = DslPlan::MapFunction {
1953            input: Arc::new(self.logical_plan),
1954            function: DslFunction::Hint(hint),
1955        };
1956        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1957    }
1958}
1959
1960/// Utility struct for lazy group_by operation.
1961#[derive(Clone)]
1962pub struct LazyGroupBy {
1963    pub logical_plan: DslPlan,
1964    opt_state: OptFlags,
1965    keys: Vec<Expr>,
1966    predicates: Vec<Expr>,
1967    maintain_order: bool,
1968    #[cfg(feature = "dynamic_group_by")]
1969    dynamic_options: Option<DynamicGroupOptions>,
1970    #[cfg(feature = "dynamic_group_by")]
1971    rolling_options: Option<RollingGroupOptions>,
1972}
1973
1974impl From<LazyGroupBy> for LazyFrame {
1975    fn from(lgb: LazyGroupBy) -> Self {
1976        Self {
1977            logical_plan: lgb.logical_plan,
1978            opt_state: lgb.opt_state,
1979            cached_arena: Default::default(),
1980        }
1981    }
1982}
1983
1984impl LazyGroupBy {
1985    /// Filter groups with a predicate after aggregation.
1986    ///
1987    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
1988    /// is evaluated on the groups.
1989    /// This method can be chained in which case all predicates must evaluate to `true` for a
1990    /// group to be kept.
1991    ///
1992    /// # Example
1993    ///
1994    /// ```rust
1995    /// use polars_core::prelude::*;
1996    /// use polars_lazy::prelude::*;
1997    ///
1998    /// fn example(df: DataFrame) -> LazyFrame {
1999    ///       df.lazy()
2000    ///        .group_by_stable([col("date")])
2001    ///        .having(col("rain").sum().gt(lit(10)))
2002    ///        .agg([col("rain").min().alias("min_rain")])
2003    /// }
2004    /// ```
2005    pub fn having(mut self, predicate: Expr) -> Self {
2006        self.predicates.push(predicate);
2007        self
2008    }
2009
2010    /// Group by and aggregate.
2011    ///
2012    /// Select a column with [col] and choose an aggregation.
2013    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2014    ///
2015    /// # Example
2016    ///
2017    /// ```rust
2018    /// use polars_core::prelude::*;
2019    /// use polars_lazy::prelude::*;
2020    ///
2021    /// fn example(df: DataFrame) -> LazyFrame {
2022    ///       df.lazy()
2023    ///        .group_by_stable([col("date")])
2024    ///        .agg([
2025    ///            col("rain").min().alias("min_rain"),
2026    ///            col("rain").sum().alias("sum_rain"),
2027    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2028    ///        ])
2029    /// }
2030    /// ```
2031    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2032        #[cfg(feature = "dynamic_group_by")]
2033        let lp = DslBuilder::from(self.logical_plan)
2034            .group_by(
2035                self.keys,
2036                self.predicates,
2037                aggs,
2038                None,
2039                self.maintain_order,
2040                self.dynamic_options,
2041                self.rolling_options,
2042            )
2043            .build();
2044
2045        #[cfg(not(feature = "dynamic_group_by"))]
2046        let lp = DslBuilder::from(self.logical_plan)
2047            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2048            .build();
2049        LazyFrame::from_logical_plan(lp, self.opt_state)
2050    }
2051
2052    /// Return first n rows of each group
2053    pub fn head(self, n: Option<usize>) -> LazyFrame {
2054        let keys = self
2055            .keys
2056            .iter()
2057            .filter_map(|expr| expr_output_name(expr).ok())
2058            .collect::<Vec<_>>();
2059
2060        self.agg([all().as_expr().head(n)]).explode_impl(
2061            all() - by_name(keys.iter().cloned(), false, false),
2062            ExplodeOptions {
2063                empty_as_null: true,
2064                keep_nulls: true,
2065            },
2066            true,
2067        )
2068    }
2069
2070    /// Return last n rows of each group
2071    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2072        let keys = self
2073            .keys
2074            .iter()
2075            .filter_map(|expr| expr_output_name(expr).ok())
2076            .collect::<Vec<_>>();
2077
2078        self.agg([all().as_expr().tail(n)]).explode_impl(
2079            all() - by_name(keys.iter().cloned(), false, false),
2080            ExplodeOptions {
2081                empty_as_null: true,
2082                keep_nulls: true,
2083            },
2084            true,
2085        )
2086    }
2087
2088    /// Apply a function over the groups as a new DataFrame.
2089    ///
2090    /// **It is not recommended that you use this as materializing the DataFrame is very
2091    /// expensive.**
2092    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2093        if !self.predicates.is_empty() {
2094            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2095        }
2096
2097        #[cfg(feature = "dynamic_group_by")]
2098        let options = GroupbyOptions {
2099            dynamic: self.dynamic_options,
2100            rolling: self.rolling_options,
2101            slice: None,
2102        };
2103
2104        #[cfg(not(feature = "dynamic_group_by"))]
2105        let options = GroupbyOptions { slice: None };
2106
2107        let lp = DslPlan::GroupBy {
2108            input: Arc::new(self.logical_plan),
2109            keys: self.keys,
2110            predicates: vec![],
2111            aggs: vec![],
2112            apply: Some((f, schema)),
2113            maintain_order: self.maintain_order,
2114            options: Arc::new(options),
2115        };
2116        LazyFrame::from_logical_plan(lp, self.opt_state)
2117    }
2118}
2119
2120#[must_use]
2121pub struct JoinBuilder {
2122    lf: LazyFrame,
2123    how: JoinType,
2124    other: Option<LazyFrame>,
2125    left_on: Vec<Expr>,
2126    right_on: Vec<Expr>,
2127    allow_parallel: bool,
2128    force_parallel: bool,
2129    suffix: Option<PlSmallStr>,
2130    validation: JoinValidation,
2131    nulls_equal: bool,
2132    coalesce: JoinCoalesce,
2133    maintain_order: MaintainOrderJoin,
2134    build_side: Option<JoinBuildSide>,
2135}
2136impl JoinBuilder {
2137    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2138    pub fn new(lf: LazyFrame) -> Self {
2139        Self {
2140            lf,
2141            other: None,
2142            how: JoinType::Inner,
2143            left_on: vec![],
2144            right_on: vec![],
2145            allow_parallel: true,
2146            force_parallel: false,
2147            suffix: None,
2148            validation: Default::default(),
2149            nulls_equal: false,
2150            coalesce: Default::default(),
2151            maintain_order: Default::default(),
2152            build_side: None,
2153        }
2154    }
2155
2156    /// The right table in the join.
2157    pub fn with(mut self, other: LazyFrame) -> Self {
2158        self.other = Some(other);
2159        self
2160    }
2161
2162    /// Select the join type.
2163    pub fn how(mut self, how: JoinType) -> Self {
2164        self.how = how;
2165        self
2166    }
2167
2168    pub fn validate(mut self, validation: JoinValidation) -> Self {
2169        self.validation = validation;
2170        self
2171    }
2172
2173    /// The expressions you want to join both tables on.
2174    ///
2175    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2176    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2177        let on = on.as_ref().to_vec();
2178        self.left_on.clone_from(&on);
2179        self.right_on = on;
2180        self
2181    }
2182
2183    /// The expressions you want to join the left table on.
2184    ///
2185    /// The passed expressions must be valid in the left table.
2186    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2187        self.left_on = on.as_ref().to_vec();
2188        self
2189    }
2190
2191    /// The expressions you want to join the right table on.
2192    ///
2193    /// The passed expressions must be valid in the right table.
2194    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2195        self.right_on = on.as_ref().to_vec();
2196        self
2197    }
2198
2199    /// Allow parallel table evaluation.
2200    pub fn allow_parallel(mut self, allow: bool) -> Self {
2201        self.allow_parallel = allow;
2202        self
2203    }
2204
2205    /// Force parallel table evaluation.
2206    pub fn force_parallel(mut self, force: bool) -> Self {
2207        self.force_parallel = force;
2208        self
2209    }
2210
2211    /// Join on null values. By default null values will never produce matches.
2212    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2213        self.nulls_equal = nulls_equal;
2214        self
2215    }
2216
2217    /// Suffix to add duplicate column names in join.
2218    /// Defaults to `"_right"` if this method is never called.
2219    pub fn suffix<S>(mut self, suffix: S) -> Self
2220    where
2221        S: Into<PlSmallStr>,
2222    {
2223        self.suffix = Some(suffix.into());
2224        self
2225    }
2226
2227    /// Whether to coalesce join columns.
2228    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2229        self.coalesce = coalesce;
2230        self
2231    }
2232
2233    /// Whether to preserve the row order.
2234    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2235        self.maintain_order = maintain_order;
2236        self
2237    }
2238
2239    /// Whether to prefer a specific build side.
2240    pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2241        self.build_side = build_side;
2242        self
2243    }
2244
2245    /// Finish builder
2246    pub fn finish(self) -> LazyFrame {
2247        let opt_state = self.lf.opt_state;
2248        let other = self.other.expect("'with' not set in join builder");
2249
2250        let args = JoinArgs {
2251            how: self.how,
2252            validation: self.validation,
2253            suffix: self.suffix,
2254            slice: None,
2255            nulls_equal: self.nulls_equal,
2256            coalesce: self.coalesce,
2257            maintain_order: self.maintain_order,
2258            build_side: self.build_side,
2259        };
2260
2261        let lp = self
2262            .lf
2263            .get_plan_builder()
2264            .join(
2265                other.logical_plan,
2266                self.left_on,
2267                self.right_on,
2268                JoinOptions {
2269                    allow_parallel: self.allow_parallel,
2270                    force_parallel: self.force_parallel,
2271                    args,
2272                }
2273                .into(),
2274            )
2275            .build();
2276        LazyFrame::from_logical_plan(lp, opt_state)
2277    }
2278
2279    // Finish with join predicates
2280    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2281        let opt_state = self.lf.opt_state;
2282        let other = self.other.expect("with not set");
2283
2284        // Decompose `And` conjunctions into their component expressions
2285        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2286            if let Expr::BinaryExpr {
2287                op: Operator::And,
2288                left,
2289                right,
2290            } = predicate
2291            {
2292                decompose_and((*left).clone(), expanded_predicates);
2293                decompose_and((*right).clone(), expanded_predicates);
2294            } else {
2295                expanded_predicates.push(predicate);
2296            }
2297        }
2298        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2299        for predicate in predicates {
2300            decompose_and(predicate, &mut expanded_predicates);
2301        }
2302        let predicates: Vec<Expr> = expanded_predicates;
2303
2304        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2305        #[cfg(feature = "is_between")]
2306        let predicates: Vec<Expr> = {
2307            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2308            for predicate in predicates {
2309                if let Expr::Function {
2310                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2311                    input,
2312                    ..
2313                } = &predicate
2314                {
2315                    if let [expr, lower, upper] = input.as_slice() {
2316                        match closed {
2317                            ClosedInterval::Both => {
2318                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2319                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2320                            },
2321                            ClosedInterval::Right => {
2322                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2323                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2324                            },
2325                            ClosedInterval::Left => {
2326                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2327                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2328                            },
2329                            ClosedInterval::None => {
2330                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2331                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2332                            },
2333                        }
2334                        continue;
2335                    }
2336                }
2337                expanded_predicates.push(predicate);
2338            }
2339            expanded_predicates
2340        };
2341
2342        let args = JoinArgs {
2343            how: self.how,
2344            validation: self.validation,
2345            suffix: self.suffix,
2346            slice: None,
2347            nulls_equal: self.nulls_equal,
2348            coalesce: self.coalesce,
2349            maintain_order: self.maintain_order,
2350            build_side: self.build_side,
2351        };
2352        let options = JoinOptions {
2353            allow_parallel: self.allow_parallel,
2354            force_parallel: self.force_parallel,
2355            args,
2356        };
2357
2358        let lp = DslPlan::Join {
2359            input_left: Arc::new(self.lf.logical_plan),
2360            input_right: Arc::new(other.logical_plan),
2361            left_on: Default::default(),
2362            right_on: Default::default(),
2363            predicates,
2364            options: Arc::from(options),
2365        };
2366
2367        LazyFrame::from_logical_plan(lp, opt_state)
2368    }
2369}
2370
2371pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2372    #[cfg(not(feature = "new_streaming"))]
2373    {
2374        None
2375    }
2376    #[cfg(feature = "new_streaming")]
2377    {
2378        Some(polars_stream::build_streaming_query_executor)
2379    }
2380};
2381
2382pub struct CollectBatches {
2383    recv: Receiver<PolarsResult<DataFrame>>,
2384    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2385}
2386
2387impl CollectBatches {
2388    /// Start running the query, if not already.
2389    pub fn start(&mut self) {
2390        if let Some(runner) = self.runner.take() {
2391            runner()
2392        }
2393    }
2394}
2395
2396impl Iterator for CollectBatches {
2397    type Item = PolarsResult<DataFrame>;
2398
2399    fn next(&mut self) -> Option<Self::Item> {
2400        self.start();
2401        self.recv.recv().ok()
2402    }
2403}