polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::num::NonZeroUsize;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "ipc")]
22pub use ipc::*;
23#[cfg(feature = "json")]
24pub use ndjson::*;
25#[cfg(feature = "parquet")]
26pub use parquet::*;
27use polars_compute::rolling::QuantileMethod;
28use polars_core::POOL;
29use polars_core::error::feature_gated;
30use polars_core::prelude::*;
31use polars_expr::{ExpressionConversionState, create_physical_expr};
32use polars_io::RowIndex;
33use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
34use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
35#[cfg(feature = "is_between")]
36use polars_ops::prelude::ClosedInterval;
37pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
38use polars_utils::pl_str::PlSmallStr;
39use polars_utils::plpath::PlPath;
40use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
41
42use crate::frame::cached_arenas::CachedArena;
43use crate::prelude::*;
44
/// Conversion into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
48
49impl IntoLazy for DataFrame {
50    /// Convert the `DataFrame` into a `LazyFrame`
51    fn lazy(self) -> LazyFrame {
52        let lp = DslBuilder::from_existing_df(self).build();
53        LazyFrame {
54            logical_plan: lp,
55            opt_state: Default::default(),
56            cached_arena: Default::default(),
57        }
58    }
59}
60
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
66
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL logical plan built up by the lazy API calls.
    pub logical_plan: DslPlan,
    /// Optimization flags applied when the plan is lowered and executed.
    pub(crate) opt_state: OptFlags,
    /// Lazily populated IR/expression arenas (see the `cached_arenas` module);
    /// presumably reused across calls that resolve the plan — shared via `Arc`
    /// so clones of the frame observe the same cache.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
78
79impl From<DslPlan> for LazyFrame {
80    fn from(plan: DslPlan) -> Self {
81        Self {
82            logical_plan: plan,
83            opt_state: OptFlags::default(),
84            cached_arena: Default::default(),
85        }
86    }
87}
88
89impl LazyFrame {
    /// Build a `LazyFrame` directly from its parts: plan, optimization flags,
    /// and a (possibly pre-populated) arena cache.
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }

    /// Consume the frame and hand its plan to a [`DslBuilder`] for further construction.
    pub(crate) fn get_plan_builder(self) -> DslBuilder {
        DslBuilder::from(self.logical_plan)
    }

    /// Copy of the current optimization flags.
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
109
110    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
111        LazyFrame {
112            logical_plan,
113            opt_state,
114            cached_arena: Default::default(),
115        }
116    }
117
    /// Get current optimizations.
    pub fn get_current_optimizations(&self) -> OptFlags {
        self.opt_state
    }

    /// Set allowed optimizations.
    ///
    /// Replaces the entire flag set; use the `with_*` toggles to flip single flags.
    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
        self.opt_state = opt_state;
        self
    }
128
129    /// Turn off all optimizations.
130    pub fn without_optimizations(self) -> Self {
131        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
132    }
133
    /// Toggle projection pushdown optimization.
    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
        self
    }

    /// Toggle cluster with columns optimization.
    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
        self
    }

    /// Check if operations are order dependent and unset maintaining_order if
    /// the order would not be observed.
    pub fn with_check_order(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
        self
    }

    /// Toggle predicate pushdown optimization.
    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
        self
    }

    /// Toggle type coercion optimization.
    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
        self
    }

    /// Toggle type check optimization.
    pub fn with_type_check(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
        self
    }

    /// Toggle expression simplification optimization on or off.
    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
        self
    }

    /// Toggle common subplan elimination optimization on or off.
    #[cfg(feature = "cse")]
    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
        self
    }

    /// Toggle common subexpression elimination optimization on or off.
    #[cfg(feature = "cse")]
    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
        self
    }

    /// Toggle slice pushdown optimization.
    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
        self
    }

    /// Toggle use of the new streaming engine for executing this query.
    #[cfg(feature = "new_streaming")]
    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
        self
    }

    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
        self
    }

    /// Run every node eagerly. This turns off multi-node optimizations.
    pub fn _with_eager(mut self, toggle: bool) -> Self {
        self.opt_state.set(OptFlags::EAGER, toggle);
        self
    }
214
    /// Return a String describing the naive (un-optimized) logical plan.
    ///
    /// Returns `Err` if resolving the logical plan to IR fails.
    pub fn describe_plan(&self) -> PolarsResult<String> {
        Ok(self.clone().to_alp()?.describe())
    }

    /// Return a String describing the naive (un-optimized) logical plan in tree format.
    ///
    /// Returns `Err` if resolving the logical plan to IR fails.
    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
        Ok(self.clone().to_alp()?.describe_tree_format())
    }

    /// Return a String describing the optimized logical plan.
    ///
    /// Returns `Err` if optimizing the logical plan fails.
    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
        Ok(self.clone().to_alp_optimized()?.describe())
    }

    /// Return a String describing the optimized logical plan in tree format.
    ///
    /// Returns `Err` if optimizing the logical plan fails.
    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
    }
238
239    /// Return a String describing the logical plan.
240    ///
241    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
242    /// explains the naive, un-optimized plan.
243    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
244        if optimized {
245            self.describe_optimized_plan()
246        } else {
247            self.describe_plan()
248        }
249    }
250
251    /// Add a sort operation to the logical plan.
252    ///
253    /// Sorts the LazyFrame by the column name specified using the provided options.
254    ///
255    /// # Example
256    ///
257    /// Sort DataFrame by 'sepal_width' column:
258    /// ```rust
259    /// # use polars_core::prelude::*;
260    /// # use polars_lazy::prelude::*;
261    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
262    ///     df.lazy().sort(["sepal_width"], Default::default())
263    /// }
264    /// ```
265    /// Sort by a single column with specific order:
266    /// ```
267    /// # use polars_core::prelude::*;
268    /// # use polars_lazy::prelude::*;
269    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
270    ///     df.lazy().sort(
271    ///         ["sepal_width"],
272    ///         SortMultipleOptions::new()
273    ///             .with_order_descending(descending)
274    ///     )
275    /// }
276    /// ```
277    /// Sort by multiple columns with specifying order for each column:
278    /// ```
279    /// # use polars_core::prelude::*;
280    /// # use polars_lazy::prelude::*;
281    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
282    ///     df.lazy().sort(
283    ///         ["sepal_width", "sepal_length"],
284    ///         SortMultipleOptions::new()
285    ///             .with_order_descending_multi([false, true])
286    ///     )
287    /// }
288    /// ```
289    /// See [`SortMultipleOptions`] for more options.
290    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
291        let opt_state = self.get_opt_state();
292        let lp = self
293            .get_plan_builder()
294            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
295            .build();
296        Self::from_logical_plan(lp, opt_state)
297    }
298
299    /// Add a sort operation to the logical plan.
300    ///
301    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
302    /// concrete columns before sorting.
303    ///
304    /// See [`SortMultipleOptions`] for more options.
305    ///
306    /// # Example
307    ///
308    /// ```rust
309    /// use polars_core::prelude::*;
310    /// use polars_lazy::prelude::*;
311    ///
312    /// /// Sort DataFrame by 'sepal_width' column
313    /// fn example(df: DataFrame) -> LazyFrame {
314    ///       df.lazy()
315    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
316    /// }
317    /// ```
318    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
319        self,
320        by_exprs: E,
321        sort_options: SortMultipleOptions,
322    ) -> Self {
323        let by_exprs = by_exprs.as_ref().to_vec();
324        if by_exprs.is_empty() {
325            self
326        } else {
327            let opt_state = self.get_opt_state();
328            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
329            Self::from_logical_plan(lp, opt_state)
330        }
331    }
332
    /// Return the `k` largest rows as ordered by `by_exprs`.
    ///
    /// Implemented as a reversed, nulls-last sort followed by `slice(0, k)`,
    /// which the optimizer rewrites into a top-k operation.
    pub fn top_k<E: AsRef<[Expr]>>(
        self,
        k: IdxSize,
        by_exprs: E,
        sort_options: SortMultipleOptions,
    ) -> Self {
        // this will optimize to top-k
        self.sort_by_exprs(
            by_exprs,
            sort_options.with_order_reversed().with_nulls_last(true),
        )
        .slice(0, k)
    }

    /// Return the `k` smallest rows as ordered by `by_exprs`.
    ///
    /// Implemented as a nulls-last sort followed by `slice(0, k)`, which the
    /// optimizer rewrites into a bottom-k operation.
    pub fn bottom_k<E: AsRef<[Expr]>>(
        self,
        k: IdxSize,
        by_exprs: E,
        sort_options: SortMultipleOptions,
    ) -> Self {
        // this will optimize to bottom-k
        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
            .slice(0, k)
    }
