polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12#[cfg(any(
13    feature = "parquet",
14    feature = "ipc",
15    feature = "csv",
16    feature = "json"
17))]
18use std::path::Path;
19use std::sync::{Arc, Mutex};
20
21pub use anonymous_scan::*;
22#[cfg(feature = "csv")]
23pub use csv::*;
24#[cfg(not(target_arch = "wasm32"))]
25pub use exitable::*;
26pub use file_list_reader::*;
27#[cfg(feature = "ipc")]
28pub use ipc::*;
29#[cfg(feature = "json")]
30pub use ndjson::*;
31#[cfg(feature = "parquet")]
32pub use parquet::*;
33use polars_compute::rolling::QuantileMethod;
34use polars_core::POOL;
35use polars_core::error::feature_gated;
36use polars_core::prelude::*;
37use polars_expr::{ExpressionConversionState, create_physical_expr};
38use polars_io::RowIndex;
39use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
40use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
41#[cfg(feature = "is_between")]
42use polars_ops::prelude::ClosedInterval;
43pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
44use polars_plan::global::FETCH_ROWS;
45use polars_utils::pl_str::PlSmallStr;
46use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
47
48use crate::frame::cached_arenas::CachedArena;
49#[cfg(feature = "streaming")]
50use crate::physical_plan::streaming::insert_streaming_nodes;
51use crate::prelude::*;
52
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
56
57impl IntoLazy for DataFrame {
58    /// Convert the `DataFrame` into a `LazyFrame`
59    fn lazy(self) -> LazyFrame {
60        let lp = DslBuilder::from_existing_df(self).build();
61        LazyFrame {
62            logical_plan: lp,
63            opt_state: Default::default(),
64            cached_arena: Default::default(),
65        }
66    }
67}
68
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
74
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    // The (unoptimized) DSL logical plan built up so far.
    pub logical_plan: DslPlan,
    // Optimization flags applied when the plan is lowered/executed.
    pub(crate) opt_state: OptFlags,
    // Cached arena state (see the `cached_arenas` module); because this is an
    // `Arc<Mutex<..>>` and the struct derives `Clone`, clones share the cache.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
86
87impl From<DslPlan> for LazyFrame {
88    fn from(plan: DslPlan) -> Self {
89        Self {
90            logical_plan: plan,
91            opt_state: OptFlags::default(),
92            cached_arena: Default::default(),
93        }
94    }
95}
96
97impl LazyFrame {
98    pub(crate) fn from_inner(
99        logical_plan: DslPlan,
100        opt_state: OptFlags,
101        cached_arena: Arc<Mutex<Option<CachedArena>>>,
102    ) -> Self {
103        Self {
104            logical_plan,
105            opt_state,
106            cached_arena,
107        }
108    }
109
110    pub(crate) fn get_plan_builder(self) -> DslBuilder {
111        DslBuilder::from(self.logical_plan)
112    }
113
    /// Internal accessor for the current optimization flags.
    /// (`OptFlags` is returned by value from `&self`, so it is `Copy`.)
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
117
118    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
119        LazyFrame {
120            logical_plan,
121            opt_state,
122            cached_arena: Default::default(),
123        }
124    }
125
    /// Get current optimizations.
    ///
    /// Returns a copy of the flags currently set on this frame.
    pub fn get_current_optimizations(&self) -> OptFlags {
        self.opt_state
    }
