polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::POOL;
26use polars_core::error::feature_gated;
27use polars_core::prelude::*;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38use crate::frame::cached_arenas::CachedArena;
39use crate::prelude::*;
40
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
44
45impl IntoLazy for DataFrame {
46    /// Convert the `DataFrame` into a `LazyFrame`
47    fn lazy(self) -> LazyFrame {
48        let lp = DslBuilder::from_existing_df(self).build();
49        LazyFrame {
50            logical_plan: lp,
51            opt_state: Default::default(),
52            cached_arena: Default::default(),
53        }
54    }
55}
56
impl IntoLazy for LazyFrame {
    /// Identity conversion: the `LazyFrame` is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
62
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The (un-optimized) DSL plan built up by the methods of this type.
    pub logical_plan: DslPlan,
    /// Optimization flags applied when the plan is optimized; most builder methods carry them
    /// over to the frame they return.
    pub(crate) opt_state: OptFlags,
    /// Arena cache shared (through the `Arc`) between clones of this frame. Frames built via
    /// `from_logical_plan` start with a fresh, empty cache.
    /// NOTE(review): presumably used to seed IR arenas in `get_arenas` — see the `cached_arenas`
    /// module to confirm.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
74
75impl From<DslPlan> for LazyFrame {
76    fn from(plan: DslPlan) -> Self {
77        Self {
78            logical_plan: plan,
79            opt_state: OptFlags::default(),
80            cached_arena: Default::default(),
81        }
82    }
83}
84
85impl LazyFrame {
86    pub(crate) fn from_inner(
87        logical_plan: DslPlan,
88        opt_state: OptFlags,
89        cached_arena: Arc<Mutex<Option<CachedArena>>>,
90    ) -> Self {
91        Self {
92            logical_plan,
93            opt_state,
94            cached_arena,
95        }
96    }
97
98    pub(crate) fn get_plan_builder(self) -> DslBuilder {
99        DslBuilder::from(self.logical_plan)
100    }
101
102    fn get_opt_state(&self) -> OptFlags {
103        self.opt_state
104    }
105
106    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107        LazyFrame {
108            logical_plan,
109            opt_state,
110            cached_arena: Default::default(),
111        }
112    }
113
114    /// Get current optimizations.
115    pub fn get_current_optimizations(&self) -> OptFlags {
116        self.opt_state
117    }
118
119    /// Set allowed optimizations.
120    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
121        self.opt_state = opt_state;
122        self
123    }
124
125    /// Turn off all optimizations.
126    pub fn without_optimizations(self) -> Self {
127        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128    }
129
130    /// Toggle projection pushdown optimization.
131    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133        self
134    }
135
136    /// Toggle cluster with columns optimization.
137    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139        self
140    }
141
142    /// Check if operations are order dependent and unset maintaining_order if
143    /// the order would not be observed.
144    pub fn with_check_order(mut self, toggle: bool) -> Self {
145        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146        self
147    }
148
149    /// Toggle predicate pushdown optimization.
150    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152        self
153    }
154
155    /// Toggle type coercion optimization.
156    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158        self
159    }
160
161    /// Toggle type check optimization.
162    pub fn with_type_check(mut self, toggle: bool) -> Self {
163        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164        self
165    }
166
167    /// Toggle expression simplification optimization on or off.
168    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170        self
171    }
172
173    /// Toggle common subplan elimination optimization on or off
174    #[cfg(feature = "cse")]
175    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
176        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
177        self
178    }
179
180    /// Toggle common subexpression elimination optimization on or off
181    #[cfg(feature = "cse")]
182    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
183        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
184        self
185    }
186
187    /// Toggle slice pushdown optimization.
188    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190        self
191    }
192
193    #[cfg(feature = "new_streaming")]
194    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
195        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
196        self
197    }
198
199    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
200    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202        self
203    }
204
205    /// Run every node eagerly. This turns off multi-node optimizations.
206    pub fn _with_eager(mut self, toggle: bool) -> Self {
207        self.opt_state.set(OptFlags::EAGER, toggle);
208        self
209    }
210
211    /// Return a String describing the naive (un-optimized) logical plan.
212    pub fn describe_plan(&self) -> PolarsResult<String> {
213        Ok(self.clone().to_alp()?.describe())
214    }
215
216    /// Return a String describing the naive (un-optimized) logical plan in tree format.
217    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218        Ok(self.clone().to_alp()?.describe_tree_format())
219    }
220
221    /// Return a String describing the optimized logical plan.
222    ///
223    /// Returns `Err` if optimizing the logical plan fails.
224    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225        Ok(self.clone().to_alp_optimized()?.describe())
226    }
227
228    /// Return a String describing the optimized logical plan in tree format.
229    ///
230    /// Returns `Err` if optimizing the logical plan fails.
231    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232        Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233    }
234
235    /// Return a String describing the logical plan.
236    ///
237    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
238    /// explains the naive, un-optimized plan.
239    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240        if optimized {
241            self.describe_optimized_plan()
242        } else {
243            self.describe_plan()
244        }
245    }
246
247    /// Add a sort operation to the logical plan.
248    ///
249    /// Sorts the LazyFrame by the column name specified using the provided options.
250    ///
251    /// # Example
252    ///
253    /// Sort DataFrame by 'sepal_width' column:
254    /// ```rust
255    /// # use polars_core::prelude::*;
256    /// # use polars_lazy::prelude::*;
257    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
258    ///     df.lazy().sort(["sepal_width"], Default::default())
259    /// }
260    /// ```
261    /// Sort by a single column with specific order:
262    /// ```
263    /// # use polars_core::prelude::*;
264    /// # use polars_lazy::prelude::*;
265    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
266    ///     df.lazy().sort(
267    ///         ["sepal_width"],
268    ///         SortMultipleOptions::new()
269    ///             .with_order_descending(descending)
270    ///     )
271    /// }
272    /// ```
273    /// Sort by multiple columns with specifying order for each column:
274    /// ```
275    /// # use polars_core::prelude::*;
276    /// # use polars_lazy::prelude::*;
277    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
278    ///     df.lazy().sort(
279    ///         ["sepal_width", "sepal_length"],
280    ///         SortMultipleOptions::new()
281    ///             .with_order_descending_multi([false, true])
282    ///     )
283    /// }
284    /// ```
285    /// See [`SortMultipleOptions`] for more options.
286    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287        let opt_state = self.get_opt_state();
288        let lp = self
289            .get_plan_builder()
290            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291            .build();
292        Self::from_logical_plan(lp, opt_state)
293    }
294
295    /// Add a sort operation to the logical plan.
296    ///
297    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
298    /// concrete columns before sorting.
299    ///
300    /// See [`SortMultipleOptions`] for more options.
301    ///
302    /// # Example
303    ///
304    /// ```rust
305    /// use polars_core::prelude::*;
306    /// use polars_lazy::prelude::*;
307    ///
308    /// /// Sort DataFrame by 'sepal_width' column
309    /// fn example(df: DataFrame) -> LazyFrame {
310    ///       df.lazy()
311    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
312    /// }
313    /// ```
314    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315        self,
316        by_exprs: E,
317        sort_options: SortMultipleOptions,
318    ) -> Self {
319        let by_exprs = by_exprs.as_ref().to_vec();
320        if by_exprs.is_empty() {
321            self
322        } else {
323            let opt_state = self.get_opt_state();
324            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325            Self::from_logical_plan(lp, opt_state)
326        }
327    }
328
329    pub fn top_k<E: AsRef<[Expr]>>(
330        self,
331        k: IdxSize,
332        by_exprs: E,
333        sort_options: SortMultipleOptions,
334    ) -> Self {
335        // this will optimize to top-k
336        self.sort_by_exprs(
337            by_exprs,
338            sort_options.with_order_reversed().with_nulls_last(true),
339        )
340        .slice(0, k)
341    }
342
343    pub fn bottom_k<E: AsRef<[Expr]>>(
344        self,
345        k: IdxSize,
346        by_exprs: E,
347        sort_options: SortMultipleOptions,
348    ) -> Self {
349        // this will optimize to bottom-k
350        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351            .slice(0, k)
352    }
353
354    /// Reverse the `DataFrame` from top to bottom.
355    ///
356    /// Row `i` becomes row `number_of_rows - i - 1`.
357    ///
358    /// # Example
359    ///
360    /// ```rust
361    /// use polars_core::prelude::*;
362    /// use polars_lazy::prelude::*;
363    ///
364    /// fn example(df: DataFrame) -> LazyFrame {
365    ///       df.lazy()
366    ///         .reverse()
367    /// }
368    /// ```
369    pub fn reverse(self) -> Self {
370        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371    }
372
373    /// Rename columns in the DataFrame.
374    ///
375    /// `existing` and `new` are iterables of the same length containing the old and
376    /// corresponding new column names. Renaming happens to all `existing` columns
377    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
378    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
379    /// those columns that are actually found will be renamed (others will be ignored).
380    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
381    where
382        I: IntoIterator<Item = T>,
383        J: IntoIterator<Item = S>,
384        T: AsRef<str>,
385        S: AsRef<str>,
386    {
387        let iter = existing.into_iter();
388        let cap = iter.size_hint().0;
389        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391
392        // TODO! should this error if `existing` and `new` have different lengths?
393        // Currently, the longer of the two is truncated.
394        for (existing, new) in iter.zip(new) {
395            let existing = existing.as_ref();
396            let new = new.as_ref();
397            if new != existing {
398                existing_vec.push(existing.into());
399                new_vec.push(new.into());
400            }
401        }
402
403        self.map_private(DslFunction::Rename {
404            existing: existing_vec.into(),
405            new: new_vec.into(),
406            strict,
407        })
408    }
409
410    /// Removes columns from the DataFrame.
411    /// Note that it's better to only select the columns you need
412    /// and let the projection pushdown optimize away the unneeded columns.
413    ///
414    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
415    /// error while materializing the [`LazyFrame`].
416    pub fn drop(self, columns: Selector) -> Self {
417        let opt_state = self.get_opt_state();
418        let lp = self.get_plan_builder().drop(columns).build();
419        Self::from_logical_plan(lp, opt_state)
420    }
421
422    /// Shift the values by a given period and fill the parts that will be empty due to this operation
423    /// with `Nones`.
424    ///
425    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
426    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428    }
429
430    /// Shift the values by a given period and fill the parts that will be empty due to this operation
431    /// with the result of the `fill_value` expression.
432    ///
433    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
434    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435        self.select(vec![
436            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437        ])
438    }
439
440    /// Fill None values in the DataFrame with an expression.
441    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442        let opt_state = self.get_opt_state();
443        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444        Self::from_logical_plan(lp, opt_state)
445    }
446
447    /// Fill NaN values in the DataFrame with an expression.
448    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449        let opt_state = self.get_opt_state();
450        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451        Self::from_logical_plan(lp, opt_state)
452    }
453
454    /// Caches the result into a new LazyFrame.
455    ///
456    /// This should be used to prevent computations running multiple times.
457    pub fn cache(self) -> Self {
458        let opt_state = self.get_opt_state();
459        let lp = self.get_plan_builder().cache().build();
460        Self::from_logical_plan(lp, opt_state)
461    }
462
463    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
464    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465        let cast_cols: Vec<Expr> = dtypes
466            .into_iter()
467            .map(|(name, dt)| {
468                let name = PlSmallStr::from_str(name);
469
470                if strict {
471                    col(name).strict_cast(dt)
472                } else {
473                    col(name).cast(dt)
474                }
475            })
476            .collect();
477
478        if cast_cols.is_empty() {
479            self
480        } else {
481            self.with_columns(cast_cols)
482        }
483    }
484
485    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
486    pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487        self.with_columns(vec![if strict {
488            col(PlSmallStr::from_static("*")).strict_cast(dtype)
489        } else {
490            col(PlSmallStr::from_static("*")).cast(dtype)
491        }])
492    }
493
494    pub fn optimize(
495        self,
496        lp_arena: &mut Arena<IR>,
497        expr_arena: &mut Arena<AExpr>,
498    ) -> PolarsResult<Node> {
499        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
500    }
501
502    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
503        let (mut lp_arena, mut expr_arena) = self.get_arenas();
504        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
505
506        Ok(IRPlan::new(node, lp_arena, expr_arena))
507    }
508
509    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
510        let (mut lp_arena, mut expr_arena) = self.get_arenas();
511        let node = to_alp(
512            self.logical_plan,
513            &mut expr_arena,
514            &mut lp_arena,
515            &mut self.opt_state,
516        )?;
517        let plan = IRPlan::new(node, lp_arena, expr_arena);
518        Ok(plan)
519    }
520
521    pub(crate) fn optimize_with_scratch(
522        self,
523        lp_arena: &mut Arena<IR>,
524        expr_arena: &mut Arena<AExpr>,
525        scratch: &mut Vec<Node>,
526    ) -> PolarsResult<Node> {
527        #[allow(unused_mut)]
528        let mut opt_state = self.opt_state;
529        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
530
531        #[cfg(feature = "cse")]
532        if new_streaming {
533            // The new streaming engine can't deal with the way the common
534            // subexpression elimination adds length-incorrect with_columns.
535            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
536        }
537
538        let lp_top = optimize(
539            self.logical_plan,
540            opt_state,
541            lp_arena,
542            expr_arena,
543            scratch,
544            apply_scan_predicate_to_scan_ir,
545        )?;
546
547        Ok(lp_top)
548    }
549
550    fn prepare_collect_post_opt<P>(
551        mut self,
552        check_sink: bool,
553        query_start: Option<std::time::Instant>,
554        post_opt: P,
555    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
556    where
557        P: FnOnce(
558            Node,
559            &mut Arena<IR>,
560            &mut Arena<AExpr>,
561            Option<std::time::Duration>,
562        ) -> PolarsResult<()>,
563    {
564        let (mut lp_arena, mut expr_arena) = self.get_arenas();
565
566        let mut scratch = vec![];
567        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
568
569        post_opt(
570            lp_top,
571            &mut lp_arena,
572            &mut expr_arena,
573            // Post optimization callback gets the time since the
574            // query was started as its "base" timepoint.
575            query_start.map(|s| s.elapsed()),
576        )?;
577
578        // sink should be replaced
579        let no_file_sink = if check_sink {
580            !matches!(
581                lp_arena.get(lp_top),
582                IR::Sink {
583                    payload: SinkTypeIR::File { .. },
584                    ..
585                }
586            )
587        } else {
588            true
589        };
590        let physical_plan = create_physical_plan(
591            lp_top,
592            &mut lp_arena,
593            &mut expr_arena,
594            BUILD_STREAMING_EXECUTOR,
595        )?;
596
597        let state = ExecutionState::new();
598        Ok((state, physical_plan, no_file_sink))
599    }
600
601    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
602    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
603    where
604        P: FnOnce(
605            Node,
606            &mut Arena<IR>,
607            &mut Arena<AExpr>,
608            Option<std::time::Duration>,
609        ) -> PolarsResult<()>,
610    {
611        let (mut state, mut physical_plan, _) =
612            self.prepare_collect_post_opt(false, None, post_opt)?;
613        physical_plan.execute(&mut state)
614    }
615
616    #[allow(unused_mut)]
617    fn prepare_collect(
618        self,
619        check_sink: bool,
620        query_start: Option<std::time::Instant>,
621    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
622        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
623    }
624
625    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
626    /// `engine`.
627    ///
628    /// The query is optimized prior to execution.
629    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
630        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
631            payload.clone()
632        } else {
633            self.logical_plan = DslPlan::Sink {
634                input: Arc::new(self.logical_plan),
635                payload: SinkType::Memory,
636            };
637            SinkType::Memory
638        };
639
640        // Default engine for collect is InMemory, sink_* is Streaming
641        if engine == Engine::Auto {
642            engine = match payload {
643                #[cfg(feature = "new_streaming")]
644                SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
645                _ => Engine::InMemory,
646            };
647        }
648        // Gpu uses some hacks to dispatch.
649        if engine == Engine::Gpu {
650            engine = Engine::InMemory;
651        }
652
653        #[cfg(feature = "new_streaming")]
654        {
655            if let Some(result) = self.try_new_streaming_if_requested() {
656                return result.map(|v| v.unwrap_single());
657            }
658        }
659
660        match engine {
661            Engine::Auto => unreachable!(),
662            Engine::Streaming => {
663                feature_gated!("new_streaming", self = self.with_new_streaming(true))
664            },
665            _ => {},
666        }
667        let mut alp_plan = self.clone().to_alp_optimized()?;
668
669        match engine {
670            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
671                let result = polars_stream::run_query(
672                    alp_plan.lp_top,
673                    &mut alp_plan.lp_arena,
674                    &mut alp_plan.expr_arena,
675                );
676                result.map(|v| v.unwrap_single())
677            }),
678            Engine::Gpu => {
679                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
680            },
681            Engine::InMemory => {
682                let mut physical_plan = create_physical_plan(
683                    alp_plan.lp_top,
684                    &mut alp_plan.lp_arena,
685                    &mut alp_plan.expr_arena,
686                    BUILD_STREAMING_EXECUTOR,
687                )?;
688                let mut state = ExecutionState::new();
689                physical_plan.execute(&mut state)
690            },
691        }
692    }
693
694    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
695        let sink_multiple = LazyFrame {
696            logical_plan: DslPlan::SinkMultiple { inputs: plans },
697            opt_state,
698            cached_arena: Default::default(),
699        };
700        sink_multiple.explain(true)
701    }
702
    /// Optimize several plans as one query and execute them, returning one [`DataFrame`] per
    /// physical plan built from the combined root.
    ///
    /// The plans are placed under a single `DslPlan::SinkMultiple` root (so the optimizer sees
    /// them together) using `opt_state`. An empty `plans` returns an empty `Vec` without doing
    /// any work. `Engine::Auto` and `Engine::Gpu` are both executed with `Engine::InMemory`;
    /// `Engine::Streaming` runs the combined plan through `polars_stream::run_query` (requires
    /// the `new_streaming` feature).
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is InMemory
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // Gpu uses some hacks to dispatch.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        // Combine all plans under one root so they are optimized as a single query.
        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        // Environment-variable driven override (see `try_new_streaming_if_requested`); if it
        // handled the query, its result is returned directly.
        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // Only the in-memory engine reaches this point; the optimized root is expected to still
        // be the `SinkMultiple` built above.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // Unreachable in practice: `Engine::Gpu` was mapped to `InMemory` above.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                // We don't use `par_iter` over all plans directly because each LP may itself start
                // threads (for instance `scan_csv`), which could lead to a rayon stack overflow.
                // Instead the plans are processed in sequential chunks of `3 * num_threads`, each
                // chunk in parallel, to keep work stealing within bounds.
                let mut state = ExecutionState::new();
                // Run the cache prefiller (if any) before any of the per-plan executors.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out of the slice; each runs on its own
                                    // split of the shared execution state.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // NOTE(review): `idx` is the position within the current
                                    // chunk, so branch indices repeat across chunks.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into a single `Vec`, chunk by chunk.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
