polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "ipc")]
21pub use ipc::*;
22#[cfg(feature = "json")]
23pub use ndjson::*;
24#[cfg(feature = "parquet")]
25pub use parquet::*;
26use polars_compute::rolling::QuantileMethod;
27use polars_core::POOL;
28use polars_core::error::feature_gated;
29use polars_core::prelude::*;
30use polars_expr::{ExpressionConversionState, create_physical_expr};
31use polars_io::RowIndex;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38use polars_utils::plpath::PlPath;
39use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
40
41use crate::frame::cached_arenas::CachedArena;
42use crate::prelude::*;
43
/// Conversion of a value into a [`LazyFrame`].
///
/// This file implements it for [`DataFrame`] (wrapping the data in a new lazy plan) and for
/// [`LazyFrame`] itself (returned unchanged).
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
47
48impl IntoLazy for DataFrame {
49    /// Convert the `DataFrame` into a `LazyFrame`
50    fn lazy(self) -> LazyFrame {
51        let lp = DslBuilder::from_existing_df(self).build();
52        LazyFrame {
53            logical_plan: lp,
54            opt_state: Default::default(),
55            cached_arena: Default::default(),
56        }
57    }
58}
59
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy, so it is returned as-is
    /// (plan, optimization flags and arena cache are all preserved).
    fn lazy(self) -> LazyFrame {
        self
    }
}
65
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
// `#[must_use]`: every builder method returns a new `LazyFrame`, so dropping the return value
// silently discards the operation that was just added.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL-level (not yet lowered or optimized) logical plan.
    pub logical_plan: DslPlan,
    /// Flags controlling optimizations and engine behaviour (e.g. `NEW_STREAMING`, `EAGER`)
    /// that are applied when the plan is lowered and optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared cache of plan/expression arenas; `None` until populated.
    ///
    /// Because it sits behind an `Arc`, clones of a `LazyFrame` share the same cache.
    /// NOTE(review): presumably filled and reused by `get_arenas` (defined elsewhere) to avoid
    /// re-allocating arenas across calls. Confirm.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