357
    /// Reverse the `DataFrame` from top to bottom.
    ///
    /// Row `i` becomes row `number_of_rows - i - 1`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///       df.lazy()
    ///         .reverse()
    /// }
    /// ```
    pub fn reverse(self) -> Self {
        // Reverse every column via the wildcard selector.
        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
    }
376
377    /// Rename columns in the DataFrame.
378    ///
379    /// `existing` and `new` are iterables of the same length containing the old and
380    /// corresponding new column names. Renaming happens to all `existing` columns
381    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
382    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
383    /// those columns that are actually found will be renamed (others will be ignored).
384    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
385    where
386        I: IntoIterator<Item = T>,
387        J: IntoIterator<Item = S>,
388        T: AsRef<str>,
389        S: AsRef<str>,
390    {
391        let iter = existing.into_iter();
392        let cap = iter.size_hint().0;
393        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
394        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
395
396        // TODO! should this error if `existing` and `new` have different lengths?
397        // Currently, the longer of the two is truncated.
398        for (existing, new) in iter.zip(new) {
399            let existing = existing.as_ref();
400            let new = new.as_ref();
401            if new != existing {
402                existing_vec.push(existing.into());
403                new_vec.push(new.into());
404            }
405        }
406
407        self.map_private(DslFunction::Rename {
408            existing: existing_vec.into(),
409            new: new_vec.into(),
410            strict,
411        })
412    }
413
    /// Removes columns from the DataFrame.
    /// Note that it's better to only select the columns you need
    /// and let the projection pushdown optimize away the unneeded columns.
    ///
    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
    /// error while materializing the [`LazyFrame`].
    pub fn drop(self, columns: Selector) -> Self {
        let opt_state = self.get_opt_state();
        let lp = self.get_plan_builder().drop(columns).build();
        Self::from_logical_plan(lp, opt_state)
    }

    /// Shift the values by a given period and fill the parts that will be empty due to this operation
    /// with `Nones`.
    ///
    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
        // Apply the shift to every column via the wildcard selector.
        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
    }

    /// Shift the values by a given period and fill the parts that will be empty due to this operation
    /// with the result of the `fill_value` expression.
    ///
    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
        self.select(vec![
            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
        ])
    }

    /// Fill None values in the DataFrame with an expression.
    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
        let opt_state = self.get_opt_state();
        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
        Self::from_logical_plan(lp, opt_state)
    }

    /// Fill NaN values in the DataFrame with an expression.
    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
        let opt_state = self.get_opt_state();
        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
        Self::from_logical_plan(lp, opt_state)
    }

    /// Caches the result into a new LazyFrame.
    ///
    /// This should be used to prevent computations running multiple times.
    pub fn cache(self) -> Self {
        let opt_state = self.get_opt_state();
        let lp = self.get_plan_builder().cache().build();
        Self::from_logical_plan(lp, opt_state)
    }
466
467    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
468    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
469        let cast_cols: Vec<Expr> = dtypes
470            .into_iter()
471            .map(|(name, dt)| {
472                let name = PlSmallStr::from_str(name);
473
474                if strict {
475                    col(name).strict_cast(dt)
476                } else {
477                    col(name).cast(dt)
478                }
479            })
480            .collect();
481
482        if cast_cols.is_empty() {
483            self
484        } else {
485            self.with_columns(cast_cols)
486        }
487    }
488
489    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
490    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
491        self.with_columns(vec![if strict {
492            col(PlSmallStr::from_static("*")).strict_cast(dtype)
493        } else {
494            col(PlSmallStr::from_static("*")).cast(dtype)
495        }])
496    }
497
    /// Lower and optimize the logical plan into the given arenas, returning the
    /// root [`Node`] of the optimized IR.
    pub fn optimize(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Node> {
        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
    }

    /// Convert the logical plan to the *optimized* intermediate representation.
    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();
        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;

        Ok(IRPlan::new(node, lp_arena, expr_arena))
    }

    /// Convert the logical plan to the *un-optimized* intermediate representation.
    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();
        let node = to_alp(
            self.logical_plan,
            &mut expr_arena,
            &mut lp_arena,
            &mut self.opt_state,
        )?;
        let plan = IRPlan::new(node, lp_arena, expr_arena);
        Ok(plan)
    }
524
    /// Lower and optimize `self.logical_plan` into `lp_arena`/`expr_arena`,
    /// returning the root [`Node`] of the optimized IR.
    ///
    /// `scratch` is reused as temporary node storage by the optimizer.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
    ) -> PolarsResult<Node> {
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);

        #[cfg(feature = "cse")]
        if new_streaming {
            // The new streaming engine can't deal with the way the common
            // subexpression elimination adds length-incorrect with_columns.
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback the optimizer can use to turn an expression into an
            // I/O-level expression; yields `None` when the expression cannot
            // be converted to a physical expression.
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        Ok(lp_top)
    }
564
    /// Optimize the plan, invoke `post_opt` on the optimized IR, and build the
    /// physical (executable) plan.
    ///
    /// Returns a fresh [`ExecutionState`], the executor, and a flag that is
    /// `true` when the plan root is *not* a file/partition sink. The flag is
    /// only meaningful when `check_sink` is `true`; otherwise it is always
    /// `true`.
    fn prepare_collect_post_opt<P>(
        mut self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
        post_opt: P,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();

        let mut scratch = vec![];
        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;

        post_opt(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            // Post optimization callback gets the time since the
            // query was started as its "base" timepoint.
            query_start.map(|s| s.elapsed()),
        )?;

        // sink should be replaced
        let no_file_sink = if check_sink {
            !matches!(
                lp_arena.get(lp_top),
                IR::Sink {
                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
                    ..
                }
            )
        } else {
            true
        };
        let physical_plan = create_physical_plan(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        let state = ExecutionState::new();
        Ok((state, physical_plan, no_file_sink))
    }
615
    /// Optimize, apply `post_opt` to the IR just-in-time, then execute with the
    /// in-memory engine and return the materialized `DataFrame`.
    ///
    /// `post_opt` is called after optimization and can be used to modify the IR.
    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let (mut state, mut physical_plan, _) =
            self.prepare_collect_post_opt(false, None, post_opt)?;
        physical_plan.execute(&mut state)
    }

    /// Like [`Self::prepare_collect_post_opt`] but with a no-op post-optimization callback.
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
639
    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
    /// `engine`.
    ///
    /// The query is optimized prior to execution.
    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
        // Collecting means sinking: root the plan in a memory sink unless the
        // plan already ends in an explicit sink.
        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
            payload.clone()
        } else {
            self.logical_plan = DslPlan::Sink {
                input: Arc::new(self.logical_plan),
                payload: SinkType::Memory,
            };
            SinkType::Memory
        };

        // Default engine for collect is InMemory, sink_* is Streaming
        if engine == Engine::Auto {
            engine = match payload {
                #[cfg(feature = "new_streaming")]
                SinkType::Callback { .. } | SinkType::File { .. } | SinkType::Partition { .. } => {
                    Engine::Streaming
                },
                _ => Engine::InMemory,
            };
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        #[cfg(feature = "new_streaming")]
        {
            // If a new-streaming run was requested out-of-band (see
            // `try_new_streaming_if_requested`), return its result directly.
            if let Some(result) = self.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_single());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!("new_streaming", self = self.with_new_streaming(true))
            },
            _ => {},
        }
        let mut alp_plan = self.clone().to_alp_optimized()?;

        match engine {
            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                result.map(|v| v.unwrap_single())
            }),
            Engine::Gpu => {
                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
            },
            Engine::InMemory => {
                let mut physical_plan = create_physical_plan(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                    BUILD_STREAMING_EXECUTOR,
                )?;
                let mut state = ExecutionState::new();
                physical_plan.execute(&mut state)
            },
        }
    }
