// polars_lazy/frame/mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "ipc")]
21pub use ipc::*;
22#[cfg(feature = "json")]
23pub use ndjson::*;
24#[cfg(feature = "parquet")]
25pub use parquet::*;
26use polars_compute::rolling::QuantileMethod;
27use polars_core::POOL;
28use polars_core::error::feature_gated;
29use polars_core::prelude::*;
30use polars_expr::{ExpressionConversionState, create_physical_expr};
31use polars_io::RowIndex;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38use polars_utils::plpath::PlPath;
39use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
40
41use crate::frame::cached_arenas::CachedArena;
42use crate::prelude::*;
43
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
47
48impl IntoLazy for DataFrame {
49    /// Convert the `DataFrame` into a `LazyFrame`
50    fn lazy(self) -> LazyFrame {
51        let lp = DslBuilder::from_existing_df(self).build();
52        LazyFrame {
53            logical_plan: lp,
54            opt_state: Default::default(),
55            cached_arena: Default::default(),
56        }
57    }
58}
59
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
65
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The (unoptimized) DSL logical plan built up so far.
    pub logical_plan: DslPlan,
    /// Optimization flags applied when the plan is lowered and executed.
    pub(crate) opt_state: OptFlags,
    /// Cached IR/expression arenas behind a mutex; cloning the frame shares the
    /// same cache through the `Arc` (see the `cached_arenas` module).
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
77
78impl From<DslPlan> for LazyFrame {
79    fn from(plan: DslPlan) -> Self {
80        Self {
81            logical_plan: plan,
82            opt_state: OptFlags::default(),
83            cached_arena: Default::default(),
84        }
85    }
86}
87
88impl LazyFrame {
    /// Assemble a `LazyFrame` directly from its parts, reusing an existing
    /// arena cache (unlike `from_logical_plan`, which starts a fresh cache).
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }
100
    /// Consume the frame and expose its logical plan as a [`DslBuilder`] so
    /// further plan nodes can be appended.
    pub(crate) fn get_plan_builder(self) -> DslBuilder {
        DslBuilder::from(self.logical_plan)
    }
104
    /// Return a copy of the current optimization flags (`OptFlags` is returned
    /// by value from a shared borrow).
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
108
    /// Build a `LazyFrame` from a plan and flags with a fresh (empty) arena
    /// cache; used after every plan-modifying operation.
    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
        LazyFrame {
            logical_plan,
            opt_state,
            cached_arena: Default::default(),
        }
    }
116
    /// Get current optimizations.
    ///
    /// Returns a copy of the flag set; mutating it does not affect this frame.
    pub fn get_current_optimizations(&self) -> OptFlags {
        self.opt_state
    }
121
122    /// Set allowed optimizations.
123    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
124        self.opt_state = opt_state;
125        self
126    }
127
128    /// Turn off all optimizations.
129    pub fn without_optimizations(self) -> Self {
130        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
131    }
132
133    /// Toggle projection pushdown optimization.
134    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
135        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
136        self
137    }
138
139    /// Toggle cluster with columns optimization.
140    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
141        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
142        self
143    }
144
145    /// Toggle collapse joins optimization.
146    pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
147        self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
148        self
149    }
150
151    /// Check if operations are order dependent and unset maintaining_order if
152    /// the order would not be observed.
153    pub fn with_check_order(mut self, toggle: bool) -> Self {
154        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
155        self
156    }
157
158    /// Toggle predicate pushdown optimization.
159    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
160        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
161        self
162    }
163
164    /// Toggle type coercion optimization.
165    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
166        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
167        self
168    }
169
170    /// Toggle type check optimization.
171    pub fn with_type_check(mut self, toggle: bool) -> Self {
172        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
173        self
174    }
175
176    /// Toggle expression simplification optimization on or off.
177    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
178        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
179        self
180    }
181
182    /// Toggle common subplan elimination optimization on or off
183    #[cfg(feature = "cse")]
184    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
185        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
186        self
187    }
188
189    /// Toggle common subexpression elimination optimization on or off
190    #[cfg(feature = "cse")]
191    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
192        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
193        self
194    }
195
196    /// Toggle slice pushdown optimization.
197    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
198        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
199        self
200    }
201
202    #[cfg(feature = "new_streaming")]
203    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
204        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
205        self
206    }
207
208    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
209    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
210        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
211        self
212    }
213
214    /// Run every node eagerly. This turns off multi-node optimizations.
215    pub fn _with_eager(mut self, toggle: bool) -> Self {
216        self.opt_state.set(OptFlags::EAGER, toggle);
217        self
218    }
219
220    /// Return a String describing the naive (un-optimized) logical plan.
221    pub fn describe_plan(&self) -> PolarsResult<String> {
222        Ok(self.clone().to_alp()?.describe())
223    }
224
225    /// Return a String describing the naive (un-optimized) logical plan in tree format.
226    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
227        Ok(self.clone().to_alp()?.describe_tree_format())
228    }
229
230    /// Return a String describing the optimized logical plan.
231    ///
232    /// Returns `Err` if optimizing the logical plan fails.
233    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
234        Ok(self.clone().to_alp_optimized()?.describe())
235    }
236
237    /// Return a String describing the optimized logical plan in tree format.
238    ///
239    /// Returns `Err` if optimizing the logical plan fails.
240    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
241        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
242    }
243
244    /// Return a String describing the logical plan.
245    ///
246    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
247    /// explains the naive, un-optimized plan.
248    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
249        if optimized {
250            self.describe_optimized_plan()
251        } else {
252            self.describe_plan()
253        }
254    }
255
256    /// Add a sort operation to the logical plan.
257    ///
258    /// Sorts the LazyFrame by the column name specified using the provided options.
259    ///
260    /// # Example
261    ///
262    /// Sort DataFrame by 'sepal_width' column:
263    /// ```rust
264    /// # use polars_core::prelude::*;
265    /// # use polars_lazy::prelude::*;
266    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
267    ///     df.lazy().sort(["sepal_width"], Default::default())
268    /// }
269    /// ```
270    /// Sort by a single column with specific order:
271    /// ```
272    /// # use polars_core::prelude::*;
273    /// # use polars_lazy::prelude::*;
274    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
275    ///     df.lazy().sort(
276    ///         ["sepal_width"],
277    ///         SortMultipleOptions::new()
278    ///             .with_order_descending(descending)
279    ///     )
280    /// }
281    /// ```
282    /// Sort by multiple columns with specifying order for each column:
283    /// ```
284    /// # use polars_core::prelude::*;
285    /// # use polars_lazy::prelude::*;
286    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
287    ///     df.lazy().sort(
288    ///         ["sepal_width", "sepal_length"],
289    ///         SortMultipleOptions::new()
290    ///             .with_order_descending_multi([false, true])
291    ///     )
292    /// }
293    /// ```
294    /// See [`SortMultipleOptions`] for more options.
295    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
296        let opt_state = self.get_opt_state();
297        let lp = self
298            .get_plan_builder()
299            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
300            .build();
301        Self::from_logical_plan(lp, opt_state)
302    }
303
304    /// Add a sort operation to the logical plan.
305    ///
306    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
307    /// concrete columns before sorting.
308    ///
309    /// See [`SortMultipleOptions`] for more options.
310    ///
311    /// # Example
312    ///
313    /// ```rust
314    /// use polars_core::prelude::*;
315    /// use polars_lazy::prelude::*;
316    ///
317    /// /// Sort DataFrame by 'sepal_width' column
318    /// fn example(df: DataFrame) -> LazyFrame {
319    ///       df.lazy()
320    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
321    /// }
322    /// ```
323    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
324        self,
325        by_exprs: E,
326        sort_options: SortMultipleOptions,
327    ) -> Self {
328        let by_exprs = by_exprs.as_ref().to_vec();
329        if by_exprs.is_empty() {
330            self
331        } else {
332            let opt_state = self.get_opt_state();
333            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
334            Self::from_logical_plan(lp, opt_state)
335        }
336    }
337
338    pub fn top_k<E: AsRef<[Expr]>>(
339        self,
340        k: IdxSize,
341        by_exprs: E,
342        sort_options: SortMultipleOptions,
343    ) -> Self {
344        // this will optimize to top-k
345        self.sort_by_exprs(
346            by_exprs,
347            sort_options.with_order_reversed().with_nulls_last(true),
348        )
349        .slice(0, k)
350    }
351
352    pub fn bottom_k<E: AsRef<[Expr]>>(
353        self,
354        k: IdxSize,
355        by_exprs: E,
356        sort_options: SortMultipleOptions,
357    ) -> Self {
358        // this will optimize to bottom-k
359        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
360            .slice(0, k)
361    }
362
363    /// Reverse the `DataFrame` from top to bottom.
364    ///
365    /// Row `i` becomes row `number_of_rows - i - 1`.
366    ///
367    /// # Example
368    ///
369    /// ```rust
370    /// use polars_core::prelude::*;
371    /// use polars_lazy::prelude::*;
372    ///
373    /// fn example(df: DataFrame) -> LazyFrame {
374    ///       df.lazy()
375    ///         .reverse()
376    /// }
377    /// ```
378    pub fn reverse(self) -> Self {
379        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
380    }
381
382    /// Rename columns in the DataFrame.
383    ///
384    /// `existing` and `new` are iterables of the same length containing the old and
385    /// corresponding new column names. Renaming happens to all `existing` columns
386    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
387    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
388    /// those columns that are actually found will be renamed (others will be ignored).
389    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
390    where
391        I: IntoIterator<Item = T>,
392        J: IntoIterator<Item = S>,
393        T: AsRef<str>,
394        S: AsRef<str>,
395    {
396        let iter = existing.into_iter();
397        let cap = iter.size_hint().0;
398        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
399        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
400
401        // TODO! should this error if `existing` and `new` have different lengths?
402        // Currently, the longer of the two is truncated.
403        for (existing, new) in iter.zip(new) {
404            let existing = existing.as_ref();
405            let new = new.as_ref();
406            if new != existing {
407                existing_vec.push(existing.into());
408                new_vec.push(new.into());
409            }
410        }
411
412        self.map_private(DslFunction::Rename {
413            existing: existing_vec.into(),
414            new: new_vec.into(),
415            strict,
416        })
417    }
418
419    /// Removes columns from the DataFrame.
420    /// Note that it's better to only select the columns you need
421    /// and let the projection pushdown optimize away the unneeded columns.
422    ///
423    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
424    /// error while materializing the [`LazyFrame`].
425    pub fn drop(self, columns: Selector) -> Self {
426        let opt_state = self.get_opt_state();
427        let lp = self.get_plan_builder().drop(columns).build();
428        Self::from_logical_plan(lp, opt_state)
429    }
430
431    /// Shift the values by a given period and fill the parts that will be empty due to this operation
432    /// with `Nones`.
433    ///
434    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
435    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
436        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
437    }
438
439    /// Shift the values by a given period and fill the parts that will be empty due to this operation
440    /// with the result of the `fill_value` expression.
441    ///
442    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
443    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
444        self.select(vec![
445            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
446        ])
447    }
448
449    /// Fill None values in the DataFrame with an expression.
450    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
451        let opt_state = self.get_opt_state();
452        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
453        Self::from_logical_plan(lp, opt_state)
454    }
455
456    /// Fill NaN values in the DataFrame with an expression.
457    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
458        let opt_state = self.get_opt_state();
459        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
460        Self::from_logical_plan(lp, opt_state)
461    }
462
463    /// Caches the result into a new LazyFrame.
464    ///
465    /// This should be used to prevent computations running multiple times.
466    pub fn cache(self) -> Self {
467        let opt_state = self.get_opt_state();
468        let lp = self.get_plan_builder().cache().build();
469        Self::from_logical_plan(lp, opt_state)
470    }
471
472    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
473    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
474        let cast_cols: Vec<Expr> = dtypes
475            .into_iter()
476            .map(|(name, dt)| {
477                let name = PlSmallStr::from_str(name);
478
479                if strict {
480                    col(name).strict_cast(dt)
481                } else {
482                    col(name).cast(dt)
483                }
484            })
485            .collect();
486
487        if cast_cols.is_empty() {
488            self
489        } else {
490            self.with_columns(cast_cols)
491        }
492    }
493
494    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
495    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
496        self.with_columns(vec![if strict {
497            col(PlSmallStr::from_static("*")).strict_cast(dtype)
498        } else {
499            col(PlSmallStr::from_static("*")).cast(dtype)
500        }])
501    }
502
    /// Lower and optimize the logical plan into the given arenas, returning the
    /// root [`Node`] of the resulting IR.
    ///
    /// Convenience wrapper around [`Self::optimize_with_scratch`] with an empty
    /// scratch buffer.
    pub fn optimize(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Node> {
        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
    }
510
    /// Lower the logical plan to IR, run the optimizer, and return the result
    /// as an owned [`IRPlan`] (root node plus both arenas).
    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();
        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;

        Ok(IRPlan::new(node, lp_arena, expr_arena))
    }
