polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex};
12
13pub use anonymous_scan::*;
14#[cfg(feature = "csv")]
15pub use csv::*;
16#[cfg(not(target_arch = "wasm32"))]
17pub use exitable::*;
18pub use file_list_reader::*;
19#[cfg(feature = "json")]
20pub use ndjson::*;
21#[cfg(feature = "parquet")]
22pub use parquet::*;
23use polars_compute::rolling::QuantileMethod;
24use polars_core::POOL;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_io::RowIndex;
28use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
29use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
30use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
31#[cfg(feature = "is_between")]
32use polars_ops::prelude::ClosedInterval;
33pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
34use polars_utils::pl_str::PlSmallStr;
35use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
/// Conversion of a value into a [`LazyFrame`].
///
/// Implemented in this module for `DataFrame` and for `LazyFrame` itself.
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
43
44impl IntoLazy for DataFrame {
45    /// Convert the `DataFrame` into a `LazyFrame`
46    fn lazy(self) -> LazyFrame {
47        let lp = DslBuilder::from_existing_df(self).build();
48        LazyFrame {
49            logical_plan: lp,
50            opt_state: Default::default(),
51            cached_arena: Default::default(),
52        }
53    }
54}
55
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
61
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan that the methods of this struct extend.
    pub logical_plan: DslPlan,
    /// Optimization flags that are applied when the plan is optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared, lock-protected slot for a `CachedArena`. Because it sits behind an `Arc`,
    /// cloning a `LazyFrame` shares this slot instead of duplicating it.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