77
78impl From<DslPlan> for LazyFrame {
79    fn from(plan: DslPlan) -> Self {
80        Self {
81            logical_plan: plan,
82            opt_state: OptFlags::default(),
83            cached_arena: Default::default(),
84        }
85    }
86}
87
88impl LazyFrame {
89    pub(crate) fn from_inner(
90        logical_plan: DslPlan,
91        opt_state: OptFlags,
92        cached_arena: Arc<Mutex<Option<CachedArena>>>,
93    ) -> Self {
94        Self {
95            logical_plan,
96            opt_state,
97            cached_arena,
98        }
99    }
100
101    pub(crate) fn get_plan_builder(self) -> DslBuilder {
102        DslBuilder::from(self.logical_plan)
103    }
104
105    fn get_opt_state(&self) -> OptFlags {
106        self.opt_state
107    }
108
109    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
110        LazyFrame {
111            logical_plan,
112            opt_state,
113            cached_arena: Default::default(),
114        }
115    }
116
117    /// Get current optimizations.
118    pub fn get_current_optimizations(&self) -> OptFlags {
119        self.opt_state
120    }
121
122    /// Set allowed optimizations.
123    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
124        self.opt_state = opt_state;
125        self
126    }
127
128    /// Turn off all optimizations.
129    pub fn without_optimizations(self) -> Self {
130        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
131    }
132
133    /// Toggle projection pushdown optimization.
134    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
135        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
136        self
137    }
138
139    /// Toggle cluster with columns optimization.
140    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
141        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
142        self
143    }
144
145    /// Check if operations are order dependent and unset maintaining_order if
146    /// the order would not be observed.
147    pub fn with_check_order(mut self, toggle: bool) -> Self {
148        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
149        self
150    }
151
152    /// Toggle predicate pushdown optimization.
153    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
154        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
155        self
156    }
157
158    /// Toggle type coercion optimization.
159    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
160        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
161        self
162    }
163
164    /// Toggle type check optimization.
165    pub fn with_type_check(mut self, toggle: bool) -> Self {
166        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
167        self
168    }
169
170    /// Toggle expression simplification optimization on or off.
171    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
172        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
173        self
174    }
175
176    /// Toggle common subplan elimination optimization on or off
177    #[cfg(feature = "cse")]
178    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
179        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
180        self
181    }
182
183    /// Toggle common subexpression elimination optimization on or off
184    #[cfg(feature = "cse")]
185    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
186        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
187        self
188    }
189
190    /// Toggle slice pushdown optimization.
191    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
192        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
193        self
194    }
195
196    #[cfg(feature = "new_streaming")]
197    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
198        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
199        self
200    }
201
202    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
203    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
204        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
205        self
206    }
207
208    /// Run every node eagerly. This turns off multi-node optimizations.
209    pub fn _with_eager(mut self, toggle: bool) -> Self {
210        self.opt_state.set(OptFlags::EAGER, toggle);
211        self
212    }
213
214    /// Return a String describing the naive (un-optimized) logical plan.
215    pub fn describe_plan(&self) -> PolarsResult<String> {
216        Ok(self.clone().to_alp()?.describe())
217    }
218
219    /// Return a String describing the naive (un-optimized) logical plan in tree format.
220    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
221        Ok(self.clone().to_alp()?.describe_tree_format())
222    }
223
224    /// Return a String describing the optimized logical plan.
225    ///
226    /// Returns `Err` if optimizing the logical plan fails.
227    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
228        Ok(self.clone().to_alp_optimized()?.describe())
229    }
230
231    /// Return a String describing the optimized logical plan in tree format.
232    ///
233    /// Returns `Err` if optimizing the logical plan fails.
234    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
235        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
236    }
237
238    /// Return a String describing the logical plan.
239    ///
240    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
241    /// explains the naive, un-optimized plan.
242    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
243        if optimized {
244            self.describe_optimized_plan()
245        } else {
246            self.describe_plan()
247        }
248    }
249
250    /// Add a sort operation to the logical plan.
251    ///
252    /// Sorts the LazyFrame by the column name specified using the provided options.
253    ///
254    /// # Example
255    ///
256    /// Sort DataFrame by 'sepal_width' column:
257    /// ```rust
258    /// # use polars_core::prelude::*;
259    /// # use polars_lazy::prelude::*;
260    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
261    ///     df.lazy().sort(["sepal_width"], Default::default())
262    /// }
263    /// ```
264    /// Sort by a single column with specific order:
265    /// ```
266    /// # use polars_core::prelude::*;
267    /// # use polars_lazy::prelude::*;
268    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
269    ///     df.lazy().sort(
270    ///         ["sepal_width"],
271    ///         SortMultipleOptions::new()
272    ///             .with_order_descending(descending)
273    ///     )
274    /// }
275    /// ```
276    /// Sort by multiple columns with specifying order for each column:
277    /// ```
278    /// # use polars_core::prelude::*;
279    /// # use polars_lazy::prelude::*;
280    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
281    ///     df.lazy().sort(
282    ///         ["sepal_width", "sepal_length"],
283    ///         SortMultipleOptions::new()
284    ///             .with_order_descending_multi([false, true])
285    ///     )
286    /// }
287    /// ```
288    /// See [`SortMultipleOptions`] for more options.
289    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
290        let opt_state = self.get_opt_state();
291        let lp = self
292            .get_plan_builder()
293            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
294            .build();
295        Self::from_logical_plan(lp, opt_state)
296    }
297
298    /// Add a sort operation to the logical plan.
299    ///
300    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
301    /// concrete columns before sorting.
302    ///
303    /// See [`SortMultipleOptions`] for more options.
304    ///
305    /// # Example
306    ///
307    /// ```rust
308    /// use polars_core::prelude::*;
309    /// use polars_lazy::prelude::*;
310    ///
311    /// /// Sort DataFrame by 'sepal_width' column
312    /// fn example(df: DataFrame) -> LazyFrame {
313    ///       df.lazy()
314    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
315    /// }
316    /// ```
317    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
318        self,
319        by_exprs: E,
320        sort_options: SortMultipleOptions,
321    ) -> Self {
322        let by_exprs = by_exprs.as_ref().to_vec();
323        if by_exprs.is_empty() {
324            self
325        } else {
326            let opt_state = self.get_opt_state();
327            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
328            Self::from_logical_plan(lp, opt_state)
329        }
330    }
331
332    pub fn top_k<E: AsRef<[Expr]>>(
333        self,
334        k: IdxSize,
335        by_exprs: E,
336        sort_options: SortMultipleOptions,
337    ) -> Self {
338        // this will optimize to top-k
339        self.sort_by_exprs(
340            by_exprs,
341            sort_options.with_order_reversed().with_nulls_last(true),
342        )
343        .slice(0, k)
344    }
345
346    pub fn bottom_k<E: AsRef<[Expr]>>(
347        self,
348        k: IdxSize,
349        by_exprs: E,
350        sort_options: SortMultipleOptions,
351    ) -> Self {
352        // this will optimize to bottom-k
353        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
354            .slice(0, k)
355    }
356
357    /// Reverse the `DataFrame` from top to bottom.
358    ///
359    /// Row `i` becomes row `number_of_rows - i - 1`.
360    ///
361    /// # Example
362    ///
363    /// ```rust
364    /// use polars_core::prelude::*;
365    /// use polars_lazy::prelude::*;
366    ///
367    /// fn example(df: DataFrame) -> LazyFrame {
368    ///       df.lazy()
369    ///         .reverse()
370    /// }
371    /// ```
372    pub fn reverse(self) -> Self {
373        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
374    }
375
376    /// Rename columns in the DataFrame.
377    ///
378    /// `existing` and `new` are iterables of the same length containing the old and
379    /// corresponding new column names. Renaming happens to all `existing` columns
380    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
381    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
382    /// those columns that are actually found will be renamed (others will be ignored).
383    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
384    where
385        I: IntoIterator<Item = T>,
386        J: IntoIterator<Item = S>,
387        T: AsRef<str>,
388        S: AsRef<str>,
389    {
390        let iter = existing.into_iter();
391        let cap = iter.size_hint().0;
392        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
393        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
394
395        // TODO! should this error if `existing` and `new` have different lengths?
396        // Currently, the longer of the two is truncated.
397        for (existing, new) in iter.zip(new) {
398            let existing = existing.as_ref();
399            let new = new.as_ref();
400            if new != existing {
401                existing_vec.push(existing.into());
402                new_vec.push(new.into());
403            }
404        }
405
406        self.map_private(DslFunction::Rename {
407            existing: existing_vec.into(),
408            new: new_vec.into(),
409            strict,
410        })
411    }
412
413    /// Removes columns from the DataFrame.
414    /// Note that it's better to only select the columns you need
415    /// and let the projection pushdown optimize away the unneeded columns.
416    ///
417    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
418    /// error while materializing the [`LazyFrame`].
419    pub fn drop(self, columns: Selector) -> Self {
420        let opt_state = self.get_opt_state();
421        let lp = self.get_plan_builder().drop(columns).build();
422        Self::from_logical_plan(lp, opt_state)
423    }
424
425    /// Shift the values by a given period and fill the parts that will be empty due to this operation
426    /// with `Nones`.
427    ///
428    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
429    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
430        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
431    }
432
433    /// Shift the values by a given period and fill the parts that will be empty due to this operation
434    /// with the result of the `fill_value` expression.
435    ///
436    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
437    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
438        self.select(vec![
439            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
440        ])
441    }
442
443    /// Fill None values in the DataFrame with an expression.
444    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
445        let opt_state = self.get_opt_state();
446        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
447        Self::from_logical_plan(lp, opt_state)
448    }
449
450    /// Fill NaN values in the DataFrame with an expression.
451    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
452        let opt_state = self.get_opt_state();
453        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
454        Self::from_logical_plan(lp, opt_state)
455    }
456
457    /// Caches the result into a new LazyFrame.
458    ///
459    /// This should be used to prevent computations running multiple times.
460    pub fn cache(self) -> Self {
461        let opt_state = self.get_opt_state();
462        let lp = self.get_plan_builder().cache().build();
463        Self::from_logical_plan(lp, opt_state)
464    }
465
466    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
467    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
468        let cast_cols: Vec<Expr> = dtypes
469            .into_iter()
470            .map(|(name, dt)| {
471                let name = PlSmallStr::from_str(name);
472
473                if strict {
474                    col(name).strict_cast(dt)
475                } else {
476                    col(name).cast(dt)
477                }
478            })
479            .collect();
480
481        if cast_cols.is_empty() {
482            self
483        } else {
484            self.with_columns(cast_cols)
485        }
486    }
487
488    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
489    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
490        self.with_columns(vec![if strict {
491            col(PlSmallStr::from_static("*")).strict_cast(dtype)
492        } else {
493            col(PlSmallStr::from_static("*")).cast(dtype)
494        }])
495    }
496
497    pub fn optimize(
498        self,
499        lp_arena: &mut Arena<IR>,
500        expr_arena: &mut Arena<AExpr>,
501    ) -> PolarsResult<Node> {
502        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
503    }
504
505    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
506        let (mut lp_arena, mut expr_arena) = self.get_arenas();
507        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
508
509        Ok(IRPlan::new(node, lp_arena, expr_arena))
510    }
511
512    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
513        let (mut lp_arena, mut expr_arena) = self.get_arenas();
514        let node = to_alp(
515            self.logical_plan,
516            &mut expr_arena,
517            &mut lp_arena,
518            &mut self.opt_state,
519        )?;
520        let plan = IRPlan::new(node, lp_arena, expr_arena);
521        Ok(plan)
522    }
523
524    pub(crate) fn optimize_with_scratch(
525        self,
526        lp_arena: &mut Arena<IR>,
527        expr_arena: &mut Arena<AExpr>,
528        scratch: &mut Vec<Node>,
529    ) -> PolarsResult<Node> {
530        #[allow(unused_mut)]
531        let mut opt_state = self.opt_state;
532        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
533
534        #[cfg(feature = "cse")]
535        if new_streaming {
536            // The new streaming engine can't deal with the way the common
537            // subexpression elimination adds length-incorrect with_columns.
538            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
539        }
540
541        let lp_top = optimize(
542            self.logical_plan,
543            opt_state,
544            lp_arena,
545            expr_arena,
546            scratch,
547            Some(&|expr, expr_arena, schema| {
548                let phys_expr = create_physical_expr(
549                    expr,
550                    Context::Default,
551                    expr_arena,
552                    schema,
553                    &mut ExpressionConversionState::new(true),
554                )
555                .ok()?;
556                let io_expr = phys_expr_to_io_expr(phys_expr);
557                Some(io_expr)
558            }),
559        )?;
560
561        Ok(lp_top)
562    }
563
564    fn prepare_collect_post_opt<P>(
565        mut self,
566        check_sink: bool,
567        query_start: Option<std::time::Instant>,
568        post_opt: P,
569    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
570    where
571        P: FnOnce(
572            Node,
573            &mut Arena<IR>,
574            &mut Arena<AExpr>,
575            Option<std::time::Duration>,
576        ) -> PolarsResult<()>,
577    {
578        let (mut lp_arena, mut expr_arena) = self.get_arenas();
579
580        let mut scratch = vec![];
581        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
582
583        post_opt(
584            lp_top,
585            &mut lp_arena,
586            &mut expr_arena,
587            // Post optimization callback gets the time since the
588            // query was started as its "base" timepoint.
589            query_start.map(|s| s.elapsed()),
590        )?;
591
592        // sink should be replaced
593        let no_file_sink = if check_sink {
594            !matches!(
595                lp_arena.get(lp_top),
596                IR::Sink {
597                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
598                    ..
599                }
600            )
601        } else {
602            true
603        };
604        let physical_plan = create_physical_plan(
605            lp_top,
606            &mut lp_arena,
607            &mut expr_arena,
608            BUILD_STREAMING_EXECUTOR,
609        )?;
610
611        let state = ExecutionState::new();
612        Ok((state, physical_plan, no_file_sink))
613    }
614
615    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
616    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
617    where
618        P: FnOnce(
619            Node,
620            &mut Arena<IR>,
621            &mut Arena<AExpr>,
622            Option<std::time::Duration>,
623        ) -> PolarsResult<()>,
624    {
625        let (mut state, mut physical_plan, _) =
626            self.prepare_collect_post_opt(false, None, post_opt)?;
627        physical_plan.execute(&mut state)
628    }
629
630    #[allow(unused_mut)]
631    fn prepare_collect(
632        self,
633        check_sink: bool,
634        query_start: Option<std::time::Instant>,
635    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
636        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
637    }
638
639    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
640    /// `engine`.
641    ///
642    /// The query is optimized prior to execution.
643    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
644        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
645            payload.clone()
646        } else {
647            self.logical_plan = DslPlan::Sink {
648                input: Arc::new(self.logical_plan),
649                payload: SinkType::Memory,
650            };
651            SinkType::Memory
652        };
653
654        // Default engine for collect is InMemory, sink_* is Streaming
655        if engine == Engine::Auto {
656            engine = match payload {
657                #[cfg(feature = "new_streaming")]
658                SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
659                _ => Engine::InMemory,
660            };
661        }
662        // Gpu uses some hacks to dispatch.
663        if engine == Engine::Gpu {
664            engine = Engine::InMemory;
665        }
666
667        #[cfg(feature = "new_streaming")]
668        {
669            if let Some(result) = self.try_new_streaming_if_requested() {
670                return result.map(|v| v.unwrap_single());
671            }
672        }
673
674        match engine {
675            Engine::Auto => unreachable!(),
676            Engine::Streaming => {
677                feature_gated!("new_streaming", self = self.with_new_streaming(true))
678            },
679            _ => {},
680        }
681        let mut alp_plan = self.clone().to_alp_optimized()?;
682
683        match engine {
684            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
685                let result = polars_stream::run_query(
686                    alp_plan.lp_top,
687                    &mut alp_plan.lp_arena,
688                    &mut alp_plan.expr_arena,
689                );
690                result.map(|v| v.unwrap_single())
691            }),
692            Engine::Gpu => {
693                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
694            },
695            Engine::InMemory => {
696                let mut physical_plan = create_physical_plan(
697                    alp_plan.lp_top,
698                    &mut alp_plan.lp_arena,
699                    &mut alp_plan.expr_arena,
700                    BUILD_STREAMING_EXECUTOR,
701                )?;
702                let mut state = ExecutionState::new();
703                physical_plan.execute(&mut state)
704            },
705        }
706    }
707
708    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
709        let sink_multiple = LazyFrame {
710            logical_plan: DslPlan::SinkMultiple { inputs: plans },
711            opt_state,
712            cached_arena: Default::default(),
713        };
714        sink_multiple.explain(true)
715    }
716
    /// Execute several queries together and collect each result into a [`DataFrame`].
    ///
    /// All `plans` are combined under one `DslPlan::SinkMultiple` root and optimized as a
    /// single plan with `opt_state`. An empty `plans` returns an empty `Vec` without doing
    /// any work.
    ///
    /// Engine resolution: `Auto` and `Gpu` both fall back to the in-memory engine, and
    /// `Streaming` runs the combined plan on the new streaming engine (requires the
    /// `new_streaming` feature).
    ///
    /// NOTE(review): results are presumably returned in the same order as `plans`. The
    /// in-memory path preserves the order of `multiplan.physical_plans`; confirm that
    /// `create_multiple_physical_plans` keeps the input order.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // The optimized root is expected to still be the `SinkMultiple` built above.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows `alp_plan` (via `root()`); cloning it ends that borrow before the
        // arenas are borrowed mutably below.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // Unreachable in practice: `Gpu` was remapped to `InMemory` above.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO (stack overflow). So we take a multitude of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // Run the cache prefiller (if any) once, before the individual plans execute.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        // Chunks of `3 * num_threads` plans run in parallel; chunks run one after another.
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out of the slot (leaving a default) so it can be run by value.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // NOTE(review): `idx` restarts at 0 for every chunk, so plans in different
                                    // chunks get the same `branch_idx` offset. Confirm whether branch indices
                                    // need to be unique across all plans.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into one list.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