805
806    /// Execute all the lazy operations and collect them into a [`DataFrame`].
807    ///
808    /// The query is optimized prior to execution.
809    ///
810    /// # Example
811    ///
812    /// ```rust
813    /// use polars_core::prelude::*;
814    /// use polars_lazy::prelude::*;
815    ///
816    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
817    ///     df.lazy()
818    ///       .group_by([col("foo")])
819    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
820    ///       .collect()
821    /// }
822    /// ```
823    pub fn collect(self) -> PolarsResult<DataFrame> {
824        self.collect_with_engine(Engine::InMemory)
825    }
826
827    /// Collect the query in batches.
828    ///
829    /// If lazy is true the query will not start until the first poll (or until
830    /// start is called on CollectBatches).
831    #[cfg(feature = "async")]
832    pub fn collect_batches(
833        self,
834        engine: Engine,
835        maintain_order: bool,
836        chunk_size: Option<NonZeroUsize>,
837        lazy: bool,
838    ) -> PolarsResult<CollectBatches> {
839        let (send, recv) = sync_channel(1);
840        let runner_send = send.clone();
841        let ldf = self.sink_batches(
842            PlanCallback::new(move |df| {
843                // Stop if receiver has closed.
844                let send_result = send.send(Ok(df));
845                Ok(send_result.is_err())
846            }),
847            maintain_order,
848            chunk_size,
849        )?;
850        let runner = move || {
851            // We use a tokio spawn_blocking here as it has a high blocking
852            // thread pool limit.
853            polars_io::pl_async::get_runtime().spawn_blocking(move || {
854                if let Err(e) = ldf.collect_with_engine(engine) {
855                    runner_send.send(Err(e)).ok();
856                }
857            });
858        };
859
860        let mut collect_batches = CollectBatches {
861            recv,
862            runner: Some(Box::new(runner)),
863        };
864        if !lazy {
865            collect_batches.start();
866        }
867        Ok(collect_batches)
868    }
869
870    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
871    // This version does profiling of the node execution.
872    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
873    where
874        P: FnOnce(
875            Node,
876            &mut Arena<IR>,
877            &mut Arena<AExpr>,
878            Option<std::time::Duration>,
879        ) -> PolarsResult<()>,
880    {
881        let query_start = std::time::Instant::now();
882        let (mut state, mut physical_plan, _) =
883            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
884        state.time_nodes(query_start);
885        let out = physical_plan.execute(&mut state)?;
886        let timer_df = state.finish_timer()?;
887        Ok((out, timer_df))
888    }
889
890    /// Profile a LazyFrame.
891    ///
892    /// This will run the query and return a tuple
893    /// containing the materialized DataFrame and a DataFrame that contains profiling information
894    /// of each node that is executed.
895    ///
896    /// The units of the timings are microseconds.
897    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
898        self._profile_post_opt(|_, _, _, _| Ok(()))
899    }
900
901    pub fn sink_batches(
902        mut self,
903        function: PlanCallback<DataFrame, bool>,
904        maintain_order: bool,
905        chunk_size: Option<NonZeroUsize>,
906    ) -> PolarsResult<Self> {
907        use polars_plan::prelude::sink::CallbackSinkType;
908
909        polars_ensure!(
910            !matches!(self.logical_plan, DslPlan::Sink { .. }),
911            InvalidOperation: "cannot create a sink on top of another sink"
912        );
913
914        self.logical_plan = DslPlan::Sink {
915            input: Arc::new(self.logical_plan),
916            payload: SinkType::Callback(CallbackSinkType {
917                function,
918                maintain_order,
919                chunk_size,
920            }),
921        };
922
923        Ok(self)
924    }
925
926    #[cfg(feature = "new_streaming")]
927    pub fn try_new_streaming_if_requested(
928        &mut self,
929    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
930        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
931        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
932
933        if auto_new_streaming || force_new_streaming {
934            // Try to run using the new streaming engine, falling back
935            // if it fails in a todo!() error if auto_new_streaming is set.
936            let mut new_stream_lazy = self.clone();
937            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
938            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
939                Ok(v) => v,
940                Err(e) => return Some(Err(e)),
941            };
942
943            let f = || {
944                polars_stream::run_query(
945                    alp_plan.lp_top,
946                    &mut alp_plan.lp_arena,
947                    &mut alp_plan.expr_arena,
948                )
949            };
950
951            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
952                Ok(v) => return Some(v),
953                Err(e) => {
954                    // Fallback to normal engine if error is due to not being implemented
955                    // and auto_new_streaming is set, otherwise propagate error.
956                    if !force_new_streaming
957                        && auto_new_streaming
958                        && e.downcast_ref::<&str>()
959                            .map(|s| s.starts_with("not yet implemented"))
960                            .unwrap_or(false)
961                    {
962                        if polars_core::config::verbose() {
963                            eprintln!(
964                                "caught unimplemented error in new streaming engine, falling back to normal engine"
965                            );
966                        }
967                    } else {
968                        std::panic::resume_unwind(e);
969                    }
970                },
971            }
972        }
973
974        None
975    }
976
977    pub fn sink(
978        mut self,
979        sink_type: SinkDestination,
980        file_format: impl Into<Arc<FileType>>,
981        unified_sink_args: UnifiedSinkArgs,
982    ) -> PolarsResult<Self> {
983        polars_ensure!(
984            !matches!(self.logical_plan, DslPlan::Sink { .. }),
985            InvalidOperation: "cannot create a sink on top of another sink"
986        );
987
988        self.logical_plan = DslPlan::Sink {
989            input: Arc::new(self.logical_plan),
990            payload: match sink_type {
991                SinkDestination::File { target } => SinkType::File(FileSinkOptions {
992                    target,
993                    file_format: file_format.into(),
994                    unified_sink_args,
995                }),
996                SinkDestination::Partitioned {
997                    base_path,
998                    file_path_provider,
999                    partition_strategy,
1000                    finish_callback,
1001                    max_rows_per_file,
1002                    approximate_bytes_per_file,
1003                } => SinkType::Partitioned(PartitionedSinkOptions {
1004                    base_path,
1005                    file_path_provider,
1006                    partition_strategy,
1007                    finish_callback,
1008                    file_format: file_format.into(),
1009                    unified_sink_args,
1010                    max_rows_per_file,
1011                    approximate_bytes_per_file,
1012                }),
1013            },
1014        };
1015        Ok(self)
1016    }
1017
1018    /// Filter frame rows that match a predicate expression.
1019    ///
1020    /// The expression must yield boolean values (note that rows where the
1021    /// predicate resolves to `null` are *not* included in the resulting frame).
1022    ///
1023    /// # Example
1024    ///
1025    /// ```rust
1026    /// use polars_core::prelude::*;
1027    /// use polars_lazy::prelude::*;
1028    ///
1029    /// fn example(df: DataFrame) -> LazyFrame {
1030    ///       df.lazy()
1031    ///         .filter(col("sepal_width").is_not_null())
1032    ///         .select([col("sepal_width"), col("sepal_length")])
1033    /// }
1034    /// ```
1035    pub fn filter(self, predicate: Expr) -> Self {
1036        let opt_state = self.get_opt_state();
1037        let lp = self.get_plan_builder().filter(predicate).build();
1038        Self::from_logical_plan(lp, opt_state)
1039    }
1040
1041    /// Remove frame rows that match a predicate expression.
1042    ///
1043    /// The expression must yield boolean values (note that rows where the
1044    /// predicate resolves to `null` are *not* removed from the resulting frame).
1045    ///
1046    /// # Example
1047    ///
1048    /// ```rust
1049    /// use polars_core::prelude::*;
1050    /// use polars_lazy::prelude::*;
1051    ///
1052    /// fn example(df: DataFrame) -> LazyFrame {
1053    ///       df.lazy()
1054    ///         .remove(col("sepal_width").is_null())
1055    ///         .select([col("sepal_width"), col("sepal_length")])
1056    /// }
1057    /// ```
1058    pub fn remove(self, predicate: Expr) -> Self {
1059        self.filter(predicate.neq_missing(lit(true)))
1060    }
1061
1062    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1063    ///
1064    /// Columns can be selected with [`col`];
1065    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1066    ///
1067    /// # Example
1068    ///
1069    /// ```rust
1070    /// use polars_core::prelude::*;
1071    /// use polars_lazy::prelude::*;
1072    ///
1073    /// /// This function selects column "foo" and column "bar".
1074    /// /// Column "bar" is renamed to "ham".
1075    /// fn example(df: DataFrame) -> LazyFrame {
1076    ///       df.lazy()
1077    ///         .select([col("foo"),
1078    ///                   col("bar").alias("ham")])
1079    /// }
1080    ///
1081    /// /// This function selects all columns except "foo"
1082    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1083    ///       df.lazy()
1084    ///         .select([all().exclude_cols(["foo"]).as_expr()])
1085    /// }
1086    /// ```
1087    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1088        let exprs = exprs.as_ref().to_vec();
1089        self.select_impl(
1090            exprs,
1091            ProjectionOptions {
1092                run_parallel: true,
1093                duplicate_check: true,
1094                should_broadcast: true,
1095            },
1096        )
1097    }
1098
1099    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1100        let exprs = exprs.as_ref().to_vec();
1101        self.select_impl(
1102            exprs,
1103            ProjectionOptions {
1104                run_parallel: false,
1105                duplicate_check: true,
1106                should_broadcast: true,
1107            },
1108        )
1109    }
1110
1111    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1112        let opt_state = self.get_opt_state();
1113        let lp = self.get_plan_builder().project(exprs, options).build();
1114        Self::from_logical_plan(lp, opt_state)
1115    }
1116
1117    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1118    ///
1119    /// Takes a list of expressions to group on.
1120    ///
1121    /// # Example
1122    ///
1123    /// ```rust
1124    /// use polars_core::prelude::*;
1125    /// use polars_lazy::prelude::*;
1126    ///
1127    /// fn example(df: DataFrame) -> LazyFrame {
1128    ///       df.lazy()
1129    ///        .group_by([col("date")])
1130    ///        .agg([
1131    ///            col("rain").min().alias("min_rain"),
1132    ///            col("rain").sum().alias("sum_rain"),
1133    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1134    ///        ])
1135    /// }
1136    /// ```
1137    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1138        let keys = by
1139            .as_ref()
1140            .iter()
1141            .map(|e| e.clone().into())
1142            .collect::<Vec<_>>();
1143        let opt_state = self.get_opt_state();
1144
1145        #[cfg(feature = "dynamic_group_by")]
1146        {
1147            LazyGroupBy {
1148                logical_plan: self.logical_plan,
1149                opt_state,
1150                keys,
1151                predicates: vec![],
1152                maintain_order: false,
1153                dynamic_options: None,
1154                rolling_options: None,
1155            }
1156        }
1157
1158        #[cfg(not(feature = "dynamic_group_by"))]
1159        {
1160            LazyGroupBy {
1161                logical_plan: self.logical_plan,
1162                opt_state,
1163                keys,
1164                predicates: vec![],
1165                maintain_order: false,
1166            }
1167        }
1168    }
1169
1170    /// Create rolling groups based on a time column.
1171    ///
1172    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1173    ///
1174    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1175    /// individual values and are not of constant intervals. For constant intervals use
1176    /// *group_by_dynamic*
1177    #[cfg(feature = "dynamic_group_by")]
1178    pub fn rolling<E: AsRef<[Expr]>>(
1179        mut self,
1180        index_column: Expr,
1181        group_by: E,
1182        mut options: RollingGroupOptions,
1183    ) -> LazyGroupBy {
1184        if let Expr::Column(name) = index_column {
1185            options.index_column = name;
1186        } else {
1187            let output_field = index_column
1188                .to_field(&self.collect_schema().unwrap())
1189                .unwrap();
1190            return self.with_column(index_column).rolling(
1191                Expr::Column(output_field.name().clone()),
1192                group_by,
1193                options,
1194            );
1195        }
1196        let opt_state = self.get_opt_state();
1197        LazyGroupBy {
1198            logical_plan: self.logical_plan,
1199            opt_state,
1200            predicates: vec![],
1201            keys: group_by.as_ref().to_vec(),
1202            maintain_order: true,
1203            dynamic_options: None,
1204            rolling_options: Some(options),
1205        }
1206    }
1207
1208    /// Group based on a time value (or index value of type Int32, Int64).
1209    ///
1210    /// Time windows are calculated and rows are assigned to windows. Different from a
1211    /// normal group_by is that a row can be member of multiple groups. The time/index
1212    /// window could be seen as a rolling window, with a window size determined by
1213    /// dates/times/values instead of slots in the DataFrame.
1214    ///
1215    /// A window is defined by:
1216    ///
1217    /// - every: interval of the window
1218    /// - period: length of the window
1219    /// - offset: offset of the window
1220    ///
1221    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1222    /// with a ordinary group_by on these keys.
1223    #[cfg(feature = "dynamic_group_by")]
1224    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1225        mut self,
1226        index_column: Expr,
1227        group_by: E,
1228        mut options: DynamicGroupOptions,
1229    ) -> LazyGroupBy {
1230        if let Expr::Column(name) = index_column {
1231            options.index_column = name;
1232        } else {
1233            let output_field = index_column
1234                .to_field(&self.collect_schema().unwrap())
1235                .unwrap();
1236            return self.with_column(index_column).group_by_dynamic(
1237                Expr::Column(output_field.name().clone()),
1238                group_by,
1239                options,
1240            );
1241        }
1242        let opt_state = self.get_opt_state();
1243        LazyGroupBy {
1244            logical_plan: self.logical_plan,
1245            opt_state,
1246            predicates: vec![],
1247            keys: group_by.as_ref().to_vec(),
1248            maintain_order: true,
1249            dynamic_options: Some(options),
1250            rolling_options: None,
1251        }
1252    }
1253
1254    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1255    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1256        let keys = by
1257            .as_ref()
1258            .iter()
1259            .map(|e| e.clone().into())
1260            .collect::<Vec<_>>();
1261        let opt_state = self.get_opt_state();
1262
1263        #[cfg(feature = "dynamic_group_by")]
1264        {
1265            LazyGroupBy {
1266                logical_plan: self.logical_plan,
1267                opt_state,
1268                keys,
1269                predicates: vec![],
1270                maintain_order: true,
1271                dynamic_options: None,
1272                rolling_options: None,
1273            }
1274        }
1275
1276        #[cfg(not(feature = "dynamic_group_by"))]
1277        {
1278            LazyGroupBy {
1279                logical_plan: self.logical_plan,
1280                opt_state,
1281                keys,
1282                predicates: vec![],
1283                maintain_order: true,
1284            }
1285        }
1286    }
1287
1288    /// Left anti join this query with another lazy query.
1289    ///
1290    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1291    /// flexible join logic, see [`join`](LazyFrame::join) or
1292    /// [`join_builder`](LazyFrame::join_builder).
1293    ///
1294    /// # Example
1295    ///
1296    /// ```rust
1297    /// use polars_core::prelude::*;
1298    /// use polars_lazy::prelude::*;
1299    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1300    ///         ldf
1301    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1302    /// }
1303    /// ```
1304    #[cfg(feature = "semi_anti_join")]
1305    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1306        self.join(
1307            other,
1308            [left_on.into()],
1309            [right_on.into()],
1310            JoinArgs::new(JoinType::Anti),
1311        )
1312    }
1313
1314    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1315    #[cfg(feature = "cross_join")]
1316    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1317        self.join(
1318            other,
1319            vec![],
1320            vec![],
1321            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1322        )
1323    }
1324
1325    /// Left outer join this query with another lazy query.
1326    ///
1327    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1328    /// flexible join logic, see [`join`](LazyFrame::join) or
1329    /// [`join_builder`](LazyFrame::join_builder).
1330    ///
1331    /// # Example
1332    ///
1333    /// ```rust
1334    /// use polars_core::prelude::*;
1335    /// use polars_lazy::prelude::*;
1336    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1337    ///         ldf
1338    ///         .left_join(other, col("foo"), col("bar"))
1339    /// }
1340    /// ```
1341    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1342        self.join(
1343            other,
1344            [left_on.into()],
1345            [right_on.into()],
1346            JoinArgs::new(JoinType::Left),
1347        )
1348    }
1349
1350    /// Inner join this query with another lazy query.
1351    ///
1352    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1353    /// flexible join logic, see [`join`](LazyFrame::join) or
1354    /// [`join_builder`](LazyFrame::join_builder).
1355    ///
1356    /// # Example
1357    ///
1358    /// ```rust
1359    /// use polars_core::prelude::*;
1360    /// use polars_lazy::prelude::*;
1361    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1362    ///         ldf
1363    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1364    /// }
1365    /// ```
1366    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1367        self.join(
1368            other,
1369            [left_on.into()],
1370            [right_on.into()],
1371            JoinArgs::new(JoinType::Inner),
1372        )
1373    }
1374
1375    /// Full outer join this query with another lazy query.
1376    ///
1377    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1378    /// flexible join logic, see [`join`](LazyFrame::join) or
1379    /// [`join_builder`](LazyFrame::join_builder).
1380    ///
1381    /// # Example
1382    ///
1383    /// ```rust
1384    /// use polars_core::prelude::*;
1385    /// use polars_lazy::prelude::*;
1386    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1387    ///         ldf
1388    ///         .full_join(other, col("foo"), col("bar"))
1389    /// }
1390    /// ```
1391    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1392        self.join(
1393            other,
1394            [left_on.into()],
1395            [right_on.into()],
1396            JoinArgs::new(JoinType::Full),
1397        )
1398    }
1399
1400    /// Left semi join this query with another lazy query.
1401    ///
1402    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1403    /// flexible join logic, see [`join`](LazyFrame::join) or
1404    /// [`join_builder`](LazyFrame::join_builder).
1405    ///
1406    /// # Example
1407    ///
1408    /// ```rust
1409    /// use polars_core::prelude::*;
1410    /// use polars_lazy::prelude::*;
1411    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1412    ///         ldf
1413    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1414    /// }
1415    /// ```
1416    #[cfg(feature = "semi_anti_join")]
1417    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1418        self.join(
1419            other,
1420            [left_on.into()],
1421            [right_on.into()],
1422            JoinArgs::new(JoinType::Semi),
1423        )
1424    }
1425
1426    /// Generic function to join two LazyFrames.
1427    ///
1428    /// `join` can join on multiple columns, given as two list of expressions, and with a
1429    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1430    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1431    /// over how columns are renamed and parallelization options, use
1432    /// [`join_builder`](LazyFrame::join_builder).
1433    ///
1434    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1435    ///
1436    /// # Example
1437    ///
1438    /// ```rust
1439    /// use polars_core::prelude::*;
1440    /// use polars_lazy::prelude::*;
1441    ///
1442    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1443    ///         ldf
1444    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1445    /// }
1446    /// ```
1447    pub fn join<E: AsRef<[Expr]>>(
1448        self,
1449        other: LazyFrame,
1450        left_on: E,
1451        right_on: E,
1452        args: JoinArgs,
1453    ) -> LazyFrame {
1454        let left_on = left_on.as_ref().to_vec();
1455        let right_on = right_on.as_ref().to_vec();
1456
1457        self._join_impl(other, left_on, right_on, args)
1458    }
1459
1460    fn _join_impl(
1461        self,
1462        other: LazyFrame,
1463        left_on: Vec<Expr>,
1464        right_on: Vec<Expr>,
1465        args: JoinArgs,
1466    ) -> LazyFrame {
1467        let JoinArgs {
1468            how,
1469            validation,
1470            suffix,
1471            slice,
1472            nulls_equal,
1473            coalesce,
1474            maintain_order,
1475        } = args;
1476
1477        if slice.is_some() {
1478            panic!("impl error: slice is not handled")
1479        }
1480
1481        let mut builder = self
1482            .join_builder()
1483            .with(other)
1484            .left_on(left_on)
1485            .right_on(right_on)
1486            .how(how)
1487            .validate(validation)
1488            .join_nulls(nulls_equal)
1489            .coalesce(coalesce)
1490            .maintain_order(maintain_order);
1491
1492        if let Some(suffix) = suffix {
1493            builder = builder.suffix(suffix);
1494        }
1495
1496        // Note: args.slice is set by the optimizer
1497        builder.finish()
1498    }
1499
1500    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1501    ///
1502    /// After the `JoinBuilder` has been created and set up, calling
1503    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1504    /// representing the `join` operation.
1505    pub fn join_builder(self) -> JoinBuilder {
1506        JoinBuilder::new(self)
1507    }
1508
1509    /// Add or replace a column, given as an expression, to a DataFrame.
1510    ///
1511    /// # Example
1512    ///
1513    /// ```rust
1514    /// use polars_core::prelude::*;
1515    /// use polars_lazy::prelude::*;
1516    /// fn add_column(df: DataFrame) -> LazyFrame {
1517    ///     df.lazy()
1518    ///         .with_column(
1519    ///             when(col("sepal_length").lt(lit(5.0)))
1520    ///             .then(lit(10))
1521    ///             .otherwise(lit(1))
1522    ///             .alias("new_column_name"),
1523    ///         )
1524    /// }
1525    /// ```
1526    pub fn with_column(self, expr: Expr) -> LazyFrame {
1527        let opt_state = self.get_opt_state();
1528        let lp = self
1529            .get_plan_builder()
1530            .with_columns(
1531                vec![expr],
1532                ProjectionOptions {
1533                    run_parallel: false,
1534                    duplicate_check: true,
1535                    should_broadcast: true,
1536                },
1537            )
1538            .build();
1539        Self::from_logical_plan(lp, opt_state)
1540    }
1541
1542    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1543    ///
1544    /// # Example
1545    ///
1546    /// ```rust
1547    /// use polars_core::prelude::*;
1548    /// use polars_lazy::prelude::*;
1549    /// fn add_columns(df: DataFrame) -> LazyFrame {
1550    ///     df.lazy()
1551    ///         .with_columns(
1552    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1553    ///          )
1554    /// }
1555    /// ```
1556    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1557        let exprs = exprs.as_ref().to_vec();
1558        self.with_columns_impl(
1559            exprs,
1560            ProjectionOptions {
1561                run_parallel: true,
1562                duplicate_check: true,
1563                should_broadcast: true,
1564            },
1565        )
1566    }
1567
1568    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1569    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1570        let exprs = exprs.as_ref().to_vec();
1571        self.with_columns_impl(
1572            exprs,
1573            ProjectionOptions {
1574                run_parallel: false,
1575                duplicate_check: true,
1576                should_broadcast: true,
1577            },
1578        )
1579    }
1580
1581    /// Match or evolve to a certain schema.
1582    pub fn match_to_schema(
1583        self,
1584        schema: SchemaRef,
1585        per_column: Arc<[MatchToSchemaPerColumn]>,
1586        extra_columns: ExtraColumnsPolicy,
1587    ) -> LazyFrame {
1588        let opt_state = self.get_opt_state();
1589        let lp = self
1590            .get_plan_builder()
1591            .match_to_schema(schema, per_column, extra_columns)
1592            .build();
1593        Self::from_logical_plan(lp, opt_state)
1594    }
1595
1596    pub fn pipe_with_schema(
1597        self,
1598        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1599    ) -> Self {
1600        let opt_state = self.get_opt_state();
1601        let lp = self
1602            .get_plan_builder()
1603            .pipe_with_schema(vec![], callback)
1604            .build();
1605        Self::from_logical_plan(lp, opt_state)
1606    }
1607
1608    pub fn pipe_with_schemas(
1609        self,
1610        others: Vec<LazyFrame>,
1611        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1612    ) -> Self {
1613        let opt_state = self.get_opt_state();
1614        let lp = self
1615            .get_plan_builder()
1616            .pipe_with_schema(
1617                others.into_iter().map(|lf| lf.logical_plan).collect(),
1618                callback,
1619            )
1620            .build();
1621        Self::from_logical_plan(lp, opt_state)
1622    }
1623
1624    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1625        let opt_state = self.get_opt_state();
1626        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1627        Self::from_logical_plan(lp, opt_state)
1628    }
1629
1630    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1631        let contexts = contexts
1632            .as_ref()
1633            .iter()
1634            .map(|lf| lf.logical_plan.clone())
1635            .collect();
1636        let opt_state = self.get_opt_state();
1637        let lp = self.get_plan_builder().with_context(contexts).build();
1638        Self::from_logical_plan(lp, opt_state)
1639    }
1640
1641    /// Aggregate all the columns as their maximum values.
1642    ///
1643    /// Aggregated columns will have the same names as the original columns.
1644    pub fn max(self) -> Self {
1645        self.map_private(DslFunction::Stats(StatsFunction::Max))
1646    }
1647
1648    /// Aggregate all the columns as their minimum values.
1649    ///
1650    /// Aggregated columns will have the same names as the original columns.
1651    pub fn min(self) -> Self {
1652        self.map_private(DslFunction::Stats(StatsFunction::Min))
1653    }
1654
1655    /// Aggregate all the columns as their sum values.
1656    ///
1657    /// Aggregated columns will have the same names as the original columns.
1658    ///
1659    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1660    /// - For integer columns, the ordinary checks for overflow are performed:
1661    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1662    ///   silently wrap.
1663    /// - String columns will sum to None.
1664    pub fn sum(self) -> Self {
1665        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1666    }
1667
1668    /// Aggregate all the columns as their mean values.
1669    ///
1670    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1671    /// - String columns will have a mean of None.
1672    pub fn mean(self) -> Self {
1673        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1674    }
1675
1676    /// Aggregate all the columns as their median values.
1677    ///
1678    /// - Boolean and integer results are converted to `f64`. However, they are still
1679    ///   susceptible to overflow before this conversion occurs.
1680    /// - String columns will sum to None.
1681    pub fn median(self) -> Self {
1682        self.map_private(DslFunction::Stats(StatsFunction::Median))
1683    }
1684
1685    /// Aggregate all the columns as their quantile values.
1686    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1687        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1688            quantile,
1689            method,
1690        }))
1691    }
1692
1693    /// Aggregate all the columns as their standard deviation values.
1694    ///
1695    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1696    /// computing the variance, where `N` is the number of rows.
1697    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1698    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1699    /// > likelihood estimate of the variance for normally distributed variables. The
1700    /// > standard deviation computed in this function is the square root of the estimated
1701    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1702    /// > standard deviation per se.
1703    ///
1704    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1705    pub fn std(self, ddof: u8) -> Self {
1706        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1707    }
1708
1709    /// Aggregate all the columns as their variance values.
1710    ///
1711    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1712    /// computing the variance, where `N` is the number of rows.
1713    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1714    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1715    /// > likelihood estimate of the variance for normally distributed variables.
1716    ///
1717    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1718    pub fn var(self, ddof: u8) -> Self {
1719        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1720    }
1721
1722    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1723    pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1724        self.explode_impl(columns, options, false)
1725    }
1726
1727    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1728    fn explode_impl(
1729        self,
1730        columns: Selector,
1731        options: ExplodeOptions,
1732        allow_empty: bool,
1733    ) -> LazyFrame {
1734        let opt_state = self.get_opt_state();
1735        let lp = self
1736            .get_plan_builder()
1737            .explode(columns, options, allow_empty)
1738            .build();
1739        Self::from_logical_plan(lp, opt_state)
1740    }
1741
1742    /// Aggregate all the columns as the sum of their null value count.
1743    pub fn null_count(self) -> LazyFrame {
1744        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1745    }
1746
1747    /// Drop non-unique rows and maintain the order of kept rows.
1748    ///
1749    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1750    /// `None`, all columns are considered.
1751    pub fn unique_stable(
1752        self,
1753        subset: Option<Selector>,
1754        keep_strategy: UniqueKeepStrategy,
1755    ) -> LazyFrame {
1756        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1757        self.unique_stable_generic(subset, keep_strategy)
1758    }
1759
1760    pub fn unique_stable_generic(
1761        self,
1762        subset: Option<Vec<Expr>>,
1763        keep_strategy: UniqueKeepStrategy,
1764    ) -> LazyFrame {
1765        let opt_state = self.get_opt_state();
1766        let options = DistinctOptionsDSL {
1767            subset,
1768            maintain_order: true,
1769            keep_strategy,
1770        };
1771        let lp = self.get_plan_builder().distinct(options).build();
1772        Self::from_logical_plan(lp, opt_state)
1773    }
1774
1775    /// Drop non-unique rows without maintaining the order of kept rows.
1776    ///
1777    /// The order of the kept rows may change; to maintain the original row order, use
1778    /// [`unique_stable`](LazyFrame::unique_stable).
1779    ///
1780    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1781    /// all columns are considered.
1782    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1783        let subset = subset.map(|s| vec![Expr::Selector(s)]);
1784        self.unique_generic(subset, keep_strategy)
1785    }
1786
1787    pub fn unique_generic(
1788        self,
1789        subset: Option<Vec<Expr>>,
1790        keep_strategy: UniqueKeepStrategy,
1791    ) -> LazyFrame {
1792        let opt_state = self.get_opt_state();
1793        let options = DistinctOptionsDSL {
1794            subset,
1795            maintain_order: false,
1796            keep_strategy,
1797        };
1798        let lp = self.get_plan_builder().distinct(options).build();
1799        Self::from_logical_plan(lp, opt_state)
1800    }
1801
1802    /// Drop rows containing one or more NaN values.
1803    ///
1804    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1805    /// floating point columns are considered.
1806    pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1807        let opt_state = self.get_opt_state();
1808        let lp = self.get_plan_builder().drop_nans(subset).build();
1809        Self::from_logical_plan(lp, opt_state)
1810    }
1811
1812    /// Drop rows containing one or more None values.
1813    ///
1814    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1815    /// columns are considered.
1816    pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1817        let opt_state = self.get_opt_state();
1818        let lp = self.get_plan_builder().drop_nulls(subset).build();
1819        Self::from_logical_plan(lp, opt_state)
1820    }
1821
1822    /// Slice the DataFrame using an offset (starting row) and a length.
1823    ///
1824    /// If `offset` is negative, it is counted from the end of the DataFrame. For
1825    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1826    /// end.
1827    ///
1828    /// If `offset` and `len` are such that the slice extends beyond the end of the
1829    /// DataFrame, the portion between `offset` and the end will be returned. In this
1830    /// case, the number of rows in the returned DataFrame will be less than `len`.
1831    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1832        let opt_state = self.get_opt_state();
1833        let lp = self.get_plan_builder().slice(offset, len).build();
1834        Self::from_logical_plan(lp, opt_state)
1835    }
1836
1837    /// Get the first row.
1838    ///
1839    /// Equivalent to `self.slice(0, 1)`.
1840    pub fn first(self) -> LazyFrame {
1841        self.slice(0, 1)
1842    }
1843
1844    /// Get the last row.
1845    ///
1846    /// Equivalent to `self.slice(-1, 1)`.
1847    pub fn last(self) -> LazyFrame {
1848        self.slice(-1, 1)
1849    }
1850
1851    /// Get the last `n` rows.
1852    ///
1853    /// Equivalent to `self.slice(-(n as i64), n)`.
1854    pub fn tail(self, n: IdxSize) -> LazyFrame {
1855        let neg_tail = -(n as i64);
1856        self.slice(neg_tail, n)
1857    }
1858
1859    #[cfg(feature = "pivot")]
1860    #[expect(clippy::too_many_arguments)]
1861    pub fn pivot(
1862        self,
1863        on: Selector,
1864        on_columns: Arc<DataFrame>,
1865        index: Selector,
1866        values: Selector,
1867        agg: Expr,
1868        maintain_order: bool,
1869        separator: PlSmallStr,
1870    ) -> LazyFrame {
1871        let opt_state = self.get_opt_state();
1872        let lp = self
1873            .get_plan_builder()
1874            .pivot(
1875                on,
1876                on_columns,
1877                index,
1878                values,
1879                agg,
1880                maintain_order,
1881                separator,
1882            )
1883            .build();
1884        Self::from_logical_plan(lp, opt_state)
1885    }
1886
1887    /// Unpivot the DataFrame from wide to long format.
1888    ///
1889    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1890    #[cfg(feature = "pivot")]
1891    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1892        let opt_state = self.get_opt_state();
1893        let lp = self.get_plan_builder().unpivot(args).build();
1894        Self::from_logical_plan(lp, opt_state)
1895    }
1896
1897    /// Limit the DataFrame to the first `n` rows.
1898    pub fn limit(self, n: IdxSize) -> LazyFrame {
1899        self.slice(0, n)
1900    }
1901
1902    /// Apply a function/closure once the logical plan get executed.
1903    ///
1904    /// The function has access to the whole materialized DataFrame at the time it is
1905    /// called.
1906    ///
1907    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1908    /// with `LazyFrame::with_column` or `with_columns`.
1909    ///
1910    /// ## Warning
1911    /// This can blow up in your face if the schema is changed due to the operation. The
1912    /// optimizer relies on a correct schema.
1913    ///
1914    /// You can toggle certain optimizations off.
1915    pub fn map<F>(
1916        self,
1917        function: F,
1918        optimizations: AllowedOptimizations,
1919        schema: Option<Arc<dyn UdfSchema>>,
1920        name: Option<&'static str>,
1921    ) -> LazyFrame
1922    where
1923        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1924    {
1925        let opt_state = self.get_opt_state();
1926        let lp = self
1927            .get_plan_builder()
1928            .map(
1929                function,
1930                optimizations,
1931                schema,
1932                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1933            )
1934            .build();
1935        Self::from_logical_plan(lp, opt_state)
1936    }
1937
1938    #[cfg(feature = "python")]
1939    pub fn map_python(
1940        self,
1941        function: polars_utils::python_function::PythonFunction,
1942        optimizations: AllowedOptimizations,
1943        schema: Option<SchemaRef>,
1944        validate_output: bool,
1945    ) -> LazyFrame {
1946        let opt_state = self.get_opt_state();
1947        let lp = self
1948            .get_plan_builder()
1949            .map_python(function, optimizations, schema, validate_output)
1950            .build();
1951        Self::from_logical_plan(lp, opt_state)
1952    }
1953
1954    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1955        let opt_state = self.get_opt_state();
1956        let lp = self.get_plan_builder().map_private(function).build();
1957        Self::from_logical_plan(lp, opt_state)
1958    }
1959
1960    /// Add a new column at index 0 that counts the rows.
1961    ///
1962    /// `name` is the name of the new column. `offset` is where to start counting from; if
1963    /// `None`, it is set to `0`.
1964    ///
1965    /// # Warning
1966    /// This can have a negative effect on query performance. This may for instance block
1967    /// predicate pushdown optimization.
1968    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1969    where
1970        S: Into<PlSmallStr>,
1971    {
1972        let name = name.into();
1973
1974        match &self.logical_plan {
1975            v @ DslPlan::Scan {
1976                scan_type,
1977                unified_scan_args,
1978                ..
1979            } if unified_scan_args.row_index.is_none()
1980                && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1981            {
1982                let DslPlan::Scan {
1983                    sources,
1984                    mut unified_scan_args,
1985                    scan_type,
1986                    cached_ir: _,
1987                } = v.clone()
1988                else {
1989                    unreachable!()
1990                };
1991
1992                unified_scan_args.row_index = Some(RowIndex {
1993                    name,
1994                    offset: offset.unwrap_or(0),
1995                });
1996
1997                DslPlan::Scan {
1998                    sources,
1999                    unified_scan_args,
2000                    scan_type,
2001                    cached_ir: Default::default(),
2002                }
2003                .into()
2004            },
2005            _ => self.map_private(DslFunction::RowIndex { name, offset }),
2006        }
2007    }
2008
2009    /// Return the number of non-null elements for each column.
2010    pub fn count(self) -> LazyFrame {
2011        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2012    }
2013
2014    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2015    /// inserted as columns.
2016    #[cfg(feature = "dtype-struct")]
2017    pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2018        self.map_private(DslFunction::Unnest {
2019            columns: cols,
2020            separator,
2021        })
2022    }
2023
2024    #[cfg(feature = "merge_sorted")]
2025    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2026    where
2027        S: Into<PlSmallStr>,
2028    {
2029        let key = key.into();
2030
2031        let lp = DslPlan::MergeSorted {
2032            input_left: Arc::new(self.logical_plan),
2033            input_right: Arc::new(other.logical_plan),
2034            key,
2035        };
2036        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2037    }
2038
2039    pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2040        let lp = DslPlan::MapFunction {
2041            input: Arc::new(self.logical_plan),
2042            function: DslFunction::Hint(hint),
2043        };
2044        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2045    }
2046}
2047
2048/// Utility struct for lazy group_by operation.
2049#[derive(Clone)]
2050pub struct LazyGroupBy {
2051    pub logical_plan: DslPlan,
2052    opt_state: OptFlags,
2053    keys: Vec<Expr>,
2054    predicates: Vec<Expr>,
2055    maintain_order: bool,
2056    #[cfg(feature = "dynamic_group_by")]
2057    dynamic_options: Option<DynamicGroupOptions>,
2058    #[cfg(feature = "dynamic_group_by")]
2059    rolling_options: Option<RollingGroupOptions>,
2060}
2061
2062impl From<LazyGroupBy> for LazyFrame {
2063    fn from(lgb: LazyGroupBy) -> Self {
2064        Self {
2065            logical_plan: lgb.logical_plan,
2066            opt_state: lgb.opt_state,
2067            cached_arena: Default::default(),
2068        }
2069    }
2070}
2071
2072impl LazyGroupBy {
2073    /// Filter groups with a predicate after aggregation.
2074    ///
2075    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
2076    /// is evaluated on the groups.
2077    /// This method can be chained in which case all predicates must evaluate to `true` for a
2078    /// group to be kept.
2079    ///
2080    /// # Example
2081    ///
2082    /// ```rust
2083    /// use polars_core::prelude::*;
2084    /// use polars_lazy::prelude::*;
2085    ///
2086    /// fn example(df: DataFrame) -> LazyFrame {
2087    ///       df.lazy()
2088    ///        .group_by_stable([col("date")])
2089    ///        .having(col("rain").sum().gt(lit(10)))
2090    ///        .agg([col("rain").min().alias("min_rain")])
2091    /// }
2092    /// ```
2093    pub fn having(mut self, predicate: Expr) -> Self {
2094        self.predicates.push(predicate);
2095        self
2096    }
2097
2098    /// Group by and aggregate.
2099    ///
2100    /// Select a column with [col] and choose an aggregation.
2101    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2102    ///
2103    /// # Example
2104    ///
2105    /// ```rust
2106    /// use polars_core::prelude::*;
2107    /// use polars_lazy::prelude::*;
2108    ///
2109    /// fn example(df: DataFrame) -> LazyFrame {
2110    ///       df.lazy()
2111    ///        .group_by_stable([col("date")])
2112    ///        .agg([
2113    ///            col("rain").min().alias("min_rain"),
2114    ///            col("rain").sum().alias("sum_rain"),
2115    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2116    ///        ])
2117    /// }
2118    /// ```
2119    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2120        #[cfg(feature = "dynamic_group_by")]
2121        let lp = DslBuilder::from(self.logical_plan)
2122            .group_by(
2123                self.keys,
2124                self.predicates,
2125                aggs,
2126                None,
2127                self.maintain_order,
2128                self.dynamic_options,
2129                self.rolling_options,
2130            )
2131            .build();
2132
2133        #[cfg(not(feature = "dynamic_group_by"))]
2134        let lp = DslBuilder::from(self.logical_plan)
2135            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2136            .build();
2137        LazyFrame::from_logical_plan(lp, self.opt_state)
2138    }
2139
2140    /// Return first n rows of each group
2141    pub fn head(self, n: Option<usize>) -> LazyFrame {
2142        let keys = self
2143            .keys
2144            .iter()
2145            .filter_map(|expr| expr_output_name(expr).ok())
2146            .collect::<Vec<_>>();
2147
2148        self.agg([all().as_expr().head(n)]).explode_impl(
2149            all() - by_name(keys.iter().cloned(), false),
2150            ExplodeOptions {
2151                empty_as_null: true,
2152                keep_nulls: true,
2153            },
2154            true,
2155        )
2156    }
2157
2158    /// Return last n rows of each group
2159    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2160        let keys = self
2161            .keys
2162            .iter()
2163            .filter_map(|expr| expr_output_name(expr).ok())
2164            .collect::<Vec<_>>();
2165
2166        self.agg([all().as_expr().tail(n)]).explode_impl(
2167            all() - by_name(keys.iter().cloned(), false),
2168            ExplodeOptions {
2169                empty_as_null: true,
2170                keep_nulls: true,
2171            },
2172            true,
2173        )
2174    }
2175
2176    /// Apply a function over the groups as a new DataFrame.
2177    ///
2178    /// **It is not recommended that you use this as materializing the DataFrame is very
2179    /// expensive.**
2180    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2181        if !self.predicates.is_empty() {
2182            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2183        }
2184
2185        #[cfg(feature = "dynamic_group_by")]
2186        let options = GroupbyOptions {
2187            dynamic: self.dynamic_options,
2188            rolling: self.rolling_options,
2189            slice: None,
2190        };
2191
2192        #[cfg(not(feature = "dynamic_group_by"))]
2193        let options = GroupbyOptions { slice: None };
2194
2195        let lp = DslPlan::GroupBy {
2196            input: Arc::new(self.logical_plan),
2197            keys: self.keys,
2198            predicates: vec![],
2199            aggs: vec![],
2200            apply: Some((f, schema)),
2201            maintain_order: self.maintain_order,
2202            options: Arc::new(options),
2203        };
2204        LazyFrame::from_logical_plan(lp, self.opt_state)
2205    }
2206}
2207
2208#[must_use]
2209pub struct JoinBuilder {
2210    lf: LazyFrame,
2211    how: JoinType,
2212    other: Option<LazyFrame>,
2213    left_on: Vec<Expr>,
2214    right_on: Vec<Expr>,
2215    allow_parallel: bool,
2216    force_parallel: bool,
2217    suffix: Option<PlSmallStr>,
2218    validation: JoinValidation,
2219    nulls_equal: bool,
2220    coalesce: JoinCoalesce,
2221    maintain_order: MaintainOrderJoin,
2222}
2223impl JoinBuilder {
2224    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2225    pub fn new(lf: LazyFrame) -> Self {
2226        Self {
2227            lf,
2228            other: None,
2229            how: JoinType::Inner,
2230            left_on: vec![],
2231            right_on: vec![],
2232            allow_parallel: true,
2233            force_parallel: false,
2234            suffix: None,
2235            validation: Default::default(),
2236            nulls_equal: false,
2237            coalesce: Default::default(),
2238            maintain_order: Default::default(),
2239        }
2240    }
2241
2242    /// The right table in the join.
2243    pub fn with(mut self, other: LazyFrame) -> Self {
2244        self.other = Some(other);
2245        self
2246    }
2247
2248    /// Select the join type.
2249    pub fn how(mut self, how: JoinType) -> Self {
2250        self.how = how;
2251        self
2252    }
2253
2254    pub fn validate(mut self, validation: JoinValidation) -> Self {
2255        self.validation = validation;
2256        self
2257    }
2258
2259    /// The expressions you want to join both tables on.
2260    ///
2261    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2262    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2263        let on = on.as_ref().to_vec();
2264        self.left_on.clone_from(&on);
2265        self.right_on = on;
2266        self
2267    }
2268
2269    /// The expressions you want to join the left table on.
2270    ///
2271    /// The passed expressions must be valid in the left table.
2272    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2273        self.left_on = on.as_ref().to_vec();
2274        self
2275    }
2276
2277    /// The expressions you want to join the right table on.
2278    ///
2279    /// The passed expressions must be valid in the right table.
2280    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2281        self.right_on = on.as_ref().to_vec();
2282        self
2283    }
2284
2285    /// Allow parallel table evaluation.
2286    pub fn allow_parallel(mut self, allow: bool) -> Self {
2287        self.allow_parallel = allow;
2288        self
2289    }
2290
2291    /// Force parallel table evaluation.
2292    pub fn force_parallel(mut self, force: bool) -> Self {
2293        self.force_parallel = force;
2294        self
2295    }
2296
2297    /// Join on null values. By default null values will never produce matches.
2298    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2299        self.nulls_equal = nulls_equal;
2300        self
2301    }
2302
2303    /// Suffix to add duplicate column names in join.
2304    /// Defaults to `"_right"` if this method is never called.
2305    pub fn suffix<S>(mut self, suffix: S) -> Self
2306    where
2307        S: Into<PlSmallStr>,
2308    {
2309        self.suffix = Some(suffix.into());
2310        self
2311    }
2312
2313    /// Whether to coalesce join columns.
2314    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2315        self.coalesce = coalesce;
2316        self
2317    }
2318
2319    /// Whether to preserve the row order.
2320    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2321        self.maintain_order = maintain_order;
2322        self
2323    }
2324
2325    /// Finish builder
2326    pub fn finish(self) -> LazyFrame {
2327        let opt_state = self.lf.opt_state;
2328        let other = self.other.expect("'with' not set in join builder");
2329
2330        let args = JoinArgs {
2331            how: self.how,
2332            validation: self.validation,
2333            suffix: self.suffix,
2334            slice: None,
2335            nulls_equal: self.nulls_equal,
2336            coalesce: self.coalesce,
2337            maintain_order: self.maintain_order,
2338        };
2339
2340        let lp = self
2341            .lf
2342            .get_plan_builder()
2343            .join(
2344                other.logical_plan,
2345                self.left_on,
2346                self.right_on,
2347                JoinOptions {
2348                    allow_parallel: self.allow_parallel,
2349                    force_parallel: self.force_parallel,
2350                    args,
2351                }
2352                .into(),
2353            )
2354            .build();
2355        LazyFrame::from_logical_plan(lp, opt_state)
2356    }
2357
2358    // Finish with join predicates
2359    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2360        let opt_state = self.lf.opt_state;
2361        let other = self.other.expect("with not set");
2362
2363        // Decompose `And` conjunctions into their component expressions
2364        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2365            if let Expr::BinaryExpr {
2366                op: Operator::And,
2367                left,
2368                right,
2369            } = predicate
2370            {
2371                decompose_and((*left).clone(), expanded_predicates);
2372                decompose_and((*right).clone(), expanded_predicates);
2373            } else {
2374                expanded_predicates.push(predicate);
2375            }
2376        }
2377        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2378        for predicate in predicates {
2379            decompose_and(predicate, &mut expanded_predicates);
2380        }
2381        let predicates: Vec<Expr> = expanded_predicates;
2382
2383        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2384        #[cfg(feature = "is_between")]
2385        let predicates: Vec<Expr> = {
2386            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2387            for predicate in predicates {
2388                if let Expr::Function {
2389                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2390                    input,
2391                    ..
2392                } = &predicate
2393                {
2394                    if let [expr, lower, upper] = input.as_slice() {
2395                        match closed {
2396                            ClosedInterval::Both => {
2397                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2398                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2399                            },
2400                            ClosedInterval::Right => {
2401                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2402                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2403                            },
2404                            ClosedInterval::Left => {
2405                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2406                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2407                            },
2408                            ClosedInterval::None => {
2409                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2410                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2411                            },
2412                        }
2413                        continue;
2414                    }
2415                }
2416                expanded_predicates.push(predicate);
2417            }
2418            expanded_predicates
2419        };
2420
2421        let args = JoinArgs {
2422            how: self.how,
2423            validation: self.validation,
2424            suffix: self.suffix,
2425            slice: None,
2426            nulls_equal: self.nulls_equal,
2427            coalesce: self.coalesce,
2428            maintain_order: self.maintain_order,
2429        };
2430        let options = JoinOptions {
2431            allow_parallel: self.allow_parallel,
2432            force_parallel: self.force_parallel,
2433            args,
2434        };
2435
2436        let lp = DslPlan::Join {
2437            input_left: Arc::new(self.lf.logical_plan),
2438            input_right: Arc::new(other.logical_plan),
2439            left_on: Default::default(),
2440            right_on: Default::default(),
2441            predicates,
2442            options: Arc::from(options),
2443        };
2444
2445        LazyFrame::from_logical_plan(lp, opt_state)
2446    }
2447}
2448
2449pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2450    #[cfg(not(feature = "new_streaming"))]
2451    {
2452        None
2453    }
2454    #[cfg(feature = "new_streaming")]
2455    {
2456        Some(polars_stream::build_streaming_query_executor)
2457    }
2458};
2459
2460pub struct CollectBatches {
2461    recv: Receiver<PolarsResult<DataFrame>>,
2462    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2463}
2464
2465impl CollectBatches {
2466    /// Start running the query, if not already.
2467    pub fn start(&mut self) {
2468        if let Some(runner) = self.runner.take() {
2469            runner()
2470        }
2471    }
2472}
2473
2474impl Iterator for CollectBatches {
2475    type Item = PolarsResult<DataFrame>;
2476
2477    fn next(&mut self) -> Option<Self::Item> {
2478        self.start();
2479        self.recv.recv().ok()
2480    }
2481}