130
131    /// Set allowed optimizations.
132    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
133        self.opt_state = opt_state;
134        self
135    }
136
137    /// Turn off all optimizations.
138    pub fn without_optimizations(self) -> Self {
139        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
140    }
141
142    /// Toggle projection pushdown optimization.
143    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
144        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
145        self
146    }
147
148    /// Toggle cluster with columns optimization.
149    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
150        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
151        self
152    }
153
154    /// Toggle collapse joins optimization.
155    pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
156        self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
157        self
158    }
159
160    /// Check if operations are order dependent and unset maintaining_order if
161    /// the order would not be observed.
162    pub fn with_check_order(mut self, toggle: bool) -> Self {
163        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
164        self
165    }
166
167    /// Toggle predicate pushdown optimization.
168    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
169        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
170        self
171    }
172
173    /// Toggle type coercion optimization.
174    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
175        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
176        self
177    }
178
179    /// Toggle type check optimization.
180    pub fn with_type_check(mut self, toggle: bool) -> Self {
181        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
182        self
183    }
184
185    /// Toggle expression simplification optimization on or off.
186    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
187        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
188        self
189    }
190
191    /// Toggle common subplan elimination optimization on or off
192    #[cfg(feature = "cse")]
193    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
194        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
195        self
196    }
197
198    /// Toggle common subexpression elimination optimization on or off
199    #[cfg(feature = "cse")]
200    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
201        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
202        self
203    }
204
205    /// Toggle slice pushdown optimization.
206    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
207        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
208        self
209    }
210
211    /// Run nodes that are capably of doing so on the streaming engine.
212    #[cfg(feature = "streaming")]
213    pub fn with_streaming(mut self, toggle: bool) -> Self {
214        self.opt_state.set(OptFlags::STREAMING, toggle);
215        self
216    }
217
218    #[cfg(feature = "new_streaming")]
219    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
220        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
221        self
222    }
223
224    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
225    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
226        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
227        self
228    }
229
230    /// Run every node eagerly. This turns off multi-node optimizations.
231    pub fn _with_eager(mut self, toggle: bool) -> Self {
232        self.opt_state.set(OptFlags::EAGER, toggle);
233        self
234    }
235
236    /// Return a String describing the naive (un-optimized) logical plan.
237    pub fn describe_plan(&self) -> PolarsResult<String> {
238        Ok(self.clone().to_alp()?.describe())
239    }
240
241    /// Return a String describing the naive (un-optimized) logical plan in tree format.
242    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
243        Ok(self.clone().to_alp()?.describe_tree_format())
244    }
245
246    // @NOTE: this is used because we want to set the `enable_fmt` flag of `optimize_with_scratch`
247    // to `true` for describe.
248    fn _describe_to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
249        let (mut lp_arena, mut expr_arena) = self.get_arenas();
250        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?;
251
252        Ok(IRPlan::new(node, lp_arena, expr_arena))
253    }
254
255    /// Return a String describing the optimized logical plan.
256    ///
257    /// Returns `Err` if optimizing the logical plan fails.
258    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
259        Ok(self.clone()._describe_to_alp_optimized()?.describe())
260    }
261
262    /// Return a String describing the optimized logical plan in tree format.
263    ///
264    /// Returns `Err` if optimizing the logical plan fails.
265    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
266        Ok(self
267            .clone()
268            ._describe_to_alp_optimized()?
269            .describe_tree_format())
270    }
271
272    /// Return a String describing the logical plan.
273    ///
274    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
275    /// explains the naive, un-optimized plan.
276    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
277        if optimized {
278            self.describe_optimized_plan()
279        } else {
280            self.describe_plan()
281        }
282    }
283
284    /// Add a sort operation to the logical plan.
285    ///
286    /// Sorts the LazyFrame by the column name specified using the provided options.
287    ///
288    /// # Example
289    ///
290    /// Sort DataFrame by 'sepal_width' column:
291    /// ```rust
292    /// # use polars_core::prelude::*;
293    /// # use polars_lazy::prelude::*;
294    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
295    ///     df.lazy().sort(["sepal_width"], Default::default())
296    /// }
297    /// ```
298    /// Sort by a single column with specific order:
299    /// ```
300    /// # use polars_core::prelude::*;
301    /// # use polars_lazy::prelude::*;
302    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
303    ///     df.lazy().sort(
304    ///         ["sepal_width"],
305    ///         SortMultipleOptions::new()
306    ///             .with_order_descending(descending)
307    ///     )
308    /// }
309    /// ```
310    /// Sort by multiple columns with specifying order for each column:
311    /// ```
312    /// # use polars_core::prelude::*;
313    /// # use polars_lazy::prelude::*;
314    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
315    ///     df.lazy().sort(
316    ///         ["sepal_width", "sepal_length"],
317    ///         SortMultipleOptions::new()
318    ///             .with_order_descending_multi([false, true])
319    ///     )
320    /// }
321    /// ```
322    /// See [`SortMultipleOptions`] for more options.
323    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
324        let opt_state = self.get_opt_state();
325        let lp = self
326            .get_plan_builder()
327            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
328            .build();
329        Self::from_logical_plan(lp, opt_state)
330    }
331
332    /// Add a sort operation to the logical plan.
333    ///
334    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
335    /// concrete columns before sorting.
336    ///
337    /// See [`SortMultipleOptions`] for more options.
338    ///
339    /// # Example
340    ///
341    /// ```rust
342    /// use polars_core::prelude::*;
343    /// use polars_lazy::prelude::*;
344    ///
345    /// /// Sort DataFrame by 'sepal_width' column
346    /// fn example(df: DataFrame) -> LazyFrame {
347    ///       df.lazy()
348    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
349    /// }
350    /// ```
351    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
352        self,
353        by_exprs: E,
354        sort_options: SortMultipleOptions,
355    ) -> Self {
356        let by_exprs = by_exprs.as_ref().to_vec();
357        if by_exprs.is_empty() {
358            self
359        } else {
360            let opt_state = self.get_opt_state();
361            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
362            Self::from_logical_plan(lp, opt_state)
363        }
364    }
365
366    pub fn top_k<E: AsRef<[Expr]>>(
367        self,
368        k: IdxSize,
369        by_exprs: E,
370        sort_options: SortMultipleOptions,
371    ) -> Self {
372        // this will optimize to top-k
373        self.sort_by_exprs(
374            by_exprs,
375            sort_options.with_order_reversed().with_nulls_last(true),
376        )
377        .slice(0, k)
378    }
379
380    pub fn bottom_k<E: AsRef<[Expr]>>(
381        self,
382        k: IdxSize,
383        by_exprs: E,
384        sort_options: SortMultipleOptions,
385    ) -> Self {
386        // this will optimize to bottom-k
387        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
388            .slice(0, k)
389    }
390
391    /// Reverse the `DataFrame` from top to bottom.
392    ///
393    /// Row `i` becomes row `number_of_rows - i - 1`.
394    ///
395    /// # Example
396    ///
397    /// ```rust
398    /// use polars_core::prelude::*;
399    /// use polars_lazy::prelude::*;
400    ///
401    /// fn example(df: DataFrame) -> LazyFrame {
402    ///       df.lazy()
403    ///         .reverse()
404    /// }
405    /// ```
406    pub fn reverse(self) -> Self {
407        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
408    }
409
410    /// Rename columns in the DataFrame.
411    ///
412    /// `existing` and `new` are iterables of the same length containing the old and
413    /// corresponding new column names. Renaming happens to all `existing` columns
414    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
415    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
416    /// those columns that are actually found will be renamed (others will be ignored).
417    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
418    where
419        I: IntoIterator<Item = T>,
420        J: IntoIterator<Item = S>,
421        T: AsRef<str>,
422        S: AsRef<str>,
423    {
424        let iter = existing.into_iter();
425        let cap = iter.size_hint().0;
426        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
427        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
428
429        // TODO! should this error if `existing` and `new` have different lengths?
430        // Currently, the longer of the two is truncated.
431        for (existing, new) in iter.zip(new) {
432            let existing = existing.as_ref();
433            let new = new.as_ref();
434            if new != existing {
435                existing_vec.push(existing.into());
436                new_vec.push(new.into());
437            }
438        }
439
440        self.map_private(DslFunction::Rename {
441            existing: existing_vec.into(),
442            new: new_vec.into(),
443            strict,
444        })
445    }
446
447    /// Removes columns from the DataFrame.
448    /// Note that it's better to only select the columns you need
449    /// and let the projection pushdown optimize away the unneeded columns.
450    ///
451    /// If `strict` is `true`, then any given columns that are not in the schema will
452    /// give a [`PolarsError::ColumnNotFound`] error while materializing the [`LazyFrame`].
453    fn _drop<I, T>(self, columns: I, strict: bool) -> Self
454    where
455        I: IntoIterator<Item = T>,
456        T: Into<Selector>,
457    {
458        let to_drop = columns.into_iter().map(|c| c.into()).collect();
459
460        let opt_state = self.get_opt_state();
461        let lp = self.get_plan_builder().drop(to_drop, strict).build();
462        Self::from_logical_plan(lp, opt_state)
463    }
464
465    /// Removes columns from the DataFrame.
466    /// Note that it's better to only select the columns you need
467    /// and let the projection pushdown optimize away the unneeded columns.
468    ///
469    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
470    /// error while materializing the [`LazyFrame`].
471    pub fn drop<I, T>(self, columns: I) -> Self
472    where
473        I: IntoIterator<Item = T>,
474        T: Into<Selector>,
475    {
476        self._drop(columns, true)
477    }
478
479    /// Removes columns from the DataFrame.
480    /// Note that it's better to only select the columns you need
481    /// and let the projection pushdown optimize away the unneeded columns.
482    ///
483    /// If a column name does not exist in the schema, it will quietly be ignored.
484    pub fn drop_no_validate<I, T>(self, columns: I) -> Self
485    where
486        I: IntoIterator<Item = T>,
487        T: Into<Selector>,
488    {
489        self._drop(columns, false)
490    }
491
492    /// Shift the values by a given period and fill the parts that will be empty due to this operation
493    /// with `Nones`.
494    ///
495    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
496    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
497        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
498    }
499
500    /// Shift the values by a given period and fill the parts that will be empty due to this operation
501    /// with the result of the `fill_value` expression.
502    ///
503    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
504    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
505        self.select(vec![
506            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
507        ])
508    }
509
510    /// Fill None values in the DataFrame with an expression.
511    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
512        let opt_state = self.get_opt_state();
513        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
514        Self::from_logical_plan(lp, opt_state)
515    }
516
517    /// Fill NaN values in the DataFrame with an expression.
518    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
519        let opt_state = self.get_opt_state();
520        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
521        Self::from_logical_plan(lp, opt_state)
522    }
523
524    /// Caches the result into a new LazyFrame.
525    ///
526    /// This should be used to prevent computations running multiple times.
527    pub fn cache(self) -> Self {
528        let opt_state = self.get_opt_state();
529        let lp = self.get_plan_builder().cache().build();
530        Self::from_logical_plan(lp, opt_state)
531    }
532
533    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
534    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
535        let cast_cols: Vec<Expr> = dtypes
536            .into_iter()
537            .map(|(name, dt)| {
538                let name = PlSmallStr::from_str(name);
539
540                if strict {
541                    col(name).strict_cast(dt)
542                } else {
543                    col(name).cast(dt)
544                }
545            })
546            .collect();
547
548        if cast_cols.is_empty() {
549            self.clone()
550        } else {
551            self.with_columns(cast_cols)
552        }
553    }
554
555    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
556    pub fn cast_all(self, dtype: DataType, strict: bool) -> Self {
557        self.with_columns(vec![if strict {
558            col(PlSmallStr::from_static("*")).strict_cast(dtype)
559        } else {
560            col(PlSmallStr::from_static("*")).cast(dtype)
561        }])
562    }
563
564    /// Fetch is like a collect operation, but it overwrites the number of rows read by every scan
565    /// operation. This is a utility that helps debug a query on a smaller number of rows.
566    ///
567    /// Note that the fetch does not guarantee the final number of rows in the DataFrame.
568    /// Filter, join operations and a lower number of rows available in the scanned file influence
569    /// the final number of rows.
570    pub fn fetch(self, n_rows: usize) -> PolarsResult<DataFrame> {
571        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(Some(n_rows)));
572        let res = self.collect();
573        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(None));
574        res
575    }
576
    /// Optimize the logical plan into the given arenas, returning the root [`Node`].
    ///
    /// Convenience wrapper around [`Self::optimize_with_scratch`] with an empty
    /// scratch buffer and `enable_fmt` disabled.
    pub fn optimize(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Node> {
        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
    }
584
585    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
586        let (mut lp_arena, mut expr_arena) = self.get_arenas();
587        let node =
588            self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
589
590        Ok(IRPlan::new(node, lp_arena, expr_arena))
591    }
592
593    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
594        let (mut lp_arena, mut expr_arena) = self.get_arenas();
595        let node = to_alp(
596            self.logical_plan,
597            &mut expr_arena,
598            &mut lp_arena,
599            &mut self.opt_state,
600        )?;
601        let plan = IRPlan::new(node, lp_arena, expr_arena);
602        Ok(plan)
603    }
604
    /// Optimize the logical plan into `lp_arena`/`expr_arena` and return the
    /// root [`Node`].
    ///
    /// Adjusts the optimization flags for the selected streaming engine before
    /// optimizing, and — when the old streaming engine is requested — inserts
    /// streaming nodes into the optimized IR afterwards.
    ///
    /// `scratch` is a reusable node buffer; `enable_fmt` is forwarded to the
    /// streaming-node insertion (used by the describe/explain paths).
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
        enable_fmt: bool,
    ) -> PolarsResult<Node> {
        // `mut` is only needed when the "cse" feature is enabled.
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let streaming = self.opt_state.contains(OptFlags::STREAMING);
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
        // The old streaming engine does not support common subplan elimination.
        #[cfg(feature = "cse")]
        if streaming && !new_streaming {
            opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
        }

        #[cfg(feature = "cse")]
        if new_streaming {
            // The new streaming engine can't deal with the way the common
            // subexpression elimination adds length-incorrect with_columns.
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback that converts an expression to an IO predicate for
            // scan-level predicate pushdown; returns None if conversion fails.
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true, 0),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        if streaming {
            #[cfg(feature = "streaming")]
            {
                insert_streaming_nodes(
                    lp_top,
                    lp_arena,
                    expr_arena,
                    scratch,
                    enable_fmt,
                    true,
                    opt_state.contains(OptFlags::ROW_ESTIMATE),
                )?;
            }
            #[cfg(not(feature = "streaming"))]
            {
                // Silence the unused-variable warning on this cfg branch.
                _ = enable_fmt;
                panic!("activate feature 'streaming'")
            }
        }

        Ok(lp_top)
    }
