polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::POOL;
26use polars_core::error::feature_gated;
27use polars_core::prelude::*;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38use crate::frame::cached_arenas::CachedArena;
39use crate::prelude::*;
40
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
44
45impl IntoLazy for DataFrame {
46    /// Convert the `DataFrame` into a `LazyFrame`
47    fn lazy(self) -> LazyFrame {
48        let lp = DslBuilder::from_existing_df(self).build();
49        LazyFrame {
50            logical_plan: lp,
51            opt_state: Default::default(),
52            cached_arena: Default::default(),
53        }
54    }
55}
56
impl IntoLazy for LazyFrame {
    /// A `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
62
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The plan that the methods of this struct extend.
    pub logical_plan: DslPlan,
    /// Optimization flags; changed through the `with_*` toggles.
    pub(crate) opt_state: OptFlags,
    /// Shared, lock-protected slot holding an optional [`CachedArena`].
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
74
75impl From<DslPlan> for LazyFrame {
76    fn from(plan: DslPlan) -> Self {
77        Self {
78            logical_plan: plan,
79            opt_state: OptFlags::default(),
80            cached_arena: Default::default(),
81        }
82    }
83}
84
85impl LazyFrame {
86    pub(crate) fn from_inner(
87        logical_plan: DslPlan,
88        opt_state: OptFlags,
89        cached_arena: Arc<Mutex<Option<CachedArena>>>,
90    ) -> Self {
91        Self {
92            logical_plan,
93            opt_state,
94            cached_arena,
95        }
96    }
97
98    pub(crate) fn get_plan_builder(self) -> DslBuilder {
99        DslBuilder::from(self.logical_plan)
100    }
101
102    fn get_opt_state(&self) -> OptFlags {
103        self.opt_state
104    }
105
106    pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107        LazyFrame {
108            logical_plan,
109            opt_state,
110            cached_arena: Default::default(),
111        }
112    }
113
114    /// Get current optimizations.
115    pub fn get_current_optimizations(&self) -> OptFlags {
116        self.opt_state
117    }
118
119    /// Set allowed optimizations.
120    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
121        self.opt_state = opt_state;
122        self
123    }
124
125    /// Turn off all optimizations.
126    pub fn without_optimizations(self) -> Self {
127        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128    }
129
130    /// Toggle projection pushdown optimization.
131    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133        self
134    }
135
136    /// Toggle cluster with columns optimization.
137    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139        self
140    }
141
142    /// Check if operations are order dependent and unset maintaining_order if
143    /// the order would not be observed.
144    pub fn with_check_order(mut self, toggle: bool) -> Self {
145        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146        self
147    }
148
149    /// Toggle predicate pushdown optimization.
150    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152        self
153    }
154
155    /// Toggle type coercion optimization.
156    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158        self
159    }
160
161    /// Toggle type check optimization.
162    pub fn with_type_check(mut self, toggle: bool) -> Self {
163        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164        self
165    }
166
167    /// Toggle expression simplification optimization on or off.
168    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170        self
171    }
172
173    /// Toggle common subplan elimination optimization on or off
174    #[cfg(feature = "cse")]
175    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
176        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
177        self
178    }
179
180    /// Toggle common subexpression elimination optimization on or off
181    #[cfg(feature = "cse")]
182    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
183        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
184        self
185    }
186
187    /// Toggle slice pushdown optimization.
188    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190        self
191    }
192
193    #[cfg(feature = "new_streaming")]
194    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
195        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
196        self
197    }
198
199    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
200    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202        self
203    }
204
205    /// Run every node eagerly. This turns off multi-node optimizations.
206    pub fn _with_eager(mut self, toggle: bool) -> Self {
207        self.opt_state.set(OptFlags::EAGER, toggle);
208        self
209    }
210
211    /// Return a String describing the naive (un-optimized) logical plan.
212    pub fn describe_plan(&self) -> PolarsResult<String> {
213        Ok(self.clone().to_alp()?.describe())
214    }
215
216    /// Return a String describing the naive (un-optimized) logical plan in tree format.
217    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218        Ok(self.clone().to_alp()?.describe_tree_format())
219    }
220
221    /// Return a String describing the optimized logical plan.
222    ///
223    /// Returns `Err` if optimizing the logical plan fails.
224    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225        Ok(self.clone().to_alp_optimized()?.describe())
226    }
227
228    /// Return a String describing the optimized logical plan in tree format.
229    ///
230    /// Returns `Err` if optimizing the logical plan fails.
231    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233    }
234
235    /// Return a String describing the logical plan.
236    ///
237    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
238    /// explains the naive, un-optimized plan.
239    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240        if optimized {
241            self.describe_optimized_plan()
242        } else {
243            self.describe_plan()
244        }
245    }
246
247    /// Add a sort operation to the logical plan.
248    ///
249    /// Sorts the LazyFrame by the column name specified using the provided options.
250    ///
251    /// # Example
252    ///
253    /// Sort DataFrame by 'sepal_width' column:
254    /// ```rust
255    /// # use polars_core::prelude::*;
256    /// # use polars_lazy::prelude::*;
257    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
258    ///     df.lazy().sort(["sepal_width"], Default::default())
259    /// }
260    /// ```
261    /// Sort by a single column with specific order:
262    /// ```
263    /// # use polars_core::prelude::*;
264    /// # use polars_lazy::prelude::*;
265    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
266    ///     df.lazy().sort(
267    ///         ["sepal_width"],
268    ///         SortMultipleOptions::new()
269    ///             .with_order_descending(descending)
270    ///     )
271    /// }
272    /// ```
273    /// Sort by multiple columns with specifying order for each column:
274    /// ```
275    /// # use polars_core::prelude::*;
276    /// # use polars_lazy::prelude::*;
277    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
278    ///     df.lazy().sort(
279    ///         ["sepal_width", "sepal_length"],
280    ///         SortMultipleOptions::new()
281    ///             .with_order_descending_multi([false, true])
282    ///     )
283    /// }
284    /// ```
285    /// See [`SortMultipleOptions`] for more options.
286    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287        let opt_state = self.get_opt_state();
288        let lp = self
289            .get_plan_builder()
290            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291            .build();
292        Self::from_logical_plan(lp, opt_state)
293    }
294
295    /// Add a sort operation to the logical plan.
296    ///
297    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
298    /// concrete columns before sorting.
299    ///
300    /// See [`SortMultipleOptions`] for more options.
301    ///
302    /// # Example
303    ///
304    /// ```rust
305    /// use polars_core::prelude::*;
306    /// use polars_lazy::prelude::*;
307    ///
308    /// /// Sort DataFrame by 'sepal_width' column
309    /// fn example(df: DataFrame) -> LazyFrame {
310    ///       df.lazy()
311    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
312    /// }
313    /// ```
314    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315        self,
316        by_exprs: E,
317        sort_options: SortMultipleOptions,
318    ) -> Self {
319        let by_exprs = by_exprs.as_ref().to_vec();
320        if by_exprs.is_empty() {
321            self
322        } else {
323            let opt_state = self.get_opt_state();
324            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325            Self::from_logical_plan(lp, opt_state)
326        }
327    }
328
329    pub fn top_k<E: AsRef<[Expr]>>(
330        self,
331        k: IdxSize,
332        by_exprs: E,
333        sort_options: SortMultipleOptions,
334    ) -> Self {
335        // this will optimize to top-k
336        self.sort_by_exprs(
337            by_exprs,
338            sort_options.with_order_reversed().with_nulls_last(true),
339        )
340        .slice(0, k)
341    }
342
343    pub fn bottom_k<E: AsRef<[Expr]>>(
344        self,
345        k: IdxSize,
346        by_exprs: E,
347        sort_options: SortMultipleOptions,
348    ) -> Self {
349        // this will optimize to bottom-k
350        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351            .slice(0, k)
352    }
353
354    /// Reverse the `DataFrame` from top to bottom.
355    ///
356    /// Row `i` becomes row `number_of_rows - i - 1`.
357    ///
358    /// # Example
359    ///
360    /// ```rust
361    /// use polars_core::prelude::*;
362    /// use polars_lazy::prelude::*;
363    ///
364    /// fn example(df: DataFrame) -> LazyFrame {
365    ///       df.lazy()
366    ///         .reverse()
367    /// }
368    /// ```
369    pub fn reverse(self) -> Self {
370        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371    }
372
373    /// Rename columns in the DataFrame.
374    ///
375    /// `existing` and `new` are iterables of the same length containing the old and
376    /// corresponding new column names. Renaming happens to all `existing` columns
377    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
378    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
379    /// those columns that are actually found will be renamed (others will be ignored).
380    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
381    where
382        I: IntoIterator<Item = T>,
383        J: IntoIterator<Item = S>,
384        T: AsRef<str>,
385        S: AsRef<str>,
386    {
387        let iter = existing.into_iter();
388        let cap = iter.size_hint().0;
389        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391
392        // TODO! should this error if `existing` and `new` have different lengths?
393        // Currently, the longer of the two is truncated.
394        for (existing, new) in iter.zip(new) {
395            let existing = existing.as_ref();
396            let new = new.as_ref();
397            if new != existing {
398                existing_vec.push(existing.into());
399                new_vec.push(new.into());
400            }
401        }
402
403        self.map_private(DslFunction::Rename {
404            existing: existing_vec.into(),
405            new: new_vec.into(),
406            strict,
407        })
408    }
409
410    /// Removes columns from the DataFrame.
411    /// Note that it's better to only select the columns you need
412    /// and let the projection pushdown optimize away the unneeded columns.
413    ///
414    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
415    /// error while materializing the [`LazyFrame`].
416    pub fn drop(self, columns: Selector) -> Self {
417        let opt_state = self.get_opt_state();
418        let lp = self.get_plan_builder().drop(columns).build();
419        Self::from_logical_plan(lp, opt_state)
420    }
421
422    /// Shift the values by a given period and fill the parts that will be empty due to this operation
423    /// with `Nones`.
424    ///
425    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
426    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428    }
429
430    /// Shift the values by a given period and fill the parts that will be empty due to this operation
431    /// with the result of the `fill_value` expression.
432    ///
433    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
434    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435        self.select(vec![
436            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437        ])
438    }
439
440    /// Fill None values in the DataFrame with an expression.
441    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442        let opt_state = self.get_opt_state();
443        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444        Self::from_logical_plan(lp, opt_state)
445    }
446
447    /// Fill NaN values in the DataFrame with an expression.
448    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449        let opt_state = self.get_opt_state();
450        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451        Self::from_logical_plan(lp, opt_state)
452    }
453
454    /// Caches the result into a new LazyFrame.
455    ///
456    /// This should be used to prevent computations running multiple times.
457    pub fn cache(self) -> Self {
458        let opt_state = self.get_opt_state();
459        let lp = self.get_plan_builder().cache().build();
460        Self::from_logical_plan(lp, opt_state)
461    }
462
463    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
464    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465        let cast_cols: Vec<Expr> = dtypes
466            .into_iter()
467            .map(|(name, dt)| {
468                let name = PlSmallStr::from_str(name);
469
470                if strict {
471                    col(name).strict_cast(dt)
472                } else {
473                    col(name).cast(dt)
474                }
475            })
476            .collect();
477
478        if cast_cols.is_empty() {
479            self
480        } else {
481            self.with_columns(cast_cols)
482        }
483    }
484
485    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
486    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487        self.with_columns(vec![if strict {
488            col(PlSmallStr::from_static("*")).strict_cast(dtype)
489        } else {
490            col(PlSmallStr::from_static("*")).cast(dtype)
491        }])
492    }
493
494    pub fn optimize(
495        self,
496        lp_arena: &mut Arena<IR>,
497        expr_arena: &mut Arena<AExpr>,
498    ) -> PolarsResult<Node> {
499        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
500    }
501
502    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
503        let (mut lp_arena, mut expr_arena) = self.get_arenas();
504        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
505
506        Ok(IRPlan::new(node, lp_arena, expr_arena))
507    }
508
509    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
510        let (mut lp_arena, mut expr_arena) = self.get_arenas();
511        let node = to_alp(
512            self.logical_plan,
513            &mut expr_arena,
514            &mut lp_arena,
515            &mut self.opt_state,
516        )?;
517        let plan = IRPlan::new(node, lp_arena, expr_arena);
518        Ok(plan)
519    }
520
521    pub(crate) fn optimize_with_scratch(
522        self,
523        lp_arena: &mut Arena<IR>,
524        expr_arena: &mut Arena<AExpr>,
525        scratch: &mut Vec<Node>,
526    ) -> PolarsResult<Node> {
527        let lp_top = optimize(
528            self.logical_plan,
529            self.opt_state,
530            lp_arena,
531            expr_arena,
532            scratch,
533            apply_scan_predicate_to_scan_ir,
534        )?;
535
536        Ok(lp_top)
537    }
538
539    fn prepare_collect_post_opt<P>(
540        mut self,
541        check_sink: bool,
542        query_start: Option<std::time::Instant>,
543        post_opt: P,
544    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
545    where
546        P: FnOnce(
547            Node,
548            &mut Arena<IR>,
549            &mut Arena<AExpr>,
550            Option<std::time::Duration>,
551        ) -> PolarsResult<()>,
552    {
553        let (mut lp_arena, mut expr_arena) = self.get_arenas();
554
555        let mut scratch = vec![];
556        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
557
558        post_opt(
559            lp_top,
560            &mut lp_arena,
561            &mut expr_arena,
562            // Post optimization callback gets the time since the
563            // query was started as its "base" timepoint.
564            query_start.map(|s| s.elapsed()),
565        )?;
566
567        // sink should be replaced
568        let no_file_sink = if check_sink {
569            !matches!(
570                lp_arena.get(lp_top),
571                IR::Sink {
572                    payload: SinkTypeIR::File { .. },
573                    ..
574                }
575            )
576        } else {
577            true
578        };
579        let physical_plan = create_physical_plan(
580            lp_top,
581            &mut lp_arena,
582            &mut expr_arena,
583            BUILD_STREAMING_EXECUTOR,
584        )?;
585
586        let state = ExecutionState::new();
587        Ok((state, physical_plan, no_file_sink))
588    }
589
590    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
591    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
592    where
593        P: FnOnce(
594            Node,
595            &mut Arena<IR>,
596            &mut Arena<AExpr>,
597            Option<std::time::Duration>,
598        ) -> PolarsResult<()>,
599    {
600        let (mut state, mut physical_plan, _) =
601            self.prepare_collect_post_opt(false, None, post_opt)?;
602        physical_plan.execute(&mut state)
603    }
604
605    #[allow(unused_mut)]
606    fn prepare_collect(
607        self,
608        check_sink: bool,
609        query_start: Option<std::time::Instant>,
610    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
611        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
612    }
613
614    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
615    /// `engine`.
616    ///
617    /// The query is optimized prior to execution.
618    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
619        let payload = match &self.logical_plan {
620            DslPlan::Sink { payload, .. } => payload.clone(),
621            DslPlan::SinkMultiple { .. } => {
622                polars_ensure!(matches!(engine, Engine::Auto | Engine::Streaming), InvalidOperation: "lazy multisinks only supported on streaming engine");
623                feature_gated!("new_streaming", {
624                    let sink_multiple = self.with_new_streaming(true);
625                    let mut alp_plan = sink_multiple.to_alp_optimized()?;
626                    let result = polars_stream::run_query(
627                        alp_plan.lp_top,
628                        &mut alp_plan.lp_arena,
629                        &mut alp_plan.expr_arena,
630                    );
631                    return result.map(|_| DataFrame::empty());
632                })
633            },
634            _ => {
635                self.logical_plan = DslPlan::Sink {
636                    input: Arc::new(self.logical_plan),
637                    payload: SinkType::Memory,
638                };
639                SinkType::Memory
640            },
641        };
642
643        // Default engine for collect is InMemory, sink_* is Streaming
644        if engine == Engine::Auto {
645            engine = match payload {
646                #[cfg(feature = "new_streaming")]
647                SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
648                _ => Engine::InMemory,
649            };
650        }
651        // Gpu uses some hacks to dispatch.
652        if engine == Engine::Gpu {
653            engine = Engine::InMemory;
654        }
655
656        #[cfg(feature = "new_streaming")]
657        {
658            if let Some(result) = self.try_new_streaming_if_requested() {
659                return result.map(|v| v.unwrap_single());
660            }
661        }
662
663        match engine {
664            Engine::Auto => unreachable!(),
665            Engine::Streaming => {
666                feature_gated!("new_streaming", self = self.with_new_streaming(true))
667            },
668            _ => {},
669        }
670        let mut alp_plan = self.clone().to_alp_optimized()?;
671
672        match engine {
673            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
674                let result = polars_stream::run_query(
675                    alp_plan.lp_top,
676                    &mut alp_plan.lp_arena,
677                    &mut alp_plan.expr_arena,
678                );
679                result.map(|v| v.unwrap_single())
680            }),
681            Engine::Gpu => {
682                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
683            },
684            Engine::InMemory => {
685                let mut physical_plan = create_physical_plan(
686                    alp_plan.lp_top,
687                    &mut alp_plan.lp_arena,
688                    &mut alp_plan.expr_arena,
689                    BUILD_STREAMING_EXECUTOR,
690                )?;
691                let mut state = ExecutionState::new();
692                physical_plan.execute(&mut state)
693            },
694        }
695    }
696
697    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
698        let sink_multiple = LazyFrame {
699            logical_plan: DslPlan::SinkMultiple { inputs: plans },
700            opt_state,
701            cached_arena: Default::default(),
702        };
703        sink_multiple.explain(true)
704    }
705
    /// Optimize `plans` together as one multi-sink plan and collect every plan into its own
    /// [`DataFrame`].
    ///
    /// An empty `plans` returns an empty `Vec` without doing any work. `Engine::Auto` and
    /// `Engine::Gpu` are both dispatched as the in-memory engine.
    ///
    /// Returns `Err` if optimization, physical planning or execution of any plan fails.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        // All plans are optimized together as a single multi-sink plan.
        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`; it is cloned so the arenas can be borrowed
        // mutably below.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // Gpu was mapped to InMemory above, so this arm is not expected to be hit.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
                // this might then lead to a rayon stack overflow (SO). So we take a multiple of the threads to keep work stealing
                // within bounds
                let mut state = ExecutionState::new();
                // The cache prefiller, if there is one, runs before any of the plans.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        // Chunks are processed one after another; the plans inside a chunk
                        // run in parallel.
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out of the slice so it can be run.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // NOTE(review): `idx` restarts at 0 for every chunk, so
                                    // plans in different chunks get the same offset — confirm
                                    // this is intended.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into one list.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
808
809    /// Execute all the lazy operations and collect them into a [`DataFrame`].
810    ///
811    /// The query is optimized prior to execution.
812    ///
813    /// # Example
814    ///
815    /// ```rust
816    /// use polars_core::prelude::*;
817    /// use polars_lazy::prelude::*;
818    ///
819    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
820    ///     df.lazy()
821    ///       .group_by([col("foo")])
822    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
823    ///       .collect()
824    /// }
825    /// ```
826    pub fn collect(self) -> PolarsResult<DataFrame> {
827        self.collect_with_engine(Engine::InMemory)
828    }
829
    /// Collect the query in batches.
    ///
    /// If lazy is true the query will not start until the first poll (or until
    /// start is called on CollectBatches).
    ///
    /// Batches are handed over through a bounded channel; an error returned by the query
    /// is sent through the same channel.
    ///
    /// Returns `Err` if the batch sink cannot be placed on top of the plan.
    #[cfg(feature = "async")]
    pub fn collect_batches(
        self,
        engine: Engine,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
        lazy: bool,
    ) -> PolarsResult<CollectBatches> {
        // Bounded channel with a capacity of one message.
        let (send, recv) = sync_channel(1);
        // Second sender, used by the runner to report a failed query.
        let runner_send = send.clone();
        let ldf = self.sink_batches(
            PlanCallback::new(move |df| {
                // Stop if receiver has closed.
                let send_result = send.send(Ok(df));
                Ok(send_result.is_err())
            }),
            maintain_order,
            chunk_size,
        )?;
        let runner = move || {
            // We use a tokio spawn_blocking here as it has a high blocking
            // thread pool limit.
            polars_io::pl_async::get_runtime().spawn_blocking(move || {
                if let Err(e) = ldf.collect_with_engine(engine) {
                    // A failed send is ignored here.
                    runner_send.send(Err(e)).ok();
                }
            });
        };

        let mut collect_batches = CollectBatches {
            recv,
            runner: Some(Box::new(runner)),
        };
        // Unless `lazy` is set, `start` is called right away.
        if !lazy {
            collect_batches.start();
        }
        Ok(collect_batches)
    }