710
711    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
712        let sink_multiple = LazyFrame {
713            logical_plan: DslPlan::SinkMultiple { inputs: plans },
714            opt_state,
715            cached_arena: Default::default(),
716        };
717        sink_multiple.explain(true)
718    }
719
    /// Execute multiple plans and collect each into its own `DataFrame`, using
    /// the specified `engine`.
    ///
    /// The plans are wrapped in a single `SinkMultiple` node so they are
    /// optimized as one query. Results are returned in the same order as
    /// `plans`.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            // If a new-streaming run was requested out-of-band, return its
            // result directly.
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    // Pre-fill shared caches once, before the plans run in parallel.
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
822
    /// Execute all the lazy operations and collect them into a [`DataFrame`].
    ///
    /// The query is optimized prior to execution. Equivalent to
    /// [`collect_with_engine`](Self::collect_with_engine) with `Engine::InMemory`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
    ///     df.lazy()
    ///       .group_by([col("foo")])
    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
    ///       .collect()
    /// }
    /// ```
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
843
    /// Optimize and execute while timing each node; `post_opt` is called after
    /// optimization and can modify the IR just-in-time.
    ///
    /// Returns the materialized result together with the profiling `DataFrame`.
    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let query_start = std::time::Instant::now();
        let (mut state, mut physical_plan, _) =
            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
        // Enable per-node timing relative to the query start.
        state.time_nodes(query_start);
        let out = physical_plan.execute(&mut state)?;
        let timer_df = state.finish_timer()?;
        Ok((out, timer_df))
    }

    /// Profile a LazyFrame.
    ///
    /// This will run the query and return a tuple
    /// containing the materialized DataFrame and a DataFrame that contains profiling information
    /// of each node that is executed.
    ///
    /// The units of the timings are microseconds.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
874
    /// Stream a query result into a parquet file. This is useful if the final result doesn't fit
    /// into memory. This method will return an error if the query cannot be completely done in a
    /// streaming fashion.
    #[cfg(feature = "parquet")]
    pub fn sink_parquet(
        self,
        target: SinkTarget,
        options: ParquetWriteOptions,
        cloud_options: Option<polars_io::cloud::CloudOptions>,
        sink_options: SinkOptions,
    ) -> PolarsResult<Self> {
        self.sink(SinkType::File(FileSinkType {
            target,
            sink_options,
            file_type: FileType::Parquet(options),
            cloud_options,
        }))
    }

    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
    /// into memory. This method will return an error if the query cannot be completely done in a
    /// streaming fashion.
    #[cfg(feature = "ipc")]
    pub fn sink_ipc(
        self,
        target: SinkTarget,
        options: IpcWriterOptions,
        cloud_options: Option<polars_io::cloud::CloudOptions>,
        sink_options: SinkOptions,
    ) -> PolarsResult<Self> {
        self.sink(SinkType::File(FileSinkType {
            target,
            sink_options,
            file_type: FileType::Ipc(options),
            cloud_options,
        }))
    }
912
913    /// Stream a query result into an csv file. This is useful if the final result doesn't fit
914    /// into memory. This methods will return an error if the query cannot be completely done in a
915    /// streaming fashion.
916    #[cfg(feature = "csv")]
917    pub fn sink_csv(
918        self,
919        target: SinkTarget,
920        options: CsvWriterOptions,
921        cloud_options: Option<polars_io::cloud::CloudOptions>,
922        sink_options: SinkOptions,
923    ) -> PolarsResult<Self> {
924        self.sink(SinkType::File(FileSinkType {
925            target,
926            sink_options,
927            file_type: FileType::Csv(options),
928            cloud_options,
929        }))
930    }
931
932    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
933    /// into memory. This methods will return an error if the query cannot be completely done in a
934    /// streaming fashion.
935    #[cfg(feature = "json")]
936    pub fn sink_json(
937        self,
938        target: SinkTarget,
939        options: JsonWriterOptions,
940        cloud_options: Option<polars_io::cloud::CloudOptions>,
941        sink_options: SinkOptions,
942    ) -> PolarsResult<Self> {
943        self.sink(SinkType::File(FileSinkType {
944            target,
945            sink_options,
946            file_type: FileType::Json(options),
947            cloud_options,
948        }))
949    }
950
951    /// Stream a query result into a parquet file in a partitioned manner. This is useful if the
952    /// final result doesn't fit into memory. This methods will return an error if the query cannot
953    /// be completely done in a streaming fashion.
954    #[cfg(feature = "parquet")]
955    #[allow(clippy::too_many_arguments)]
956    pub fn sink_parquet_partitioned(
957        self,
958        base_path: Arc<PlPath>,
959        file_path_cb: Option<PartitionTargetCallback>,
960        variant: PartitionVariant,
961        options: ParquetWriteOptions,
962        cloud_options: Option<polars_io::cloud::CloudOptions>,
963        sink_options: SinkOptions,
964        per_partition_sort_by: Option<Vec<SortColumn>>,
965        finish_callback: Option<SinkFinishCallback>,
966    ) -> PolarsResult<Self> {
967        self.sink(SinkType::Partition(PartitionSinkType {
968            base_path,
969            file_path_cb,
970            sink_options,
971            variant,
972            file_type: FileType::Parquet(options),
973            cloud_options,
974            per_partition_sort_by,
975            finish_callback,
976        }))
977    }
978
979    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
980    /// final result doesn't fit into memory. This methods will return an error if the query cannot
981    /// be completely done in a streaming fashion.
982    #[cfg(feature = "ipc")]
983    #[allow(clippy::too_many_arguments)]
984    pub fn sink_ipc_partitioned(
985        self,
986        base_path: Arc<PlPath>,
987        file_path_cb: Option<PartitionTargetCallback>,
988        variant: PartitionVariant,
989        options: IpcWriterOptions,
990        cloud_options: Option<polars_io::cloud::CloudOptions>,
991        sink_options: SinkOptions,
992        per_partition_sort_by: Option<Vec<SortColumn>>,
993        finish_callback: Option<SinkFinishCallback>,
994    ) -> PolarsResult<Self> {
995        self.sink(SinkType::Partition(PartitionSinkType {
996            base_path,
997            file_path_cb,
998            sink_options,
999            variant,
1000            file_type: FileType::Ipc(options),
1001            cloud_options,
1002            per_partition_sort_by,
1003            finish_callback,
1004        }))
1005    }
1006
1007    /// Stream a query result into an csv file in a partitioned manner. This is useful if the final
1008    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1009    /// completely done in a streaming fashion.
1010    #[cfg(feature = "csv")]
1011    #[allow(clippy::too_many_arguments)]
1012    pub fn sink_csv_partitioned(
1013        self,
1014        base_path: Arc<PlPath>,
1015        file_path_cb: Option<PartitionTargetCallback>,
1016        variant: PartitionVariant,
1017        options: CsvWriterOptions,
1018        cloud_options: Option<polars_io::cloud::CloudOptions>,
1019        sink_options: SinkOptions,
1020        per_partition_sort_by: Option<Vec<SortColumn>>,
1021        finish_callback: Option<SinkFinishCallback>,
1022    ) -> PolarsResult<Self> {
1023        self.sink(SinkType::Partition(PartitionSinkType {
1024            base_path,
1025            file_path_cb,
1026            sink_options,
1027            variant,
1028            file_type: FileType::Csv(options),
1029            cloud_options,
1030            per_partition_sort_by,
1031            finish_callback,
1032        }))
1033    }
1034
1035    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1036    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1037    /// completely done in a streaming fashion.
1038    #[cfg(feature = "json")]
1039    #[allow(clippy::too_many_arguments)]
1040    pub fn sink_json_partitioned(
1041        self,
1042        base_path: Arc<PlPath>,
1043        file_path_cb: Option<PartitionTargetCallback>,
1044        variant: PartitionVariant,
1045        options: JsonWriterOptions,
1046        cloud_options: Option<polars_io::cloud::CloudOptions>,
1047        sink_options: SinkOptions,
1048        per_partition_sort_by: Option<Vec<SortColumn>>,
1049        finish_callback: Option<SinkFinishCallback>,
1050    ) -> PolarsResult<Self> {
1051        self.sink(SinkType::Partition(PartitionSinkType {
1052            base_path,
1053            file_path_cb,
1054            sink_options,
1055            variant,
1056            file_type: FileType::Json(options),
1057            cloud_options,
1058            per_partition_sort_by,
1059            finish_callback,
1060        }))
1061    }
1062
1063    pub fn sink_batches(
1064        self,
1065        function: PlanCallback<DataFrame, bool>,
1066        maintain_order: bool,
1067        chunk_size: Option<NonZeroUsize>,
1068    ) -> PolarsResult<Self> {
1069        self.sink(SinkType::Callback(CallbackSinkType {
1070            function,
1071            maintain_order,
1072            chunk_size,
1073        }))
1074    }
1075
    /// Run the query on the new streaming engine if requested via environment
    /// variables.
    ///
    /// Returns `None` when neither `POLARS_AUTO_NEW_STREAMING` nor
    /// `POLARS_FORCE_NEW_STREAMING` is set to `"1"`, and also when auto mode is
    /// set and the streaming engine panicked with a "not yet implemented"
    /// message (so the caller can fall back to another engine). Otherwise
    /// returns the streaming engine's result (or the optimization error).
    #[cfg(feature = "new_streaming")]
    pub fn try_new_streaming_if_requested(
        &mut self,
    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

        if auto_new_streaming || force_new_streaming {
            // Try to run using the new streaming engine, falling back
            // if it fails in a todo!() error if auto_new_streaming is set.
            // Work on a clone so `self` stays usable for the fallback path.
            let mut new_stream_lazy = self.clone();
            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
                Ok(v) => v,
                Err(e) => return Some(Err(e)),
            };

            let f = || {
                polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                )
            };

            // Unimplemented operations surface as panics; catch them so auto
            // mode can fall through to `None` below instead of aborting.
            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
                Ok(v) => return Some(v),
                Err(e) => {
                    // Fallback to normal engine if error is due to not being implemented
                    // and auto_new_streaming is set, otherwise propagate error.
                    if !force_new_streaming
                        && auto_new_streaming
                        && e.downcast_ref::<&str>()
                            .map(|s| s.starts_with("not yet implemented"))
                            .unwrap_or(false)
                    {
                        if polars_core::config::verbose() {
                            eprintln!(
                                "caught unimplemented error in new streaming engine, falling back to normal engine"
                            );
                        }
                    } else {
                        std::panic::resume_unwind(e);
                    }
                },
            }
        }

        None
    }