517
    /// Lower the logical plan to IR *without* running the optimizer and return
    /// it as an owned [`IRPlan`].
    ///
    /// `opt_state` is passed mutably: lowering may update the flags.
    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();
        let node = to_alp(
            self.logical_plan,
            &mut expr_arena,
            &mut lp_arena,
            &mut self.opt_state,
        )?;
        let plan = IRPlan::new(node, lp_arena, expr_arena);
        Ok(plan)
    }
529
    /// Lower and optimize the logical plan into the provided arenas.
    ///
    /// `scratch` is a reusable node buffer handed to the optimizer. Returns the
    /// root node of the optimized IR.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
    ) -> PolarsResult<Node> {
        // `mut` is only needed when the "cse" feature is enabled (below).
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);

        #[cfg(feature = "cse")]
        if new_streaming {
            // The new streaming engine can't deal with the way the common
            // subexpression elimination adds length-incorrect with_columns.
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback that converts an expression to a physical I/O expression
            // (used e.g. when pushing predicates into scans). Returns `None`
            // if the conversion fails (`.ok()?`), in which case the optimizer
            // proceeds without it.
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        Ok(lp_top)
    }
569
    /// Optimize the plan and build the in-memory physical plan.
    ///
    /// `post_opt` is invoked after optimization with the IR root, both arenas,
    /// and (if `query_start` was given) the time elapsed since the query began,
    /// allowing callers to inspect or modify the IR just-in-time.
    ///
    /// Returns the execution state, the physical plan executor, and a flag that
    /// is `false` iff `check_sink` was set and the plan root is a file or
    /// partition sink.
    fn prepare_collect_post_opt<P>(
        mut self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
        post_opt: P,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();

        let mut scratch = vec![];
        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;

        post_opt(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            // Post optimization callback gets the time since the
            // query was started as its "base" timepoint.
            query_start.map(|s| s.elapsed()),
        )?;

        // sink should be replaced
        let no_file_sink = if check_sink {
            !matches!(
                lp_arena.get(lp_top),
                IR::Sink {
                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
                    ..
                }
            )
        } else {
            true
        };
        let physical_plan = create_physical_plan(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        let state = ExecutionState::new();
        Ok((state, physical_plan, no_file_sink))
    }
620
    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
    /// Optimize, apply `post_opt` to the IR, then execute the in-memory
    /// physical plan and return the materialized [`DataFrame`].
    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        // `check_sink = false`: callers of this path expect an in-memory result.
        let (mut state, mut physical_plan, _) =
            self.prepare_collect_post_opt(false, None, post_opt)?;
        physical_plan.execute(&mut state)
    }
635
    /// Like [`Self::prepare_collect_post_opt`] but with a no-op post-optimization
    /// callback.
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
644
    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
    /// `engine`.
    ///
    /// The query is optimized prior to execution.
    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
        // Ensure the plan is rooted in a sink; a plain `collect` becomes a
        // memory sink.
        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
            payload.clone()
        } else {
            self.logical_plan = DslPlan::Sink {
                input: Arc::new(self.logical_plan),
                payload: SinkType::Memory,
            };
            SinkType::Memory
        };

        // Default engine for collect is InMemory, sink_* is Streaming
        if engine == Engine::Auto {
            engine = match payload {
                #[cfg(feature = "new_streaming")]
                SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
                _ => Engine::InMemory,
            };
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        #[cfg(feature = "new_streaming")]
        {
            // E.g. an environment/test override may force the new streaming
            // engine regardless of `engine` — NOTE(review): semantics of this
            // check live in `try_new_streaming_if_requested`; confirm there.
            if let Some(result) = self.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_single());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!("new_streaming", self = self.with_new_streaming(true))
            },
            _ => {},
        }
        let mut alp_plan = self.clone().to_alp_optimized()?;

        match engine {
            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                result.map(|v| v.unwrap_single())
            }),
            Engine::Gpu => {
                // Unreachable in practice: Gpu was remapped to InMemory above.
                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
            },
            Engine::InMemory => {
                let mut physical_plan = create_physical_plan(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                    BUILD_STREAMING_EXECUTOR,
                )?;
                let mut state = ExecutionState::new();
                physical_plan.execute(&mut state)
            },
        }
    }