872
873    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
874    // This version does profiling of the node execution.
875    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
876    where
877        P: FnOnce(
878            Node,
879            &mut Arena<IR>,
880            &mut Arena<AExpr>,
881            Option<std::time::Duration>,
882        ) -> PolarsResult<()>,
883    {
884        let query_start = std::time::Instant::now();
885        let (mut state, mut physical_plan, _) =
886            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
887        state.time_nodes(query_start);
888        let out = physical_plan.execute(&mut state)?;
889        let timer_df = state.finish_timer()?;
890        Ok((out, timer_df))
891    }
892
893    /// Profile a LazyFrame.
894    ///
895    /// This will run the query and return a tuple
896    /// containing the materialized DataFrame and a DataFrame that contains profiling information
897    /// of each node that is executed.
898    ///
899    /// The units of the timings are microseconds.
900    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
901        self._profile_post_opt(|_, _, _, _| Ok(()))
902    }
903
904    pub fn sink_batches(
905        mut self,
906        function: PlanCallback<DataFrame, bool>,
907        maintain_order: bool,
908        chunk_size: Option<NonZeroUsize>,
909    ) -> PolarsResult<Self> {
910        use polars_plan::prelude::sink::CallbackSinkType;
911
912        polars_ensure!(
913            !matches!(self.logical_plan, DslPlan::Sink { .. }),
914            InvalidOperation: "cannot create a sink on top of another sink"
915        );
916
917        self.logical_plan = DslPlan::Sink {
918            input: Arc::new(self.logical_plan),
919            payload: SinkType::Callback(CallbackSinkType {
920                function,
921                maintain_order,
922                chunk_size,
923            }),
924        };
925
926        Ok(self)
927    }
928
929    #[cfg(feature = "new_streaming")]
930    pub fn try_new_streaming_if_requested(
931        &mut self,
932    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
933        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
934        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
935
936        if auto_new_streaming || force_new_streaming {
937            // Try to run using the new streaming engine, falling back
938            // if it fails in a todo!() error if auto_new_streaming is set.
939            let mut new_stream_lazy = self.clone();
940            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
941            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
942                Ok(v) => v,
943                Err(e) => return Some(Err(e)),
944            };
945
946            let f = || {
947                polars_stream::run_query(
948                    alp_plan.lp_top,
949                    &mut alp_plan.lp_arena,
950                    &mut alp_plan.expr_arena,
951                )
952            };
953
954            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
955                Ok(v) => return Some(v),
956                Err(e) => {
957                    // Fallback to normal engine if error is due to not being implemented
958                    // and auto_new_streaming is set, otherwise propagate error.
959                    if !force_new_streaming
960                        && auto_new_streaming
961                        && e.downcast_ref::<&str>()
962                            .map(|s| s.starts_with("not yet implemented"))
963                            .unwrap_or(false)
964                    {
965                        if polars_core::config::verbose() {
966                            eprintln!(
967                                "caught unimplemented error in new streaming engine, falling back to normal engine"
968                            );
969                        }
970                    } else {
971                        std::panic::resume_unwind(e);
972                    }
973                },
974            }
975        }
976
977        None
978    }
979
980    pub fn sink(
981        mut self,
982        sink_type: SinkDestination,
983        file_format: FileWriteFormat,
984        unified_sink_args: UnifiedSinkArgs,
985    ) -> PolarsResult<Self> {
986        polars_ensure!(
987            !matches!(self.logical_plan, DslPlan::Sink { .. }),
988            InvalidOperation: "cannot create a sink on top of another sink"
989        );
990
991        self.logical_plan = DslPlan::Sink {
992            input: Arc::new(self.logical_plan),
993            payload: match sink_type {
994                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
995                    target,
996                    file_format,
997                    unified_sink_args,
998                }),
999                SinkDestination::Partitioned {
1000                    base_path,
1001                    file_path_provider,
1002                    partition_strategy,
1003                    max_rows_per_file,
1004                    approximate_bytes_per_file,
1005                } => SinkType::Partitioned(PartitionedSinkOptions {
1006                    base_path,
1007                    file_path_provider,
1008                    partition_strategy,
1009                    file_format,
1010                    unified_sink_args,
1011                    max_rows_per_file,
1012                    approximate_bytes_per_file,
1013                }),
1014            },
1015        };
1016        Ok(self)
1017    }
1018
1019    /// Filter frame rows that match a predicate expression.
1020    ///
1021    /// The expression must yield boolean values (note that rows where the
1022    /// predicate resolves to `null` are *not* included in the resulting frame).
1023    ///
1024    /// # Example
1025    ///
1026    /// ```rust
1027    /// use polars_core::prelude::*;
1028    /// use polars_lazy::prelude::*;
1029    ///
1030    /// fn example(df: DataFrame) -> LazyFrame {
1031    ///       df.lazy()
1032    ///         .filter(col("sepal_width").is_not_null())
1033    ///         .select([col("sepal_width"), col("sepal_length")])
1034    /// }
1035    /// ```
1036    pub fn filter(self, predicate: Expr) -> Self {
1037        let opt_state = self.get_opt_state();
1038        let lp = self.get_plan_builder().filter(predicate).build();
1039        Self::from_logical_plan(lp, opt_state)
1040    }
1041
1042    /// Remove frame rows that match a predicate expression.
1043    ///
1044    /// The expression must yield boolean values (note that rows where the
1045    /// predicate resolves to `null` are *not* removed from the resulting frame).
1046    ///
1047    /// # Example
1048    ///
1049    /// ```rust
1050    /// use polars_core::prelude::*;
1051    /// use polars_lazy::prelude::*;
1052    ///
1053    /// fn example(df: DataFrame) -> LazyFrame {
1054    ///       df.lazy()
1055    ///         .remove(col("sepal_width").is_null())
1056    ///         .select([col("sepal_width"), col("sepal_length")])
1057    /// }
1058    /// ```
1059    pub fn remove(self, predicate: Expr) -> Self {
1060        self.filter(predicate.neq_missing(lit(true)))
1061    }
1062
1063    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1064    ///
1065    /// Columns can be selected with [`col`];
1066    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1067    ///
1068    /// # Example
1069    ///
1070    /// ```rust
1071    /// use polars_core::prelude::*;
1072    /// use polars_lazy::prelude::*;
1073    ///
1074    /// /// This function selects column "foo" and column "bar".
1075    /// /// Column "bar" is renamed to "ham".
1076    /// fn example(df: DataFrame) -> LazyFrame {
1077    ///       df.lazy()
1078    ///         .select([col("foo"),
1079    ///                   col("bar").alias("ham")])
1080    /// }
1081    ///
1082    /// /// This function selects all columns except "foo"
1083    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1084    ///       df.lazy()
1085    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1086    /// }
1087    /// ```
1088    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1089        let exprs = exprs.as_ref().to_vec();
1090        self.select_impl(
1091            exprs,
1092            ProjectionOptions {
1093                run_parallel: true,
1094                duplicate_check: true,
1095                should_broadcast: true,
1096            },
1097        )
1098    }
1099
1100    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1101        let exprs = exprs.as_ref().to_vec();
1102        self.select_impl(
1103            exprs,
1104            ProjectionOptions {
1105                run_parallel: false,
1106                duplicate_check: true,
1107                should_broadcast: true,
1108            },
1109        )
1110    }
1111
1112    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1113        let opt_state = self.get_opt_state();
1114        let lp = self.get_plan_builder().project(exprs, options).build();
1115        Self::from_logical_plan(lp, opt_state)
1116    }
1117
1118    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1119    ///
1120    /// Takes a list of expressions to group on.
1121    ///
1122    /// # Example
1123    ///
1124    /// ```rust
1125    /// use polars_core::prelude::*;
1126    /// use polars_lazy::prelude::*;
1127    ///
1128    /// fn example(df: DataFrame) -> LazyFrame {
1129    ///       df.lazy()
1130    ///        .group_by([col("date")])
1131    ///        .agg([
1132    ///            col("rain").min().alias("min_rain"),
1133    ///            col("rain").sum().alias("sum_rain"),
1134    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1135    ///        ])
1136    /// }
1137    /// ```
1138    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1139        let keys = by
1140            .as_ref()
1141            .iter()
1142            .map(|e| e.clone().into())
1143            .collect::<Vec<_>>();
1144        let opt_state = self.get_opt_state();
1145
1146        #[cfg(feature = "dynamic_group_by")]
1147        {
1148            LazyGroupBy {
1149                logical_plan: self.logical_plan,
1150                opt_state,
1151                keys,
1152                predicates: vec![],
1153                maintain_order: false,
1154                dynamic_options: None,
1155                rolling_options: None,
1156            }
1157        }
1158
1159        #[cfg(not(feature = "dynamic_group_by"))]
1160        {
1161            LazyGroupBy {
1162                logical_plan: self.logical_plan,
1163                opt_state,
1164                keys,
1165                predicates: vec![],
1166                maintain_order: false,
1167            }
1168        }
1169    }
1170
1171    /// Create rolling groups based on a time column.
1172    ///
1173    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1174    ///
1175    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1176    /// individual values and are not of constant intervals. For constant intervals use
1177    /// *group_by_dynamic*
1178    #[cfg(feature = "dynamic_group_by")]
1179    pub fn rolling<E: AsRef<[Expr]>>(
1180        mut self,
1181        index_column: Expr,
1182        group_by: E,
1183        mut options: RollingGroupOptions,
1184    ) -> LazyGroupBy {
1185        if let Expr::Column(name) = index_column {
1186            options.index_column = name;
1187        } else {
1188            let output_field = index_column
1189                .to_field(&self.collect_schema().unwrap())
1190                .unwrap();
1191            return self.with_column(index_column).rolling(
1192                Expr::Column(output_field.name().clone()),
1193                group_by,
1194                options,
1195            );
1196        }
1197        let opt_state = self.get_opt_state();
1198        LazyGroupBy {
1199            logical_plan: self.logical_plan,
1200            opt_state,
1201            predicates: vec![],
1202            keys: group_by.as_ref().to_vec(),
1203            maintain_order: true,
1204            dynamic_options: None,
1205            rolling_options: Some(options),
1206        }
1207    }
1208
1209    /// Group based on a time value (or index value of type Int32, Int64).
1210    ///
1211    /// Time windows are calculated and rows are assigned to windows. Different from a
1212    /// normal group_by is that a row can be member of multiple groups. The time/index
1213    /// window could be seen as a rolling window, with a window size determined by
1214    /// dates/times/values instead of slots in the DataFrame.
1215    ///
1216    /// A window is defined by:
1217    ///
1218    /// - every: interval of the window
1219    /// - period: length of the window
1220    /// - offset: offset of the window
1221    ///
1222    /// The `group_by` argument should be empty `[]` if you don't want to combine this
    /// with an ordinary group_by on these keys.