670
    /// Optimize the plan, run the `post_opt` callback on the optimized IR, and
    /// build the in-memory-engine physical plan.
    ///
    /// Returns the execution state, the executor, and a flag that is `true`
    /// when the plan root is NOT a file/partition sink (only meaningful when
    /// `check_sink` is set; otherwise the flag is always `true`).
    ///
    /// `query_start`, when given, is used to pass the elapsed-since-start
    /// duration to `post_opt`.
    fn prepare_collect_post_opt<P>(
        mut self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
        post_opt: P,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();

        let mut scratch = vec![];
        let lp_top =
            self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

        post_opt(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            // Post optimization callback gets the time since the
            // query was started as its "base" timepoint.
            query_start.map(|s| s.elapsed()),
        )?;

        // sink should be replaced
        let no_file_sink = if check_sink {
            !matches!(
                lp_arena.get(lp_top),
                IR::Sink {
                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
                    ..
                }
            )
        } else {
            true
        };
        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

        let state = ExecutionState::new();
        Ok((state, physical_plan, no_file_sink))
    }
717
718    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
719    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
720    where
721        P: FnOnce(
722            Node,
723            &mut Arena<IR>,
724            &mut Arena<AExpr>,
725            Option<std::time::Duration>,
726        ) -> PolarsResult<()>,
727    {
728        let (mut state, mut physical_plan, _) =
729            self.prepare_collect_post_opt(false, None, post_opt)?;
730        physical_plan.execute(&mut state)
731    }
732
    /// Prepare the physical plan for collection without a post-optimization
    /// callback (delegates to `prepare_collect_post_opt` with a no-op closure).
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
741
742    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
743    /// `engine`.
744    ///
745    /// The query is optimized prior to execution.
746    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
747        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
748            payload.clone()
749        } else {
750            self.logical_plan = DslPlan::Sink {
751                input: Arc::new(self.logical_plan),
752                payload: SinkType::Memory,
753            };
754            SinkType::Memory
755        };
756
757        // Default engine for collect is InMemory, sink_* is Streaming
758        if engine == Engine::Auto {
759            engine = match payload {
760                SinkType::Memory => Engine::InMemory,
761                SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
762            };
763        }
764        // Gpu uses some hacks to dispatch.
765        if engine == Engine::Gpu {
766            engine = Engine::InMemory;
767        }
768
769        #[cfg(feature = "new_streaming")]
770        {
771            if let Some(result) = self.try_new_streaming_if_requested() {
772                return result.map(|v| v.unwrap());
773            }
774        }
775
776        match engine {
777            Engine::Auto => unreachable!(),
778            Engine::Streaming => {
779                feature_gated!("new_streaming", self = self.with_new_streaming(true))
780            },
781            Engine::OldStreaming => feature_gated!("streaming", self = self.with_streaming(true)),
782            _ => {},
783        }
784        let mut alp_plan = self.clone().to_alp_optimized()?;
785
786        match engine {
787            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
788                let string_cache_hold = StringCacheHolder::hold();
789                let result = polars_stream::run_query(
790                    alp_plan.lp_top,
791                    &mut alp_plan.lp_arena,
792                    &mut alp_plan.expr_arena,
793                );
794                drop(string_cache_hold);
795                result.map(|v| v.unwrap())
796            }),
797            _ if matches!(payload, SinkType::Partition { .. }) => Err(polars_err!(
798                InvalidOperation: "partition sinks are not supported on for the '{}' engine",
799                engine.into_static_str()
800            )),
801            Engine::Gpu => {
802                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
803            },
804            Engine::InMemory => {
805                let mut physical_plan = create_physical_plan(
806                    alp_plan.lp_top,
807                    &mut alp_plan.lp_arena,
808                    &mut alp_plan.expr_arena,
809                )?;
810                let mut state = ExecutionState::new();
811                physical_plan.execute(&mut state)
812            },
813            Engine::OldStreaming => {
814                self.opt_state |= OptFlags::STREAMING;
815                let (mut state, mut physical_plan, is_streaming) =
816                    self.prepare_collect(true, None)?;
817                polars_ensure!(
818                    is_streaming,
819                    ComputeError: format!("cannot run the whole query in a streaming order")
820                );
821                physical_plan.execute(&mut state)
822            },
823        }
824    }
825
826    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
827        let sink_multiple = LazyFrame {
828            logical_plan: DslPlan::SinkMultiple { inputs: plans },
829            opt_state,
830            cached_arena: Default::default(),
831        };
832        sink_multiple.explain(true)
833    }
834
    /// Optimize and execute several plans in one go, returning one
    /// [`DataFrame`] per input plan.
    ///
    /// All plans are wrapped in a single [`DslPlan::SinkMultiple`] node so the
    /// optimizer can process (and potentially share work between) them together.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            // Env-var driven opt-in to the new streaming engine. Its inner
            // `Result` distinguishes a single output (Ok) from multiple
            // outputs (Err); for collect_all the frames live in the Err side.
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_err());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            Engine::OldStreaming => feature_gated!(
                "streaming",
                sink_multiple = sink_multiple.with_streaming(true)
            ),
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                // Keep the global string cache alive for the entire query run.
                let string_cache_hold = StringCacheHolder::hold();
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                drop(string_cache_hold);
                // As above: collect_all's frames are in the Err variant.
                return result.map(|v| v.unwrap_err());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // One physical plan per input (plus an optional shared cache prefiller).
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
        )?;

        match engine {
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // Warm shared caches once, before the parallel execution below.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    let mut input = std::mem::take(input);
                                    // Each branch gets its own split state with
                                    // a distinct branch index.
                                    let mut state = state.split();
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                Ok(out?.into_iter().flatten().collect())
            },
            Engine::OldStreaming => panic!("This is no longer supported"),
            _ => unreachable!(),
        }
    }
943
    /// Execute all the lazy operations and collect them into a [`DataFrame`].
    ///
    /// The query is optimized prior to execution.
    ///
    /// This is a convenience wrapper around
    /// [`collect_with_engine`](Self::collect_with_engine) using [`Engine::InMemory`].
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
    ///     df.lazy()
    ///       .group_by([col("foo")])
    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
    ///       .collect()
    /// }
    /// ```
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
964
    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
    // This version does profiling of the node execution.
    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        // All timings are measured relative to this instant; it is also handed
        // to `prepare_collect_post_opt` so the callback can observe elapsed time.
        let query_start = std::time::Instant::now();
        let (mut state, mut physical_plan, _) =
            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
        // Enable per-node timing before executing the physical plan.
        state.time_nodes(query_start);
        let out = physical_plan.execute(&mut state)?;
        // The second frame contains the per-node profiling data gathered above.
        let timer_df = state.finish_timer()?;
        Ok((out, timer_df))
    }