713
714    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
715        let sink_multiple = LazyFrame {
716            logical_plan: DslPlan::SinkMultiple { inputs: plans },
717            opt_state,
718            cached_arena: Default::default(),
719        };
720        sink_multiple.explain(true)
721    }
722
    /// Execute a batch of independent queries and collect each into a
    /// [`DataFrame`], using the specified `engine`.
    ///
    /// The plans are wrapped in a single `SinkMultiple` so the optimizer can
    /// share work (e.g. caches) across them. Returns one frame per input plan.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // The root must be the SinkMultiple we constructed above.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // Shared caches must be populated before the plans run in parallel.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Take ownership of the plan; each branch gets
                                    // its own split of the execution state.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
825
    /// Execute all the lazy operations and collect them into a [`DataFrame`].
    ///
    /// The query is optimized prior to execution.
    ///
    /// Equivalent to [`Self::collect_with_engine`] with [`Engine::InMemory`].
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
    ///     df.lazy()
    ///       .group_by([col("foo")])
    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
    ///       .collect()
    /// }
    /// ```
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
846
    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
    // This version does profiling of the node execution.
    /// Optimize, apply `post_opt`, then execute with per-node timing enabled.
    /// Returns the materialized frame plus a frame of timing information.
    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        // All timings are measured relative to this instant.
        let query_start = std::time::Instant::now();
        let (mut state, mut physical_plan, _) =
            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
        state.time_nodes(query_start);
        let out = physical_plan.execute(&mut state)?;
        let timer_df = state.finish_timer()?;
        Ok((out, timer_df))
    }
866
    /// Profile a LazyFrame.
    ///
    /// This will run the query and return a tuple
    /// containing the materialized DataFrame and a DataFrame that contains profiling information
    /// of each node that is executed.
    ///
    /// The units of the timings are microseconds.
    ///
    /// Equivalent to [`Self::_profile_post_opt`] with a no-op callback.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
877
878    /// Stream a query result into a parquet file. This is useful if the final result doesn't fit
879    /// into memory. This methods will return an error if the query cannot be completely done in a
880    /// streaming fashion.
881    #[cfg(feature = "parquet")]
882    pub fn sink_parquet(
883        self,
884        target: SinkTarget,
885        options: ParquetWriteOptions,
886        cloud_options: Option<polars_io::cloud::CloudOptions>,
887        sink_options: SinkOptions,
888    ) -> PolarsResult<Self> {
889        self.sink(SinkType::File(FileSinkType {
890            target,
891            sink_options,
892            file_type: FileType::Parquet(options),
893            cloud_options,
894        }))
895    }
896
897    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
898    /// into memory. This methods will return an error if the query cannot be completely done in a
899    /// streaming fashion.
900    #[cfg(feature = "ipc")]
901    pub fn sink_ipc(
902        self,
903        target: SinkTarget,
904        options: IpcWriterOptions,
905        cloud_options: Option<polars_io::cloud::CloudOptions>,
906        sink_options: SinkOptions,
907    ) -> PolarsResult<Self> {
908        self.sink(SinkType::File(FileSinkType {
909            target,
910            sink_options,
911            file_type: FileType::Ipc(options),
912            cloud_options,
913        }))
914    }
915
916    /// Stream a query result into an csv file. This is useful if the final result doesn't fit
917    /// into memory. This methods will return an error if the query cannot be completely done in a
918    /// streaming fashion.
919    #[cfg(feature = "csv")]
920    pub fn sink_csv(
921        self,
922        target: SinkTarget,
923        options: CsvWriterOptions,
924        cloud_options: Option<polars_io::cloud::CloudOptions>,
925        sink_options: SinkOptions,
926    ) -> PolarsResult<Self> {
927        self.sink(SinkType::File(FileSinkType {
928            target,
929            sink_options,
930            file_type: FileType::Csv(options),
931            cloud_options,
932        }))
933    }
934
935    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
936    /// into memory. This methods will return an error if the query cannot be completely done in a
937    /// streaming fashion.
938    #[cfg(feature = "json")]
939    pub fn sink_json(
940        self,
941        target: SinkTarget,
942        options: JsonWriterOptions,
943        cloud_options: Option<polars_io::cloud::CloudOptions>,
944        sink_options: SinkOptions,
945    ) -> PolarsResult<Self> {
946        self.sink(SinkType::File(FileSinkType {
947            target,
948            sink_options,
949            file_type: FileType::Json(options),
950            cloud_options,
951        }))
952    }
953
954    /// Stream a query result into a parquet file in a partitioned manner. This is useful if the
955    /// final result doesn't fit into memory. This methods will return an error if the query cannot
956    /// be completely done in a streaming fashion.
957    #[cfg(feature = "parquet")]
958    #[allow(clippy::too_many_arguments)]
959    pub fn sink_parquet_partitioned(
960        self,
961        base_path: Arc<PlPath>,
962        file_path_cb: Option<PartitionTargetCallback>,
963        variant: PartitionVariant,
964        options: ParquetWriteOptions,
965        cloud_options: Option<polars_io::cloud::CloudOptions>,
966        sink_options: SinkOptions,
967        per_partition_sort_by: Option<Vec<SortColumn>>,
968        finish_callback: Option<SinkFinishCallback>,
969    ) -> PolarsResult<Self> {
970        self.sink(SinkType::Partition(PartitionSinkType {
971            base_path,
972            file_path_cb,
973            sink_options,
974            variant,
975            file_type: FileType::Parquet(options),
976            cloud_options,
977            per_partition_sort_by,
978            finish_callback,
979        }))
980    }
981
982    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
983    /// final result doesn't fit into memory. This methods will return an error if the query cannot
984    /// be completely done in a streaming fashion.
985    #[cfg(feature = "ipc")]
986    #[allow(clippy::too_many_arguments)]
987    pub fn sink_ipc_partitioned(
988        self,
989        base_path: Arc<PlPath>,
990        file_path_cb: Option<PartitionTargetCallback>,
991        variant: PartitionVariant,
992        options: IpcWriterOptions,
993        cloud_options: Option<polars_io::cloud::CloudOptions>,
994        sink_options: SinkOptions,
995        per_partition_sort_by: Option<Vec<SortColumn>>,
996        finish_callback: Option<SinkFinishCallback>,
997    ) -> PolarsResult<Self> {
998        self.sink(SinkType::Partition(PartitionSinkType {
999            base_path,
1000            file_path_cb,
1001            sink_options,
1002            variant,
1003            file_type: FileType::Ipc(options),
1004            cloud_options,
1005            per_partition_sort_by,
1006            finish_callback,
1007        }))
1008    }
1009
1010    /// Stream a query result into an csv file in a partitioned manner. This is useful if the final
1011    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1012    /// completely done in a streaming fashion.
1013    #[cfg(feature = "csv")]
1014    #[allow(clippy::too_many_arguments)]
1015    pub fn sink_csv_partitioned(
1016        self,
1017        base_path: Arc<PlPath>,
1018        file_path_cb: Option<PartitionTargetCallback>,
1019        variant: PartitionVariant,
1020        options: CsvWriterOptions,
1021        cloud_options: Option<polars_io::cloud::CloudOptions>,
1022        sink_options: SinkOptions,
1023        per_partition_sort_by: Option<Vec<SortColumn>>,
1024        finish_callback: Option<SinkFinishCallback>,
1025    ) -> PolarsResult<Self> {
1026        self.sink(SinkType::Partition(PartitionSinkType {
1027            base_path,
1028            file_path_cb,
1029            sink_options,
1030            variant,
1031            file_type: FileType::Csv(options),
1032            cloud_options,
1033            per_partition_sort_by,
1034            finish_callback,
1035        }))
1036    }
1037
1038    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1039    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1040    /// completely done in a streaming fashion.
1041    #[cfg(feature = "json")]
1042    #[allow(clippy::too_many_arguments)]
1043    pub fn sink_json_partitioned(
1044        self,
1045        base_path: Arc<PlPath>,
1046        file_path_cb: Option<PartitionTargetCallback>,
1047        variant: PartitionVariant,
1048        options: JsonWriterOptions,
1049        cloud_options: Option<polars_io::cloud::CloudOptions>,
1050        sink_options: SinkOptions,
1051        per_partition_sort_by: Option<Vec<SortColumn>>,
1052        finish_callback: Option<SinkFinishCallback>,
1053    ) -> PolarsResult<Self> {
1054        self.sink(SinkType::Partition(PartitionSinkType {
1055            base_path,
1056            file_path_cb,
1057            sink_options,
1058            variant,
1059            file_type: FileType::Json(options),
1060            cloud_options,
1061            per_partition_sort_by,
1062            finish_callback,
1063        }))
1064    }
1065
    /// Run the query on the new streaming engine if it was requested via the
    /// `POLARS_AUTO_NEW_STREAMING` / `POLARS_FORCE_NEW_STREAMING` environment
    /// variables. Returns `None` when the new engine was not requested (or
    /// when "auto" mode fell back), in which case the caller should use the
    /// default engine.
    #[cfg(feature = "new_streaming")]
    pub fn try_new_streaming_if_requested(
        &mut self,
    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
        // "auto" allows falling back to the default engine on unimplemented
        // paths; "force" propagates such failures instead.
        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

        if auto_new_streaming || force_new_streaming {
            // Try to run using the new streaming engine, falling back
            // if it fails in a todo!() error if auto_new_streaming is set.
            let mut new_stream_lazy = self.clone();
            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
                Ok(v) => v,
                Err(e) => return Some(Err(e)),
            };

            let f = || {
                polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                )
            };

            // Unimplemented paths surface as panics (todo!), so catch panics
            // here to make the fallback decision below.
            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
                Ok(v) => return Some(v),
                Err(e) => {
                    // Fallback to normal engine if error is due to not being implemented
                    // and auto_new_streaming is set, otherwise propagate error.
                    if !force_new_streaming
                        && auto_new_streaming
                        && e.downcast_ref::<&str>()
                            .map(|s| s.starts_with("not yet implemented"))
                            .unwrap_or(false)
                    {
                        if polars_core::config::verbose() {
                            eprintln!(
                                "caught unimplemented error in new streaming engine, falling back to normal engine"
                            );
                        }
                    } else {
                        std::panic::resume_unwind(e);
                    }
                },
            }
        }

        // Not requested, or "auto" mode fell back: signal the caller to run
        // the default engine.
        None
    }