1224    #[cfg(feature = "dynamic_group_by")]
1225    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1226        mut self,
1227        index_column: Expr,
1228        group_by: E,
1229        mut options: DynamicGroupOptions,
1230    ) -> LazyGroupBy {
1231        if let Expr::Column(name) = index_column {
1232            options.index_column = name;
1233        } else {
1234            let output_field = index_column
1235                .to_field(&self.collect_schema().unwrap())
1236                .unwrap();
1237            return self.with_column(index_column).group_by_dynamic(
1238                Expr::Column(output_field.name().clone()),
1239                group_by,
1240                options,
1241            );
1242        }
1243        let opt_state = self.get_opt_state();
1244        LazyGroupBy {
1245            logical_plan: self.logical_plan,
1246            opt_state,
1247            predicates: vec![],
1248            keys: group_by.as_ref().to_vec(),
1249            maintain_order: true,
1250            dynamic_options: Some(options),
1251            rolling_options: None,
1252        }
1253    }
1254
1255    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1256    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1257        let keys = by
1258            .as_ref()
1259            .iter()
1260            .map(|e| e.clone().into())
1261            .collect::<Vec<_>>();
1262        let opt_state = self.get_opt_state();
1263
1264        #[cfg(feature = "dynamic_group_by")]
1265        {
1266            LazyGroupBy {
1267                logical_plan: self.logical_plan,
1268                opt_state,
1269                keys,
1270                predicates: vec![],
1271                maintain_order: true,
1272                dynamic_options: None,
1273                rolling_options: None,
1274            }
1275        }
1276
1277        #[cfg(not(feature = "dynamic_group_by"))]
1278        {
1279            LazyGroupBy {
1280                logical_plan: self.logical_plan,
1281                opt_state,
1282                keys,
1283                predicates: vec![],
1284                maintain_order: true,
1285            }
1286        }
1287    }
1288
1289    /// Left anti join this query with another lazy query.
1290    ///
1291    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1292    /// flexible join logic, see [`join`](LazyFrame::join) or
1293    /// [`join_builder`](LazyFrame::join_builder).
1294    ///
1295    /// # Example
1296    ///
1297    /// ```rust
1298    /// use polars_core::prelude::*;
1299    /// use polars_lazy::prelude::*;
1300    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1301    ///         ldf
1302    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1303    /// }
1304    /// ```
1305    #[cfg(feature = "semi_anti_join")]
1306    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1307        self.join(
1308            other,
1309            [left_on.into()],
1310            [right_on.into()],
1311            JoinArgs::new(JoinType::Anti),
1312        )
1313    }
1314
1315    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1316    #[cfg(feature = "cross_join")]
1317    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1318        self.join(
1319            other,
1320            vec![],
1321            vec![],
1322            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1323        )
1324    }
1325
1326    /// Left outer join this query with another lazy query.
1327    ///
1328    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1329    /// flexible join logic, see [`join`](LazyFrame::join) or
1330    /// [`join_builder`](LazyFrame::join_builder).
1331    ///
1332    /// # Example
1333    ///
1334    /// ```rust
1335    /// use polars_core::prelude::*;
1336    /// use polars_lazy::prelude::*;
1337    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1338    ///         ldf
1339    ///         .left_join(other, col("foo"), col("bar"))
1340    /// }
1341    /// ```
1342    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1343        self.join(
1344            other,
1345            [left_on.into()],
1346            [right_on.into()],
1347            JoinArgs::new(JoinType::Left),
1348        )
1349    }
1350
1351    /// Inner join this query with another lazy query.
1352    ///
1353    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1354    /// flexible join logic, see [`join`](LazyFrame::join) or
1355    /// [`join_builder`](LazyFrame::join_builder).
1356    ///
1357    /// # Example
1358    ///
1359    /// ```rust
1360    /// use polars_core::prelude::*;
1361    /// use polars_lazy::prelude::*;
1362    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1363    ///         ldf
1364    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1365    /// }
1366    /// ```
1367    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1368        self.join(
1369            other,
1370            [left_on.into()],
1371            [right_on.into()],
1372            JoinArgs::new(JoinType::Inner),
1373        )
1374    }
1375
1376    /// Full outer join this query with another lazy query.
1377    ///
1378    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1379    /// flexible join logic, see [`join`](LazyFrame::join) or
1380    /// [`join_builder`](LazyFrame::join_builder).
1381    ///
1382    /// # Example
1383    ///
1384    /// ```rust
1385    /// use polars_core::prelude::*;
1386    /// use polars_lazy::prelude::*;
1387    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1388    ///         ldf
1389    ///         .full_join(other, col("foo"), col("bar"))
1390    /// }
1391    /// ```
1392    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1393        self.join(
1394            other,
1395            [left_on.into()],
1396            [right_on.into()],
1397            JoinArgs::new(JoinType::Full),
1398        )
1399    }
1400
1401    /// Left semi join this query with another lazy query.
1402    ///
1403    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1404    /// flexible join logic, see [`join`](LazyFrame::join) or
1405    /// [`join_builder`](LazyFrame::join_builder).
1406    ///
1407    /// # Example
1408    ///
1409    /// ```rust
1410    /// use polars_core::prelude::*;
1411    /// use polars_lazy::prelude::*;
1412    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1413    ///         ldf
1414    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1415    /// }
1416    /// ```
1417    #[cfg(feature = "semi_anti_join")]
1418    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1419        self.join(
1420            other,
1421            [left_on.into()],
1422            [right_on.into()],
1423            JoinArgs::new(JoinType::Semi),
1424        )
1425    }
1426
1427    /// Generic function to join two LazyFrames.
1428    ///
    /// `join` can join on multiple columns, given as two lists of expressions, and with a
