Skip to main content

polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_core::query_result::QueryResult;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
40pub trait IntoLazy {
41    fn lazy(self) -> LazyFrame;
42}
43
44impl IntoLazy for DataFrame {
45    /// Convert the `DataFrame` into a `LazyFrame`
46    fn lazy(self) -> LazyFrame {
47        let lp = DslBuilder::from_existing_df(self).build();
48        LazyFrame {
49            logical_plan: lp,
50            opt_state: Default::default(),
51            cached_arena: Default::default(),
52        }
53    }
54}
55
56impl IntoLazy for LazyFrame {
57    fn lazy(self) -> LazyFrame {
58        self
59    }
60}
61
62/// Lazy abstraction over an eager `DataFrame`.
63///
64/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
65/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
66#[derive(Clone, Default)]
67#[must_use]
68pub struct LazyFrame {
69    pub logical_plan: DslPlan,
70    pub(crate) opt_state: OptFlags,
71    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
72}
73
74impl From<DslPlan> for LazyFrame {
75    fn from(plan: DslPlan) -> Self {
76        Self {
77            logical_plan: plan,
78            opt_state: OptFlags::default(),
79            cached_arena: Default::default(),
80        }
81    }
82}
83
84impl LazyFrame {
85    pub(crate) fn from_inner(
86        logical_plan: DslPlan,
87        opt_state: OptFlags,
88        cached_arena: Arc<Mutex<Option<CachedArena>>>,
89    ) -> Self {
90        Self {
91            logical_plan,
92            opt_state,
93            cached_arena,
94        }
95    }
96
97    pub(crate) fn get_plan_builder(self) -> DslBuilder {
98        DslBuilder::from(self.logical_plan)
99    }
100
101    fn get_opt_state(&self) -> OptFlags {
102        self.opt_state
103    }
104
105    pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106        LazyFrame {
107            logical_plan,
108            opt_state,
109            cached_arena: Default::default(),
110        }
111    }
112
113    /// Get current optimizations.
114    pub fn get_current_optimizations(&self) -> OptFlags {
115        self.opt_state
116    }
117
118    /// Set allowed optimizations.
119    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120        self.opt_state = opt_state;
121        self
122    }
123
124    /// Turn off all optimizations.
125    pub fn without_optimizations(self) -> Self {
126        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127    }
128
129    /// Toggle projection pushdown optimization.
130    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132        self
133    }
134
135    /// Toggle cluster with columns optimization.
136    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138        self
139    }
140
141    /// Check if operations are order dependent and unset maintaining_order if
142    /// the order would not be observed.
143    pub fn with_check_order(mut self, toggle: bool) -> Self {
144        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145        self
146    }
147
148    /// Toggle predicate pushdown optimization.
149    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151        self
152    }
153
154    /// Toggle type coercion optimization.
155    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157        self
158    }
159
160    /// Toggle type check optimization.
161    pub fn with_type_check(mut self, toggle: bool) -> Self {
162        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163        self
164    }
165
166    /// Toggle expression simplification optimization on or off.
167    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169        self
170    }
171
172    /// Toggle common subplan elimination optimization on or off
173    #[cfg(feature = "cse")]
174    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
175        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
176        self
177    }
178
179    /// Toggle common subexpression elimination optimization on or off
180    #[cfg(feature = "cse")]
181    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
182        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
183        self
184    }
185
186    /// Toggle slice pushdown optimization.
187    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189        self
190    }
191
192    #[cfg(feature = "new_streaming")]
193    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
194        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
195        self
196    }
197
198    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
199    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201        self
202    }
203
204    /// Run every node eagerly. This turns off multi-node optimizations.
205    pub fn _with_eager(mut self, toggle: bool) -> Self {
206        self.opt_state.set(OptFlags::EAGER, toggle);
207        self
208    }
209
210    /// Return a String describing the naive (un-optimized) logical plan.
211    pub fn describe_plan(&self) -> PolarsResult<String> {
212        Ok(self.clone().to_alp()?.describe())
213    }
214
215    /// Return a String describing the naive (un-optimized) logical plan in tree format.
216    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217        Ok(self.clone().to_alp()?.describe_tree_format())
218    }
219
220    /// Return a String describing the optimized logical plan.
221    ///
222    /// Returns `Err` if optimizing the logical plan fails.
223    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224        Ok(self.clone().to_alp_optimized()?.describe())
225    }
226
227    /// Return a String describing the optimized logical plan in tree format.
228    ///
229    /// Returns `Err` if optimizing the logical plan fails.
230    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232    }
233
234    /// Return a String describing the logical plan.
235    ///
236    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
237    /// explains the naive, un-optimized plan.
238    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239        if optimized {
240            self.describe_optimized_plan()
241        } else {
242            self.describe_plan()
243        }
244    }
245
246    /// Add a sort operation to the logical plan.
247    ///
248    /// Sorts the LazyFrame by the column name specified using the provided options.
249    ///
250    /// # Example
251    ///
252    /// Sort DataFrame by 'sepal_width' column:
253    /// ```rust
254    /// # use polars_core::prelude::*;
255    /// # use polars_lazy::prelude::*;
256    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
257    ///     df.lazy().sort(["sepal_width"], Default::default())
258    /// }
259    /// ```
260    /// Sort by a single column with specific order:
261    /// ```
262    /// # use polars_core::prelude::*;
263    /// # use polars_lazy::prelude::*;
264    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
265    ///     df.lazy().sort(
266    ///         ["sepal_width"],
267    ///         SortMultipleOptions::new()
268    ///             .with_order_descending(descending)
269    ///     )
270    /// }
271    /// ```
272    /// Sort by multiple columns with specifying order for each column:
273    /// ```
274    /// # use polars_core::prelude::*;
275    /// # use polars_lazy::prelude::*;
276    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
277    ///     df.lazy().sort(
278    ///         ["sepal_width", "sepal_length"],
279    ///         SortMultipleOptions::new()
280    ///             .with_order_descending_multi([false, true])
281    ///     )
282    /// }
283    /// ```
284    /// See [`SortMultipleOptions`] for more options.
285    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286        let opt_state = self.get_opt_state();
287        let lp = self
288            .get_plan_builder()
289            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290            .build();
291        Self::from_logical_plan(lp, opt_state)
292    }
293
294    /// Add a sort operation to the logical plan.
295    ///
296    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
297    /// concrete columns before sorting.
298    ///
299    /// See [`SortMultipleOptions`] for more options.
300    ///
301    /// # Example
302    ///
303    /// ```rust
304    /// use polars_core::prelude::*;
305    /// use polars_lazy::prelude::*;
306    ///
307    /// /// Sort DataFrame by 'sepal_width' column
308    /// fn example(df: DataFrame) -> LazyFrame {
309    ///       df.lazy()
310    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
311    /// }
312    /// ```
313    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314        self,
315        by_exprs: E,
316        sort_options: SortMultipleOptions,
317    ) -> Self {
318        let by_exprs = by_exprs.as_ref().to_vec();
319        if by_exprs.is_empty() {
320            self
321        } else {
322            let opt_state = self.get_opt_state();
323            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324            Self::from_logical_plan(lp, opt_state)
325        }
326    }
327
328    pub fn top_k<E: AsRef<[Expr]>>(
329        self,
330        k: IdxSize,
331        by_exprs: E,
332        sort_options: SortMultipleOptions,
333    ) -> Self {
334        // this will optimize to top-k
335        self.sort_by_exprs(
336            by_exprs,
337            sort_options.with_order_reversed().with_nulls_last(true),
338        )
339        .slice(0, k)
340    }
341
342    pub fn bottom_k<E: AsRef<[Expr]>>(
343        self,
344        k: IdxSize,
345        by_exprs: E,
346        sort_options: SortMultipleOptions,
347    ) -> Self {
348        // this will optimize to bottom-k
349        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350            .slice(0, k)
351    }
352
353    /// Reverse the `DataFrame` from top to bottom.
354    ///
355    /// Row `i` becomes row `number_of_rows - i - 1`.
356    ///
357    /// # Example
358    ///
359    /// ```rust
360    /// use polars_core::prelude::*;
361    /// use polars_lazy::prelude::*;
362    ///
363    /// fn example(df: DataFrame) -> LazyFrame {
364    ///       df.lazy()
365    ///         .reverse()
366    /// }
367    /// ```
368    pub fn reverse(self) -> Self {
369        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370    }
371
372    /// Rename columns in the DataFrame.
373    ///
374    /// `existing` and `new` are iterables of the same length containing the old and
375    /// corresponding new column names. Renaming happens to all `existing` columns
376    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
377    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
378    /// those columns that are actually found will be renamed (others will be ignored).
379    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380    where
381        I: IntoIterator<Item = T>,
382        J: IntoIterator<Item = S>,
383        T: AsRef<str>,
384        S: AsRef<str>,
385    {
386        let iter = existing.into_iter();
387        let cap = iter.size_hint().0;
388        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391        // TODO! should this error if `existing` and `new` have different lengths?
392        // Currently, the longer of the two is truncated.
393        for (existing, new) in iter.zip(new) {
394            let existing = existing.as_ref();
395            let new = new.as_ref();
396            if new != existing {
397                existing_vec.push(existing.into());
398                new_vec.push(new.into());
399            }
400        }
401
402        self.map_private(DslFunction::Rename {
403            existing: existing_vec.into(),
404            new: new_vec.into(),
405            strict,
406        })
407    }
408
409    /// Removes columns from the DataFrame.
410    /// Note that it's better to only select the columns you need
411    /// and let the projection pushdown optimize away the unneeded columns.
412    ///
413    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
414    /// error while materializing the [`LazyFrame`].
415    pub fn drop(self, columns: Selector) -> Self {
416        let opt_state = self.get_opt_state();
417        let lp = self.get_plan_builder().drop(columns).build();
418        Self::from_logical_plan(lp, opt_state)
419    }
420
421    /// Shift the values by a given period and fill the parts that will be empty due to this operation
422    /// with `Nones`.
423    ///
424    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
425    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427    }
428
429    /// Shift the values by a given period and fill the parts that will be empty due to this operation
430    /// with the result of the `fill_value` expression.
431    ///
432    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
433    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434        self.select(vec![
435            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436        ])
437    }
438
439    /// Fill None values in the DataFrame with an expression.
440    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441        let opt_state = self.get_opt_state();
442        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443        Self::from_logical_plan(lp, opt_state)
444    }
445
446    /// Fill NaN values in the DataFrame with an expression.
447    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448        let opt_state = self.get_opt_state();
449        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450        Self::from_logical_plan(lp, opt_state)
451    }
452
453    /// Caches the result into a new LazyFrame.
454    ///
455    /// This should be used to prevent computations running multiple times.
456    pub fn cache(self) -> Self {
457        let opt_state = self.get_opt_state();
458        let lp = self.get_plan_builder().cache().build();
459        Self::from_logical_plan(lp, opt_state)
460    }
461
462    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
463    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464        let cast_cols: Vec<Expr> = dtypes
465            .into_iter()
466            .map(|(name, dt)| {
467                let name = PlSmallStr::from_str(name);
468
469                if strict {
470                    col(name).strict_cast(dt)
471                } else {
472                    col(name).cast(dt)
473                }
474            })
475            .collect();
476
477        if cast_cols.is_empty() {
478            self
479        } else {
480            self.with_columns(cast_cols)
481        }
482    }
483
484    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
485    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486        self.with_columns(vec![if strict {
487            col(PlSmallStr::from_static("*")).strict_cast(dtype)
488        } else {
489            col(PlSmallStr::from_static("*")).cast(dtype)
490        }])
491    }
492
493    pub fn optimize(
494        self,
495        lp_arena: &mut Arena<IR>,
496        expr_arena: &mut Arena<AExpr>,
497    ) -> PolarsResult<Node> {
498        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499    }
500
501    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502        let (mut lp_arena, mut expr_arena) = self.get_arenas();
503        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505        Ok(IRPlan::new(node, lp_arena, expr_arena))
506    }
507
508    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509        let (mut lp_arena, mut expr_arena) = self.get_arenas();
510        let node = to_alp(
511            self.logical_plan,
512            &mut expr_arena,
513            &mut lp_arena,
514            &mut self.opt_state,
515        )?;
516        let plan = IRPlan::new(node, lp_arena, expr_arena);
517        Ok(plan)
518    }
519
520    pub(crate) fn optimize_with_scratch(
521        self,
522        lp_arena: &mut Arena<IR>,
523        expr_arena: &mut Arena<AExpr>,
524        scratch: &mut Vec<Node>,
525    ) -> PolarsResult<Node> {
526        let lp_top = optimize(
527            self.logical_plan,
528            self.opt_state,
529            lp_arena,
530            expr_arena,
531            scratch,
532            apply_scan_predicate_to_scan_ir,
533        )?;
534
535        Ok(lp_top)
536    }
537
538    fn prepare_collect_post_opt<P>(
539        mut self,
540        check_sink: bool,
541        query_start: Option<std::time::Instant>,
542        post_opt: P,
543    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
544    where
545        P: FnOnce(
546            Node,
547            &mut Arena<IR>,
548            &mut Arena<AExpr>,
549            Option<std::time::Duration>,
550        ) -> PolarsResult<()>,
551    {
552        let (mut lp_arena, mut expr_arena) = self.get_arenas();
553
554        let mut scratch = vec![];
555        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
556
557        post_opt(
558            lp_top,
559            &mut lp_arena,
560            &mut expr_arena,
561            // Post optimization callback gets the time since the
562            // query was started as its "base" timepoint.
563            query_start.map(|s| s.elapsed()),
564        )?;
565
566        // sink should be replaced
567        let no_file_sink = if check_sink {
568            !matches!(
569                lp_arena.get(lp_top),
570                IR::Sink {
571                    payload: SinkTypeIR::File { .. },
572                    ..
573                }
574            )
575        } else {
576            true
577        };
578        let physical_plan = create_physical_plan(
579            lp_top,
580            &mut lp_arena,
581            &mut expr_arena,
582            BUILD_STREAMING_EXECUTOR,
583        )?;
584
585        let state = ExecutionState::new();
586        Ok((state, physical_plan, no_file_sink))
587    }
588
589    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
590    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
591    where
592        P: FnOnce(
593            Node,
594            &mut Arena<IR>,
595            &mut Arena<AExpr>,
596            Option<std::time::Duration>,
597        ) -> PolarsResult<()>,
598    {
599        let (mut state, mut physical_plan, _) =
600            self.prepare_collect_post_opt(false, None, post_opt)?;
601        physical_plan.execute(&mut state)
602    }
603
604    #[allow(unused_mut)]
605    fn prepare_collect(
606        self,
607        check_sink: bool,
608        query_start: Option<std::time::Instant>,
609    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
610        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
611    }
612
613    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
614    /// `engine`.
615    ///
616    /// The query is optimized prior to execution.
617    pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
618        #[cfg(feature = "new_streaming")]
619        {
620            if let Some(result) = self.try_new_streaming_if_requested() {
621                return result;
622            }
623        }
624
625        if let Engine::Streaming = engine {
626            feature_gated!("new_streaming", self = self.with_new_streaming(true))
627        }
628
629        let mut ir_plan = self.to_alp_optimized()?;
630
631        ir_plan.ensure_root_node_is_sink();
632
633        match engine {
634            Engine::Streaming => feature_gated!("new_streaming", {
635                polars_stream::run_query(
636                    ir_plan.lp_top,
637                    &mut ir_plan.lp_arena,
638                    &mut ir_plan.expr_arena,
639                )
640            }),
641            Engine::Auto | Engine::InMemory | Engine::Gpu => {
642                if let IR::SinkMultiple { inputs } = ir_plan.root() {
643                    polars_ensure!(
644                        engine != Engine::Gpu,
645                        InvalidOperation:
646                        "collect_all is not supported for the gpu engine"
647                    );
648
649                    return create_multiple_physical_plans(
650                        inputs.clone().as_slice(),
651                        &mut ir_plan.lp_arena,
652                        &mut ir_plan.expr_arena,
653                        BUILD_STREAMING_EXECUTOR,
654                    )?
655                    .execute()
656                    .map(QueryResult::Multiple);
657                }
658
659                let mut physical_plan = create_physical_plan(
660                    ir_plan.lp_top,
661                    &mut ir_plan.lp_arena,
662                    &mut ir_plan.expr_arena,
663                    BUILD_STREAMING_EXECUTOR,
664                )?;
665                let mut state = ExecutionState::new();
666                physical_plan.execute(&mut state).map(QueryResult::Single)
667            },
668        }
669    }
670
671    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
672        let sink_multiple = LazyFrame {
673            logical_plan: DslPlan::SinkMultiple { inputs: plans },
674            opt_state,
675            cached_arena: Default::default(),
676        };
677        sink_multiple.explain(true)
678    }
679
680    pub fn collect_all_with_engine(
681        plans: Vec<DslPlan>,
682        engine: Engine,
683        opt_state: OptFlags,
684    ) -> PolarsResult<Vec<DataFrame>> {
685        if plans.is_empty() {
686            return Ok(Vec::new());
687        }
688
689        LazyFrame {
690            logical_plan: DslPlan::SinkMultiple { inputs: plans },
691            opt_state,
692            cached_arena: Default::default(),
693        }
694        .collect_with_engine(engine)
695        .map(|r| r.unwrap_multiple())
696    }
697
698    /// Execute all the lazy operations and collect them into a [`DataFrame`].
699    ///
700    /// The query is optimized prior to execution.
701    ///
702    /// # Example
703    ///
704    /// ```rust
705    /// use polars_core::prelude::*;
706    /// use polars_lazy::prelude::*;
707    ///
708    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
709    ///     df.lazy()
710    ///       .group_by([col("foo")])
711    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
712    ///       .collect()
713    /// }
714    /// ```
715    pub fn collect(self) -> PolarsResult<DataFrame> {
716        self.collect_with_engine(Engine::Auto).map(|r| match r {
717            QueryResult::Single(df) => df,
718            // TODO: Should return query results
719            QueryResult::Multiple(_) => DataFrame::empty(),
720        })
721    }
722
723    /// Collect the query in batches.
724    ///
725    /// If lazy is true the query will not start until the first poll (or until
726    /// start is called on CollectBatches).
727    #[cfg(feature = "async")]
728    pub fn collect_batches(
729        self,
730        engine: Engine,
731        maintain_order: bool,
732        chunk_size: Option<NonZeroUsize>,
733        lazy: bool,
734    ) -> PolarsResult<CollectBatches> {
735        let (send, recv) = sync_channel(1);
736        let runner_send = send.clone();
737        let ldf = self.sink_batches(
738            PlanCallback::new(move |df| {
739                // Stop if receiver has closed.
740                let send_result = send.send(Ok(df));
741                Ok(send_result.is_err())
742            }),
743            maintain_order,
744            chunk_size,
745        )?;
746        let runner = move || {
747            // We use a tokio spawn_blocking here as it has a high blocking
748            // thread pool limit.
749            polars_io::pl_async::get_runtime().spawn_blocking(move || {
750                if let Err(e) = ldf.collect_with_engine(engine) {
751                    runner_send.send(Err(e)).ok();
752                }
753            });
754        };
755
756        let mut collect_batches = CollectBatches {
757            recv,
758            runner: Some(Box::new(runner)),
759        };
760        if !lazy {
761            collect_batches.start();
762        }
763        Ok(collect_batches)
764    }
765
766    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
767    // This version does profiling of the node execution.
768    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
769    where
770        P: FnOnce(
771            Node,
772            &mut Arena<IR>,
773            &mut Arena<AExpr>,
774            Option<std::time::Duration>,
775        ) -> PolarsResult<()>,
776    {
777        let query_start = std::time::Instant::now();
778        let (mut state, mut physical_plan, _) =
779            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
780        state.time_nodes(query_start);
781        let out = physical_plan.execute(&mut state)?;
782        let timer_df = state.finish_timer()?;
783        Ok((out, timer_df))
784    }
785
786    /// Profile a LazyFrame.
787    ///
788    /// This will run the query and return a tuple
789    /// containing the materialized DataFrame and a DataFrame that contains profiling information
790    /// of each node that is executed.
791    ///
792    /// The units of the timings are microseconds.
793    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
794        self._profile_post_opt(|_, _, _, _| Ok(()))
795    }
796
797    pub fn sink_batches(
798        mut self,
799        function: PlanCallback<DataFrame, bool>,
800        maintain_order: bool,
801        chunk_size: Option<NonZeroUsize>,
802    ) -> PolarsResult<Self> {
803        use polars_plan::prelude::sink::CallbackSinkType;
804
805        polars_ensure!(
806            !matches!(self.logical_plan, DslPlan::Sink { .. }),
807            InvalidOperation: "cannot create a sink on top of another sink"
808        );
809
810        self.logical_plan = DslPlan::Sink {
811            input: Arc::new(self.logical_plan),
812            payload: SinkType::Callback(CallbackSinkType {
813                function,
814                maintain_order,
815                chunk_size,
816            }),
817        };
818
819        Ok(self)
820    }
821
822    #[cfg(feature = "new_streaming")]
823    pub fn try_new_streaming_if_requested(
824        &mut self,
825    ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
826        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
827        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
828
829        if auto_new_streaming || force_new_streaming {
830            // Try to run using the new streaming engine, falling back
831            // if it fails in a todo!() error if auto_new_streaming is set.
832            let mut new_stream_lazy = self.clone();
833            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
834            let mut ir_plan = match new_stream_lazy.to_alp_optimized() {
835                Ok(v) => v,
836                Err(e) => return Some(Err(e)),
837            };
838
839            ir_plan.ensure_root_node_is_sink();
840
841            let f = || {
842                polars_stream::run_query(
843                    ir_plan.lp_top,
844                    &mut ir_plan.lp_arena,
845                    &mut ir_plan.expr_arena,
846                )
847            };
848
849            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
850                Ok(v) => return Some(v),
851                Err(e) => {
852                    // Fallback to normal engine if error is due to not being implemented
853                    // and auto_new_streaming is set, otherwise propagate error.
854                    if !force_new_streaming
855                        && auto_new_streaming
856                        && e.downcast_ref::<&str>()
857                            .map(|s| s.starts_with("not yet implemented"))
858                            .unwrap_or(false)
859                    {
860                        if polars_core::config::verbose() {
861                            eprintln!(
862                                "caught unimplemented error in new streaming engine, falling back to normal engine"
863                            );
864                        }
865                    } else {
866                        std::panic::resume_unwind(e);
867                    }
868                },
869            }
870        }
871
872        None
873    }
874
875    pub fn sink(
876        mut self,
877        sink_type: SinkDestination,
878        file_format: FileWriteFormat,
879        unified_sink_args: UnifiedSinkArgs,
880    ) -> PolarsResult<Self> {
881        polars_ensure!(
882            !matches!(self.logical_plan, DslPlan::Sink { .. }),
883            InvalidOperation: "cannot create a sink on top of another sink"
884        );
885
886        self.logical_plan = DslPlan::Sink {
887            input: Arc::new(self.logical_plan),
888            payload: match sink_type {
889                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
890                    target,
891                    file_format,
892                    unified_sink_args,
893                }),
894                SinkDestination::Partitioned {
895                    base_path,
896                    file_path_provider,
897                    partition_strategy,
898                    max_rows_per_file,
899                    approximate_bytes_per_file,
900                } => SinkType::Partitioned(PartitionedSinkOptions {
901                    base_path,
902                    file_path_provider,
903                    partition_strategy,
904                    file_format,
905                    unified_sink_args,
906                    max_rows_per_file,
907                    approximate_bytes_per_file,
908                }),
909            },
910        };
911        Ok(self)
912    }
913
914    /// Filter frame rows that match a predicate expression.
915    ///
916    /// The expression must yield boolean values (note that rows where the
917    /// predicate resolves to `null` are *not* included in the resulting frame).
918    ///
919    /// # Example
920    ///
921    /// ```rust
922    /// use polars_core::prelude::*;
923    /// use polars_lazy::prelude::*;
924    ///
925    /// fn example(df: DataFrame) -> LazyFrame {
926    ///       df.lazy()
927    ///         .filter(col("sepal_width").is_not_null())
928    ///         .select([col("sepal_width"), col("sepal_length")])
929    /// }
930    /// ```
931    pub fn filter(self, predicate: Expr) -> Self {
932        let opt_state = self.get_opt_state();
933        let lp = self.get_plan_builder().filter(predicate).build();
934        Self::from_logical_plan(lp, opt_state)
935    }
936
937    /// Remove frame rows that match a predicate expression.
938    ///
939    /// The expression must yield boolean values (note that rows where the
940    /// predicate resolves to `null` are *not* removed from the resulting frame).
941    ///
942    /// # Example
943    ///
944    /// ```rust
945    /// use polars_core::prelude::*;
946    /// use polars_lazy::prelude::*;
947    ///
948    /// fn example(df: DataFrame) -> LazyFrame {
949    ///       df.lazy()
950    ///         .remove(col("sepal_width").is_null())
951    ///         .select([col("sepal_width"), col("sepal_length")])
952    /// }
953    /// ```
954    pub fn remove(self, predicate: Expr) -> Self {
955        self.filter(predicate.neq_missing(lit(true)))
956    }
957
958    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
959    ///
960    /// Columns can be selected with [`col`];
961    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
962    ///
963    /// # Example
964    ///
965    /// ```rust
966    /// use polars_core::prelude::*;
967    /// use polars_lazy::prelude::*;
968    ///
969    /// /// This function selects column "foo" and column "bar".
970    /// /// Column "bar" is renamed to "ham".
971    /// fn example(df: DataFrame) -> LazyFrame {
972    ///       df.lazy()
973    ///         .select([col("foo"),
974    ///                   col("bar").alias("ham")])
975    /// }
976    ///
977    /// /// This function selects all columns except "foo"
978    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
979    ///       df.lazy()
980    ///         .select([all().exclude_cols(["foo"]).as_expr()])
981    /// }
982    /// ```
983    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
984        let exprs = exprs.as_ref().to_vec();
985        self.select_impl(
986            exprs,
987            ProjectionOptions {
988                run_parallel: true,
989                duplicate_check: true,
990                should_broadcast: true,
991            },
992        )
993    }
994
995    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
996        let exprs = exprs.as_ref().to_vec();
997        self.select_impl(
998            exprs,
999            ProjectionOptions {
1000                run_parallel: false,
1001                duplicate_check: true,
1002                should_broadcast: true,
1003            },
1004        )
1005    }
1006
1007    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1008        let opt_state = self.get_opt_state();
1009        let lp = self.get_plan_builder().project(exprs, options).build();
1010        Self::from_logical_plan(lp, opt_state)
1011    }
1012
1013    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1014    ///
1015    /// Takes a list of expressions to group on.
1016    ///
1017    /// # Example
1018    ///
1019    /// ```rust
1020    /// use polars_core::prelude::*;
1021    /// use polars_lazy::prelude::*;
1022    ///
1023    /// fn example(df: DataFrame) -> LazyFrame {
1024    ///       df.lazy()
1025    ///        .group_by([col("date")])
1026    ///        .agg([
1027    ///            col("rain").min().alias("min_rain"),
1028    ///            col("rain").sum().alias("sum_rain"),
1029    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1030    ///        ])
1031    /// }
1032    /// ```
1033    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1034        let keys = by
1035            .as_ref()
1036            .iter()
1037            .map(|e| e.clone().into())
1038            .collect::<Vec<_>>();
1039        let opt_state = self.get_opt_state();
1040
1041        #[cfg(feature = "dynamic_group_by")]
1042        {
1043            LazyGroupBy {
1044                logical_plan: self.logical_plan,
1045                opt_state,
1046                keys,
1047                predicates: vec![],
1048                maintain_order: false,
1049                dynamic_options: None,
1050                rolling_options: None,
1051            }
1052        }
1053
1054        #[cfg(not(feature = "dynamic_group_by"))]
1055        {
1056            LazyGroupBy {
1057                logical_plan: self.logical_plan,
1058                opt_state,
1059                keys,
1060                predicates: vec![],
1061                maintain_order: false,
1062            }
1063        }
1064    }
1065
1066    /// Create rolling groups based on a time column.
1067    ///
1068    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1069    ///
1070    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1071    /// individual values and are not of constant intervals. For constant intervals use
1072    /// *group_by_dynamic*
1073    #[cfg(feature = "dynamic_group_by")]
1074    pub fn rolling<E: AsRef<[Expr]>>(
1075        mut self,
1076        index_column: Expr,
1077        group_by: E,
1078        mut options: RollingGroupOptions,
1079    ) -> LazyGroupBy {
1080        if let Expr::Column(name) = index_column {
1081            options.index_column = name;
1082        } else {
1083            let output_field = index_column
1084                .to_field(&self.collect_schema().unwrap())
1085                .unwrap();
1086            return self.with_column(index_column).rolling(
1087                Expr::Column(output_field.name().clone()),
1088                group_by,
1089                options,
1090            );
1091        }
1092        let opt_state = self.get_opt_state();
1093        LazyGroupBy {
1094            logical_plan: self.logical_plan,
1095            opt_state,
1096            predicates: vec![],
1097            keys: group_by.as_ref().to_vec(),
1098            maintain_order: true,
1099            dynamic_options: None,
1100            rolling_options: Some(options),
1101        }
1102    }
1103
1104    /// Group based on a time value (or index value of type Int32, Int64).
1105    ///
1106    /// Time windows are calculated and rows are assigned to windows. Different from a
1107    /// normal group_by is that a row can be member of multiple groups. The time/index
1108    /// window could be seen as a rolling window, with a window size determined by
1109    /// dates/times/values instead of slots in the DataFrame.
1110    ///
1111    /// A window is defined by:
1112    ///
1113    /// - every: interval of the window
1114    /// - period: length of the window
1115    /// - offset: offset of the window
1116    ///
1117    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1118    /// with a ordinary group_by on these keys.
1119    #[cfg(feature = "dynamic_group_by")]
1120    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1121        mut self,
1122        index_column: Expr,
1123        group_by: E,
1124        mut options: DynamicGroupOptions,
1125    ) -> LazyGroupBy {
1126        if let Expr::Column(name) = index_column {
1127            options.index_column = name;
1128        } else {
1129            let output_field = index_column
1130                .to_field(&self.collect_schema().unwrap())
1131                .unwrap();
1132            return self.with_column(index_column).group_by_dynamic(
1133                Expr::Column(output_field.name().clone()),
1134                group_by,
1135                options,
1136            );
1137        }
1138        let opt_state = self.get_opt_state();
1139        LazyGroupBy {
1140            logical_plan: self.logical_plan,
1141            opt_state,
1142            predicates: vec![],
1143            keys: group_by.as_ref().to_vec(),
1144            maintain_order: true,
1145            dynamic_options: Some(options),
1146            rolling_options: None,
1147        }
1148    }
1149
1150    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1151    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1152        let keys = by
1153            .as_ref()
1154            .iter()
1155            .map(|e| e.clone().into())
1156            .collect::<Vec<_>>();
1157        let opt_state = self.get_opt_state();
1158
1159        #[cfg(feature = "dynamic_group_by")]
1160        {
1161            LazyGroupBy {
1162                logical_plan: self.logical_plan,
1163                opt_state,
1164                keys,
1165                predicates: vec![],
1166                maintain_order: true,
1167                dynamic_options: None,
1168                rolling_options: None,
1169            }
1170        }
1171
1172        #[cfg(not(feature = "dynamic_group_by"))]
1173        {
1174            LazyGroupBy {
1175                logical_plan: self.logical_plan,
1176                opt_state,
1177                keys,
1178                predicates: vec![],
1179                maintain_order: true,
1180            }
1181        }
1182    }
1183
1184    /// Left anti join this query with another lazy query.
1185    ///
1186    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1187    /// flexible join logic, see [`join`](LazyFrame::join) or
1188    /// [`join_builder`](LazyFrame::join_builder).
1189    ///
1190    /// # Example
1191    ///
1192    /// ```rust
1193    /// use polars_core::prelude::*;
1194    /// use polars_lazy::prelude::*;
1195    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1196    ///         ldf
1197    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1198    /// }
1199    /// ```
1200    #[cfg(feature = "semi_anti_join")]
1201    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1202        self.join(
1203            other,
1204            [left_on.into()],
1205            [right_on.into()],
1206            JoinArgs::new(JoinType::Anti),
1207        )
1208    }
1209
1210    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1211    #[cfg(feature = "cross_join")]
1212    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1213        self.join(
1214            other,
1215            vec![],
1216            vec![],
1217            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1218        )
1219    }
1220
1221    /// Left outer join this query with another lazy query.
1222    ///
1223    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1224    /// flexible join logic, see [`join`](LazyFrame::join) or
1225    /// [`join_builder`](LazyFrame::join_builder).
1226    ///
1227    /// # Example
1228    ///
1229    /// ```rust
1230    /// use polars_core::prelude::*;
1231    /// use polars_lazy::prelude::*;
1232    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1233    ///         ldf
1234    ///         .left_join(other, col("foo"), col("bar"))
1235    /// }
1236    /// ```
1237    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1238        self.join(
1239            other,
1240            [left_on.into()],
1241            [right_on.into()],
1242            JoinArgs::new(JoinType::Left),
1243        )
1244    }
1245
1246    /// Inner join this query with another lazy query.
1247    ///
1248    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1249    /// flexible join logic, see [`join`](LazyFrame::join) or
1250    /// [`join_builder`](LazyFrame::join_builder).
1251    ///
1252    /// # Example
1253    ///
1254    /// ```rust
1255    /// use polars_core::prelude::*;
1256    /// use polars_lazy::prelude::*;
1257    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1258    ///         ldf
1259    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1260    /// }
1261    /// ```
1262    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1263        self.join(
1264            other,
1265            [left_on.into()],
1266            [right_on.into()],
1267            JoinArgs::new(JoinType::Inner),
1268        )
1269    }
1270
1271    /// Full outer join this query with another lazy query.
1272    ///
1273    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1274    /// flexible join logic, see [`join`](LazyFrame::join) or
1275    /// [`join_builder`](LazyFrame::join_builder).
1276    ///
1277    /// # Example
1278    ///
1279    /// ```rust
1280    /// use polars_core::prelude::*;
1281    /// use polars_lazy::prelude::*;
1282    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1283    ///         ldf
1284    ///         .full_join(other, col("foo"), col("bar"))
1285    /// }
1286    /// ```
1287    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1288        self.join(
1289            other,
1290            [left_on.into()],
1291            [right_on.into()],
1292            JoinArgs::new(JoinType::Full),
1293        )
1294    }
1295
1296    /// Left semi join this query with another lazy query.
1297    ///
1298    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1299    /// flexible join logic, see [`join`](LazyFrame::join) or
1300    /// [`join_builder`](LazyFrame::join_builder).
1301    ///
1302    /// # Example
1303    ///
1304    /// ```rust
1305    /// use polars_core::prelude::*;
1306    /// use polars_lazy::prelude::*;
1307    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1308    ///         ldf
1309    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1310    /// }
1311    /// ```
1312    #[cfg(feature = "semi_anti_join")]
1313    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1314        self.join(
1315            other,
1316            [left_on.into()],
1317            [right_on.into()],
1318            JoinArgs::new(JoinType::Semi),
1319        )
1320    }
1321
1322    /// Generic function to join two LazyFrames.
1323    ///
1324    /// `join` can join on multiple columns, given as two list of expressions, and with a
1325    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1326    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1327    /// over how columns are renamed and parallelization options, use
1328    /// [`join_builder`](LazyFrame::join_builder).
1329    ///
1330    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1331    ///
1332    /// # Example
1333    ///
1334    /// ```rust
1335    /// use polars_core::prelude::*;
1336    /// use polars_lazy::prelude::*;
1337    ///
1338    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1339    ///         ldf
1340    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1341    /// }
1342    /// ```
1343    pub fn join<E: AsRef<[Expr]>>(
1344        self,
1345        other: LazyFrame,
1346        left_on: E,
1347        right_on: E,
1348        args: JoinArgs,
1349    ) -> LazyFrame {
1350        let left_on = left_on.as_ref().to_vec();
1351        let right_on = right_on.as_ref().to_vec();
1352
1353        self._join_impl(other, left_on, right_on, args)
1354    }
1355
1356    fn _join_impl(
1357        self,
1358        other: LazyFrame,
1359        left_on: Vec<Expr>,
1360        right_on: Vec<Expr>,
1361        args: JoinArgs,
1362    ) -> LazyFrame {
1363        let JoinArgs {
1364            how,
1365            validation,
1366            suffix,
1367            slice,
1368            nulls_equal,
1369            coalesce,
1370            maintain_order,
1371            build_side,
1372        } = args;
1373
1374        if slice.is_some() {
1375            panic!("impl error: slice is not handled")
1376        }
1377
1378        let mut builder = self
1379            .join_builder()
1380            .with(other)
1381            .left_on(left_on)
1382            .right_on(right_on)
1383            .how(how)
1384            .validate(validation)
1385            .join_nulls(nulls_equal)
1386            .coalesce(coalesce)
1387            .maintain_order(maintain_order)
1388            .build_side(build_side);
1389
1390        if let Some(suffix) = suffix {
1391            builder = builder.suffix(suffix);
1392        }
1393
1394        // Note: args.slice is set by the optimizer
1395        builder.finish()
1396    }
1397
1398    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1399    ///
1400    /// After the `JoinBuilder` has been created and set up, calling
1401    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1402    /// representing the `join` operation.
1403    pub fn join_builder(self) -> JoinBuilder {
1404        JoinBuilder::new(self)
1405    }
1406
1407    /// Add or replace a column, given as an expression, to a DataFrame.
1408    ///
1409    /// # Example
1410    ///
1411    /// ```rust
1412    /// use polars_core::prelude::*;
1413    /// use polars_lazy::prelude::*;
1414    /// fn add_column(df: DataFrame) -> LazyFrame {
1415    ///     df.lazy()
1416    ///         .with_column(
1417    ///             when(col("sepal_length").lt(lit(5.0)))
1418    ///             .then(lit(10))
1419    ///             .otherwise(lit(1))
1420    ///             .alias("new_column_name"),
1421    ///         )
1422    /// }
1423    /// ```
1424    pub fn with_column(self, expr: Expr) -> LazyFrame {
1425        let opt_state = self.get_opt_state();
1426        let lp = self
1427            .get_plan_builder()
1428            .with_columns(
1429                vec![expr],
1430                ProjectionOptions {
1431                    run_parallel: false,
1432                    duplicate_check: true,
1433                    should_broadcast: true,
1434                },
1435            )
1436            .build();
1437        Self::from_logical_plan(lp, opt_state)
1438    }
1439
1440    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1441    ///
1442    /// # Example
1443    ///
1444    /// ```rust
1445    /// use polars_core::prelude::*;
1446    /// use polars_lazy::prelude::*;
1447    /// fn add_columns(df: DataFrame) -> LazyFrame {
1448    ///     df.lazy()
1449    ///         .with_columns(
1450    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1451    ///          )
1452    /// }
1453    /// ```
1454    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1455        let exprs = exprs.as_ref().to_vec();
1456        self.with_columns_impl(
1457            exprs,
1458            ProjectionOptions {
1459                run_parallel: true,
1460                duplicate_check: true,
1461                should_broadcast: true,
1462            },
1463        )
1464    }
1465
1466    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1467    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1468        let exprs = exprs.as_ref().to_vec();
1469        self.with_columns_impl(
1470            exprs,
1471            ProjectionOptions {
1472                run_parallel: false,
1473                duplicate_check: true,
1474                should_broadcast: true,
1475            },
1476        )
1477    }
1478
1479    /// Match or evolve to a certain schema.
1480    pub fn match_to_schema(
1481        self,
1482        schema: SchemaRef,
1483        per_column: Arc<[MatchToSchemaPerColumn]>,
1484        extra_columns: ExtraColumnsPolicy,
1485    ) -> LazyFrame {
1486        let opt_state = self.get_opt_state();
1487        let lp = self
1488            .get_plan_builder()
1489            .match_to_schema(schema, per_column, extra_columns)
1490            .build();
1491        Self::from_logical_plan(lp, opt_state)
1492    }
1493
1494    pub fn pipe_with_schema(
1495        self,
1496        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1497    ) -> Self {
1498        let opt_state = self.get_opt_state();
1499        let lp = self
1500            .get_plan_builder()
1501            .pipe_with_schema(vec![], callback)
1502            .build();
1503        Self::from_logical_plan(lp, opt_state)
1504    }
1505
1506    pub fn pipe_with_schemas(
1507        self,
1508        others: Vec<LazyFrame>,
1509        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1510    ) -> Self {
1511        let opt_state = self.get_opt_state();
1512        let lp = self
1513            .get_plan_builder()
1514            .pipe_with_schema(
1515                others.into_iter().map(|lf| lf.logical_plan).collect(),
1516                callback,
1517            )
1518            .build();
1519        Self::from_logical_plan(lp, opt_state)
1520    }
1521
1522    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1523        let opt_state = self.get_opt_state();
1524        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1525        Self::from_logical_plan(lp, opt_state)
1526    }
1527
1528    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1529        let contexts = contexts
1530            .as_ref()
1531            .iter()
1532            .map(|lf| lf.logical_plan.clone())
1533            .collect();
1534        let opt_state = self.get_opt_state();
1535        let lp = self.get_plan_builder().with_context(contexts).build();
1536        Self::from_logical_plan(lp, opt_state)
1537    }
1538
1539    /// Aggregate all the columns as their maximum values.
1540    ///
1541    /// Aggregated columns will have the same names as the original columns.
1542    pub fn max(self) -> Self {
1543        self.map_private(DslFunction::Stats(StatsFunction::Max))
1544    }
1545
1546    /// Aggregate all the columns as their minimum values.
1547    ///
1548    /// Aggregated columns will have the same names as the original columns.
1549    pub fn min(self) -> Self {
1550        self.map_private(DslFunction::Stats(StatsFunction::Min))
1551    }
1552
1553    /// Aggregate all the columns as their sum values.
1554    ///
1555    /// Aggregated columns will have the same names as the original columns.
1556    ///
1557    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1558    /// - For integer columns, the ordinary checks for overflow are performed:
1559    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1560    ///   silently wrap.
1561    /// - String columns will sum to None.
1562    pub fn sum(self) -> Self {
1563        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1564    }
1565
1566    /// Aggregate all the columns as their mean values.
1567    ///
1568    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1569    /// - String columns will have a mean of None.
1570    pub fn mean(self) -> Self {
1571        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1572    }
1573
1574    /// Aggregate all the columns as their median values.
1575    ///
1576    /// - Boolean and integer results are converted to `f64`. However, they are still
1577    ///   susceptible to overflow before this conversion occurs.
1578    /// - String columns will sum to None.
1579    pub fn median(self) -> Self {
1580        self.map_private(DslFunction::Stats(StatsFunction::Median))
1581    }
1582
1583    /// Aggregate all the columns as their quantile values.
1584    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1585        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1586            quantile,
1587            method,
1588        }))
1589    }
1590
1591    /// Aggregate all the columns as their standard deviation values.
1592    ///
1593    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1594    /// computing the variance, where `N` is the number of rows.
1595    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1596    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1597    /// > likelihood estimate of the variance for normally distributed variables. The
1598    /// > standard deviation computed in this function is the square root of the estimated
1599    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1600    /// > standard deviation per se.
1601    ///
1602    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1603    pub fn std(self, ddof: u8) -> Self {
1604        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1605    }
1606
1607    /// Aggregate all the columns as their variance values.
1608    ///
1609    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1610    /// computing the variance, where `N` is the number of rows.
1611    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1612    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1613    /// > likelihood estimate of the variance for normally distributed variables.
1614    ///
1615    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1616    pub fn var(self, ddof: u8) -> Self {
1617        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1618    }
1619
1620    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1621    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1622        self.explode_impl(columns, options, false)
1623    }
1624
1625    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1626    fn explode_impl(
1627        self,
1628        columns: Selector,
1629        options: ExplodeOptions,
1630        allow_empty: bool,
1631    ) -> LazyFrame {
1632        let opt_state = self.get_opt_state();
1633        let lp = self
1634            .get_plan_builder()
1635            .explode(columns, options, allow_empty)
1636            .build();
1637        Self::from_logical_plan(lp, opt_state)
1638    }
1639
1640    /// Aggregate all the columns as the sum of their null value count.
1641    pub fn null_count(self) -> LazyFrame {
1642        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1643    }
1644
1645    /// Drop non-unique rows and maintain the order of kept rows.
1646    ///
1647    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1648    /// `None`, all columns are considered.
1649    pub fn unique_stable(
1650        self,
1651        subset: Option<Selector>,
1652        keep_strategy: UniqueKeepStrategy,
1653    ) -> LazyFrame {
1654        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1655        self.unique_stable_generic(subset, keep_strategy)
1656    }
1657
1658    pub fn unique_stable_generic(
1659        self,
1660        subset: Option<Vec<Expr>>,
1661        keep_strategy: UniqueKeepStrategy,
1662    ) -> LazyFrame {
1663        let opt_state = self.get_opt_state();
1664        let options = DistinctOptionsDSL {
1665            subset,
1666            maintain_order: true,
1667            keep_strategy,
1668        };
1669        let lp = self.get_plan_builder().distinct(options).build();
1670        Self::from_logical_plan(lp, opt_state)
1671    }
1672
1673    /// Drop non-unique rows without maintaining the order of kept rows.
1674    ///
1675    /// The order of the kept rows may change; to maintain the original row order, use
1676    /// [`unique_stable`](LazyFrame::unique_stable).
1677    ///
1678    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1679    /// all columns are considered.
1680    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1681        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1682        self.unique_generic(subset, keep_strategy)
1683    }
1684
1685    pub fn unique_generic(
1686        self,
1687        subset: Option<Vec<Expr>>,
1688        keep_strategy: UniqueKeepStrategy,
1689    ) -> LazyFrame {
1690        let opt_state = self.get_opt_state();
1691        let options = DistinctOptionsDSL {
1692            subset,
1693            maintain_order: false,
1694            keep_strategy,
1695        };
1696        let lp = self.get_plan_builder().distinct(options).build();
1697        Self::from_logical_plan(lp, opt_state)
1698    }
1699
1700    /// Drop rows containing one or more NaN values.
1701    ///
1702    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1703    /// floating point columns are considered.
1704    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1705        let opt_state = self.get_opt_state();
1706        let lp = self.get_plan_builder().drop_nans(subset).build();
1707        Self::from_logical_plan(lp, opt_state)
1708    }
1709
1710    /// Drop rows containing one or more None values.
1711    ///
1712    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1713    /// columns are considered.
1714    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1715        let opt_state = self.get_opt_state();
1716        let lp = self.get_plan_builder().drop_nulls(subset).build();
1717        Self::from_logical_plan(lp, opt_state)
1718    }
1719
1720    /// Slice the DataFrame using an offset (starting row) and a length.
1721    ///
1722    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1723    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1724    /// end.
1725    ///
1726    /// If `offset` and `len` are such that the slice extends beyond the end of the
1727    /// DataFrame, the portion between `offset` and the end will be returned. In this
1728    /// case, the number of rows in the returned DataFrame will be less than `len`.
1729    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1730        let opt_state = self.get_opt_state();
1731        let lp = self.get_plan_builder().slice(offset, len).build();
1732        Self::from_logical_plan(lp, opt_state)
1733    }
1734
1735    /// Remove all the rows of the LazyFrame.
1736    pub fn clear(self) -> LazyFrame {
1737        self.slice(0, 0)
1738    }
1739
1740    /// Get the first row.
1741    ///
1742    /// Equivalent to `self.slice(0, 1)`.
1743    pub fn first(self) -> LazyFrame {
1744        self.slice(0, 1)
1745    }
1746
1747    /// Get the last row.
1748    ///
1749    /// Equivalent to `self.slice(-1, 1)`.
1750    pub fn last(self) -> LazyFrame {
1751        self.slice(-1, 1)
1752    }
1753
1754    /// Get the last `n` rows.
1755    ///
1756    /// Equivalent to `self.slice(-(n as i64), n)`.
1757    pub fn tail(self, n: IdxSize) -> LazyFrame {
1758        let neg_tail = -(n as i64);
1759        self.slice(neg_tail, n)
1760    }
1761
1762    #[cfg(feature = "pivot")]
1763    #[expect(clippy::too_many_arguments)]
1764    pub fn pivot(
1765        self,
1766        on: Selector,
1767        on_columns: Arc<DataFrame>,
1768        index: Selector,
1769        values: Selector,
1770        agg: Expr,
1771        maintain_order: bool,
1772        separator: PlSmallStr,
1773    ) -> LazyFrame {
1774        let opt_state = self.get_opt_state();
1775        let lp = self
1776            .get_plan_builder()
1777            .pivot(
1778                on,
1779                on_columns,
1780                index,
1781                values,
1782                agg,
1783                maintain_order,
1784                separator,
1785            )
1786            .build();
1787        Self::from_logical_plan(lp, opt_state)
1788    }
1789
1790    /// Unpivot the DataFrame from wide to long format.
1791    ///
1792    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1793    #[cfg(feature = "pivot")]
1794    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1795        let opt_state = self.get_opt_state();
1796        let lp = self.get_plan_builder().unpivot(args).build();
1797        Self::from_logical_plan(lp, opt_state)
1798    }
1799
1800    /// Limit the DataFrame to the first `n` rows.
1801    pub fn limit(self, n: IdxSize) -> LazyFrame {
1802        self.slice(0, n)
1803    }
1804
1805    /// Apply a function/closure once the logical plan get executed.
1806    ///
1807    /// The function has access to the whole materialized DataFrame at the time it is
1808    /// called.
1809    ///
1810    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1811    /// with `LazyFrame::with_column` or `with_columns`.
1812    ///
1813    /// ## Warning
1814    /// This can blow up in your face if the schema is changed due to the operation. The
1815    /// optimizer relies on a correct schema.
1816    ///
1817    /// You can toggle certain optimizations off.
1818    pub fn map<F>(
1819        self,
1820        function: F,
1821        optimizations: AllowedOptimizations,
1822        schema: Option<Arc<dyn UdfSchema>>,
1823        name: Option<&'static str>,
1824    ) -> LazyFrame
1825    where
1826        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1827    {
1828        let opt_state = self.get_opt_state();
1829        let lp = self
1830            .get_plan_builder()
1831            .map(
1832                function,
1833                optimizations,
1834                schema,
1835                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1836            )
1837            .build();
1838        Self::from_logical_plan(lp, opt_state)
1839    }
1840
1841    #[cfg(feature = "python")]
1842    pub fn map_python(
1843        self,
1844        function: polars_utils::python_function::PythonFunction,
1845        optimizations: AllowedOptimizations,
1846        schema: Option<SchemaRef>,
1847        validate_output: bool,
1848    ) -> LazyFrame {
1849        let opt_state = self.get_opt_state();
1850        let lp = self
1851            .get_plan_builder()
1852            .map_python(function, optimizations, schema, validate_output)
1853            .build();
1854        Self::from_logical_plan(lp, opt_state)
1855    }
1856
1857    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1858        let opt_state = self.get_opt_state();
1859        let lp = self.get_plan_builder().map_private(function).build();
1860        Self::from_logical_plan(lp, opt_state)
1861    }
1862
1863    /// Add a new column at index 0 that counts the rows.
1864    ///
1865    /// `name` is the name of the new column. `offset` is where to start counting from; if
1866    /// `None`, it is set to `0`.
1867    ///
1868    /// # Warning
1869    /// This can have a negative effect on query performance. This may for instance block
1870    /// predicate pushdown optimization.
1871    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1872    where
1873        S: Into<PlSmallStr>,
1874    {
1875        let name = name.into();
1876
1877        match &self.logical_plan {
1878            v @ DslPlan::Scan {
1879                scan_type,
1880                unified_scan_args,
1881                ..
1882            } if unified_scan_args.row_index.is_none()
1883                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1884            {
1885                let DslPlan::Scan {
1886                    sources,
1887                    mut unified_scan_args,
1888                    scan_type,
1889                    cached_ir: _,
1890                } = v.clone()
1891                else {
1892                    unreachable!()
1893                };
1894
1895                unified_scan_args.row_index = Some(RowIndex {
1896                    name,
1897                    offset: offset.unwrap_or(0),
1898                });
1899
1900                DslPlan::Scan {
1901                    sources,
1902                    unified_scan_args,
1903                    scan_type,
1904                    cached_ir: Default::default(),
1905                }
1906                .into()
1907            },
1908            _ => self.map_private(DslFunction::RowIndex { name, offset }),
1909        }
1910    }
1911
1912    /// Return the number of non-null elements for each column.
1913    pub fn count(self) -> LazyFrame {
1914        self.select(vec![col(PlSmallStr::from_static("*")).count()])
1915    }
1916
1917    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
1918    /// inserted as columns.
1919    #[cfg(feature = "dtype-struct")]
1920    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1921        self.map_private(DslFunction::Unnest {
1922            columns: cols,
1923            separator,
1924        })
1925    }
1926
1927    #[cfg(feature = "merge_sorted")]
1928    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1929    where
1930        S: Into<PlSmallStr>,
1931    {
1932        let key = key.into();
1933
1934        let lp = DslPlan::MergeSorted {
1935            input_left: Arc::new(self.logical_plan),
1936            input_right: Arc::new(other.logical_plan),
1937            key,
1938        };
1939        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1940    }
1941
1942    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1943        let lp = DslPlan::MapFunction {
1944            input: Arc::new(self.logical_plan),
1945            function: DslFunction::Hint(hint),
1946        };
1947        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1948    }
1949}
1950
1951/// Utility struct for lazy group_by operation.
1952#[derive(Clone)]
1953pub struct LazyGroupBy {
1954    pub logical_plan: DslPlan,
1955    opt_state: OptFlags,
1956    keys: Vec<Expr>,
1957    predicates: Vec<Expr>,
1958    maintain_order: bool,
1959    #[cfg(feature = "dynamic_group_by")]
1960    dynamic_options: Option<DynamicGroupOptions>,
1961    #[cfg(feature = "dynamic_group_by")]
1962    rolling_options: Option<RollingGroupOptions>,
1963}
1964
1965impl From<LazyGroupBy> for LazyFrame {
1966    fn from(lgb: LazyGroupBy) -> Self {
1967        Self {
1968            logical_plan: lgb.logical_plan,
1969            opt_state: lgb.opt_state,
1970            cached_arena: Default::default(),
1971        }
1972    }
1973}
1974
1975impl LazyGroupBy {
1976    /// Filter groups with a predicate after aggregation.
1977    ///
1978    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
1979    /// is evaluated on the groups.
1980    /// This method can be chained in which case all predicates must evaluate to `true` for a
1981    /// group to be kept.
1982    ///
1983    /// # Example
1984    ///
1985    /// ```rust
1986    /// use polars_core::prelude::*;
1987    /// use polars_lazy::prelude::*;
1988    ///
1989    /// fn example(df: DataFrame) -> LazyFrame {
1990    ///       df.lazy()
1991    ///        .group_by_stable([col("date")])
1992    ///        .having(col("rain").sum().gt(lit(10)))
1993    ///        .agg([col("rain").min().alias("min_rain")])
1994    /// }
1995    /// ```
1996    pub fn having(mut self, predicate: Expr) -> Self {
1997        self.predicates.push(predicate);
1998        self
1999    }
2000
2001    /// Group by and aggregate.
2002    ///
2003    /// Select a column with [col] and choose an aggregation.
2004    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2005    ///
2006    /// # Example
2007    ///
2008    /// ```rust
2009    /// use polars_core::prelude::*;
2010    /// use polars_lazy::prelude::*;
2011    ///
2012    /// fn example(df: DataFrame) -> LazyFrame {
2013    ///       df.lazy()
2014    ///        .group_by_stable([col("date")])
2015    ///        .agg([
2016    ///            col("rain").min().alias("min_rain"),
2017    ///            col("rain").sum().alias("sum_rain"),
2018    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2019    ///        ])
2020    /// }
2021    /// ```
2022    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2023        #[cfg(feature = "dynamic_group_by")]
2024        let lp = DslBuilder::from(self.logical_plan)
2025            .group_by(
2026                self.keys,
2027                self.predicates,
2028                aggs,
2029                None,
2030                self.maintain_order,
2031                self.dynamic_options,
2032                self.rolling_options,
2033            )
2034            .build();
2035
2036        #[cfg(not(feature = "dynamic_group_by"))]
2037        let lp = DslBuilder::from(self.logical_plan)
2038            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2039            .build();
2040        LazyFrame::from_logical_plan(lp, self.opt_state)
2041    }
2042
2043    /// Return first n rows of each group
2044    pub fn head(self, n: Option<usize>) -> LazyFrame {
2045        let keys = self
2046            .keys
2047            .iter()
2048            .filter_map(|expr| expr_output_name(expr).ok())
2049            .collect::<Vec<_>>();
2050
2051        self.agg([all().as_expr().head(n)]).explode_impl(
2052            all() - by_name(keys.iter().cloned(), false, false),
2053            ExplodeOptions {
2054                empty_as_null: true,
2055                keep_nulls: true,
2056            },
2057            true,
2058        )
2059    }
2060
2061    /// Return last n rows of each group
2062    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2063        let keys = self
2064            .keys
2065            .iter()
2066            .filter_map(|expr| expr_output_name(expr).ok())
2067            .collect::<Vec<_>>();
2068
2069        self.agg([all().as_expr().tail(n)]).explode_impl(
2070            all() - by_name(keys.iter().cloned(), false, false),
2071            ExplodeOptions {
2072                empty_as_null: true,
2073                keep_nulls: true,
2074            },
2075            true,
2076        )
2077    }
2078
2079    /// Apply a function over the groups as a new DataFrame.
2080    ///
2081    /// **It is not recommended that you use this as materializing the DataFrame is very
2082    /// expensive.**
2083    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2084        if !self.predicates.is_empty() {
2085            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2086        }
2087
2088        #[cfg(feature = "dynamic_group_by")]
2089        let options = GroupbyOptions {
2090            dynamic: self.dynamic_options,
2091            rolling: self.rolling_options,
2092            slice: None,
2093        };
2094
2095        #[cfg(not(feature = "dynamic_group_by"))]
2096        let options = GroupbyOptions { slice: None };
2097
2098        let lp = DslPlan::GroupBy {
2099            input: Arc::new(self.logical_plan),
2100            keys: self.keys,
2101            predicates: vec![],
2102            aggs: vec![],
2103            apply: Some((f, schema)),
2104            maintain_order: self.maintain_order,
2105            options: Arc::new(options),
2106        };
2107        LazyFrame::from_logical_plan(lp, self.opt_state)
2108    }
2109}
2110
/// Builder for joins on [`LazyFrame`]s.
///
/// Created with [`JoinBuilder::new`] around the left table, configured with the setter
/// methods, and completed with [`JoinBuilder::finish`] (key-based join) or
/// [`JoinBuilder::join_where`] (predicate-based join).
#[must_use]
pub struct JoinBuilder {
    /// Left input of the join.
    lf: LazyFrame,
    /// Join type; `JoinType::Inner` unless changed with `how`.
    how: JoinType,
    /// Right input; must be set with `with` before finishing, otherwise finishing panics.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left input.
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right input.
    right_on: Vec<Expr>,
    allow_parallel: bool,
    force_parallel: bool,
    /// `None` until `suffix` is called.
    suffix: Option<PlSmallStr>,
    validation: JoinValidation,
    /// Whether null keys may match each other (set with `join_nulls`).
    nulls_equal: bool,
    coalesce: JoinCoalesce,
    maintain_order: MaintainOrderJoin,
    /// Preferred build side, if any.
    build_side: Option<JoinBuildSide>,
}
2127impl JoinBuilder {
2128    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2129    pub fn new(lf: LazyFrame) -> Self {
2130        Self {
2131            lf,
2132            other: None,
2133            how: JoinType::Inner,
2134            left_on: vec![],
2135            right_on: vec![],
2136            allow_parallel: true,
2137            force_parallel: false,
2138            suffix: None,
2139            validation: Default::default(),
2140            nulls_equal: false,
2141            coalesce: Default::default(),
2142            maintain_order: Default::default(),
2143            build_side: None,
2144        }
2145    }
2146
2147    /// The right table in the join.
2148    pub fn with(mut self, other: LazyFrame) -> Self {
2149        self.other = Some(other);
2150        self
2151    }
2152
2153    /// Select the join type.
2154    pub fn how(mut self, how: JoinType) -> Self {
2155        self.how = how;
2156        self
2157    }
2158
2159    pub fn validate(mut self, validation: JoinValidation) -> Self {
2160        self.validation = validation;
2161        self
2162    }
2163
2164    /// The expressions you want to join both tables on.
2165    ///
2166    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2167    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2168        let on = on.as_ref().to_vec();
2169        self.left_on.clone_from(&on);
2170        self.right_on = on;
2171        self
2172    }
2173
2174    /// The expressions you want to join the left table on.
2175    ///
2176    /// The passed expressions must be valid in the left table.
2177    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2178        self.left_on = on.as_ref().to_vec();
2179        self
2180    }
2181
2182    /// The expressions you want to join the right table on.
2183    ///
2184    /// The passed expressions must be valid in the right table.
2185    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2186        self.right_on = on.as_ref().to_vec();
2187        self
2188    }
2189
2190    /// Allow parallel table evaluation.
2191    pub fn allow_parallel(mut self, allow: bool) -> Self {
2192        self.allow_parallel = allow;
2193        self
2194    }
2195
2196    /// Force parallel table evaluation.
2197    pub fn force_parallel(mut self, force: bool) -> Self {
2198        self.force_parallel = force;
2199        self
2200    }
2201
2202    /// Join on null values. By default null values will never produce matches.
2203    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2204        self.nulls_equal = nulls_equal;
2205        self
2206    }
2207
2208    /// Suffix to add duplicate column names in join.
2209    /// Defaults to `"_right"` if this method is never called.
2210    pub fn suffix<S>(mut self, suffix: S) -> Self
2211    where
2212        S: Into<PlSmallStr>,
2213    {
2214        self.suffix = Some(suffix.into());
2215        self
2216    }
2217
2218    /// Whether to coalesce join columns.
2219    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2220        self.coalesce = coalesce;
2221        self
2222    }
2223
2224    /// Whether to preserve the row order.
2225    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2226        self.maintain_order = maintain_order;
2227        self
2228    }
2229
2230    /// Whether to prefer a specific build side.
2231    pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2232        self.build_side = build_side;
2233        self
2234    }
2235
2236    /// Finish builder
2237    pub fn finish(self) -> LazyFrame {
2238        let opt_state = self.lf.opt_state;
2239        let other = self.other.expect("'with' not set in join builder");
2240
2241        let args = JoinArgs {
2242            how: self.how,
2243            validation: self.validation,
2244            suffix: self.suffix,
2245            slice: None,
2246            nulls_equal: self.nulls_equal,
2247            coalesce: self.coalesce,
2248            maintain_order: self.maintain_order,
2249            build_side: self.build_side,
2250        };
2251
2252        let lp = self
2253            .lf
2254            .get_plan_builder()
2255            .join(
2256                other.logical_plan,
2257                self.left_on,
2258                self.right_on,
2259                JoinOptions {
2260                    allow_parallel: self.allow_parallel,
2261                    force_parallel: self.force_parallel,
2262                    args,
2263                }
2264                .into(),
2265            )
2266            .build();
2267        LazyFrame::from_logical_plan(lp, opt_state)
2268    }
2269
2270    // Finish with join predicates
2271    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2272        let opt_state = self.lf.opt_state;
2273        let other = self.other.expect("with not set");
2274
2275        // Decompose `And` conjunctions into their component expressions
2276        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2277            if let Expr::BinaryExpr {
2278                op: Operator::And,
2279                left,
2280                right,
2281            } = predicate
2282            {
2283                decompose_and((*left).clone(), expanded_predicates);
2284                decompose_and((*right).clone(), expanded_predicates);
2285            } else {
2286                expanded_predicates.push(predicate);
2287            }
2288        }
2289        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2290        for predicate in predicates {
2291            decompose_and(predicate, &mut expanded_predicates);
2292        }
2293        let predicates: Vec<Expr> = expanded_predicates;
2294
2295        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2296        #[cfg(feature = "is_between")]
2297        let predicates: Vec<Expr> = {
2298            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2299            for predicate in predicates {
2300                if let Expr::Function {
2301                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2302                    input,
2303                    ..
2304                } = &predicate
2305                {
2306                    if let [expr, lower, upper] = input.as_slice() {
2307                        match closed {
2308                            ClosedInterval::Both => {
2309                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2310                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2311                            },
2312                            ClosedInterval::Right => {
2313                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2314                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2315                            },
2316                            ClosedInterval::Left => {
2317                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2318                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2319                            },
2320                            ClosedInterval::None => {
2321                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2322                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2323                            },
2324                        }
2325                        continue;
2326                    }
2327                }
2328                expanded_predicates.push(predicate);
2329            }
2330            expanded_predicates
2331        };
2332
2333        let args = JoinArgs {
2334            how: self.how,
2335            validation: self.validation,
2336            suffix: self.suffix,
2337            slice: None,
2338            nulls_equal: self.nulls_equal,
2339            coalesce: self.coalesce,
2340            maintain_order: self.maintain_order,
2341            build_side: self.build_side,
2342        };
2343        let options = JoinOptions {
2344            allow_parallel: self.allow_parallel,
2345            force_parallel: self.force_parallel,
2346            args,
2347        };
2348
2349        let lp = DslPlan::Join {
2350            input_left: Arc::new(self.lf.logical_plan),
2351            input_right: Arc::new(other.logical_plan),
2352            left_on: Default::default(),
2353            right_on: Default::default(),
2354            predicates,
2355            options: Arc::from(options),
2356        };
2357
2358        LazyFrame::from_logical_plan(lp, opt_state)
2359    }
2360}
2361
2362pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2363    #[cfg(not(feature = "new_streaming"))]
2364    {
2365        None
2366    }
2367    #[cfg(feature = "new_streaming")]
2368    {
2369        Some(polars_stream::build_streaming_query_executor)
2370    }
2371};
2372
2373pub struct CollectBatches {
2374    recv: Receiver<PolarsResult<DataFrame>>,
2375    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2376}
2377
2378impl CollectBatches {
2379    /// Start running the query, if not already.
2380    pub fn start(&mut self) {
2381        if let Some(runner) = self.runner.take() {
2382            runner()
2383        }
2384    }
2385}
2386
2387impl Iterator for CollectBatches {
2388    type Item = PolarsResult<DataFrame>;
2389
2390    fn next(&mut self) -> Option<Self::Item> {
2391        self.start();
2392        self.recv.recv().ok()
2393    }
2394}