1116
1117    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1118        polars_ensure!(
1119            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1120            InvalidOperation: "cannot create a sink on top of another sink"
1121        );
1122        self.logical_plan = DslPlan::Sink {
1123            input: Arc::new(self.logical_plan),
1124            payload,
1125        };
1126        Ok(self)
1127    }
1128
1129    /// Filter frame rows that match a predicate expression.
1130    ///
1131    /// The expression must yield boolean values (note that rows where the
1132    /// predicate resolves to `null` are *not* included in the resulting frame).
1133    ///
1134    /// # Example
1135    ///
1136    /// ```rust
1137    /// use polars_core::prelude::*;
1138    /// use polars_lazy::prelude::*;
1139    ///
1140    /// fn example(df: DataFrame) -> LazyFrame {
1141    ///       df.lazy()
1142    ///         .filter(col("sepal_width").is_not_null())
1143    ///         .select([col("sepal_width"), col("sepal_length")])
1144    /// }
1145    /// ```
1146    pub fn filter(self, predicate: Expr) -> Self {
1147        let opt_state = self.get_opt_state();
1148        let lp = self.get_plan_builder().filter(predicate).build();
1149        Self::from_logical_plan(lp, opt_state)
1150    }
1151
1152    /// Remove frame rows that match a predicate expression.
1153    ///
1154    /// The expression must yield boolean values (note that rows where the
1155    /// predicate resolves to `null` are *not* removed from the resulting frame).
1156    ///
1157    /// # Example
1158    ///
1159    /// ```rust
1160    /// use polars_core::prelude::*;
1161    /// use polars_lazy::prelude::*;
1162    ///
1163    /// fn example(df: DataFrame) -> LazyFrame {
1164    ///       df.lazy()
1165    ///         .remove(col("sepal_width").is_null())
1166    ///         .select([col("sepal_width"), col("sepal_length")])
1167    /// }
1168    /// ```
1169    pub fn remove(self, predicate: Expr) -> Self {
1170        self.filter(predicate.neq_missing(lit(true)))
1171    }
1172
1173    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1174    ///
1175    /// Columns can be selected with [`col`];
1176    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1177    ///
1178    /// # Example
1179    ///
1180    /// ```rust
1181    /// use polars_core::prelude::*;
1182    /// use polars_lazy::prelude::*;
1183    ///
1184    /// /// This function selects column "foo" and column "bar".
1185    /// /// Column "bar" is renamed to "ham".
1186    /// fn example(df: DataFrame) -> LazyFrame {
1187    ///       df.lazy()
1188    ///         .select([col("foo"),
1189    ///                   col("bar").alias("ham")])
1190    /// }
1191    ///
1192    /// /// This function selects all columns except "foo"
1193    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1194    ///       df.lazy()
1195    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1196    /// }
1197    /// ```
1198    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1199        let exprs = exprs.as_ref().to_vec();
1200        self.select_impl(
1201            exprs,
1202            ProjectionOptions {
1203                run_parallel: true,
1204                duplicate_check: true,
1205                should_broadcast: true,
1206            },
1207        )
1208    }
1209
1210    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1211        let exprs = exprs.as_ref().to_vec();
1212        self.select_impl(
1213            exprs,
1214            ProjectionOptions {
1215                run_parallel: false,
1216                duplicate_check: true,
1217                should_broadcast: true,
1218            },
1219        )
1220    }
1221
1222    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1223        let opt_state = self.get_opt_state();
1224        let lp = self.get_plan_builder().project(exprs, options).build();
1225        Self::from_logical_plan(lp, opt_state)
1226    }
1227
1228    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1229    ///
1230    /// Takes a list of expressions to group on.
1231    ///
1232    /// # Example
1233    ///
1234    /// ```rust
1235    /// use polars_core::prelude::*;
1236    /// use polars_lazy::prelude::*;
1237    ///
1238    /// fn example(df: DataFrame) -> LazyFrame {
1239    ///       df.lazy()
1240    ///        .group_by([col("date")])
1241    ///        .agg([
1242    ///            col("rain").min().alias("min_rain"),
1243    ///            col("rain").sum().alias("sum_rain"),
1244    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1245    ///        ])
1246    /// }
1247    /// ```
1248    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1249        let keys = by
1250            .as_ref()
1251            .iter()
1252            .map(|e| e.clone().into())
1253            .collect::<Vec<_>>();
1254        let opt_state = self.get_opt_state();
1255
1256        #[cfg(feature = "dynamic_group_by")]
1257        {
1258            LazyGroupBy {
1259                logical_plan: self.logical_plan,
1260                opt_state,
1261                keys,
1262                maintain_order: false,
1263                dynamic_options: None,
1264                rolling_options: None,
1265            }
1266        }
1267
1268        #[cfg(not(feature = "dynamic_group_by"))]
1269        {
1270            LazyGroupBy {
1271                logical_plan: self.logical_plan,
1272                opt_state,
1273                keys,
1274                maintain_order: false,
1275            }
1276        }
1277    }
1278
1279    /// Create rolling groups based on a time column.
1280    ///
1281    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1282    ///
1283    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1284    /// individual values and are not of constant intervals. For constant intervals use
1285    /// *group_by_dynamic*
1286    #[cfg(feature = "dynamic_group_by")]
1287    pub fn rolling<E: AsRef<[Expr]>>(
1288        mut self,
1289        index_column: Expr,
1290        group_by: E,
1291        mut options: RollingGroupOptions,
1292    ) -> LazyGroupBy {
1293        if let Expr::Column(name) = index_column {
1294            options.index_column = name;
1295        } else {
1296            let output_field = index_column
1297                .to_field(&self.collect_schema().unwrap())
1298                .unwrap();
1299            return self.with_column(index_column).rolling(
1300                Expr::Column(output_field.name().clone()),
1301                group_by,
1302                options,
1303            );
1304        }
1305        let opt_state = self.get_opt_state();
1306        LazyGroupBy {
1307            logical_plan: self.logical_plan,
1308            opt_state,
1309            keys: group_by.as_ref().to_vec(),
1310            maintain_order: true,
1311            dynamic_options: None,
1312            rolling_options: Some(options),
1313        }
1314    }
1315
1316    /// Group based on a time value (or index value of type Int32, Int64).
1317    ///
1318    /// Time windows are calculated and rows are assigned to windows. Different from a
1319    /// normal group_by is that a row can be member of multiple groups. The time/index
1320    /// window could be seen as a rolling window, with a window size determined by
1321    /// dates/times/values instead of slots in the DataFrame.
1322    ///
1323    /// A window is defined by:
1324    ///
1325    /// - every: interval of the window
1326    /// - period: length of the window
1327    /// - offset: offset of the window
1328    ///
1329    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1330    /// with a ordinary group_by on these keys.
1331    #[cfg(feature = "dynamic_group_by")]
1332    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1333        mut self,
1334        index_column: Expr,
1335        group_by: E,
1336        mut options: DynamicGroupOptions,
1337    ) -> LazyGroupBy {
1338        if let Expr::Column(name) = index_column {
1339            options.index_column = name;
1340        } else {
1341            let output_field = index_column
1342                .to_field(&self.collect_schema().unwrap())
1343                .unwrap();
1344            return self.with_column(index_column).group_by_dynamic(
1345                Expr::Column(output_field.name().clone()),
1346                group_by,
1347                options,
1348            );
1349        }
1350        let opt_state = self.get_opt_state();
1351        LazyGroupBy {
1352            logical_plan: self.logical_plan,
1353            opt_state,
1354            keys: group_by.as_ref().to_vec(),
1355            maintain_order: true,
1356            dynamic_options: Some(options),
1357            rolling_options: None,
1358        }
1359    }
1360
1361    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1362    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1363        let keys = by
1364            .as_ref()
1365            .iter()
1366            .map(|e| e.clone().into())
1367            .collect::<Vec<_>>();
1368        let opt_state = self.get_opt_state();
1369
1370        #[cfg(feature = "dynamic_group_by")]
1371        {
1372            LazyGroupBy {
1373                logical_plan: self.logical_plan,
1374                opt_state,
1375                keys,
1376                maintain_order: true,
1377                dynamic_options: None,
1378                rolling_options: None,
1379            }
1380        }
1381
1382        #[cfg(not(feature = "dynamic_group_by"))]
1383        {
1384            LazyGroupBy {
1385                logical_plan: self.logical_plan,
1386                opt_state,
1387                keys,
1388                maintain_order: true,
1389            }
1390        }
1391    }
1392
1393    /// Left anti join this query with another lazy query.
1394    ///
1395    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1396    /// flexible join logic, see [`join`](LazyFrame::join) or
1397    /// [`join_builder`](LazyFrame::join_builder).
1398    ///
1399    /// # Example
1400    ///
1401    /// ```rust
1402    /// use polars_core::prelude::*;
1403    /// use polars_lazy::prelude::*;
1404    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1405    ///         ldf
1406    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1407    /// }
1408    /// ```
1409    #[cfg(feature = "semi_anti_join")]
1410    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1411        self.join(
1412            other,
1413            [left_on.into()],
1414            [right_on.into()],
1415            JoinArgs::new(JoinType::Anti),
1416        )
1417    }
1418
1419    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1420    #[cfg(feature = "cross_join")]
1421    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1422        self.join(
1423            other,
1424            vec![],
1425            vec![],
1426            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1427        )
1428    }
1429
1430    /// Left outer join this query with another lazy query.
1431    ///
1432    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1433    /// flexible join logic, see [`join`](LazyFrame::join) or
1434    /// [`join_builder`](LazyFrame::join_builder).
1435    ///
1436    /// # Example
1437    ///
1438    /// ```rust
1439    /// use polars_core::prelude::*;
1440    /// use polars_lazy::prelude::*;
1441    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1442    ///         ldf
1443    ///         .left_join(other, col("foo"), col("bar"))
1444    /// }
1445    /// ```
1446    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1447        self.join(
1448            other,
1449            [left_on.into()],
1450            [right_on.into()],
1451            JoinArgs::new(JoinType::Left),
1452        )
1453    }
1454
1455    /// Inner join this query with another lazy query.
1456    ///
1457    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1458    /// flexible join logic, see [`join`](LazyFrame::join) or
1459    /// [`join_builder`](LazyFrame::join_builder).
1460    ///
1461    /// # Example
1462    ///
1463    /// ```rust
1464    /// use polars_core::prelude::*;
1465    /// use polars_lazy::prelude::*;
1466    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1467    ///         ldf
1468    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1469    /// }
1470    /// ```
1471    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1472        self.join(
1473            other,
1474            [left_on.into()],
1475            [right_on.into()],
1476            JoinArgs::new(JoinType::Inner),
1477        )
1478    }
1479
1480    /// Full outer join this query with another lazy query.
1481    ///
1482    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1483    /// flexible join logic, see [`join`](LazyFrame::join) or
1484    /// [`join_builder`](LazyFrame::join_builder).
1485    ///
1486    /// # Example
1487    ///
1488    /// ```rust
1489    /// use polars_core::prelude::*;
1490    /// use polars_lazy::prelude::*;
1491    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1492    ///         ldf
1493    ///         .full_join(other, col("foo"), col("bar"))
1494    /// }
1495    /// ```
1496    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1497        self.join(
1498            other,
1499            [left_on.into()],
1500            [right_on.into()],
1501            JoinArgs::new(JoinType::Full),
1502        )
1503    }
1504
1505    /// Left semi join this query with another lazy query.
1506    ///
1507    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1508    /// flexible join logic, see [`join`](LazyFrame::join) or
1509    /// [`join_builder`](LazyFrame::join_builder).
1510    ///
1511    /// # Example
1512    ///
1513    /// ```rust
1514    /// use polars_core::prelude::*;
1515    /// use polars_lazy::prelude::*;
1516    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1517    ///         ldf
1518    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1519    /// }
1520    /// ```
1521    #[cfg(feature = "semi_anti_join")]
1522    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1523        self.join(
1524            other,
1525            [left_on.into()],
1526            [right_on.into()],
1527            JoinArgs::new(JoinType::Semi),
1528        )
1529    }
1530
1531    /// Generic function to join two LazyFrames.
1532    ///
1533    /// `join` can join on multiple columns, given as two list of expressions, and with a
1534    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1535    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1536    /// over how columns are renamed and parallelization options, use
1537    /// [`join_builder`](LazyFrame::join_builder).
1538    ///
1539    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1540    ///
1541    /// # Example
1542    ///
1543    /// ```rust
1544    /// use polars_core::prelude::*;
1545    /// use polars_lazy::prelude::*;
1546    ///
1547    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1548    ///         ldf
1549    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1550    /// }
1551    /// ```
1552    pub fn join<E: AsRef<[Expr]>>(
1553        self,
1554        other: LazyFrame,
1555        left_on: E,
1556        right_on: E,
1557        args: JoinArgs,
1558    ) -> LazyFrame {
1559        let left_on = left_on.as_ref().to_vec();
1560        let right_on = right_on.as_ref().to_vec();
1561
1562        self._join_impl(other, left_on, right_on, args)
1563    }
1564
1565    fn _join_impl(
1566        self,
1567        other: LazyFrame,
1568        left_on: Vec<Expr>,
1569        right_on: Vec<Expr>,
1570        args: JoinArgs,
1571    ) -> LazyFrame {
1572        let JoinArgs {
1573            how,
1574            validation,
1575            suffix,
1576            slice,
1577            nulls_equal,
1578            coalesce,
1579            maintain_order,
1580        } = args;
1581
1582        if slice.is_some() {
1583            panic!("impl error: slice is not handled")
1584        }
1585
1586        let mut builder = self
1587            .join_builder()
1588            .with(other)
1589            .left_on(left_on)
1590            .right_on(right_on)
1591            .how(how)
1592            .validate(validation)
1593            .join_nulls(nulls_equal)
1594            .coalesce(coalesce)
1595            .maintain_order(maintain_order);
1596
1597        if let Some(suffix) = suffix {
1598            builder = builder.suffix(suffix);
1599        }
1600
1601        // Note: args.slice is set by the optimizer
1602        builder.finish()
1603    }
1604
1605    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1606    ///
1607    /// After the `JoinBuilder` has been created and set up, calling
1608    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1609    /// representing the `join` operation.
1610    pub fn join_builder(self) -> JoinBuilder {
1611        JoinBuilder::new(self)
1612    }
1613
1614    /// Add or replace a column, given as an expression, to a DataFrame.
1615    ///
1616    /// # Example
1617    ///
1618    /// ```rust
1619    /// use polars_core::prelude::*;
1620    /// use polars_lazy::prelude::*;
1621    /// fn add_column(df: DataFrame) -> LazyFrame {
1622    ///     df.lazy()
1623    ///         .with_column(
1624    ///             when(col("sepal_length").lt(lit(5.0)))
1625    ///             .then(lit(10))
1626    ///             .otherwise(lit(1))
1627    ///             .alias("new_column_name"),
1628    ///         )
1629    /// }
1630    /// ```
1631    pub fn with_column(self, expr: Expr) -> LazyFrame {
1632        let opt_state = self.get_opt_state();
1633        let lp = self
1634            .get_plan_builder()
1635            .with_columns(
1636                vec![expr],
1637                ProjectionOptions {
1638                    run_parallel: false,
1639                    duplicate_check: true,
1640                    should_broadcast: true,
1641                },
1642            )
1643            .build();
1644        Self::from_logical_plan(lp, opt_state)
1645    }
1646
1647    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1648    ///
1649    /// # Example
1650    ///
1651    /// ```rust
1652    /// use polars_core::prelude::*;
1653    /// use polars_lazy::prelude::*;
1654    /// fn add_columns(df: DataFrame) -> LazyFrame {
1655    ///     df.lazy()
1656    ///         .with_columns(
1657    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1658    ///          )
1659    /// }
1660    /// ```
1661    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1662        let exprs = exprs.as_ref().to_vec();
1663        self.with_columns_impl(
1664            exprs,
1665            ProjectionOptions {
1666                run_parallel: true,
1667                duplicate_check: true,
1668                should_broadcast: true,
1669            },
1670        )
1671    }
1672
1673    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1674    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1675        let exprs = exprs.as_ref().to_vec();
1676        self.with_columns_impl(
1677            exprs,
1678            ProjectionOptions {
1679                run_parallel: false,
1680                duplicate_check: true,
1681                should_broadcast: true,
1682            },
1683        )
1684    }
1685
1686    /// Match or evolve to a certain schema.
1687    pub fn match_to_schema(
1688        self,
1689        schema: SchemaRef,
1690        per_column: Arc<[MatchToSchemaPerColumn]>,
1691        extra_columns: ExtraColumnsPolicy,
1692    ) -> LazyFrame {
1693        let opt_state = self.get_opt_state();
1694        let lp = self
1695            .get_plan_builder()
1696            .match_to_schema(schema, per_column, extra_columns)
1697            .build();
1698        Self::from_logical_plan(lp, opt_state)
1699    }
1700
1701    pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1702        let opt_state = self.get_opt_state();
1703        let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1704        Self::from_logical_plan(lp, opt_state)
1705    }
1706
1707    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1708        let opt_state = self.get_opt_state();
1709        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1710        Self::from_logical_plan(lp, opt_state)
1711    }
1712
1713    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1714        let contexts = contexts
1715            .as_ref()
1716            .iter()
1717            .map(|lf| lf.logical_plan.clone())
1718            .collect();
1719        let opt_state = self.get_opt_state();
1720        let lp = self.get_plan_builder().with_context(contexts).build();
1721        Self::from_logical_plan(lp, opt_state)
1722    }
1723
1724    /// Aggregate all the columns as their maximum values.
1725    ///
1726    /// Aggregated columns will have the same names as the original columns.
1727    pub fn max(self) -> Self {
1728        self.map_private(DslFunction::Stats(StatsFunction::Max))
1729    }
1730
1731    /// Aggregate all the columns as their minimum values.
1732    ///
1733    /// Aggregated columns will have the same names as the original columns.
1734    pub fn min(self) -> Self {
1735        self.map_private(DslFunction::Stats(StatsFunction::Min))
1736    }
1737
1738    /// Aggregate all the columns as their sum values.
1739    ///
1740    /// Aggregated columns will have the same names as the original columns.
1741    ///
1742    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1743    /// - For integer columns, the ordinary checks for overflow are performed:
1744    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1745    ///   silently wrap.
1746    /// - String columns will sum to None.
1747    pub fn sum(self) -> Self {
1748        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1749    }
1750
1751    /// Aggregate all the columns as their mean values.
1752    ///
1753    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1754    /// - String columns will have a mean of None.
1755    pub fn mean(self) -> Self {
1756        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1757    }
1758
    /// Aggregate all the columns as their median values.
    ///
    /// - Boolean and integer results are converted to `f64`. However, they are still
    ///   susceptible to overflow before this conversion occurs.
    /// - String columns will have a median of None.
    pub fn median(self) -> Self {
        self.map_private(DslFunction::Stats(StatsFunction::Median))
    }
