polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex};
12
13pub use anonymous_scan::*;
14#[cfg(feature = "csv")]
15pub use csv::*;
16#[cfg(not(target_arch = "wasm32"))]
17pub use exitable::*;
18pub use file_list_reader::*;
19#[cfg(feature = "json")]
20pub use ndjson::*;
21#[cfg(feature = "parquet")]
22pub use parquet::*;
23use polars_compute::rolling::QuantileMethod;
24use polars_core::POOL;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_io::RowIndex;
28use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
29use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
30use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
31#[cfg(feature = "is_between")]
32use polars_ops::prelude::ClosedInterval;
33pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
34use polars_utils::pl_str::PlSmallStr;
35use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
43
44impl IntoLazy for DataFrame {
45    /// Convert the `DataFrame` into a `LazyFrame`
46    fn lazy(self) -> LazyFrame {
47        let lp = DslBuilder::from_existing_df(self).build();
48        LazyFrame {
49            logical_plan: lp,
50            opt_state: Default::default(),
51            cached_arena: Default::default(),
52        }
53    }
54}
55
impl IntoLazy for LazyFrame {
    /// A `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
61
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan built up so far by the methods on this type.
    pub logical_plan: DslPlan,
    /// Optimization flags handed to the optimizer when the plan is lowered.
    pub(crate) opt_state: OptFlags,
    /// Shared, mutex-protected slot that may hold a `CachedArena`.
    // NOTE(review): presumably lets repeated plan conversions reuse arenas — confirm in `cached_arenas`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
73
74impl From<DslPlan> for LazyFrame {
75    fn from(plan: DslPlan) -> Self {
76        Self {
77            logical_plan: plan,
78            opt_state: OptFlags::default(),
79            cached_arena: Default::default(),
80        }
81    }
82}
83
84impl LazyFrame {
85    pub(crate) fn from_inner(
86        logical_plan: DslPlan,
87        opt_state: OptFlags,
88        cached_arena: Arc<Mutex<Option<CachedArena>>>,
89    ) -> Self {
90        Self {
91            logical_plan,
92            opt_state,
93            cached_arena,
94        }
95    }
96
97    pub(crate) fn get_plan_builder(self) -> DslBuilder {
98        DslBuilder::from(self.logical_plan)
99    }
100
101    fn get_opt_state(&self) -> OptFlags {
102        self.opt_state
103    }
104
105    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106        LazyFrame {
107            logical_plan,
108            opt_state,
109            cached_arena: Default::default(),
110        }
111    }
112
113    /// Get current optimizations.
114    pub fn get_current_optimizations(&self) -> OptFlags {
115        self.opt_state
116    }
117
118    /// Set allowed optimizations.
119    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120        self.opt_state = opt_state;
121        self
122    }
123
124    /// Turn off all optimizations.
125    pub fn without_optimizations(self) -> Self {
126        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127    }
128
129    /// Toggle projection pushdown optimization.
130    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132        self
133    }
134
135    /// Toggle cluster with columns optimization.
136    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138        self
139    }
140
141    /// Check if operations are order dependent and unset maintaining_order if
142    /// the order would not be observed.
143    pub fn with_check_order(mut self, toggle: bool) -> Self {
144        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145        self
146    }
147
148    /// Toggle predicate pushdown optimization.
149    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151        self
152    }
153
154    /// Toggle type coercion optimization.
155    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157        self
158    }
159
160    /// Toggle type check optimization.
161    pub fn with_type_check(mut self, toggle: bool) -> Self {
162        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163        self
164    }
165
166    /// Toggle expression simplification optimization on or off.
167    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169        self
170    }
171
172    /// Toggle common subplan elimination optimization on or off
173    #[cfg(feature = "cse")]
174    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
175        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
176        self
177    }
178
179    /// Toggle common subexpression elimination optimization on or off
180    #[cfg(feature = "cse")]
181    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
182        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
183        self
184    }
185
186    /// Toggle slice pushdown optimization.
187    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189        self
190    }
191
192    #[cfg(feature = "new_streaming")]
193    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
194        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
195        self
196    }
197
198    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
199    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201        self
202    }
203
204    /// Run every node eagerly. This turns off multi-node optimizations.
205    pub fn _with_eager(mut self, toggle: bool) -> Self {
206        self.opt_state.set(OptFlags::EAGER, toggle);
207        self
208    }
209
210    /// Return a String describing the naive (un-optimized) logical plan.
211    pub fn describe_plan(&self) -> PolarsResult<String> {
212        Ok(self.clone().to_alp()?.describe())
213    }
214
215    /// Return a String describing the naive (un-optimized) logical plan in tree format.
216    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217        Ok(self.clone().to_alp()?.describe_tree_format())
218    }
219
220    /// Return a String describing the optimized logical plan.
221    ///
222    /// Returns `Err` if optimizing the logical plan fails.
223    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224        Ok(self.clone().to_alp_optimized()?.describe())
225    }
226
227    /// Return a String describing the optimized logical plan in tree format.
228    ///
229    /// Returns `Err` if optimizing the logical plan fails.
230    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232    }
233
234    /// Return a String describing the logical plan.
235    ///
236    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
237    /// explains the naive, un-optimized plan.
238    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239        if optimized {
240            self.describe_optimized_plan()
241        } else {
242            self.describe_plan()
243        }
244    }
245
246    /// Add a sort operation to the logical plan.
247    ///
248    /// Sorts the LazyFrame by the column name specified using the provided options.
249    ///
250    /// # Example
251    ///
252    /// Sort DataFrame by 'sepal_width' column:
253    /// ```rust
254    /// # use polars_core::prelude::*;
255    /// # use polars_lazy::prelude::*;
256    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
257    ///     df.lazy().sort(["sepal_width"], Default::default())
258    /// }
259    /// ```
260    /// Sort by a single column with specific order:
261    /// ```
262    /// # use polars_core::prelude::*;
263    /// # use polars_lazy::prelude::*;
264    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
265    ///     df.lazy().sort(
266    ///         ["sepal_width"],
267    ///         SortMultipleOptions::new()
268    ///             .with_order_descending(descending)
269    ///     )
270    /// }
271    /// ```
272    /// Sort by multiple columns with specifying order for each column:
273    /// ```
274    /// # use polars_core::prelude::*;
275    /// # use polars_lazy::prelude::*;
276    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
277    ///     df.lazy().sort(
278    ///         ["sepal_width", "sepal_length"],
279    ///         SortMultipleOptions::new()
280    ///             .with_order_descending_multi([false, true])
281    ///     )
282    /// }
283    /// ```
284    /// See [`SortMultipleOptions`] for more options.
285    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286        let opt_state = self.get_opt_state();
287        let lp = self
288            .get_plan_builder()
289            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290            .build();
291        Self::from_logical_plan(lp, opt_state)
292    }
293
294    /// Add a sort operation to the logical plan.
295    ///
296    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
297    /// concrete columns before sorting.
298    ///
299    /// See [`SortMultipleOptions`] for more options.
300    ///
301    /// # Example
302    ///
303    /// ```rust
304    /// use polars_core::prelude::*;
305    /// use polars_lazy::prelude::*;
306    ///
307    /// /// Sort DataFrame by 'sepal_width' column
308    /// fn example(df: DataFrame) -> LazyFrame {
309    ///       df.lazy()
310    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
311    /// }
312    /// ```
313    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314        self,
315        by_exprs: E,
316        sort_options: SortMultipleOptions,
317    ) -> Self {
318        let by_exprs = by_exprs.as_ref().to_vec();
319        if by_exprs.is_empty() {
320            self
321        } else {
322            let opt_state = self.get_opt_state();
323            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324            Self::from_logical_plan(lp, opt_state)
325        }
326    }
327
328    pub fn top_k<E: AsRef<[Expr]>>(
329        self,
330        k: IdxSize,
331        by_exprs: E,
332        sort_options: SortMultipleOptions,
333    ) -> Self {
334        // this will optimize to top-k
335        self.sort_by_exprs(
336            by_exprs,
337            sort_options.with_order_reversed().with_nulls_last(true),
338        )
339        .slice(0, k)
340    }
341
342    pub fn bottom_k<E: AsRef<[Expr]>>(
343        self,
344        k: IdxSize,
345        by_exprs: E,
346        sort_options: SortMultipleOptions,
347    ) -> Self {
348        // this will optimize to bottom-k
349        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350            .slice(0, k)
351    }
352
353    /// Reverse the `DataFrame` from top to bottom.
354    ///
355    /// Row `i` becomes row `number_of_rows - i - 1`.
356    ///
357    /// # Example
358    ///
359    /// ```rust
360    /// use polars_core::prelude::*;
361    /// use polars_lazy::prelude::*;
362    ///
363    /// fn example(df: DataFrame) -> LazyFrame {
364    ///       df.lazy()
365    ///         .reverse()
366    /// }
367    /// ```
368    pub fn reverse(self) -> Self {
369        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370    }
371
372    /// Rename columns in the DataFrame.
373    ///
374    /// `existing` and `new` are iterables of the same length containing the old and
375    /// corresponding new column names. Renaming happens to all `existing` columns
376    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
377    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
378    /// those columns that are actually found will be renamed (others will be ignored).
379    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380    where
381        I: IntoIterator<Item = T>,
382        J: IntoIterator<Item = S>,
383        T: AsRef<str>,
384        S: AsRef<str>,
385    {
386        let iter = existing.into_iter();
387        let cap = iter.size_hint().0;
388        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391        // TODO! should this error if `existing` and `new` have different lengths?
392        // Currently, the longer of the two is truncated.
393        for (existing, new) in iter.zip(new) {
394            let existing = existing.as_ref();
395            let new = new.as_ref();
396            if new != existing {
397                existing_vec.push(existing.into());
398                new_vec.push(new.into());
399            }
400        }
401
402        self.map_private(DslFunction::Rename {
403            existing: existing_vec.into(),
404            new: new_vec.into(),
405            strict,
406        })
407    }
408
409    /// Removes columns from the DataFrame.
410    /// Note that it's better to only select the columns you need
411    /// and let the projection pushdown optimize away the unneeded columns.
412    ///
413    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
414    /// error while materializing the [`LazyFrame`].
415    pub fn drop(self, columns: Selector) -> Self {
416        let opt_state = self.get_opt_state();
417        let lp = self.get_plan_builder().drop(columns).build();
418        Self::from_logical_plan(lp, opt_state)
419    }
420
421    /// Shift the values by a given period and fill the parts that will be empty due to this operation
422    /// with `Nones`.
423    ///
424    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
425    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427    }
428
429    /// Shift the values by a given period and fill the parts that will be empty due to this operation
430    /// with the result of the `fill_value` expression.
431    ///
432    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
433    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434        self.select(vec![
435            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436        ])
437    }
438
439    /// Fill None values in the DataFrame with an expression.
440    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441        let opt_state = self.get_opt_state();
442        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443        Self::from_logical_plan(lp, opt_state)
444    }
445
446    /// Fill NaN values in the DataFrame with an expression.
447    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448        let opt_state = self.get_opt_state();
449        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450        Self::from_logical_plan(lp, opt_state)
451    }
452
453    /// Caches the result into a new LazyFrame.
454    ///
455    /// This should be used to prevent computations running multiple times.
456    pub fn cache(self) -> Self {
457        let opt_state = self.get_opt_state();
458        let lp = self.get_plan_builder().cache().build();
459        Self::from_logical_plan(lp, opt_state)
460    }
461
462    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
463    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464        let cast_cols: Vec<Expr> = dtypes
465            .into_iter()
466            .map(|(name, dt)| {
467                let name = PlSmallStr::from_str(name);
468
469                if strict {
470                    col(name).strict_cast(dt)
471                } else {
472                    col(name).cast(dt)
473                }
474            })
475            .collect();
476
477        if cast_cols.is_empty() {
478            self
479        } else {
480            self.with_columns(cast_cols)
481        }
482    }
483
484    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
485    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486        self.with_columns(vec![if strict {
487            col(PlSmallStr::from_static("*")).strict_cast(dtype)
488        } else {
489            col(PlSmallStr::from_static("*")).cast(dtype)
490        }])
491    }
492
493    pub fn optimize(
494        self,
495        lp_arena: &mut Arena<IR>,
496        expr_arena: &mut Arena<AExpr>,
497    ) -> PolarsResult<Node> {
498        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499    }
500
501    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502        let (mut lp_arena, mut expr_arena) = self.get_arenas();
503        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505        Ok(IRPlan::new(node, lp_arena, expr_arena))
506    }
507
508    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509        let (mut lp_arena, mut expr_arena) = self.get_arenas();
510        let node = to_alp(
511            self.logical_plan,
512            &mut expr_arena,
513            &mut lp_arena,
514            &mut self.opt_state,
515        )?;
516        let plan = IRPlan::new(node, lp_arena, expr_arena);
517        Ok(plan)
518    }
519
520    pub(crate) fn optimize_with_scratch(
521        self,
522        lp_arena: &mut Arena<IR>,
523        expr_arena: &mut Arena<AExpr>,
524        scratch: &mut Vec<Node>,
525    ) -> PolarsResult<Node> {
526        #[allow(unused_mut)]
527        let mut opt_state = self.opt_state;
528        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
529
530        #[cfg(feature = "cse")]
531        if new_streaming {
532            // The new streaming engine can't deal with the way the common
533            // subexpression elimination adds length-incorrect with_columns.
534            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
535        }
536
537        let lp_top = optimize(
538            self.logical_plan,
539            opt_state,
540            lp_arena,
541            expr_arena,
542            scratch,
543            apply_scan_predicate_to_scan_ir,
544        )?;
545
546        Ok(lp_top)
547    }
548
549    fn prepare_collect_post_opt<P>(
550        mut self,
551        check_sink: bool,
552        query_start: Option<std::time::Instant>,
553        post_opt: P,
554    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
555    where
556        P: FnOnce(
557            Node,
558            &mut Arena<IR>,
559            &mut Arena<AExpr>,
560            Option<std::time::Duration>,
561        ) -> PolarsResult<()>,
562    {
563        let (mut lp_arena, mut expr_arena) = self.get_arenas();
564
565        let mut scratch = vec![];
566        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
567
568        post_opt(
569            lp_top,
570            &mut lp_arena,
571            &mut expr_arena,
572            // Post optimization callback gets the time since the
573            // query was started as its "base" timepoint.
574            query_start.map(|s| s.elapsed()),
575        )?;
576
577        // sink should be replaced
578        let no_file_sink = if check_sink {
579            !matches!(
580                lp_arena.get(lp_top),
581                IR::Sink {
582                    payload: SinkTypeIR::File { .. },
583                    ..
584                }
585            )
586        } else {
587            true
588        };
589        let physical_plan = create_physical_plan(
590            lp_top,
591            &mut lp_arena,
592            &mut expr_arena,
593            BUILD_STREAMING_EXECUTOR,
594        )?;
595
596        let state = ExecutionState::new();
597        Ok((state, physical_plan, no_file_sink))
598    }
599
600    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
601    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
602    where
603        P: FnOnce(
604            Node,
605            &mut Arena<IR>,
606            &mut Arena<AExpr>,
607            Option<std::time::Duration>,
608        ) -> PolarsResult<()>,
609    {
610        let (mut state, mut physical_plan, _) =
611            self.prepare_collect_post_opt(false, None, post_opt)?;
612        physical_plan.execute(&mut state)
613    }
614
615    #[allow(unused_mut)]
616    fn prepare_collect(
617        self,
618        check_sink: bool,
619        query_start: Option<std::time::Instant>,
620    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
621        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
622    }
623
624    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
625    /// `engine`.
626    ///
627    /// The query is optimized prior to execution.
628    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
629        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
630            payload.clone()
631        } else {
632            self.logical_plan = DslPlan::Sink {
633                input: Arc::new(self.logical_plan),
634                payload: SinkType::Memory,
635            };
636            SinkType::Memory
637        };
638
639        // Default engine for collect is InMemory, sink_* is Streaming
640        if engine == Engine::Auto {
641            engine = match payload {
642                #[cfg(feature = "new_streaming")]
643                SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
644                _ => Engine::InMemory,
645            };
646        }
647        // Gpu uses some hacks to dispatch.
648        if engine == Engine::Gpu {
649            engine = Engine::InMemory;
650        }
651
652        #[cfg(feature = "new_streaming")]
653        {
654            if let Some(result) = self.try_new_streaming_if_requested() {
655                return result.map(|v| v.unwrap_single());
656            }
657        }
658
659        match engine {
660            Engine::Auto => unreachable!(),
661            Engine::Streaming => {
662                feature_gated!("new_streaming", self = self.with_new_streaming(true))
663            },
664            _ => {},
665        }
666        let mut alp_plan = self.clone().to_alp_optimized()?;
667
668        match engine {
669            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
670                let result = polars_stream::run_query(
671                    alp_plan.lp_top,
672                    &mut alp_plan.lp_arena,
673                    &mut alp_plan.expr_arena,
674                );
675                result.map(|v| v.unwrap_single())
676            }),
677            Engine::Gpu => {
678                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
679            },
680            Engine::InMemory => {
681                let mut physical_plan = create_physical_plan(
682                    alp_plan.lp_top,
683                    &mut alp_plan.lp_arena,
684                    &mut alp_plan.expr_arena,
685                    BUILD_STREAMING_EXECUTOR,
686                )?;
687                let mut state = ExecutionState::new();
688                physical_plan.execute(&mut state)
689            },
690        }
691    }
692
693    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
694        let sink_multiple = LazyFrame {
695            logical_plan: DslPlan::SinkMultiple { inputs: plans },
696            opt_state,
697            cached_arena: Default::default(),
698        };
699        sink_multiple.explain(true)
700    }
701
    /// Optimize and execute several plans together, returning one `DataFrame` per plan.
    ///
    /// The plans are combined under a single `SinkMultiple` node and optimized as one
    /// query using `opt_state`. An empty `plans` returns an empty vector without doing any work.
    ///
    /// `Engine::Auto` and `Engine::Gpu` are both executed with the in-memory engine.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        // Environment-variable driven streaming takes precedence over the requested engine.
        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`, so it is cloned before the arenas are borrowed mutably.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // `Gpu` was remapped to `InMemory` above, so this arm is not expected to be hit.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO. So we take a multiple of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // Run the cache prefiller, if there is one, before any of the plans.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Take ownership of the executor, leaving a default in its slot.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // NOTE(review): `idx` restarts at 0 for every chunk, so plans in
                                    // different chunks get the same `branch_idx` offset — confirm
                                    // this is intended.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into a single list in plan order.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
804
805    /// Execute all the lazy operations and collect them into a [`DataFrame`].
806    ///
807    /// The query is optimized prior to execution.
808    ///
809    /// # Example
810    ///
811    /// ```rust
812    /// use polars_core::prelude::*;
813    /// use polars_lazy::prelude::*;
814    ///
815    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
816    ///     df.lazy()
817    ///       .group_by([col("foo")])
818    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
819    ///       .collect()
820    /// }
821    /// ```
822    pub fn collect(self) -> PolarsResult<DataFrame> {
823        self.collect_with_engine(Engine::InMemory)
824    }
825
826    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
827    // This version does profiling of the node execution.
828    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
829    where
830        P: FnOnce(
831            Node,
832            &mut Arena<IR>,
833            &mut Arena<AExpr>,
834            Option<std::time::Duration>,
835        ) -> PolarsResult<()>,
836    {
837        let query_start = std::time::Instant::now();
838        let (mut state, mut physical_plan, _) =
839            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
840        state.time_nodes(query_start);
841        let out = physical_plan.execute(&mut state)?;
842        let timer_df = state.finish_timer()?;
843        Ok((out, timer_df))
844    }
845
846    /// Profile a LazyFrame.
847    ///
848    /// This will run the query and return a tuple
849    /// containing the materialized DataFrame and a DataFrame that contains profiling information
850    /// of each node that is executed.
851    ///
852    /// The units of the timings are microseconds.
853    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
854        self._profile_post_opt(|_, _, _, _| Ok(()))
855    }
856
857    pub fn sink_batches(
858        mut self,
859        function: PlanCallback<DataFrame, bool>,
860        maintain_order: bool,
861        chunk_size: Option<NonZeroUsize>,
862    ) -> PolarsResult<Self> {
863        use polars_plan::prelude::sink::CallbackSinkType;
864
865        polars_ensure!(
866            !matches!(self.logical_plan, DslPlan::Sink { .. }),
867            InvalidOperation: "cannot create a sink on top of another sink"
868        );
869
870        self.logical_plan = DslPlan::Sink {
871            input: Arc::new(self.logical_plan),
872            payload: SinkType::Callback(CallbackSinkType {
873                function,
874                maintain_order,
875                chunk_size,
876            }),
877        };
878
879        Ok(self)
880    }
881
882    #[cfg(feature = "new_streaming")]
883    pub fn try_new_streaming_if_requested(
884        &mut self,
885    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
886        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
887        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
888
889        if auto_new_streaming || force_new_streaming {
890            // Try to run using the new streaming engine, falling back
891            // if it fails in a todo!() error if auto_new_streaming is set.
892            let mut new_stream_lazy = self.clone();
893            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
894            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
895                Ok(v) => v,
896                Err(e) => return Some(Err(e)),
897            };
898
899            let f = || {
900                polars_stream::run_query(
901                    alp_plan.lp_top,
902                    &mut alp_plan.lp_arena,
903                    &mut alp_plan.expr_arena,
904                )
905            };
906
907            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
908                Ok(v) => return Some(v),
909                Err(e) => {
910                    // Fallback to normal engine if error is due to not being implemented
911                    // and auto_new_streaming is set, otherwise propagate error.
912                    if !force_new_streaming
913                        && auto_new_streaming
914                        && e.downcast_ref::<&str>()
915                            .map(|s| s.starts_with("not yet implemented"))
916                            .unwrap_or(false)
917                    {
918                        if polars_core::config::verbose() {
919                            eprintln!(
920                                "caught unimplemented error in new streaming engine, falling back to normal engine"
921                            );
922                        }
923                    } else {
924                        std::panic::resume_unwind(e);
925                    }
926                },
927            }
928        }
929
930        None
931    }
932
933    pub fn sink(
934        mut self,
935        sink_type: SinkDestination,
936        file_format: impl Into<Arc<FileType>>,
937        unified_sink_args: UnifiedSinkArgs,
938    ) -> PolarsResult<Self> {
939        polars_ensure!(
940            !matches!(self.logical_plan, DslPlan::Sink { .. }),
941            InvalidOperation: "cannot create a sink on top of another sink"
942        );
943
944        self.logical_plan = DslPlan::Sink {
945            input: Arc::new(self.logical_plan),
946            payload: match sink_type {
947                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
948                    target,
949                    file_format: file_format.into(),
950                    unified_sink_args,
951                }),
952                SinkDestination::Partitioned {
953                    base_path,
954                    file_path_provider,
955                    partition_strategy,
956                    finish_callback,
957                    max_rows_per_file,
958                    approximate_bytes_per_file,
959                } => SinkType::Partitioned(PartitionedSinkOptions {
960                    base_path,
961                    file_path_provider,
962                    partition_strategy,
963                    finish_callback,
964                    file_format: file_format.into(),
965                    unified_sink_args,
966                    max_rows_per_file,
967                    approximate_bytes_per_file,
968                }),
969            },
970        };
971        Ok(self)
972    }
973
974    /// Filter frame rows that match a predicate expression.
975    ///
976    /// The expression must yield boolean values (note that rows where the
977    /// predicate resolves to `null` are *not* included in the resulting frame).
978    ///
979    /// # Example
980    ///
981    /// ```rust
982    /// use polars_core::prelude::*;
983    /// use polars_lazy::prelude::*;
984    ///
985    /// fn example(df: DataFrame) -> LazyFrame {
986    ///       df.lazy()
987    ///         .filter(col("sepal_width").is_not_null())
988    ///         .select([col("sepal_width"), col("sepal_length")])
989    /// }
990    /// ```
991    pub fn filter(self, predicate: Expr) -> Self {
992        let opt_state = self.get_opt_state();
993        let lp = self.get_plan_builder().filter(predicate).build();
994        Self::from_logical_plan(lp, opt_state)
995    }
996
997    /// Remove frame rows that match a predicate expression.
998    ///
999    /// The expression must yield boolean values (note that rows where the
1000    /// predicate resolves to `null` are *not* removed from the resulting frame).
1001    ///
1002    /// # Example
1003    ///
1004    /// ```rust
1005    /// use polars_core::prelude::*;
1006    /// use polars_lazy::prelude::*;
1007    ///
1008    /// fn example(df: DataFrame) -> LazyFrame {
1009    ///       df.lazy()
1010    ///         .remove(col("sepal_width").is_null())
1011    ///         .select([col("sepal_width"), col("sepal_length")])
1012    /// }
1013    /// ```
1014    pub fn remove(self, predicate: Expr) -> Self {
1015        self.filter(predicate.neq_missing(lit(true)))
1016    }
1017
1018    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1019    ///
1020    /// Columns can be selected with [`col`];
1021    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1022    ///
1023    /// # Example
1024    ///
1025    /// ```rust
1026    /// use polars_core::prelude::*;
1027    /// use polars_lazy::prelude::*;
1028    ///
1029    /// /// This function selects column "foo" and column "bar".
1030    /// /// Column "bar" is renamed to "ham".
1031    /// fn example(df: DataFrame) -> LazyFrame {
1032    ///       df.lazy()
1033    ///         .select([col("foo"),
1034    ///                   col("bar").alias("ham")])
1035    /// }
1036    ///
1037    /// /// This function selects all columns except "foo"
1038    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1039    ///       df.lazy()
1040    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1041    /// }
1042    /// ```
1043    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1044        let exprs = exprs.as_ref().to_vec();
1045        self.select_impl(
1046            exprs,
1047            ProjectionOptions {
1048                run_parallel: true,
1049                duplicate_check: true,
1050                should_broadcast: true,
1051            },
1052        )
1053    }
1054
1055    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1056        let exprs = exprs.as_ref().to_vec();
1057        self.select_impl(
1058            exprs,
1059            ProjectionOptions {
1060                run_parallel: false,
1061                duplicate_check: true,
1062                should_broadcast: true,
1063            },
1064        )
1065    }
1066
1067    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1068        let opt_state = self.get_opt_state();
1069        let lp = self.get_plan_builder().project(exprs, options).build();
1070        Self::from_logical_plan(lp, opt_state)
1071    }
1072
1073    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1074    ///
1075    /// Takes a list of expressions to group on.
1076    ///
1077    /// # Example
1078    ///
1079    /// ```rust
1080    /// use polars_core::prelude::*;
1081    /// use polars_lazy::prelude::*;
1082    ///
1083    /// fn example(df: DataFrame) -> LazyFrame {
1084    ///       df.lazy()
1085    ///        .group_by([col("date")])
1086    ///        .agg([
1087    ///            col("rain").min().alias("min_rain"),
1088    ///            col("rain").sum().alias("sum_rain"),
1089    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1090    ///        ])
1091    /// }
1092    /// ```
1093    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1094        let keys = by
1095            .as_ref()
1096            .iter()
1097            .map(|e| e.clone().into())
1098            .collect::<Vec<_>>();
1099        let opt_state = self.get_opt_state();
1100
1101        #[cfg(feature = "dynamic_group_by")]
1102        {
1103            LazyGroupBy {
1104                logical_plan: self.logical_plan,
1105                opt_state,
1106                keys,
1107                predicates: vec![],
1108                maintain_order: false,
1109                dynamic_options: None,
1110                rolling_options: None,
1111            }
1112        }
1113
1114        #[cfg(not(feature = "dynamic_group_by"))]
1115        {
1116            LazyGroupBy {
1117                logical_plan: self.logical_plan,
1118                opt_state,
1119                keys,
1120                predicates: vec![],
1121                maintain_order: false,
1122            }
1123        }
1124    }
1125
1126    /// Create rolling groups based on a time column.
1127    ///
1128    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1129    ///
1130    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1131    /// individual values and are not of constant intervals. For constant intervals use
1132    /// *group_by_dynamic*
1133    #[cfg(feature = "dynamic_group_by")]
1134    pub fn rolling<E: AsRef<[Expr]>>(
1135        mut self,
1136        index_column: Expr,
1137        group_by: E,
1138        mut options: RollingGroupOptions,
1139    ) -> LazyGroupBy {
1140        if let Expr::Column(name) = index_column {
1141            options.index_column = name;
1142        } else {
1143            let output_field = index_column
1144                .to_field(&self.collect_schema().unwrap())
1145                .unwrap();
1146            return self.with_column(index_column).rolling(
1147                Expr::Column(output_field.name().clone()),
1148                group_by,
1149                options,
1150            );
1151        }
1152        let opt_state = self.get_opt_state();
1153        LazyGroupBy {
1154            logical_plan: self.logical_plan,
1155            opt_state,
1156            predicates: vec![],
1157            keys: group_by.as_ref().to_vec(),
1158            maintain_order: true,
1159            dynamic_options: None,
1160            rolling_options: Some(options),
1161        }
1162    }
1163
1164    /// Group based on a time value (or index value of type Int32, Int64).
1165    ///
1166    /// Time windows are calculated and rows are assigned to windows. Different from a
1167    /// normal group_by is that a row can be member of multiple groups. The time/index
1168    /// window could be seen as a rolling window, with a window size determined by
1169    /// dates/times/values instead of slots in the DataFrame.
1170    ///
1171    /// A window is defined by:
1172    ///
1173    /// - every: interval of the window
1174    /// - period: length of the window
1175    /// - offset: offset of the window
1176    ///
1177    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1178    /// with a ordinary group_by on these keys.
1179    #[cfg(feature = "dynamic_group_by")]
1180    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1181        mut self,
1182        index_column: Expr,
1183        group_by: E,
1184        mut options: DynamicGroupOptions,
1185    ) -> LazyGroupBy {
1186        if let Expr::Column(name) = index_column {
1187            options.index_column = name;
1188        } else {
1189            let output_field = index_column
1190                .to_field(&self.collect_schema().unwrap())
1191                .unwrap();
1192            return self.with_column(index_column).group_by_dynamic(
1193                Expr::Column(output_field.name().clone()),
1194                group_by,
1195                options,
1196            );
1197        }
1198        let opt_state = self.get_opt_state();
1199        LazyGroupBy {
1200            logical_plan: self.logical_plan,
1201            opt_state,
1202            predicates: vec![],
1203            keys: group_by.as_ref().to_vec(),
1204            maintain_order: true,
1205            dynamic_options: Some(options),
1206            rolling_options: None,
1207        }
1208    }
1209
1210    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1211    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1212        let keys = by
1213            .as_ref()
1214            .iter()
1215            .map(|e| e.clone().into())
1216            .collect::<Vec<_>>();
1217        let opt_state = self.get_opt_state();
1218
1219        #[cfg(feature = "dynamic_group_by")]
1220        {
1221            LazyGroupBy {
1222                logical_plan: self.logical_plan,
1223                opt_state,
1224                keys,
1225                predicates: vec![],
1226                maintain_order: true,
1227                dynamic_options: None,
1228                rolling_options: None,
1229            }
1230        }
1231
1232        #[cfg(not(feature = "dynamic_group_by"))]
1233        {
1234            LazyGroupBy {
1235                logical_plan: self.logical_plan,
1236                opt_state,
1237                keys,
1238                predicates: vec![],
1239                maintain_order: true,
1240            }
1241        }
1242    }
1243
1244    /// Left anti join this query with another lazy query.
1245    ///
1246    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1247    /// flexible join logic, see [`join`](LazyFrame::join) or
1248    /// [`join_builder`](LazyFrame::join_builder).
1249    ///
1250    /// # Example
1251    ///
1252    /// ```rust
1253    /// use polars_core::prelude::*;
1254    /// use polars_lazy::prelude::*;
1255    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1256    ///         ldf
1257    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1258    /// }
1259    /// ```
1260    #[cfg(feature = "semi_anti_join")]
1261    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1262        self.join(
1263            other,
1264            [left_on.into()],
1265            [right_on.into()],
1266            JoinArgs::new(JoinType::Anti),
1267        )
1268    }
1269
1270    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1271    #[cfg(feature = "cross_join")]
1272    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1273        self.join(
1274            other,
1275            vec![],
1276            vec![],
1277            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1278        )
1279    }
1280
1281    /// Left outer join this query with another lazy query.
1282    ///
1283    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1284    /// flexible join logic, see [`join`](LazyFrame::join) or
1285    /// [`join_builder`](LazyFrame::join_builder).
1286    ///
1287    /// # Example
1288    ///
1289    /// ```rust
1290    /// use polars_core::prelude::*;
1291    /// use polars_lazy::prelude::*;
1292    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1293    ///         ldf
1294    ///         .left_join(other, col("foo"), col("bar"))
1295    /// }
1296    /// ```
1297    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1298        self.join(
1299            other,
1300            [left_on.into()],
1301            [right_on.into()],
1302            JoinArgs::new(JoinType::Left),
1303        )
1304    }
1305
1306    /// Inner join this query with another lazy query.
1307    ///
1308    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1309    /// flexible join logic, see [`join`](LazyFrame::join) or
1310    /// [`join_builder`](LazyFrame::join_builder).
1311    ///
1312    /// # Example
1313    ///
1314    /// ```rust
1315    /// use polars_core::prelude::*;
1316    /// use polars_lazy::prelude::*;
1317    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1318    ///         ldf
1319    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1320    /// }
1321    /// ```
1322    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1323        self.join(
1324            other,
1325            [left_on.into()],
1326            [right_on.into()],
1327            JoinArgs::new(JoinType::Inner),
1328        )
1329    }
1330
1331    /// Full outer join this query with another lazy query.
1332    ///
1333    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1334    /// flexible join logic, see [`join`](LazyFrame::join) or
1335    /// [`join_builder`](LazyFrame::join_builder).
1336    ///
1337    /// # Example
1338    ///
1339    /// ```rust
1340    /// use polars_core::prelude::*;
1341    /// use polars_lazy::prelude::*;
1342    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1343    ///         ldf
1344    ///         .full_join(other, col("foo"), col("bar"))
1345    /// }
1346    /// ```
1347    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1348        self.join(
1349            other,
1350            [left_on.into()],
1351            [right_on.into()],
1352            JoinArgs::new(JoinType::Full),
1353        )
1354    }
1355
1356    /// Left semi join this query with another lazy query.
1357    ///
1358    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1359    /// flexible join logic, see [`join`](LazyFrame::join) or
1360    /// [`join_builder`](LazyFrame::join_builder).
1361    ///
1362    /// # Example
1363    ///
1364    /// ```rust
1365    /// use polars_core::prelude::*;
1366    /// use polars_lazy::prelude::*;
1367    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1368    ///         ldf
1369    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1370    /// }
1371    /// ```
1372    #[cfg(feature = "semi_anti_join")]
1373    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1374        self.join(
1375            other,
1376            [left_on.into()],
1377            [right_on.into()],
1378            JoinArgs::new(JoinType::Semi),
1379        )
1380    }
1381
1382    /// Generic function to join two LazyFrames.
1383    ///
1384    /// `join` can join on multiple columns, given as two list of expressions, and with a
1385    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1386    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1387    /// over how columns are renamed and parallelization options, use
1388    /// [`join_builder`](LazyFrame::join_builder).
1389    ///
1390    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1391    ///
1392    /// # Example
1393    ///
1394    /// ```rust
1395    /// use polars_core::prelude::*;
1396    /// use polars_lazy::prelude::*;
1397    ///
1398    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1399    ///         ldf
1400    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1401    /// }
1402    /// ```
1403    pub fn join<E: AsRef<[Expr]>>(
1404        self,
1405        other: LazyFrame,
1406        left_on: E,
1407        right_on: E,
1408        args: JoinArgs,
1409    ) -> LazyFrame {
1410        let left_on = left_on.as_ref().to_vec();
1411        let right_on = right_on.as_ref().to_vec();
1412
1413        self._join_impl(other, left_on, right_on, args)
1414    }
1415
1416    fn _join_impl(
1417        self,
1418        other: LazyFrame,
1419        left_on: Vec<Expr>,
1420        right_on: Vec<Expr>,
1421        args: JoinArgs,
1422    ) -> LazyFrame {
1423        let JoinArgs {
1424            how,
1425            validation,
1426            suffix,
1427            slice,
1428            nulls_equal,
1429            coalesce,
1430            maintain_order,
1431        } = args;
1432
1433        if slice.is_some() {
1434            panic!("impl error: slice is not handled")
1435        }
1436
1437        let mut builder = self
1438            .join_builder()
1439            .with(other)
1440            .left_on(left_on)
1441            .right_on(right_on)
1442            .how(how)
1443            .validate(validation)
1444            .join_nulls(nulls_equal)
1445            .coalesce(coalesce)
1446            .maintain_order(maintain_order);
1447
1448        if let Some(suffix) = suffix {
1449            builder = builder.suffix(suffix);
1450        }
1451
1452        // Note: args.slice is set by the optimizer
1453        builder.finish()
1454    }
1455
1456    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1457    ///
1458    /// After the `JoinBuilder` has been created and set up, calling
1459    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1460    /// representing the `join` operation.
1461    pub fn join_builder(self) -> JoinBuilder {
1462        JoinBuilder::new(self)
1463    }
1464
1465    /// Add or replace a column, given as an expression, to a DataFrame.
1466    ///
1467    /// # Example
1468    ///
1469    /// ```rust
1470    /// use polars_core::prelude::*;
1471    /// use polars_lazy::prelude::*;
1472    /// fn add_column(df: DataFrame) -> LazyFrame {
1473    ///     df.lazy()
1474    ///         .with_column(
1475    ///             when(col("sepal_length").lt(lit(5.0)))
1476    ///             .then(lit(10))
1477    ///             .otherwise(lit(1))
1478    ///             .alias("new_column_name"),
1479    ///         )
1480    /// }
1481    /// ```
1482    pub fn with_column(self, expr: Expr) -> LazyFrame {
1483        let opt_state = self.get_opt_state();
1484        let lp = self
1485            .get_plan_builder()
1486            .with_columns(
1487                vec![expr],
1488                ProjectionOptions {
1489                    run_parallel: false,
1490                    duplicate_check: true,
1491                    should_broadcast: true,
1492                },
1493            )
1494            .build();
1495        Self::from_logical_plan(lp, opt_state)
1496    }
1497
1498    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1499    ///
1500    /// # Example
1501    ///
1502    /// ```rust
1503    /// use polars_core::prelude::*;
1504    /// use polars_lazy::prelude::*;
1505    /// fn add_columns(df: DataFrame) -> LazyFrame {
1506    ///     df.lazy()
1507    ///         .with_columns(
1508    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1509    ///          )
1510    /// }
1511    /// ```
1512    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1513        let exprs = exprs.as_ref().to_vec();
1514        self.with_columns_impl(
1515            exprs,
1516            ProjectionOptions {
1517                run_parallel: true,
1518                duplicate_check: true,
1519                should_broadcast: true,
1520            },
1521        )
1522    }
1523
1524    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1525    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1526        let exprs = exprs.as_ref().to_vec();
1527        self.with_columns_impl(
1528            exprs,
1529            ProjectionOptions {
1530                run_parallel: false,
1531                duplicate_check: true,
1532                should_broadcast: true,
1533            },
1534        )
1535    }
1536
1537    /// Match or evolve to a certain schema.
1538    pub fn match_to_schema(
1539        self,
1540        schema: SchemaRef,
1541        per_column: Arc<[MatchToSchemaPerColumn]>,
1542        extra_columns: ExtraColumnsPolicy,
1543    ) -> LazyFrame {
1544        let opt_state = self.get_opt_state();
1545        let lp = self
1546            .get_plan_builder()
1547            .match_to_schema(schema, per_column, extra_columns)
1548            .build();
1549        Self::from_logical_plan(lp, opt_state)
1550    }
1551
1552    pub fn pipe_with_schema(
1553        self,
1554        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1555    ) -> Self {
1556        let opt_state = self.get_opt_state();
1557        let lp = self
1558            .get_plan_builder()
1559            .pipe_with_schema(vec![], callback)
1560            .build();
1561        Self::from_logical_plan(lp, opt_state)
1562    }
1563
1564    pub fn pipe_with_schemas(
1565        self,
1566        others: Vec<LazyFrame>,
1567        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1568    ) -> Self {
1569        let opt_state = self.get_opt_state();
1570        let lp = self
1571            .get_plan_builder()
1572            .pipe_with_schema(
1573                others.into_iter().map(|lf| lf.logical_plan).collect(),
1574                callback,
1575            )
1576            .build();
1577        Self::from_logical_plan(lp, opt_state)
1578    }
1579
1580    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1581        let opt_state = self.get_opt_state();
1582        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1583        Self::from_logical_plan(lp, opt_state)
1584    }
1585
1586    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1587        let contexts = contexts
1588            .as_ref()
1589            .iter()
1590            .map(|lf| lf.logical_plan.clone())
1591            .collect();
1592        let opt_state = self.get_opt_state();
1593        let lp = self.get_plan_builder().with_context(contexts).build();
1594        Self::from_logical_plan(lp, opt_state)
1595    }
1596
1597    /// Aggregate all the columns as their maximum values.
1598    ///
1599    /// Aggregated columns will have the same names as the original columns.
1600    pub fn max(self) -> Self {
1601        self.map_private(DslFunction::Stats(StatsFunction::Max))
1602    }
1603
1604    /// Aggregate all the columns as their minimum values.
1605    ///
1606    /// Aggregated columns will have the same names as the original columns.
1607    pub fn min(self) -> Self {
1608        self.map_private(DslFunction::Stats(StatsFunction::Min))
1609    }
1610
1611    /// Aggregate all the columns as their sum values.
1612    ///
1613    /// Aggregated columns will have the same names as the original columns.
1614    ///
1615    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1616    /// - For integer columns, the ordinary checks for overflow are performed:
1617    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1618    ///   silently wrap.
1619    /// - String columns will sum to None.
1620    pub fn sum(self) -> Self {
1621        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1622    }
1623
1624    /// Aggregate all the columns as their mean values.
1625    ///
1626    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1627    /// - String columns will have a mean of None.
1628    pub fn mean(self) -> Self {
1629        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1630    }
1631
1632    /// Aggregate all the columns as their median values.
1633    ///
1634    /// - Boolean and integer results are converted to `f64`. However, they are still
1635    ///   susceptible to overflow before this conversion occurs.
1636    /// - String columns will sum to None.
1637    pub fn median(self) -> Self {
1638        self.map_private(DslFunction::Stats(StatsFunction::Median))
1639    }
1640
1641    /// Aggregate all the columns as their quantile values.
1642    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1643        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1644            quantile,
1645            method,
1646        }))
1647    }
1648
1649    /// Aggregate all the columns as their standard deviation values.
1650    ///
1651    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1652    /// computing the variance, where `N` is the number of rows.
1653    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1654    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1655    /// > likelihood estimate of the variance for normally distributed variables. The
1656    /// > standard deviation computed in this function is the square root of the estimated
1657    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1658    /// > standard deviation per se.
1659    ///
1660    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1661    pub fn std(self, ddof: u8) -> Self {
1662        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1663    }
1664
1665    /// Aggregate all the columns as their variance values.
1666    ///
1667    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1668    /// computing the variance, where `N` is the number of rows.
1669    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1670    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1671    /// > likelihood estimate of the variance for normally distributed variables.
1672    ///
1673    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1674    pub fn var(self, ddof: u8) -> Self {
1675        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1676    }
1677
1678    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1679    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1680        self.explode_impl(columns, options, false)
1681    }
1682
1683    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1684    fn explode_impl(
1685        self,
1686        columns: Selector,
1687        options: ExplodeOptions,
1688        allow_empty: bool,
1689    ) -> LazyFrame {
1690        let opt_state = self.get_opt_state();
1691        let lp = self
1692            .get_plan_builder()
1693            .explode(columns, options, allow_empty)
1694            .build();
1695        Self::from_logical_plan(lp, opt_state)
1696    }
1697
1698    /// Aggregate all the columns as the sum of their null value count.
1699    pub fn null_count(self) -> LazyFrame {
1700        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1701    }
1702
1703    /// Drop non-unique rows and maintain the order of kept rows.
1704    ///
1705    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1706    /// `None`, all columns are considered.
1707    pub fn unique_stable(
1708        self,
1709        subset: Option<Selector>,
1710        keep_strategy: UniqueKeepStrategy,
1711    ) -> LazyFrame {
1712        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1713        self.unique_stable_generic(subset, keep_strategy)
1714    }
1715
1716    pub fn unique_stable_generic(
1717        self,
1718        subset: Option<Vec<Expr>>,
1719        keep_strategy: UniqueKeepStrategy,
1720    ) -> LazyFrame {
1721        let opt_state = self.get_opt_state();
1722        let options = DistinctOptionsDSL {
1723            subset,
1724            maintain_order: true,
1725            keep_strategy,
1726        };
1727        let lp = self.get_plan_builder().distinct(options).build();
1728        Self::from_logical_plan(lp, opt_state)
1729    }
1730
1731    /// Drop non-unique rows without maintaining the order of kept rows.
1732    ///
1733    /// The order of the kept rows may change; to maintain the original row order, use
1734    /// [`unique_stable`](LazyFrame::unique_stable).
1735    ///
1736    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1737    /// all columns are considered.
1738    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1739        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1740        self.unique_generic(subset, keep_strategy)
1741    }
1742
1743    pub fn unique_generic(
1744        self,
1745        subset: Option<Vec<Expr>>,
1746        keep_strategy: UniqueKeepStrategy,
1747    ) -> LazyFrame {
1748        let opt_state = self.get_opt_state();
1749        let options = DistinctOptionsDSL {
1750            subset,
1751            maintain_order: false,
1752            keep_strategy,
1753        };
1754        let lp = self.get_plan_builder().distinct(options).build();
1755        Self::from_logical_plan(lp, opt_state)
1756    }
1757
1758    /// Drop rows containing one or more NaN values.
1759    ///
1760    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1761    /// floating point columns are considered.
1762    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1763        let opt_state = self.get_opt_state();
1764        let lp = self.get_plan_builder().drop_nans(subset).build();
1765        Self::from_logical_plan(lp, opt_state)
1766    }
1767
1768    /// Drop rows containing one or more None values.
1769    ///
1770    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1771    /// columns are considered.
1772    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1773        let opt_state = self.get_opt_state();
1774        let lp = self.get_plan_builder().drop_nulls(subset).build();
1775        Self::from_logical_plan(lp, opt_state)
1776    }
1777
1778    /// Slice the DataFrame using an offset (starting row) and a length.
1779    ///
1780    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1781    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1782    /// end.
1783    ///
1784    /// If `offset` and `len` are such that the slice extends beyond the end of the
1785    /// DataFrame, the portion between `offset` and the end will be returned. In this
1786    /// case, the number of rows in the returned DataFrame will be less than `len`.
1787    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1788        let opt_state = self.get_opt_state();
1789        let lp = self.get_plan_builder().slice(offset, len).build();
1790        Self::from_logical_plan(lp, opt_state)
1791    }
1792
1793    /// Get the first row.
1794    ///
1795    /// Equivalent to `self.slice(0, 1)`.
1796    pub fn first(self) -> LazyFrame {
1797        self.slice(0, 1)
1798    }
1799
1800    /// Get the last row.
1801    ///
1802    /// Equivalent to `self.slice(-1, 1)`.
1803    pub fn last(self) -> LazyFrame {
1804        self.slice(-1, 1)
1805    }
1806
1807    /// Get the last `n` rows.
1808    ///
1809    /// Equivalent to `self.slice(-(n as i64), n)`.
1810    pub fn tail(self, n: IdxSize) -> LazyFrame {
1811        let neg_tail = -(n as i64);
1812        self.slice(neg_tail, n)
1813    }
1814
1815    #[cfg(feature = "pivot")]
1816    #[expect(clippy::too_many_arguments)]
1817    pub fn pivot(
1818        self,
1819        on: Selector,
1820        on_columns: Arc<DataFrame>,
1821        index: Selector,
1822        values: Selector,
1823        agg: Expr,
1824        maintain_order: bool,
1825        separator: PlSmallStr,
1826    ) -> LazyFrame {
1827        let opt_state = self.get_opt_state();
1828        let lp = self
1829            .get_plan_builder()
1830            .pivot(
1831                on,
1832                on_columns,
1833                index,
1834                values,
1835                agg,
1836                maintain_order,
1837                separator,
1838            )
1839            .build();
1840        Self::from_logical_plan(lp, opt_state)
1841    }
1842
1843    /// Unpivot the DataFrame from wide to long format.
1844    ///
1845    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1846    #[cfg(feature = "pivot")]
1847    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1848        let opt_state = self.get_opt_state();
1849        let lp = self.get_plan_builder().unpivot(args).build();
1850        Self::from_logical_plan(lp, opt_state)
1851    }
1852
1853    /// Limit the DataFrame to the first `n` rows.
1854    pub fn limit(self, n: IdxSize) -> LazyFrame {
1855        self.slice(0, n)
1856    }
1857
1858    /// Apply a function/closure once the logical plan get executed.
1859    ///
1860    /// The function has access to the whole materialized DataFrame at the time it is
1861    /// called.
1862    ///
1863    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1864    /// with `LazyFrame::with_column` or `with_columns`.
1865    ///
1866    /// ## Warning
1867    /// This can blow up in your face if the schema is changed due to the operation. The
1868    /// optimizer relies on a correct schema.
1869    ///
1870    /// You can toggle certain optimizations off.
1871    pub fn map<F>(
1872        self,
1873        function: F,
1874        optimizations: AllowedOptimizations,
1875        schema: Option<Arc<dyn UdfSchema>>,
1876        name: Option<&'static str>,
1877    ) -> LazyFrame
1878    where
1879        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1880    {
1881        let opt_state = self.get_opt_state();
1882        let lp = self
1883            .get_plan_builder()
1884            .map(
1885                function,
1886                optimizations,
1887                schema,
1888                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1889            )
1890            .build();
1891        Self::from_logical_plan(lp, opt_state)
1892    }
1893
1894    #[cfg(feature = "python")]
1895    pub fn map_python(
1896        self,
1897        function: polars_utils::python_function::PythonFunction,
1898        optimizations: AllowedOptimizations,
1899        schema: Option<SchemaRef>,
1900        validate_output: bool,
1901    ) -> LazyFrame {
1902        let opt_state = self.get_opt_state();
1903        let lp = self
1904            .get_plan_builder()
1905            .map_python(function, optimizations, schema, validate_output)
1906            .build();
1907        Self::from_logical_plan(lp, opt_state)
1908    }
1909
1910    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1911        let opt_state = self.get_opt_state();
1912        let lp = self.get_plan_builder().map_private(function).build();
1913        Self::from_logical_plan(lp, opt_state)
1914    }
1915
1916    /// Add a new column at index 0 that counts the rows.
1917    ///
1918    /// `name` is the name of the new column. `offset` is where to start counting from; if
1919    /// `None`, it is set to `0`.
1920    ///
1921    /// # Warning
1922    /// This can have a negative effect on query performance. This may for instance block
1923    /// predicate pushdown optimization.
1924    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1925    where
1926        S: Into<PlSmallStr>,
1927    {
1928        let name = name.into();
1929
1930        match &self.logical_plan {
1931            v @ DslPlan::Scan {
1932                scan_type,
1933                unified_scan_args,
1934                ..
1935            } if unified_scan_args.row_index.is_none()
1936                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1937            {
1938                let DslPlan::Scan {
1939                    sources,
1940                    mut unified_scan_args,
1941                    scan_type,
1942                    cached_ir: _,
1943                } = v.clone()
1944                else {
1945                    unreachable!()
1946                };
1947
1948                unified_scan_args.row_index = Some(RowIndex {
1949                    name,
1950                    offset: offset.unwrap_or(0),
1951                });
1952
1953                DslPlan::Scan {
1954                    sources,
1955                    unified_scan_args,
1956                    scan_type,
1957                    cached_ir: Default::default(),
1958                }
1959                .into()
1960            },
1961            _ => self.map_private(DslFunction::RowIndex { name, offset }),
1962        }
1963    }
1964
1965    /// Return the number of non-null elements for each column.
1966    pub fn count(self) -> LazyFrame {
1967        self.select(vec![col(PlSmallStr::from_static("*")).count()])
1968    }
1969
1970    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
1971    /// inserted as columns.
1972    #[cfg(feature = "dtype-struct")]
1973    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1974        self.map_private(DslFunction::Unnest {
1975            columns: cols,
1976            separator,
1977        })
1978    }
1979
1980    #[cfg(feature = "merge_sorted")]
1981    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1982    where
1983        S: Into<PlSmallStr>,
1984    {
1985        let key = key.into();
1986
1987        let lp = DslPlan::MergeSorted {
1988            input_left: Arc::new(self.logical_plan),
1989            input_right: Arc::new(other.logical_plan),
1990            key,
1991        };
1992        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1993    }
1994
1995    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1996        let lp = DslPlan::MapFunction {
1997            input: Arc::new(self.logical_plan),
1998            function: DslFunction::Hint(hint),
1999        };
2000        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2001    }
2002}
2003
2004/// Utility struct for lazy group_by operation.
2005#[derive(Clone)]
2006pub struct LazyGroupBy {
2007    pub logical_plan: DslPlan,
2008    opt_state: OptFlags,
2009    keys: Vec<Expr>,
2010    predicates: Vec<Expr>,
2011    maintain_order: bool,
2012    #[cfg(feature = "dynamic_group_by")]
2013    dynamic_options: Option<DynamicGroupOptions>,
2014    #[cfg(feature = "dynamic_group_by")]
2015    rolling_options: Option<RollingGroupOptions>,
2016}
2017
2018impl From<LazyGroupBy> for LazyFrame {
2019    fn from(lgb: LazyGroupBy) -> Self {
2020        Self {
2021            logical_plan: lgb.logical_plan,
2022            opt_state: lgb.opt_state,
2023            cached_arena: Default::default(),
2024        }
2025    }
2026}
2027
2028impl LazyGroupBy {
2029    /// Filter groups with a predicate after aggregation.
2030    ///
2031    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
2032    /// is evaluated on the groups.
2033    /// This method can be chained in which case all predicates must evaluate to `true` for a
2034    /// group to be kept.
2035    ///
2036    /// # Example
2037    ///
2038    /// ```rust
2039    /// use polars_core::prelude::*;
2040    /// use polars_lazy::prelude::*;
2041    ///
2042    /// fn example(df: DataFrame) -> LazyFrame {
2043    ///       df.lazy()
2044    ///        .group_by_stable([col("date")])
2045    ///        .having(col("rain").sum().gt(lit(10)))
2046    ///        .agg([col("rain").min().alias("min_rain")])
2047    /// }
2048    /// ```
2049    pub fn having(mut self, predicate: Expr) -> Self {
2050        self.predicates.push(predicate);
2051        self
2052    }
2053
2054    /// Group by and aggregate.
2055    ///
2056    /// Select a column with [col] and choose an aggregation.
2057    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2058    ///
2059    /// # Example
2060    ///
2061    /// ```rust
2062    /// use polars_core::prelude::*;
2063    /// use polars_lazy::prelude::*;
2064    ///
2065    /// fn example(df: DataFrame) -> LazyFrame {
2066    ///       df.lazy()
2067    ///        .group_by_stable([col("date")])
2068    ///        .agg([
2069    ///            col("rain").min().alias("min_rain"),
2070    ///            col("rain").sum().alias("sum_rain"),
2071    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2072    ///        ])
2073    /// }
2074    /// ```
2075    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2076        #[cfg(feature = "dynamic_group_by")]
2077        let lp = DslBuilder::from(self.logical_plan)
2078            .group_by(
2079                self.keys,
2080                self.predicates,
2081                aggs,
2082                None,
2083                self.maintain_order,
2084                self.dynamic_options,
2085                self.rolling_options,
2086            )
2087            .build();
2088
2089        #[cfg(not(feature = "dynamic_group_by"))]
2090        let lp = DslBuilder::from(self.logical_plan)
2091            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2092            .build();
2093        LazyFrame::from_logical_plan(lp, self.opt_state)
2094    }
2095
2096    /// Return first n rows of each group
2097    pub fn head(self, n: Option<usize>) -> LazyFrame {
2098        let keys = self
2099            .keys
2100            .iter()
2101            .filter_map(|expr| expr_output_name(expr).ok())
2102            .collect::<Vec<_>>();
2103
2104        self.agg([all().as_expr().head(n)]).explode_impl(
2105            all() - by_name(keys.iter().cloned(), false),
2106            ExplodeOptions {
2107                empty_as_null: true,
2108                keep_nulls: true,
2109            },
2110            true,
2111        )
2112    }
2113
2114    /// Return last n rows of each group
2115    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2116        let keys = self
2117            .keys
2118            .iter()
2119            .filter_map(|expr| expr_output_name(expr).ok())
2120            .collect::<Vec<_>>();
2121
2122        self.agg([all().as_expr().tail(n)]).explode_impl(
2123            all() - by_name(keys.iter().cloned(), false),
2124            ExplodeOptions {
2125                empty_as_null: true,
2126                keep_nulls: true,
2127            },
2128            true,
2129        )
2130    }
2131
2132    /// Apply a function over the groups as a new DataFrame.
2133    ///
2134    /// **It is not recommended that you use this as materializing the DataFrame is very
2135    /// expensive.**
2136    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2137        if !self.predicates.is_empty() {
2138            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2139        }
2140
2141        #[cfg(feature = "dynamic_group_by")]
2142        let options = GroupbyOptions {
2143            dynamic: self.dynamic_options,
2144            rolling: self.rolling_options,
2145            slice: None,
2146        };
2147
2148        #[cfg(not(feature = "dynamic_group_by"))]
2149        let options = GroupbyOptions { slice: None };
2150
2151        let lp = DslPlan::GroupBy {
2152            input: Arc::new(self.logical_plan),
2153            keys: self.keys,
2154            predicates: vec![],
2155            aggs: vec![],
2156            apply: Some((f, schema)),
2157            maintain_order: self.maintain_order,
2158            options: Arc::new(options),
2159        };
2160        LazyFrame::from_logical_plan(lp, self.opt_state)
2161    }
2162}
2163
2164#[must_use]
2165pub struct JoinBuilder {
2166    lf: LazyFrame,
2167    how: JoinType,
2168    other: Option<LazyFrame>,
2169    left_on: Vec<Expr>,
2170    right_on: Vec<Expr>,
2171    allow_parallel: bool,
2172    force_parallel: bool,
2173    suffix: Option<PlSmallStr>,
2174    validation: JoinValidation,
2175    nulls_equal: bool,
2176    coalesce: JoinCoalesce,
2177    maintain_order: MaintainOrderJoin,
2178}
2179impl JoinBuilder {
2180    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2181    pub fn new(lf: LazyFrame) -> Self {
2182        Self {
2183            lf,
2184            other: None,
2185            how: JoinType::Inner,
2186            left_on: vec![],
2187            right_on: vec![],
2188            allow_parallel: true,
2189            force_parallel: false,
2190            suffix: None,
2191            validation: Default::default(),
2192            nulls_equal: false,
2193            coalesce: Default::default(),
2194            maintain_order: Default::default(),
2195        }
2196    }
2197
2198    /// The right table in the join.
2199    pub fn with(mut self, other: LazyFrame) -> Self {
2200        self.other = Some(other);
2201        self
2202    }
2203
2204    /// Select the join type.
2205    pub fn how(mut self, how: JoinType) -> Self {
2206        self.how = how;
2207        self
2208    }
2209
2210    pub fn validate(mut self, validation: JoinValidation) -> Self {
2211        self.validation = validation;
2212        self
2213    }
2214
2215    /// The expressions you want to join both tables on.
2216    ///
2217    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2218    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2219        let on = on.as_ref().to_vec();
2220        self.left_on.clone_from(&on);
2221        self.right_on = on;
2222        self
2223    }
2224
2225    /// The expressions you want to join the left table on.
2226    ///
2227    /// The passed expressions must be valid in the left table.
2228    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2229        self.left_on = on.as_ref().to_vec();
2230        self
2231    }
2232
2233    /// The expressions you want to join the right table on.
2234    ///
2235    /// The passed expressions must be valid in the right table.
2236    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2237        self.right_on = on.as_ref().to_vec();
2238        self
2239    }
2240
2241    /// Allow parallel table evaluation.
2242    pub fn allow_parallel(mut self, allow: bool) -> Self {
2243        self.allow_parallel = allow;
2244        self
2245    }
2246
2247    /// Force parallel table evaluation.
2248    pub fn force_parallel(mut self, force: bool) -> Self {
2249        self.force_parallel = force;
2250        self
2251    }
2252
2253    /// Join on null values. By default null values will never produce matches.
2254    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2255        self.nulls_equal = nulls_equal;
2256        self
2257    }
2258
2259    /// Suffix to add duplicate column names in join.
2260    /// Defaults to `"_right"` if this method is never called.
2261    pub fn suffix<S>(mut self, suffix: S) -> Self
2262    where
2263        S: Into<PlSmallStr>,
2264    {
2265        self.suffix = Some(suffix.into());
2266        self
2267    }
2268
2269    /// Whether to coalesce join columns.
2270    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2271        self.coalesce = coalesce;
2272        self
2273    }
2274
2275    /// Whether to preserve the row order.
2276    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2277        self.maintain_order = maintain_order;
2278        self
2279    }
2280
2281    /// Finish builder
2282    pub fn finish(self) -> LazyFrame {
2283        let opt_state = self.lf.opt_state;
2284        let other = self.other.expect("'with' not set in join builder");
2285
2286        let args = JoinArgs {
2287            how: self.how,
2288            validation: self.validation,
2289            suffix: self.suffix,
2290            slice: None,
2291            nulls_equal: self.nulls_equal,
2292            coalesce: self.coalesce,
2293            maintain_order: self.maintain_order,
2294        };
2295
2296        let lp = self
2297            .lf
2298            .get_plan_builder()
2299            .join(
2300                other.logical_plan,
2301                self.left_on,
2302                self.right_on,
2303                JoinOptions {
2304                    allow_parallel: self.allow_parallel,
2305                    force_parallel: self.force_parallel,
2306                    args,
2307                }
2308                .into(),
2309            )
2310            .build();
2311        LazyFrame::from_logical_plan(lp, opt_state)
2312    }
2313
2314    // Finish with join predicates
2315    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2316        let opt_state = self.lf.opt_state;
2317        let other = self.other.expect("with not set");
2318
2319        // Decompose `And` conjunctions into their component expressions
2320        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2321            if let Expr::BinaryExpr {
2322                op: Operator::And,
2323                left,
2324                right,
2325            } = predicate
2326            {
2327                decompose_and((*left).clone(), expanded_predicates);
2328                decompose_and((*right).clone(), expanded_predicates);
2329            } else {
2330                expanded_predicates.push(predicate);
2331            }
2332        }
2333        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2334        for predicate in predicates {
2335            decompose_and(predicate, &mut expanded_predicates);
2336        }
2337        let predicates: Vec<Expr> = expanded_predicates;
2338
2339        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2340        #[cfg(feature = "is_between")]
2341        let predicates: Vec<Expr> = {
2342            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2343            for predicate in predicates {
2344                if let Expr::Function {
2345                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2346                    input,
2347                    ..
2348                } = &predicate
2349                {
2350                    if let [expr, lower, upper] = input.as_slice() {
2351                        match closed {
2352                            ClosedInterval::Both => {
2353                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2354                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2355                            },
2356                            ClosedInterval::Right => {
2357                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2358                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2359                            },
2360                            ClosedInterval::Left => {
2361                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2362                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2363                            },
2364                            ClosedInterval::None => {
2365                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2366                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2367                            },
2368                        }
2369                        continue;
2370                    }
2371                }
2372                expanded_predicates.push(predicate);
2373            }
2374            expanded_predicates
2375        };
2376
2377        let args = JoinArgs {
2378            how: self.how,
2379            validation: self.validation,
2380            suffix: self.suffix,
2381            slice: None,
2382            nulls_equal: self.nulls_equal,
2383            coalesce: self.coalesce,
2384            maintain_order: self.maintain_order,
2385        };
2386        let options = JoinOptions {
2387            allow_parallel: self.allow_parallel,
2388            force_parallel: self.force_parallel,
2389            args,
2390        };
2391
2392        let lp = DslPlan::Join {
2393            input_left: Arc::new(self.lf.logical_plan),
2394            input_right: Arc::new(other.logical_plan),
2395            left_on: Default::default(),
2396            right_on: Default::default(),
2397            predicates,
2398            options: Arc::from(options),
2399        };
2400
2401        LazyFrame::from_logical_plan(lp, opt_state)
2402    }
2403}
2404
2405pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2406    #[cfg(not(feature = "new_streaming"))]
2407    {
2408        None
2409    }
2410    #[cfg(feature = "new_streaming")]
2411    {
2412        Some(polars_stream::build_streaming_query_executor)
2413    }
2414};