984
985    /// Profile a LazyFrame.
986    ///
987    /// This will run the query and return a tuple
988    /// containing the materialized DataFrame and a DataFrame that contains profiling information
989    /// of each node that is executed.
990    ///
991    /// The units of the timings are microseconds.
992    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
993        self._profile_post_opt(|_, _, _, _| Ok(()))
994    }
995
996    /// Stream a query result into a parquet file. This is useful if the final result doesn't fit
997    /// into memory. This methods will return an error if the query cannot be completely done in a
998    /// streaming fashion.
999    #[cfg(feature = "parquet")]
1000    pub fn sink_parquet(
1001        self,
1002        path: &dyn AsRef<Path>,
1003        options: ParquetWriteOptions,
1004        cloud_options: Option<polars_io::cloud::CloudOptions>,
1005        sink_options: SinkOptions,
1006    ) -> PolarsResult<Self> {
1007        self.sink(SinkType::File(FileSinkType {
1008            path: Arc::new(path.as_ref().to_path_buf()),
1009            sink_options,
1010            file_type: FileType::Parquet(options),
1011            cloud_options,
1012        }))
1013    }
1014
1015    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
1016    /// into memory. This methods will return an error if the query cannot be completely done in a
1017    /// streaming fashion.
1018    #[cfg(feature = "ipc")]
1019    pub fn sink_ipc(
1020        self,
1021        path: impl AsRef<Path>,
1022        options: IpcWriterOptions,
1023        cloud_options: Option<polars_io::cloud::CloudOptions>,
1024        sink_options: SinkOptions,
1025    ) -> PolarsResult<Self> {
1026        self.sink(SinkType::File(FileSinkType {
1027            path: Arc::new(path.as_ref().to_path_buf()),
1028            sink_options,
1029            file_type: FileType::Ipc(options),
1030            cloud_options,
1031        }))
1032    }
1033
1034    /// Stream a query result into an csv file. This is useful if the final result doesn't fit
1035    /// into memory. This methods will return an error if the query cannot be completely done in a
1036    /// streaming fashion.
1037    #[cfg(feature = "csv")]
1038    pub fn sink_csv(
1039        self,
1040        path: impl AsRef<Path>,
1041        options: CsvWriterOptions,
1042        cloud_options: Option<polars_io::cloud::CloudOptions>,
1043        sink_options: SinkOptions,
1044    ) -> PolarsResult<Self> {
1045        self.sink(SinkType::File(FileSinkType {
1046            path: Arc::new(path.as_ref().to_path_buf()),
1047            sink_options,
1048            file_type: FileType::Csv(options),
1049            cloud_options,
1050        }))
1051    }
1052
1053    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
1054    /// into memory. This methods will return an error if the query cannot be completely done in a
1055    /// streaming fashion.
1056    #[cfg(feature = "json")]
1057    pub fn sink_json(
1058        self,
1059        path: impl AsRef<Path>,
1060        options: JsonWriterOptions,
1061        cloud_options: Option<polars_io::cloud::CloudOptions>,
1062        sink_options: SinkOptions,
1063    ) -> PolarsResult<Self> {
1064        self.sink(SinkType::File(FileSinkType {
1065            path: Arc::new(path.as_ref().to_path_buf()),
1066            sink_options,
1067            file_type: FileType::Json(options),
1068            cloud_options,
1069        }))
1070    }
1071
1072    /// Stream a query result into a parquet file in a partitioned manner. This is useful if the
1073    /// final result doesn't fit into memory. This methods will return an error if the query cannot
1074    /// be completely done in a streaming fashion.
1075    #[cfg(feature = "parquet")]
1076    pub fn sink_parquet_partitioned(
1077        self,
1078        path_f_string: impl AsRef<Path>,
1079        variant: PartitionVariant,
1080        options: ParquetWriteOptions,
1081        cloud_options: Option<polars_io::cloud::CloudOptions>,
1082        sink_options: SinkOptions,
1083    ) -> PolarsResult<Self> {
1084        self.sink(SinkType::Partition(PartitionSinkType {
1085            path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
1086            sink_options,
1087            variant,
1088            file_type: FileType::Parquet(options),
1089            cloud_options,
1090        }))
1091    }
1092
1093    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
1094    /// final result doesn't fit into memory. This methods will return an error if the query cannot
1095    /// be completely done in a streaming fashion.
1096    #[cfg(feature = "ipc")]
1097    pub fn sink_ipc_partitioned(
1098        self,
1099        path_f_string: impl AsRef<Path>,
1100        variant: PartitionVariant,
1101        options: IpcWriterOptions,
1102        cloud_options: Option<polars_io::cloud::CloudOptions>,
1103        sink_options: SinkOptions,
1104    ) -> PolarsResult<Self> {
1105        self.sink(SinkType::Partition(PartitionSinkType {
1106            path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
1107            sink_options,
1108            variant,
1109            file_type: FileType::Ipc(options),
1110            cloud_options,
1111        }))
1112    }
1113
1114    /// Stream a query result into an csv file in a partitioned manner. This is useful if the final
1115    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1116    /// completely done in a streaming fashion.
1117    #[cfg(feature = "csv")]
1118    pub fn sink_csv_partitioned(
1119        self,
1120        path_f_string: impl AsRef<Path>,
1121        variant: PartitionVariant,
1122        options: CsvWriterOptions,
1123        cloud_options: Option<polars_io::cloud::CloudOptions>,
1124        sink_options: SinkOptions,
1125    ) -> PolarsResult<Self> {
1126        self.sink(SinkType::Partition(PartitionSinkType {
1127            path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
1128            sink_options,
1129            variant,
1130            file_type: FileType::Csv(options),
1131            cloud_options,
1132        }))
1133    }
1134
1135    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1136    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1137    /// completely done in a streaming fashion.
1138    #[cfg(feature = "json")]
1139    pub fn sink_json_partitioned(
1140        self,
1141        path_f_string: impl AsRef<Path>,
1142        variant: PartitionVariant,
1143        options: JsonWriterOptions,
1144        cloud_options: Option<polars_io::cloud::CloudOptions>,
1145        sink_options: SinkOptions,
1146    ) -> PolarsResult<Self> {
1147        self.sink(SinkType::Partition(PartitionSinkType {
1148            path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
1149            sink_options,
1150            variant,
1151            file_type: FileType::Json(options),
1152            cloud_options,
1153        }))
1154    }
1155
    /// Run the query on the new streaming engine if requested via the
    /// `POLARS_AUTO_NEW_STREAMING` / `POLARS_FORCE_NEW_STREAMING` environment
    /// variables.
    ///
    /// Returns `None` when neither variable is set to `"1"`; the caller should
    /// then dispatch to another engine. Otherwise returns the engine's result,
    /// where the inner `Ok` carries a single output frame and the inner `Err`
    /// carries the multiple outputs of a multi-sink query.
    #[cfg(feature = "new_streaming")]
    pub fn try_new_streaming_if_requested(
        &mut self,
    ) -> Option<PolarsResult<Result<DataFrame, Vec<DataFrame>>>> {
        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

        if auto_new_streaming || force_new_streaming {
            // Try to run using the new streaming engine, falling back
            // if it fails in a todo!() error if auto_new_streaming is set.
            let mut new_stream_lazy = self.clone();
            // Force the new streaming flag on and the old streaming flag off.
            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
            new_stream_lazy.opt_state &= !OptFlags::STREAMING;
            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
                Ok(v) => v,
                Err(e) => return Some(Err(e)),
            };

            // Keep the global string cache alive for the duration of the run.
            let _hold = StringCacheHolder::hold();
            let f = || {
                polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                )
            };

            // Unimplemented functionality surfaces as a panic, so catch panics
            // here to allow a graceful fallback to the normal engine.
            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
                Ok(v) => return Some(v),
                Err(e) => {
                    // Fallback to normal engine if error is due to not being implemented
                    // and auto_new_streaming is set, otherwise propagate error.
                    if !force_new_streaming
                        && auto_new_streaming
                        && e.downcast_ref::<&str>()
                            .map(|s| s.starts_with("not yet implemented"))
                            .unwrap_or(false)
                    {
                        if polars_core::config::verbose() {
                            eprintln!(
                                "caught unimplemented error in new streaming engine, falling back to normal engine"
                            );
                        }
                    } else {
                        std::panic::resume_unwind(e);
                    }
                },
            }
        }

        None
    }
1208
1209    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1210        polars_ensure!(
1211            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1212            InvalidOperation: "cannot create a sink on top of another sink"
1213        );
1214        self.logical_plan = DslPlan::Sink {
1215            input: Arc::new(self.logical_plan),
1216            payload: payload.clone(),
1217        };
1218        Ok(self)
1219    }
1220
1221    /// Filter frame rows that match a predicate expression.
1222    ///
1223    /// The expression must yield boolean values (note that rows where the
1224    /// predicate resolves to `null` are *not* included in the resulting frame).
1225    ///
1226    /// # Example
1227    ///
1228    /// ```rust
1229    /// use polars_core::prelude::*;
1230    /// use polars_lazy::prelude::*;
1231    ///
1232    /// fn example(df: DataFrame) -> LazyFrame {
1233    ///       df.lazy()
1234    ///         .filter(col("sepal_width").is_not_null())
1235    ///         .select([col("sepal_width"), col("sepal_length")])
1236    /// }
1237    /// ```
1238    pub fn filter(self, predicate: Expr) -> Self {
1239        let opt_state = self.get_opt_state();
1240        let lp = self.get_plan_builder().filter(predicate).build();
1241        Self::from_logical_plan(lp, opt_state)
1242    }
1243
1244    /// Remove frame rows that match a predicate expression.
1245    ///
1246    /// The expression must yield boolean values (note that rows where the
1247    /// predicate resolves to `null` are *not* removed from the resulting frame).
1248    ///
1249    /// # Example
1250    ///
1251    /// ```rust
1252    /// use polars_core::prelude::*;
1253    /// use polars_lazy::prelude::*;
1254    ///
1255    /// fn example(df: DataFrame) -> LazyFrame {
1256    ///       df.lazy()
1257    ///         .remove(col("sepal_width").is_null())
1258    ///         .select([col("sepal_width"), col("sepal_length")])
1259    /// }
1260    /// ```
1261    pub fn remove(self, predicate: Expr) -> Self {
1262        self.filter(predicate.neq_missing(lit(true)))
1263    }
1264
1265    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1266    ///
1267    /// Columns can be selected with [`col`];
1268    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1269    ///
1270    /// # Example
1271    ///
1272    /// ```rust
1273    /// use polars_core::prelude::*;
1274    /// use polars_lazy::prelude::*;
1275    ///
1276    /// /// This function selects column "foo" and column "bar".
1277    /// /// Column "bar" is renamed to "ham".
1278    /// fn example(df: DataFrame) -> LazyFrame {
1279    ///       df.lazy()
1280    ///         .select([col("foo"),
1281    ///                   col("bar").alias("ham")])
1282    /// }
1283    ///
1284    /// /// This function selects all columns except "foo"
1285    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1286    ///       df.lazy()
1287    ///         .select([col(PlSmallStr::from_static("*")).exclude(["foo"])])
1288    /// }
1289    /// ```
1290    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1291        let exprs = exprs.as_ref().to_vec();
1292        self.select_impl(
1293            exprs,
1294            ProjectionOptions {
1295                run_parallel: true,
1296                duplicate_check: true,
1297                should_broadcast: true,
1298            },
1299        )
1300    }
1301
1302    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1303        let exprs = exprs.as_ref().to_vec();
1304        self.select_impl(
1305            exprs,
1306            ProjectionOptions {
1307                run_parallel: false,
1308                duplicate_check: true,
1309                should_broadcast: true,
1310            },
1311        )
1312    }
1313
1314    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1315        let opt_state = self.get_opt_state();
1316        let lp = self.get_plan_builder().project(exprs, options).build();
1317        Self::from_logical_plan(lp, opt_state)
1318    }
1319
1320    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1321    ///
1322    /// Takes a list of expressions to group on.
1323    ///
1324    /// # Example
1325    ///
1326    /// ```rust
1327    /// use polars_core::prelude::*;
1328    /// use polars_lazy::prelude::*;
1329    ///
1330    /// fn example(df: DataFrame) -> LazyFrame {
1331    ///       df.lazy()
1332    ///        .group_by([col("date")])
1333    ///        .agg([
1334    ///            col("rain").min().alias("min_rain"),
1335    ///            col("rain").sum().alias("sum_rain"),
1336    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1337    ///        ])
1338    /// }
1339    /// ```
1340    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1341        let keys = by
1342            .as_ref()
1343            .iter()
1344            .map(|e| e.clone().into())
1345            .collect::<Vec<_>>();
1346        let opt_state = self.get_opt_state();
1347
1348        #[cfg(feature = "dynamic_group_by")]
1349        {
1350            LazyGroupBy {
1351                logical_plan: self.logical_plan,
1352                opt_state,
1353                keys,
1354                maintain_order: false,
1355                dynamic_options: None,
1356                rolling_options: None,
1357            }
1358        }
1359
1360        #[cfg(not(feature = "dynamic_group_by"))]
1361        {
1362            LazyGroupBy {
1363                logical_plan: self.logical_plan,
1364                opt_state,
1365                keys,
1366                maintain_order: false,
1367            }
1368        }
1369    }
1370
    /// Create rolling groups based on a time column.
    ///
    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
    ///
    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
    /// individual values and are not of constant intervals. For constant intervals use
    /// *group_by_dynamic*
    #[cfg(feature = "dynamic_group_by")]
    pub fn rolling<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: RollingGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // The index column is a computed expression: materialize it first,
            // then recurse with a plain column reference to its output name.
            // NOTE(review): `collect_schema()` / `to_field` are unwrapped, so an
            // invalid index expression panics instead of returning an error.
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap(), Context::Default)
                .unwrap();
            return self.with_column(index_column).rolling(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            // Rolling windows depend on row order, so order is maintained.
            maintain_order: true,
            dynamic_options: None,
            rolling_options: Some(options),
        }
    }