1767
1768    /// Aggregate all the columns as their quantile values.
1769    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1770        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1771            quantile,
1772            method,
1773        }))
1774    }
1775
1776    /// Aggregate all the columns as their standard deviation values.
1777    ///
1778    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1779    /// computing the variance, where `N` is the number of rows.
1780    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1781    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1782    /// > likelihood estimate of the variance for normally distributed variables. The
1783    /// > standard deviation computed in this function is the square root of the estimated
1784    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1785    /// > standard deviation per se.
1786    ///
1787    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1788    pub fn std(self, ddof: u8) -> Self {
1789        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1790    }
1791
1792    /// Aggregate all the columns as their variance values.
1793    ///
1794    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1795    /// computing the variance, where `N` is the number of rows.
1796    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1797    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1798    /// > likelihood estimate of the variance for normally distributed variables.
1799    ///
1800    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1801    pub fn var(self, ddof: u8) -> Self {
1802        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1803    }
1804
    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
    pub fn explode(self, columns: Selector) -> LazyFrame {
        // `allow_empty` is false here; presumably an empty selection errors —
        // see `explode_impl` / the plan builder for the exact behavior.
        self.explode_impl(columns, false)
    }
1809
1810    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1811    fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1812        let opt_state = self.get_opt_state();
1813        let lp = self
1814            .get_plan_builder()
1815            .explode(columns, allow_empty)
1816            .build();
1817        Self::from_logical_plan(lp, opt_state)
1818    }
1819
1820    /// Aggregate all the columns as the sum of their null value count.
1821    pub fn null_count(self) -> LazyFrame {
1822        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1823    }
1824
    /// Drop non-unique rows and maintain the order of kept rows.
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
    /// `None`, all columns are considered.
    pub fn unique_stable(
        self,
        subset: Option<Selector>,
        keep_strategy: UniqueKeepStrategy,
    ) -> LazyFrame {
        // Delegates with `maintain_order: true` (see `unique_stable_generic`).
        self.unique_stable_generic(subset, keep_strategy)
    }