1126
1127    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1128        polars_ensure!(
1129            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1130            InvalidOperation: "cannot create a sink on top of another sink"
1131        );
1132        self.logical_plan = DslPlan::Sink {
1133            input: Arc::new(self.logical_plan),
1134            payload,
1135        };
1136        Ok(self)
1137    }
1138
1139    /// Filter frame rows that match a predicate expression.
1140    ///
1141    /// The expression must yield boolean values (note that rows where the
1142    /// predicate resolves to `null` are *not* included in the resulting frame).
1143    ///
1144    /// # Example
1145    ///
1146    /// ```rust
1147    /// use polars_core::prelude::*;
1148    /// use polars_lazy::prelude::*;
1149    ///
1150    /// fn example(df: DataFrame) -> LazyFrame {
1151    ///       df.lazy()
1152    ///         .filter(col("sepal_width").is_not_null())
1153    ///         .select([col("sepal_width"), col("sepal_length")])
1154    /// }
1155    /// ```
1156    pub fn filter(self, predicate: Expr) -> Self {
1157        let opt_state = self.get_opt_state();
1158        let lp = self.get_plan_builder().filter(predicate).build();
1159        Self::from_logical_plan(lp, opt_state)
1160    }
1161
1162    /// Remove frame rows that match a predicate expression.
1163    ///
1164    /// The expression must yield boolean values (note that rows where the
1165    /// predicate resolves to `null` are *not* removed from the resulting frame).
1166    ///
1167    /// # Example
1168    ///
1169    /// ```rust
1170    /// use polars_core::prelude::*;
1171    /// use polars_lazy::prelude::*;
1172    ///
1173    /// fn example(df: DataFrame) -> LazyFrame {
1174    ///       df.lazy()
1175    ///         .remove(col("sepal_width").is_null())
1176    ///         .select([col("sepal_width"), col("sepal_length")])
1177    /// }
1178    /// ```
1179    pub fn remove(self, predicate: Expr) -> Self {
1180        self.filter(predicate.neq_missing(lit(true)))
1181    }
1182
1183    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1184    ///
1185    /// Columns can be selected with [`col`];
1186    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1187    ///
1188    /// # Example
1189    ///
1190    /// ```rust
1191    /// use polars_core::prelude::*;
1192    /// use polars_lazy::prelude::*;
1193    ///
1194    /// /// This function selects column "foo" and column "bar".
1195    /// /// Column "bar" is renamed to "ham".
1196    /// fn example(df: DataFrame) -> LazyFrame {
1197    ///       df.lazy()
1198    ///         .select([col("foo"),
1199    ///                   col("bar").alias("ham")])
1200    /// }
1201    ///
1202    /// /// This function selects all columns except "foo"
1203    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1204    ///       df.lazy()
1205    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1206    /// }
1207    /// ```
1208    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1209        let exprs = exprs.as_ref().to_vec();
1210        self.select_impl(
1211            exprs,
1212            ProjectionOptions {
1213                run_parallel: true,
1214                duplicate_check: true,
1215                should_broadcast: true,
1216            },
1217        )
1218    }
1219
1220    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1221        let exprs = exprs.as_ref().to_vec();
1222        self.select_impl(
1223            exprs,
1224            ProjectionOptions {
1225                run_parallel: false,
1226                duplicate_check: true,
1227                should_broadcast: true,
1228            },
1229        )
1230    }
1231
1232    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1233        let opt_state = self.get_opt_state();
1234        let lp = self.get_plan_builder().project(exprs, options).build();
1235        Self::from_logical_plan(lp, opt_state)
1236    }
1237
1238    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1239    ///
1240    /// Takes a list of expressions to group on.
1241    ///
1242    /// # Example
1243    ///
1244    /// ```rust
1245    /// use polars_core::prelude::*;
1246    /// use polars_lazy::prelude::*;
1247    ///
1248    /// fn example(df: DataFrame) -> LazyFrame {
1249    ///       df.lazy()
1250    ///        .group_by([col("date")])
1251    ///        .agg([
1252    ///            col("rain").min().alias("min_rain"),
1253    ///            col("rain").sum().alias("sum_rain"),
1254    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1255    ///        ])
1256    /// }
1257    /// ```
1258    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1259        let keys = by
1260            .as_ref()
1261            .iter()
1262            .map(|e| e.clone().into())
1263            .collect::<Vec<_>>();
1264        let opt_state = self.get_opt_state();
1265
1266        #[cfg(feature = "dynamic_group_by")]
1267        {
1268            LazyGroupBy {
1269                logical_plan: self.logical_plan,
1270                opt_state,
1271                keys,
1272                maintain_order: false,
1273                dynamic_options: None,
1274                rolling_options: None,
1275            }
1276        }
1277
1278        #[cfg(not(feature = "dynamic_group_by"))]
1279        {
1280            LazyGroupBy {
1281                logical_plan: self.logical_plan,
1282                opt_state,
1283                keys,
1284                maintain_order: false,
1285            }
1286        }
1287    }
1288
    /// Create rolling groups based on a time column.
    ///
    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
    ///
    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
    /// individual values and are not of constant intervals. For constant intervals use
    /// *group_by_dynamic*
    ///
    /// # Panics
    /// Panics when `index_column` is not a plain column and either the schema
    /// cannot be collected or the expression's output field cannot be resolved
    /// (the `.unwrap()` calls below).
    #[cfg(feature = "dynamic_group_by")]
    pub fn rolling<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: RollingGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // Non-column index expression: materialize it as a column first,
            // then recurse with a plain column reference to its output name.
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(index_column).rolling(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            // Rolling groups always keep the frame's row order.
            maintain_order: true,
            dynamic_options: None,
            rolling_options: Some(options),
        }
    }
1325
    /// Group based on a time value (or index value of type Int32, Int64).
    ///
    /// Time windows are calculated and rows are assigned to windows. Different from a
    /// normal group_by is that a row can be member of multiple groups. The time/index
    /// window could be seen as a rolling window, with a window size determined by
    /// dates/times/values instead of slots in the DataFrame.
    ///
    /// A window is defined by:
    ///
    /// - every: interval of the window
    /// - period: length of the window
    /// - offset: offset of the window
    ///
    /// The `group_by` argument should be empty `[]` if you don't want to combine this
    /// with a ordinary group_by on these keys.
    ///
    /// # Panics
    /// Panics when `index_column` is not a plain column and either the schema
    /// cannot be collected or the expression's output field cannot be resolved
    /// (the `.unwrap()` calls below).
    #[cfg(feature = "dynamic_group_by")]
    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: DynamicGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // Non-column index expression: materialize it as a column first,
            // then recurse with a plain column reference to its output name.
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(index_column).group_by_dynamic(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            // Dynamic groups always keep the frame's row order.
            maintain_order: true,
            dynamic_options: Some(options),
            rolling_options: None,
        }
    }