1407
    /// Group based on a time value (or index value of type Int32, Int64).
    ///
    /// Time windows are calculated and rows are assigned to windows. Different from a
    /// normal group_by is that a row can be member of multiple groups. The time/index
    /// window could be seen as a rolling window, with a window size determined by
    /// dates/times/values instead of slots in the DataFrame.
    ///
    /// A window is defined by:
    ///
    /// - every: interval of the window
    /// - period: length of the window
    /// - offset: offset of the window
    ///
    /// The `group_by` argument should be empty `[]` if you don't want to combine this
    /// with a ordinary group_by on these keys.
    #[cfg(feature = "dynamic_group_by")]
    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
        mut self,
        index_column: Expr,
        group_by: E,
        mut options: DynamicGroupOptions,
    ) -> LazyGroupBy {
        if let Expr::Column(name) = index_column {
            options.index_column = name;
        } else {
            // The index column is a computed expression: materialize it first,
            // then recurse with a plain column reference to its output name.
            // NOTE(review): `collect_schema()` / `to_field` are unwrapped, so an
            // invalid index expression panics instead of returning an error.
            let output_field = index_column
                .to_field(&self.collect_schema().unwrap(), Context::Default)
                .unwrap();
            return self.with_column(index_column).group_by_dynamic(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        }
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys: group_by.as_ref().to_vec(),
            // Window assignment depends on row order, so order is maintained.
            maintain_order: true,
            dynamic_options: Some(options),
            rolling_options: None,
        }
    }
1452
1453    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1454    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1455        let keys = by
1456            .as_ref()
1457            .iter()
1458            .map(|e| e.clone().into())
1459            .collect::<Vec<_>>();
1460        let opt_state = self.get_opt_state();
1461
1462        #[cfg(feature = "dynamic_group_by")]
1463        {
1464            LazyGroupBy {
1465                logical_plan: self.logical_plan,
1466                opt_state,
1467                keys,
1468                maintain_order: true,
1469                dynamic_options: None,
1470                rolling_options: None,
1471            }
1472        }
1473
1474        #[cfg(not(feature = "dynamic_group_by"))]
1475        {
1476            LazyGroupBy {
1477                logical_plan: self.logical_plan,
1478                opt_state,
1479                keys,
1480                maintain_order: true,
1481            }
1482        }
1483    }
1484
1485    /// Left anti join this query with another lazy query.
1486    ///
1487    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1488    /// flexible join logic, see [`join`](LazyFrame::join) or
1489    /// [`join_builder`](LazyFrame::join_builder).
1490    ///
1491    /// # Example
1492    ///
1493    /// ```rust
1494    /// use polars_core::prelude::*;
1495    /// use polars_lazy::prelude::*;
1496    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1497    ///         ldf
1498    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1499    /// }
1500    /// ```
1501    #[cfg(feature = "semi_anti_join")]
1502    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1503        self.join(
1504            other,
1505            [left_on.into()],
1506            [right_on.into()],
1507            JoinArgs::new(JoinType::Anti),
1508        )
1509    }
1510
1511    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1512    #[cfg(feature = "cross_join")]
1513    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1514        self.join(
1515            other,
1516            vec![],
1517            vec![],
1518            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1519        )
1520    }
1521
1522    /// Left outer join this query with another lazy query.
1523    ///
1524    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1525    /// flexible join logic, see [`join`](LazyFrame::join) or
1526    /// [`join_builder`](LazyFrame::join_builder).
1527    ///
1528    /// # Example
1529    ///
1530    /// ```rust
1531    /// use polars_core::prelude::*;
1532    /// use polars_lazy::prelude::*;
1533    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1534    ///         ldf
1535    ///         .left_join(other, col("foo"), col("bar"))
1536    /// }
1537    /// ```
1538    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1539        self.join(
1540            other,
1541            [left_on.into()],
1542            [right_on.into()],
1543            JoinArgs::new(JoinType::Left),
1544        )
1545    }
1546
1547    /// Inner join this query with another lazy query.
1548    ///
1549    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1550    /// flexible join logic, see [`join`](LazyFrame::join) or
1551    /// [`join_builder`](LazyFrame::join_builder).
1552    ///
1553    /// # Example
1554    ///
1555    /// ```rust
1556    /// use polars_core::prelude::*;
1557    /// use polars_lazy::prelude::*;
1558    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1559    ///         ldf
1560    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1561    /// }
1562    /// ```
1563    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1564        self.join(
1565            other,
1566            [left_on.into()],
1567            [right_on.into()],
1568            JoinArgs::new(JoinType::Inner),
1569        )
1570    }
1571
1572    /// Full outer join this query with another lazy query.
1573    ///
1574    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1575    /// flexible join logic, see [`join`](LazyFrame::join) or
1576    /// [`join_builder`](LazyFrame::join_builder).
1577    ///
1578    /// # Example
1579    ///
1580    /// ```rust
1581    /// use polars_core::prelude::*;
1582    /// use polars_lazy::prelude::*;
1583    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1584    ///         ldf
1585    ///         .full_join(other, col("foo"), col("bar"))
1586    /// }
1587    /// ```
1588    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1589        self.join(
1590            other,
1591            [left_on.into()],
1592            [right_on.into()],
1593            JoinArgs::new(JoinType::Full),
1594        )
1595    }
1596
1597    /// Left semi join this query with another lazy query.
1598    ///
1599    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1600    /// flexible join logic, see [`join`](LazyFrame::join) or
1601    /// [`join_builder`](LazyFrame::join_builder).
1602    ///
1603    /// # Example
1604    ///
1605    /// ```rust
1606    /// use polars_core::prelude::*;
1607    /// use polars_lazy::prelude::*;
1608    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1609    ///         ldf
1610    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1611    /// }
1612    /// ```
1613    #[cfg(feature = "semi_anti_join")]
1614    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1615        self.join(
1616            other,
1617            [left_on.into()],
1618            [right_on.into()],
1619            JoinArgs::new(JoinType::Semi),
1620        )
1621    }
1622
1623    /// Generic function to join two LazyFrames.
1624    ///
1625    /// `join` can join on multiple columns, given as two list of expressions, and with a
1626    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1627    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1628    /// over how columns are renamed and parallelization options, use
1629    /// [`join_builder`](LazyFrame::join_builder).
1630    ///
1631    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1632    ///
1633    /// # Example
1634    ///
1635    /// ```rust
1636    /// use polars_core::prelude::*;
1637    /// use polars_lazy::prelude::*;
1638    ///
1639    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1640    ///         ldf
1641    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1642    /// }
1643    /// ```
1644    pub fn join<E: AsRef<[Expr]>>(
1645        self,
1646        other: LazyFrame,
1647        left_on: E,
1648        right_on: E,
1649        args: JoinArgs,
1650    ) -> LazyFrame {
1651        let left_on = left_on.as_ref().to_vec();
1652        let right_on = right_on.as_ref().to_vec();
1653
1654        self._join_impl(other, left_on, right_on, args)
1655    }
1656
1657    fn _join_impl(
1658        self,
1659        other: LazyFrame,
1660        left_on: Vec<Expr>,
1661        right_on: Vec<Expr>,
1662        args: JoinArgs,
1663    ) -> LazyFrame {
1664        let JoinArgs {
1665            how,
1666            validation,
1667            suffix,
1668            slice,
1669            nulls_equal,
1670            coalesce,
1671            maintain_order,
1672        } = args;
1673
1674        if slice.is_some() {
1675            panic!("impl error: slice is not handled")
1676        }
1677
1678        let mut builder = self
1679            .join_builder()
1680            .with(other)
1681            .left_on(left_on)
1682            .right_on(right_on)
1683            .how(how)
1684            .validate(validation)
1685            .join_nulls(nulls_equal)
1686            .coalesce(coalesce)
1687            .maintain_order(maintain_order);
1688
1689        if let Some(suffix) = suffix {
1690            builder = builder.suffix(suffix);
1691        }
1692
1693        // Note: args.slice is set by the optimizer
1694        builder.finish()
1695    }
1696
    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
    ///
    /// This LazyFrame becomes the left table of the join.
    ///
    /// After the `JoinBuilder` has been created and set up, calling
    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
    /// representing the `join` operation.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1705