1836
1837    pub fn unique_stable_generic(
1838        self,
1839        subset: Option<Selector>,
1840        keep_strategy: UniqueKeepStrategy,
1841    ) -> LazyFrame {
1842        let opt_state = self.get_opt_state();
1843        let options = DistinctOptionsDSL {
1844            subset,
1845            maintain_order: true,
1846            keep_strategy,
1847        };
1848        let lp = self.get_plan_builder().distinct(options).build();
1849        Self::from_logical_plan(lp, opt_state)
1850    }
1851
    /// Drop non-unique rows without maintaining the order of kept rows.
    ///
    /// The order of the kept rows may change; to maintain the original row order, use
    /// [`unique_stable`](LazyFrame::unique_stable).
    ///
    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
    /// all columns are considered.
    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
        // Delegates with `maintain_order: false` (see `unique_generic`).
        self.unique_generic(subset, keep_strategy)
    }
1862
1863    pub fn unique_generic(
1864        self,
1865        subset: Option<Selector>,
1866        keep_strategy: UniqueKeepStrategy,
1867    ) -> LazyFrame {
1868        let opt_state = self.get_opt_state();
1869        let options = DistinctOptionsDSL {
1870            subset,
1871            maintain_order: false,
1872            keep_strategy,
1873        };
1874        let lp = self.get_plan_builder().distinct(options).build();
1875        Self::from_logical_plan(lp, opt_state)
1876    }
1877
1878    /// Drop rows containing one or more NaN values.
1879    ///
1880    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1881    /// floating point columns are considered.
1882    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1883        let opt_state = self.get_opt_state();
1884        let lp = self.get_plan_builder().drop_nans(subset).build();
1885        Self::from_logical_plan(lp, opt_state)
1886    }
1887
1888    /// Drop rows containing one or more None values.
1889    ///
1890    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1891    /// columns are considered.
1892    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1893        let opt_state = self.get_opt_state();
1894        let lp = self.get_plan_builder().drop_nulls(subset).build();
1895        Self::from_logical_plan(lp, opt_state)
1896    }
1897
1898    /// Slice the DataFrame using an offset (starting row) and a length.
1899    ///
1900    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1901    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1902    /// end.
1903    ///
1904    /// If `offset` and `len` are such that the slice extends beyond the end of the
1905    /// DataFrame, the portion between `offset` and the end will be returned. In this
1906    /// case, the number of rows in the returned DataFrame will be less than `len`.
1907    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1908        let opt_state = self.get_opt_state();
1909        let lp = self.get_plan_builder().slice(offset, len).build();
1910        Self::from_logical_plan(lp, opt_state)
1911    }
1912
    /// Get the first row.
    ///
    /// Equivalent to `self.slice(0, 1)`.
    pub fn first(self) -> LazyFrame {
        // Returns a one-row `LazyFrame`, not a row value.
        self.slice(0, 1)
    }
1919
    /// Get the last row.
    ///
    /// Equivalent to `self.slice(-1, 1)`.
    pub fn last(self) -> LazyFrame {
        // A negative offset counts from the end of the frame.
        self.slice(-1, 1)
    }