819
820    /// Execute all the lazy operations and collect them into a [`DataFrame`].
821    ///
822    /// The query is optimized prior to execution.
823    ///
824    /// # Example
825    ///
826    /// ```rust
827    /// use polars_core::prelude::*;
828    /// use polars_lazy::prelude::*;
829    ///
830    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
831    ///     df.lazy()
832    ///       .group_by([col("foo")])
833    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
834    ///       .collect()
835    /// }
836    /// ```
837    pub fn collect(self) -> PolarsResult<DataFrame> {
838        self.collect_with_engine(Engine::InMemory)
839    }
840
841    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
842    // This version does profiling of the node execution.
843    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
844    where
845        P: FnOnce(
846            Node,
847            &mut Arena<IR>,
848            &mut Arena<AExpr>,
849            Option<std::time::Duration>,
850        ) -> PolarsResult<()>,
851    {
852        let query_start = std::time::Instant::now();
853        let (mut state, mut physical_plan, _) =
854            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
855        state.time_nodes(query_start);
856        let out = physical_plan.execute(&mut state)?;
857        let timer_df = state.finish_timer()?;
858        Ok((out, timer_df))
859    }
860
861    /// Profile a LazyFrame.
862    ///
863    /// This will run the query and return a tuple
864    /// containing the materialized DataFrame and a DataFrame that contains profiling information
865    /// of each node that is executed.
866    ///
867    /// The units of the timings are microseconds.
868    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
869        self._profile_post_opt(|_, _, _, _| Ok(()))
870    }
871
872    /// Stream a query result into a parquet file. This is useful if the final result doesn't fit
873    /// into memory. This methods will return an error if the query cannot be completely done in a
874    /// streaming fashion.
875    #[cfg(feature = "parquet")]
876    pub fn sink_parquet(
877        self,
878        target: SinkTarget,
879        options: ParquetWriteOptions,
880        cloud_options: Option<polars_io::cloud::CloudOptions>,
881        sink_options: SinkOptions,
882    ) -> PolarsResult<Self> {
883        self.sink(SinkType::File(FileSinkType {
884            target,
885            sink_options,
886            file_type: FileType::Parquet(options),
887            cloud_options,
888        }))
889    }
890
891    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
892    /// into memory. This methods will return an error if the query cannot be completely done in a
893    /// streaming fashion.
894    #[cfg(feature = "ipc")]
895    pub fn sink_ipc(
896        self,
897        target: SinkTarget,
898        options: IpcWriterOptions,
899        cloud_options: Option<polars_io::cloud::CloudOptions>,
900        sink_options: SinkOptions,
901    ) -> PolarsResult<Self> {
902        self.sink(SinkType::File(FileSinkType {
903            target,
904            sink_options,
905            file_type: FileType::Ipc(options),
906            cloud_options,
907        }))
908    }
909
910    /// Stream a query result into an csv file. This is useful if the final result doesn't fit
911    /// into memory. This methods will return an error if the query cannot be completely done in a
912    /// streaming fashion.
913    #[cfg(feature = "csv")]
914    pub fn sink_csv(
915        self,
916        target: SinkTarget,
917        options: CsvWriterOptions,
918        cloud_options: Option<polars_io::cloud::CloudOptions>,
919        sink_options: SinkOptions,
920    ) -> PolarsResult<Self> {
921        self.sink(SinkType::File(FileSinkType {
922            target,
923            sink_options,
924            file_type: FileType::Csv(options),
925            cloud_options,
926        }))
927    }
928
929    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
930    /// into memory. This methods will return an error if the query cannot be completely done in a
931    /// streaming fashion.
932    #[cfg(feature = "json")]
933    pub fn sink_json(
934        self,
935        target: SinkTarget,
936        options: JsonWriterOptions,
937        cloud_options: Option<polars_io::cloud::CloudOptions>,
938        sink_options: SinkOptions,
939    ) -> PolarsResult<Self> {
940        self.sink(SinkType::File(FileSinkType {
941            target,
942            sink_options,
943            file_type: FileType::Json(options),
944            cloud_options,
945        }))
946    }
947
948    /// Stream a query result into a parquet file in a partitioned manner. This is useful if the
949    /// final result doesn't fit into memory. This methods will return an error if the query cannot
950    /// be completely done in a streaming fashion.
951    #[cfg(feature = "parquet")]
952    #[allow(clippy::too_many_arguments)]
953    pub fn sink_parquet_partitioned(
954        self,
955        base_path: Arc<PlPath>,
956        file_path_cb: Option<PartitionTargetCallback>,
957        variant: PartitionVariant,
958        options: ParquetWriteOptions,
959        cloud_options: Option<polars_io::cloud::CloudOptions>,
960        sink_options: SinkOptions,
961        per_partition_sort_by: Option<Vec<SortColumn>>,
962        finish_callback: Option<SinkFinishCallback>,
963    ) -> PolarsResult<Self> {
964        self.sink(SinkType::Partition(PartitionSinkType {
965            base_path,
966            file_path_cb,
967            sink_options,
968            variant,
969            file_type: FileType::Parquet(options),
970            cloud_options,
971            per_partition_sort_by,
972            finish_callback,
973        }))
974    }
975
976    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
977    /// final result doesn't fit into memory. This methods will return an error if the query cannot
978    /// be completely done in a streaming fashion.
979    #[cfg(feature = "ipc")]
980    #[allow(clippy::too_many_arguments)]
981    pub fn sink_ipc_partitioned(
982        self,
983        base_path: Arc<PlPath>,
984        file_path_cb: Option<PartitionTargetCallback>,
985        variant: PartitionVariant,
986        options: IpcWriterOptions,
987        cloud_options: Option<polars_io::cloud::CloudOptions>,
988        sink_options: SinkOptions,
989        per_partition_sort_by: Option<Vec<SortColumn>>,
990        finish_callback: Option<SinkFinishCallback>,
991    ) -> PolarsResult<Self> {
992        self.sink(SinkType::Partition(PartitionSinkType {
993            base_path,
994            file_path_cb,
995            sink_options,
996            variant,
997            file_type: FileType::Ipc(options),
998            cloud_options,
999            per_partition_sort_by,
1000            finish_callback,
1001        }))
1002    }
1003
1004    /// Stream a query result into an csv file in a partitioned manner. This is useful if the final
1005    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1006    /// completely done in a streaming fashion.
1007    #[cfg(feature = "csv")]
1008    #[allow(clippy::too_many_arguments)]
1009    pub fn sink_csv_partitioned(
1010        self,
1011        base_path: Arc<PlPath>,
1012        file_path_cb: Option<PartitionTargetCallback>,
1013        variant: PartitionVariant,
1014        options: CsvWriterOptions,
1015        cloud_options: Option<polars_io::cloud::CloudOptions>,
1016        sink_options: SinkOptions,
1017        per_partition_sort_by: Option<Vec<SortColumn>>,
1018        finish_callback: Option<SinkFinishCallback>,
1019    ) -> PolarsResult<Self> {
1020        self.sink(SinkType::Partition(PartitionSinkType {
1021            base_path,
1022            file_path_cb,
1023            sink_options,
1024            variant,
1025            file_type: FileType::Csv(options),
1026            cloud_options,
1027            per_partition_sort_by,
1028            finish_callback,
1029        }))
1030    }
1031
1032    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1033    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1034    /// completely done in a streaming fashion.
1035    #[cfg(feature = "json")]
1036    #[allow(clippy::too_many_arguments)]
1037    pub fn sink_json_partitioned(
1038        self,
1039        base_path: Arc<PlPath>,
1040        file_path_cb: Option<PartitionTargetCallback>,
1041        variant: PartitionVariant,
1042        options: JsonWriterOptions,
1043        cloud_options: Option<polars_io::cloud::CloudOptions>,
1044        sink_options: SinkOptions,
1045        per_partition_sort_by: Option<Vec<SortColumn>>,
1046        finish_callback: Option<SinkFinishCallback>,
1047    ) -> PolarsResult<Self> {
1048        self.sink(SinkType::Partition(PartitionSinkType {
1049            base_path,
1050            file_path_cb,
1051            sink_options,
1052            variant,
1053            file_type: FileType::Json(options),
1054            cloud_options,
1055            per_partition_sort_by,
1056            finish_callback,
1057        }))
1058    }
1059
1060    #[cfg(feature = "new_streaming")]
1061    pub fn try_new_streaming_if_requested(
1062        &mut self,
1063    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1064        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1065        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1066
1067        if auto_new_streaming || force_new_streaming {
1068            // Try to run using the new streaming engine, falling back
1069            // if it fails in a todo!() error if auto_new_streaming is set.
1070            let mut new_stream_lazy = self.clone();
1071            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1072            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1073                Ok(v) => v,
1074                Err(e) => return Some(Err(e)),
1075            };
1076
1077            let f = || {
1078                polars_stream::run_query(
1079                    alp_plan.lp_top,
1080                    &mut alp_plan.lp_arena,
1081                    &mut alp_plan.expr_arena,
1082                )
1083            };
1084
1085            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1086                Ok(v) => return Some(v),
1087                Err(e) => {
1088                    // Fallback to normal engine if error is due to not being implemented
1089                    // and auto_new_streaming is set, otherwise propagate error.
1090                    if !force_new_streaming
1091                        && auto_new_streaming
1092                        && e.downcast_ref::<&str>()
1093                            .map(|s| s.starts_with("not yet implemented"))
1094                            .unwrap_or(false)
1095                    {
1096                        if polars_core::config::verbose() {
1097                            eprintln!(
1098                                "caught unimplemented error in new streaming engine, falling back to normal engine"
1099                            );
1100                        }
1101                    } else {
1102                        std::panic::resume_unwind(e);
1103                    }
1104                },
1105            }
1106        }
1107
1108        None
1109    }
1110
1111    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1112        polars_ensure!(
1113            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1114            InvalidOperation: "cannot create a sink on top of another sink"
1115        );
1116        self.logical_plan = DslPlan::Sink {
1117            input: Arc::new(self.logical_plan),
1118            payload,
1119        };
1120        Ok(self)
1121    }
1122
1123    /// Filter frame rows that match a predicate expression.
1124    ///
1125    /// The expression must yield boolean values (note that rows where the
1126    /// predicate resolves to `null` are *not* included in the resulting frame).
1127    ///
1128    /// # Example
1129    ///
1130    /// ```rust
1131    /// use polars_core::prelude::*;
1132    /// use polars_lazy::prelude::*;
1133    ///
1134    /// fn example(df: DataFrame) -> LazyFrame {
1135    ///       df.lazy()
1136    ///         .filter(col("sepal_width").is_not_null())
1137    ///         .select([col("sepal_width"), col("sepal_length")])
1138    /// }
1139    /// ```
1140    pub fn filter(self, predicate: Expr) -> Self {
1141        let opt_state = self.get_opt_state();
1142        let lp = self.get_plan_builder().filter(predicate).build();
1143        Self::from_logical_plan(lp, opt_state)
1144    }
1145
1146    /// Remove frame rows that match a predicate expression.
1147    ///
1148    /// The expression must yield boolean values (note that rows where the
1149    /// predicate resolves to `null` are *not* removed from the resulting frame).
1150    ///
1151    /// # Example
1152    ///
1153    /// ```rust
1154    /// use polars_core::prelude::*;
1155    /// use polars_lazy::prelude::*;
1156    ///
1157    /// fn example(df: DataFrame) -> LazyFrame {
1158    ///       df.lazy()
1159    ///         .remove(col("sepal_width").is_null())
1160    ///         .select([col("sepal_width"), col("sepal_length")])
1161    /// }
1162    /// ```
1163    pub fn remove(self, predicate: Expr) -> Self {
1164        self.filter(predicate.neq_missing(lit(true)))
1165    }
1166
1167    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1168    ///
1169    /// Columns can be selected with [`col`];
1170    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1171    ///
1172    /// # Example
1173    ///
1174    /// ```rust
1175    /// use polars_core::prelude::*;
1176    /// use polars_lazy::prelude::*;
1177    ///
1178    /// /// This function selects column "foo" and column "bar".
1179    /// /// Column "bar" is renamed to "ham".
1180    /// fn example(df: DataFrame) -> LazyFrame {
1181    ///       df.lazy()
1182    ///         .select([col("foo"),
1183    ///                   col("bar").alias("ham")])
1184    /// }
1185    ///
1186    /// /// This function selects all columns except "foo"
1187    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1188    ///       df.lazy()
1189    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1190    /// }
1191    /// ```
1192    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1193        let exprs = exprs.as_ref().to_vec();
1194        self.select_impl(
1195            exprs,
1196            ProjectionOptions {
1197                run_parallel: true,
1198                duplicate_check: true,
1199                should_broadcast: true,
1200            },
1201        )
1202    }
1203
1204    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1205        let exprs = exprs.as_ref().to_vec();
1206        self.select_impl(
1207            exprs,
1208            ProjectionOptions {
1209                run_parallel: false,
1210                duplicate_check: true,
1211                should_broadcast: true,
1212            },
1213        )
1214    }
1215
1216    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1217        let opt_state = self.get_opt_state();
1218        let lp = self.get_plan_builder().project(exprs, options).build();
1219        Self::from_logical_plan(lp, opt_state)
1220    }
1221
1222    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1223    ///
1224    /// Takes a list of expressions to group on.
1225    ///
1226    /// # Example
1227    ///
1228    /// ```rust
1229    /// use polars_core::prelude::*;
1230    /// use polars_lazy::prelude::*;
1231    ///
1232    /// fn example(df: DataFrame) -> LazyFrame {
1233    ///       df.lazy()
1234    ///        .group_by([col("date")])
1235    ///        .agg([
1236    ///            col("rain").min().alias("min_rain"),
1237    ///            col("rain").sum().alias("sum_rain"),
1238    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1239    ///        ])
1240    /// }
1241    /// ```
1242    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1243        let keys = by
1244            .as_ref()
1245            .iter()
1246            .map(|e| e.clone().into())
1247            .collect::<Vec<_>>();
1248        let opt_state = self.get_opt_state();
1249
1250        #[cfg(feature = "dynamic_group_by")]
1251        {
1252            LazyGroupBy {
1253                logical_plan: self.logical_plan,
1254                opt_state,
1255                keys,
1256                maintain_order: false,
1257                dynamic_options: None,
1258                rolling_options: None,
1259            }
1260        }
1261
1262        #[cfg(not(feature = "dynamic_group_by"))]
1263        {
1264            LazyGroupBy {
1265                logical_plan: self.logical_plan,
1266                opt_state,
1267                keys,
1268                maintain_order: false,
1269            }
1270        }
1271    }
1272
    /// Create rolling groups based on a time column.
    ///
    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
    ///
    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
    /// individual values and are not of constant intervals. For constant intervals use
    /// *group_by_dynamic*
    ///
    /// If `index_column` is not a plain column reference, it is first added to the frame with
    /// [`with_column`](LazyFrame::with_column) and the grouping is then performed on the
    /// resulting column. Any `index_column` already stored in `options` is overwritten.
    ///
    /// # Panics
    ///
    /// Panics if `index_column` is not a plain column and either the frame's schema cannot be
    /// resolved or the output field of `index_column` cannot be determined from it.
    #[cfg(feature = "dynamic_group_by")]
    pub fn rolling<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: RollingGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // Materialize the expression as a column, then retry with a plain column reference
            // (which takes the branch above, so the recursion ends after one step).
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(index_column).rolling(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            maintain_order: true,
            dynamic_options: None,
            rolling_options: Some(options),
        }
    }