1706    /// Add or replace a column, given as an expression, to a DataFrame.
1707    ///
1708    /// # Example
1709    ///
1710    /// ```rust
1711    /// use polars_core::prelude::*;
1712    /// use polars_lazy::prelude::*;
1713    /// fn add_column(df: DataFrame) -> LazyFrame {
1714    ///     df.lazy()
1715    ///         .with_column(
1716    ///             when(col("sepal_length").lt(lit(5.0)))
1717    ///             .then(lit(10))
1718    ///             .otherwise(lit(1))
1719    ///             .alias("new_column_name"),
1720    ///         )
1721    /// }
1722    /// ```
1723    pub fn with_column(self, expr: Expr) -> LazyFrame {
1724        let opt_state = self.get_opt_state();
1725        let lp = self
1726            .get_plan_builder()
1727            .with_columns(
1728                vec![expr],
1729                ProjectionOptions {
1730                    run_parallel: false,
1731                    duplicate_check: true,
1732                    should_broadcast: true,
1733                },
1734            )
1735            .build();
1736        Self::from_logical_plan(lp, opt_state)
1737    }
1738
1739    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1740    ///
1741    /// # Example
1742    ///
1743    /// ```rust
1744    /// use polars_core::prelude::*;
1745    /// use polars_lazy::prelude::*;
1746    /// fn add_columns(df: DataFrame) -> LazyFrame {
1747    ///     df.lazy()
1748    ///         .with_columns(
1749    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1750    ///          )
1751    /// }
1752    /// ```
1753    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1754        let exprs = exprs.as_ref().to_vec();
1755        self.with_columns_impl(
1756            exprs,
1757            ProjectionOptions {
1758                run_parallel: true,
1759                duplicate_check: true,
1760                should_broadcast: true,
1761            },
1762        )
1763    }
1764
1765    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1766    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1767        let exprs = exprs.as_ref().to_vec();
1768        self.with_columns_impl(
1769            exprs,
1770            ProjectionOptions {
1771                run_parallel: false,
1772                duplicate_check: true,
1773                should_broadcast: true,
1774            },
1775        )
1776    }
1777
1778    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1779        let opt_state = self.get_opt_state();
1780        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1781        Self::from_logical_plan(lp, opt_state)
1782    }
1783
1784    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1785        let contexts = contexts
1786            .as_ref()
1787            .iter()
1788            .map(|lf| lf.logical_plan.clone())
1789            .collect();
1790        let opt_state = self.get_opt_state();
1791        let lp = self.get_plan_builder().with_context(contexts).build();
1792        Self::from_logical_plan(lp, opt_state)
1793    }
1794
    /// Aggregate all the columns as their maximum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    pub fn max(self) -> Self {
        // Lowered to a DSL-level stats function; evaluated when the plan runs.
        self.map_private(DslFunction::Stats(StatsFunction::Max))
    }
1801
    /// Aggregate all the columns as their minimum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    pub fn min(self) -> Self {
        // Lowered to a DSL-level stats function; evaluated when the plan runs.
        self.map_private(DslFunction::Stats(StatsFunction::Min))
    }
1808
    /// Aggregate all the columns as their sum values.
    ///
    /// Aggregated columns will have the same names as the original columns.
    ///
    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
    /// - For integer columns, the ordinary checks for overflow are performed:
    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
    ///   silently wrap.
    /// - String columns will sum to None.
    pub fn sum(self) -> Self {
        // Lowered to a DSL-level stats function; evaluated when the plan runs.
        self.map_private(DslFunction::Stats(StatsFunction::Sum))
    }
1821
    /// Aggregate all the columns as their mean values.
    ///
    /// - Boolean and integer columns are converted to `f64` before computing the mean.
    /// - String columns will have a mean of None.
    pub fn mean(self) -> Self {
        // Lowered to a DSL-level stats function; evaluated when the plan runs.
        self.map_private(DslFunction::Stats(StatsFunction::Mean))
    }
1829
    /// Aggregate all the columns as their median values.
    ///
    /// - Boolean and integer results are converted to `f64`. However, they are still
    ///   susceptible to overflow before this conversion occurs.
    /// - String columns will have a median of None.
    pub fn median(self) -> Self {
        // Lowered to a DSL-level stats function; evaluated when the plan runs.
        self.map_private(DslFunction::Stats(StatsFunction::Median))
    }
1838
1839    /// Aggregate all the columns as their quantile values.
1840    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1841        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1842            quantile,
1843            method,
1844        }))
1845    }
1846
1847    /// Aggregate all the columns as their standard deviation values.
1848    ///
1849    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1850    /// computing the variance, where `N` is the number of rows.
1851    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1852    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1853    /// > likelihood estimate of the variance for normally distributed variables. The
1854    /// > standard deviation computed in this function is the square root of the estimated
1855    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1856    /// > standard deviation per se.
1857    ///
1858    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1859    pub fn std(self, ddof: u8) -> Self {
1860        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1861    }
1862
1863    /// Aggregate all the columns as their variance values.
1864    ///
1865    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1866    /// computing the variance, where `N` is the number of rows.
1867    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1868    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1869    /// > likelihood estimate of the variance for normally distributed variables.
1870    ///
1871    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1872    pub fn var(self, ddof: u8) -> Self {
1873        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1874    }
1875
    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
    pub fn explode<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(self, columns: E) -> LazyFrame {
        // Delegates to the shared implementation with `allow_empty = false`.
        self.explode_impl(columns, false)
    }
1880
1881    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1882    fn explode_impl<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1883        self,
1884        columns: E,
1885        allow_empty: bool,
1886    ) -> LazyFrame {
1887        let columns = columns
1888            .as_ref()
1889            .iter()
1890            .map(|e| e.clone().into())
1891            .collect::<Vec<_>>();
1892        let opt_state = self.get_opt_state();
1893        let lp = self
1894            .get_plan_builder()
1895            .explode(columns, allow_empty)
1896            .build();
1897        Self::from_logical_plan(lp, opt_state)
1898    }
1899
1900    /// Aggregate all the columns as the sum of their null value count.
1901    pub fn null_count(self) -> LazyFrame {
1902        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1903    }
1904
    /// Drop non-unique rows and maintain the order of kept rows.
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
    /// `None`, all columns are considered.
    pub fn unique_stable(
        self,
        subset: Option<Vec<PlSmallStr>>,
        keep_strategy: UniqueKeepStrategy,
    ) -> LazyFrame {
        // Non-generic convenience wrapper over `unique_stable_generic`.
        self.unique_stable_generic(subset, keep_strategy)
    }
1916
1917    pub fn unique_stable_generic<E, IE>(
1918        self,
1919        subset: Option<E>,
1920        keep_strategy: UniqueKeepStrategy,
1921    ) -> LazyFrame
1922    where
1923        E: AsRef<[IE]>,
1924        IE: Into<Selector> + Clone,
1925    {
1926        let subset = subset.map(|s| {
1927            s.as_ref()
1928                .iter()
1929                .map(|e| e.clone().into())
1930                .collect::<Vec<_>>()
1931        });
1932
1933        let opt_state = self.get_opt_state();
1934        let options = DistinctOptionsDSL {
1935            subset,
1936            maintain_order: true,
1937            keep_strategy,
1938        };
1939        let lp = self.get_plan_builder().distinct(options).build();
1940        Self::from_logical_plan(lp, opt_state)
1941    }
1942
    /// Drop non-unique rows without maintaining the order of kept rows.
    ///
    /// The order of the kept rows may change; to maintain the original row order, use
    /// [`unique_stable`](LazyFrame::unique_stable).
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
    /// all columns are considered.
    pub fn unique(
        self,
        subset: Option<Vec<String>>,
        keep_strategy: UniqueKeepStrategy,
    ) -> LazyFrame {
        // Non-generic convenience wrapper over `unique_generic`.
        self.unique_generic(subset, keep_strategy)
    }