73
74impl From<DslPlan> for LazyFrame {
75    fn from(plan: DslPlan) -> Self {
76        Self {
77            logical_plan: plan,
78            opt_state: OptFlags::default(),
79            cached_arena: Default::default(),
80        }
81    }
82}
83
84impl LazyFrame {
85    pub(crate) fn from_inner(
86        logical_plan: DslPlan,
87        opt_state: OptFlags,
88        cached_arena: Arc<Mutex<Option<CachedArena>>>,
89    ) -> Self {
90        Self {
91            logical_plan,
92            opt_state,
93            cached_arena,
94        }
95    }
96
97    pub(crate) fn get_plan_builder(self) -> DslBuilder {
98        DslBuilder::from(self.logical_plan)
99    }
100
101    fn get_opt_state(&self) -> OptFlags {
102        self.opt_state
103    }
104
105    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106        LazyFrame {
107            logical_plan,
108            opt_state,
109            cached_arena: Default::default(),
110        }
111    }
112
113    /// Get current optimizations.
114    pub fn get_current_optimizations(&self) -> OptFlags {
115        self.opt_state
116    }
117
118    /// Set allowed optimizations.
119    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120        self.opt_state = opt_state;
121        self
122    }
123
124    /// Turn off all optimizations.
125    pub fn without_optimizations(self) -> Self {
126        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127    }
128
129    /// Toggle projection pushdown optimization.
130    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132        self
133    }
134
135    /// Toggle cluster with columns optimization.
136    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138        self
139    }
140
141    /// Check if operations are order dependent and unset maintaining_order if
142    /// the order would not be observed.
143    pub fn with_check_order(mut self, toggle: bool) -> Self {
144        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145        self
146    }
147
148    /// Toggle predicate pushdown optimization.
149    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151        self
152    }
153
154    /// Toggle type coercion optimization.
155    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157        self
158    }
159
160    /// Toggle type check optimization.
161    pub fn with_type_check(mut self, toggle: bool) -> Self {
162        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163        self
164    }
165
166    /// Toggle expression simplification optimization on or off.
167    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169        self
170    }
171
172    /// Toggle common subplan elimination optimization on or off
173    #[cfg(feature = "cse")]
174    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
175        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
176        self
177    }
178
179    /// Toggle common subexpression elimination optimization on or off
180    #[cfg(feature = "cse")]
181    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
182        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
183        self
184    }
185
186    /// Toggle slice pushdown optimization.
187    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189        self
190    }
191
192    #[cfg(feature = "new_streaming")]
193    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
194        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
195        self
196    }
197
198    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
199    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201        self
202    }
203
204    /// Run every node eagerly. This turns off multi-node optimizations.
205    pub fn _with_eager(mut self, toggle: bool) -> Self {
206        self.opt_state.set(OptFlags::EAGER, toggle);
207        self
208    }
209
210    /// Return a String describing the naive (un-optimized) logical plan.
211    pub fn describe_plan(&self) -> PolarsResult<String> {
212        Ok(self.clone().to_alp()?.describe())
213    }
214
215    /// Return a String describing the naive (un-optimized) logical plan in tree format.
216    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217        Ok(self.clone().to_alp()?.describe_tree_format())
218    }
219
220    /// Return a String describing the optimized logical plan.
221    ///
222    /// Returns `Err` if optimizing the logical plan fails.
223    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224        Ok(self.clone().to_alp_optimized()?.describe())
225    }
226
227    /// Return a String describing the optimized logical plan in tree format.
228    ///
229    /// Returns `Err` if optimizing the logical plan fails.
230    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232    }
233
234    /// Return a String describing the logical plan.
235    ///
236    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
237    /// explains the naive, un-optimized plan.
238    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239        if optimized {
240            self.describe_optimized_plan()
241        } else {
242            self.describe_plan()
243        }
244    }
245
246    /// Add a sort operation to the logical plan.
247    ///
248    /// Sorts the LazyFrame by the column name specified using the provided options.
249    ///
250    /// # Example
251    ///
252    /// Sort DataFrame by 'sepal_width' column:
253    /// ```rust
254    /// # use polars_core::prelude::*;
255    /// # use polars_lazy::prelude::*;
256    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
257    ///     df.lazy().sort(["sepal_width"], Default::default())
258    /// }
259    /// ```
260    /// Sort by a single column with specific order:
261    /// ```
262    /// # use polars_core::prelude::*;
263    /// # use polars_lazy::prelude::*;
264    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
265    ///     df.lazy().sort(
266    ///         ["sepal_width"],
267    ///         SortMultipleOptions::new()
268    ///             .with_order_descending(descending)
269    ///     )
270    /// }
271    /// ```
272    /// Sort by multiple columns with specifying order for each column:
273    /// ```
274    /// # use polars_core::prelude::*;
275    /// # use polars_lazy::prelude::*;
276    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
277    ///     df.lazy().sort(
278    ///         ["sepal_width", "sepal_length"],
279    ///         SortMultipleOptions::new()
280    ///             .with_order_descending_multi([false, true])
281    ///     )
282    /// }
283    /// ```
284    /// See [`SortMultipleOptions`] for more options.
285    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286        let opt_state = self.get_opt_state();
287        let lp = self
288            .get_plan_builder()
289            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290            .build();
291        Self::from_logical_plan(lp, opt_state)
292    }
293
294    /// Add a sort operation to the logical plan.
295    ///
296    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
297    /// concrete columns before sorting.
298    ///
299    /// See [`SortMultipleOptions`] for more options.
300    ///
301    /// # Example
302    ///
303    /// ```rust
304    /// use polars_core::prelude::*;
305    /// use polars_lazy::prelude::*;
306    ///
307    /// /// Sort DataFrame by 'sepal_width' column
308    /// fn example(df: DataFrame) -> LazyFrame {
309    ///       df.lazy()
310    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
311    /// }
312    /// ```
313    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314        self,
315        by_exprs: E,
316        sort_options: SortMultipleOptions,
317    ) -> Self {
318        let by_exprs = by_exprs.as_ref().to_vec();
319        if by_exprs.is_empty() {
320            self
321        } else {
322            let opt_state = self.get_opt_state();
323            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324            Self::from_logical_plan(lp, opt_state)
325        }
326    }
327
328    pub fn top_k<E: AsRef<[Expr]>>(
329        self,
330        k: IdxSize,
331        by_exprs: E,
332        sort_options: SortMultipleOptions,
333    ) -> Self {
334        // this will optimize to top-k
335        self.sort_by_exprs(
336            by_exprs,
337            sort_options.with_order_reversed().with_nulls_last(true),
338        )
339        .slice(0, k)
340    }
341
342    pub fn bottom_k<E: AsRef<[Expr]>>(
343        self,
344        k: IdxSize,
345        by_exprs: E,
346        sort_options: SortMultipleOptions,
347    ) -> Self {
348        // this will optimize to bottom-k
349        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350            .slice(0, k)
351    }
352
353    /// Reverse the `DataFrame` from top to bottom.
354    ///
355    /// Row `i` becomes row `number_of_rows - i - 1`.
356    ///
357    /// # Example
358    ///
359    /// ```rust
360    /// use polars_core::prelude::*;
361    /// use polars_lazy::prelude::*;
362    ///
363    /// fn example(df: DataFrame) -> LazyFrame {
364    ///       df.lazy()
365    ///         .reverse()
366    /// }
367    /// ```
368    pub fn reverse(self) -> Self {
369        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370    }
371
372    /// Rename columns in the DataFrame.
373    ///
374    /// `existing` and `new` are iterables of the same length containing the old and
375    /// corresponding new column names. Renaming happens to all `existing` columns
376    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
377    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
378    /// those columns that are actually found will be renamed (others will be ignored).
379    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380    where
381        I: IntoIterator<Item = T>,
382        J: IntoIterator<Item = S>,
383        T: AsRef<str>,
384        S: AsRef<str>,
385    {
386        let iter = existing.into_iter();
387        let cap = iter.size_hint().0;
388        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391        // TODO! should this error if `existing` and `new` have different lengths?
392        // Currently, the longer of the two is truncated.
393        for (existing, new) in iter.zip(new) {
394            let existing = existing.as_ref();
395            let new = new.as_ref();
396            if new != existing {
397                existing_vec.push(existing.into());
398                new_vec.push(new.into());
399            }
400        }
401
402        self.map_private(DslFunction::Rename {
403            existing: existing_vec.into(),
404            new: new_vec.into(),
405            strict,
406        })
407    }
408
409    /// Removes columns from the DataFrame.
410    /// Note that it's better to only select the columns you need
411    /// and let the projection pushdown optimize away the unneeded columns.
412    ///
413    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
414    /// error while materializing the [`LazyFrame`].
415    pub fn drop(self, columns: Selector) -> Self {
416        let opt_state = self.get_opt_state();
417        let lp = self.get_plan_builder().drop(columns).build();
418        Self::from_logical_plan(lp, opt_state)
419    }
420
421    /// Shift the values by a given period and fill the parts that will be empty due to this operation
422    /// with `Nones`.
423    ///
424    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
425    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427    }
428
429    /// Shift the values by a given period and fill the parts that will be empty due to this operation
430    /// with the result of the `fill_value` expression.
431    ///
432    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
433    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434        self.select(vec![
435            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436        ])
437    }
438
439    /// Fill None values in the DataFrame with an expression.
440    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441        let opt_state = self.get_opt_state();
442        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443        Self::from_logical_plan(lp, opt_state)
444    }
445
446    /// Fill NaN values in the DataFrame with an expression.
447    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448        let opt_state = self.get_opt_state();
449        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450        Self::from_logical_plan(lp, opt_state)
451    }
452
453    /// Caches the result into a new LazyFrame.
454    ///
455    /// This should be used to prevent computations running multiple times.
456    pub fn cache(self) -> Self {
457        let opt_state = self.get_opt_state();
458        let lp = self.get_plan_builder().cache().build();
459        Self::from_logical_plan(lp, opt_state)
460    }
461
462    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
463    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464        let cast_cols: Vec<Expr> = dtypes
465            .into_iter()
466            .map(|(name, dt)| {
467                let name = PlSmallStr::from_str(name);
468
469                if strict {
470                    col(name).strict_cast(dt)
471                } else {
472                    col(name).cast(dt)
473                }
474            })
475            .collect();
476
477        if cast_cols.is_empty() {
478            self
479        } else {
480            self.with_columns(cast_cols)
481        }
482    }
483
484    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
485    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486        self.with_columns(vec![if strict {
487            col(PlSmallStr::from_static("*")).strict_cast(dtype)
488        } else {
489            col(PlSmallStr::from_static("*")).cast(dtype)
490        }])
491    }
492
493    pub fn optimize(
494        self,
495        lp_arena: &mut Arena<IR>,
496        expr_arena: &mut Arena<AExpr>,
497    ) -> PolarsResult<Node> {
498        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499    }
500
501    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502        let (mut lp_arena, mut expr_arena) = self.get_arenas();
503        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505        Ok(IRPlan::new(node, lp_arena, expr_arena))
506    }
507
508    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509        let (mut lp_arena, mut expr_arena) = self.get_arenas();
510        let node = to_alp(
511            self.logical_plan,
512            &mut expr_arena,
513            &mut lp_arena,
514            &mut self.opt_state,
515        )?;
516        let plan = IRPlan::new(node, lp_arena, expr_arena);
517        Ok(plan)
518    }
519
520    pub(crate) fn optimize_with_scratch(
521        self,
522        lp_arena: &mut Arena<IR>,
523        expr_arena: &mut Arena<AExpr>,
524        scratch: &mut Vec<Node>,
525    ) -> PolarsResult<Node> {
526        #[allow(unused_mut)]
527        let mut opt_state = self.opt_state;
528        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
529
530        #[cfg(feature = "cse")]
531        if new_streaming {
532            // The new streaming engine can't deal with the way the common
533            // subexpression elimination adds length-incorrect with_columns.
534            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
535        }
536
537        let lp_top = optimize(
538            self.logical_plan,
539            opt_state,
540            lp_arena,
541            expr_arena,
542            scratch,
543            apply_scan_predicate_to_scan_ir,
544        )?;
545
546        Ok(lp_top)
547    }
548
549    fn prepare_collect_post_opt<P>(
550        mut self,
551        check_sink: bool,
552        query_start: Option<std::time::Instant>,
553        post_opt: P,
554    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
555    where
556        P: FnOnce(
557            Node,
558            &mut Arena<IR>,
559            &mut Arena<AExpr>,
560            Option<std::time::Duration>,
561        ) -> PolarsResult<()>,
562    {
563        let (mut lp_arena, mut expr_arena) = self.get_arenas();
564
565        let mut scratch = vec![];
566        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
567
568        post_opt(
569            lp_top,
570            &mut lp_arena,
571            &mut expr_arena,
572            // Post optimization callback gets the time since the
573            // query was started as its "base" timepoint.
574            query_start.map(|s| s.elapsed()),
575        )?;
576
577        // sink should be replaced
578        let no_file_sink = if check_sink {
579            !matches!(
580                lp_arena.get(lp_top),
581                IR::Sink {
582                    payload: SinkTypeIR::File { .. },
583                    ..
584                }
585            )
586        } else {
587            true
588        };
589        let physical_plan = create_physical_plan(
590            lp_top,
591            &mut lp_arena,
592            &mut expr_arena,
593            BUILD_STREAMING_EXECUTOR,
594        )?;
595
596        let state = ExecutionState::new();
597        Ok((state, physical_plan, no_file_sink))
598    }
599
600    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
601    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
602    where
603        P: FnOnce(
604            Node,
605            &mut Arena<IR>,
606            &mut Arena<AExpr>,
607            Option<std::time::Duration>,
608        ) -> PolarsResult<()>,
609    {
610        let (mut state, mut physical_plan, _) =
611            self.prepare_collect_post_opt(false, None, post_opt)?;
612        physical_plan.execute(&mut state)
613    }
614
615    #[allow(unused_mut)]
616    fn prepare_collect(
617        self,
618        check_sink: bool,
619        query_start: Option<std::time::Instant>,
620    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
621        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
622    }
623
624    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
625    /// `engine`.
626    ///
627    /// The query is optimized prior to execution.
628    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
629        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
630            payload.clone()
631        } else {
632            self.logical_plan = DslPlan::Sink {
633                input: Arc::new(self.logical_plan),
634                payload: SinkType::Memory,
635            };
636            SinkType::Memory
637        };
638
639        // Default engine for collect is InMemory, sink_* is Streaming
640        if engine == Engine::Auto {
641            engine = match payload {
642                #[cfg(feature = "new_streaming")]
643                SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
644                _ => Engine::InMemory,
645            };
646        }
647        // Gpu uses some hacks to dispatch.
648        if engine == Engine::Gpu {
649            engine = Engine::InMemory;
650        }
651
652        #[cfg(feature = "new_streaming")]
653        {
654            if let Some(result) = self.try_new_streaming_if_requested() {
655                return result.map(|v| v.unwrap_single());
656            }
657        }
658
659        match engine {
660            Engine::Auto => unreachable!(),
661            Engine::Streaming => {
662                feature_gated!("new_streaming", self = self.with_new_streaming(true))
663            },
664            _ => {},
665        }
666        let mut alp_plan = self.clone().to_alp_optimized()?;
667
668        match engine {
669            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
670                let result = polars_stream::run_query(
671                    alp_plan.lp_top,
672                    &mut alp_plan.lp_arena,
673                    &mut alp_plan.expr_arena,
674                );
675                result.map(|v| v.unwrap_single())
676            }),
677            Engine::Gpu => {
678                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
679            },
680            Engine::InMemory => {
681                let mut physical_plan = create_physical_plan(
682                    alp_plan.lp_top,
683                    &mut alp_plan.lp_arena,
684                    &mut alp_plan.expr_arena,
685                    BUILD_STREAMING_EXECUTOR,
686                )?;
687                let mut state = ExecutionState::new();
688                physical_plan.execute(&mut state)
689            },
690        }
691    }
692
693    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
694        let sink_multiple = LazyFrame {
695            logical_plan: DslPlan::SinkMultiple { inputs: plans },
696            opt_state,
697            cached_arena: Default::default(),
698        };
699        sink_multiple.explain(true)
700    }
701
    /// Optimize and execute several plans together and return one [`DataFrame`] per plan.
    ///
    /// The plans are combined under a single `DslPlan::SinkMultiple` node and optimized
    /// with `opt_state`. An empty `plans` vector yields an empty result without doing any
    /// work. `Engine::Auto` and `Engine::Gpu` are both executed through the in-memory
    /// engine; `Engine::Streaming` requires the `new_streaming` feature.
    ///
    /// Returns `Err` if optimization, physical planning or execution of any plan fails.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        // Environment-variable driven opt-in to the new streaming engine; when it
        // produces a result, that result is returned directly.
        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`, so it is cloned before the arenas are
        // borrowed mutably.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // NOTE(review): `Engine::Gpu` was rewritten to `InMemory` above, so this arm
            // does not look reachable from this function — confirm before relying on it.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO. So we take a multiple of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // Run the cache prefiller, if any, before the individual plans execute.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out of the slice, leaving a default in its place.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // `idx` is the position within the current chunk.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into a single list in plan order.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
804
    /// Execute all the lazy operations and collect them into a [`DataFrame`].
    ///
    /// The query is optimized prior to execution. This is a shorthand for
    /// `collect_with_engine(Engine::InMemory)`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
    ///     df.lazy()
    ///       .group_by([col("foo")])
    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
    ///       .collect()
    /// }
    /// ```
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
825
826    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
827    // This version does profiling of the node execution.
828    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
829    where
830        P: FnOnce(
831            Node,
832            &mut Arena<IR>,
833            &mut Arena<AExpr>,
834            Option<std::time::Duration>,
835        ) -> PolarsResult<()>,
836    {
837        let query_start = std::time::Instant::now();
838        let (mut state, mut physical_plan, _) =
839            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
840        state.time_nodes(query_start);
841        let out = physical_plan.execute(&mut state)?;
842        let timer_df = state.finish_timer()?;
843        Ok((out, timer_df))
844    }
845
    /// Profile a LazyFrame.
    ///
    /// This will run the query and return a tuple
    /// containing the materialized DataFrame and a DataFrame that contains profiling information
    /// of each node that is executed.
    ///
    /// The units of the timings are microseconds.
    ///
    /// Equivalent to `_profile_post_opt` with a post-optimization callback that does nothing.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