1309
    /// Group based on a time value (or index value of type Int32, Int64).
    ///
    /// Time windows are calculated and rows are assigned to windows. Different from a
    /// normal group_by is that a row can be member of multiple groups. The time/index
    /// window could be seen as a rolling window, with a window size determined by
    /// dates/times/values instead of slots in the DataFrame.
    ///
    /// A window is defined by:
    ///
    /// - every: interval of the window
    /// - period: length of the window
    /// - offset: offset of the window
    ///
    /// The `group_by` argument should be empty `[]` if you don't want to combine this
    /// with an ordinary group_by on these keys.
    ///
    /// If `index_column` is not a plain column reference, it is first added to the frame with
    /// [`with_column`](LazyFrame::with_column) and the grouping is then performed on the
    /// resulting column. Any `index_column` already stored in `options` is overwritten.
    ///
    /// # Panics
    ///
    /// Panics if `index_column` is not a plain column and either the frame's schema cannot be
    /// resolved or the output field of `index_column` cannot be determined from it.
    #[cfg(feature = "dynamic_group_by")]
    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: DynamicGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // Materialize the expression as a column, then retry with a plain column reference
            // (which takes the branch above, so the recursion ends after one step).
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(index_column).group_by_dynamic(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            maintain_order: true,
            dynamic_options: Some(options),
            rolling_options: None,
        }
    }