1926
1927    /// Get the last `n` rows.
1928    ///
1929    /// Equivalent to `self.slice(-(n as i64), n)`.
1930    pub fn tail(self, n: IdxSize) -> LazyFrame {
1931        let neg_tail = -(n as i64);
1932        self.slice(neg_tail, n)
1933    }
1934
1935    /// Unpivot the DataFrame from wide to long format.
1936    ///
1937    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1938    #[cfg(feature = "pivot")]
1939    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1940        let opt_state = self.get_opt_state();
1941        let lp = self.get_plan_builder().unpivot(args).build();
1942        Self::from_logical_plan(lp, opt_state)
1943    }
1944
    /// Limit the DataFrame to the first `n` rows.
    pub fn limit(self, n: IdxSize) -> LazyFrame {
        // A zero-offset slice of length `n`.
        self.slice(0, n)
    }
1949
1950    /// Apply a function/closure once the logical plan get executed.
1951    ///
1952    /// The function has access to the whole materialized DataFrame at the time it is
1953    /// called.
1954    ///
1955    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1956    /// with `LazyFrame::with_column` or `with_columns`.
1957    ///
1958    /// ## Warning
1959    /// This can blow up in your face if the schema is changed due to the operation. The
1960    /// optimizer relies on a correct schema.
1961    ///
1962    /// You can toggle certain optimizations off.
1963    pub fn map<F>(
1964        self,
1965        function: F,
1966        optimizations: AllowedOptimizations,
1967        schema: Option<Arc<dyn UdfSchema>>,
1968        name: Option<&'static str>,
1969    ) -> LazyFrame
1970    where
1971        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1972    {
1973        let opt_state = self.get_opt_state();
1974        let lp = self
1975            .get_plan_builder()
1976            .map(
1977                function,
1978                optimizations,
1979                schema,
1980                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1981            )
1982            .build();
1983        Self::from_logical_plan(lp, opt_state)
1984    }
1985
1986    #[cfg(feature = "python")]
1987    pub fn map_python(
1988        self,
1989        function: polars_utils::python_function::PythonFunction,
1990        optimizations: AllowedOptimizations,
1991        schema: Option<SchemaRef>,
1992        validate_output: bool,
1993    ) -> LazyFrame {
1994        let opt_state = self.get_opt_state();
1995        let lp = self
1996            .get_plan_builder()
1997            .map_python(function, optimizations, schema, validate_output)
1998            .build();
1999        Self::from_logical_plan(lp, opt_state)
2000    }
2001
2002    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2003        let opt_state = self.get_opt_state();
2004        let lp = self.get_plan_builder().map_private(function).build();
2005        Self::from_logical_plan(lp, opt_state)
2006    }
2007
    /// Add a new column at index 0 that counts the rows.
    ///
    /// `name` is the name of the new column. `offset` is where to start counting from; if
    /// `None`, it is set to `0`.
    ///
    /// # Warning
    /// This can have a negative effect on query performance. This may for instance block
    /// predicate pushdown optimization.
    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
    where
        S: Into<PlSmallStr>,
    {
        let name = name.into();

        match &self.logical_plan {
            // Fast path: for non-anonymous file scans, attach the row index to
            // the unified scan arguments so it is produced by the scan itself.
            v @ DslPlan::Scan { scan_type, .. }
                if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
            {
                // Re-destructure an owned clone; the guard above guarantees
                // this is a `Scan`, hence the `unreachable!()`.
                let DslPlan::Scan {
                    sources,
                    mut unified_scan_args,
                    scan_type,
                    cached_ir: _,
                } = v.clone()
                else {
                    unreachable!()
                };

                unified_scan_args.row_index = Some(RowIndex {
                    name,
                    offset: offset.unwrap_or(0),
                });

                // Rebuild the scan node with a fresh (default) `cached_ir`.
                DslPlan::Scan {
                    sources,
                    unified_scan_args,
                    scan_type,
                    cached_ir: Default::default(),
                }
                .into()
            },
            // Fallback: add a dedicated row-index DSL function node.
            _ => self.map_private(DslFunction::RowIndex { name, offset }),
        }
    }
2052
2053    /// Return the number of non-null elements for each column.
2054    pub fn count(self) -> LazyFrame {
2055        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2056    }
2057
    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
    /// inserted as columns.
    #[cfg(feature = "dtype-struct")]
    pub fn unnest(self, cols: Selector) -> Self {
        // Lowered to a DSL-level function node.
        self.map_private(DslFunction::Unnest(cols))
    }
2064
    /// Merge `self` with `other` on the column `key`.
    ///
    /// NOTE(review): presumably both inputs must already be sorted on `key` —
    /// confirm against the `DslPlan::MergeSorted` lowering.
    #[cfg(feature = "merge_sorted")]
    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
    where
        S: Into<PlSmallStr>,
    {
        let key = key.into();

        // Currently always returns `Ok`; the `Result` is part of the API.
        let lp = DslPlan::MergeSorted {
            input_left: Arc::new(self.logical_plan),
            input_right: Arc::new(other.logical_plan),
            key,
        };
        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
    }
2079}
2080
/// Utility struct for lazy group_by operation.
#[derive(Clone)]
pub struct LazyGroupBy {
    pub logical_plan: DslPlan,
    // Optimizer flags inherited from the originating `LazyFrame`.
    opt_state: OptFlags,
    // The group-by key expressions.
    keys: Vec<Expr>,
    // Whether group order must be stable.
    maintain_order: bool,
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2093
impl From<LazyGroupBy> for LazyFrame {
    /// Convert a `LazyGroupBy` back into a `LazyFrame`, dropping the grouping
    /// state (keys, ordering and dynamic/rolling options are not used).
    fn from(lgb: LazyGroupBy) -> Self {
        Self {
            logical_plan: lgb.logical_plan,
            opt_state: lgb.opt_state,
            cached_arena: Default::default(),
        }
    }
}
2103
impl LazyGroupBy {
    /// Group by and aggregate.
    ///
    /// Select a column with [col] and choose an aggregation.
    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///       df.lazy()
    ///        .group_by_stable([col("date")])
    ///        .agg([
    ///            col("rain").min().alias("min_rain"),
    ///            col("rain").sum().alias("sum_rain"),
    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
    ///        ])
    /// }
    /// ```
    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
        // The plan-builder signature differs depending on whether
        // dynamic/rolling group-by support is compiled in.
        #[cfg(feature = "dynamic_group_by")]
        let lp = DslBuilder::from(self.logical_plan)
            .group_by(
                self.keys,
                aggs,
                None,
                self.maintain_order,
                self.dynamic_options,
                self.rolling_options,
            )
            .build();

        #[cfg(not(feature = "dynamic_group_by"))]
        let lp = DslBuilder::from(self.logical_plan)
            .group_by(self.keys, aggs, None, self.maintain_order)
            .build();
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }

    /// Return first n rows of each group
    pub fn head(self, n: Option<usize>) -> LazyFrame {
        // Resolve the output names of the key expressions; keys whose output
        // name cannot be determined are silently skipped.
        let keys = self
            .keys
            .iter()
            .filter_map(|expr| expr_output_name(expr).ok())
            .collect::<Vec<_>>();

        // Aggregate the head of every column per group, then explode all
        // non-key columns back into rows (`allow_empty: true` tolerates an
        // empty explode selection).
        self.agg([all().as_expr().head(n)])
            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
    }

    /// Return last n rows of each group
    pub fn tail(self, n: Option<usize>) -> LazyFrame {
        // Same approach as `head`, but taking the tail of each group.
        let keys = self
            .keys
            .iter()
            .filter_map(|expr| expr_output_name(expr).ok())
            .collect::<Vec<_>>();

        self.agg([all().as_expr().tail(n)])
            .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
    }

    /// Apply a function over the groups as a new DataFrame.
    ///
    /// **It is not recommended that you use this as materializing the DataFrame is very
    /// expensive.**
    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
        #[cfg(feature = "dynamic_group_by")]
        let options = GroupbyOptions {
            dynamic: self.dynamic_options,
            rolling: self.rolling_options,
            slice: None,
        };

        #[cfg(not(feature = "dynamic_group_by"))]
        let options = GroupbyOptions { slice: None };