856
857    pub fn sink_batches(
858        mut self,
859        function: PlanCallback<DataFrame, bool>,
860        maintain_order: bool,
861        chunk_size: Option<NonZeroUsize>,
862    ) -> PolarsResult<Self> {
863        use polars_plan::prelude::sink::CallbackSinkType;
864
865        polars_ensure!(
866            !matches!(self.logical_plan, DslPlan::Sink { .. }),
867            InvalidOperation: "cannot create a sink on top of another sink"
868        );
869
870        self.logical_plan = DslPlan::Sink {
871            input: Arc::new(self.logical_plan),
872            payload: SinkType::Callback(CallbackSinkType {
873                function,
874                maintain_order,
875                chunk_size,
876            }),
877        };
878
879        Ok(self)
880    }
881
/// Try to run the query on the new streaming engine if that was requested via the
/// environment.
///
/// The engine is attempted when `POLARS_AUTO_NEW_STREAMING` or
/// `POLARS_FORCE_NEW_STREAMING` is set to `"1"`.
///
/// # Returns
///
/// - `None` if neither variable is set, or if only the "auto" variable is set and the
///   engine panicked with a "not yet implemented" message. The caller is expected to fall
///   back to the normal engine.
/// - `Some(Err(_))` if optimizing the plan failed.
/// - `Some(result)` with whatever `polars_stream::run_query` returned otherwise.
///
/// # Panics
///
/// Any panic of the streaming engine that does not qualify for the fallback described
/// above is resumed unchanged.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if !(auto_new_streaming || force_new_streaming) {
        return None;
    }

    // Try to run using the new streaming engine, falling back
    // if it fails in a todo!() error if auto_new_streaming is set.
    let mut new_stream_lazy = self.clone();
    new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
    let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
        Ok(v) => v,
        Err(e) => return Some(Err(e)),
    };

    let f = || {
        polars_stream::run_query(
            alp_plan.lp_top,
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
        )
    };

    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
        Ok(v) => Some(v),
        Err(e) => {
            // A plain `todo!()` panics with a `&'static str` payload, whereas
            // `todo!("...")` with a message formats into a `String`. Both spellings
            // have to be recognised as "not implemented".
            let is_unimplemented = e
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| e.downcast_ref::<String>().map(String::as_str))
                .is_some_and(|msg| msg.starts_with("not yet implemented"));

            // Fallback to normal engine if error is due to not being implemented
            // and auto_new_streaming is set, otherwise propagate error.
            if !force_new_streaming && auto_new_streaming && is_unimplemented {
                if polars_core::config::verbose() {
                    eprintln!(
                        "caught unimplemented error in new streaming engine, falling back to normal engine"
                    );
                }
                None
            } else {
                std::panic::resume_unwind(e)
            }
        },
    }
}
932
933    pub fn sink(
934        mut self,
935        sink_type: SinkDestination,
936        file_format: impl Into<Box<FileType>>,
937        unified_sink_args: UnifiedSinkArgs,
938    ) -> PolarsResult<Self> {
939        polars_ensure!(
940            !matches!(self.logical_plan, DslPlan::Sink { .. }),
941            InvalidOperation: "cannot create a sink on top of another sink"
942        );
943
944        self.logical_plan = DslPlan::Sink {
945            input: Arc::new(self.logical_plan),
946            payload: match sink_type {
947                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
948                    target,
949                    file_format: file_format.into(),
950                    unified_sink_args,
951                }),
952                SinkDestination::Partitioned {
953                    base_path,
954                    file_path_provider,
955                    partition_strategy,
956                    finish_callback,
957                } => SinkType::Partitioned(PartitionedSinkOptions {
958                    base_path,
959                    file_path_provider,
960                    partition_strategy,
961                    finish_callback,
962                    file_format: file_format.into(),
963                    unified_sink_args,
964                }),
965            },
966        };
967        Ok(self)
968    }
969
970    /// Filter frame rows that match a predicate expression.
971    ///
972    /// The expression must yield boolean values (note that rows where the
973    /// predicate resolves to `null` are *not* included in the resulting frame).
974    ///
975    /// # Example
976    ///
977    /// ```rust
978    /// use polars_core::prelude::*;
979    /// use polars_lazy::prelude::*;
980    ///
981    /// fn example(df: DataFrame) -> LazyFrame {
982    ///       df.lazy()
983    ///         .filter(col("sepal_width").is_not_null())
984    ///         .select([col("sepal_width"), col("sepal_length")])
985    /// }
986    /// ```
987    pub fn filter(self, predicate: Expr) -> Self {
988        let opt_state = self.get_opt_state();
989        let lp = self.get_plan_builder().filter(predicate).build();
990        Self::from_logical_plan(lp, opt_state)
991    }
992
993    /// Remove frame rows that match a predicate expression.
994    ///
995    /// The expression must yield boolean values (note that rows where the
996    /// predicate resolves to `null` are *not* removed from the resulting frame).
997    ///
998    /// # Example
999    ///
1000    /// ```rust
1001    /// use polars_core::prelude::*;
1002    /// use polars_lazy::prelude::*;
1003    ///
1004    /// fn example(df: DataFrame) -> LazyFrame {
1005    ///       df.lazy()
1006    ///         .remove(col("sepal_width").is_null())
1007    ///         .select([col("sepal_width"), col("sepal_length")])
1008    /// }
1009    /// ```
1010    pub fn remove(self, predicate: Expr) -> Self {
1011        self.filter(predicate.neq_missing(lit(true)))
1012    }
1013
1014    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1015    ///
1016    /// Columns can be selected with [`col`];
1017    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1018    ///
1019    /// # Example
1020    ///
1021    /// ```rust
1022    /// use polars_core::prelude::*;
1023    /// use polars_lazy::prelude::*;
1024    ///
1025    /// /// This function selects column "foo" and column "bar".
1026    /// /// Column "bar" is renamed to "ham".
1027    /// fn example(df: DataFrame) -> LazyFrame {
1028    ///       df.lazy()
1029    ///         .select([col("foo"),
1030    ///                   col("bar").alias("ham")])
1031    /// }
1032    ///
1033    /// /// This function selects all columns except "foo"
1034    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1035    ///       df.lazy()
1036    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1037    /// }
1038    /// ```
1039    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1040        let exprs = exprs.as_ref().to_vec();
1041        self.select_impl(
1042            exprs,
1043            ProjectionOptions {
1044                run_parallel: true,
1045                duplicate_check: true,
1046                should_broadcast: true,
1047            },
1048        )
1049    }
1050
1051    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1052        let exprs = exprs.as_ref().to_vec();
1053        self.select_impl(
1054            exprs,
1055            ProjectionOptions {
1056                run_parallel: false,
1057                duplicate_check: true,
1058                should_broadcast: true,
1059            },
1060        )
1061    }
1062
1063    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1064        let opt_state = self.get_opt_state();
1065        let lp = self.get_plan_builder().project(exprs, options).build();
1066        Self::from_logical_plan(lp, opt_state)
1067    }
1068
1069    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1070    ///
1071    /// Takes a list of expressions to group on.
1072    ///
1073    /// # Example
1074    ///
1075    /// ```rust
1076    /// use polars_core::prelude::*;
1077    /// use polars_lazy::prelude::*;
1078    ///
1079    /// fn example(df: DataFrame) -> LazyFrame {
1080    ///       df.lazy()
1081    ///        .group_by([col("date")])
1082    ///        .agg([
1083    ///            col("rain").min().alias("min_rain"),
1084    ///            col("rain").sum().alias("sum_rain"),
1085    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1086    ///        ])
1087    /// }
1088    /// ```
1089    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1090        let keys = by
1091            .as_ref()
1092            .iter()
1093            .map(|e| e.clone().into())
1094            .collect::<Vec<_>>();
1095        let opt_state = self.get_opt_state();
1096
1097        #[cfg(feature = "dynamic_group_by")]
1098        {
1099            LazyGroupBy {
1100                logical_plan: self.logical_plan,
1101                opt_state,
1102                keys,
1103                predicates: vec![],
1104                maintain_order: false,
1105                dynamic_options: None,
1106                rolling_options: None,
1107            }
1108        }
1109
1110        #[cfg(not(feature = "dynamic_group_by"))]
1111        {
1112            LazyGroupBy {
1113                logical_plan: self.logical_plan,
1114                opt_state,
1115                keys,
1116                predicates: vec![],
1117                maintain_order: false,
1118            }
1119        }
1120    }
1121
/// Create rolling groups based on a time column.
///
/// Also works for index values of type UInt32, UInt64, Int32, or Int64.
///
/// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now
/// determined by the individual values and are not of constant intervals. For constant
/// intervals use *group_by_dynamic*.
///
/// If `index_column` is not a plain column, it is first added to the frame with
/// [`with_column`](LazyFrame::with_column) and its output name is used as the index
/// column.
///
/// # Panics
///
/// Panics if `index_column` is not a plain column and either the schema of the query
/// cannot be collected or the output field of the expression cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    options.index_column = match index_column {
        Expr::Column(name) => name,
        computed => {
            // Materialize the expression as a column and refer to it by its output name.
            let schema = self.collect_schema().unwrap();
            let output_name = computed.to_field(&schema).unwrap().name().clone();
            self = self.with_column(computed);
            output_name
        },
    };

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1159
/// Group based on a time value (or index value of type Int32, Int64).
///
/// Time windows are calculated and rows are assigned to windows. Unlike a normal
/// group_by, a row can be a member of multiple groups. The time/index window can be
/// seen as a rolling window whose size is determined by dates/times/values instead of
/// slots in the DataFrame.
///
/// A window is defined by:
///
/// - every: interval of the window
/// - period: length of the window
/// - offset: offset of the window
///
/// The `group_by` argument should be empty `[]` if you don't want to combine this
/// with an ordinary group_by on these keys.
///
/// If `index_column` is not a plain column, it is first added to the frame with
/// [`with_column`](LazyFrame::with_column) and its output name is used as the index
/// column.
///
/// # Panics
///
/// Panics if `index_column` is not a plain column and either the schema of the query
/// cannot be collected or the output field of the expression cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    options.index_column = match index_column {
        Expr::Column(name) => name,
        computed => {
            // Materialize the expression as a column and refer to it by its output name.
            let schema = self.collect_schema().unwrap();
            let output_name = computed.to_field(&schema).unwrap().name().clone();
            self = self.with_column(computed);
            output_name
        },
    };

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1205
1206    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1207    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1208        let keys = by
1209            .as_ref()
1210            .iter()
1211            .map(|e| e.clone().into())
1212            .collect::<Vec<_>>();
1213        let opt_state = self.get_opt_state();
1214
1215        #[cfg(feature = "dynamic_group_by")]
1216        {
1217            LazyGroupBy {
1218                logical_plan: self.logical_plan,
1219                opt_state,
1220                keys,
1221                predicates: vec![],
1222                maintain_order: true,
1223                dynamic_options: None,
1224                rolling_options: None,
1225            }
1226        }
1227
1228        #[cfg(not(feature = "dynamic_group_by"))]
1229        {
1230            LazyGroupBy {
1231                logical_plan: self.logical_plan,
1232                opt_state,
1233                keys,
1234                predicates: vec![],
1235                maintain_order: true,
1236            }
1237        }
1238    }
1239
/// Left anti join this query with another lazy query.
///
/// Rows are matched on the values of the expressions `left_on` and `right_on`. For
/// more flexible join logic, see [`join`](LazyFrame::join) or
/// [`join_builder`](LazyFrame::join_builder).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
/// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
///         ldf
///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
/// }
/// ```
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let left_keys: [Expr; 1] = [left_on.into()];
    let right_keys: [Expr; 1] = [right_on.into()];
    self.join(other, left_keys, right_keys, JoinArgs::new(JoinType::Anti))
}
1265
/// Creates the Cartesian product from both frames, preserving the order of the left keys.
///
/// No join keys are used; `suffix` is forwarded to the join arguments.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    let left_keys: Vec<Expr> = Vec::new();
    let right_keys: Vec<Expr> = Vec::new();
    self.join(other, left_keys, right_keys, args)
}
1276
1277    /// Left outer join this query with another lazy query.
1278    ///
1279    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1280    /// flexible join logic, see [`join`](LazyFrame::join) or
1281    /// [`join_builder`](LazyFrame::join_builder).
1282    ///
1283    /// # Example
1284    ///
1285    /// ```rust
1286    /// use polars_core::prelude::*;
1287    /// use polars_lazy::prelude::*;
1288    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1289    ///         ldf
1290    ///         .left_join(other, col("foo"), col("bar"))
1291    /// }
1292    /// ```
1293    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1294        self.join(
1295            other,
1296            [left_on.into()],
1297            [right_on.into()],
1298            JoinArgs::new(JoinType::Left),
1299        )
1300    }
1301
1302    /// Inner join this query with another lazy query.
1303    ///
1304    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1305    /// flexible join logic, see [`join`](LazyFrame::join) or
1306    /// [`join_builder`](LazyFrame::join_builder).
1307    ///
1308    /// # Example
1309    ///
1310    /// ```rust
1311    /// use polars_core::prelude::*;
1312    /// use polars_lazy::prelude::*;
1313    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1314    ///         ldf
1315    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1316    /// }
1317    /// ```
1318    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1319        self.join(
1320            other,
1321            [left_on.into()],
1322            [right_on.into()],
1323            JoinArgs::new(JoinType::Inner),
1324        )
1325    }
1326
1327    /// Full outer join this query with another lazy query.
1328    ///
1329    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1330    /// flexible join logic, see [`join`](LazyFrame::join) or
1331    /// [`join_builder`](LazyFrame::join_builder).
1332    ///
1333    /// # Example
1334    ///
1335    /// ```rust
1336    /// use polars_core::prelude::*;
1337    /// use polars_lazy::prelude::*;
1338    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1339    ///         ldf
1340    ///         .full_join(other, col("foo"), col("bar"))
1341    /// }
1342    /// ```
1343    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1344        self.join(
1345            other,
1346            [left_on.into()],
1347            [right_on.into()],
1348            JoinArgs::new(JoinType::Full),
1349        )
1350    }
1351
/// Left semi join this query with another lazy query.
///
/// Rows are matched on the values of the expressions `left_on` and `right_on`. For
/// more flexible join logic, see [`join`](LazyFrame::join) or
/// [`join_builder`](LazyFrame::join_builder).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
/// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
///         ldf
///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
/// }
/// ```
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let left_keys: [Expr; 1] = [left_on.into()];
    let right_keys: [Expr; 1] = [right_on.into()];
    self.join(other, left_keys, right_keys, JoinArgs::new(JoinType::Semi))
}
1377
1378    /// Generic function to join two LazyFrames.
1379    ///
1380    /// `join` can join on multiple columns, given as two list of expressions, and with a
1381    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1382    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1383    /// over how columns are renamed and parallelization options, use
1384    /// [`join_builder`](LazyFrame::join_builder).
1385    ///
1386    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1387    ///
1388    /// # Example
1389    ///
1390    /// ```rust
1391    /// use polars_core::prelude::*;
1392    /// use polars_lazy::prelude::*;
1393    ///
1394    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1395    ///         ldf
1396    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1397    /// }
1398    /// ```
1399    pub fn join<E: AsRef<[Expr]>>(
1400        self,
1401        other: LazyFrame,
1402        left_on: E,
1403        right_on: E,
1404        args: JoinArgs,
1405    ) -> LazyFrame {
1406        let left_on = left_on.as_ref().to_vec();
1407        let right_on = right_on.as_ref().to_vec();
1408
1409        self._join_impl(other, left_on, right_on, args)
1410    }
1411
1412    fn _join_impl(
1413        self,
1414        other: LazyFrame,
1415        left_on: Vec<Expr>,
1416        right_on: Vec<Expr>,
1417        args: JoinArgs,
1418    ) -> LazyFrame {
1419        let JoinArgs {
1420            how,
1421            validation,
1422            suffix,
1423            slice,
1424            nulls_equal,
1425            coalesce,
1426            maintain_order,
1427        } = args;
1428
1429        if slice.is_some() {
1430            panic!("impl error: slice is not handled")
1431        }
1432
1433        let mut builder = self
1434            .join_builder()
1435            .with(other)
1436            .left_on(left_on)
1437            .right_on(right_on)
1438            .how(how)
1439            .validate(validation)
1440            .join_nulls(nulls_equal)
1441            .coalesce(coalesce)
1442            .maintain_order(maintain_order);
1443
1444        if let Some(suffix) = suffix {
1445            builder = builder.suffix(suffix);
1446        }
1447
1448        // Note: args.slice is set by the optimizer
1449        builder.finish()
1450    }
1451
1452    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1453    ///
1454    /// After the `JoinBuilder` has been created and set up, calling
1455    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1456    /// representing the `join` operation.
1457    pub fn join_builder(self) -> JoinBuilder {
1458        JoinBuilder::new(self)
1459    }
1460
1461    /// Add or replace a column, given as an expression, to a DataFrame.
1462    ///
1463    /// # Example
1464    ///
1465    /// ```rust
1466    /// use polars_core::prelude::*;
1467    /// use polars_lazy::prelude::*;
1468    /// fn add_column(df: DataFrame) -> LazyFrame {
1469    ///     df.lazy()
1470    ///         .with_column(
1471    ///             when(col("sepal_length").lt(lit(5.0)))
1472    ///             .then(lit(10))
1473    ///             .otherwise(lit(1))
1474    ///             .alias("new_column_name"),
1475    ///         )
1476    /// }
1477    /// ```
1478    pub fn with_column(self, expr: Expr) -> LazyFrame {
1479        let opt_state = self.get_opt_state();
1480        let lp = self
1481            .get_plan_builder()
1482            .with_columns(
1483                vec![expr],
1484                ProjectionOptions {
1485                    run_parallel: false,
1486                    duplicate_check: true,
1487                    should_broadcast: true,
1488                },
1489            )
1490            .build();
1491        Self::from_logical_plan(lp, opt_state)
1492    }
1493
1494    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1495    ///
1496    /// # Example
1497    ///
1498    /// ```rust
1499    /// use polars_core::prelude::*;
1500    /// use polars_lazy::prelude::*;
1501    /// fn add_columns(df: DataFrame) -> LazyFrame {
1502    ///     df.lazy()
1503    ///         .with_columns(
1504    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1505    ///          )
1506    /// }
1507    /// ```
1508    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1509        let exprs = exprs.as_ref().to_vec();
1510        self.with_columns_impl(
1511            exprs,
1512            ProjectionOptions {
1513                run_parallel: true,
1514                duplicate_check: true,
1515                should_broadcast: true,
1516            },
1517        )
1518    }
1519
1520    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1521    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1522        let exprs = exprs.as_ref().to_vec();
1523        self.with_columns_impl(
1524            exprs,
1525            ProjectionOptions {
1526                run_parallel: false,
1527                duplicate_check: true,
1528                should_broadcast: true,
1529            },
1530        )
1531    }
1532
1533    /// Match or evolve to a certain schema.
1534    pub fn match_to_schema(
1535        self,
1536        schema: SchemaRef,
1537        per_column: Arc<[MatchToSchemaPerColumn]>,
1538        extra_columns: ExtraColumnsPolicy,
1539    ) -> LazyFrame {
1540        let opt_state = self.get_opt_state();
1541        let lp = self
1542            .get_plan_builder()
1543            .match_to_schema(schema, per_column, extra_columns)
1544            .build();
1545        Self::from_logical_plan(lp, opt_state)
1546    }
1547
1548    pub fn pipe_with_schema(
1549        self,
1550        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1551    ) -> Self {
1552        let opt_state = self.get_opt_state();
1553        let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1554        Self::from_logical_plan(lp, opt_state)
1555    }
1556
1557    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1558        let opt_state = self.get_opt_state();
1559        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1560        Self::from_logical_plan(lp, opt_state)
1561    }
1562
1563    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1564        let contexts = contexts
1565            .as_ref()
1566            .iter()
1567            .map(|lf| lf.logical_plan.clone())
1568            .collect();
1569        let opt_state = self.get_opt_state();
1570        let lp = self.get_plan_builder().with_context(contexts).build();
1571        Self::from_logical_plan(lp, opt_state)
1572    }
1573
1574    /// Aggregate all the columns as their maximum values.
1575    ///
1576    /// Aggregated columns will have the same names as the original columns.
1577    pub fn max(self) -> Self {
1578        self.map_private(DslFunction::Stats(StatsFunction::Max))
1579    }
1580
1581    /// Aggregate all the columns as their minimum values.
1582    ///
1583    /// Aggregated columns will have the same names as the original columns.
1584    pub fn min(self) -> Self {
1585        self.map_private(DslFunction::Stats(StatsFunction::Min))
1586    }
1587
1588    /// Aggregate all the columns as their sum values.
1589    ///
1590    /// Aggregated columns will have the same names as the original columns.
1591    ///
1592    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1593    /// - For integer columns, the ordinary checks for overflow are performed:
1594    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1595    ///   silently wrap.
1596    /// - String columns will sum to None.
1597    pub fn sum(self) -> Self {
1598        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1599    }
1600
1601    /// Aggregate all the columns as their mean values.
1602    ///
1603    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1604    /// - String columns will have a mean of None.
1605    pub fn mean(self) -> Self {
1606        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1607    }
1608
1609    /// Aggregate all the columns as their median values.
1610    ///
1611    /// - Boolean and integer results are converted to `f64`. However, they are still
1612    ///   susceptible to overflow before this conversion occurs.
1613    /// - String columns will sum to None.
1614    pub fn median(self) -> Self {
1615        self.map_private(DslFunction::Stats(StatsFunction::Median))
1616    }
1617
1618    /// Aggregate all the columns as their quantile values.
1619    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1620        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1621            quantile,
1622            method,
1623        }))
1624    }
1625
1626    /// Aggregate all the columns as their standard deviation values.
1627    ///
1628    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1629    /// computing the variance, where `N` is the number of rows.
1630    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1631    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1632    /// > likelihood estimate of the variance for normally distributed variables. The
1633    /// > standard deviation computed in this function is the square root of the estimated
1634    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1635    /// > standard deviation per se.
1636    ///
1637    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1638    pub fn std(self, ddof: u8) -> Self {
1639        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1640    }
1641
1642    /// Aggregate all the columns as their variance values.
1643    ///
1644    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1645    /// computing the variance, where `N` is the number of rows.
1646    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1647    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1648    /// > likelihood estimate of the variance for normally distributed variables.
1649    ///
1650    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1651    pub fn var(self, ddof: u8) -> Self {
1652        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1653    }
1654
1655    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1656    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1657        self.explode_impl(columns, options, false)
1658    }
1659
1660    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1661    fn explode_impl(
1662        self,
1663        columns: Selector,
1664        options: ExplodeOptions,
1665        allow_empty: bool,
1666    ) -> LazyFrame {
1667        let opt_state = self.get_opt_state();
1668        let lp = self
1669            .get_plan_builder()
1670            .explode(columns, options, allow_empty)
1671            .build();
1672        Self::from_logical_plan(lp, opt_state)
1673    }
1674
1675    /// Aggregate all the columns as the sum of their null value count.
1676    pub fn null_count(self) -> LazyFrame {
1677        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1678    }
1679
1680    /// Drop non-unique rows and maintain the order of kept rows.
1681    ///
1682    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1683    /// `None`, all columns are considered.
1684    pub fn unique_stable(
1685        self,
1686        subset: Option<Selector>,
1687        keep_strategy: UniqueKeepStrategy,
1688    ) -> LazyFrame {
1689        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1690        self.unique_stable_generic(subset, keep_strategy)
1691    }
1692
1693    pub fn unique_stable_generic(
1694        self,
1695        subset: Option<Vec<Expr>>,
1696        keep_strategy: UniqueKeepStrategy,
1697    ) -> LazyFrame {
1698        let opt_state = self.get_opt_state();
1699        let options = DistinctOptionsDSL {
1700            subset,
1701            maintain_order: true,
1702            keep_strategy,
1703        };
1704        let lp = self.get_plan_builder().distinct(options).build();
1705        Self::from_logical_plan(lp, opt_state)
1706    }
1707
1708    /// Drop non-unique rows without maintaining the order of kept rows.
1709    ///
1710    /// The order of the kept rows may change; to maintain the original row order, use
1711    /// [`unique_stable`](LazyFrame::unique_stable).
1712    ///
1713    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1714    /// all columns are considered.
1715    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1716        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1717        self.unique_generic(subset, keep_strategy)
1718    }
1719
1720    pub fn unique_generic(
1721        self,
1722        subset: Option<Vec<Expr>>,
1723        keep_strategy: UniqueKeepStrategy,
1724    ) -> LazyFrame {
1725        let opt_state = self.get_opt_state();
1726        let options = DistinctOptionsDSL {
1727            subset,
1728            maintain_order: false,
1729            keep_strategy,
1730        };
1731        let lp = self.get_plan_builder().distinct(options).build();
1732        Self::from_logical_plan(lp, opt_state)
1733    }
1734
1735    /// Drop rows containing one or more NaN values.
1736    ///
1737    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1738    /// floating point columns are considered.
1739    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1740        let opt_state = self.get_opt_state();
1741        let lp = self.get_plan_builder().drop_nans(subset).build();
1742        Self::from_logical_plan(lp, opt_state)
1743    }
1744
1745    /// Drop rows containing one or more None values.
1746    ///
1747    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1748    /// columns are considered.
1749    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1750        let opt_state = self.get_opt_state();
1751        let lp = self.get_plan_builder().drop_nulls(subset).build();
1752        Self::from_logical_plan(lp, opt_state)
1753    }
1754
1755    /// Slice the DataFrame using an offset (starting row) and a length.
1756    ///
1757    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1758    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1759    /// end.
1760    ///
1761    /// If `offset` and `len` are such that the slice extends beyond the end of the
1762    /// DataFrame, the portion between `offset` and the end will be returned. In this
1763    /// case, the number of rows in the returned DataFrame will be less than `len`.
1764    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1765        let opt_state = self.get_opt_state();
1766        let lp = self.get_plan_builder().slice(offset, len).build();
1767        Self::from_logical_plan(lp, opt_state)
1768    }
1769
1770    /// Get the first row.
1771    ///
1772    /// Equivalent to `self.slice(0, 1)`.
1773    pub fn first(self) -> LazyFrame {
1774        self.slice(0, 1)
1775    }
1776
1777    /// Get the last row.
1778    ///
1779    /// Equivalent to `self.slice(-1, 1)`.
1780    pub fn last(self) -> LazyFrame {
1781        self.slice(-1, 1)
1782    }
1783
1784    /// Get the last `n` rows.
1785    ///
1786    /// Equivalent to `self.slice(-(n as i64), n)`.
1787    pub fn tail(self, n: IdxSize) -> LazyFrame {
1788        let neg_tail = -(n as i64);
1789        self.slice(neg_tail, n)
1790    }
1791
1792    #[cfg(feature = "pivot")]
1793    #[expect(clippy::too_many_arguments)]
1794    pub fn pivot(
1795        self,
1796        on: Selector,
1797        on_columns: Arc<DataFrame>,
1798        index: Selector,
1799        values: Selector,
1800        agg: Expr,
1801        maintain_order: bool,
1802        separator: PlSmallStr,
1803    ) -> LazyFrame {
1804        let opt_state = self.get_opt_state();
1805        let lp = self
1806            .get_plan_builder()
1807            .pivot(
1808                on,
1809                on_columns,
1810                index,
1811                values,
1812                agg,
1813                maintain_order,
1814                separator,
1815            )
1816            .build();
1817        Self::from_logical_plan(lp, opt_state)
1818    }
1819
1820    /// Unpivot the DataFrame from wide to long format.
1821    ///
1822    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1823    #[cfg(feature = "pivot")]
1824    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1825        let opt_state = self.get_opt_state();
1826        let lp = self.get_plan_builder().unpivot(args).build();
1827        Self::from_logical_plan(lp, opt_state)
1828    }
1829
1830    /// Limit the DataFrame to the first `n` rows.
1831    pub fn limit(self, n: IdxSize) -> LazyFrame {
1832        self.slice(0, n)
1833    }
1834
1835    /// Apply a function/closure once the logical plan get executed.
1836    ///
1837    /// The function has access to the whole materialized DataFrame at the time it is
1838    /// called.
1839    ///
1840    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1841    /// with `LazyFrame::with_column` or `with_columns`.
1842    ///
1843    /// ## Warning
1844    /// This can blow up in your face if the schema is changed due to the operation. The
1845    /// optimizer relies on a correct schema.
1846    ///
1847    /// You can toggle certain optimizations off.
1848    pub fn map<F>(
1849        self,
1850        function: F,
1851        optimizations: AllowedOptimizations,
1852        schema: Option<Arc<dyn UdfSchema>>,
1853        name: Option<&'static str>,
1854    ) -> LazyFrame
1855    where
1856        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1857    {
1858        let opt_state = self.get_opt_state();
1859        let lp = self
1860            .get_plan_builder()
1861            .map(
1862                function,
1863                optimizations,
1864                schema,
1865                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1866            )
1867            .build();
1868        Self::from_logical_plan(lp, opt_state)
1869    }
1870
1871    #[cfg(feature = "python")]
1872    pub fn map_python(
1873        self,
1874        function: polars_utils::python_function::PythonFunction,
1875        optimizations: AllowedOptimizations,
1876        schema: Option<SchemaRef>,
1877        validate_output: bool,
1878    ) -> LazyFrame {
1879        let opt_state = self.get_opt_state();
1880        let lp = self
1881            .get_plan_builder()
1882            .map_python(function, optimizations, schema, validate_output)
1883            .build();
1884        Self::from_logical_plan(lp, opt_state)
1885    }
1886
1887    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1888        let opt_state = self.get_opt_state();
1889        let lp = self.get_plan_builder().map_private(function).build();
1890        Self::from_logical_plan(lp, opt_state)
1891    }
1892
1893    /// Add a new column at index 0 that counts the rows.
1894    ///
1895    /// `name` is the name of the new column. `offset` is where to start counting from; if
1896    /// `None`, it is set to `0`.
1897    ///
1898    /// # Warning
1899    /// This can have a negative effect on query performance. This may for instance block
1900    /// predicate pushdown optimization.
1901    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1902    where
1903        S: Into<PlSmallStr>,
1904    {
1905        let name = name.into();
1906
1907        match &self.logical_plan {
1908            v @ DslPlan::Scan {
1909                scan_type,
1910                unified_scan_args,
1911                ..
1912            } if unified_scan_args.row_index.is_none()
1913                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1914            {
1915                let DslPlan::Scan {
1916                    sources,
1917                    mut unified_scan_args,
1918                    scan_type,
1919                    cached_ir: _,
1920                } = v.clone()
1921                else {
1922                    unreachable!()
1923                };
1924
1925                unified_scan_args.row_index = Some(RowIndex {
1926                    name,
1927                    offset: offset.unwrap_or(0),
1928                });
1929
1930                DslPlan::Scan {
1931                    sources,
1932                    unified_scan_args,
1933                    scan_type,
1934                    cached_ir: Default::default(),
1935                }
1936                .into()
1937            },
1938            _ => self.map_private(DslFunction::RowIndex { name, offset }),
1939        }
1940    }
1941
1942    /// Return the number of non-null elements for each column.
1943    pub fn count(self) -> LazyFrame {
1944        self.select(vec![col(PlSmallStr::from_static("*")).count()])
1945    }
1946
1947    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
1948    /// inserted as columns.
1949    #[cfg(feature = "dtype-struct")]
1950    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1951        self.map_private(DslFunction::Unnest {
1952            columns: cols,
1953            separator,
1954        })
1955    }
1956
1957    #[cfg(feature = "merge_sorted")]
1958    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1959    where
1960        S: Into<PlSmallStr>,
1961    {
1962        let key = key.into();
1963
1964        let lp = DslPlan::MergeSorted {
1965            input_left: Arc::new(self.logical_plan),
1966            input_right: Arc::new(other.logical_plan),
1967            key,
1968        };
1969        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1970    }
1971
1972    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1973        let lp = DslPlan::MapFunction {
1974            input: Arc::new(self.logical_plan),
1975            function: DslFunction::Hint(hint),
1976        };
1977        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1978    }
1979}
1980
1981/// Utility struct for lazy group_by operation.
1982#[derive(Clone)]
1983pub struct LazyGroupBy {
1984    pub logical_plan: DslPlan,
1985    opt_state: OptFlags,
1986    keys: Vec<Expr>,
1987    predicates: Vec<Expr>,
1988    maintain_order: bool,
1989    #[cfg(feature = "dynamic_group_by")]
1990    dynamic_options: Option<DynamicGroupOptions>,
1991    #[cfg(feature = "dynamic_group_by")]
1992    rolling_options: Option<RollingGroupOptions>,
1993}
1994
1995impl From<LazyGroupBy> for LazyFrame {
1996    fn from(lgb: LazyGroupBy) -> Self {
1997        Self {
1998            logical_plan: lgb.logical_plan,
1999            opt_state: lgb.opt_state,
2000            cached_arena: Default::default(),
2001        }
2002    }
2003}
2004
2005impl LazyGroupBy {
2006    /// Filter groups with a predicate after aggregation.
2007    ///
2008    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
2009    /// is evaluated on the groups.
2010    /// This method can be chained in which case all predicates must evaluate to `true` for a
2011    /// group to be kept.
2012    ///
2013    /// # Example
2014    ///
2015    /// ```rust
2016    /// use polars_core::prelude::*;
2017    /// use polars_lazy::prelude::*;
2018    ///
2019    /// fn example(df: DataFrame) -> LazyFrame {
2020    ///       df.lazy()
2021    ///        .group_by_stable([col("date")])
2022    ///        .having(col("rain").sum().gt(lit(10)))
2023    ///        .agg([col("rain").min().alias("min_rain")])
2024    /// }
2025    /// ```
2026    pub fn having(mut self, predicate: Expr) -> Self {
2027        self.predicates.push(predicate);
2028        self
2029    }
2030
2031    /// Group by and aggregate.
2032    ///
2033    /// Select a column with [col] and choose an aggregation.
2034    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2035    ///
2036    /// # Example
2037    ///
2038    /// ```rust
2039    /// use polars_core::prelude::*;
2040    /// use polars_lazy::prelude::*;
2041    ///
2042    /// fn example(df: DataFrame) -> LazyFrame {
2043    ///       df.lazy()
2044    ///        .group_by_stable([col("date")])
2045    ///        .agg([
2046    ///            col("rain").min().alias("min_rain"),
2047    ///            col("rain").sum().alias("sum_rain"),
2048    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2049    ///        ])
2050    /// }
2051    /// ```
2052    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2053        #[cfg(feature = "dynamic_group_by")]
2054        let lp = DslBuilder::from(self.logical_plan)
2055            .group_by(
2056                self.keys,
2057                self.predicates,
2058                aggs,
2059                None,
2060                self.maintain_order,
2061                self.dynamic_options,
2062                self.rolling_options,
2063            )
2064            .build();
2065
2066        #[cfg(not(feature = "dynamic_group_by"))]
2067        let lp = DslBuilder::from(self.logical_plan)
2068            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2069            .build();
2070        LazyFrame::from_logical_plan(lp, self.opt_state)
2071    }
2072
2073    /// Return first n rows of each group
2074    pub fn head(self, n: Option<usize>) -> LazyFrame {
2075        let keys = self
2076            .keys
2077            .iter()
2078            .filter_map(|expr| expr_output_name(expr).ok())
2079            .collect::<Vec<_>>();
2080
2081        self.agg([all().as_expr().head(n)]).explode_impl(
2082            all() - by_name(keys.iter().cloned(), false),
2083            ExplodeOptions {
2084                empty_as_null: true,
2085                keep_nulls: true,
2086            },
2087            true,
2088        )
2089    }
2090
2091    /// Return last n rows of each group
2092    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2093        let keys = self
2094            .keys
2095            .iter()
2096            .filter_map(|expr| expr_output_name(expr).ok())
2097            .collect::<Vec<_>>();
2098
2099        self.agg([all().as_expr().tail(n)]).explode_impl(
2100            all() - by_name(keys.iter().cloned(), false),
2101            ExplodeOptions {
2102                empty_as_null: true,
2103                keep_nulls: true,
2104            },
2105            true,
2106        )
2107    }
2108
2109    /// Apply a function over the groups as a new DataFrame.
2110    ///
2111    /// **It is not recommended that you use this as materializing the DataFrame is very
2112    /// expensive.**
2113    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2114        if !self.predicates.is_empty() {
2115            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2116        }
2117
2118        #[cfg(feature = "dynamic_group_by")]
2119        let options = GroupbyOptions {
2120            dynamic: self.dynamic_options,
2121            rolling: self.rolling_options,
2122            slice: None,
2123        };
2124
2125        #[cfg(not(feature = "dynamic_group_by"))]
2126        let options = GroupbyOptions { slice: None };
2127
2128        let lp = DslPlan::GroupBy {
2129            input: Arc::new(self.logical_plan),
2130            keys: self.keys,
2131            predicates: vec![],
2132            aggs: vec![],
2133            apply: Some((f, schema)),
2134            maintain_order: self.maintain_order,
2135            options: Arc::new(options),
2136        };
2137        LazyFrame::from_logical_plan(lp, self.opt_state)
2138    }
2139}
2140
2141#[must_use]
2142pub struct JoinBuilder {
2143    lf: LazyFrame,
2144    how: JoinType,
2145    other: Option<LazyFrame>,
2146    left_on: Vec<Expr>,
2147    right_on: Vec<Expr>,
2148    allow_parallel: bool,
2149    force_parallel: bool,
2150    suffix: Option<PlSmallStr>,
2151    validation: JoinValidation,
2152    nulls_equal: bool,
2153    coalesce: JoinCoalesce,
2154    maintain_order: MaintainOrderJoin,
2155}
2156impl JoinBuilder {
2157    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2158    pub fn new(lf: LazyFrame) -> Self {
2159        Self {
2160            lf,
2161            other: None,
2162            how: JoinType::Inner,
2163            left_on: vec![],
2164            right_on: vec![],
2165            allow_parallel: true,
2166            force_parallel: false,
2167            suffix: None,
2168            validation: Default::default(),
2169            nulls_equal: false,
2170            coalesce: Default::default(),
2171            maintain_order: Default::default(),
2172        }
2173    }
2174
2175    /// The right table in the join.
2176    pub fn with(mut self, other: LazyFrame) -> Self {
2177        self.other = Some(other);
2178        self
2179    }
2180
2181    /// Select the join type.
2182    pub fn how(mut self, how: JoinType) -> Self {
2183        self.how = how;
2184        self
2185    }
2186
2187    pub fn validate(mut self, validation: JoinValidation) -> Self {
2188        self.validation = validation;
2189        self
2190    }
2191
2192    /// The expressions you want to join both tables on.
2193    ///
2194    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2195    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2196        let on = on.as_ref().to_vec();
2197        self.left_on.clone_from(&on);
2198        self.right_on = on;
2199        self
2200    }
2201
2202    /// The expressions you want to join the left table on.
2203    ///
2204    /// The passed expressions must be valid in the left table.
2205    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2206        self.left_on = on.as_ref().to_vec();
2207        self
2208    }
2209
2210    /// The expressions you want to join the right table on.
2211    ///
2212    /// The passed expressions must be valid in the right table.
2213    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2214        self.right_on = on.as_ref().to_vec();
2215        self
2216    }
2217
2218    /// Allow parallel table evaluation.
2219    pub fn allow_parallel(mut self, allow: bool) -> Self {
2220        self.allow_parallel = allow;
2221        self
2222    }
2223
2224    /// Force parallel table evaluation.
2225    pub fn force_parallel(mut self, force: bool) -> Self {
2226        self.force_parallel = force;
2227        self
2228    }
2229
2230    /// Join on null values. By default null values will never produce matches.
2231    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2232        self.nulls_equal = nulls_equal;
2233        self
2234    }
2235
2236    /// Suffix to add duplicate column names in join.
2237    /// Defaults to `"_right"` if this method is never called.
2238    pub fn suffix<S>(mut self, suffix: S) -> Self
2239    where
2240        S: Into<PlSmallStr>,
2241    {
2242        self.suffix = Some(suffix.into());
2243        self
2244    }
2245
2246    /// Whether to coalesce join columns.
2247    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2248        self.coalesce = coalesce;
2249        self
2250    }
2251
2252    /// Whether to preserve the row order.
2253    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2254        self.maintain_order = maintain_order;
2255        self
2256    }
2257
2258    /// Finish builder
2259    pub fn finish(self) -> LazyFrame {
2260        let opt_state = self.lf.opt_state;
2261        let other = self.other.expect("'with' not set in join builder");
2262
2263        let args = JoinArgs {
2264            how: self.how,
2265            validation: self.validation,
2266            suffix: self.suffix,
2267            slice: None,
2268            nulls_equal: self.nulls_equal,
2269            coalesce: self.coalesce,
2270            maintain_order: self.maintain_order,
2271        };
2272
2273        let lp = self
2274            .lf
2275            .get_plan_builder()
2276            .join(
2277                other.logical_plan,
2278                self.left_on,
2279                self.right_on,
2280                JoinOptions {
2281                    allow_parallel: self.allow_parallel,
2282                    force_parallel: self.force_parallel,
2283                    args,
2284                }
2285                .into(),
2286            )
2287            .build();
2288        LazyFrame::from_logical_plan(lp, opt_state)
2289    }
2290
2291    // Finish with join predicates
2292    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2293        let opt_state = self.lf.opt_state;
2294        let other = self.other.expect("with not set");
2295
2296        // Decompose `And` conjunctions into their component expressions
2297        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2298            if let Expr::BinaryExpr {
2299                op: Operator::And,
2300                left,
2301                right,
2302            } = predicate
2303            {
2304                decompose_and((*left).clone(), expanded_predicates);
2305                decompose_and((*right).clone(), expanded_predicates);
2306            } else {
2307                expanded_predicates.push(predicate);
2308            }
2309        }
2310        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2311        for predicate in predicates {
2312            decompose_and(predicate, &mut expanded_predicates);
2313        }
2314        let predicates: Vec<Expr> = expanded_predicates;
2315
2316        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2317        #[cfg(feature = "is_between")]
2318        let predicates: Vec<Expr> = {
2319            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2320            for predicate in predicates {
2321                if let Expr::Function {
2322                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2323                    input,
2324                    ..
2325                } = &predicate
2326                {
2327                    if let [expr, lower, upper] = input.as_slice() {
2328                        match closed {
2329                            ClosedInterval::Both => {
2330                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2331                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2332                            },
2333                            ClosedInterval::Right => {
2334                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2335                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2336                            },
2337                            ClosedInterval::Left => {
2338                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2339                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2340                            },
2341                            ClosedInterval::None => {
2342                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2343                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2344                            },
2345                        }
2346                        continue;
2347                    }
2348                }
2349                expanded_predicates.push(predicate);
2350            }
2351            expanded_predicates
2352        };
2353
2354        let args = JoinArgs {
2355            how: self.how,
2356            validation: self.validation,
2357            suffix: self.suffix,
2358            slice: None,
2359            nulls_equal: self.nulls_equal,
2360            coalesce: self.coalesce,
2361            maintain_order: self.maintain_order,
2362        };
2363        let options = JoinOptions {
2364            allow_parallel: self.allow_parallel,
2365            force_parallel: self.force_parallel,
2366            args,
2367        };
2368
2369        let lp = DslPlan::Join {
2370            input_left: Arc::new(self.lf.logical_plan),
2371            input_right: Arc::new(other.logical_plan),
2372            left_on: Default::default(),
2373            right_on: Default::default(),
2374            predicates,
2375            options: Arc::from(options),
2376        };
2377
2378        LazyFrame::from_logical_plan(lp, opt_state)
2379    }
2380}
2381
2382pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2383    #[cfg(not(feature = "new_streaming"))]
2384    {
2385        None
2386    }
2387    #[cfg(feature = "new_streaming")]
2388    {
2389        Some(polars_stream::build_streaming_query_executor)
2390    }
2391};