1354
1355    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1356    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1357        let keys = by
1358            .as_ref()
1359            .iter()
1360            .map(|e| e.clone().into())
1361            .collect::<Vec<_>>();
1362        let opt_state = self.get_opt_state();
1363
1364        #[cfg(feature = "dynamic_group_by")]
1365        {
1366            LazyGroupBy {
1367                logical_plan: self.logical_plan,
1368                opt_state,
1369                keys,
1370                maintain_order: true,
1371                dynamic_options: None,
1372                rolling_options: None,
1373            }
1374        }
1375
1376        #[cfg(not(feature = "dynamic_group_by"))]
1377        {
1378            LazyGroupBy {
1379                logical_plan: self.logical_plan,
1380                opt_state,
1381                keys,
1382                maintain_order: true,
1383            }
1384        }
1385    }
1386
1387    /// Left anti join this query with another lazy query.
1388    ///
1389    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1390    /// flexible join logic, see [`join`](LazyFrame::join) or
1391    /// [`join_builder`](LazyFrame::join_builder).
1392    ///
1393    /// # Example
1394    ///
1395    /// ```rust
1396    /// use polars_core::prelude::*;
1397    /// use polars_lazy::prelude::*;
1398    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1399    ///         ldf
1400    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1401    /// }
1402    /// ```
1403    #[cfg(feature = "semi_anti_join")]
1404    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1405        self.join(
1406            other,
1407            [left_on.into()],
1408            [right_on.into()],
1409            JoinArgs::new(JoinType::Anti),
1410        )
1411    }
1412
1413    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1414    #[cfg(feature = "cross_join")]
1415    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1416        self.join(
1417            other,
1418            vec![],
1419            vec![],
1420            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1421        )
1422    }
1423
1424    /// Left outer join this query with another lazy query.
1425    ///
1426    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1427    /// flexible join logic, see [`join`](LazyFrame::join) or
1428    /// [`join_builder`](LazyFrame::join_builder).
1429    ///
1430    /// # Example
1431    ///
1432    /// ```rust
1433    /// use polars_core::prelude::*;
1434    /// use polars_lazy::prelude::*;
1435    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1436    ///         ldf
1437    ///         .left_join(other, col("foo"), col("bar"))
1438    /// }
1439    /// ```
1440    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1441        self.join(
1442            other,
1443            [left_on.into()],
1444            [right_on.into()],
1445            JoinArgs::new(JoinType::Left),
1446        )
1447    }
1448
1449    /// Inner join this query with another lazy query.
1450    ///
1451    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1452    /// flexible join logic, see [`join`](LazyFrame::join) or
1453    /// [`join_builder`](LazyFrame::join_builder).
1454    ///
1455    /// # Example
1456    ///
1457    /// ```rust
1458    /// use polars_core::prelude::*;
1459    /// use polars_lazy::prelude::*;
1460    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1461    ///         ldf
1462    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1463    /// }
1464    /// ```
1465    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1466        self.join(
1467            other,
1468            [left_on.into()],
1469            [right_on.into()],
1470            JoinArgs::new(JoinType::Inner),
1471        )
1472    }
1473
1474    /// Full outer join this query with another lazy query.
1475    ///
1476    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1477    /// flexible join logic, see [`join`](LazyFrame::join) or
1478    /// [`join_builder`](LazyFrame::join_builder).
1479    ///
1480    /// # Example
1481    ///
1482    /// ```rust
1483    /// use polars_core::prelude::*;
1484    /// use polars_lazy::prelude::*;
1485    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1486    ///         ldf
1487    ///         .full_join(other, col("foo"), col("bar"))
1488    /// }
1489    /// ```
1490    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1491        self.join(
1492            other,
1493            [left_on.into()],
1494            [right_on.into()],
1495            JoinArgs::new(JoinType::Full),
1496        )
1497    }
1498
1499    /// Left semi join this query with another lazy query.
1500    ///
1501    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1502    /// flexible join logic, see [`join`](LazyFrame::join) or
1503    /// [`join_builder`](LazyFrame::join_builder).
1504    ///
1505    /// # Example
1506    ///
1507    /// ```rust
1508    /// use polars_core::prelude::*;
1509    /// use polars_lazy::prelude::*;
1510    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1511    ///         ldf
1512    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1513    /// }
1514    /// ```
1515    #[cfg(feature = "semi_anti_join")]
1516    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1517        self.join(
1518            other,
1519            [left_on.into()],
1520            [right_on.into()],
1521            JoinArgs::new(JoinType::Semi),
1522        )
1523    }
1524
1525    /// Generic function to join two LazyFrames.
1526    ///
1527    /// `join` can join on multiple columns, given as two list of expressions, and with a
1528    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1529    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1530    /// over how columns are renamed and parallelization options, use
1531    /// [`join_builder`](LazyFrame::join_builder).
1532    ///
1533    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1534    ///
1535    /// # Example
1536    ///
1537    /// ```rust
1538    /// use polars_core::prelude::*;
1539    /// use polars_lazy::prelude::*;
1540    ///
1541    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1542    ///         ldf
1543    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1544    /// }
1545    /// ```
1546    pub fn join<E: AsRef<[Expr]>>(
1547        self,
1548        other: LazyFrame,
1549        left_on: E,
1550        right_on: E,
1551        args: JoinArgs,
1552    ) -> LazyFrame {
1553        let left_on = left_on.as_ref().to_vec();
1554        let right_on = right_on.as_ref().to_vec();
1555
1556        self._join_impl(other, left_on, right_on, args)
1557    }
1558
1559    fn _join_impl(
1560        self,
1561        other: LazyFrame,
1562        left_on: Vec<Expr>,
1563        right_on: Vec<Expr>,
1564        args: JoinArgs,
1565    ) -> LazyFrame {
1566        let JoinArgs {
1567            how,
1568            validation,
1569            suffix,
1570            slice,
1571            nulls_equal,
1572            coalesce,
1573            maintain_order,
1574        } = args;
1575
1576        if slice.is_some() {
1577            panic!("impl error: slice is not handled")
1578        }
1579
1580        let mut builder = self
1581            .join_builder()
1582            .with(other)
1583            .left_on(left_on)
1584            .right_on(right_on)
1585            .how(how)
1586            .validate(validation)
1587            .join_nulls(nulls_equal)
1588            .coalesce(coalesce)
1589            .maintain_order(maintain_order);
1590
1591        if let Some(suffix) = suffix {
1592            builder = builder.suffix(suffix);
1593        }
1594
1595        // Note: args.slice is set by the optimizer
1596        builder.finish()
1597    }
1598
    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
    ///
    /// After the `JoinBuilder` has been created and set up, calling
    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
    /// representing the `join` operation.
    ///
    /// This frame becomes the left side of the join; the right side is supplied to the
    /// builder (e.g. via its `with` method).
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1607
1608    /// Add or replace a column, given as an expression, to a DataFrame.
1609    ///
1610    /// # Example
1611    ///
1612    /// ```rust
1613    /// use polars_core::prelude::*;
1614    /// use polars_lazy::prelude::*;
1615    /// fn add_column(df: DataFrame) -> LazyFrame {
1616    ///     df.lazy()
1617    ///         .with_column(
1618    ///             when(col("sepal_length").lt(lit(5.0)))
1619    ///             .then(lit(10))
1620    ///             .otherwise(lit(1))
1621    ///             .alias("new_column_name"),
1622    ///         )
1623    /// }
1624    /// ```
1625    pub fn with_column(self, expr: Expr) -> LazyFrame {
1626        let opt_state = self.get_opt_state();
1627        let lp = self
1628            .get_plan_builder()
1629            .with_columns(
1630                vec![expr],
1631                ProjectionOptions {
1632                    run_parallel: false,
1633                    duplicate_check: true,
1634                    should_broadcast: true,
1635                },
1636            )
1637            .build();
1638        Self::from_logical_plan(lp, opt_state)
1639    }
1640
1641    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1642    ///
1643    /// # Example
1644    ///
1645    /// ```rust
1646    /// use polars_core::prelude::*;
1647    /// use polars_lazy::prelude::*;
1648    /// fn add_columns(df: DataFrame) -> LazyFrame {
1649    ///     df.lazy()
1650    ///         .with_columns(
1651    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1652    ///          )
1653    /// }
1654    /// ```
1655    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1656        let exprs = exprs.as_ref().to_vec();
1657        self.with_columns_impl(
1658            exprs,
1659            ProjectionOptions {
1660                run_parallel: true,
1661                duplicate_check: true,
1662                should_broadcast: true,
1663            },
1664        )
1665    }
1666
1667    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1668    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1669        let exprs = exprs.as_ref().to_vec();
1670        self.with_columns_impl(
1671            exprs,
1672            ProjectionOptions {
1673                run_parallel: false,
1674                duplicate_check: true,
1675                should_broadcast: true,
1676            },
1677        )
1678    }
1679
1680    /// Match or evolve to a certain schema.
1681    pub fn match_to_schema(
1682        self,
1683        schema: SchemaRef,
1684        per_column: Arc<[MatchToSchemaPerColumn]>,
1685        extra_columns: ExtraColumnsPolicy,
1686    ) -> LazyFrame {
1687        let opt_state = self.get_opt_state();
1688        let lp = self
1689            .get_plan_builder()
1690            .match_to_schema(schema, per_column, extra_columns)
1691            .build();
1692        Self::from_logical_plan(lp, opt_state)
1693    }
1694
1695    pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1696        let opt_state = self.get_opt_state();
1697        let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1698        Self::from_logical_plan(lp, opt_state)
1699    }
1700
1701    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1702        let opt_state = self.get_opt_state();
1703        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1704        Self::from_logical_plan(lp, opt_state)
1705    }
1706
1707    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1708        let contexts = contexts
1709            .as_ref()
1710            .iter()
1711            .map(|lf| lf.logical_plan.clone())
1712            .collect();
1713        let opt_state = self.get_opt_state();
1714        let lp = self.get_plan_builder().with_context(contexts).build();
1715        Self::from_logical_plan(lp, opt_state)
1716    }
1717
1718    /// Aggregate all the columns as their maximum values.
1719    ///
1720    /// Aggregated columns will have the same names as the original columns.
1721    pub fn max(self) -> Self {
1722        self.map_private(DslFunction::Stats(StatsFunction::Max))
1723    }
1724
1725    /// Aggregate all the columns as their minimum values.
1726    ///
1727    /// Aggregated columns will have the same names as the original columns.
1728    pub fn min(self) -> Self {
1729        self.map_private(DslFunction::Stats(StatsFunction::Min))
1730    }
1731
1732    /// Aggregate all the columns as their sum values.
1733    ///
1734    /// Aggregated columns will have the same names as the original columns.
1735    ///
1736    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1737    /// - For integer columns, the ordinary checks for overflow are performed:
1738    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1739    ///   silently wrap.
1740    /// - String columns will sum to None.
1741    pub fn sum(self) -> Self {
1742        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1743    }
1744
1745    /// Aggregate all the columns as their mean values.
1746    ///
1747    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1748    /// - String columns will have a mean of None.
1749    pub fn mean(self) -> Self {
1750        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1751    }
1752
1753    /// Aggregate all the columns as their median values.
1754    ///
1755    /// - Boolean and integer results are converted to `f64`. However, they are still
1756    ///   susceptible to overflow before this conversion occurs.
1757    /// - String columns will sum to None.
1758    pub fn median(self) -> Self {
1759        self.map_private(DslFunction::Stats(StatsFunction::Median))
1760    }
1761
1762    /// Aggregate all the columns as their quantile values.
1763    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1764        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1765            quantile,
1766            method,
1767        }))
1768    }
1769
1770    /// Aggregate all the columns as their standard deviation values.
1771    ///
1772    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1773    /// computing the variance, where `N` is the number of rows.
1774    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1775    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1776    /// > likelihood estimate of the variance for normally distributed variables. The
1777    /// > standard deviation computed in this function is the square root of the estimated
1778    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1779    /// > standard deviation per se.
1780    ///
1781    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1782    pub fn std(self, ddof: u8) -> Self {
1783        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1784    }
1785
1786    /// Aggregate all the columns as their variance values.
1787    ///
1788    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1789    /// computing the variance, where `N` is the number of rows.
1790    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1791    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1792    /// > likelihood estimate of the variance for normally distributed variables.
1793    ///
1794    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1795    pub fn var(self, ddof: u8) -> Self {
1796        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1797    }
1798
1799    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1800    pub fn explode(self, columns: Selector) -> LazyFrame {
1801        self.explode_impl(columns, false)
1802    }
1803
1804    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1805    fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1806        let opt_state = self.get_opt_state();
1807        let lp = self
1808            .get_plan_builder()
1809            .explode(columns, allow_empty)
1810            .build();
1811        Self::from_logical_plan(lp, opt_state)
1812    }
1813
1814    /// Aggregate all the columns as the sum of their null value count.
1815    pub fn null_count(self) -> LazyFrame {
1816        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1817    }
1818
1819    /// Drop non-unique rows and maintain the order of kept rows.
1820    ///
1821    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1822    /// `None`, all columns are considered.
1823    pub fn unique_stable(
1824        self,
1825        subset: Option<Selector>,
1826        keep_strategy: UniqueKeepStrategy,
1827    ) -> LazyFrame {
1828        self.unique_stable_generic(subset, keep_strategy)
1829    }
1830
1831    pub fn unique_stable_generic(
1832        self,
1833        subset: Option<Selector>,
1834        keep_strategy: UniqueKeepStrategy,
1835    ) -> LazyFrame {
1836        let opt_state = self.get_opt_state();
1837        let options = DistinctOptionsDSL {
1838            subset,
1839            maintain_order: true,
1840            keep_strategy,
1841        };
1842        let lp = self.get_plan_builder().distinct(options).build();
1843        Self::from_logical_plan(lp, opt_state)
1844    }
1845
1846    /// Drop non-unique rows without maintaining the order of kept rows.
1847    ///
1848    /// The order of the kept rows may change; to maintain the original row order, use
1849    /// [`unique_stable`](LazyFrame::unique_stable).
1850    ///
1851    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1852    /// all columns are considered.
1853    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1854        self.unique_generic(subset, keep_strategy)
1855    }
1856
1857    pub fn unique_generic(
1858        self,
1859        subset: Option<Selector>,
1860        keep_strategy: UniqueKeepStrategy,
1861    ) -> LazyFrame {
1862        let opt_state = self.get_opt_state();
1863        let options = DistinctOptionsDSL {
1864            subset,
1865            maintain_order: false,
1866            keep_strategy,
1867        };
1868        let lp = self.get_plan_builder().distinct(options).build();
1869        Self::from_logical_plan(lp, opt_state)
1870    }
1871
1872    /// Drop rows containing one or more NaN values.
1873    ///
1874    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1875    /// floating point columns are considered.
1876    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1877        let opt_state = self.get_opt_state();
1878        let lp = self.get_plan_builder().drop_nans(subset).build();
1879        Self::from_logical_plan(lp, opt_state)
1880    }
1881
1882    /// Drop rows containing one or more None values.
1883    ///
1884    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1885    /// columns are considered.
1886    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1887        let opt_state = self.get_opt_state();
1888        let lp = self.get_plan_builder().drop_nulls(subset).build();
1889        Self::from_logical_plan(lp, opt_state)
1890    }
1891
1892    /// Slice the DataFrame using an offset (starting row) and a length.
1893    ///
1894    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1895    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1896    /// end.
1897    ///
1898    /// If `offset` and `len` are such that the slice extends beyond the end of the
1899    /// DataFrame, the portion between `offset` and the end will be returned. In this
1900    /// case, the number of rows in the returned DataFrame will be less than `len`.
1901    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1902        let opt_state = self.get_opt_state();
1903        let lp = self.get_plan_builder().slice(offset, len).build();
1904        Self::from_logical_plan(lp, opt_state)
1905    }
1906
1907    /// Get the first row.
1908    ///
1909    /// Equivalent to `self.slice(0, 1)`.
1910    pub fn first(self) -> LazyFrame {
1911        self.slice(0, 1)
1912    }
1913
1914    /// Get the last row.
1915    ///
1916    /// Equivalent to `self.slice(-1, 1)`.
1917    pub fn last(self) -> LazyFrame {
1918        self.slice(-1, 1)
1919    }
1920
1921    /// Get the last `n` rows.
1922    ///
1923    /// Equivalent to `self.slice(-(n as i64), n)`.
1924    pub fn tail(self, n: IdxSize) -> LazyFrame {
1925        let neg_tail = -(n as i64);
1926        self.slice(neg_tail, n)
1927    }
1928
1929    /// Unpivot the DataFrame from wide to long format.
1930    ///
1931    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1932    #[cfg(feature = "pivot")]
1933    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1934        let opt_state = self.get_opt_state();
1935        let lp = self.get_plan_builder().unpivot(args).build();
1936        Self::from_logical_plan(lp, opt_state)
1937    }
1938
1939    /// Limit the DataFrame to the first `n` rows.
1940    pub fn limit(self, n: IdxSize) -> LazyFrame {
1941        self.slice(0, n)
1942    }
1943
1944    /// Apply a function/closure once the logical plan get executed.
1945    ///
1946    /// The function has access to the whole materialized DataFrame at the time it is
1947    /// called.
1948    ///
1949    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1950    /// with `LazyFrame::with_column` or `with_columns`.
1951    ///
1952    /// ## Warning
1953    /// This can blow up in your face if the schema is changed due to the operation. The
1954    /// optimizer relies on a correct schema.
1955    ///
1956    /// You can toggle certain optimizations off.
1957    pub fn map<F>(
1958        self,
1959        function: F,
1960        optimizations: AllowedOptimizations,
1961        schema: Option<Arc<dyn UdfSchema>>,
1962        name: Option<&'static str>,
1963    ) -> LazyFrame
1964    where
1965        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1966    {
1967        let opt_state = self.get_opt_state();
1968        let lp = self
1969            .get_plan_builder()
1970            .map(
1971                function,
1972                optimizations,
1973                schema,
1974                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1975            )
1976            .build();
1977        Self::from_logical_plan(lp, opt_state)
1978    }
1979
1980    #[cfg(feature = "python")]
1981    pub fn map_python(
1982        self,
1983        function: polars_utils::python_function::PythonFunction,
1984        optimizations: AllowedOptimizations,
1985        schema: Option<SchemaRef>,
1986        validate_output: bool,
1987    ) -> LazyFrame {
1988        let opt_state = self.get_opt_state();
1989        let lp = self
1990            .get_plan_builder()
1991            .map_python(function, optimizations, schema, validate_output)
1992            .build();
1993        Self::from_logical_plan(lp, opt_state)
1994    }
1995
1996    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1997        let opt_state = self.get_opt_state();
1998        let lp = self.get_plan_builder().map_private(function).build();
1999        Self::from_logical_plan(lp, opt_state)
2000    }
2001
2002    /// Add a new column at index 0 that counts the rows.
2003    ///
2004    /// `name` is the name of the new column. `offset` is where to start counting from; if
2005    /// `None`, it is set to `0`.
2006    ///
2007    /// # Warning
2008    /// This can have a negative effect on query performance. This may for instance block
2009    /// predicate pushdown optimization.
2010    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2011    where
2012        S: Into<PlSmallStr>,
2013    {
2014        let name = name.into();
2015
2016        match &self.logical_plan {
2017            v @ DslPlan::Scan { scan_type, .. }
2018                if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2019            {
2020                let DslPlan::Scan {
2021                    sources,
2022                    mut unified_scan_args,
2023                    scan_type,
2024                    cached_ir: _,
2025                } = v.clone()
2026                else {
2027                    unreachable!()
2028                };
2029
2030                unified_scan_args.row_index = Some(RowIndex {
2031                    name,
2032                    offset: offset.unwrap_or(0),
2033                });
2034
2035                DslPlan::Scan {
2036                    sources,
2037                    unified_scan_args,
2038                    scan_type,
2039                    cached_ir: Default::default(),
2040                }
2041                .into()
2042            },
2043            _ => self.map_private(DslFunction::RowIndex { name, offset }),
2044        }
2045    }
2046
2047    /// Return the number of non-null elements for each column.
2048    pub fn count(self) -> LazyFrame {
2049        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2050    }
2051
2052    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2053    /// inserted as columns.
2054    #[cfg(feature = "dtype-struct")]
2055    pub fn unnest(self, cols: Selector) -> Self {
2056        self.map_private(DslFunction::Unnest(cols))
2057    }
2058
2059    #[cfg(feature = "merge_sorted")]
2060    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2061    where
2062        S: Into<PlSmallStr>,
2063    {
2064        let key = key.into();
2065
2066        let lp = DslPlan::MergeSorted {
2067            input_left: Arc::new(self.logical_plan),
2068            input_right: Arc::new(other.logical_plan),
2069            key,
2070        };
2071        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2072    }
2073}
2074
/// Utility struct for lazy group_by operation.
///
/// Holds the input plan plus the group-by settings; the group-by plan node is only built
/// when the group-by is finished (e.g. with `agg` or `apply`).
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the input that is being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags passed on to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Grouping key expressions.
    keys: Vec<Expr>,
    /// Forwarded as `maintain_order` to the group-by plan node.
    maintain_order: bool,
    /// Dynamic (time-window) group-by options, if any; forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, if any; forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2087