        // Build the group-by node directly: no aggregation expressions, just
        // the user callback applied per group with the provided output schema.
        let lp = DslPlan::GroupBy {
            input: Arc::new(self.logical_plan),
            keys: self.keys,
            aggs: vec![],
            apply: Some((f, schema)),
            maintain_order: self.maintain_order,
            options: Arc::new(options),
        };
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }
}
2196
/// Builder for join operations between two `LazyFrame`s.
#[must_use]
pub struct JoinBuilder {
    // Left table of the join.
    lf: LazyFrame,
    // Join type; defaults to `Inner` (see `new`).
    how: JoinType,
    // Right table; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    left_on: Vec<Expr>,
    right_on: Vec<Expr>,
    allow_parallel: bool,
    force_parallel: bool,
    // Suffix for duplicate column names; `None` selects the default "_right".
    suffix: Option<PlSmallStr>,
    validation: JoinValidation,
    // Whether null values produce matches.
    nulls_equal: bool,
    coalesce: JoinCoalesce,
    maintain_order: MaintainOrderJoin,
}
impl JoinBuilder {
    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
    pub fn new(lf: LazyFrame) -> Self {
        Self {
            lf,
            other: None,
            how: JoinType::Inner,
            left_on: vec![],
            right_on: vec![],
            allow_parallel: true,
            force_parallel: false,
            suffix: None,
            validation: Default::default(),
            nulls_equal: false,
            coalesce: Default::default(),
            maintain_order: Default::default(),
        }
    }

    /// The right table in the join.
    pub fn with(mut self, other: LazyFrame) -> Self {
        self.other = Some(other);
        self
    }

    /// Select the join type.
    pub fn how(mut self, how: JoinType) -> Self {
        self.how = how;
        self
    }

    /// Set how the join keys should be validated.
    pub fn validate(mut self, validation: JoinValidation) -> Self {
        self.validation = validation;
        self
    }

    /// The expressions you want to join both tables on.
    ///
    /// The passed expressions must be valid in both `LazyFrame`s in the join.
    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        let on = on.as_ref().to_vec();
        // The same expressions are used for both sides.
        self.left_on.clone_from(&on);
        self.right_on = on;
        self
    }

    /// The expressions you want to join the left table on.
    ///
    /// The passed expressions must be valid in the left table.
    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        self.left_on = on.as_ref().to_vec();
        self
    }

    /// The expressions you want to join the right table on.
    ///
    /// The passed expressions must be valid in the right table.
    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        self.right_on = on.as_ref().to_vec();
        self
    }

    /// Allow parallel table evaluation.
    pub fn allow_parallel(mut self, allow: bool) -> Self {
        self.allow_parallel = allow;
        self
    }

    /// Force parallel table evaluation.
    pub fn force_parallel(mut self, force: bool) -> Self {
        self.force_parallel = force;
        self
    }

    /// Join on null values. By default null values will never produce matches.
    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
        self.nulls_equal = nulls_equal;
        self
    }

    /// Suffix to add duplicate column names in join.
    /// Defaults to `"_right"` if this method is never called.
    pub fn suffix<S>(mut self, suffix: S) -> Self
    where
        S: Into<PlSmallStr>,
    {
        self.suffix = Some(suffix.into());
        self
    }

    /// Whether to coalesce join columns.
    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
        self.coalesce = coalesce;
        self
    }

    /// Whether to preserve the row order.
    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
        self.maintain_order = maintain_order;
        self
    }

    /// Finish builder
    ///
    /// # Panics
    /// Panics if the right table was never provided via [`JoinBuilder::with`].
    pub fn finish(self) -> LazyFrame {
        let opt_state = self.lf.opt_state;
        let other = self.other.expect("'with' not set in join builder");

        let args = JoinArgs {
            how: self.how,
            validation: self.validation,
            suffix: self.suffix,
            slice: None,
            nulls_equal: self.nulls_equal,
            coalesce: self.coalesce,
            maintain_order: self.maintain_order,
        };

        let lp = self
            .lf
            .get_plan_builder()
            .join(
                other.logical_plan,
                self.left_on,
                self.right_on,
                JoinOptions {
                    allow_parallel: self.allow_parallel,
                    force_parallel: self.force_parallel,
                    args,
                }
                .into(),
            )
            .build();
        LazyFrame::from_logical_plan(lp, opt_state)
    }

    // Finish with join predicates instead of equality keys.
    // Panics if the right table was never provided via `with`.
    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
        let opt_state = self.lf.opt_state;
        let other = self.other.expect("with not set");

        // Decompose `And` conjunctions into their component expressions
        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
            if let Expr::BinaryExpr {
                op: Operator::And,
                left,
                right,
            } = predicate
            {
                // NOTE(review): `*left`/`*right` are owned at this point, so
                // the `.clone()`s could be avoided by recursing on `*left`.
                decompose_and((*left).clone(), expanded_predicates);
                decompose_and((*right).clone(), expanded_predicates);
            } else {
                expanded_predicates.push(predicate);
            }
        }
        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
        for predicate in predicates {
            decompose_and(predicate, &mut expanded_predicates);
        }
        let predicates: Vec<Expr> = expanded_predicates;

        // Decompose `is_between` predicates to allow for cleaner expression of range joins
        #[cfg(feature = "is_between")]
        let predicates: Vec<Expr> = {
            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
            for predicate in predicates {
                if let Expr::Function {
                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
                    input,
                    ..
                } = &predicate
                {
                    if let [expr, lower, upper] = input.as_slice() {
                        // Rewrite `expr.is_between(lower, upper)` into a pair
                        // of comparisons, matching the interval's closedness.
                        match closed {
                            ClosedInterval::Both => {
                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
                            },
                            ClosedInterval::Right => {
                                expanded_predicates.push(expr.clone().gt(lower.clone()));
                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
                            },
                            ClosedInterval::Left => {
                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
                                expanded_predicates.push(expr.clone().lt(upper.clone()));
                            },
                            ClosedInterval::None => {
                                expanded_predicates.push(expr.clone().gt(lower.clone()));
                                expanded_predicates.push(expr.clone().lt(upper.clone()));
                            },
                        }
                        continue;
                    }
                }
                expanded_predicates.push(predicate);
            }
            expanded_predicates
        };

        let args = JoinArgs {
            how: self.how,
            validation: self.validation,
            suffix: self.suffix,
            slice: None,
            nulls_equal: self.nulls_equal,
            coalesce: self.coalesce,
            maintain_order: self.maintain_order,
        };
        let options = JoinOptions {
            allow_parallel: self.allow_parallel,
            force_parallel: self.force_parallel,
            args,
        };

        let lp = DslPlan::Join {
            input_left: Arc::new(self.lf.logical_plan),
            input_right: Arc::new(other.logical_plan),
            // No equality keys: the join is driven entirely by `predicates`.
            left_on: Default::default(),
            right_on: Default::default(),
            predicates,
            options: Arc::from(options),
        };

        LazyFrame::from_logical_plan(lp, opt_state)
    }
}
2437
/// Builder for streaming query executors.
///
/// `Some` when compiled with the `new_streaming` feature, otherwise `None`.
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
    #[cfg(not(feature = "new_streaming"))]
    {
        None
    }
    #[cfg(feature = "new_streaming")]
    {
        Some(streaming_dispatch::build_streaming_query_executor)
    }
};
2448#[cfg(feature = "new_streaming")]
2449pub use streaming_dispatch::build_streaming_query_executor;
2450
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    //! Dispatches physical plans to the new streaming engine through the
    //! in-memory engine's [`Executor`] interface.
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build a boxed [`Executor`] that runs the IR rooted at `node` on the
    /// new streaming engine.
    ///
    /// # Panics
    /// Panics when the root node is an `IR::SinkMultiple`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Remember a scan-level rechunk request so it can be honored after
        // execution (see `StreamingQueryExecutor::execute`).
        let rechunk = match ir_arena.get(node) {
            IR::Scan {
                unified_scan_args, ..
            } => unified_scan_args.rechunk,
            _ => false,
        };

        // Ensure the plan is rooted at a sink; non-sink roots are wrapped in
        // an in-memory sink so the result materializes as a DataFrame.
        let node = match ir_arena.get(node) {
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        polars_stream::StreamingQuery::build(node, ir_arena, expr_arena)
            .map(Some)
            .map(Mutex::new)
            .map(Arc::new)
            .map(|x| StreamingQueryExecutor {
                executor: x,
                rechunk,
            })
            .map(|x| Box::new(x) as Box<dyn Executor>)
    }

    // Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
    struct StreamingQueryExecutor {
        // `Option` so the query can be taken out and consumed on first execute.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // Must not block rayon thread on pending new-streaming future.
            assert!(POOL.current_thread_index().is_none());

            // Take the query out of its slot and run it; a second call finds
            // `None` and panics — execution is strictly one-shot.
            let mut df = { self.executor.try_lock().unwrap().take() }
                .expect("unhandled: execute() more than once")
                .execute()
                .map(|x| x.unwrap_single())?;

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}