1430    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1431    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1432    /// over how columns are renamed and parallelization options, use
1433    /// [`join_builder`](LazyFrame::join_builder).
1434    ///
1435    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1436    ///
1437    /// # Example
1438    ///
1439    /// ```rust
1440    /// use polars_core::prelude::*;
1441    /// use polars_lazy::prelude::*;
1442    ///
1443    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1444    ///         ldf
1445    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1446    /// }
1447    /// ```
1448    pub fn join<E: AsRef<[Expr]>>(
1449        self,
1450        other: LazyFrame,
1451        left_on: E,
1452        right_on: E,
1453        args: JoinArgs,
1454    ) -> LazyFrame {
1455        let left_on = left_on.as_ref().to_vec();
1456        let right_on = right_on.as_ref().to_vec();
1457
1458        self._join_impl(other, left_on, right_on, args)
1459    }
1460
1461    fn _join_impl(
1462        self,
1463        other: LazyFrame,
1464        left_on: Vec<Expr>,
1465        right_on: Vec<Expr>,
1466        args: JoinArgs,
1467    ) -> LazyFrame {
1468        let JoinArgs {
1469            how,
1470            validation,
1471            suffix,
1472            slice,
1473            nulls_equal,
1474            coalesce,
1475            maintain_order,
1476        } = args;
1477
1478        if slice.is_some() {
1479            panic!("impl error: slice is not handled")
1480        }
1481
1482        let mut builder = self
1483            .join_builder()
1484            .with(other)
1485            .left_on(left_on)
1486            .right_on(right_on)
1487            .how(how)
1488            .validate(validation)
1489            .join_nulls(nulls_equal)
1490            .coalesce(coalesce)
1491            .maintain_order(maintain_order);
1492
1493        if let Some(suffix) = suffix {
1494            builder = builder.suffix(suffix);
1495        }
1496
1497        // Note: args.slice is set by the optimizer
1498        builder.finish()
1499    }
1500
1501    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1502    ///
1503    /// After the `JoinBuilder` has been created and set up, calling
1504    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1505    /// representing the `join` operation.
1506    pub fn join_builder(self) -> JoinBuilder {
1507        JoinBuilder::new(self)
1508    }
1509
1510    /// Add or replace a column, given as an expression, to a DataFrame.
1511    ///
1512    /// # Example
1513    ///
1514    /// ```rust
1515    /// use polars_core::prelude::*;
1516    /// use polars_lazy::prelude::*;
1517    /// fn add_column(df: DataFrame) -> LazyFrame {
1518    ///     df.lazy()
1519    ///         .with_column(
1520    ///             when(col("sepal_length").lt(lit(5.0)))
1521    ///             .then(lit(10))
1522    ///             .otherwise(lit(1))
1523    ///             .alias("new_column_name"),
1524    ///         )
1525    /// }
1526    /// ```
1527    pub fn with_column(self, expr: Expr) -> LazyFrame {
1528        let opt_state = self.get_opt_state();
1529        let lp = self
1530            .get_plan_builder()
1531            .with_columns(
1532                vec![expr],
1533                ProjectionOptions {
1534                    run_parallel: false,
1535                    duplicate_check: true,
1536                    should_broadcast: true,
1537                },
1538            )
1539            .build();
1540        Self::from_logical_plan(lp, opt_state)
1541    }
1542
1543    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1544    ///
1545    /// # Example
1546    ///
1547    /// ```rust
1548    /// use polars_core::prelude::*;
1549    /// use polars_lazy::prelude::*;
1550    /// fn add_columns(df: DataFrame) -> LazyFrame {
1551    ///     df.lazy()
1552    ///         .with_columns(
1553    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1554    ///          )
1555    /// }
1556    /// ```
1557    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1558        let exprs = exprs.as_ref().to_vec();
1559        self.with_columns_impl(
1560            exprs,
1561            ProjectionOptions {
1562                run_parallel: true,
1563                duplicate_check: true,
1564                should_broadcast: true,
1565            },
1566        )
1567    }
1568
1569    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1570    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1571        let exprs = exprs.as_ref().to_vec();
1572        self.with_columns_impl(
1573            exprs,
1574            ProjectionOptions {
1575                run_parallel: false,
1576                duplicate_check: true,
1577                should_broadcast: true,
1578            },
1579        )
1580    }
1581
1582    /// Match or evolve to a certain schema.
1583    pub fn match_to_schema(
1584        self,
1585        schema: SchemaRef,
1586        per_column: Arc<[MatchToSchemaPerColumn]>,
1587        extra_columns: ExtraColumnsPolicy,
1588    ) -> LazyFrame {
1589        let opt_state = self.get_opt_state();
1590        let lp = self
1591            .get_plan_builder()
1592            .match_to_schema(schema, per_column, extra_columns)
1593            .build();
1594        Self::from_logical_plan(lp, opt_state)
1595    }
1596
1597    pub fn pipe_with_schema(
1598        self,
1599        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1600    ) -> Self {
1601        let opt_state = self.get_opt_state();
1602        let lp = self
1603            .get_plan_builder()
1604            .pipe_with_schema(vec![], callback)
1605            .build();
1606        Self::from_logical_plan(lp, opt_state)
1607    }
1608
1609    pub fn pipe_with_schemas(
1610        self,
1611        others: Vec<LazyFrame>,
1612        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1613    ) -> Self {
1614        let opt_state = self.get_opt_state();
1615        let lp = self
1616            .get_plan_builder()
1617            .pipe_with_schema(
1618                others.into_iter().map(|lf| lf.logical_plan).collect(),
1619                callback,
1620            )
1621            .build();
1622        Self::from_logical_plan(lp, opt_state)
1623    }
1624
1625    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1626        let opt_state = self.get_opt_state();
1627        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1628        Self::from_logical_plan(lp, opt_state)
1629    }
1630
1631    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1632        let contexts = contexts
1633            .as_ref()
1634            .iter()
1635            .map(|lf| lf.logical_plan.clone())
1636            .collect();
1637        let opt_state = self.get_opt_state();
1638        let lp = self.get_plan_builder().with_context(contexts).build();
1639        Self::from_logical_plan(lp, opt_state)
1640    }
1641
1642    /// Aggregate all the columns as their maximum values.
1643    ///
1644    /// Aggregated columns will have the same names as the original columns.
1645    pub fn max(self) -> Self {
1646        self.map_private(DslFunction::Stats(StatsFunction::Max))
1647    }
1648
1649    /// Aggregate all the columns as their minimum values.
1650    ///
1651    /// Aggregated columns will have the same names as the original columns.
1652    pub fn min(self) -> Self {
1653        self.map_private(DslFunction::Stats(StatsFunction::Min))
1654    }
1655
1656    /// Aggregate all the columns as their sum values.
1657    ///
1658    /// Aggregated columns will have the same names as the original columns.
1659    ///
1660    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1661    /// - For integer columns, the ordinary checks for overflow are performed:
1662    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1663    ///   silently wrap.
1664    /// - String columns will sum to None.
1665    pub fn sum(self) -> Self {
1666        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1667    }
1668
1669    /// Aggregate all the columns as their mean values.
1670    ///
1671    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1672    /// - String columns will have a mean of None.
1673    pub fn mean(self) -> Self {
1674        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1675    }
1676
1677    /// Aggregate all the columns as their median values.
1678    ///
1679    /// - Boolean and integer results are converted to `f64`. However, they are still
1680    ///   susceptible to overflow before this conversion occurs.
    /// - String columns will have a median of None.