2088impl From<LazyGroupBy> for LazyFrame {
2089    fn from(lgb: LazyGroupBy) -> Self {
2090        Self {
2091            logical_plan: lgb.logical_plan,
2092            opt_state: lgb.opt_state,
2093            cached_arena: Default::default(),
2094        }
2095    }
2096}
2097
2098impl LazyGroupBy {
2099    /// Group by and aggregate.
2100    ///
2101    /// Select a column with [col] and choose an aggregation.
2102    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2103    ///
2104    /// # Example
2105    ///
2106    /// ```rust
2107    /// use polars_core::prelude::*;
2108    /// use polars_lazy::prelude::*;
2109    ///
2110    /// fn example(df: DataFrame) -> LazyFrame {
2111    ///       df.lazy()
2112    ///        .group_by_stable([col("date")])
2113    ///        .agg([
2114    ///            col("rain").min().alias("min_rain"),
2115    ///            col("rain").sum().alias("sum_rain"),
2116    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2117    ///        ])
2118    /// }
2119    /// ```
2120    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2121        #[cfg(feature = "dynamic_group_by")]
2122        let lp = DslBuilder::from(self.logical_plan)
2123            .group_by(
2124                self.keys,
2125                aggs,
2126                None,
2127                self.maintain_order,
2128                self.dynamic_options,
2129                self.rolling_options,
2130            )
2131            .build();
2132
2133        #[cfg(not(feature = "dynamic_group_by"))]
2134        let lp = DslBuilder::from(self.logical_plan)
2135            .group_by(self.keys, aggs, None, self.maintain_order)
2136            .build();
2137        LazyFrame::from_logical_plan(lp, self.opt_state)
2138    }
2139
2140    /// Return first n rows of each group
2141    pub fn head(self, n: Option<usize>) -> LazyFrame {
2142        let keys = self
2143            .keys
2144            .iter()
2145            .filter_map(|expr| expr_output_name(expr).ok())
2146            .collect::<Vec<_>>();
2147
2148        self.agg([all().as_expr().head(n)])
2149            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2150    }
2151
2152    /// Return last n rows of each group
2153    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2154        let keys = self
2155            .keys
2156            .iter()
2157            .filter_map(|expr| expr_output_name(expr).ok())
2158            .collect::<Vec<_>>();
2159
2160        self.agg([all().as_expr().tail(n)])
2161            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2162    }
2163
2164    /// Apply a function over the groups as a new DataFrame.
2165    ///
2166    /// **It is not recommended that you use this as materializing the DataFrame is very
2167    /// expensive.**
2168    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2169        #[cfg(feature = "dynamic_group_by")]
2170        let options = GroupbyOptions {
2171            dynamic: self.dynamic_options,
2172            rolling: self.rolling_options,
2173            slice: None,
2174        };
2175
2176        #[cfg(not(feature = "dynamic_group_by"))]
2177        let options = GroupbyOptions { slice: None };
2178
2179        let lp = DslPlan::GroupBy {
2180            input: Arc::new(self.logical_plan),
2181            keys: self.keys,
2182            aggs: vec![],
2183            apply: Some((f, schema)),
2184            maintain_order: self.maintain_order,
2185            options: Arc::new(options),
2186        };
2187        LazyFrame::from_logical_plan(lp, self.opt_state)
2188    }
2189}
2190
/// Builder for a join between two `LazyFrame`s.
///
/// Configure it with the setter methods, then finish it with `finish` (key-based join) or
/// `join_where` (predicate-based join).
#[must_use]
pub struct JoinBuilder {
    /// Left table of the join.
    lf: LazyFrame,
    /// Join type; `JoinType::Inner` unless changed with `how`.
    how: JoinType,
    /// Right table of the join; set with `with` and required when finishing.
    other: Option<LazyFrame>,
    /// Join key expressions evaluated on the left table.
    left_on: Vec<Expr>,
    /// Join key expressions evaluated on the right table.
    right_on: Vec<Expr>,
    /// Allow parallel evaluation of the two tables (default `true`).
    allow_parallel: bool,
    /// Force parallel evaluation of the two tables (default `false`).
    force_parallel: bool,
    /// Suffix for duplicate column names; `None` means the default (`"_right"`).
    suffix: Option<PlSmallStr>,
    /// Join validation setting forwarded to `JoinArgs`.
    validation: JoinValidation,
    /// Whether null keys match each other (default `false`).
    nulls_equal: bool,
    /// Key-column coalescing setting forwarded to `JoinArgs`.
    coalesce: JoinCoalesce,
    /// Row-order setting forwarded to `JoinArgs`.
    maintain_order: MaintainOrderJoin,
}
2206impl JoinBuilder {
2207    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2208    pub fn new(lf: LazyFrame) -> Self {
2209        Self {
2210            lf,
2211            other: None,
2212            how: JoinType::Inner,
2213            left_on: vec![],
2214            right_on: vec![],
2215            allow_parallel: true,
2216            force_parallel: false,
2217            suffix: None,
2218            validation: Default::default(),
2219            nulls_equal: false,
2220            coalesce: Default::default(),
2221            maintain_order: Default::default(),
2222        }
2223    }
2224
2225    /// The right table in the join.
2226    pub fn with(mut self, other: LazyFrame) -> Self {
2227        self.other = Some(other);
2228        self
2229    }
2230
2231    /// Select the join type.
2232    pub fn how(mut self, how: JoinType) -> Self {
2233        self.how = how;
2234        self
2235    }
2236
2237    pub fn validate(mut self, validation: JoinValidation) -> Self {
2238        self.validation = validation;
2239        self
2240    }
2241
2242    /// The expressions you want to join both tables on.
2243    ///
2244    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2245    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2246        let on = on.as_ref().to_vec();
2247        self.left_on.clone_from(&on);
2248        self.right_on = on;
2249        self
2250    }
2251
2252    /// The expressions you want to join the left table on.
2253    ///
2254    /// The passed expressions must be valid in the left table.
2255    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2256        self.left_on = on.as_ref().to_vec();
2257        self
2258    }
2259
2260    /// The expressions you want to join the right table on.
2261    ///
2262    /// The passed expressions must be valid in the right table.
2263    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264        self.right_on = on.as_ref().to_vec();
2265        self
2266    }
2267
2268    /// Allow parallel table evaluation.
2269    pub fn allow_parallel(mut self, allow: bool) -> Self {
2270        self.allow_parallel = allow;
2271        self
2272    }
2273
2274    /// Force parallel table evaluation.
2275    pub fn force_parallel(mut self, force: bool) -> Self {
2276        self.force_parallel = force;
2277        self
2278    }
2279
2280    /// Join on null values. By default null values will never produce matches.
2281    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2282        self.nulls_equal = nulls_equal;
2283        self
2284    }
2285
2286    /// Suffix to add duplicate column names in join.
2287    /// Defaults to `"_right"` if this method is never called.
2288    pub fn suffix<S>(mut self, suffix: S) -> Self
2289    where
2290        S: Into<PlSmallStr>,
2291    {
2292        self.suffix = Some(suffix.into());
2293        self
2294    }
2295
2296    /// Whether to coalesce join columns.
2297    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2298        self.coalesce = coalesce;
2299        self
2300    }
2301
2302    /// Whether to preserve the row order.
2303    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2304        self.maintain_order = maintain_order;
2305        self
2306    }
2307
2308    /// Finish builder
2309    pub fn finish(self) -> LazyFrame {
2310        let opt_state = self.lf.opt_state;
2311        let other = self.other.expect("'with' not set in join builder");
2312
2313        let args = JoinArgs {
2314            how: self.how,
2315            validation: self.validation,
2316            suffix: self.suffix,
2317            slice: None,
2318            nulls_equal: self.nulls_equal,
2319            coalesce: self.coalesce,
2320            maintain_order: self.maintain_order,
2321        };
2322
2323        let lp = self
2324            .lf
2325            .get_plan_builder()
2326            .join(
2327                other.logical_plan,
2328                self.left_on,
2329                self.right_on,
2330                JoinOptions {
2331                    allow_parallel: self.allow_parallel,
2332                    force_parallel: self.force_parallel,
2333                    args,
2334                }
2335                .into(),
2336            )
2337            .build();
2338        LazyFrame::from_logical_plan(lp, opt_state)
2339    }
2340
2341    // Finish with join predicates
2342    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2343        let opt_state = self.lf.opt_state;
2344        let other = self.other.expect("with not set");
2345
2346        // Decompose `And` conjunctions into their component expressions
2347        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2348            if let Expr::BinaryExpr {
2349                op: Operator::And,
2350                left,
2351                right,
2352            } = predicate
2353            {
2354                decompose_and((*left).clone(), expanded_predicates);
2355                decompose_and((*right).clone(), expanded_predicates);
2356            } else {
2357                expanded_predicates.push(predicate);
2358            }
2359        }
2360        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2361        for predicate in predicates {
2362            decompose_and(predicate, &mut expanded_predicates);
2363        }
2364        let predicates: Vec<Expr> = expanded_predicates;
2365
2366        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2367        #[cfg(feature = "is_between")]
2368        let predicates: Vec<Expr> = {
2369            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2370            for predicate in predicates {
2371                if let Expr::Function {
2372                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2373                    input,
2374                    ..
2375                } = &predicate
2376                {
2377                    if let [expr, lower, upper] = input.as_slice() {
2378                        match closed {
2379                            ClosedInterval::Both => {
2380                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2381                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2382                            },
2383                            ClosedInterval::Right => {
2384                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2385                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2386                            },
2387                            ClosedInterval::Left => {
2388                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2389                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2390                            },
2391                            ClosedInterval::None => {
2392                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2393                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2394                            },
2395                        }
2396                        continue;
2397                    }
2398                }
2399                expanded_predicates.push(predicate);
2400            }
2401            expanded_predicates
2402        };
2403
2404        let args = JoinArgs {
2405            how: self.how,
2406            validation: self.validation,
2407            suffix: self.suffix,
2408            slice: None,
2409            nulls_equal: self.nulls_equal,
2410            coalesce: self.coalesce,
2411            maintain_order: self.maintain_order,
2412        };
2413        let options = JoinOptions {
2414            allow_parallel: self.allow_parallel,
2415            force_parallel: self.force_parallel,
2416            args,
2417        };
2418
2419        let lp = DslPlan::Join {
2420            input_left: Arc::new(self.lf.logical_plan),
2421            input_right: Arc::new(other.logical_plan),
2422            left_on: Default::default(),
2423            right_on: Default::default(),
2424            predicates,
2425            options: Arc::from(options),
2426        };
2427
2428        LazyFrame::from_logical_plan(lp, opt_state)
2429    }
2430}
2431
2432pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2433    #[cfg(not(feature = "new_streaming"))]
2434    {
2435        None
2436    }
2437    #[cfg(feature = "new_streaming")]
2438    {
2439        Some(streaming_dispatch::build_streaming_query_executor)
2440    }
2441};
2442#[cfg(feature = "new_streaming")]
2443pub use streaming_dispatch::build_streaming_query_executor;
2444
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::{PolarsResult, polars_bail};
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build an [`Executor`] that runs the plan rooted at `node` on the new streaming engine.
    ///
    /// A root that is not already a sink is wrapped in an in-memory sink so the result can be
    /// returned as a `DataFrame`. If the root is a scan, its `rechunk` flag is applied to the
    /// collected output.
    ///
    /// # Errors
    /// Returns an error if the root is an `IR::SinkMultiple` (not supported here) or if
    /// building the streaming query fails.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Read before the root may be wrapped in a sink below.
        let rechunk = match ir_arena.get(node) {
            IR::Scan {
                unified_scan_args, ..
            } => unified_scan_args.rechunk,
            _ => false,
        };

        let node = match ir_arena.get(node) {
            // Report this as an error instead of panicking: the function already returns
            // a `PolarsResult`.
            IR::SinkMultiple { .. } => polars_bail!(InvalidOperation: "SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        let query = polars_stream::StreamingQuery::build(node, ir_arena, expr_arena)?;
        Ok(Box::new(StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        }))
    }

    // Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
    /// Single-use executor wrapping a built streaming query.
    struct StreamingQueryExecutor {
        /// The query to run; taken (set to `None`) on the first `execute`.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Whether to rechunk the output into a single chunk.
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the streaming query and return its single output frame.
        ///
        /// # Panics
        /// Panics when called from a thread of the rayon `POOL`, or when called more than once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // Must not block rayon thread on pending new-streaming future.
            assert!(POOL.current_thread_index().is_none());

            let query = self
                .executor
                .try_lock()
                .unwrap()
                .take()
                .expect("unhandled: execute() more than once");
            let mut df = query.execute()?.unwrap_single();

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}