polars_lazy/frame/mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::num::NonZeroUsize;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "json")]
22pub use ndjson::*;
23#[cfg(feature = "parquet")]
24pub use parquet::*;
25use polars_compute::rolling::QuantileMethod;
26use polars_core::POOL;
27use polars_core::error::feature_gated;
28use polars_core::prelude::*;
29use polars_io::RowIndex;
30use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
31use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
32use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
33#[cfg(feature = "is_between")]
34use polars_ops::prelude::ClosedInterval;
35pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
36use polars_utils::pl_str::PlSmallStr;
37use polars_utils::plpath::PlPath;
38use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
39
40use crate::frame::cached_arenas::CachedArena;
41use crate::prelude::*;
42
43pub trait IntoLazy {
44    fn lazy(self) -> LazyFrame;
45}
46
47impl IntoLazy for DataFrame {
48    /// Convert the `DataFrame` into a `LazyFrame`
49    fn lazy(self) -> LazyFrame {
50        let lp = DslBuilder::from_existing_df(self).build();
51        LazyFrame {
52            logical_plan: lp,
53            opt_state: Default::default(),
54            cached_arena: Default::default(),
55        }
56    }
57}
58
59impl IntoLazy for LazyFrame {
60    fn lazy(self) -> LazyFrame {
61        self
62    }
63}
64
65/// Lazy abstraction over an eager `DataFrame`.
66///
67/// In essence, it is an abstraction over a logical plan. The methods of this struct will incrementally
68/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
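///
/// # Example
///
/// A minimal sketch of the lazy workflow (the column name `a` is illustrative):
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
///     df.lazy()                        // start a logical plan from an eager DataFrame
///         .filter(col("a").gt(lit(1))) // operations only extend the plan
///         .collect()                   // optimization and execution happen here
/// }
/// ```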
69#[derive(Clone, Default)]
70#[must_use]
71pub struct LazyFrame {
72    pub logical_plan: DslPlan,
73    pub(crate) opt_state: OptFlags,
74    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
75}
76
77impl From<DslPlan> for LazyFrame {
78    fn from(plan: DslPlan) -> Self {
79        Self {
80            logical_plan: plan,
81            opt_state: OptFlags::default(),
82            cached_arena: Default::default(),
83        }
84    }
85}
86
87impl LazyFrame {
88    pub(crate) fn from_inner(
89        logical_plan: DslPlan,
90        opt_state: OptFlags,
91        cached_arena: Arc<Mutex<Option<CachedArena>>>,
92    ) -> Self {
93        Self {
94            logical_plan,
95            opt_state,
96            cached_arena,
97        }
98    }
99
100    pub(crate) fn get_plan_builder(self) -> DslBuilder {
101        DslBuilder::from(self.logical_plan)
102    }
103
104    fn get_opt_state(&self) -> OptFlags {
105        self.opt_state
106    }
107
108    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
109        LazyFrame {
110            logical_plan,
111            opt_state,
112            cached_arena: Default::default(),
113        }
114    }
115
116    /// Get current optimizations.
117    pub fn get_current_optimizations(&self) -> OptFlags {
118        self.opt_state
119    }
120
121    /// Set allowed optimizations.
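    ///
    /// # Example
    ///
    /// A minimal sketch using the bitflags operations on [`OptFlags`]; disabling predicate
    /// pushdown is only an illustration:
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .with_optimizations(OptFlags::default() & !OptFlags::PREDICATE_PUSHDOWN)
    /// }
    /// ```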
122    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
123        self.opt_state = opt_state;
124        self
125    }
126
127    /// Turn off all optimizations.
128    pub fn without_optimizations(self) -> Self {
129        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
130    }
131
132    /// Toggle projection pushdown optimization.
133    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
134        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
135        self
136    }
137
138    /// Toggle cluster with columns optimization.
139    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
140        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
141        self
142    }
143
144    /// Toggle the optimization that checks whether operations are order dependent and unsets
145    /// maintain_order where the order would not be observed.
146    pub fn with_check_order(mut self, toggle: bool) -> Self {
147        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
148        self
149    }
150
151    /// Toggle predicate pushdown optimization.
152    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
153        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
154        self
155    }
156
157    /// Toggle type coercion optimization.
158    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
159        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
160        self
161    }
162
163    /// Toggle type check optimization.
164    pub fn with_type_check(mut self, toggle: bool) -> Self {
165        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
166        self
167    }
168
169    /// Toggle expression simplification optimization on or off.
170    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
171        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
172        self
173    }
174
175    /// Toggle common subplan elimination optimization on or off.
176    #[cfg(feature = "cse")]
177    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
178        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
179        self
180    }
181
182    /// Toggle common subexpression elimination optimization on or off.
183    #[cfg(feature = "cse")]
184    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
185        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
186        self
187    }
188
189    /// Toggle slice pushdown optimization.
190    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
191        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
192        self
193    }
194
195    #[cfg(feature = "new_streaming")]
196    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
197        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
198        self
199    }
200
201    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
202    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
203        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
204        self
205    }
206
207    /// Run every node eagerly. This turns off multi-node optimizations.
208    pub fn _with_eager(mut self, toggle: bool) -> Self {
209        self.opt_state.set(OptFlags::EAGER, toggle);
210        self
211    }
212
213    /// Return a String describing the naive (un-optimized) logical plan.
214    pub fn describe_plan(&self) -> PolarsResult<String> {
215        Ok(self.clone().to_alp()?.describe())
216    }
217
218    /// Return a String describing the naive (un-optimized) logical plan in tree format.
219    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
220        Ok(self.clone().to_alp()?.describe_tree_format())
221    }
222
223    /// Return a String describing the optimized logical plan.
224    ///
225    /// Returns `Err` if optimizing the logical plan fails.
226    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
227        Ok(self.clone().to_alp_optimized()?.describe())
228    }
229
230    /// Return a String describing the optimized logical plan in tree format.
231    ///
232    /// Returns `Err` if optimizing the logical plan fails.
233    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
234        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
235    }
236
237    /// Return a String describing the logical plan.
238    ///
239    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
240    /// explains the naive, un-optimized plan.
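    ///
    /// # Example
    ///
    /// A minimal sketch (the filter is only there to give the plan some content):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<String> {
    ///     df.lazy()
    ///         .filter(col("foo").is_not_null())
    ///         .explain(true)
    /// }
    /// ```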
241    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
242        if optimized {
243            self.describe_optimized_plan()
244        } else {
245            self.describe_plan()
246        }
247    }
248
249    /// Add a sort operation to the logical plan.
250    ///
251    /// Sorts the LazyFrame by the column name specified using the provided options.
252    ///
253    /// # Example
254    ///
255    /// Sort DataFrame by 'sepal_width' column:
256    /// ```rust
257    /// # use polars_core::prelude::*;
258    /// # use polars_lazy::prelude::*;
259    /// fn sort_by_sepal_width(df: DataFrame) -> LazyFrame {
260    ///     df.lazy().sort(["sepal_width"], Default::default())
261    /// }
262    /// ```
263    /// Sort by a single column with specific order:
264    /// ```
265    /// # use polars_core::prelude::*;
266    /// # use polars_lazy::prelude::*;
267    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
268    ///     df.lazy().sort(
269    ///         ["sepal_width"],
270    ///         SortMultipleOptions::new()
271    ///             .with_order_descending(descending)
272    ///     )
273    /// }
274    /// ```
275    /// Sort by multiple columns, specifying the order for each column:
276    /// ```
277    /// # use polars_core::prelude::*;
278    /// # use polars_lazy::prelude::*;
279    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
280    ///     df.lazy().sort(
281    ///         ["sepal_width", "sepal_length"],
282    ///         SortMultipleOptions::new()
283    ///             .with_order_descending_multi([false, true])
284    ///     )
285    /// }
286    /// ```
287    /// See [`SortMultipleOptions`] for more options.
288    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
289        let opt_state = self.get_opt_state();
290        let lp = self
291            .get_plan_builder()
292            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
293            .build();
294        Self::from_logical_plan(lp, opt_state)
295    }
296
297    /// Add a sort operation to the logical plan.
298    ///
299    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
300    /// concrete columns before sorting.
301    ///
302    /// See [`SortMultipleOptions`] for more options.
303    ///
304    /// # Example
305    ///
306    /// ```rust
307    /// use polars_core::prelude::*;
308    /// use polars_lazy::prelude::*;
309    ///
310    /// /// Sort DataFrame by 'sepal_width' column
311    /// fn example(df: DataFrame) -> LazyFrame {
312    ///       df.lazy()
313    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
314    /// }
315    /// ```
316    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
317        self,
318        by_exprs: E,
319        sort_options: SortMultipleOptions,
320    ) -> Self {
321        let by_exprs = by_exprs.as_ref().to_vec();
322        if by_exprs.is_empty() {
323            self
324        } else {
325            let opt_state = self.get_opt_state();
326            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
327            Self::from_logical_plan(lp, opt_state)
328        }
329    }
330
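    /// Return the top `k` rows according to `by_exprs`: the provided sort order is reversed,
    /// nulls are placed last, and only the first `k` rows are kept. The optimizer rewrites this
    /// into a dedicated top-k operation.
    ///
    /// # Example
    ///
    /// A minimal sketch (the column name is illustrative):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy().top_k(5, [col("sepal_width")], SortMultipleOptions::default())
    /// }
    /// ```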
331    pub fn top_k<E: AsRef<[Expr]>>(
332        self,
333        k: IdxSize,
334        by_exprs: E,
335        sort_options: SortMultipleOptions,
336    ) -> Self {
337        // this will optimize to top-k
338        self.sort_by_exprs(
339            by_exprs,
340            sort_options.with_order_reversed().with_nulls_last(true),
341        )
342        .slice(0, k)
343    }
344
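    /// Return the bottom `k` rows according to `by_exprs`: the frame is sorted with the provided
    /// options (nulls placed last) and only the first `k` rows are kept. The optimizer rewrites
    /// this into a dedicated bottom-k operation; see [`top_k`](Self::top_k) for a usage sketch.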
345    pub fn bottom_k<E: AsRef<[Expr]>>(
346        self,
347        k: IdxSize,
348        by_exprs: E,
349        sort_options: SortMultipleOptions,
350    ) -> Self {
351        // this will optimize to bottom-k
352        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
353            .slice(0, k)
354    }
355
356    /// Reverse the `DataFrame` from top to bottom.
357    ///
358    /// Row `i` becomes row `number_of_rows - i - 1`.
359    ///
360    /// # Example
361    ///
362    /// ```rust
363    /// use polars_core::prelude::*;
364    /// use polars_lazy::prelude::*;
365    ///
366    /// fn example(df: DataFrame) -> LazyFrame {
367    ///       df.lazy()
368    ///         .reverse()
369    /// }
370    /// ```
371    pub fn reverse(self) -> Self {
372        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
373    }
374
375    /// Rename columns in the DataFrame.
376    ///
377    /// `existing` and `new` are iterables of the same length containing the old and
378    /// corresponding new column names. Renaming happens to all `existing` columns
379    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
380    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
381    /// those columns that are actually found will be renamed (others will be ignored).
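    ///
    /// # Example
    ///
    /// A minimal sketch (the column names are illustrative):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     // Rename "foo" to "apple" and "bar" to "banana" in one pass.
    ///     df.lazy().rename(["foo", "bar"], ["apple", "banana"], true)
    /// }
    /// ```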
382    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
383    where
384        I: IntoIterator<Item = T>,
385        J: IntoIterator<Item = S>,
386        T: AsRef<str>,
387        S: AsRef<str>,
388    {
389        let iter = existing.into_iter();
390        let cap = iter.size_hint().0;
391        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
392        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
393
394        // TODO! should this error if `existing` and `new` have different lengths?
395        // Currently, the longer of the two is truncated.
396        for (existing, new) in iter.zip(new) {
397            let existing = existing.as_ref();
398            let new = new.as_ref();
399            if new != existing {
400                existing_vec.push(existing.into());
401                new_vec.push(new.into());
402            }
403        }
404
405        self.map_private(DslFunction::Rename {
406            existing: existing_vec.into(),
407            new: new_vec.into(),
408            strict,
409        })
410    }
411
412    /// Removes columns from the DataFrame.
413    /// Note that it is usually better to select only the columns you need
414    /// and let projection pushdown optimize away the unneeded columns.
415    ///
416    /// Columns that are not present in the schema cause a [`PolarsError::ColumnNotFound`]
417    /// error when the [`LazyFrame`] is materialized.
418    pub fn drop(self, columns: Selector) -> Self {
419        let opt_state = self.get_opt_state();
420        let lp = self.get_plan_builder().drop(columns).build();
421        Self::from_logical_plan(lp, opt_state)
422    }
423
424    /// Shift the values by a given period and fill the parts that become empty due to this operation
425    /// with nulls.
426    ///
427    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
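    ///
    /// # Example
    ///
    /// A minimal sketch shifting all columns down by one row:
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy().shift(lit(1))
    /// }
    /// ```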
428    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
429        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
430    }
431
432    /// Shift the values by a given period and fill the parts that will be empty due to this operation
433    /// with the result of the `fill_value` expression.
434    ///
435    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
436    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
437        self.select(vec![
438            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
439        ])
440    }
441
442    /// Fill null values in the DataFrame with an expression.
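    ///
    /// # Example
    ///
    /// A minimal sketch replacing nulls in every column with zero:
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy().fill_null(lit(0))
    /// }
    /// ```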
443    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
444        let opt_state = self.get_opt_state();
445        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
446        Self::from_logical_plan(lp, opt_state)
447    }
448
449    /// Fill NaN values in the DataFrame with an expression.
450    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
451        let opt_state = self.get_opt_state();
452        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
453        Self::from_logical_plan(lp, opt_state)
454    }
455
456    /// Caches the result into a new LazyFrame.
457    ///
458    /// This should be used to prevent computations running multiple times.
459    pub fn cache(self) -> Self {
460        let opt_state = self.get_opt_state();
461        let lp = self.get_plan_builder().cache().build();
462        Self::from_logical_plan(lp, opt_state)
463    }
464
465    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes.
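    ///
    /// # Example
    ///
    /// A minimal sketch (the column name and target dtype are illustrative):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     let mut dtypes = PlHashMap::default();
    ///     dtypes.insert("foo", DataType::Float64);
    ///     // `strict: true` uses `strict_cast`, which errors on failed casts.
    ///     df.lazy().cast(dtypes, true)
    /// }
    /// ```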
466    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
467        let cast_cols: Vec<Expr> = dtypes
468            .into_iter()
469            .map(|(name, dt)| {
470                let name = PlSmallStr::from_str(name);
471
472                if strict {
473                    col(name).strict_cast(dt)
474                } else {
475                    col(name).cast(dt)
476                }
477            })
478            .collect();
479
480        if cast_cols.is_empty() {
481            self
482        } else {
483            self.with_columns(cast_cols)
484        }
485    }
486
487    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame.
488    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
489        self.with_columns(vec![if strict {
490            col(PlSmallStr::from_static("*")).strict_cast(dtype)
491        } else {
492            col(PlSmallStr::from_static("*")).cast(dtype)
493        }])
494    }
495
496    pub fn optimize(
497        self,
498        lp_arena: &mut Arena<IR>,
499        expr_arena: &mut Arena<AExpr>,
500    ) -> PolarsResult<Node> {
501        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
502    }
503
504    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
505        let (mut lp_arena, mut expr_arena) = self.get_arenas();
506        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
507
508        Ok(IRPlan::new(node, lp_arena, expr_arena))
509    }
510
511    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
512        let (mut lp_arena, mut expr_arena) = self.get_arenas();
513        let node = to_alp(
514            self.logical_plan,
515            &mut expr_arena,
516            &mut lp_arena,
517            &mut self.opt_state,
518        )?;
519        let plan = IRPlan::new(node, lp_arena, expr_arena);
520        Ok(plan)
521    }
522
523    pub(crate) fn optimize_with_scratch(
524        self,
525        lp_arena: &mut Arena<IR>,
526        expr_arena: &mut Arena<AExpr>,
527        scratch: &mut Vec<Node>,
528    ) -> PolarsResult<Node> {
529        #[allow(unused_mut)]
530        let mut opt_state = self.opt_state;
531        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
532
533        #[cfg(feature = "cse")]
534        if new_streaming {
535            // The new streaming engine can't deal with the way the common
536            // subexpression elimination adds length-incorrect with_columns.
537            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
538        }
539
540        let lp_top = optimize(
541            self.logical_plan,
542            opt_state,
543            lp_arena,
544            expr_arena,
545            scratch,
546            apply_scan_predicate_to_scan_ir,
547        )?;
548
549        Ok(lp_top)
550    }
551
552    fn prepare_collect_post_opt<P>(
553        mut self,
554        check_sink: bool,
555        query_start: Option<std::time::Instant>,
556        post_opt: P,
557    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
558    where
559        P: FnOnce(
560            Node,
561            &mut Arena<IR>,
562            &mut Arena<AExpr>,
563            Option<std::time::Duration>,
564        ) -> PolarsResult<()>,
565    {
566        let (mut lp_arena, mut expr_arena) = self.get_arenas();
567
568        let mut scratch = vec![];
569        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
570
571        post_opt(
572            lp_top,
573            &mut lp_arena,
574            &mut expr_arena,
575            // Post optimization callback gets the time since the
576            // query was started as its "base" timepoint.
577            query_start.map(|s| s.elapsed()),
578        )?;
579
580        // Check whether the plan ends in a file or partition sink; if not, the result must be collected in memory.
581        let no_file_sink = if check_sink {
582            !matches!(
583                lp_arena.get(lp_top),
584                IR::Sink {
585                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
586                    ..
587                }
588            )
589        } else {
590            true
591        };
592        let physical_plan = create_physical_plan(
593            lp_top,
594            &mut lp_arena,
595            &mut expr_arena,
596            BUILD_STREAMING_EXECUTOR,
597        )?;
598
599        let state = ExecutionState::new();
600        Ok((state, physical_plan, no_file_sink))
601    }
602
603    // post_opt: a callback invoked after optimization; it can be used to modify the IR just in time.
604    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
605    where
606        P: FnOnce(
607            Node,
608            &mut Arena<IR>,
609            &mut Arena<AExpr>,
610            Option<std::time::Duration>,
611        ) -> PolarsResult<()>,
612    {
613        let (mut state, mut physical_plan, _) =
614            self.prepare_collect_post_opt(false, None, post_opt)?;
615        physical_plan.execute(&mut state)
616    }
617
618    #[allow(unused_mut)]
619    fn prepare_collect(
620        self,
621        check_sink: bool,
622        query_start: Option<std::time::Instant>,
623    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
624        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
625    }
626
627    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
628    /// `engine`.
629    ///
630    /// The query is optimized prior to execution.
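    ///
    /// # Example
    ///
    /// A minimal sketch that forces the in-memory engine (other engines depend on the enabled
    /// features):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
    ///     df.lazy().collect_with_engine(Engine::InMemory)
    /// }
    /// ```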
631    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
632        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
633            payload.clone()
634        } else {
635            self.logical_plan = DslPlan::Sink {
636                input: Arc::new(self.logical_plan),
637                payload: SinkType::Memory,
638            };
639            SinkType::Memory
640        };
641
642        // Default engine for collect is InMemory, sink_* is Streaming
643        if engine == Engine::Auto {
644            engine = match payload {
645                #[cfg(feature = "new_streaming")]
646                SinkType::Callback { .. } | SinkType::File { .. } | SinkType::Partition { .. } => {
647                    Engine::Streaming
648                },
649                _ => Engine::InMemory,
650            };
651        }
652        // Gpu uses some hacks to dispatch.
653        if engine == Engine::Gpu {
654            engine = Engine::InMemory;
655        }
656
657        #[cfg(feature = "new_streaming")]
658        {
659            if let Some(result) = self.try_new_streaming_if_requested() {
660                return result.map(|v| v.unwrap_single());
661            }
662        }
663
664        match engine {
665            Engine::Auto => unreachable!(),
666            Engine::Streaming => {
667                feature_gated!("new_streaming", self = self.with_new_streaming(true))
668            },
669            _ => {},
670        }
671        let mut alp_plan = self.clone().to_alp_optimized()?;
672
673        match engine {
674            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
675                let result = polars_stream::run_query(
676                    alp_plan.lp_top,
677                    &mut alp_plan.lp_arena,
678                    &mut alp_plan.expr_arena,
679                );
680                result.map(|v| v.unwrap_single())
681            }),
682            Engine::Gpu => {
683                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
684            },
685            Engine::InMemory => {
686                let mut physical_plan = create_physical_plan(
687                    alp_plan.lp_top,
688                    &mut alp_plan.lp_arena,
689                    &mut alp_plan.expr_arena,
690                    BUILD_STREAMING_EXECUTOR,
691                )?;
692                let mut state = ExecutionState::new();
693                physical_plan.execute(&mut state)
694            },
695        }
696    }
697
698    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
699        let sink_multiple = LazyFrame {
700            logical_plan: DslPlan::SinkMultiple { inputs: plans },
701            opt_state,
702            cached_arena: Default::default(),
703        };
704        sink_multiple.explain(true)
705    }
706
707    pub fn collect_all_with_engine(
708        plans: Vec<DslPlan>,
709        mut engine: Engine,
710        opt_state: OptFlags,
711    ) -> PolarsResult<Vec<DataFrame>> {
712        if plans.is_empty() {
713            return Ok(Vec::new());
714        }
715
716        // Default engine for collect_all is InMemory
717        if engine == Engine::Auto {
718            engine = Engine::InMemory;
719        }
720        // Gpu uses some hacks to dispatch.
721        if engine == Engine::Gpu {
722            engine = Engine::InMemory;
723        }
724
725        let mut sink_multiple = LazyFrame {
726            logical_plan: DslPlan::SinkMultiple { inputs: plans },
727            opt_state,
728            cached_arena: Default::default(),
729        };
730
731        #[cfg(feature = "new_streaming")]
732        {
733            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
734                return result.map(|v| v.unwrap_multiple());
735            }
736        }
737
738        match engine {
739            Engine::Auto => unreachable!(),
740            Engine::Streaming => {
741                feature_gated!(
742                    "new_streaming",
743                    sink_multiple = sink_multiple.with_new_streaming(true)
744                )
745            },
746            _ => {},
747        }
748        let mut alp_plan = sink_multiple.to_alp_optimized()?;
749
750        if engine == Engine::Streaming {
751            feature_gated!("new_streaming", {
752                let result = polars_stream::run_query(
753                    alp_plan.lp_top,
754                    &mut alp_plan.lp_arena,
755                    &mut alp_plan.expr_arena,
756                );
757                return result.map(|v| v.unwrap_multiple());
758            });
759        }
760
761        let IR::SinkMultiple { inputs } = alp_plan.root() else {
762            unreachable!()
763        };
764
765        let mut multiplan = create_multiple_physical_plans(
766            inputs.clone().as_slice(),
767            &mut alp_plan.lp_arena,
768            &mut alp_plan.expr_arena,
769            BUILD_STREAMING_EXECUTOR,
770        )?;
771
772        match engine {
773            Engine::Gpu => polars_bail!(
774                InvalidOperation: "collect_all is not supported for the gpu engine"
775            ),
776            Engine::InMemory => {
777                // We don't use par_iter directly because each LP may itself start threads (for instance scan_csv),
778                // which could lead to a rayon stack overflow. So we chunk by a multiple of the thread count to keep
779                // work stealing within bounds.
780                let mut state = ExecutionState::new();
781                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
782                    cache_prefiller.execute(&mut state)?;
783                }
784                let out = POOL.install(|| {
785                    multiplan
786                        .physical_plans
787                        .chunks_mut(POOL.current_num_threads() * 3)
788                        .map(|chunk| {
789                            chunk
790                                .into_par_iter()
791                                .enumerate()
792                                .map(|(idx, input)| {
793                                    let mut input = std::mem::take(input);
794                                    let mut state = state.split();
795                                    state.branch_idx += idx;
796
797                                    let df = input.execute(&mut state)?;
798                                    Ok(df)
799                                })
800                                .collect::<PolarsResult<Vec<_>>>()
801                        })
802                        .collect::<PolarsResult<Vec<_>>>()
803                });
804                Ok(out?.into_iter().flatten().collect())
805            },
806            _ => unreachable!(),
807        }
808    }
809
810    /// Execute all the lazy operations and collect them into a [`DataFrame`].
811    ///
812    /// The query is optimized prior to execution.
813    ///
814    /// # Example
815    ///
816    /// ```rust
817    /// use polars_core::prelude::*;
818    /// use polars_lazy::prelude::*;
819    ///
820    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
821    ///     df.lazy()
822    ///       .group_by([col("foo")])
823    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
824    ///       .collect()
825    /// }
826    /// ```
827    pub fn collect(self) -> PolarsResult<DataFrame> {
828        self.collect_with_engine(Engine::InMemory)
829    }
830
831    // post_opt: a callback invoked after optimization; it can be used to modify the IR just in time.
832    // This version does profiling of the node execution.
833    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
834    where
835        P: FnOnce(
836            Node,
837            &mut Arena<IR>,
838            &mut Arena<AExpr>,
839            Option<std::time::Duration>,
840        ) -> PolarsResult<()>,
841    {
842        let query_start = std::time::Instant::now();
843        let (mut state, mut physical_plan, _) =
844            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
845        state.time_nodes(query_start);
846        let out = physical_plan.execute(&mut state)?;
847        let timer_df = state.finish_timer()?;
848        Ok((out, timer_df))
849    }
850
851    /// Profile a LazyFrame.
852    ///
853    /// This will run the query and return a tuple
854    /// containing the materialized DataFrame and a DataFrame that contains profiling information
855    /// of each node that is executed.
856    ///
857    /// The units of the timings are microseconds.
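    ///
    /// # Example
    ///
    /// A minimal sketch (any query works; the aggregation is illustrative):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<(DataFrame, DataFrame)> {
    ///     let (result, timings) = df.lazy()
    ///         .group_by([col("foo")])
    ///         .agg([col("bar").sum()])
    ///         .profile()?;
    ///     Ok((result, timings))
    /// }
    /// ```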
858    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
859        self._profile_post_opt(|_, _, _, _| Ok(()))
860    }
861
862    /// Stream a query result into a Parquet file. This is useful if the final result doesn't fit
863    /// into memory. This method returns an error if the query cannot be executed entirely in a
864    /// streaming fashion.
865    #[cfg(feature = "parquet")]
866    pub fn sink_parquet(
867        self,
868        target: SinkTarget,
869        options: ParquetWriteOptions,
870        cloud_options: Option<polars_io::cloud::CloudOptions>,
871        sink_options: SinkOptions,
872    ) -> PolarsResult<Self> {
873        self.sink(SinkType::File(FileSinkType {
874            target,
875            sink_options,
876            file_type: FileType::Parquet(options),
877            cloud_options,
878        }))
879    }
880
881    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
882    /// into memory. This method returns an error if the query cannot be executed entirely in a
883    /// streaming fashion.
884    #[cfg(feature = "ipc")]
885    pub fn sink_ipc(
886        self,
887        target: SinkTarget,
888        options: IpcWriterOptions,
889        cloud_options: Option<polars_io::cloud::CloudOptions>,
890        sink_options: SinkOptions,
891    ) -> PolarsResult<Self> {
892        self.sink(SinkType::File(FileSinkType {
893            target,
894            sink_options,
895            file_type: FileType::Ipc(options),
896            cloud_options,
897        }))
898    }
899
900    /// Stream a query result into a CSV file. This is useful if the final result doesn't fit
901    /// into memory. This method returns an error if the query cannot be executed entirely in a
902    /// streaming fashion.
903    #[cfg(feature = "csv")]
904    pub fn sink_csv(
905        self,
906        target: SinkTarget,
907        options: CsvWriterOptions,
908        cloud_options: Option<polars_io::cloud::CloudOptions>,
909        sink_options: SinkOptions,
910    ) -> PolarsResult<Self> {
911        self.sink(SinkType::File(FileSinkType {
912            target,
913            sink_options,
914            file_type: FileType::Csv(options),
915            cloud_options,
916        }))
917    }
918
919    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
920    /// into memory. This method returns an error if the query cannot be executed entirely in a
921    /// streaming fashion.
922    #[cfg(feature = "json")]
923    pub fn sink_json(
924        self,
925        target: SinkTarget,
926        options: JsonWriterOptions,
927        cloud_options: Option<polars_io::cloud::CloudOptions>,
928        sink_options: SinkOptions,
929    ) -> PolarsResult<Self> {
930        self.sink(SinkType::File(FileSinkType {
931            target,
932            sink_options,
933            file_type: FileType::Json(options),
934            cloud_options,
935        }))
936    }
937
938    /// Stream a query result into a Parquet file in a partitioned manner. This is useful if the
939    /// final result doesn't fit into memory. This method returns an error if the query cannot
940    /// be executed entirely in a streaming fashion.
941    #[cfg(feature = "parquet")]
942    #[allow(clippy::too_many_arguments)]
943    pub fn sink_parquet_partitioned(
944        self,
945        base_path: Arc<PlPath>,
946        file_path_cb: Option<PartitionTargetCallback>,
947        variant: PartitionVariant,
948        options: ParquetWriteOptions,
949        cloud_options: Option<polars_io::cloud::CloudOptions>,
950        sink_options: SinkOptions,
951        per_partition_sort_by: Option<Vec<SortColumn>>,
952        finish_callback: Option<SinkFinishCallback>,
953    ) -> PolarsResult<Self> {
954        self.sink(SinkType::Partition(PartitionSinkType {
955            base_path,
956            file_path_cb,
957            sink_options,
958            variant,
959            file_type: FileType::Parquet(options),
960            cloud_options,
961            per_partition_sort_by,
962            finish_callback,
963        }))
964    }
965
966    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
967    /// final result doesn't fit into memory. This method returns an error if the query cannot
968    /// be executed entirely in a streaming fashion.
969    #[cfg(feature = "ipc")]
970    #[allow(clippy::too_many_arguments)]
971    pub fn sink_ipc_partitioned(
972        self,
973        base_path: Arc<PlPath>,
974        file_path_cb: Option<PartitionTargetCallback>,
975        variant: PartitionVariant,
976        options: IpcWriterOptions,
977        cloud_options: Option<polars_io::cloud::CloudOptions>,
978        sink_options: SinkOptions,
979        per_partition_sort_by: Option<Vec<SortColumn>>,
980        finish_callback: Option<SinkFinishCallback>,
981    ) -> PolarsResult<Self> {
982        self.sink(SinkType::Partition(PartitionSinkType {
983            base_path,
984            file_path_cb,
985            sink_options,
986            variant,
987            file_type: FileType::Ipc(options),
988            cloud_options,
989            per_partition_sort_by,
990            finish_callback,
991        }))
992    }
993
994    /// Stream a query result into a CSV file in a partitioned manner. This is useful if the final
995    /// result doesn't fit into memory. This method returns an error if the query cannot be
996    /// executed entirely in a streaming fashion.
997    #[cfg(feature = "csv")]
998    #[allow(clippy::too_many_arguments)]
999    pub fn sink_csv_partitioned(
1000        self,
1001        base_path: Arc<PlPath>,
1002        file_path_cb: Option<PartitionTargetCallback>,
1003        variant: PartitionVariant,
1004        options: CsvWriterOptions,
1005        cloud_options: Option<polars_io::cloud::CloudOptions>,
1006        sink_options: SinkOptions,
1007        per_partition_sort_by: Option<Vec<SortColumn>>,
1008        finish_callback: Option<SinkFinishCallback>,
1009    ) -> PolarsResult<Self> {
1010        self.sink(SinkType::Partition(PartitionSinkType {
1011            base_path,
1012            file_path_cb,
1013            sink_options,
1014            variant,
1015            file_type: FileType::Csv(options),
1016            cloud_options,
1017            per_partition_sort_by,
1018            finish_callback,
1019        }))
1020    }
1021
1022    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1023    /// result doesn't fit into memory. This method returns an error if the query cannot be
1024    /// executed entirely in a streaming fashion.
1025    #[cfg(feature = "json")]
1026    #[allow(clippy::too_many_arguments)]
1027    pub fn sink_json_partitioned(
1028        self,
1029        base_path: Arc<PlPath>,
1030        file_path_cb: Option<PartitionTargetCallback>,
1031        variant: PartitionVariant,
1032        options: JsonWriterOptions,
1033        cloud_options: Option<polars_io::cloud::CloudOptions>,
1034        sink_options: SinkOptions,
1035        per_partition_sort_by: Option<Vec<SortColumn>>,
1036        finish_callback: Option<SinkFinishCallback>,
1037    ) -> PolarsResult<Self> {
1038        self.sink(SinkType::Partition(PartitionSinkType {
1039            base_path,
1040            file_path_cb,
1041            sink_options,
1042            variant,
1043            file_type: FileType::Json(options),
1044            cloud_options,
1045            per_partition_sort_by,
1046            finish_callback,
1047        }))
1048    }
1049
1050    pub fn sink_batches(
1051        self,
1052        function: PlanCallback<DataFrame, bool>,
1053        maintain_order: bool,
1054        chunk_size: Option<NonZeroUsize>,
1055    ) -> PolarsResult<Self> {
1056        self.sink(SinkType::Callback(CallbackSinkType {
1057            function,
1058            maintain_order,
1059            chunk_size,
1060        }))
1061    }
1062
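    /// Attempt to run the query on the new streaming engine if requested via the
    /// `POLARS_AUTO_NEW_STREAMING` or `POLARS_FORCE_NEW_STREAMING` environment variables.
    ///
    /// Returns `None` when neither variable is set to `"1"`, or when the engine hit a
    /// not-yet-implemented code path under `POLARS_AUTO_NEW_STREAMING` (the caller then falls
    /// back to the requested engine). Other errors and panics are propagated.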
1063    #[cfg(feature = "new_streaming")]
1064    pub fn try_new_streaming_if_requested(
1065        &mut self,
1066    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1067        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1068        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1069
1070        if auto_new_streaming || force_new_streaming {
1071            // Try to run using the new streaming engine, falling back to the default engine
1072            // if it panics with a not-yet-implemented error and auto_new_streaming (not force) is set.
1073            let mut new_stream_lazy = self.clone();
1074            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1075            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1076                Ok(v) => v,
1077                Err(e) => return Some(Err(e)),
1078            };
1079
1080            let f = || {
1081                polars_stream::run_query(
1082                    alp_plan.lp_top,
1083                    &mut alp_plan.lp_arena,
1084                    &mut alp_plan.expr_arena,
1085                )
1086            };
1087
1088            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1089                Ok(v) => return Some(v),
1090                Err(e) => {
1091                    // Fallback to normal engine if error is due to not being implemented
1092                    // and auto_new_streaming is set, otherwise propagate error.
1093                    if !force_new_streaming
1094                        && auto_new_streaming
1095                        && e.downcast_ref::<&str>()
1096                            .map(|s| s.starts_with("not yet implemented"))
1097                            .unwrap_or(false)
1098                    {
1099                        if polars_core::config::verbose() {
1100                            eprintln!(
1101                                "caught unimplemented error in new streaming engine, falling back to normal engine"
1102                            );
1103                        }
1104                    } else {
1105                        std::panic::resume_unwind(e);
1106                    }
1107                },
1108            }
1109        }
1110
1111        None
1112    }
1113
1114    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1115        polars_ensure!(
1116            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1117            InvalidOperation: "cannot create a sink on top of another sink"
1118        );
1119        self.logical_plan = DslPlan::Sink {
1120            input: Arc::new(self.logical_plan),
1121            payload,
1122        };
1123        Ok(self)
1124    }
1125
1126    /// Filter frame rows that match a predicate expression.
1127    ///
1128    /// The expression must yield boolean values (note that rows where the
1129    /// predicate resolves to `null` are *not* included in the resulting frame).
1130    ///
1131    /// # Example
1132    ///
1133    /// ```rust
1134    /// use polars_core::prelude::*;
1135    /// use polars_lazy::prelude::*;
1136    ///
1137    /// fn example(df: DataFrame) -> LazyFrame {
1138    ///       df.lazy()
1139    ///         .filter(col("sepal_width").is_not_null())
1140    ///         .select([col("sepal_width"), col("sepal_length")])
1141    /// }
1142    /// ```
1143    pub fn filter(self, predicate: Expr) -> Self {
1144        let opt_state = self.get_opt_state();
1145        let lp = self.get_plan_builder().filter(predicate).build();
1146        Self::from_logical_plan(lp, opt_state)
1147    }
1148
1149    /// Remove frame rows that match a predicate expression.
1150    ///
1151    /// The expression must yield boolean values (note that rows where the
1152    /// predicate resolves to `null` are *not* removed from the resulting frame).
1153    ///
1154    /// # Example
1155    ///
1156    /// ```rust
1157    /// use polars_core::prelude::*;
1158    /// use polars_lazy::prelude::*;
1159    ///
1160    /// fn example(df: DataFrame) -> LazyFrame {
1161    ///       df.lazy()
1162    ///         .remove(col("sepal_width").is_null())
1163    ///         .select([col("sepal_width"), col("sepal_length")])
1164    /// }
1165    /// ```
1166    pub fn remove(self, predicate: Expr) -> Self {
1167        self.filter(predicate.neq_missing(lit(true)))
1168    }
1169
1170    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1171    ///
1172    /// Columns can be selected with [`col`].
1173    /// To select all columns, use `col(PlSmallStr::from_static("*"))`.
1174    ///
1175    /// # Example
1176    ///
1177    /// ```rust
1178    /// use polars_core::prelude::*;
1179    /// use polars_lazy::prelude::*;
1180    ///
1181    /// /// This function selects column "foo" and column "bar".
1182    /// /// Column "bar" is renamed to "ham".
1183    /// fn example(df: DataFrame) -> LazyFrame {
1184    ///       df.lazy()
1185    ///         .select([col("foo"),
1186    ///                   col("bar").alias("ham")])
1187    /// }
1188    ///
1189    /// /// This function selects all columns except "foo"
1190    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1191    ///       df.lazy()
1192    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1193    /// }
1194    /// ```
1195    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1196        let exprs = exprs.as_ref().to_vec();
1197        self.select_impl(
1198            exprs,
1199            ProjectionOptions {
1200                run_parallel: true,
1201                duplicate_check: true,
1202                should_broadcast: true,
1203            },
1204        )
1205    }
1206
1207    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1208        let exprs = exprs.as_ref().to_vec();
1209        self.select_impl(
1210            exprs,
1211            ProjectionOptions {
1212                run_parallel: false,
1213                duplicate_check: true,
1214                should_broadcast: true,
1215            },
1216        )
1217    }
1218
1219    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1220        let opt_state = self.get_opt_state();
1221        let lp = self.get_plan_builder().project(exprs, options).build();
1222        Self::from_logical_plan(lp, opt_state)
1223    }
1224
1225    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1226    ///
1227    /// Takes a list of expressions to group on.
1228    ///
1229    /// # Example
1230    ///
1231    /// ```rust
1232    /// use polars_core::prelude::*;
1233    /// use polars_lazy::prelude::*;
1234    ///
1235    /// fn example(df: DataFrame) -> LazyFrame {
1236    ///       df.lazy()
1237    ///        .group_by([col("date")])
1238    ///        .agg([
1239    ///            col("rain").min().alias("min_rain"),
1240    ///            col("rain").sum().alias("sum_rain"),
1241    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1242    ///        ])
1243    /// }
1244    /// ```
1245    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1246        let keys = by
1247            .as_ref()
1248            .iter()
1249            .map(|e| e.clone().into())
1250            .collect::<Vec<_>>();
1251        let opt_state = self.get_opt_state();
1252
1253        #[cfg(feature = "dynamic_group_by")]
1254        {
1255            LazyGroupBy {
1256                logical_plan: self.logical_plan,
1257                opt_state,
1258                keys,
1259                maintain_order: false,
1260                dynamic_options: None,
1261                rolling_options: None,
1262            }
1263        }
1264
1265        #[cfg(not(feature = "dynamic_group_by"))]
1266        {
1267            LazyGroupBy {
1268                logical_plan: self.logical_plan,
1269                opt_state,
1270                keys,
1271                maintain_order: false,
1272            }
1273        }
1274    }
1275
1276    /// Create rolling groups based on a time column.
1277    ///
1278    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1279    ///
1280    /// Unlike [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are determined by the
1281    /// individual values rather than by constant intervals. For constant intervals, use
1282    /// *group_by_dynamic*.
1283    #[cfg(feature = "dynamic_group_by")]
1284    pub fn rolling<E: AsRef<[Expr]>>(
1285        mut self,
1286        index_column: Expr,
1287        group_by: E,
1288        mut options: RollingGroupOptions,
1289    ) -> LazyGroupBy {
1290        if let Expr::Column(name) = index_column {
1291            options.index_column = name;
1292        } else {
1293            let output_field = index_column
1294                .to_field(&self.collect_schema().unwrap())
1295                .unwrap();
1296            return self.with_column(index_column).rolling(
1297                Expr::Column(output_field.name().clone()),
1298                group_by,
1299                options,
1300            );
1301        }
1302        let opt_state = self.get_opt_state();
1303        LazyGroupBy {
1304            logical_plan: self.logical_plan,
1305            opt_state,
1306            keys: group_by.as_ref().to_vec(),
1307            maintain_order: true,
1308            dynamic_options: None,
1309            rolling_options: Some(options),
1310        }
1311    }
1312
1313    /// Group based on a time value (or index value of type Int32, Int64).
1314    ///
1315    /// Time windows are calculated and rows are assigned to windows. Unlike a
1316    /// normal group_by, a row can be a member of multiple groups. The time/index
1317    /// window can be seen as a rolling window, with a window size determined by
1318    /// dates/times/values instead of slots in the DataFrame.
1319    ///
1320    /// A window is defined by:
1321    ///
1322    /// - every: interval of the window
1323    /// - period: length of the window
1324    /// - offset: offset of the window
1325    ///
1326    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1327    /// with an ordinary group_by on these keys.
1328    #[cfg(feature = "dynamic_group_by")]
1329    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1330        mut self,
1331        index_column: Expr,
1332        group_by: E,
1333        mut options: DynamicGroupOptions,
1334    ) -> LazyGroupBy {
1335        if let Expr::Column(name) = index_column {
1336            options.index_column = name;
1337        } else {
1338            let output_field = index_column
1339                .to_field(&self.collect_schema().unwrap())
1340                .unwrap();
1341            return self.with_column(index_column).group_by_dynamic(
1342                Expr::Column(output_field.name().clone()),
1343                group_by,
1344                options,
1345            );
1346        }
1347        let opt_state = self.get_opt_state();
1348        LazyGroupBy {
1349            logical_plan: self.logical_plan,
1350            opt_state,
1351            keys: group_by.as_ref().to_vec(),
1352            maintain_order: true,
1353            dynamic_options: Some(options),
1354            rolling_options: None,
1355        }
1356    }
1357
1358    /// Similar to [`group_by`][`Self::group_by`], but the order of the DataFrame is maintained.
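    ///
    /// # Example
    ///
    /// A minimal sketch (the column names are illustrative):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .group_by_stable([col("date")])
    ///         .agg([col("rain").sum()])
    /// }
    /// ```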
1359    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1360        let keys = by
1361            .as_ref()
1362            .iter()
1363            .map(|e| e.clone().into())
1364            .collect::<Vec<_>>();
1365        let opt_state = self.get_opt_state();
1366
1367        #[cfg(feature = "dynamic_group_by")]
1368        {
1369            LazyGroupBy {
1370                logical_plan: self.logical_plan,
1371                opt_state,
1372                keys,
1373                maintain_order: true,
1374                dynamic_options: None,
1375                rolling_options: None,
1376            }
1377        }
1378
1379        #[cfg(not(feature = "dynamic_group_by"))]
1380        {
1381            LazyGroupBy {
1382                logical_plan: self.logical_plan,
1383                opt_state,
1384                keys,
1385                maintain_order: true,
1386            }
1387        }
1388    }
1389
1390    /// Left anti join this query with another lazy query.
1391    ///
1392    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1393    /// flexible join logic, see [`join`](LazyFrame::join) or
1394    /// [`join_builder`](LazyFrame::join_builder).
1395    ///
1396    /// # Example
1397    ///
1398    /// ```rust
1399    /// use polars_core::prelude::*;
1400    /// use polars_lazy::prelude::*;
1401    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1402    ///         ldf
1403    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1404    /// }
1405    /// ```
1406    #[cfg(feature = "semi_anti_join")]
1407    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1408        self.join(
1409            other,
1410            [left_on.into()],
1411            [right_on.into()],
1412            JoinArgs::new(JoinType::Anti),
1413        )
1414    }
1415
1416    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
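    ///
    /// # Example
    ///
    /// A minimal sketch (`suffix` renames clashing right-hand columns; `None` keeps the default
    /// `"_right"`):
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
    ///     ldf.cross_join(other, None)
    /// }
    /// ```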
1417    #[cfg(feature = "cross_join")]
1418    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1419        self.join(
1420            other,
1421            vec![],
1422            vec![],
1423            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1424        )
1425    }
1426
1427    /// Left outer join this query with another lazy query.
1428    ///
1429    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1430    /// flexible join logic, see [`join`](LazyFrame::join) or
1431    /// [`join_builder`](LazyFrame::join_builder).
1432    ///
1433    /// # Example
1434    ///
1435    /// ```rust
1436    /// use polars_core::prelude::*;
1437    /// use polars_lazy::prelude::*;
1438    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1439    ///         ldf
1440    ///         .left_join(other, col("foo"), col("bar"))
1441    /// }
1442    /// ```
1443    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1444        self.join(
1445            other,
1446            [left_on.into()],
1447            [right_on.into()],
1448            JoinArgs::new(JoinType::Left),
1449        )
1450    }
1451
1452    /// Inner join this query with another lazy query.
1453    ///
1454    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1455    /// flexible join logic, see [`join`](LazyFrame::join) or
1456    /// [`join_builder`](LazyFrame::join_builder).
1457    ///
1458    /// # Example
1459    ///
1460    /// ```rust
1461    /// use polars_core::prelude::*;
1462    /// use polars_lazy::prelude::*;
1463    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1464    ///         ldf
1465    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1466    /// }
1467    /// ```
1468    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1469        self.join(
1470            other,
1471            [left_on.into()],
1472            [right_on.into()],
1473            JoinArgs::new(JoinType::Inner),
1474        )
1475    }
1476
1477    /// Full outer join this query with another lazy query.
1478    ///
1479    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1480    /// flexible join logic, see [`join`](LazyFrame::join) or
1481    /// [`join_builder`](LazyFrame::join_builder).
1482    ///
1483    /// # Example
1484    ///
1485    /// ```rust
1486    /// use polars_core::prelude::*;
1487    /// use polars_lazy::prelude::*;
1488    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1489    ///         ldf
1490    ///         .full_join(other, col("foo"), col("bar"))
1491    /// }
1492    /// ```
1493    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1494        self.join(
1495            other,
1496            [left_on.into()],
1497            [right_on.into()],
1498            JoinArgs::new(JoinType::Full),
1499        )
1500    }
1501
1502    /// Left semi join this query with another lazy query.
1503    ///
1504    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1505    /// flexible join logic, see [`join`](LazyFrame::join) or
1506    /// [`join_builder`](LazyFrame::join_builder).
1507    ///
1508    /// # Example
1509    ///
1510    /// ```rust
1511    /// use polars_core::prelude::*;
1512    /// use polars_lazy::prelude::*;
1513    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1514    ///         ldf
1515    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1516    /// }
1517    /// ```
1518    #[cfg(feature = "semi_anti_join")]
1519    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1520        self.join(
1521            other,
1522            [left_on.into()],
1523            [right_on.into()],
1524            JoinArgs::new(JoinType::Semi),
1525        )
1526    }
1527
1528    /// Generic function to join two LazyFrames.
1529    ///
1530    /// `join` can join on multiple columns, given as two lists of expressions, and with a
1531    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1532    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1533    /// over how columns are renamed and parallelization options, use
1534    /// [`join_builder`](LazyFrame::join_builder).
1535    ///
1536    /// Any provided `args.slice` parameter is ignored; the slice is set by the internal optimizer.
1537    ///
1538    /// # Example
1539    ///
1540    /// ```rust
1541    /// use polars_core::prelude::*;
1542    /// use polars_lazy::prelude::*;
1543    ///
1544    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1545    ///         ldf
1546    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1547    /// }
1548    /// ```
1549    pub fn join<E: AsRef<[Expr]>>(
1550        self,
1551        other: LazyFrame,
1552        left_on: E,
1553        right_on: E,
1554        args: JoinArgs,
1555    ) -> LazyFrame {
1556        let left_on = left_on.as_ref().to_vec();
1557        let right_on = right_on.as_ref().to_vec();
1558
1559        self._join_impl(other, left_on, right_on, args)
1560    }
1561
1562    fn _join_impl(
1563        self,
1564        other: LazyFrame,
1565        left_on: Vec<Expr>,
1566        right_on: Vec<Expr>,
1567        args: JoinArgs,
1568    ) -> LazyFrame {
1569        let JoinArgs {
1570            how,
1571            validation,
1572            suffix,
1573            slice,
1574            nulls_equal,
1575            coalesce,
1576            maintain_order,
1577        } = args;
1578
1579        if slice.is_some() {
1580            panic!("impl error: slice is not handled")
1581        }
1582
1583        let mut builder = self
1584            .join_builder()
1585            .with(other)
1586            .left_on(left_on)
1587            .right_on(right_on)
1588            .how(how)
1589            .validate(validation)
1590            .join_nulls(nulls_equal)
1591            .coalesce(coalesce)
1592            .maintain_order(maintain_order);
1593
1594        if let Some(suffix) = suffix {
1595            builder = builder.suffix(suffix);
1596        }
1597
1598        // Note: args.slice is set by the optimizer
1599        builder.finish()
1600    }
1601
1602    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1603    ///
1604    /// After the `JoinBuilder` has been created and set up, calling
1605    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1606    /// representing the `join` operation.
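    ///
    /// # Example
    ///
    /// A minimal sketch; the column names and the `"_other"` suffix are illustrative:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn join_with_builder(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
    ///     ldf.join_builder()
    ///         .with(other)
    ///         .left_on([col("foo")])
    ///         .right_on([col("bar")])
    ///         .how(JoinType::Left)
    ///         .suffix("_other")
    ///         .finish()
    /// }
    /// ```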
1607    pub fn join_builder(self) -> JoinBuilder {
1608        JoinBuilder::new(self)
1609    }
1610
1611    /// Add or replace a column, given as an expression, to a DataFrame.
1612    ///
1613    /// # Example
1614    ///
1615    /// ```rust
1616    /// use polars_core::prelude::*;
1617    /// use polars_lazy::prelude::*;
1618    /// fn add_column(df: DataFrame) -> LazyFrame {
1619    ///     df.lazy()
1620    ///         .with_column(
1621    ///             when(col("sepal_length").lt(lit(5.0)))
1622    ///             .then(lit(10))
1623    ///             .otherwise(lit(1))
1624    ///             .alias("new_column_name"),
1625    ///         )
1626    /// }
1627    /// ```
1628    pub fn with_column(self, expr: Expr) -> LazyFrame {
1629        let opt_state = self.get_opt_state();
1630        let lp = self
1631            .get_plan_builder()
1632            .with_columns(
1633                vec![expr],
1634                ProjectionOptions {
1635                    run_parallel: false,
1636                    duplicate_check: true,
1637                    should_broadcast: true,
1638                },
1639            )
1640            .build();
1641        Self::from_logical_plan(lp, opt_state)
1642    }
1643
1644    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1645    ///
1646    /// # Example
1647    ///
1648    /// ```rust
1649    /// use polars_core::prelude::*;
1650    /// use polars_lazy::prelude::*;
1651    /// fn add_columns(df: DataFrame) -> LazyFrame {
1652    ///     df.lazy()
1653    ///         .with_columns(
1654    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1655    ///          )
1656    /// }
1657    /// ```
1658    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1659        let exprs = exprs.as_ref().to_vec();
1660        self.with_columns_impl(
1661            exprs,
1662            ProjectionOptions {
1663                run_parallel: true,
1664                duplicate_check: true,
1665                should_broadcast: true,
1666            },
1667        )
1668    }
1669
1670    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
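    ///
    /// # Example
    ///
    /// A minimal sketch; the column names are illustrative:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn add_columns_sequentially(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .with_columns_seq([lit(10).alias("foo"), lit(100).alias("bar")])
    /// }
    /// ```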
1671    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1672        let exprs = exprs.as_ref().to_vec();
1673        self.with_columns_impl(
1674            exprs,
1675            ProjectionOptions {
1676                run_parallel: false,
1677                duplicate_check: true,
1678                should_broadcast: true,
1679            },
1680        )
1681    }
1682
1683    /// Match or evolve this frame to the given schema.
1684    pub fn match_to_schema(
1685        self,
1686        schema: SchemaRef,
1687        per_column: Arc<[MatchToSchemaPerColumn]>,
1688        extra_columns: ExtraColumnsPolicy,
1689    ) -> LazyFrame {
1690        let opt_state = self.get_opt_state();
1691        let lp = self
1692            .get_plan_builder()
1693            .match_to_schema(schema, per_column, extra_columns)
1694            .build();
1695        Self::from_logical_plan(lp, opt_state)
1696    }
1697
1698    pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1699        let opt_state = self.get_opt_state();
1700        let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1701        Self::from_logical_plan(lp, opt_state)
1702    }
1703
1704    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1705        let opt_state = self.get_opt_state();
1706        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1707        Self::from_logical_plan(lp, opt_state)
1708    }
1709
1710    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1711        let contexts = contexts
1712            .as_ref()
1713            .iter()
1714            .map(|lf| lf.logical_plan.clone())
1715            .collect();
1716        let opt_state = self.get_opt_state();
1717        let lp = self.get_plan_builder().with_context(contexts).build();
1718        Self::from_logical_plan(lp, opt_state)
1719    }
1720
1721    /// Aggregate all the columns as their maximum values.
1722    ///
1723    /// Aggregated columns will have the same names as the original columns.
1724    pub fn max(self) -> Self {
1725        self.map_private(DslFunction::Stats(StatsFunction::Max))
1726    }
1727
1728    /// Aggregate all the columns as their minimum values.
1729    ///
1730    /// Aggregated columns will have the same names as the original columns.
1731    pub fn min(self) -> Self {
1732        self.map_private(DslFunction::Stats(StatsFunction::Min))
1733    }
1734
1735    /// Aggregate all the columns as their sum values.
1736    ///
1737    /// Aggregated columns will have the same names as the original columns.
1738    ///
1739    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1740    /// - For integer columns, the ordinary checks for overflow are performed:
1741    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1742    ///   silently wrap.
1743    /// - String columns will sum to None.
1744    pub fn sum(self) -> Self {
1745        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1746    }
1747
1748    /// Aggregate all the columns as their mean values.
1749    ///
1750    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1751    /// - String columns will have a mean of None.
1752    pub fn mean(self) -> Self {
1753        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1754    }
1755
1756    /// Aggregate all the columns as their median values.
1757    ///
1758    /// - Boolean and integer results are converted to `f64`. However, they are still
1759    ///   susceptible to overflow before this conversion occurs.
1760    /// - String columns will have a median of None.
1761    pub fn median(self) -> Self {
1762        self.map_private(DslFunction::Stats(StatsFunction::Median))
1763    }
1764
1765    /// Aggregate all the columns as their quantile values.
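    ///
    /// # Example
    ///
    /// A minimal sketch computing the upper quartile of every column:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn upper_quartile(df: DataFrame) -> LazyFrame {
    ///     df.lazy().quantile(lit(0.75), QuantileMethod::Nearest)
    /// }
    /// ```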
1766    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1767        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1768            quantile,
1769            method,
1770        }))
1771    }
1772
1773    /// Aggregate all the columns as their standard deviation values.
1774    ///
1775    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1776    /// computing the variance, where `N` is the number of rows.
1777    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1778    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1779    /// > likelihood estimate of the variance for normally distributed variables. The
1780    /// > standard deviation computed in this function is the square root of the estimated
1781    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1782    /// > standard deviation per se.
1783    ///
1784    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
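    ///
    /// # Example
    ///
    /// A minimal sketch using the common `ddof = 1` (sample standard deviation):
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn sample_std(df: DataFrame) -> LazyFrame {
    ///     df.lazy().std(1)
    /// }
    /// ```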
1785    pub fn std(self, ddof: u8) -> Self {
1786        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1787    }
1788
1789    /// Aggregate all the columns as their variance values.
1790    ///
1791    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1792    /// computing the variance, where `N` is the number of rows.
1793    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1794    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1795    /// > likelihood estimate of the variance for normally distributed variables.
1796    ///
1797    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1798    pub fn var(self, ddof: u8) -> Self {
1799        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1800    }
1801
1802    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
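    ///
    /// # Example
    ///
    /// A minimal sketch; the `"tags"` column name is illustrative and is assumed to hold
    /// list values, selected here with the `by_name` selector:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn explode_tags(df: DataFrame) -> LazyFrame {
    ///     df.lazy().explode(by_name(["tags"], true))
    /// }
    /// ```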
1803    pub fn explode(self, columns: Selector) -> LazyFrame {
1804        self.explode_impl(columns, false)
1805    }
1806
1807    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1808    fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1809        let opt_state = self.get_opt_state();
1810        let lp = self
1811            .get_plan_builder()
1812            .explode(columns, allow_empty)
1813            .build();
1814        Self::from_logical_plan(lp, opt_state)
1815    }
1816
1817    /// Aggregate all the columns as their null value count.
1818    pub fn null_count(self) -> LazyFrame {
1819        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1820    }
1821
1822    /// Drop non-unique rows and maintain the order of kept rows.
1823    ///
1824    /// `subset` is an optional `Selector` of columns to consider for uniqueness; if
1825    /// `None`, all columns are considered.
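    ///
    /// # Example
    ///
    /// A minimal sketch that deduplicates over all columns, keeping the first occurrence:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn dedupe_rows(df: DataFrame) -> LazyFrame {
    ///     df.lazy().unique_stable(None, UniqueKeepStrategy::First)
    /// }
    /// ```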
1826    pub fn unique_stable(
1827        self,
1828        subset: Option<Selector>,
1829        keep_strategy: UniqueKeepStrategy,
1830    ) -> LazyFrame {
1831        self.unique_stable_generic(subset, keep_strategy)
1832    }
1833
1834    pub fn unique_stable_generic(
1835        self,
1836        subset: Option<Selector>,
1837        keep_strategy: UniqueKeepStrategy,
1838    ) -> LazyFrame {
1839        let opt_state = self.get_opt_state();
1840        let options = DistinctOptionsDSL {
1841            subset,
1842            maintain_order: true,
1843            keep_strategy,
1844        };
1845        let lp = self.get_plan_builder().distinct(options).build();
1846        Self::from_logical_plan(lp, opt_state)
1847    }
1848
1849    /// Drop non-unique rows without maintaining the order of kept rows.
1850    ///
1851    /// The order of the kept rows may change; to maintain the original row order, use
1852    /// [`unique_stable`](LazyFrame::unique_stable).
1853    ///
1854    /// `subset` is an optional `Selector` of columns to consider for uniqueness; if `None`,
1855    /// all columns are considered.
1856    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1857        self.unique_generic(subset, keep_strategy)
1858    }
1859
1860    pub fn unique_generic(
1861        self,
1862        subset: Option<Selector>,
1863        keep_strategy: UniqueKeepStrategy,
1864    ) -> LazyFrame {
1865        let opt_state = self.get_opt_state();
1866        let options = DistinctOptionsDSL {
1867            subset,
1868            maintain_order: false,
1869            keep_strategy,
1870        };
1871        let lp = self.get_plan_builder().distinct(options).build();
1872        Self::from_logical_plan(lp, opt_state)
1873    }
1874
1875    /// Drop rows containing one or more NaN values.
1876    ///
1877    /// `subset` is an optional `Selector` of columns to consider for NaNs; if `None`, all
1878    /// floating-point columns are considered.
1879    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1880        let opt_state = self.get_opt_state();
1881        let lp = self.get_plan_builder().drop_nans(subset).build();
1882        Self::from_logical_plan(lp, opt_state)
1883    }
1884
1885    /// Drop rows containing one or more None values.
1886    ///
1887    /// `subset` is an optional `Selector` of columns to consider for nulls; if `None`, all
1888    /// columns are considered.
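    ///
    /// # Example
    ///
    /// A minimal sketch that drops any row containing a null in any column:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn remove_rows_with_nulls(df: DataFrame) -> LazyFrame {
    ///     df.lazy().drop_nulls(None)
    /// }
    /// ```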
1889    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1890        let opt_state = self.get_opt_state();
1891        let lp = self.get_plan_builder().drop_nulls(subset).build();
1892        Self::from_logical_plan(lp, opt_state)
1893    }
1894
1895    /// Slice the DataFrame using an offset (starting row) and a length.
1896    ///
1897    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1898    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1899    /// end.
1900    ///
1901    /// If `offset` and `len` are such that the slice extends beyond the end of the
1902    /// DataFrame, the portion between `offset` and the end will be returned. In this
1903    /// case, the number of rows in the returned DataFrame will be less than `len`.
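    ///
    /// # Example
    ///
    /// A minimal sketch that takes three rows starting at the fifth row from the end:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn three_rows_near_the_end(df: DataFrame) -> LazyFrame {
    ///     df.lazy().slice(-5, 3)
    /// }
    /// ```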
1904    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1905        let opt_state = self.get_opt_state();
1906        let lp = self.get_plan_builder().slice(offset, len).build();
1907        Self::from_logical_plan(lp, opt_state)
1908    }
1909
1910    /// Get the first row.
1911    ///
1912    /// Equivalent to `self.slice(0, 1)`.
1913    pub fn first(self) -> LazyFrame {
1914        self.slice(0, 1)
1915    }
1916
1917    /// Get the last row.
1918    ///
1919    /// Equivalent to `self.slice(-1, 1)`.
1920    pub fn last(self) -> LazyFrame {
1921        self.slice(-1, 1)
1922    }
1923
1924    /// Get the last `n` rows.
1925    ///
1926    /// Equivalent to `self.slice(-(n as i64), n)`.
1927    pub fn tail(self, n: IdxSize) -> LazyFrame {
1928        let neg_tail = -(n as i64);
1929        self.slice(neg_tail, n)
1930    }
1931
1932    /// Unpivot the DataFrame from wide to long format.
1933    ///
1934    /// See [`UnpivotArgsDSL`] for information on how to unpivot a DataFrame.
1935    #[cfg(feature = "pivot")]
1936    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1937        let opt_state = self.get_opt_state();
1938        let lp = self.get_plan_builder().unpivot(args).build();
1939        Self::from_logical_plan(lp, opt_state)
1940    }
1941
1942    /// Limit the DataFrame to the first `n` rows.
1943    pub fn limit(self, n: IdxSize) -> LazyFrame {
1944        self.slice(0, n)
1945    }
1946
1947    /// Apply a function/closure once the logical plan gets executed.
1948    ///
1949    /// The function has access to the whole materialized DataFrame at the time it is
1950    /// called.
1951    ///
1952    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1953    /// with `LazyFrame::with_column` or `with_columns`.
1954    ///
1955    /// ## Warning
1956    /// This can blow up in your face if the schema is changed due to the operation. The
1957    /// optimizer relies on a correct schema.
1958    ///
1959    /// You can toggle certain optimizations off.
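    ///
    /// # Example
    ///
    /// A minimal sketch that reverses the materialized DataFrame; the `"REVERSE"` name
    /// is illustrative, and the schema hint is omitted because the schema is unchanged:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn reverse_rows(df: DataFrame) -> LazyFrame {
    ///     df.lazy().map(
    ///         |df| Ok(df.reverse()),
    ///         AllowedOptimizations::default(),
    ///         None,
    ///         Some("REVERSE"),
    ///     )
    /// }
    /// ```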
1960    pub fn map<F>(
1961        self,
1962        function: F,
1963        optimizations: AllowedOptimizations,
1964        schema: Option<Arc<dyn UdfSchema>>,
1965        name: Option<&'static str>,
1966    ) -> LazyFrame
1967    where
1968        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1969    {
1970        let opt_state = self.get_opt_state();
1971        let lp = self
1972            .get_plan_builder()
1973            .map(
1974                function,
1975                optimizations,
1976                schema,
1977                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1978            )
1979            .build();
1980        Self::from_logical_plan(lp, opt_state)
1981    }
1982
1983    #[cfg(feature = "python")]
1984    pub fn map_python(
1985        self,
1986        function: polars_utils::python_function::PythonFunction,
1987        optimizations: AllowedOptimizations,
1988        schema: Option<SchemaRef>,
1989        validate_output: bool,
1990    ) -> LazyFrame {
1991        let opt_state = self.get_opt_state();
1992        let lp = self
1993            .get_plan_builder()
1994            .map_python(function, optimizations, schema, validate_output)
1995            .build();
1996        Self::from_logical_plan(lp, opt_state)
1997    }
1998
1999    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2000        let opt_state = self.get_opt_state();
2001        let lp = self.get_plan_builder().map_private(function).build();
2002        Self::from_logical_plan(lp, opt_state)
2003    }
2004
2005    /// Add a new column at index 0 that counts the rows.
2006    ///
2007    /// `name` is the name of the new column. `offset` is where to start counting from; if
2008    /// `None`, it is set to `0`.
2009    ///
2010    /// # Warning
2011    /// This can have a negative effect on query performance. This may for instance block
2012    /// predicate pushdown optimization.
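    ///
    /// # Example
    ///
    /// A minimal sketch; the `"row_nr"` column name is illustrative:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn add_row_index(df: DataFrame) -> LazyFrame {
    ///     df.lazy().with_row_index("row_nr", Some(0))
    /// }
    /// ```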
2013    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2014    where
2015        S: Into<PlSmallStr>,
2016    {
2017        let name = name.into();
2018
2019        match &self.logical_plan {
2020            v @ DslPlan::Scan {
2021                scan_type,
2022                unified_scan_args,
2023                ..
2024            } if unified_scan_args.row_index.is_none()
2025                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2026            {
2027                let DslPlan::Scan {
2028                    sources,
2029                    mut unified_scan_args,
2030                    scan_type,
2031                    cached_ir: _,
2032                } = v.clone()
2033                else {
2034                    unreachable!()
2035                };
2036
2037                unified_scan_args.row_index = Some(RowIndex {
2038                    name,
2039                    offset: offset.unwrap_or(0),
2040                });
2041
2042                DslPlan::Scan {
2043                    sources,
2044                    unified_scan_args,
2045                    scan_type,
2046                    cached_ir: Default::default(),
2047                }
2048                .into()
2049            },
2050            _ => self.map_private(DslFunction::RowIndex { name, offset }),
2051        }
2052    }
2053
2054    /// Return the number of non-null elements for each column.
2055    pub fn count(self) -> LazyFrame {
2056        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2057    }
2058
2059    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2060    /// inserted as columns.
2061    #[cfg(feature = "dtype-struct")]
2062    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2063        self.map_private(DslFunction::Unnest {
2064            columns: cols,
2065            separator,
2066        })
2067    }
2068
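    /// Merge this `LazyFrame` with another, where both are assumed to be sorted on the
    /// `key` column, producing a single output sorted on that key.
    ///
    /// # Example
    ///
    /// A minimal sketch; assumes both inputs are already sorted on an illustrative
    /// `"time"` column:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn merge_by_time(left: LazyFrame, right: LazyFrame) -> PolarsResult<LazyFrame> {
    ///     left.merge_sorted(right, "time")
    /// }
    /// ```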
2069    #[cfg(feature = "merge_sorted")]
2070    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2071    where
2072        S: Into<PlSmallStr>,
2073    {
2074        let key = key.into();
2075
2076        let lp = DslPlan::MergeSorted {
2077            input_left: Arc::new(self.logical_plan),
2078            input_right: Arc::new(other.logical_plan),
2079            key,
2080        };
2081        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2082    }
2083
2084    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2085        let lp = DslPlan::MapFunction {
2086            input: Arc::new(self.logical_plan),
2087            function: DslFunction::Hint(hint),
2088        };
2089        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2090    }
2091}
2092
2093/// Utility struct for the lazy group_by operation.
2094#[derive(Clone)]
2095pub struct LazyGroupBy {
2096    pub logical_plan: DslPlan,
2097    opt_state: OptFlags,
2098    keys: Vec<Expr>,
2099    maintain_order: bool,
2100    #[cfg(feature = "dynamic_group_by")]
2101    dynamic_options: Option<DynamicGroupOptions>,
2102    #[cfg(feature = "dynamic_group_by")]
2103    rolling_options: Option<RollingGroupOptions>,
2104}
2105
2106impl From<LazyGroupBy> for LazyFrame {
2107    fn from(lgb: LazyGroupBy) -> Self {
2108        Self {
2109            logical_plan: lgb.logical_plan,
2110            opt_state: lgb.opt_state,
2111            cached_arena: Default::default(),
2112        }
2113    }
2114}
2115
2116impl LazyGroupBy {
2117    /// Group by and aggregate.
2118    ///
2119    /// Select a column with [col] and choose an aggregation.
2120    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2121    ///
2122    /// # Example
2123    ///
2124    /// ```rust
2125    /// use polars_core::prelude::*;
2126    /// use polars_lazy::prelude::*;
2127    ///
2128    /// fn example(df: DataFrame) -> LazyFrame {
2129    ///       df.lazy()
2130    ///        .group_by_stable([col("date")])
2131    ///        .agg([
2132    ///            col("rain").min().alias("min_rain"),
2133    ///            col("rain").sum().alias("sum_rain"),
2134    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2135    ///        ])
2136    /// }
2137    /// ```
2138    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2139        #[cfg(feature = "dynamic_group_by")]
2140        let lp = DslBuilder::from(self.logical_plan)
2141            .group_by(
2142                self.keys,
2143                aggs,
2144                None,
2145                self.maintain_order,
2146                self.dynamic_options,
2147                self.rolling_options,
2148            )
2149            .build();
2150
2151        #[cfg(not(feature = "dynamic_group_by"))]
2152        let lp = DslBuilder::from(self.logical_plan)
2153            .group_by(self.keys, aggs, None, self.maintain_order)
2154            .build();
2155        LazyFrame::from_logical_plan(lp, self.opt_state)
2156    }
2157
2158    /// Return the first `n` rows of each group.
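    ///
    /// # Example
    ///
    /// A minimal sketch; the column names are illustrative:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn first_two_rows_per_group(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .group_by_stable([col("group")])
    ///         .head(Some(2))
    /// }
    /// ```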
2159    pub fn head(self, n: Option<usize>) -> LazyFrame {
2160        let keys = self
2161            .keys
2162            .iter()
2163            .filter_map(|expr| expr_output_name(expr).ok())
2164            .collect::<Vec<_>>();
2165
2166        self.agg([all().as_expr().head(n)])
2167            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2168    }
2169
2170    /// Return the last `n` rows of each group.
2171    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2172        let keys = self
2173            .keys
2174            .iter()
2175            .filter_map(|expr| expr_output_name(expr).ok())
2176            .collect::<Vec<_>>();
2177
2178        self.agg([all().as_expr().tail(n)])
2179            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2180    }
2181
2182    /// Apply a function over the groups as a new DataFrame.
2183    ///
2184    /// **Using this is not recommended, as materializing the DataFrame is very
2185    /// expensive.**
2186    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2187        #[cfg(feature = "dynamic_group_by")]
2188        let options = GroupbyOptions {
2189            dynamic: self.dynamic_options,
2190            rolling: self.rolling_options,
2191            slice: None,
2192        };
2193
2194        #[cfg(not(feature = "dynamic_group_by"))]
2195        let options = GroupbyOptions { slice: None };
2196
2197        let lp = DslPlan::GroupBy {
2198            input: Arc::new(self.logical_plan),
2199            keys: self.keys,
2200            aggs: vec![],
2201            apply: Some((f, schema)),
2202            maintain_order: self.maintain_order,
2203            options: Arc::new(options),
2204        };
2205        LazyFrame::from_logical_plan(lp, self.opt_state)
2206    }
2207}
2208
2209#[must_use]
2210pub struct JoinBuilder {
2211    lf: LazyFrame,
2212    how: JoinType,
2213    other: Option<LazyFrame>,
2214    left_on: Vec<Expr>,
2215    right_on: Vec<Expr>,
2216    allow_parallel: bool,
2217    force_parallel: bool,
2218    suffix: Option<PlSmallStr>,
2219    validation: JoinValidation,
2220    nulls_equal: bool,
2221    coalesce: JoinCoalesce,
2222    maintain_order: MaintainOrderJoin,
2223}
2224impl JoinBuilder {
2225    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2226    pub fn new(lf: LazyFrame) -> Self {
2227        Self {
2228            lf,
2229            other: None,
2230            how: JoinType::Inner,
2231            left_on: vec![],
2232            right_on: vec![],
2233            allow_parallel: true,
2234            force_parallel: false,
2235            suffix: None,
2236            validation: Default::default(),
2237            nulls_equal: false,
2238            coalesce: Default::default(),
2239            maintain_order: Default::default(),
2240        }
2241    }
2242
2243    /// The right table in the join.
2244    pub fn with(mut self, other: LazyFrame) -> Self {
2245        self.other = Some(other);
2246        self
2247    }
2248
2249    /// Select the join type.
2250    pub fn how(mut self, how: JoinType) -> Self {
2251        self.how = how;
2252        self
2253    }
2254
2255    pub fn validate(mut self, validation: JoinValidation) -> Self {
2256        self.validation = validation;
2257        self
2258    }
2259
2260    /// The expressions you want to join both tables on.
2261    ///
2262    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2263    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264        let on = on.as_ref().to_vec();
2265        self.left_on.clone_from(&on);
2266        self.right_on = on;
2267        self
2268    }
2269
2270    /// The expressions you want to join the left table on.
2271    ///
2272    /// The passed expressions must be valid in the left table.
2273    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2274        self.left_on = on.as_ref().to_vec();
2275        self
2276    }
2277
2278    /// The expressions you want to join the right table on.
2279    ///
2280    /// The passed expressions must be valid in the right table.
2281    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2282        self.right_on = on.as_ref().to_vec();
2283        self
2284    }
2285
2286    /// Allow parallel table evaluation.
2287    pub fn allow_parallel(mut self, allow: bool) -> Self {
2288        self.allow_parallel = allow;
2289        self
2290    }
2291
2292    /// Force parallel table evaluation.
2293    pub fn force_parallel(mut self, force: bool) -> Self {
2294        self.force_parallel = force;
2295        self
2296    }
2297
2298    /// Join on null values. By default null values will never produce matches.
2299    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2300        self.nulls_equal = nulls_equal;
2301        self
2302    }
2303
2304    /// Suffix to append to duplicate column names in a join.
2305    /// Defaults to `"_right"` if this method is never called.
2306    pub fn suffix<S>(mut self, suffix: S) -> Self
2307    where
2308        S: Into<PlSmallStr>,
2309    {
2310        self.suffix = Some(suffix.into());
2311        self
2312    }
2313
2314    /// Whether to coalesce join columns.
2315    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2316        self.coalesce = coalesce;
2317        self
2318    }
2319
2320    /// Whether to preserve the row order.
2321    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2322        self.maintain_order = maintain_order;
2323        self
2324    }
2325
2326    /// Finish the builder and return the joined `LazyFrame`.
2327    pub fn finish(self) -> LazyFrame {
2328        let opt_state = self.lf.opt_state;
2329        let other = self.other.expect("'with' not set in join builder");
2330
2331        let args = JoinArgs {
2332            how: self.how,
2333            validation: self.validation,
2334            suffix: self.suffix,
2335            slice: None,
2336            nulls_equal: self.nulls_equal,
2337            coalesce: self.coalesce,
2338            maintain_order: self.maintain_order,
2339        };
2340
2341        let lp = self
2342            .lf
2343            .get_plan_builder()
2344            .join(
2345                other.logical_plan,
2346                self.left_on,
2347                self.right_on,
2348                JoinOptions {
2349                    allow_parallel: self.allow_parallel,
2350                    force_parallel: self.force_parallel,
2351                    args,
2352                }
2353                .into(),
2354            )
2355            .build();
2356        LazyFrame::from_logical_plan(lp, opt_state)
2357    }
2358
2359    /// Finish the builder, joining the two frames on the given predicates.
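    ///
    /// # Example
    ///
    /// A minimal sketch of an inequality join; the column names are illustrative and the
    /// two frames are assumed to have distinct column names:
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    /// fn join_on_inequality(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
    ///     ldf.join_builder()
    ///         .with(other)
    ///         .join_where(vec![col("start").lt(col("stop"))])
    /// }
    /// ```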
2360    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2361        let opt_state = self.lf.opt_state;
2362        let other = self.other.expect("'with' not set in join builder");
2363
2364        // Decompose `And` conjunctions into their component expressions
2365        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2366            if let Expr::BinaryExpr {
2367                op: Operator::And,
2368                left,
2369                right,
2370            } = predicate
2371            {
2372                decompose_and((*left).clone(), expanded_predicates);
2373                decompose_and((*right).clone(), expanded_predicates);
2374            } else {
2375                expanded_predicates.push(predicate);
2376            }
2377        }
2378        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2379        for predicate in predicates {
2380            decompose_and(predicate, &mut expanded_predicates);
2381        }
2382        let predicates: Vec<Expr> = expanded_predicates;
2383
2384        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2385        #[cfg(feature = "is_between")]
2386        let predicates: Vec<Expr> = {
2387            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2388            for predicate in predicates {
2389                if let Expr::Function {
2390                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2391                    input,
2392                    ..
2393                } = &predicate
2394                {
2395                    if let [expr, lower, upper] = input.as_slice() {
2396                        match closed {
2397                            ClosedInterval::Both => {
2398                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2399                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2400                            },
2401                            ClosedInterval::Right => {
2402                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2403                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2404                            },
2405                            ClosedInterval::Left => {
2406                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2407                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2408                            },
2409                            ClosedInterval::None => {
2410                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2411                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2412                            },
2413                        }
2414                        continue;
2415                    }
2416                }
2417                expanded_predicates.push(predicate);
2418            }
2419            expanded_predicates
2420        };
2421
2422        let args = JoinArgs {
2423            how: self.how,
2424            validation: self.validation,
2425            suffix: self.suffix,
2426            slice: None,
2427            nulls_equal: self.nulls_equal,
2428            coalesce: self.coalesce,
2429            maintain_order: self.maintain_order,
2430        };
2431        let options = JoinOptions {
2432            allow_parallel: self.allow_parallel,
2433            force_parallel: self.force_parallel,
2434            args,
2435        };
2436
2437        let lp = DslPlan::Join {
2438            input_left: Arc::new(self.lf.logical_plan),
2439            input_right: Arc::new(other.logical_plan),
2440            left_on: Default::default(),
2441            right_on: Default::default(),
2442            predicates,
2443            options: Arc::from(options),
2444        };
2445
2446        LazyFrame::from_logical_plan(lp, opt_state)
2447    }
2448}
2449
2450pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2451    #[cfg(not(feature = "new_streaming"))]
2452    {
2453        None
2454    }
2455    #[cfg(feature = "new_streaming")]
2456    {
2457        Some(polars_stream::build_streaming_query_executor)
2458    }
2459};