1370
1371    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1372    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1373        let keys = by
1374            .as_ref()
1375            .iter()
1376            .map(|e| e.clone().into())
1377            .collect::<Vec<_>>();
1378        let opt_state = self.get_opt_state();
1379
1380        #[cfg(feature = "dynamic_group_by")]
1381        {
1382            LazyGroupBy {
1383                logical_plan: self.logical_plan,
1384                opt_state,
1385                keys,
1386                maintain_order: true,
1387                dynamic_options: None,
1388                rolling_options: None,
1389            }
1390        }
1391
1392        #[cfg(not(feature = "dynamic_group_by"))]
1393        {
1394            LazyGroupBy {
1395                logical_plan: self.logical_plan,
1396                opt_state,
1397                keys,
1398                maintain_order: true,
1399            }
1400        }
1401    }
1402
1403    /// Left anti join this query with another lazy query.
1404    ///
1405    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1406    /// flexible join logic, see [`join`](LazyFrame::join) or
1407    /// [`join_builder`](LazyFrame::join_builder).
1408    ///
1409    /// # Example
1410    ///
1411    /// ```rust
1412    /// use polars_core::prelude::*;
1413    /// use polars_lazy::prelude::*;
1414    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1415    ///         ldf
1416    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1417    /// }
1418    /// ```
1419    #[cfg(feature = "semi_anti_join")]
1420    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1421        self.join(
1422            other,
1423            [left_on.into()],
1424            [right_on.into()],
1425            JoinArgs::new(JoinType::Anti),
1426        )
1427    }
1428
1429    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1430    #[cfg(feature = "cross_join")]
1431    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1432        self.join(
1433            other,
1434            vec![],
1435            vec![],
1436            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1437        )
1438    }
1439
1440    /// Left outer join this query with another lazy query.
1441    ///
1442    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1443    /// flexible join logic, see [`join`](LazyFrame::join) or
1444    /// [`join_builder`](LazyFrame::join_builder).
1445    ///
1446    /// # Example
1447    ///
1448    /// ```rust
1449    /// use polars_core::prelude::*;
1450    /// use polars_lazy::prelude::*;
1451    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1452    ///         ldf
1453    ///         .left_join(other, col("foo"), col("bar"))
1454    /// }
1455    /// ```
1456    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1457        self.join(
1458            other,
1459            [left_on.into()],
1460            [right_on.into()],
1461            JoinArgs::new(JoinType::Left),
1462        )
1463    }
1464
1465    /// Inner join this query with another lazy query.
1466    ///
1467    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1468    /// flexible join logic, see [`join`](LazyFrame::join) or
1469    /// [`join_builder`](LazyFrame::join_builder).
1470    ///
1471    /// # Example
1472    ///
1473    /// ```rust
1474    /// use polars_core::prelude::*;
1475    /// use polars_lazy::prelude::*;
1476    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1477    ///         ldf
1478    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1479    /// }
1480    /// ```
1481    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1482        self.join(
1483            other,
1484            [left_on.into()],
1485            [right_on.into()],
1486            JoinArgs::new(JoinType::Inner),
1487        )
1488    }
1489
1490    /// Full outer join this query with another lazy query.
1491    ///
1492    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1493    /// flexible join logic, see [`join`](LazyFrame::join) or
1494    /// [`join_builder`](LazyFrame::join_builder).
1495    ///
1496    /// # Example
1497    ///
1498    /// ```rust
1499    /// use polars_core::prelude::*;
1500    /// use polars_lazy::prelude::*;
1501    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1502    ///         ldf
1503    ///         .full_join(other, col("foo"), col("bar"))
1504    /// }
1505    /// ```
1506    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1507        self.join(
1508            other,
1509            [left_on.into()],
1510            [right_on.into()],
1511            JoinArgs::new(JoinType::Full),
1512        )
1513    }
1514
1515    /// Left semi join this query with another lazy query.
1516    ///
1517    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1518    /// flexible join logic, see [`join`](LazyFrame::join) or
1519    /// [`join_builder`](LazyFrame::join_builder).
1520    ///
1521    /// # Example
1522    ///
1523    /// ```rust
1524    /// use polars_core::prelude::*;
1525    /// use polars_lazy::prelude::*;
1526    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1527    ///         ldf
1528    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1529    /// }
1530    /// ```
1531    #[cfg(feature = "semi_anti_join")]
1532    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1533        self.join(
1534            other,
1535            [left_on.into()],
1536            [right_on.into()],
1537            JoinArgs::new(JoinType::Semi),
1538        )
1539    }
1540
1541    /// Generic function to join two LazyFrames.
1542    ///
1543    /// `join` can join on multiple columns, given as two list of expressions, and with a
1544    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1545    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1546    /// over how columns are renamed and parallelization options, use
1547    /// [`join_builder`](LazyFrame::join_builder).
1548    ///
1549    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1550    ///
1551    /// # Example
1552    ///
1553    /// ```rust
1554    /// use polars_core::prelude::*;
1555    /// use polars_lazy::prelude::*;
1556    ///
1557    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1558    ///         ldf
1559    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1560    /// }
1561    /// ```
1562    pub fn join<E: AsRef<[Expr]>>(
1563        self,
1564        other: LazyFrame,
1565        left_on: E,
1566        right_on: E,
1567        args: JoinArgs,
1568    ) -> LazyFrame {
1569        let left_on = left_on.as_ref().to_vec();
1570        let right_on = right_on.as_ref().to_vec();
1571
1572        self._join_impl(other, left_on, right_on, args)
1573    }
1574
1575    fn _join_impl(
1576        self,
1577        other: LazyFrame,
1578        left_on: Vec<Expr>,
1579        right_on: Vec<Expr>,
1580        args: JoinArgs,
1581    ) -> LazyFrame {
1582        let JoinArgs {
1583            how,
1584            validation,
1585            suffix,
1586            slice,
1587            nulls_equal,
1588            coalesce,
1589            maintain_order,
1590        } = args;
1591
1592        if slice.is_some() {
1593            panic!("impl error: slice is not handled")
1594        }
1595
1596        let mut builder = self
1597            .join_builder()
1598            .with(other)
1599            .left_on(left_on)
1600            .right_on(right_on)
1601            .how(how)
1602            .validate(validation)
1603            .join_nulls(nulls_equal)
1604            .coalesce(coalesce)
1605            .maintain_order(maintain_order);
1606
1607        if let Some(suffix) = suffix {
1608            builder = builder.suffix(suffix);
1609        }
1610
1611        // Note: args.slice is set by the optimizer
1612        builder.finish()
1613    }
1614
1615    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1616    ///
1617    /// After the `JoinBuilder` has been created and set up, calling
1618    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1619    /// representing the `join` operation.
1620    pub fn join_builder(self) -> JoinBuilder {
1621        JoinBuilder::new(self)
1622    }
1623
1624    /// Add or replace a column, given as an expression, to a DataFrame.
1625    ///
1626    /// # Example
1627    ///
1628    /// ```rust
1629    /// use polars_core::prelude::*;
1630    /// use polars_lazy::prelude::*;
1631    /// fn add_column(df: DataFrame) -> LazyFrame {
1632    ///     df.lazy()
1633    ///         .with_column(
1634    ///             when(col("sepal_length").lt(lit(5.0)))
1635    ///             .then(lit(10))
1636    ///             .otherwise(lit(1))
1637    ///             .alias("new_column_name"),
1638    ///         )
1639    /// }
1640    /// ```
1641    pub fn with_column(self, expr: Expr) -> LazyFrame {
1642        let opt_state = self.get_opt_state();
1643        let lp = self
1644            .get_plan_builder()
1645            .with_columns(
1646                vec![expr],
1647                ProjectionOptions {
1648                    run_parallel: false,
1649                    duplicate_check: true,
1650                    should_broadcast: true,
1651                },
1652            )
1653            .build();
1654        Self::from_logical_plan(lp, opt_state)
1655    }
1656
1657    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1658    ///
1659    /// # Example
1660    ///
1661    /// ```rust
1662    /// use polars_core::prelude::*;
1663    /// use polars_lazy::prelude::*;
1664    /// fn add_columns(df: DataFrame) -> LazyFrame {
1665    ///     df.lazy()
1666    ///         .with_columns(
1667    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1668    ///          )
1669    /// }
1670    /// ```
1671    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1672        let exprs = exprs.as_ref().to_vec();
1673        self.with_columns_impl(
1674            exprs,
1675            ProjectionOptions {
1676                run_parallel: true,
1677                duplicate_check: true,
1678                should_broadcast: true,
1679            },
1680        )
1681    }
1682
1683    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1684    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1685        let exprs = exprs.as_ref().to_vec();
1686        self.with_columns_impl(
1687            exprs,
1688            ProjectionOptions {
1689                run_parallel: false,
1690                duplicate_check: true,
1691                should_broadcast: true,
1692            },
1693        )
1694    }
1695
1696    /// Match or evolve to a certain schema.
1697    pub fn match_to_schema(
1698        self,
1699        schema: SchemaRef,
1700        per_column: Arc<[MatchToSchemaPerColumn]>,
1701        extra_columns: ExtraColumnsPolicy,
1702    ) -> LazyFrame {
1703        let opt_state = self.get_opt_state();
1704        let lp = self
1705            .get_plan_builder()
1706            .match_to_schema(schema, per_column, extra_columns)
1707            .build();
1708        Self::from_logical_plan(lp, opt_state)
1709    }
1710
1711    pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1712        let opt_state = self.get_opt_state();
1713        let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1714        Self::from_logical_plan(lp, opt_state)
1715    }
1716
1717    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1718        let opt_state = self.get_opt_state();
1719        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1720        Self::from_logical_plan(lp, opt_state)
1721    }
1722
1723    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1724        let contexts = contexts
1725            .as_ref()
1726            .iter()
1727            .map(|lf| lf.logical_plan.clone())
1728            .collect();
1729        let opt_state = self.get_opt_state();
1730        let lp = self.get_plan_builder().with_context(contexts).build();
1731        Self::from_logical_plan(lp, opt_state)
1732    }
1733
    /// Aggregate all the columns as their maximum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Max)` over the logical plan.
    pub fn max(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Max))
    }