1957
1958    pub fn unique_generic<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1959        self,
1960        subset: Option<E>,
1961        keep_strategy: UniqueKeepStrategy,
1962    ) -> LazyFrame {
1963        let subset = subset.map(|s| {
1964            s.as_ref()
1965                .iter()
1966                .map(|e| e.clone().into())
1967                .collect::<Vec<_>>()
1968        });
1969        let opt_state = self.get_opt_state();
1970        let options = DistinctOptionsDSL {
1971            subset,
1972            maintain_order: false,
1973            keep_strategy,
1974        };
1975        let lp = self.get_plan_builder().distinct(options).build();
1976        Self::from_logical_plan(lp, opt_state)
1977    }
1978
1979    /// Drop rows containing one or more NaN values.
1980    ///
1981    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1982    /// floating point columns are considered.
1983    pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> LazyFrame {
1984        let opt_state = self.get_opt_state();
1985        let lp = self.get_plan_builder().drop_nans(subset).build();
1986        Self::from_logical_plan(lp, opt_state)
1987    }
1988
1989    /// Drop rows containing one or more None values.
1990    ///
1991    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1992    /// columns are considered.
1993    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> LazyFrame {
1994        let opt_state = self.get_opt_state();
1995        let lp = self.get_plan_builder().drop_nulls(subset).build();
1996        Self::from_logical_plan(lp, opt_state)
1997    }
1998
1999    /// Slice the DataFrame using an offset (starting row) and a length.
2000    ///
2001    /// If `offset` is negative, it is counted from the end of the DataFrame. For
2002    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
2003    /// end.
2004    ///
2005    /// If `offset` and `len` are such that the slice extends beyond the end of the
2006    /// DataFrame, the portion between `offset` and the end will be returned. In this
2007    /// case, the number of rows in the returned DataFrame will be less than `len`.
2008    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
2009        let opt_state = self.get_opt_state();
2010        let lp = self.get_plan_builder().slice(offset, len).build();
2011        Self::from_logical_plan(lp, opt_state)
2012    }
2013
    /// Get the first row.
    ///
    /// Equivalent to `self.slice(0, 1)`.
    pub fn first(self) -> LazyFrame {
        self.slice(0, 1)
    }
2020
    /// Get the last row.
    ///
    /// Equivalent to `self.slice(-1, 1)`; the negative offset counts from the end.
    pub fn last(self) -> LazyFrame {
        self.slice(-1, 1)
    }
2027
2028    /// Get the last `n` rows.
2029    ///
2030    /// Equivalent to `self.slice(-(n as i64), n)`.
2031    pub fn tail(self, n: IdxSize) -> LazyFrame {
2032        let neg_tail = -(n as i64);
2033        self.slice(neg_tail, n)
2034    }
2035
2036    /// Unpivot the DataFrame from wide to long format.
2037    ///
2038    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
2039    #[cfg(feature = "pivot")]
2040    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
2041        let opt_state = self.get_opt_state();
2042        let lp = self.get_plan_builder().unpivot(args).build();
2043        Self::from_logical_plan(lp, opt_state)
2044    }
2045
    /// Limit the DataFrame to the first `n` rows.
    ///
    /// Equivalent to `self.slice(0, n)`.
    ///
    /// Note if you don't want the rows to be scanned, use [`fetch`](LazyFrame::fetch).
    pub fn limit(self, n: IdxSize) -> LazyFrame {
        self.slice(0, n)
    }
2052
2053    /// Apply a function/closure once the logical plan get executed.
2054    ///
2055    /// The function has access to the whole materialized DataFrame at the time it is
2056    /// called.
2057    ///
2058    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
2059    /// with `LazyFrame::with_column` or `with_columns`.
2060    ///
2061    /// ## Warning
2062    /// This can blow up in your face if the schema is changed due to the operation. The
2063    /// optimizer relies on a correct schema.
2064    ///
2065    /// You can toggle certain optimizations off.
2066    pub fn map<F>(
2067        self,
2068        function: F,
2069        optimizations: AllowedOptimizations,
2070        schema: Option<Arc<dyn UdfSchema>>,
2071        name: Option<&'static str>,
2072    ) -> LazyFrame
2073    where
2074        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2075    {
2076        let opt_state = self.get_opt_state();
2077        let lp = self
2078            .get_plan_builder()
2079            .map(
2080                function,
2081                optimizations,
2082                schema,
2083                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
2084            )
2085            .build();
2086        Self::from_logical_plan(lp, opt_state)
2087    }
2088
2089    #[cfg(feature = "python")]
2090    pub fn map_python(
2091        self,
2092        function: polars_utils::python_function::PythonFunction,
2093        optimizations: AllowedOptimizations,
2094        schema: Option<SchemaRef>,
2095        validate_output: bool,
2096    ) -> LazyFrame {
2097        let opt_state = self.get_opt_state();
2098        let lp = self
2099            .get_plan_builder()
2100            .map_python(function, optimizations, schema, validate_output)
2101            .build();
2102        Self::from_logical_plan(lp, opt_state)
2103    }
2104
2105    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2106        let opt_state = self.get_opt_state();
2107        let lp = self.get_plan_builder().map_private(function).build();
2108        Self::from_logical_plan(lp, opt_state)
2109    }
2110
    /// Add a new column at index 0 that counts the rows.
    ///
    /// `name` is the name of the new column. `offset` is where to start counting from; if
    /// `None`, it is set to `0`.
    ///
    /// # Warning
    /// This can have a negative effect on query performance. This may for instance block
    /// predicate pushdown optimization.
    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
    where
        S: Into<PlSmallStr>,
    {
        let name = name.into();

        // Fast path: for file scans (other than anonymous scans) the row index is
        // attached to the scan's `file_options` so the reader produces it directly,
        // instead of appending a separate RowIndex node to the plan.
        match &self.logical_plan {
            v @ DslPlan::Scan { scan_type, .. }
                if !matches!(&**scan_type, FileScan::Anonymous { .. }) =>
            {
                let DslPlan::Scan {
                    sources,
                    mut file_options,
                    scan_type,
                    file_info,
                    cached_ir: _,
                } = v.clone()
                else {
                    // The match arm above already guarantees this is a `Scan`.
                    unreachable!()
                };

                file_options.row_index = Some(RowIndex {
                    name,
                    offset: offset.unwrap_or(0),
                });

                // Rebuild the scan node; the cached IR is reset since the plan changed.
                DslPlan::Scan {
                    sources,
                    file_options,
                    scan_type,
                    file_info,
                    cached_ir: Default::default(),
                }
                .into()
            },
            // General path: append a RowIndex DSL function node.
            _ => self.map_private(DslFunction::RowIndex { name, offset }),
        }
    }
2157
2158    /// Return the number of non-null elements for each column.
2159    pub fn count(self) -> LazyFrame {
2160        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2161    }
2162
2163    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2164    /// inserted as columns.
2165    #[cfg(feature = "dtype-struct")]
2166    pub fn unnest<E, IE>(self, cols: E) -> Self
2167    where
2168        E: AsRef<[IE]>,
2169        IE: Into<Selector> + Clone,
2170    {
2171        let cols = cols
2172            .as_ref()
2173            .iter()
2174            .map(|ie| ie.clone().into())
2175            .collect::<Vec<_>>();
2176        self.map_private(DslFunction::Unnest(cols))
2177    }
2178
2179    #[cfg(feature = "merge_sorted")]
2180    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2181    where
2182        S: Into<PlSmallStr>,
2183    {
2184        let key = key.into();
2185
2186        let lp = DslPlan::MergeSorted {
2187            input_left: Arc::new(self.logical_plan),
2188            input_right: Arc::new(other.logical_plan),
2189            key,
2190        };
2191        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2192    }
2193}
2194
/// Utility struct for lazy group_by operation.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// DSL plan of the input frame the group-by is applied to.
    pub logical_plan: DslPlan,
    /// Optimization flags propagated to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Expressions to group by.
    keys: Vec<Expr>,
    /// Whether the original order of the groups must be preserved.
    maintain_order: bool,
    /// Options for dynamic (windowed) group-by, when that entry point was used.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Options for rolling group-by, when that entry point was used.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2207