1682    pub fn median(self) -> Self {
1683        self.map_private(DslFunction::Stats(StatsFunction::Median))
1684    }
1685
1686    /// Aggregate all the columns as their quantile values.
1687    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1688        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1689            quantile,
1690            method,
1691        }))
1692    }
1693
1694    /// Aggregate all the columns as their standard deviation values.
1695    ///
1696    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1697    /// computing the variance, where `N` is the number of rows.
1698    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1699    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1700    /// > likelihood estimate of the variance for normally distributed variables. The
1701    /// > standard deviation computed in this function is the square root of the estimated
1702    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1703    /// > standard deviation per se.
1704    ///
1705    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1706    pub fn std(self, ddof: u8) -> Self {
1707        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1708    }
1709
1710    /// Aggregate all the columns as their variance values.
1711    ///
1712    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1713    /// computing the variance, where `N` is the number of rows.
1714    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1715    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1716    /// > likelihood estimate of the variance for normally distributed variables.
1717    ///
1718    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1719    pub fn var(self, ddof: u8) -> Self {
1720        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1721    }
1722
1723    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1724    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1725        self.explode_impl(columns, options, false)
1726    }
1727
1728    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1729    fn explode_impl(
1730        self,
1731        columns: Selector,
1732        options: ExplodeOptions,
1733        allow_empty: bool,
1734    ) -> LazyFrame {
1735        let opt_state = self.get_opt_state();
1736        let lp = self
1737            .get_plan_builder()
1738            .explode(columns, options, allow_empty)
1739            .build();
1740        Self::from_logical_plan(lp, opt_state)
1741    }
1742
1743    /// Aggregate all the columns as the sum of their null value count.
1744    pub fn null_count(self) -> LazyFrame {
1745        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1746    }
1747
1748    /// Drop non-unique rows and maintain the order of kept rows.
1749    ///
1750    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1751    /// `None`, all columns are considered.
1752    pub fn unique_stable(
1753        self,
1754        subset: Option<Selector>,
1755        keep_strategy: UniqueKeepStrategy,
1756    ) -> LazyFrame {
1757        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1758        self.unique_stable_generic(subset, keep_strategy)
1759    }
1760
1761    pub fn unique_stable_generic(
1762        self,
1763        subset: Option<Vec<Expr>>,
1764        keep_strategy: UniqueKeepStrategy,
1765    ) -> LazyFrame {
1766        let opt_state = self.get_opt_state();
1767        let options = DistinctOptionsDSL {
1768            subset,
1769            maintain_order: true,
1770            keep_strategy,
1771        };
1772        let lp = self.get_plan_builder().distinct(options).build();
1773        Self::from_logical_plan(lp, opt_state)
1774    }
1775
1776    /// Drop non-unique rows without maintaining the order of kept rows.
1777    ///
1778    /// The order of the kept rows may change; to maintain the original row order, use
1779    /// [`unique_stable`](LazyFrame::unique_stable).
1780    ///
1781    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1782    /// all columns are considered.
1783    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1784        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1785        self.unique_generic(subset, keep_strategy)
1786    }
1787
1788    pub fn unique_generic(
1789        self,
1790        subset: Option<Vec<Expr>>,
1791        keep_strategy: UniqueKeepStrategy,
1792    ) -> LazyFrame {
1793        let opt_state = self.get_opt_state();
1794        let options = DistinctOptionsDSL {
1795            subset,
1796            maintain_order: false,
1797            keep_strategy,
1798        };
1799        let lp = self.get_plan_builder().distinct(options).build();
1800        Self::from_logical_plan(lp, opt_state)
1801    }
1802
1803    /// Drop rows containing one or more NaN values.
1804    ///
1805    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1806    /// floating point columns are considered.
1807    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1808        let opt_state = self.get_opt_state();
1809        let lp = self.get_plan_builder().drop_nans(subset).build();
1810        Self::from_logical_plan(lp, opt_state)
1811    }
1812
1813    /// Drop rows containing one or more None values.
1814    ///
1815    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1816    /// columns are considered.
1817    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1818        let opt_state = self.get_opt_state();
1819        let lp = self.get_plan_builder().drop_nulls(subset).build();
1820        Self::from_logical_plan(lp, opt_state)
1821    }
1822
1823    /// Slice the DataFrame using an offset (starting row) and a length.
1824    ///
1825    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1826    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1827    /// end.
1828    ///
1829    /// If `offset` and `len` are such that the slice extends beyond the end of the
1830    /// DataFrame, the portion between `offset` and the end will be returned. In this
1831    /// case, the number of rows in the returned DataFrame will be less than `len`.
1832    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1833        let opt_state = self.get_opt_state();
1834        let lp = self.get_plan_builder().slice(offset, len).build();
1835        Self::from_logical_plan(lp, opt_state)
1836    }
1837
1838    /// Get the first row.
1839    ///
1840    /// Equivalent to `self.slice(0, 1)`.
1841    pub fn first(self) -> LazyFrame {
1842        self.slice(0, 1)
1843    }
1844
1845    /// Get the last row.
1846    ///
1847    /// Equivalent to `self.slice(-1, 1)`.
1848    pub fn last(self) -> LazyFrame {
1849        self.slice(-1, 1)
1850    }
1851
1852    /// Get the last `n` rows.
1853    ///
1854    /// Equivalent to `self.slice(-(n as i64), n)`.
1855    pub fn tail(self, n: IdxSize) -> LazyFrame {
1856        let neg_tail = -(n as i64);
1857        self.slice(neg_tail, n)
1858    }
1859
1860    #[cfg(feature = "pivot")]
1861    #[expect(clippy::too_many_arguments)]
1862    pub fn pivot(
1863        self,
1864        on: Selector,
1865        on_columns: Arc<DataFrame>,
1866        index: Selector,
1867        values: Selector,
1868        agg: Expr,
1869        maintain_order: bool,
1870        separator: PlSmallStr,
1871    ) -> LazyFrame {
1872        let opt_state = self.get_opt_state();
1873        let lp = self
1874            .get_plan_builder()
1875            .pivot(
1876                on,
1877                on_columns,
1878                index,
1879                values,
1880                agg,
1881                maintain_order,
1882                separator,
1883            )
1884            .build();
1885        Self::from_logical_plan(lp, opt_state)
1886    }
1887
1888    /// Unpivot the DataFrame from wide to long format.
1889    ///
1890    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1891    #[cfg(feature = "pivot")]
1892    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1893        let opt_state = self.get_opt_state();
1894        let lp = self.get_plan_builder().unpivot(args).build();
1895        Self::from_logical_plan(lp, opt_state)
1896    }
1897
1898    /// Limit the DataFrame to the first `n` rows.
1899    pub fn limit(self, n: IdxSize) -> LazyFrame {
1900        self.slice(0, n)
1901    }
1902
1903    /// Apply a function/closure once the logical plan get executed.
1904    ///
1905    /// The function has access to the whole materialized DataFrame at the time it is
1906    /// called.
1907    ///
1908    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1909    /// with `LazyFrame::with_column` or `with_columns`.
1910    ///
1911    /// ## Warning
1912    /// This can blow up in your face if the schema is changed due to the operation. The
1913    /// optimizer relies on a correct schema.
1914    ///
1915    /// You can toggle certain optimizations off.
1916    pub fn map<F>(
1917        self,
1918        function: F,
1919        optimizations: AllowedOptimizations,
1920        schema: Option<Arc<dyn UdfSchema>>,
1921        name: Option<&'static str>,
1922    ) -> LazyFrame
1923    where
1924        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1925    {
1926        let opt_state = self.get_opt_state();
1927        let lp = self
1928            .get_plan_builder()
1929            .map(
1930                function,
1931                optimizations,
1932                schema,
1933                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1934            )
1935            .build();
1936        Self::from_logical_plan(lp, opt_state)
1937    }
1938
1939    #[cfg(feature = "python")]
1940    pub fn map_python(
1941        self,
1942        function: polars_utils::python_function::PythonFunction,
1943        optimizations: AllowedOptimizations,
1944        schema: Option<SchemaRef>,
1945        validate_output: bool,
1946    ) -> LazyFrame {
1947        let opt_state = self.get_opt_state();
1948        let lp = self
1949            .get_plan_builder()
1950            .map_python(function, optimizations, schema, validate_output)
1951            .build();
1952        Self::from_logical_plan(lp, opt_state)
1953    }
1954
1955    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1956        let opt_state = self.get_opt_state();
1957        let lp = self.get_plan_builder().map_private(function).build();
1958        Self::from_logical_plan(lp, opt_state)
1959    }
1960
1961    /// Add a new column at index 0 that counts the rows.
1962    ///
1963    /// `name` is the name of the new column. `offset` is where to start counting from; if
1964    /// `None`, it is set to `0`.
1965    ///
1966    /// # Warning
1967    /// This can have a negative effect on query performance. This may for instance block
1968    /// predicate pushdown optimization.
1969    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1970    where
1971        S: Into<PlSmallStr>,
1972    {
1973        let name = name.into();
1974
1975        match &self.logical_plan {
1976            v @ DslPlan::Scan {
1977                scan_type,
1978                unified_scan_args,
1979                ..
1980            } if unified_scan_args.row_index.is_none()
1981                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1982            {
1983                let DslPlan::Scan {
1984                    sources,
1985                    mut unified_scan_args,
1986                    scan_type,
1987                    cached_ir: _,
1988                } = v.clone()
1989                else {
1990                    unreachable!()
1991                };
1992
1993                unified_scan_args.row_index = Some(RowIndex {
1994                    name,
1995                    offset: offset.unwrap_or(0),
1996                });
1997
1998                DslPlan::Scan {
1999                    sources,
2000                    unified_scan_args,
2001                    scan_type,
2002                    cached_ir: Default::default(),
2003                }
2004                .into()
2005            },
2006            _ => self.map_private(DslFunction::RowIndex { name, offset }),
2007        }
2008    }
2009
2010    /// Return the number of non-null elements for each column.
2011    pub fn count(self) -> LazyFrame {
2012        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2013    }
2014
2015    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2016    /// inserted as columns.
2017    #[cfg(feature = "dtype-struct")]
2018    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2019        self.map_private(DslFunction::Unnest {
2020            columns: cols,
2021            separator,
2022        })
2023    }
2024
2025    #[cfg(feature = "merge_sorted")]
2026    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2027    where
2028        S: Into<PlSmallStr>,
2029    {
2030        let key = key.into();
2031
2032        let lp = DslPlan::MergeSorted {
2033            input_left: Arc::new(self.logical_plan),
2034            input_right: Arc::new(other.logical_plan),
2035            key,
2036        };
2037        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2038    }
2039
2040    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2041        let lp = DslPlan::MapFunction {
2042            input: Arc::new(self.logical_plan),
2043            function: DslFunction::Hint(hint),
2044        };
2045        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2046    }
2047}
2048
2049/// Utility struct for lazy group_by operation.
2050#[derive(Clone)]
2051pub struct LazyGroupBy {
2052    pub logical_plan: DslPlan,
2053    opt_state: OptFlags,
2054    keys: Vec<Expr>,
2055    predicates: Vec<Expr>,
2056    maintain_order: bool,
2057    #[cfg(feature = "dynamic_group_by")]
2058    dynamic_options: Option<DynamicGroupOptions>,
2059    #[cfg(feature = "dynamic_group_by")]
2060    rolling_options: Option<RollingGroupOptions>,
2061}
2062
2063impl From<LazyGroupBy> for LazyFrame {
2064    fn from(lgb: LazyGroupBy) -> Self {
2065        Self {
2066            logical_plan: lgb.logical_plan,
2067            opt_state: lgb.opt_state,
2068            cached_arena: Default::default(),
2069        }
2070    }
2071}
2072
2073impl LazyGroupBy {
2074    /// Filter groups with a predicate after aggregation.
2075    ///
2076    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
2077    /// is evaluated on the groups.
2078    /// This method can be chained in which case all predicates must evaluate to `true` for a
2079    /// group to be kept.
2080    ///
2081    /// # Example
2082    ///
2083    /// ```rust
2084    /// use polars_core::prelude::*;
2085    /// use polars_lazy::prelude::*;
2086    ///
2087    /// fn example(df: DataFrame) -> LazyFrame {
2088    ///       df.lazy()
2089    ///        .group_by_stable([col("date")])
2090    ///        .having(col("rain").sum().gt(lit(10)))
2091    ///        .agg([col("rain").min().alias("min_rain")])
2092    /// }
2093    /// ```
2094    pub fn having(mut self, predicate: Expr) -> Self {
2095        self.predicates.push(predicate);
2096        self
2097    }
2098
2099    /// Group by and aggregate.
2100    ///
2101    /// Select a column with [col] and choose an aggregation.
2102    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2103    ///
2104    /// # Example
2105    ///
2106    /// ```rust
2107    /// use polars_core::prelude::*;
2108    /// use polars_lazy::prelude::*;
2109    ///
2110    /// fn example(df: DataFrame) -> LazyFrame {
2111    ///       df.lazy()
2112    ///        .group_by_stable([col("date")])
2113    ///        .agg([
2114    ///            col("rain").min().alias("min_rain"),
2115    ///            col("rain").sum().alias("sum_rain"),
2116    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2117    ///        ])
2118    /// }
2119    /// ```
2120    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2121        #[cfg(feature = "dynamic_group_by")]
2122        let lp = DslBuilder::from(self.logical_plan)
2123            .group_by(
2124                self.keys,
2125                self.predicates,
2126                aggs,
2127                None,
2128                self.maintain_order,
2129                self.dynamic_options,
2130                self.rolling_options,
2131            )
2132            .build();
2133
2134        #[cfg(not(feature = "dynamic_group_by"))]
2135        let lp = DslBuilder::from(self.logical_plan)
2136            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2137            .build();
2138        LazyFrame::from_logical_plan(lp, self.opt_state)
2139    }
2140
2141    /// Return first n rows of each group
2142    pub fn head(self, n: Option<usize>) -> LazyFrame {
2143        let keys = self
2144            .keys
2145            .iter()
2146            .filter_map(|expr| expr_output_name(expr).ok())
2147            .collect::<Vec<_>>();
2148
2149        self.agg([all().as_expr().head(n)]).explode_impl(
2150            all() - by_name(keys.iter().cloned(), false),
2151            ExplodeOptions {
2152                empty_as_null: true,
2153                keep_nulls: true,
2154            },
2155            true,
2156        )
2157    }
2158
2159    /// Return last n rows of each group
2160    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2161        let keys = self
2162            .keys
2163            .iter()
2164            .filter_map(|expr| expr_output_name(expr).ok())
2165            .collect::<Vec<_>>();
2166
2167        self.agg([all().as_expr().tail(n)]).explode_impl(
2168            all() - by_name(keys.iter().cloned(), false),
2169            ExplodeOptions {
2170                empty_as_null: true,
2171                keep_nulls: true,
2172            },
2173            true,
2174        )
2175    }
2176
2177    /// Apply a function over the groups as a new DataFrame.
2178    ///
2179    /// **It is not recommended that you use this as materializing the DataFrame is very
2180    /// expensive.**
2181    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2182        if !self.predicates.is_empty() {
2183            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2184        }
2185
2186        #[cfg(feature = "dynamic_group_by")]
2187        let options = GroupbyOptions {
2188            dynamic: self.dynamic_options,
2189            rolling: self.rolling_options,
2190            slice: None,
2191        };
2192
2193        #[cfg(not(feature = "dynamic_group_by"))]
2194        let options = GroupbyOptions { slice: None };
2195
2196        let lp = DslPlan::GroupBy {
2197            input: Arc::new(self.logical_plan),
2198            keys: self.keys,
2199            predicates: vec![],
2200            aggs: vec![],
2201            apply: Some((f, schema)),
2202            maintain_order: self.maintain_order,
2203            options: Arc::new(options),
2204        };
2205        LazyFrame::from_logical_plan(lp, self.opt_state)
2206    }
2207}
2208
2209#[must_use]
2210pub struct JoinBuilder {
2211    lf: LazyFrame,
2212    how: JoinType,
2213    other: Option<LazyFrame>,
2214    left_on: Vec<Expr>,
2215    right_on: Vec<Expr>,
2216    allow_parallel: bool,
2217    force_parallel: bool,
2218    suffix: Option<PlSmallStr>,
2219    validation: JoinValidation,
2220    nulls_equal: bool,
2221    coalesce: JoinCoalesce,
2222    maintain_order: MaintainOrderJoin,
2223}
2224impl JoinBuilder {
2225    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2226    pub fn new(lf: LazyFrame) -> Self {
2227        Self {
2228            lf,
2229            other: None,
2230            how: JoinType::Inner,
2231            left_on: vec![],
2232            right_on: vec![],
2233            allow_parallel: true,
2234            force_parallel: false,
2235            suffix: None,
2236            validation: Default::default(),
2237            nulls_equal: false,
2238            coalesce: Default::default(),
2239            maintain_order: Default::default(),
2240        }
2241    }
2242
2243    /// The right table in the join.
2244    pub fn with(mut self, other: LazyFrame) -> Self {
2245        self.other = Some(other);
2246        self
2247    }
2248
2249    /// Select the join type.
2250    pub fn how(mut self, how: JoinType) -> Self {
2251        self.how = how;
2252        self
2253    }
2254
2255    pub fn validate(mut self, validation: JoinValidation) -> Self {
2256        self.validation = validation;
2257        self
2258    }
2259
2260    /// The expressions you want to join both tables on.
2261    ///
2262    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2263    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264        let on = on.as_ref().to_vec();
2265        self.left_on.clone_from(&on);
2266        self.right_on = on;
2267        self
2268    }
2269
2270    /// The expressions you want to join the left table on.
2271    ///
2272    /// The passed expressions must be valid in the left table.
2273    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2274        self.left_on = on.as_ref().to_vec();
2275        self
2276    }
2277
2278    /// The expressions you want to join the right table on.
2279    ///
2280    /// The passed expressions must be valid in the right table.
2281    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2282        self.right_on = on.as_ref().to_vec();
2283        self
2284    }
2285
2286    /// Allow parallel table evaluation.
2287    pub fn allow_parallel(mut self, allow: bool) -> Self {
2288        self.allow_parallel = allow;
2289        self
2290    }
2291
2292    /// Force parallel table evaluation.
2293    pub fn force_parallel(mut self, force: bool) -> Self {
2294        self.force_parallel = force;
2295        self
2296    }
2297
2298    /// Join on null values. By default null values will never produce matches.
2299    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2300        self.nulls_equal = nulls_equal;
2301        self
2302    }
2303
2304    /// Suffix to add duplicate column names in join.
2305    /// Defaults to `"_right"` if this method is never called.
2306    pub fn suffix<S>(mut self, suffix: S) -> Self
2307    where
2308        S: Into<PlSmallStr>,
2309    {
2310        self.suffix = Some(suffix.into());
2311        self
2312    }
2313
2314    /// Whether to coalesce join columns.
2315    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2316        self.coalesce = coalesce;
2317        self
2318    }
2319
2320    /// Whether to preserve the row order.
2321    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2322        self.maintain_order = maintain_order;
2323        self
2324    }
2325
2326    /// Finish builder
2327    pub fn finish(self) -> LazyFrame {
2328        let opt_state = self.lf.opt_state;
2329        let other = self.other.expect("'with' not set in join builder");
2330
2331        let args = JoinArgs {
2332            how: self.how,
2333            validation: self.validation,
2334            suffix: self.suffix,
2335            slice: None,
2336            nulls_equal: self.nulls_equal,
2337            coalesce: self.coalesce,
2338            maintain_order: self.maintain_order,
2339        };
2340
2341        let lp = self
2342            .lf
2343            .get_plan_builder()
2344            .join(
2345                other.logical_plan,
2346                self.left_on,
2347                self.right_on,
2348                JoinOptions {
2349                    allow_parallel: self.allow_parallel,
2350                    force_parallel: self.force_parallel,
2351                    args,
2352                }
2353                .into(),
2354            )
2355            .build();
2356        LazyFrame::from_logical_plan(lp, opt_state)
2357    }
2358
2359    // Finish with join predicates
2360    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2361        let opt_state = self.lf.opt_state;
2362        let other = self.other.expect("with not set");
2363
2364        // Decompose `And` conjunctions into their component expressions
2365        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2366            if let Expr::BinaryExpr {
2367                op: Operator::And,
2368                left,
2369                right,
2370            } = predicate
2371            {
2372                decompose_and((*left).clone(), expanded_predicates);
2373                decompose_and((*right).clone(), expanded_predicates);
2374            } else {
2375                expanded_predicates.push(predicate);
2376            }
2377        }
2378        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2379        for predicate in predicates {
2380            decompose_and(predicate, &mut expanded_predicates);
2381        }
2382        let predicates: Vec<Expr> = expanded_predicates;
2383
2384        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2385        #[cfg(feature = "is_between")]
2386        let predicates: Vec<Expr> = {
2387            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2388            for predicate in predicates {
2389                if let Expr::Function {
2390                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2391                    input,
2392                    ..
2393                } = &predicate
2394                {
2395                    if let [expr, lower, upper] = input.as_slice() {
2396                        match closed {
2397                            ClosedInterval::Both => {
2398                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2399                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2400                            },
2401                            ClosedInterval::Right => {
2402                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2403                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2404                            },
2405                            ClosedInterval::Left => {
2406                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2407                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2408                            },
2409                            ClosedInterval::None => {
2410                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2411                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2412                            },
2413                        }
2414                        continue;
2415                    }
2416                }
2417                expanded_predicates.push(predicate);
2418            }
2419            expanded_predicates
2420        };
2421
2422        let args = JoinArgs {
2423            how: self.how,
2424            validation: self.validation,
2425            suffix: self.suffix,
2426            slice: None,
2427            nulls_equal: self.nulls_equal,
2428            coalesce: self.coalesce,
2429            maintain_order: self.maintain_order,
2430        };
2431        let options = JoinOptions {
2432            allow_parallel: self.allow_parallel,
2433            force_parallel: self.force_parallel,
2434            args,
2435        };
2436
2437        let lp = DslPlan::Join {
2438            input_left: Arc::new(self.lf.logical_plan),
2439            input_right: Arc::new(other.logical_plan),
2440            left_on: Default::default(),
2441            right_on: Default::default(),
2442            predicates,
2443            options: Arc::from(options),
2444        };
2445
2446        LazyFrame::from_logical_plan(lp, opt_state)
2447    }
2448}
2449
2450pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2451    #[cfg(not(feature = "new_streaming"))]
2452    {
2453        None
2454    }
2455    #[cfg(feature = "new_streaming")]
2456    {
2457        Some(polars_stream::build_streaming_query_executor)
2458    }
2459};
2460
2461pub struct CollectBatches {
2462    recv: Receiver<PolarsResult<DataFrame>>,
2463    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2464}
2465
2466impl CollectBatches {
2467    /// Start running the query, if not already.
2468    pub fn start(&mut self) {
2469        if let Some(runner) = self.runner.take() {
2470            runner()
2471        }
2472    }
2473}
2474
2475impl Iterator for CollectBatches {
2476    type Item = PolarsResult<DataFrame>;
2477
2478    fn next(&mut self) -> Option<Self::Item> {
2479        self.start();
2480        self.recv.recv().ok()
2481    }
2482}