1740
    /// Aggregate all the columns as their minimum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Min)` over the logical plan.
    pub fn min(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Min))
    }
1747
    /// Aggregate all the columns as their sum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    ///
    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
    /// - For integer columns, the ordinary checks for overflow are performed:
    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
    ///   silently wrap.
    /// - String columns will sum to None.
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Sum)` over the logical plan.
    pub fn sum(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Sum))
    }
1760
    /// Aggregate all the columns as their mean values.
    ///
    /// - Boolean and integer columns are converted to `f64` before computing the mean.
    /// - String columns will have a mean of None.
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Mean)` over the logical plan.
    pub fn mean(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Mean))
    }
1768
    /// Aggregate all the columns as their median values.
    ///
    /// - Boolean and integer results are converted to `f64`. However, they are still
    ///   susceptible to overflow before this conversion occurs.
    /// - String columns will have a median of None.
    pub fn median(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Median))
    }
1777
1778    /// Aggregate all the columns as their quantile values.
1779    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1780        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1781            quantile,
1782            method,
1783        }))
1784    }
1785
    /// Aggregate all the columns as their standard deviation values.
    ///
    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
    /// computing the variance, where `N` is the number of rows.
    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
    /// > likelihood estimate of the variance for normally distributed variables. The
    /// > standard deviation computed in this function is the square root of the estimated
    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
    /// > standard deviation per se.
    ///
    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Std { ddof })` over the plan.
    pub fn std(self, ddof: u8) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
    }
1801
    /// Aggregate all the columns as their variance values.
    ///
    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
    /// computing the variance, where `N` is the number of rows.
    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
    /// > likelihood estimate of the variance for normally distributed variables.
    ///
    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
    ///
    /// Equivalent to mapping `DslFunction::Stats(StatsFunction::Var { ddof })` over the plan.
    pub fn var(self, ddof: u8) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
    }
1814
    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
    ///
    /// Delegates to the internal implementation with `allow_empty` disabled.
    pub fn explode(self, columns: Selector) -> LazyFrame {
        self.explode_impl(columns, false)
    }
1819
1820    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1821    fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1822        let opt_state = self.get_opt_state();
1823        let lp = self
1824            .get_plan_builder()
1825            .explode(columns, allow_empty)
1826            .build();
1827        Self::from_logical_plan(lp, opt_state)
1828    }
1829
1830    /// Aggregate all the columns as the sum of their null value count.
1831    pub fn null_count(self) -> LazyFrame {
1832        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1833    }
1834
    /// Drop non-unique rows and maintain the order of kept rows.
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
    /// `None`, all columns are considered.
    ///
    /// Delegates to [`LazyFrame::unique_stable_generic`].
    pub fn unique_stable(
        self,
        subset: Option<Selector>,
        keep_strategy: UniqueKeepStrategy,
    ) -> LazyFrame {
        self.unique_stable_generic(subset, keep_strategy)
    }
1846
1847    pub fn unique_stable_generic(
1848        self,
1849        subset: Option<Selector>,
1850        keep_strategy: UniqueKeepStrategy,
1851    ) -> LazyFrame {
1852        let opt_state = self.get_opt_state();
1853        let options = DistinctOptionsDSL {
1854            subset,
1855            maintain_order: true,
1856            keep_strategy,
1857        };
1858        let lp = self.get_plan_builder().distinct(options).build();
1859        Self::from_logical_plan(lp, opt_state)
1860    }
1861
    /// Drop non-unique rows without maintaining the order of kept rows.
    ///
    /// The order of the kept rows may change; to maintain the original row order, use
    /// [`unique_stable`](LazyFrame::unique_stable).
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
    /// all columns are considered.
    ///
    /// Delegates to [`LazyFrame::unique_generic`].
    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
        self.unique_generic(subset, keep_strategy)
    }
1872
1873    pub fn unique_generic(
1874        self,
1875        subset: Option<Selector>,
1876        keep_strategy: UniqueKeepStrategy,
1877    ) -> LazyFrame {
1878        let opt_state = self.get_opt_state();
1879        let options = DistinctOptionsDSL {
1880            subset,
1881            maintain_order: false,
1882            keep_strategy,
1883        };
1884        let lp = self.get_plan_builder().distinct(options).build();
1885        Self::from_logical_plan(lp, opt_state)
1886    }
1887
1888    /// Drop rows containing one or more NaN values.
1889    ///
1890    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1891    /// floating point columns are considered.
1892    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1893        let opt_state = self.get_opt_state();
1894        let lp = self.get_plan_builder().drop_nans(subset).build();
1895        Self::from_logical_plan(lp, opt_state)
1896    }
1897
1898    /// Drop rows containing one or more None values.
1899    ///
1900    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1901    /// columns are considered.
1902    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1903        let opt_state = self.get_opt_state();
1904        let lp = self.get_plan_builder().drop_nulls(subset).build();
1905        Self::from_logical_plan(lp, opt_state)
1906    }
1907
1908    /// Slice the DataFrame using an offset (starting row) and a length.
1909    ///
1910    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1911    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1912    /// end.
1913    ///
1914    /// If `offset` and `len` are such that the slice extends beyond the end of the
1915    /// DataFrame, the portion between `offset` and the end will be returned. In this
1916    /// case, the number of rows in the returned DataFrame will be less than `len`.
1917    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1918        let opt_state = self.get_opt_state();
1919        let lp = self.get_plan_builder().slice(offset, len).build();
1920        Self::from_logical_plan(lp, opt_state)
1921    }
1922
    /// Get the first row.
    ///
    /// Equivalent to `self.slice(0, 1)`; only adds a slice node to the plan.
    pub fn first(self) -> LazyFrame {
        self.slice(0, 1)
    }
1929
    /// Get the last row.
    ///
    /// Equivalent to `self.slice(-1, 1)`; only adds a slice node to the plan.
    pub fn last(self) -> LazyFrame {
        self.slice(-1, 1)
    }
1936
1937    /// Get the last `n` rows.
1938    ///
1939    /// Equivalent to `self.slice(-(n as i64), n)`.
1940    pub fn tail(self, n: IdxSize) -> LazyFrame {
1941        let neg_tail = -(n as i64);
1942        self.slice(neg_tail, n)
1943    }
1944
1945    /// Unpivot the DataFrame from wide to long format.
1946    ///
1947    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1948    #[cfg(feature = "pivot")]
1949    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1950        let opt_state = self.get_opt_state();
1951        let lp = self.get_plan_builder().unpivot(args).build();
1952        Self::from_logical_plan(lp, opt_state)
1953    }
1954
    /// Limit the DataFrame to the first `n` rows.
    ///
    /// Equivalent to `self.slice(0, n)`.
    pub fn limit(self, n: IdxSize) -> LazyFrame {
        self.slice(0, n)
    }
1959
1960    /// Apply a function/closure once the logical plan get executed.
1961    ///
1962    /// The function has access to the whole materialized DataFrame at the time it is
1963    /// called.
1964    ///
1965    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1966    /// with `LazyFrame::with_column` or `with_columns`.
1967    ///
1968    /// ## Warning
1969    /// This can blow up in your face if the schema is changed due to the operation. The
1970    /// optimizer relies on a correct schema.
1971    ///
1972    /// You can toggle certain optimizations off.
1973    pub fn map<F>(
1974        self,
1975        function: F,
1976        optimizations: AllowedOptimizations,
1977        schema: Option<Arc<dyn UdfSchema>>,
1978        name: Option<&'static str>,
1979    ) -> LazyFrame
1980    where
1981        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1982    {
1983        let opt_state = self.get_opt_state();
1984        let lp = self
1985            .get_plan_builder()
1986            .map(
1987                function,
1988                optimizations,
1989                schema,
1990                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1991            )
1992            .build();
1993        Self::from_logical_plan(lp, opt_state)
1994    }
1995
1996    #[cfg(feature = "python")]
1997    pub fn map_python(
1998        self,
1999        function: polars_utils::python_function::PythonFunction,
2000        optimizations: AllowedOptimizations,
2001        schema: Option<SchemaRef>,
2002        validate_output: bool,
2003    ) -> LazyFrame {
2004        let opt_state = self.get_opt_state();
2005        let lp = self
2006            .get_plan_builder()
2007            .map_python(function, optimizations, schema, validate_output)
2008            .build();
2009        Self::from_logical_plan(lp, opt_state)
2010    }
2011
2012    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2013        let opt_state = self.get_opt_state();
2014        let lp = self.get_plan_builder().map_private(function).build();
2015        Self::from_logical_plan(lp, opt_state)
2016    }
2017
    /// Add a new column at index 0 that counts the rows.
    ///
    /// `name` is the name of the new column. `offset` is where to start counting from; if
    /// `None`, it is set to `0`.
    ///
    /// # Warning
    /// This can have a negative effect on query performance. This may for instance block
    /// predicate pushdown optimization.
    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
    where
        S: Into<PlSmallStr>,
    {
        let name = name.into();

        match &self.logical_plan {
            // For non-anonymous scans, push the row index into the scan's own
            // arguments rather than wrapping the plan in a RowIndex node.
            v @ DslPlan::Scan { scan_type, .. }
                if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
            {
                // The match above only borrows (a guard cannot move), so clone
                // and re-destructure to get owned fields. The `else` branch is
                // unreachable: `v` is known to be a `Scan` at this point.
                let DslPlan::Scan {
                    sources,
                    mut unified_scan_args,
                    scan_type,
                    cached_ir: _,
                } = v.clone()
                else {
                    unreachable!()
                };

                unified_scan_args.row_index = Some(RowIndex {
                    name,
                    offset: offset.unwrap_or(0),
                });

                // Rebuild the scan with an empty IR cache, since the cached IR
                // (if any) no longer matches the modified scan arguments.
                DslPlan::Scan {
                    sources,
                    unified_scan_args,
                    scan_type,
                    cached_ir: Default::default(),
                }
                .into()
            },
            _ => self.map_private(DslFunction::RowIndex { name, offset }),
        }
    }
2062
2063    /// Return the number of non-null elements for each column.
2064    pub fn count(self) -> LazyFrame {
2065        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2066    }
2067
2068    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2069    /// inserted as columns.
2070    #[cfg(feature = "dtype-struct")]
2071    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2072        self.map_private(DslFunction::Unnest {
2073            columns: cols,
2074            separator,
2075        })
2076    }
2077
2078    #[cfg(feature = "merge_sorted")]
2079    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2080    where
2081        S: Into<PlSmallStr>,
2082    {
2083        let key = key.into();
2084
2085        let lp = DslPlan::MergeSorted {
2086            input_left: Arc::new(self.logical_plan),
2087            input_right: Arc::new(other.logical_plan),
2088            key,
2089        };
2090        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2091    }
2092}
2093
/// Utility struct for lazy group_by operation.
#[derive(Clone)]
pub struct LazyGroupBy {
    // The plan this group-by was created from.
    pub logical_plan: DslPlan,
    // Optimization flags carried over from the originating `LazyFrame`.
    opt_state: OptFlags,
    // The group-by key expressions.
    keys: Vec<Expr>,
    // Whether the original order of the groups must be preserved.
    maintain_order: bool,
    // Options for a dynamic (windowed) group-by; `None` for a plain group-by.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    // Options for a rolling group-by; `None` for a plain group-by.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2106
impl From<LazyGroupBy> for LazyFrame {
    /// Turn the group-by back into a `LazyFrame`, keeping its plan and
    /// optimization flags. The keys and group options are discarded.
    fn from(lgb: LazyGroupBy) -> Self {
        Self {
            logical_plan: lgb.logical_plan,
            opt_state: lgb.opt_state,
            cached_arena: Default::default(),
        }
    }
}
2116
2117impl LazyGroupBy {
2118    /// Group by and aggregate.
2119    ///
2120    /// Select a column with [col] and choose an aggregation.
2121    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2122    ///
2123    /// # Example
2124    ///
2125    /// ```rust
2126    /// use polars_core::prelude::*;
2127    /// use polars_lazy::prelude::*;
2128    ///
2129    /// fn example(df: DataFrame) -> LazyFrame {
2130    ///       df.lazy()
2131    ///        .group_by_stable([col("date")])
2132    ///        .agg([
2133    ///            col("rain").min().alias("min_rain"),
2134    ///            col("rain").sum().alias("sum_rain"),
2135    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2136    ///        ])
2137    /// }
2138    /// ```
2139    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2140        #[cfg(feature = "dynamic_group_by")]
2141        let lp = DslBuilder::from(self.logical_plan)
2142            .group_by(
2143                self.keys,
2144                aggs,
2145                None,
2146                self.maintain_order,
2147                self.dynamic_options,
2148                self.rolling_options,
2149            )
2150            .build();
2151
2152        #[cfg(not(feature = "dynamic_group_by"))]
2153        let lp = DslBuilder::from(self.logical_plan)
2154            .group_by(self.keys, aggs, None, self.maintain_order)
2155            .build();
2156        LazyFrame::from_logical_plan(lp, self.opt_state)
2157    }
2158
2159    /// Return first n rows of each group
2160    pub fn head(self, n: Option<usize>) -> LazyFrame {
2161        let keys = self
2162            .keys
2163            .iter()
2164            .filter_map(|expr| expr_output_name(expr).ok())
2165            .collect::<Vec<_>>();
2166
2167        self.agg([all().as_expr().head(n)])
2168            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2169    }
2170
2171    /// Return last n rows of each group
2172    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2173        let keys = self
2174            .keys
2175            .iter()
2176            .filter_map(|expr| expr_output_name(expr).ok())
2177            .collect::<Vec<_>>();
2178
2179        self.agg([all().as_expr().tail(n)])
2180            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2181    }
2182
2183    /// Apply a function over the groups as a new DataFrame.
2184    ///
2185    /// **It is not recommended that you use this as materializing the DataFrame is very
2186    /// expensive.**
2187    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2188        #[cfg(feature = "dynamic_group_by")]
2189        let options = GroupbyOptions {
2190            dynamic: self.dynamic_options,
2191            rolling: self.rolling_options,
2192            slice: None,
2193        };
2194
2195        #[cfg(not(feature = "dynamic_group_by"))]
2196        let options = GroupbyOptions { slice: None };
2197
2198        let lp = DslPlan::GroupBy {
2199            input: Arc::new(self.logical_plan),
2200            keys: self.keys,
2201            aggs: vec![],
2202            apply: Some((f, schema)),
2203            maintain_order: self.maintain_order,
2204            options: Arc::new(options),
2205        };
2206        LazyFrame::from_logical_plan(lp, self.opt_state)
2207    }
2208}
2209
/// Builder for a join of two `LazyFrame`s; construct with [`JoinBuilder::new`]
/// and finish with [`JoinBuilder::finish`] or [`JoinBuilder::join_where`].
#[must_use]
pub struct JoinBuilder {
    // Left-hand side of the join.
    lf: LazyFrame,
    // Join type; defaults to `Inner` (see `new`).
    how: JoinType,
    // Right-hand side; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    // Key expressions evaluated against the left table.
    left_on: Vec<Expr>,
    // Key expressions evaluated against the right table.
    right_on: Vec<Expr>,
    // Whether the two input plans may be evaluated in parallel.
    allow_parallel: bool,
    // Whether parallel evaluation of the inputs is forced.
    force_parallel: bool,
    // Suffix for duplicate column names; `None` means the default `"_right"`.
    suffix: Option<PlSmallStr>,
    // Cardinality validation to perform on the join keys.
    validation: JoinValidation,
    // Whether null values are considered equal as join keys.
    nulls_equal: bool,
    // Whether to coalesce the join key columns in the output.
    coalesce: JoinCoalesce,
    // Row-order preservation policy for the output.
    maintain_order: MaintainOrderJoin,
}
2225impl JoinBuilder {
2226    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2227    pub fn new(lf: LazyFrame) -> Self {
2228        Self {
2229            lf,
2230            other: None,
2231            how: JoinType::Inner,
2232            left_on: vec![],
2233            right_on: vec![],
2234            allow_parallel: true,
2235            force_parallel: false,
2236            suffix: None,
2237            validation: Default::default(),
2238            nulls_equal: false,
2239            coalesce: Default::default(),
2240            maintain_order: Default::default(),
2241        }
2242    }
2243
2244    /// The right table in the join.
2245    pub fn with(mut self, other: LazyFrame) -> Self {
2246        self.other = Some(other);
2247        self
2248    }
2249
2250    /// Select the join type.
2251    pub fn how(mut self, how: JoinType) -> Self {
2252        self.how = how;
2253        self
2254    }
2255
2256    pub fn validate(mut self, validation: JoinValidation) -> Self {
2257        self.validation = validation;
2258        self
2259    }
2260
2261    /// The expressions you want to join both tables on.
2262    ///
2263    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2264    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2265        let on = on.as_ref().to_vec();
2266        self.left_on.clone_from(&on);
2267        self.right_on = on;
2268        self
2269    }
2270
2271    /// The expressions you want to join the left table on.
2272    ///
2273    /// The passed expressions must be valid in the left table.
2274    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2275        self.left_on = on.as_ref().to_vec();
2276        self
2277    }
2278
2279    /// The expressions you want to join the right table on.
2280    ///
2281    /// The passed expressions must be valid in the right table.
2282    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2283        self.right_on = on.as_ref().to_vec();
2284        self
2285    }
2286
2287    /// Allow parallel table evaluation.
2288    pub fn allow_parallel(mut self, allow: bool) -> Self {
2289        self.allow_parallel = allow;
2290        self
2291    }
2292
2293    /// Force parallel table evaluation.
2294    pub fn force_parallel(mut self, force: bool) -> Self {
2295        self.force_parallel = force;
2296        self
2297    }
2298
2299    /// Join on null values. By default null values will never produce matches.
2300    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2301        self.nulls_equal = nulls_equal;
2302        self
2303    }
2304
2305    /// Suffix to add duplicate column names in join.
2306    /// Defaults to `"_right"` if this method is never called.
2307    pub fn suffix<S>(mut self, suffix: S) -> Self
2308    where
2309        S: Into<PlSmallStr>,
2310    {
2311        self.suffix = Some(suffix.into());
2312        self
2313    }
2314
2315    /// Whether to coalesce join columns.
2316    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2317        self.coalesce = coalesce;
2318        self
2319    }
2320
2321    /// Whether to preserve the row order.
2322    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2323        self.maintain_order = maintain_order;
2324        self
2325    }
2326
2327    /// Finish builder
2328    pub fn finish(self) -> LazyFrame {
2329        let opt_state = self.lf.opt_state;
2330        let other = self.other.expect("'with' not set in join builder");
2331
2332        let args = JoinArgs {
2333            how: self.how,
2334            validation: self.validation,
2335            suffix: self.suffix,
2336            slice: None,
2337            nulls_equal: self.nulls_equal,
2338            coalesce: self.coalesce,
2339            maintain_order: self.maintain_order,
2340        };
2341
2342        let lp = self
2343            .lf
2344            .get_plan_builder()
2345            .join(
2346                other.logical_plan,
2347                self.left_on,
2348                self.right_on,
2349                JoinOptions {
2350                    allow_parallel: self.allow_parallel,
2351                    force_parallel: self.force_parallel,
2352                    args,
2353                }
2354                .into(),
2355            )
2356            .build();
2357        LazyFrame::from_logical_plan(lp, opt_state)
2358    }
2359
2360    // Finish with join predicates
2361    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2362        let opt_state = self.lf.opt_state;
2363        let other = self.other.expect("with not set");
2364
2365        // Decompose `And` conjunctions into their component expressions
2366        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2367            if let Expr::BinaryExpr {
2368                op: Operator::And,
2369                left,
2370                right,
2371            } = predicate
2372            {
2373                decompose_and((*left).clone(), expanded_predicates);
2374                decompose_and((*right).clone(), expanded_predicates);
2375            } else {
2376                expanded_predicates.push(predicate);
2377            }
2378        }
2379        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2380        for predicate in predicates {
2381            decompose_and(predicate, &mut expanded_predicates);
2382        }
2383        let predicates: Vec<Expr> = expanded_predicates;
2384
2385        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2386        #[cfg(feature = "is_between")]
2387        let predicates: Vec<Expr> = {
2388            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2389            for predicate in predicates {
2390                if let Expr::Function {
2391                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2392                    input,
2393                    ..
2394                } = &predicate
2395                {
2396                    if let [expr, lower, upper] = input.as_slice() {
2397                        match closed {
2398                            ClosedInterval::Both => {
2399                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2400                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2401                            },
2402                            ClosedInterval::Right => {
2403                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2404                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2405                            },
2406                            ClosedInterval::Left => {
2407                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2408                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2409                            },
2410                            ClosedInterval::None => {
2411                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2412                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2413                            },
2414                        }
2415                        continue;
2416                    }
2417                }
2418                expanded_predicates.push(predicate);
2419            }
2420            expanded_predicates
2421        };
2422
2423        let args = JoinArgs {
2424            how: self.how,
2425            validation: self.validation,
2426            suffix: self.suffix,
2427            slice: None,
2428            nulls_equal: self.nulls_equal,
2429            coalesce: self.coalesce,
2430            maintain_order: self.maintain_order,
2431        };
2432        let options = JoinOptions {
2433            allow_parallel: self.allow_parallel,
2434            force_parallel: self.force_parallel,
2435            args,
2436        };
2437
2438        let lp = DslPlan::Join {
2439            input_left: Arc::new(self.lf.logical_plan),
2440            input_right: Arc::new(other.logical_plan),
2441            left_on: Default::default(),
2442            right_on: Default::default(),
2443            predicates,
2444            options: Arc::from(options),
2445        };
2446
2447        LazyFrame::from_logical_plan(lp, opt_state)
2448    }
2449}
2450
/// Builder for the new streaming engine's query executor; `Some` only when the
/// `new_streaming` feature is enabled, `None` otherwise.
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
    #[cfg(not(feature = "new_streaming"))]
    {
        None
    }
    #[cfg(feature = "new_streaming")]
    {
        Some(streaming_dispatch::build_streaming_query_executor)
    }
};
2461#[cfg(feature = "new_streaming")]
2462pub use streaming_dispatch::build_streaming_query_executor;
2463
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build a boxed [`Executor`] that runs the plan rooted at `node` on the
    /// new streaming engine.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Scans can request that the materialized result be rechunked.
        let rechunk = if let IR::Scan {
            unified_scan_args, ..
        } = ir_arena.get(node)
        {
            unified_scan_args.rechunk
        } else {
            false
        };

        // The streaming engine needs a sink at the root; add an in-memory
        // sink if the plan does not already end in one.
        let sink_node = match ir_arena.get(node) {
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        let query = polars_stream::StreamingQuery::build(sink_node, ir_arena, expr_arena)?;
        let executor = StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        };
        Ok(Box::new(executor))
    }

    // Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
    struct StreamingQueryExecutor {
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // Must not block rayon thread on pending new-streaming future.
            assert!(POOL.current_thread_index().is_none());

            // The query can run only once; `take` consumes it.
            let query = { self.executor.try_lock().unwrap().take() }
                .expect("unhandled: execute() more than once");
            let mut df = query.execute().map(|x| x.unwrap_single())?;

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}