2208impl From<LazyGroupBy> for LazyFrame {
2209    fn from(lgb: LazyGroupBy) -> Self {
2210        Self {
2211            logical_plan: lgb.logical_plan,
2212            opt_state: lgb.opt_state,
2213            cached_arena: Default::default(),
2214        }
2215    }
2216}
2217
2218impl LazyGroupBy {
2219    /// Group by and aggregate.
2220    ///
2221    /// Select a column with [col] and choose an aggregation.
2222    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2223    ///
2224    /// # Example
2225    ///
2226    /// ```rust
2227    /// use polars_core::prelude::*;
2228    /// use polars_lazy::prelude::*;
2229    ///
2230    /// fn example(df: DataFrame) -> LazyFrame {
2231    ///       df.lazy()
2232    ///        .group_by_stable([col("date")])
2233    ///        .agg([
2234    ///            col("rain").min().alias("min_rain"),
2235    ///            col("rain").sum().alias("sum_rain"),
2236    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2237    ///        ])
2238    /// }
2239    /// ```
2240    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2241        #[cfg(feature = "dynamic_group_by")]
2242        let lp = DslBuilder::from(self.logical_plan)
2243            .group_by(
2244                self.keys,
2245                aggs,
2246                None,
2247                self.maintain_order,
2248                self.dynamic_options,
2249                self.rolling_options,
2250            )
2251            .build();
2252
2253        #[cfg(not(feature = "dynamic_group_by"))]
2254        let lp = DslBuilder::from(self.logical_plan)
2255            .group_by(self.keys, aggs, None, self.maintain_order)
2256            .build();
2257        LazyFrame::from_logical_plan(lp, self.opt_state)
2258    }
2259
2260    /// Return first n rows of each group
2261    pub fn head(self, n: Option<usize>) -> LazyFrame {
2262        let keys = self
2263            .keys
2264            .iter()
2265            .filter_map(|expr| expr_output_name(expr).ok())
2266            .collect::<Vec<_>>();
2267
2268        self.agg([col(PlSmallStr::from_static("*"))
2269            .exclude(keys.iter().cloned())
2270            .head(n)])
2271            .explode_impl(
2272                [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2273                true,
2274            )
2275    }
2276
2277    /// Return last n rows of each group
2278    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2279        let keys = self
2280            .keys
2281            .iter()
2282            .filter_map(|expr| expr_output_name(expr).ok())
2283            .collect::<Vec<_>>();
2284
2285        self.agg([col(PlSmallStr::from_static("*"))
2286            .exclude(keys.iter().cloned())
2287            .tail(n)])
2288            .explode_impl(
2289                [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2290                true,
2291            )
2292    }
2293
2294    /// Apply a function over the groups as a new DataFrame.
2295    ///
2296    /// **It is not recommended that you use this as materializing the DataFrame is very
2297    /// expensive.**
2298    pub fn apply<F>(self, f: F, schema: SchemaRef) -> LazyFrame
2299    where
2300        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2301    {
2302        #[cfg(feature = "dynamic_group_by")]
2303        let options = GroupbyOptions {
2304            dynamic: self.dynamic_options,
2305            rolling: self.rolling_options,
2306            slice: None,
2307        };
2308
2309        #[cfg(not(feature = "dynamic_group_by"))]
2310        let options = GroupbyOptions { slice: None };
2311
2312        let lp = DslPlan::GroupBy {
2313            input: Arc::new(self.logical_plan),
2314            keys: self.keys,
2315            aggs: vec![],
2316            apply: Some((Arc::new(f), schema)),
2317            maintain_order: self.maintain_order,
2318            options: Arc::new(options),
2319        };
2320        LazyFrame::from_logical_plan(lp, self.opt_state)
2321    }
2322}
2323
/// Builder for a join on two `LazyFrame`s.
///
/// Created via [`LazyFrame::join_builder`]; call [`finish`](JoinBuilder::finish)
/// or [`join_where`](JoinBuilder::join_where) to obtain the joined `LazyFrame`.
#[must_use]
pub struct JoinBuilder {
    /// Left table of the join.
    lf: LazyFrame,
    /// Join type; defaults to an inner join.
    how: JoinType,
    /// Right table; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions evaluated against the left table.
    left_on: Vec<Expr>,
    /// Key expressions evaluated against the right table.
    right_on: Vec<Expr>,
    /// Allow evaluating both tables in parallel.
    allow_parallel: bool,
    /// Force evaluating both tables in parallel.
    force_parallel: bool,
    /// Suffix for duplicate right-hand column names; `None` keeps the default.
    suffix: Option<PlSmallStr>,
    /// Join key validation to perform.
    validation: JoinValidation,
    /// Whether null values are considered equal join keys.
    nulls_equal: bool,
    /// Whether to coalesce the join key columns.
    coalesce: JoinCoalesce,
    /// Whether/how to preserve row order.
    maintain_order: MaintainOrderJoin,
}
2339impl JoinBuilder {
2340    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2341    pub fn new(lf: LazyFrame) -> Self {
2342        Self {
2343            lf,
2344            other: None,
2345            how: JoinType::Inner,
2346            left_on: vec![],
2347            right_on: vec![],
2348            allow_parallel: true,
2349            force_parallel: false,
2350            suffix: None,
2351            validation: Default::default(),
2352            nulls_equal: false,
2353            coalesce: Default::default(),
2354            maintain_order: Default::default(),
2355        }
2356    }
2357
2358    /// The right table in the join.
2359    pub fn with(mut self, other: LazyFrame) -> Self {
2360        self.other = Some(other);
2361        self
2362    }
2363
2364    /// Select the join type.
2365    pub fn how(mut self, how: JoinType) -> Self {
2366        self.how = how;
2367        self
2368    }
2369
2370    pub fn validate(mut self, validation: JoinValidation) -> Self {
2371        self.validation = validation;
2372        self
2373    }
2374
2375    /// The expressions you want to join both tables on.
2376    ///
2377    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2378    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2379        let on = on.as_ref().to_vec();
2380        self.left_on.clone_from(&on);
2381        self.right_on = on;
2382        self
2383    }
2384
2385    /// The expressions you want to join the left table on.
2386    ///
2387    /// The passed expressions must be valid in the left table.
2388    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2389        self.left_on = on.as_ref().to_vec();
2390        self
2391    }
2392
2393    /// The expressions you want to join the right table on.
2394    ///
2395    /// The passed expressions must be valid in the right table.
2396    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2397        self.right_on = on.as_ref().to_vec();
2398        self
2399    }
2400
2401    /// Allow parallel table evaluation.
2402    pub fn allow_parallel(mut self, allow: bool) -> Self {
2403        self.allow_parallel = allow;
2404        self
2405    }
2406
2407    /// Force parallel table evaluation.
2408    pub fn force_parallel(mut self, force: bool) -> Self {
2409        self.force_parallel = force;
2410        self
2411    }
2412
2413    /// Join on null values. By default null values will never produce matches.
2414    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2415        self.nulls_equal = nulls_equal;
2416        self
2417    }
2418
2419    /// Suffix to add duplicate column names in join.
2420    /// Defaults to `"_right"` if this method is never called.
2421    pub fn suffix<S>(mut self, suffix: S) -> Self
2422    where
2423        S: Into<PlSmallStr>,
2424    {
2425        self.suffix = Some(suffix.into());
2426        self
2427    }
2428
2429    /// Whether to coalesce join columns.
2430    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2431        self.coalesce = coalesce;
2432        self
2433    }
2434
2435    /// Whether to preserve the row order.
2436    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2437        self.maintain_order = maintain_order;
2438        self
2439    }
2440
2441    /// Finish builder
2442    pub fn finish(self) -> LazyFrame {
2443        let opt_state = self.lf.opt_state;
2444        let other = self.other.expect("'with' not set in join builder");
2445
2446        let args = JoinArgs {
2447            how: self.how,
2448            validation: self.validation,
2449            suffix: self.suffix,
2450            slice: None,
2451            nulls_equal: self.nulls_equal,
2452            coalesce: self.coalesce,
2453            maintain_order: self.maintain_order,
2454        };
2455
2456        let lp = self
2457            .lf
2458            .get_plan_builder()
2459            .join(
2460                other.logical_plan,
2461                self.left_on,
2462                self.right_on,
2463                JoinOptions {
2464                    allow_parallel: self.allow_parallel,
2465                    force_parallel: self.force_parallel,
2466                    args,
2467                    ..Default::default()
2468                }
2469                .into(),
2470            )
2471            .build();
2472        LazyFrame::from_logical_plan(lp, opt_state)
2473    }
2474
2475    // Finish with join predicates
2476    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2477        let opt_state = self.lf.opt_state;
2478        let other = self.other.expect("with not set");
2479
2480        // Decompose `And` conjunctions into their component expressions
2481        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2482            if let Expr::BinaryExpr {
2483                op: Operator::And,
2484                left,
2485                right,
2486            } = predicate
2487            {
2488                decompose_and((*left).clone(), expanded_predicates);
2489                decompose_and((*right).clone(), expanded_predicates);
2490            } else {
2491                expanded_predicates.push(predicate);
2492            }
2493        }
2494        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2495        for predicate in predicates {
2496            decompose_and(predicate, &mut expanded_predicates);
2497        }
2498        let predicates: Vec<Expr> = expanded_predicates;
2499
2500        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2501        #[cfg(feature = "is_between")]
2502        let predicates: Vec<Expr> = {
2503            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2504            for predicate in predicates {
2505                if let Expr::Function {
2506                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2507                    input,
2508                    ..
2509                } = &predicate
2510                {
2511                    if let [expr, lower, upper] = input.as_slice() {
2512                        match closed {
2513                            ClosedInterval::Both => {
2514                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2515                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2516                            },
2517                            ClosedInterval::Right => {
2518                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2519                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2520                            },
2521                            ClosedInterval::Left => {
2522                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2523                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2524                            },
2525                            ClosedInterval::None => {
2526                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2527                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2528                            },
2529                        }
2530                        continue;
2531                    }
2532                }
2533                expanded_predicates.push(predicate);
2534            }
2535            expanded_predicates
2536        };
2537
2538        let args = JoinArgs {
2539            how: self.how,
2540            validation: self.validation,
2541            suffix: self.suffix,
2542            slice: None,
2543            nulls_equal: self.nulls_equal,
2544            coalesce: self.coalesce,
2545            maintain_order: self.maintain_order,
2546        };
2547        let options = JoinOptions {
2548            allow_parallel: self.allow_parallel,
2549            force_parallel: self.force_parallel,
2550            args,
2551            ..Default::default()
2552        };
2553
2554        let lp = DslPlan::Join {
2555            input_left: Arc::new(self.lf.logical_plan),
2556            input_right: Arc::new(other.logical_plan),
2557            left_on: Default::default(),
2558            right_on: Default::default(),
2559            predicates,
2560            options: Arc::from(options),
2561        };
2562
2563        LazyFrame::from_logical_plan(lp, opt_state)
2564    }
2565}