polars_lazy/frame/
mod.rs

1//! Lazy variant of a [DataFrame].
2#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::path::PathBuf;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "ipc")]
22pub use ipc::*;
23#[cfg(feature = "json")]
24pub use ndjson::*;
25#[cfg(feature = "parquet")]
26pub use parquet::*;
27use polars_compute::rolling::QuantileMethod;
28use polars_core::POOL;
29#[cfg(all(feature = "new_streaming", feature = "dtype-categorical"))]
30use polars_core::StringCacheHolder;
31use polars_core::error::feature_gated;
32use polars_core::prelude::*;
33use polars_expr::{ExpressionConversionState, create_physical_expr};
34use polars_io::RowIndex;
35use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
36use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
37#[cfg(feature = "is_between")]
38use polars_ops::prelude::ClosedInterval;
39pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
40use polars_plan::global::FETCH_ROWS;
41use polars_utils::pl_str::PlSmallStr;
42use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
43
44use crate::frame::cached_arenas::CachedArena;
45#[cfg(feature = "streaming")]
46use crate::physical_plan::streaming::insert_streaming_nodes;
47use crate::prelude::*;
48
49pub trait IntoLazy {
50    fn lazy(self) -> LazyFrame;
51}
52
53impl IntoLazy for DataFrame {
54    /// Convert the `DataFrame` into a `LazyFrame`
55    fn lazy(self) -> LazyFrame {
56        let lp = DslBuilder::from_existing_df(self).build();
57        LazyFrame {
58            logical_plan: lp,
59            opt_state: Default::default(),
60            cached_arena: Default::default(),
61        }
62    }
63}
64
65impl IntoLazy for LazyFrame {
66    fn lazy(self) -> LazyFrame {
67        self
68    }
69}
70
71/// Lazy abstraction over an eager `DataFrame`.
72///
73/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
74/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
75#[derive(Clone, Default)]
76#[must_use]
77pub struct LazyFrame {
78    pub logical_plan: DslPlan,
79    pub(crate) opt_state: OptFlags,
80    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
81}
82
83impl From<DslPlan> for LazyFrame {
84    fn from(plan: DslPlan) -> Self {
85        Self {
86            logical_plan: plan,
87            opt_state: OptFlags::default(),
88            cached_arena: Default::default(),
89        }
90    }
91}
92
93impl LazyFrame {
94    pub(crate) fn from_inner(
95        logical_plan: DslPlan,
96        opt_state: OptFlags,
97        cached_arena: Arc<Mutex<Option<CachedArena>>>,
98    ) -> Self {
99        Self {
100            logical_plan,
101            opt_state,
102            cached_arena,
103        }
104    }
105
106    pub(crate) fn get_plan_builder(self) -> DslBuilder {
107        DslBuilder::from(self.logical_plan)
108    }
109
110    fn get_opt_state(&self) -> OptFlags {
111        self.opt_state
112    }
113
114    fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
115        LazyFrame {
116            logical_plan,
117            opt_state,
118            cached_arena: Default::default(),
119        }
120    }
121
122    /// Get current optimizations.
123    pub fn get_current_optimizations(&self) -> OptFlags {
124        self.opt_state
125    }
126
127    /// Set allowed optimizations.
128    pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
129        self.opt_state = opt_state;
130        self
131    }
132
133    /// Turn off all optimizations.
134    pub fn without_optimizations(self) -> Self {
135        self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
136    }
137
138    /// Toggle projection pushdown optimization.
139    pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
140        self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
141        self
142    }
143
144    /// Toggle cluster with columns optimization.
145    pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
146        self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
147        self
148    }
149
150    /// Toggle collapse joins optimization.
151    pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
152        self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
153        self
154    }
155
156    /// Check if operations are order dependent and unset maintaining_order if
157    /// the order would not be observed.
158    pub fn with_check_order(mut self, toggle: bool) -> Self {
159        self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
160        self
161    }
162
163    /// Toggle predicate pushdown optimization.
164    pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
165        self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
166        self
167    }
168
169    /// Toggle type coercion optimization.
170    pub fn with_type_coercion(mut self, toggle: bool) -> Self {
171        self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
172        self
173    }
174
175    /// Toggle type check optimization.
176    pub fn with_type_check(mut self, toggle: bool) -> Self {
177        self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
178        self
179    }
180
181    /// Toggle expression simplification optimization on or off.
182    pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
183        self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
184        self
185    }
186
187    /// Toggle common subplan elimination optimization on or off
188    #[cfg(feature = "cse")]
189    pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
190        self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
191        self
192    }
193
194    /// Toggle common subexpression elimination optimization on or off
195    #[cfg(feature = "cse")]
196    pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
197        self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
198        self
199    }
200
201    /// Toggle slice pushdown optimization.
202    pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
203        self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
204        self
205    }
206
207    /// Run nodes that are capably of doing so on the streaming engine.
208    #[cfg(feature = "streaming")]
209    pub fn with_streaming(mut self, toggle: bool) -> Self {
210        self.opt_state.set(OptFlags::STREAMING, toggle);
211        self
212    }
213
214    #[cfg(feature = "new_streaming")]
215    pub fn with_new_streaming(mut self, toggle: bool) -> Self {
216        self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
217        self
218    }
219
220    /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
221    pub fn with_row_estimate(mut self, toggle: bool) -> Self {
222        self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
223        self
224    }
225
226    /// Run every node eagerly. This turns off multi-node optimizations.
227    pub fn _with_eager(mut self, toggle: bool) -> Self {
228        self.opt_state.set(OptFlags::EAGER, toggle);
229        self
230    }
231
232    /// Return a String describing the naive (un-optimized) logical plan.
233    pub fn describe_plan(&self) -> PolarsResult<String> {
234        Ok(self.clone().to_alp()?.describe())
235    }
236
237    /// Return a String describing the naive (un-optimized) logical plan in tree format.
238    pub fn describe_plan_tree(&self) -> PolarsResult<String> {
239        Ok(self.clone().to_alp()?.describe_tree_format())
240    }
241
242    // @NOTE: this is used because we want to set the `enable_fmt` flag of `optimize_with_scratch`
243    // to `true` for describe.
244    fn _describe_to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
245        let (mut lp_arena, mut expr_arena) = self.get_arenas();
246        let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?;
247
248        Ok(IRPlan::new(node, lp_arena, expr_arena))
249    }
250
251    /// Return a String describing the optimized logical plan.
252    ///
253    /// Returns `Err` if optimizing the logical plan fails.
254    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
255        Ok(self.clone()._describe_to_alp_optimized()?.describe())
256    }
257
258    /// Return a String describing the optimized logical plan in tree format.
259    ///
260    /// Returns `Err` if optimizing the logical plan fails.
261    pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
262        Ok(self
263            .clone()
264            ._describe_to_alp_optimized()?
265            .describe_tree_format())
266    }
267
268    /// Return a String describing the logical plan.
269    ///
270    /// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
271    /// explains the naive, un-optimized plan.
272    pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
273        if optimized {
274            self.describe_optimized_plan()
275        } else {
276            self.describe_plan()
277        }
278    }
279
280    /// Add a sort operation to the logical plan.
281    ///
282    /// Sorts the LazyFrame by the column name specified using the provided options.
283    ///
284    /// # Example
285    ///
286    /// Sort DataFrame by 'sepal_width' column:
287    /// ```rust
288    /// # use polars_core::prelude::*;
289    /// # use polars_lazy::prelude::*;
290    /// fn sort_by_a(df: DataFrame) -> LazyFrame {
291    ///     df.lazy().sort(["sepal_width"], Default::default())
292    /// }
293    /// ```
294    /// Sort by a single column with specific order:
295    /// ```
296    /// # use polars_core::prelude::*;
297    /// # use polars_lazy::prelude::*;
298    /// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
299    ///     df.lazy().sort(
300    ///         ["sepal_width"],
301    ///         SortMultipleOptions::new()
302    ///             .with_order_descending(descending)
303    ///     )
304    /// }
305    /// ```
306    /// Sort by multiple columns with specifying order for each column:
307    /// ```
308    /// # use polars_core::prelude::*;
309    /// # use polars_lazy::prelude::*;
310    /// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
311    ///     df.lazy().sort(
312    ///         ["sepal_width", "sepal_length"],
313    ///         SortMultipleOptions::new()
314    ///             .with_order_descending_multi([false, true])
315    ///     )
316    /// }
317    /// ```
318    /// See [`SortMultipleOptions`] for more options.
319    pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
320        let opt_state = self.get_opt_state();
321        let lp = self
322            .get_plan_builder()
323            .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
324            .build();
325        Self::from_logical_plan(lp, opt_state)
326    }
327
328    /// Add a sort operation to the logical plan.
329    ///
330    /// Sorts the LazyFrame by the provided list of expressions, which will be turned into
331    /// concrete columns before sorting.
332    ///
333    /// See [`SortMultipleOptions`] for more options.
334    ///
335    /// # Example
336    ///
337    /// ```rust
338    /// use polars_core::prelude::*;
339    /// use polars_lazy::prelude::*;
340    ///
341    /// /// Sort DataFrame by 'sepal_width' column
342    /// fn example(df: DataFrame) -> LazyFrame {
343    ///       df.lazy()
344    ///         .sort_by_exprs(vec![col("sepal_width")], Default::default())
345    /// }
346    /// ```
347    pub fn sort_by_exprs<E: AsRef<[Expr]>>(
348        self,
349        by_exprs: E,
350        sort_options: SortMultipleOptions,
351    ) -> Self {
352        let by_exprs = by_exprs.as_ref().to_vec();
353        if by_exprs.is_empty() {
354            self
355        } else {
356            let opt_state = self.get_opt_state();
357            let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
358            Self::from_logical_plan(lp, opt_state)
359        }
360    }
361
362    pub fn top_k<E: AsRef<[Expr]>>(
363        self,
364        k: IdxSize,
365        by_exprs: E,
366        sort_options: SortMultipleOptions,
367    ) -> Self {
368        // this will optimize to top-k
369        self.sort_by_exprs(
370            by_exprs,
371            sort_options.with_order_reversed().with_nulls_last(true),
372        )
373        .slice(0, k)
374    }
375
376    pub fn bottom_k<E: AsRef<[Expr]>>(
377        self,
378        k: IdxSize,
379        by_exprs: E,
380        sort_options: SortMultipleOptions,
381    ) -> Self {
382        // this will optimize to bottom-k
383        self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
384            .slice(0, k)
385    }
386
387    /// Reverse the `DataFrame` from top to bottom.
388    ///
389    /// Row `i` becomes row `number_of_rows - i - 1`.
390    ///
391    /// # Example
392    ///
393    /// ```rust
394    /// use polars_core::prelude::*;
395    /// use polars_lazy::prelude::*;
396    ///
397    /// fn example(df: DataFrame) -> LazyFrame {
398    ///       df.lazy()
399    ///         .reverse()
400    /// }
401    /// ```
402    pub fn reverse(self) -> Self {
403        self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
404    }
405
406    /// Rename columns in the DataFrame.
407    ///
408    /// `existing` and `new` are iterables of the same length containing the old and
409    /// corresponding new column names. Renaming happens to all `existing` columns
410    /// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
411    /// must be present in the `LazyFrame` when `rename` is called; otherwise, only
412    /// those columns that are actually found will be renamed (others will be ignored).
413    pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
414    where
415        I: IntoIterator<Item = T>,
416        J: IntoIterator<Item = S>,
417        T: AsRef<str>,
418        S: AsRef<str>,
419    {
420        let iter = existing.into_iter();
421        let cap = iter.size_hint().0;
422        let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
423        let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
424
425        // TODO! should this error if `existing` and `new` have different lengths?
426        // Currently, the longer of the two is truncated.
427        for (existing, new) in iter.zip(new) {
428            let existing = existing.as_ref();
429            let new = new.as_ref();
430            if new != existing {
431                existing_vec.push(existing.into());
432                new_vec.push(new.into());
433            }
434        }
435
436        self.map_private(DslFunction::Rename {
437            existing: existing_vec.into(),
438            new: new_vec.into(),
439            strict,
440        })
441    }
442
443    /// Removes columns from the DataFrame.
444    /// Note that it's better to only select the columns you need
445    /// and let the projection pushdown optimize away the unneeded columns.
446    ///
447    /// If `strict` is `true`, then any given columns that are not in the schema will
448    /// give a [`PolarsError::ColumnNotFound`] error while materializing the [`LazyFrame`].
449    fn _drop<I, T>(self, columns: I, strict: bool) -> Self
450    where
451        I: IntoIterator<Item = T>,
452        T: Into<Selector>,
453    {
454        let to_drop = columns.into_iter().map(|c| c.into()).collect();
455
456        let opt_state = self.get_opt_state();
457        let lp = self.get_plan_builder().drop(to_drop, strict).build();
458        Self::from_logical_plan(lp, opt_state)
459    }
460
461    /// Removes columns from the DataFrame.
462    /// Note that it's better to only select the columns you need
463    /// and let the projection pushdown optimize away the unneeded columns.
464    ///
465    /// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
466    /// error while materializing the [`LazyFrame`].
467    pub fn drop<I, T>(self, columns: I) -> Self
468    where
469        I: IntoIterator<Item = T>,
470        T: Into<Selector>,
471    {
472        self._drop(columns, true)
473    }
474
475    /// Removes columns from the DataFrame.
476    /// Note that it's better to only select the columns you need
477    /// and let the projection pushdown optimize away the unneeded columns.
478    ///
479    /// If a column name does not exist in the schema, it will quietly be ignored.
480    pub fn drop_no_validate<I, T>(self, columns: I) -> Self
481    where
482        I: IntoIterator<Item = T>,
483        T: Into<Selector>,
484    {
485        self._drop(columns, false)
486    }
487
488    /// Shift the values by a given period and fill the parts that will be empty due to this operation
489    /// with `Nones`.
490    ///
491    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
492    pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
493        self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
494    }
495
496    /// Shift the values by a given period and fill the parts that will be empty due to this operation
497    /// with the result of the `fill_value` expression.
498    ///
499    /// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
500    pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
501        self.select(vec![
502            col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
503        ])
504    }
505
506    /// Fill None values in the DataFrame with an expression.
507    pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
508        let opt_state = self.get_opt_state();
509        let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
510        Self::from_logical_plan(lp, opt_state)
511    }
512
513    /// Fill NaN values in the DataFrame with an expression.
514    pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
515        let opt_state = self.get_opt_state();
516        let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
517        Self::from_logical_plan(lp, opt_state)
518    }
519
520    /// Caches the result into a new LazyFrame.
521    ///
522    /// This should be used to prevent computations running multiple times.
523    pub fn cache(self) -> Self {
524        let opt_state = self.get_opt_state();
525        let lp = self.get_plan_builder().cache().build();
526        Self::from_logical_plan(lp, opt_state)
527    }
528
529    /// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
530    pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
531        let cast_cols: Vec<Expr> = dtypes
532            .into_iter()
533            .map(|(name, dt)| {
534                let name = PlSmallStr::from_str(name);
535
536                if strict {
537                    col(name).strict_cast(dt)
538                } else {
539                    col(name).cast(dt)
540                }
541            })
542            .collect();
543
544        if cast_cols.is_empty() {
545            self.clone()
546        } else {
547            self.with_columns(cast_cols)
548        }
549    }
550
551    /// Cast all frame columns to the given dtype, resulting in a new LazyFrame
552    pub fn cast_all(self, dtype: DataType, strict: bool) -> Self {
553        self.with_columns(vec![if strict {
554            col(PlSmallStr::from_static("*")).strict_cast(dtype)
555        } else {
556            col(PlSmallStr::from_static("*")).cast(dtype)
557        }])
558    }
559
560    /// Fetch is like a collect operation, but it overwrites the number of rows read by every scan
561    /// operation. This is a utility that helps debug a query on a smaller number of rows.
562    ///
563    /// Note that the fetch does not guarantee the final number of rows in the DataFrame.
564    /// Filter, join operations and a lower number of rows available in the scanned file influence
565    /// the final number of rows.
566    pub fn fetch(self, n_rows: usize) -> PolarsResult<DataFrame> {
567        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(Some(n_rows)));
568        let res = self.collect();
569        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(None));
570        res
571    }
572
573    pub fn optimize(
574        self,
575        lp_arena: &mut Arena<IR>,
576        expr_arena: &mut Arena<AExpr>,
577    ) -> PolarsResult<Node> {
578        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
579    }
580
581    pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
582        let (mut lp_arena, mut expr_arena) = self.get_arenas();
583        let node =
584            self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
585
586        Ok(IRPlan::new(node, lp_arena, expr_arena))
587    }
588
589    pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
590        let (mut lp_arena, mut expr_arena) = self.get_arenas();
591        let node = to_alp(
592            self.logical_plan,
593            &mut expr_arena,
594            &mut lp_arena,
595            &mut self.opt_state,
596        )?;
597        let plan = IRPlan::new(node, lp_arena, expr_arena);
598        Ok(plan)
599    }
600
601    pub(crate) fn optimize_with_scratch(
602        self,
603        lp_arena: &mut Arena<IR>,
604        expr_arena: &mut Arena<AExpr>,
605        scratch: &mut Vec<Node>,
606        enable_fmt: bool,
607    ) -> PolarsResult<Node> {
608        #[allow(unused_mut)]
609        let mut opt_state = self.opt_state;
610        let streaming = self.opt_state.contains(OptFlags::STREAMING);
611        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
612        #[cfg(feature = "cse")]
613        if streaming && !new_streaming {
614            opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
615        }
616
617        #[cfg(feature = "cse")]
618        if new_streaming {
619            // The new streaming engine can't deal with the way the common
620            // subexpression elimination adds length-incorrect with_columns.
621            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
622        }
623
624        let lp_top = optimize(
625            self.logical_plan,
626            opt_state,
627            lp_arena,
628            expr_arena,
629            scratch,
630            Some(&|expr, expr_arena, schema| {
631                let phys_expr = create_physical_expr(
632                    expr,
633                    Context::Default,
634                    expr_arena,
635                    schema,
636                    &mut ExpressionConversionState::new(true),
637                )
638                .ok()?;
639                let io_expr = phys_expr_to_io_expr(phys_expr);
640                Some(io_expr)
641            }),
642        )?;
643
644        if streaming {
645            #[cfg(feature = "streaming")]
646            {
647                insert_streaming_nodes(
648                    lp_top,
649                    lp_arena,
650                    expr_arena,
651                    scratch,
652                    enable_fmt,
653                    true,
654                    opt_state.contains(OptFlags::ROW_ESTIMATE),
655                )?;
656            }
657            #[cfg(not(feature = "streaming"))]
658            {
659                _ = enable_fmt;
660                panic!("activate feature 'streaming'")
661            }
662        }
663
664        Ok(lp_top)
665    }
666
667    fn prepare_collect_post_opt<P>(
668        mut self,
669        check_sink: bool,
670        query_start: Option<std::time::Instant>,
671        post_opt: P,
672    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
673    where
674        P: FnOnce(
675            Node,
676            &mut Arena<IR>,
677            &mut Arena<AExpr>,
678            Option<std::time::Duration>,
679        ) -> PolarsResult<()>,
680    {
681        let (mut lp_arena, mut expr_arena) = self.get_arenas();
682
683        let mut scratch = vec![];
684        let lp_top =
685            self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
686
687        post_opt(
688            lp_top,
689            &mut lp_arena,
690            &mut expr_arena,
691            // Post optimization callback gets the time since the
692            // query was started as its "base" timepoint.
693            query_start.map(|s| s.elapsed()),
694        )?;
695
696        // sink should be replaced
697        let no_file_sink = if check_sink {
698            !matches!(
699                lp_arena.get(lp_top),
700                IR::Sink {
701                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
702                    ..
703                }
704            )
705        } else {
706            true
707        };
708        let physical_plan = create_physical_plan(
709            lp_top,
710            &mut lp_arena,
711            &mut expr_arena,
712            BUILD_STREAMING_EXECUTOR,
713        )?;
714
715        let state = ExecutionState::new();
716        Ok((state, physical_plan, no_file_sink))
717    }
718
719    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
720    pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
721    where
722        P: FnOnce(
723            Node,
724            &mut Arena<IR>,
725            &mut Arena<AExpr>,
726            Option<std::time::Duration>,
727        ) -> PolarsResult<()>,
728    {
729        let (mut state, mut physical_plan, _) =
730            self.prepare_collect_post_opt(false, None, post_opt)?;
731        physical_plan.execute(&mut state)
732    }
733
734    #[allow(unused_mut)]
735    fn prepare_collect(
736        self,
737        check_sink: bool,
738        query_start: Option<std::time::Instant>,
739    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
740        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
741    }
742
743    /// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
744    /// `engine`.
745    ///
746    /// The query is optimized prior to execution.
747    pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
748        let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
749            payload.clone()
750        } else {
751            self.logical_plan = DslPlan::Sink {
752                input: Arc::new(self.logical_plan),
753                payload: SinkType::Memory,
754            };
755            SinkType::Memory
756        };
757
758        // Default engine for collect is InMemory, sink_* is Streaming
759        if engine == Engine::Auto {
760            engine = match payload {
761                #[cfg(feature = "new_streaming")]
762                SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
763                _ => Engine::InMemory,
764            };
765        }
766        // Gpu uses some hacks to dispatch.
767        if engine == Engine::Gpu {
768            engine = Engine::InMemory;
769        }
770
771        #[cfg(feature = "new_streaming")]
772        {
773            if let Some(result) = self.try_new_streaming_if_requested() {
774                return result.map(|v| v.unwrap_single());
775            }
776        }
777
778        match engine {
779            Engine::Auto => unreachable!(),
780            Engine::Streaming => {
781                feature_gated!("new_streaming", self = self.with_new_streaming(true))
782            },
783            Engine::OldStreaming => feature_gated!("streaming", self = self.with_streaming(true)),
784            _ => {},
785        }
786        let mut alp_plan = self.clone().to_alp_optimized()?;
787
788        match engine {
789            Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
790                #[cfg(feature = "dtype-categorical")]
791                let string_cache_hold = StringCacheHolder::hold();
792                let result = polars_stream::run_query(
793                    alp_plan.lp_top,
794                    &mut alp_plan.lp_arena,
795                    &mut alp_plan.expr_arena,
796                );
797                #[cfg(feature = "dtype-categorical")]
798                drop(string_cache_hold);
799                result.map(|v| v.unwrap_single())
800            }),
801            _ if matches!(payload, SinkType::Partition { .. }) => Err(polars_err!(
802                InvalidOperation: "partition sinks are not supported on for the '{}' engine",
803                engine.into_static_str()
804            )),
805            Engine::Gpu => {
806                Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
807            },
808            Engine::InMemory => {
809                let mut physical_plan = create_physical_plan(
810                    alp_plan.lp_top,
811                    &mut alp_plan.lp_arena,
812                    &mut alp_plan.expr_arena,
813                    BUILD_STREAMING_EXECUTOR,
814                )?;
815                let mut state = ExecutionState::new();
816                physical_plan.execute(&mut state)
817            },
818            Engine::OldStreaming => {
819                self.opt_state |= OptFlags::STREAMING;
820                let (mut state, mut physical_plan, is_streaming) =
821                    self.prepare_collect(true, None)?;
822                polars_ensure!(
823                    is_streaming,
824                    ComputeError: format!("cannot run the whole query in a streaming order")
825                );
826                physical_plan.execute(&mut state)
827            },
828        }
829    }
830
831    pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
832        let sink_multiple = LazyFrame {
833            logical_plan: DslPlan::SinkMultiple { inputs: plans },
834            opt_state,
835            cached_arena: Default::default(),
836        };
837        sink_multiple.explain(true)
838    }
839
840    pub fn collect_all_with_engine(
841        plans: Vec<DslPlan>,
842        mut engine: Engine,
843        opt_state: OptFlags,
844    ) -> PolarsResult<Vec<DataFrame>> {
845        if plans.is_empty() {
846            return Ok(Vec::new());
847        }
848
849        // Default engine for collect_all is InMemory
850        if engine == Engine::Auto {
851            engine = Engine::InMemory;
852        }
853        // Gpu uses some hacks to dispatch.
854        if engine == Engine::Gpu {
855            engine = Engine::InMemory;
856        }
857
858        let mut sink_multiple = LazyFrame {
859            logical_plan: DslPlan::SinkMultiple { inputs: plans },
860            opt_state,
861            cached_arena: Default::default(),
862        };
863
864        #[cfg(feature = "new_streaming")]
865        {
866            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
867                return result.map(|v| v.unwrap_multiple());
868            }
869        }
870
871        match engine {
872            Engine::Auto => unreachable!(),
873            Engine::Streaming => {
874                feature_gated!(
875                    "new_streaming",
876                    sink_multiple = sink_multiple.with_new_streaming(true)
877                )
878            },
879            Engine::OldStreaming => feature_gated!(
880                "streaming",
881                sink_multiple = sink_multiple.with_streaming(true)
882            ),
883            _ => {},
884        }
885        let mut alp_plan = sink_multiple.to_alp_optimized()?;
886
887        if engine == Engine::Streaming {
888            feature_gated!("new_streaming", {
889                #[cfg(feature = "dtype-categorical")]
890                let string_cache_hold = StringCacheHolder::hold();
891                let result = polars_stream::run_query(
892                    alp_plan.lp_top,
893                    &mut alp_plan.lp_arena,
894                    &mut alp_plan.expr_arena,
895                );
896                #[cfg(feature = "dtype-categorical")]
897                drop(string_cache_hold);
898                return result.map(|v| v.unwrap_multiple());
899            });
900        }
901
902        let IR::SinkMultiple { inputs } = alp_plan.root() else {
903            unreachable!()
904        };
905
906        let mut multiplan = create_multiple_physical_plans(
907            inputs.clone().as_slice(),
908            &mut alp_plan.lp_arena,
909            &mut alp_plan.expr_arena,
910            BUILD_STREAMING_EXECUTOR,
911        )?;
912
913        match engine {
914            Engine::Gpu => polars_bail!(
915                InvalidOperation: "collect_all is not supported for the gpu engine"
916            ),
917            Engine::InMemory => {
918                // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
919                // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
920                // within bounds
921                let mut state = ExecutionState::new();
922                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
923                    cache_prefiller.execute(&mut state)?;
924                }
925                let out = POOL.install(|| {
926                    multiplan
927                        .physical_plans
928                        .chunks_mut(POOL.current_num_threads() * 3)
929                        .map(|chunk| {
930                            chunk
931                                .into_par_iter()
932                                .enumerate()
933                                .map(|(idx, input)| {
934                                    let mut input = std::mem::take(input);
935                                    let mut state = state.split();
936                                    state.branch_idx += idx;
937
938                                    let df = input.execute(&mut state)?;
939                                    Ok(df)
940                                })
941                                .collect::<PolarsResult<Vec<_>>>()
942                        })
943                        .collect::<PolarsResult<Vec<_>>>()
944                });
945                Ok(out?.into_iter().flatten().collect())
946            },
947            Engine::OldStreaming => panic!("This is no longer supported"),
948            _ => unreachable!(),
949        }
950    }
951
952    /// Execute all the lazy operations and collect them into a [`DataFrame`].
953    ///
954    /// The query is optimized prior to execution.
955    ///
956    /// # Example
957    ///
958    /// ```rust
959    /// use polars_core::prelude::*;
960    /// use polars_lazy::prelude::*;
961    ///
962    /// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
963    ///     df.lazy()
964    ///       .group_by([col("foo")])
965    ///       .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
966    ///       .collect()
967    /// }
968    /// ```
969    pub fn collect(self) -> PolarsResult<DataFrame> {
970        self.collect_with_engine(Engine::InMemory)
971    }
972
973    // post_opt: A function that is called after optimization. This can be used to modify the IR jit.
974    // This version does profiling of the node execution.
975    pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
976    where
977        P: FnOnce(
978            Node,
979            &mut Arena<IR>,
980            &mut Arena<AExpr>,
981            Option<std::time::Duration>,
982        ) -> PolarsResult<()>,
983    {
984        let query_start = std::time::Instant::now();
985        let (mut state, mut physical_plan, _) =
986            self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
987        state.time_nodes(query_start);
988        let out = physical_plan.execute(&mut state)?;
989        let timer_df = state.finish_timer()?;
990        Ok((out, timer_df))
991    }
992
993    /// Profile a LazyFrame.
994    ///
995    /// This will run the query and return a tuple
996    /// containing the materialized DataFrame and a DataFrame that contains profiling information
997    /// of each node that is executed.
998    ///
999    /// The units of the timings are microseconds.
1000    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
1001        self._profile_post_opt(|_, _, _, _| Ok(()))
1002    }
1003
1004    /// Stream a query result into a parquet file. This is useful if the final result doesn't fit
1005    /// into memory. This methods will return an error if the query cannot be completely done in a
1006    /// streaming fashion.
1007    #[cfg(feature = "parquet")]
1008    pub fn sink_parquet(
1009        self,
1010        target: SinkTarget,
1011        options: ParquetWriteOptions,
1012        cloud_options: Option<polars_io::cloud::CloudOptions>,
1013        sink_options: SinkOptions,
1014    ) -> PolarsResult<Self> {
1015        self.sink(SinkType::File(FileSinkType {
1016            target,
1017            sink_options,
1018            file_type: FileType::Parquet(options),
1019            cloud_options,
1020        }))
1021    }
1022
1023    /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
1024    /// into memory. This methods will return an error if the query cannot be completely done in a
1025    /// streaming fashion.
1026    #[cfg(feature = "ipc")]
1027    pub fn sink_ipc(
1028        self,
1029        target: SinkTarget,
1030        options: IpcWriterOptions,
1031        cloud_options: Option<polars_io::cloud::CloudOptions>,
1032        sink_options: SinkOptions,
1033    ) -> PolarsResult<Self> {
1034        self.sink(SinkType::File(FileSinkType {
1035            target,
1036            sink_options,
1037            file_type: FileType::Ipc(options),
1038            cloud_options,
1039        }))
1040    }
1041
1042    /// Stream a query result into an csv file. This is useful if the final result doesn't fit
1043    /// into memory. This methods will return an error if the query cannot be completely done in a
1044    /// streaming fashion.
1045    #[cfg(feature = "csv")]
1046    pub fn sink_csv(
1047        self,
1048        target: SinkTarget,
1049        options: CsvWriterOptions,
1050        cloud_options: Option<polars_io::cloud::CloudOptions>,
1051        sink_options: SinkOptions,
1052    ) -> PolarsResult<Self> {
1053        self.sink(SinkType::File(FileSinkType {
1054            target,
1055            sink_options,
1056            file_type: FileType::Csv(options),
1057            cloud_options,
1058        }))
1059    }
1060
1061    /// Stream a query result into a JSON file. This is useful if the final result doesn't fit
1062    /// into memory. This methods will return an error if the query cannot be completely done in a
1063    /// streaming fashion.
1064    #[cfg(feature = "json")]
1065    pub fn sink_json(
1066        self,
1067        target: SinkTarget,
1068        options: JsonWriterOptions,
1069        cloud_options: Option<polars_io::cloud::CloudOptions>,
1070        sink_options: SinkOptions,
1071    ) -> PolarsResult<Self> {
1072        self.sink(SinkType::File(FileSinkType {
1073            target,
1074            sink_options,
1075            file_type: FileType::Json(options),
1076            cloud_options,
1077        }))
1078    }
1079
1080    /// Stream a query result into a parquet file in a partitioned manner. This is useful if the
1081    /// final result doesn't fit into memory. This methods will return an error if the query cannot
1082    /// be completely done in a streaming fashion.
1083    #[cfg(feature = "parquet")]
1084    #[allow(clippy::too_many_arguments)]
1085    pub fn sink_parquet_partitioned(
1086        self,
1087        base_path: Arc<PathBuf>,
1088        file_path_cb: Option<PartitionTargetCallback>,
1089        variant: PartitionVariant,
1090        options: ParquetWriteOptions,
1091        cloud_options: Option<polars_io::cloud::CloudOptions>,
1092        sink_options: SinkOptions,
1093        per_partition_sort_by: Option<Vec<SortColumn>>,
1094        finish_callback: Option<SinkFinishCallback>,
1095    ) -> PolarsResult<Self> {
1096        self.sink(SinkType::Partition(PartitionSinkType {
1097            base_path,
1098            file_path_cb,
1099            sink_options,
1100            variant,
1101            file_type: FileType::Parquet(options),
1102            cloud_options,
1103            per_partition_sort_by,
1104            finish_callback,
1105        }))
1106    }
1107
1108    /// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
1109    /// final result doesn't fit into memory. This methods will return an error if the query cannot
1110    /// be completely done in a streaming fashion.
1111    #[cfg(feature = "ipc")]
1112    #[allow(clippy::too_many_arguments)]
1113    pub fn sink_ipc_partitioned(
1114        self,
1115        base_path: Arc<PathBuf>,
1116        file_path_cb: Option<PartitionTargetCallback>,
1117        variant: PartitionVariant,
1118        options: IpcWriterOptions,
1119        cloud_options: Option<polars_io::cloud::CloudOptions>,
1120        sink_options: SinkOptions,
1121        per_partition_sort_by: Option<Vec<SortColumn>>,
1122        finish_callback: Option<SinkFinishCallback>,
1123    ) -> PolarsResult<Self> {
1124        self.sink(SinkType::Partition(PartitionSinkType {
1125            base_path,
1126            file_path_cb,
1127            sink_options,
1128            variant,
1129            file_type: FileType::Ipc(options),
1130            cloud_options,
1131            per_partition_sort_by,
1132            finish_callback,
1133        }))
1134    }
1135
1136    /// Stream a query result into an csv file in a partitioned manner. This is useful if the final
1137    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1138    /// completely done in a streaming fashion.
1139    #[cfg(feature = "csv")]
1140    #[allow(clippy::too_many_arguments)]
1141    pub fn sink_csv_partitioned(
1142        self,
1143        base_path: Arc<PathBuf>,
1144        file_path_cb: Option<PartitionTargetCallback>,
1145        variant: PartitionVariant,
1146        options: CsvWriterOptions,
1147        cloud_options: Option<polars_io::cloud::CloudOptions>,
1148        sink_options: SinkOptions,
1149        per_partition_sort_by: Option<Vec<SortColumn>>,
1150        finish_callback: Option<SinkFinishCallback>,
1151    ) -> PolarsResult<Self> {
1152        self.sink(SinkType::Partition(PartitionSinkType {
1153            base_path,
1154            file_path_cb,
1155            sink_options,
1156            variant,
1157            file_type: FileType::Csv(options),
1158            cloud_options,
1159            per_partition_sort_by,
1160            finish_callback,
1161        }))
1162    }
1163
1164    /// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
1165    /// result doesn't fit into memory. This methods will return an error if the query cannot be
1166    /// completely done in a streaming fashion.
1167    #[cfg(feature = "json")]
1168    #[allow(clippy::too_many_arguments)]
1169    pub fn sink_json_partitioned(
1170        self,
1171        base_path: Arc<PathBuf>,
1172        file_path_cb: Option<PartitionTargetCallback>,
1173        variant: PartitionVariant,
1174        options: JsonWriterOptions,
1175        cloud_options: Option<polars_io::cloud::CloudOptions>,
1176        sink_options: SinkOptions,
1177        per_partition_sort_by: Option<Vec<SortColumn>>,
1178        finish_callback: Option<SinkFinishCallback>,
1179    ) -> PolarsResult<Self> {
1180        self.sink(SinkType::Partition(PartitionSinkType {
1181            base_path,
1182            file_path_cb,
1183            sink_options,
1184            variant,
1185            file_type: FileType::Json(options),
1186            cloud_options,
1187            per_partition_sort_by,
1188            finish_callback,
1189        }))
1190    }
1191
1192    #[cfg(feature = "new_streaming")]
1193    pub fn try_new_streaming_if_requested(
1194        &mut self,
1195    ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1196        let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1197        let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1198
1199        if auto_new_streaming || force_new_streaming {
1200            // Try to run using the new streaming engine, falling back
1201            // if it fails in a todo!() error if auto_new_streaming is set.
1202            let mut new_stream_lazy = self.clone();
1203            new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1204            new_stream_lazy.opt_state &= !OptFlags::STREAMING;
1205            let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1206                Ok(v) => v,
1207                Err(e) => return Some(Err(e)),
1208            };
1209
1210            #[cfg(feature = "dtype-categorical")]
1211            let _hold = StringCacheHolder::hold();
1212            let f = || {
1213                polars_stream::run_query(
1214                    alp_plan.lp_top,
1215                    &mut alp_plan.lp_arena,
1216                    &mut alp_plan.expr_arena,
1217                )
1218            };
1219
1220            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1221                Ok(v) => return Some(v),
1222                Err(e) => {
1223                    // Fallback to normal engine if error is due to not being implemented
1224                    // and auto_new_streaming is set, otherwise propagate error.
1225                    if !force_new_streaming
1226                        && auto_new_streaming
1227                        && e.downcast_ref::<&str>()
1228                            .map(|s| s.starts_with("not yet implemented"))
1229                            .unwrap_or(false)
1230                    {
1231                        if polars_core::config::verbose() {
1232                            eprintln!(
1233                                "caught unimplemented error in new streaming engine, falling back to normal engine"
1234                            );
1235                        }
1236                    } else {
1237                        std::panic::resume_unwind(e);
1238                    }
1239                },
1240            }
1241        }
1242
1243        None
1244    }
1245
1246    fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1247        polars_ensure!(
1248            !matches!(self.logical_plan, DslPlan::Sink { .. }),
1249            InvalidOperation: "cannot create a sink on top of another sink"
1250        );
1251        self.logical_plan = DslPlan::Sink {
1252            input: Arc::new(self.logical_plan),
1253            payload: payload.clone(),
1254        };
1255        Ok(self)
1256    }
1257
1258    /// Filter frame rows that match a predicate expression.
1259    ///
1260    /// The expression must yield boolean values (note that rows where the
1261    /// predicate resolves to `null` are *not* included in the resulting frame).
1262    ///
1263    /// # Example
1264    ///
1265    /// ```rust
1266    /// use polars_core::prelude::*;
1267    /// use polars_lazy::prelude::*;
1268    ///
1269    /// fn example(df: DataFrame) -> LazyFrame {
1270    ///       df.lazy()
1271    ///         .filter(col("sepal_width").is_not_null())
1272    ///         .select([col("sepal_width"), col("sepal_length")])
1273    /// }
1274    /// ```
1275    pub fn filter(self, predicate: Expr) -> Self {
1276        let opt_state = self.get_opt_state();
1277        let lp = self.get_plan_builder().filter(predicate).build();
1278        Self::from_logical_plan(lp, opt_state)
1279    }
1280
1281    /// Remove frame rows that match a predicate expression.
1282    ///
1283    /// The expression must yield boolean values (note that rows where the
1284    /// predicate resolves to `null` are *not* removed from the resulting frame).
1285    ///
1286    /// # Example
1287    ///
1288    /// ```rust
1289    /// use polars_core::prelude::*;
1290    /// use polars_lazy::prelude::*;
1291    ///
1292    /// fn example(df: DataFrame) -> LazyFrame {
1293    ///       df.lazy()
1294    ///         .remove(col("sepal_width").is_null())
1295    ///         .select([col("sepal_width"), col("sepal_length")])
1296    /// }
1297    /// ```
1298    pub fn remove(self, predicate: Expr) -> Self {
1299        self.filter(predicate.neq_missing(lit(true)))
1300    }
1301
1302    /// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1303    ///
1304    /// Columns can be selected with [`col`];
1305    /// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1306    ///
1307    /// # Example
1308    ///
1309    /// ```rust
1310    /// use polars_core::prelude::*;
1311    /// use polars_lazy::prelude::*;
1312    ///
1313    /// /// This function selects column "foo" and column "bar".
1314    /// /// Column "bar" is renamed to "ham".
1315    /// fn example(df: DataFrame) -> LazyFrame {
1316    ///       df.lazy()
1317    ///         .select([col("foo"),
1318    ///                   col("bar").alias("ham")])
1319    /// }
1320    ///
1321    /// /// This function selects all columns except "foo"
1322    /// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1323    ///       df.lazy()
1324    ///         .select([col(PlSmallStr::from_static("*")).exclude(["foo"])])
1325    /// }
1326    /// ```
1327    pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1328        let exprs = exprs.as_ref().to_vec();
1329        self.select_impl(
1330            exprs,
1331            ProjectionOptions {
1332                run_parallel: true,
1333                duplicate_check: true,
1334                should_broadcast: true,
1335            },
1336        )
1337    }
1338
1339    pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1340        let exprs = exprs.as_ref().to_vec();
1341        self.select_impl(
1342            exprs,
1343            ProjectionOptions {
1344                run_parallel: false,
1345                duplicate_check: true,
1346                should_broadcast: true,
1347            },
1348        )
1349    }
1350
1351    fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1352        let opt_state = self.get_opt_state();
1353        let lp = self.get_plan_builder().project(exprs, options).build();
1354        Self::from_logical_plan(lp, opt_state)
1355    }
1356
1357    /// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1358    ///
1359    /// Takes a list of expressions to group on.
1360    ///
1361    /// # Example
1362    ///
1363    /// ```rust
1364    /// use polars_core::prelude::*;
1365    /// use polars_lazy::prelude::*;
1366    ///
1367    /// fn example(df: DataFrame) -> LazyFrame {
1368    ///       df.lazy()
1369    ///        .group_by([col("date")])
1370    ///        .agg([
1371    ///            col("rain").min().alias("min_rain"),
1372    ///            col("rain").sum().alias("sum_rain"),
1373    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1374    ///        ])
1375    /// }
1376    /// ```
1377    pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1378        let keys = by
1379            .as_ref()
1380            .iter()
1381            .map(|e| e.clone().into())
1382            .collect::<Vec<_>>();
1383        let opt_state = self.get_opt_state();
1384
1385        #[cfg(feature = "dynamic_group_by")]
1386        {
1387            LazyGroupBy {
1388                logical_plan: self.logical_plan,
1389                opt_state,
1390                keys,
1391                maintain_order: false,
1392                dynamic_options: None,
1393                rolling_options: None,
1394            }
1395        }
1396
1397        #[cfg(not(feature = "dynamic_group_by"))]
1398        {
1399            LazyGroupBy {
1400                logical_plan: self.logical_plan,
1401                opt_state,
1402                keys,
1403                maintain_order: false,
1404            }
1405        }
1406    }
1407
1408    /// Create rolling groups based on a time column.
1409    ///
1410    /// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1411    ///
1412    /// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1413    /// individual values and are not of constant intervals. For constant intervals use
1414    /// *group_by_dynamic*
1415    #[cfg(feature = "dynamic_group_by")]
1416    pub fn rolling<E: AsRef<[Expr]>>(
1417        mut self,
1418        index_column: Expr,
1419        group_by: E,
1420        mut options: RollingGroupOptions,
1421    ) -> LazyGroupBy {
1422        if let Expr::Column(name) = index_column {
1423            options.index_column = name;
1424        } else {
1425            let output_field = index_column
1426                .to_field(&self.collect_schema().unwrap(), Context::Default)
1427                .unwrap();
1428            return self.with_column(index_column).rolling(
1429                Expr::Column(output_field.name().clone()),
1430                group_by,
1431                options,
1432            );
1433        }
1434        let opt_state = self.get_opt_state();
1435        LazyGroupBy {
1436            logical_plan: self.logical_plan,
1437            opt_state,
1438            keys: group_by.as_ref().to_vec(),
1439            maintain_order: true,
1440            dynamic_options: None,
1441            rolling_options: Some(options),
1442        }
1443    }
1444
1445    /// Group based on a time value (or index value of type Int32, Int64).
1446    ///
1447    /// Time windows are calculated and rows are assigned to windows. Different from a
1448    /// normal group_by is that a row can be member of multiple groups. The time/index
1449    /// window could be seen as a rolling window, with a window size determined by
1450    /// dates/times/values instead of slots in the DataFrame.
1451    ///
1452    /// A window is defined by:
1453    ///
1454    /// - every: interval of the window
1455    /// - period: length of the window
1456    /// - offset: offset of the window
1457    ///
1458    /// The `group_by` argument should be empty `[]` if you don't want to combine this
1459    /// with a ordinary group_by on these keys.
1460    #[cfg(feature = "dynamic_group_by")]
1461    pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1462        mut self,
1463        index_column: Expr,
1464        group_by: E,
1465        mut options: DynamicGroupOptions,
1466    ) -> LazyGroupBy {
1467        if let Expr::Column(name) = index_column {
1468            options.index_column = name;
1469        } else {
1470            let output_field = index_column
1471                .to_field(&self.collect_schema().unwrap(), Context::Default)
1472                .unwrap();
1473            return self.with_column(index_column).group_by_dynamic(
1474                Expr::Column(output_field.name().clone()),
1475                group_by,
1476                options,
1477            );
1478        }
1479        let opt_state = self.get_opt_state();
1480        LazyGroupBy {
1481            logical_plan: self.logical_plan,
1482            opt_state,
1483            keys: group_by.as_ref().to_vec(),
1484            maintain_order: true,
1485            dynamic_options: Some(options),
1486            rolling_options: None,
1487        }
1488    }
1489
1490    /// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1491    pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1492        let keys = by
1493            .as_ref()
1494            .iter()
1495            .map(|e| e.clone().into())
1496            .collect::<Vec<_>>();
1497        let opt_state = self.get_opt_state();
1498
1499        #[cfg(feature = "dynamic_group_by")]
1500        {
1501            LazyGroupBy {
1502                logical_plan: self.logical_plan,
1503                opt_state,
1504                keys,
1505                maintain_order: true,
1506                dynamic_options: None,
1507                rolling_options: None,
1508            }
1509        }
1510
1511        #[cfg(not(feature = "dynamic_group_by"))]
1512        {
1513            LazyGroupBy {
1514                logical_plan: self.logical_plan,
1515                opt_state,
1516                keys,
1517                maintain_order: true,
1518            }
1519        }
1520    }
1521
1522    /// Left anti join this query with another lazy query.
1523    ///
1524    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1525    /// flexible join logic, see [`join`](LazyFrame::join) or
1526    /// [`join_builder`](LazyFrame::join_builder).
1527    ///
1528    /// # Example
1529    ///
1530    /// ```rust
1531    /// use polars_core::prelude::*;
1532    /// use polars_lazy::prelude::*;
1533    /// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1534    ///         ldf
1535    ///         .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1536    /// }
1537    /// ```
1538    #[cfg(feature = "semi_anti_join")]
1539    pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1540        self.join(
1541            other,
1542            [left_on.into()],
1543            [right_on.into()],
1544            JoinArgs::new(JoinType::Anti),
1545        )
1546    }
1547
1548    /// Creates the Cartesian product from both frames, preserving the order of the left keys.
1549    #[cfg(feature = "cross_join")]
1550    pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1551        self.join(
1552            other,
1553            vec![],
1554            vec![],
1555            JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1556        )
1557    }
1558
1559    /// Left outer join this query with another lazy query.
1560    ///
1561    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1562    /// flexible join logic, see [`join`](LazyFrame::join) or
1563    /// [`join_builder`](LazyFrame::join_builder).
1564    ///
1565    /// # Example
1566    ///
1567    /// ```rust
1568    /// use polars_core::prelude::*;
1569    /// use polars_lazy::prelude::*;
1570    /// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1571    ///         ldf
1572    ///         .left_join(other, col("foo"), col("bar"))
1573    /// }
1574    /// ```
1575    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1576        self.join(
1577            other,
1578            [left_on.into()],
1579            [right_on.into()],
1580            JoinArgs::new(JoinType::Left),
1581        )
1582    }
1583
1584    /// Inner join this query with another lazy query.
1585    ///
1586    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1587    /// flexible join logic, see [`join`](LazyFrame::join) or
1588    /// [`join_builder`](LazyFrame::join_builder).
1589    ///
1590    /// # Example
1591    ///
1592    /// ```rust
1593    /// use polars_core::prelude::*;
1594    /// use polars_lazy::prelude::*;
1595    /// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1596    ///         ldf
1597    ///         .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1598    /// }
1599    /// ```
1600    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1601        self.join(
1602            other,
1603            [left_on.into()],
1604            [right_on.into()],
1605            JoinArgs::new(JoinType::Inner),
1606        )
1607    }
1608
1609    /// Full outer join this query with another lazy query.
1610    ///
1611    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1612    /// flexible join logic, see [`join`](LazyFrame::join) or
1613    /// [`join_builder`](LazyFrame::join_builder).
1614    ///
1615    /// # Example
1616    ///
1617    /// ```rust
1618    /// use polars_core::prelude::*;
1619    /// use polars_lazy::prelude::*;
1620    /// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1621    ///         ldf
1622    ///         .full_join(other, col("foo"), col("bar"))
1623    /// }
1624    /// ```
1625    pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1626        self.join(
1627            other,
1628            [left_on.into()],
1629            [right_on.into()],
1630            JoinArgs::new(JoinType::Full),
1631        )
1632    }
1633
1634    /// Left semi join this query with another lazy query.
1635    ///
1636    /// Matches on the values of the expressions `left_on` and `right_on`. For more
1637    /// flexible join logic, see [`join`](LazyFrame::join) or
1638    /// [`join_builder`](LazyFrame::join_builder).
1639    ///
1640    /// # Example
1641    ///
1642    /// ```rust
1643    /// use polars_core::prelude::*;
1644    /// use polars_lazy::prelude::*;
1645    /// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1646    ///         ldf
1647    ///         .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1648    /// }
1649    /// ```
1650    #[cfg(feature = "semi_anti_join")]
1651    pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1652        self.join(
1653            other,
1654            [left_on.into()],
1655            [right_on.into()],
1656            JoinArgs::new(JoinType::Semi),
1657        )
1658    }
1659
1660    /// Generic function to join two LazyFrames.
1661    ///
1662    /// `join` can join on multiple columns, given as two list of expressions, and with a
1663    /// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1664    /// that already exist in this DataFrame are suffixed with `"_right"`. For control
1665    /// over how columns are renamed and parallelization options, use
1666    /// [`join_builder`](LazyFrame::join_builder).
1667    ///
1668    /// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1669    ///
1670    /// # Example
1671    ///
1672    /// ```rust
1673    /// use polars_core::prelude::*;
1674    /// use polars_lazy::prelude::*;
1675    ///
1676    /// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1677    ///         ldf
1678    ///         .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1679    /// }
1680    /// ```
1681    pub fn join<E: AsRef<[Expr]>>(
1682        self,
1683        other: LazyFrame,
1684        left_on: E,
1685        right_on: E,
1686        args: JoinArgs,
1687    ) -> LazyFrame {
1688        let left_on = left_on.as_ref().to_vec();
1689        let right_on = right_on.as_ref().to_vec();
1690
1691        self._join_impl(other, left_on, right_on, args)
1692    }
1693
1694    fn _join_impl(
1695        self,
1696        other: LazyFrame,
1697        left_on: Vec<Expr>,
1698        right_on: Vec<Expr>,
1699        args: JoinArgs,
1700    ) -> LazyFrame {
1701        let JoinArgs {
1702            how,
1703            validation,
1704            suffix,
1705            slice,
1706            nulls_equal,
1707            coalesce,
1708            maintain_order,
1709        } = args;
1710
1711        if slice.is_some() {
1712            panic!("impl error: slice is not handled")
1713        }
1714
1715        let mut builder = self
1716            .join_builder()
1717            .with(other)
1718            .left_on(left_on)
1719            .right_on(right_on)
1720            .how(how)
1721            .validate(validation)
1722            .join_nulls(nulls_equal)
1723            .coalesce(coalesce)
1724            .maintain_order(maintain_order);
1725
1726        if let Some(suffix) = suffix {
1727            builder = builder.suffix(suffix);
1728        }
1729
1730        // Note: args.slice is set by the optimizer
1731        builder.finish()
1732    }
1733
1734    /// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1735    ///
1736    /// After the `JoinBuilder` has been created and set up, calling
1737    /// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1738    /// representing the `join` operation.
1739    pub fn join_builder(self) -> JoinBuilder {
1740        JoinBuilder::new(self)
1741    }
1742
1743    /// Add or replace a column, given as an expression, to a DataFrame.
1744    ///
1745    /// # Example
1746    ///
1747    /// ```rust
1748    /// use polars_core::prelude::*;
1749    /// use polars_lazy::prelude::*;
1750    /// fn add_column(df: DataFrame) -> LazyFrame {
1751    ///     df.lazy()
1752    ///         .with_column(
1753    ///             when(col("sepal_length").lt(lit(5.0)))
1754    ///             .then(lit(10))
1755    ///             .otherwise(lit(1))
1756    ///             .alias("new_column_name"),
1757    ///         )
1758    /// }
1759    /// ```
1760    pub fn with_column(self, expr: Expr) -> LazyFrame {
1761        let opt_state = self.get_opt_state();
1762        let lp = self
1763            .get_plan_builder()
1764            .with_columns(
1765                vec![expr],
1766                ProjectionOptions {
1767                    run_parallel: false,
1768                    duplicate_check: true,
1769                    should_broadcast: true,
1770                },
1771            )
1772            .build();
1773        Self::from_logical_plan(lp, opt_state)
1774    }
1775
1776    /// Add or replace multiple columns, given as expressions, to a DataFrame.
1777    ///
1778    /// # Example
1779    ///
1780    /// ```rust
1781    /// use polars_core::prelude::*;
1782    /// use polars_lazy::prelude::*;
1783    /// fn add_columns(df: DataFrame) -> LazyFrame {
1784    ///     df.lazy()
1785    ///         .with_columns(
1786    ///             vec![lit(10).alias("foo"), lit(100).alias("bar")]
1787    ///          )
1788    /// }
1789    /// ```
1790    pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1791        let exprs = exprs.as_ref().to_vec();
1792        self.with_columns_impl(
1793            exprs,
1794            ProjectionOptions {
1795                run_parallel: true,
1796                duplicate_check: true,
1797                should_broadcast: true,
1798            },
1799        )
1800    }
1801
1802    /// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1803    pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1804        let exprs = exprs.as_ref().to_vec();
1805        self.with_columns_impl(
1806            exprs,
1807            ProjectionOptions {
1808                run_parallel: false,
1809                duplicate_check: true,
1810                should_broadcast: true,
1811            },
1812        )
1813    }
1814
1815    /// Match or evolve to a certain schema.
1816    pub fn match_to_schema(
1817        self,
1818        schema: SchemaRef,
1819        per_column: Arc<[MatchToSchemaPerColumn]>,
1820        extra_columns: ExtraColumnsPolicy,
1821    ) -> LazyFrame {
1822        let opt_state = self.get_opt_state();
1823        let lp = self
1824            .get_plan_builder()
1825            .match_to_schema(schema, per_column, extra_columns)
1826            .build();
1827        Self::from_logical_plan(lp, opt_state)
1828    }
1829
1830    fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1831        let opt_state = self.get_opt_state();
1832        let lp = self.get_plan_builder().with_columns(exprs, options).build();
1833        Self::from_logical_plan(lp, opt_state)
1834    }
1835
1836    pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1837        let contexts = contexts
1838            .as_ref()
1839            .iter()
1840            .map(|lf| lf.logical_plan.clone())
1841            .collect();
1842        let opt_state = self.get_opt_state();
1843        let lp = self.get_plan_builder().with_context(contexts).build();
1844        Self::from_logical_plan(lp, opt_state)
1845    }
1846
1847    /// Aggregate all the columns as their maximum values.
1848    ///
1849    /// Aggregated columns will have the same names as the original columns.
1850    pub fn max(self) -> Self {
1851        self.map_private(DslFunction::Stats(StatsFunction::Max))
1852    }
1853
1854    /// Aggregate all the columns as their minimum values.
1855    ///
1856    /// Aggregated columns will have the same names as the original columns.
1857    pub fn min(self) -> Self {
1858        self.map_private(DslFunction::Stats(StatsFunction::Min))
1859    }
1860
1861    /// Aggregate all the columns as their sum values.
1862    ///
1863    /// Aggregated columns will have the same names as the original columns.
1864    ///
1865    /// - Boolean columns will sum to a `u32` containing the number of `true`s.
1866    /// - For integer columns, the ordinary checks for overflow are performed:
1867    ///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1868    ///   silently wrap.
1869    /// - String columns will sum to None.
1870    pub fn sum(self) -> Self {
1871        self.map_private(DslFunction::Stats(StatsFunction::Sum))
1872    }
1873
1874    /// Aggregate all the columns as their mean values.
1875    ///
1876    /// - Boolean and integer columns are converted to `f64` before computing the mean.
1877    /// - String columns will have a mean of None.
1878    pub fn mean(self) -> Self {
1879        self.map_private(DslFunction::Stats(StatsFunction::Mean))
1880    }
1881
1882    /// Aggregate all the columns as their median values.
1883    ///
1884    /// - Boolean and integer results are converted to `f64`. However, they are still
1885    ///   susceptible to overflow before this conversion occurs.
1886    /// - String columns will sum to None.
1887    pub fn median(self) -> Self {
1888        self.map_private(DslFunction::Stats(StatsFunction::Median))
1889    }
1890
1891    /// Aggregate all the columns as their quantile values.
1892    pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1893        self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1894            quantile,
1895            method,
1896        }))
1897    }
1898
1899    /// Aggregate all the columns as their standard deviation values.
1900    ///
1901    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1902    /// computing the variance, where `N` is the number of rows.
1903    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1904    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1905    /// > likelihood estimate of the variance for normally distributed variables. The
1906    /// > standard deviation computed in this function is the square root of the estimated
1907    /// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1908    /// > standard deviation per se.
1909    ///
1910    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1911    pub fn std(self, ddof: u8) -> Self {
1912        self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1913    }
1914
1915    /// Aggregate all the columns as their variance values.
1916    ///
1917    /// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1918    /// computing the variance, where `N` is the number of rows.
1919    /// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1920    /// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1921    /// > likelihood estimate of the variance for normally distributed variables.
1922    ///
1923    /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1924    pub fn var(self, ddof: u8) -> Self {
1925        self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1926    }
1927
1928    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1929    pub fn explode<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(self, columns: E) -> LazyFrame {
1930        self.explode_impl(columns, false)
1931    }
1932
1933    /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1934    fn explode_impl<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1935        self,
1936        columns: E,
1937        allow_empty: bool,
1938    ) -> LazyFrame {
1939        let columns = columns
1940            .as_ref()
1941            .iter()
1942            .map(|e| e.clone().into())
1943            .collect::<Vec<_>>();
1944        let opt_state = self.get_opt_state();
1945        let lp = self
1946            .get_plan_builder()
1947            .explode(columns, allow_empty)
1948            .build();
1949        Self::from_logical_plan(lp, opt_state)
1950    }
1951
1952    /// Aggregate all the columns as the sum of their null value count.
1953    pub fn null_count(self) -> LazyFrame {
1954        self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1955    }
1956
1957    /// Drop non-unique rows and maintain the order of kept rows.
1958    ///
1959    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1960    /// `None`, all columns are considered.
1961    pub fn unique_stable(
1962        self,
1963        subset: Option<Vec<PlSmallStr>>,
1964        keep_strategy: UniqueKeepStrategy,
1965    ) -> LazyFrame {
1966        self.unique_stable_generic(subset, keep_strategy)
1967    }
1968
1969    pub fn unique_stable_generic<E, IE>(
1970        self,
1971        subset: Option<E>,
1972        keep_strategy: UniqueKeepStrategy,
1973    ) -> LazyFrame
1974    where
1975        E: AsRef<[IE]>,
1976        IE: Into<Selector> + Clone,
1977    {
1978        let subset = subset.map(|s| {
1979            s.as_ref()
1980                .iter()
1981                .map(|e| e.clone().into())
1982                .collect::<Vec<_>>()
1983        });
1984
1985        let opt_state = self.get_opt_state();
1986        let options = DistinctOptionsDSL {
1987            subset,
1988            maintain_order: true,
1989            keep_strategy,
1990        };
1991        let lp = self.get_plan_builder().distinct(options).build();
1992        Self::from_logical_plan(lp, opt_state)
1993    }
1994
1995    /// Drop non-unique rows without maintaining the order of kept rows.
1996    ///
1997    /// The order of the kept rows may change; to maintain the original row order, use
1998    /// [`unique_stable`](LazyFrame::unique_stable).
1999    ///
2000    /// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
2001    /// all columns are considered.
2002    pub fn unique(
2003        self,
2004        subset: Option<Vec<String>>,
2005        keep_strategy: UniqueKeepStrategy,
2006    ) -> LazyFrame {
2007        self.unique_generic(subset, keep_strategy)
2008    }
2009
2010    pub fn unique_generic<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
2011        self,
2012        subset: Option<E>,
2013        keep_strategy: UniqueKeepStrategy,
2014    ) -> LazyFrame {
2015        let subset = subset.map(|s| {
2016            s.as_ref()
2017                .iter()
2018                .map(|e| e.clone().into())
2019                .collect::<Vec<_>>()
2020        });
2021        let opt_state = self.get_opt_state();
2022        let options = DistinctOptionsDSL {
2023            subset,
2024            maintain_order: false,
2025            keep_strategy,
2026        };
2027        let lp = self.get_plan_builder().distinct(options).build();
2028        Self::from_logical_plan(lp, opt_state)
2029    }
2030
2031    /// Drop rows containing one or more NaN values.
2032    ///
2033    /// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
2034    /// floating point columns are considered.
2035    pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> LazyFrame {
2036        let opt_state = self.get_opt_state();
2037        let lp = self.get_plan_builder().drop_nans(subset).build();
2038        Self::from_logical_plan(lp, opt_state)
2039    }
2040
2041    /// Drop rows containing one or more None values.
2042    ///
2043    /// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
2044    /// columns are considered.
2045    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> LazyFrame {
2046        let opt_state = self.get_opt_state();
2047        let lp = self.get_plan_builder().drop_nulls(subset).build();
2048        Self::from_logical_plan(lp, opt_state)
2049    }
2050
2051    /// Slice the DataFrame using an offset (starting row) and a length.
2052    ///
2053    /// If `offset` is negative, it is counted from the end of the DataFrame. For
2054    /// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
2055    /// end.
2056    ///
2057    /// If `offset` and `len` are such that the slice extends beyond the end of the
2058    /// DataFrame, the portion between `offset` and the end will be returned. In this
2059    /// case, the number of rows in the returned DataFrame will be less than `len`.
2060    pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
2061        let opt_state = self.get_opt_state();
2062        let lp = self.get_plan_builder().slice(offset, len).build();
2063        Self::from_logical_plan(lp, opt_state)
2064    }
2065
2066    /// Get the first row.
2067    ///
2068    /// Equivalent to `self.slice(0, 1)`.
2069    pub fn first(self) -> LazyFrame {
2070        self.slice(0, 1)
2071    }
2072
2073    /// Get the last row.
2074    ///
2075    /// Equivalent to `self.slice(-1, 1)`.
2076    pub fn last(self) -> LazyFrame {
2077        self.slice(-1, 1)
2078    }
2079
2080    /// Get the last `n` rows.
2081    ///
2082    /// Equivalent to `self.slice(-(n as i64), n)`.
2083    pub fn tail(self, n: IdxSize) -> LazyFrame {
2084        let neg_tail = -(n as i64);
2085        self.slice(neg_tail, n)
2086    }
2087
2088    /// Unpivot the DataFrame from wide to long format.
2089    ///
2090    /// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
2091    #[cfg(feature = "pivot")]
2092    pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
2093        let opt_state = self.get_opt_state();
2094        let lp = self.get_plan_builder().unpivot(args).build();
2095        Self::from_logical_plan(lp, opt_state)
2096    }
2097
2098    /// Limit the DataFrame to the first `n` rows.
2099    ///
2100    /// Note if you don't want the rows to be scanned, use [`fetch`](LazyFrame::fetch).
2101    pub fn limit(self, n: IdxSize) -> LazyFrame {
2102        self.slice(0, n)
2103    }
2104
2105    /// Apply a function/closure once the logical plan get executed.
2106    ///
2107    /// The function has access to the whole materialized DataFrame at the time it is
2108    /// called.
2109    ///
2110    /// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
2111    /// with `LazyFrame::with_column` or `with_columns`.
2112    ///
2113    /// ## Warning
2114    /// This can blow up in your face if the schema is changed due to the operation. The
2115    /// optimizer relies on a correct schema.
2116    ///
2117    /// You can toggle certain optimizations off.
2118    pub fn map<F>(
2119        self,
2120        function: F,
2121        optimizations: AllowedOptimizations,
2122        schema: Option<Arc<dyn UdfSchema>>,
2123        name: Option<&'static str>,
2124    ) -> LazyFrame
2125    where
2126        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2127    {
2128        let opt_state = self.get_opt_state();
2129        let lp = self
2130            .get_plan_builder()
2131            .map(
2132                function,
2133                optimizations,
2134                schema,
2135                PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
2136            )
2137            .build();
2138        Self::from_logical_plan(lp, opt_state)
2139    }
2140
2141    #[cfg(feature = "python")]
2142    pub fn map_python(
2143        self,
2144        function: polars_utils::python_function::PythonFunction,
2145        optimizations: AllowedOptimizations,
2146        schema: Option<SchemaRef>,
2147        validate_output: bool,
2148    ) -> LazyFrame {
2149        let opt_state = self.get_opt_state();
2150        let lp = self
2151            .get_plan_builder()
2152            .map_python(function, optimizations, schema, validate_output)
2153            .build();
2154        Self::from_logical_plan(lp, opt_state)
2155    }
2156
2157    pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2158        let opt_state = self.get_opt_state();
2159        let lp = self.get_plan_builder().map_private(function).build();
2160        Self::from_logical_plan(lp, opt_state)
2161    }
2162
2163    /// Add a new column at index 0 that counts the rows.
2164    ///
2165    /// `name` is the name of the new column. `offset` is where to start counting from; if
2166    /// `None`, it is set to `0`.
2167    ///
2168    /// # Warning
2169    /// This can have a negative effect on query performance. This may for instance block
2170    /// predicate pushdown optimization.
2171    pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2172    where
2173        S: Into<PlSmallStr>,
2174    {
2175        let name = name.into();
2176
2177        match &self.logical_plan {
2178            v @ DslPlan::Scan { scan_type, .. }
2179                if !matches!(&**scan_type, FileScan::Anonymous { .. }) =>
2180            {
2181                let DslPlan::Scan {
2182                    sources,
2183                    mut unified_scan_args,
2184                    scan_type,
2185                    file_info,
2186                    cached_ir: _,
2187                } = v.clone()
2188                else {
2189                    unreachable!()
2190                };
2191
2192                unified_scan_args.row_index = Some(RowIndex {
2193                    name,
2194                    offset: offset.unwrap_or(0),
2195                });
2196
2197                DslPlan::Scan {
2198                    sources,
2199                    unified_scan_args,
2200                    scan_type,
2201                    file_info,
2202                    cached_ir: Default::default(),
2203                }
2204                .into()
2205            },
2206            _ => self.map_private(DslFunction::RowIndex { name, offset }),
2207        }
2208    }
2209
2210    /// Return the number of non-null elements for each column.
2211    pub fn count(self) -> LazyFrame {
2212        self.select(vec![col(PlSmallStr::from_static("*")).count()])
2213    }
2214
2215    /// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2216    /// inserted as columns.
2217    #[cfg(feature = "dtype-struct")]
2218    pub fn unnest<E, IE>(self, cols: E) -> Self
2219    where
2220        E: AsRef<[IE]>,
2221        IE: Into<Selector> + Clone,
2222    {
2223        let cols = cols
2224            .as_ref()
2225            .iter()
2226            .map(|ie| ie.clone().into())
2227            .collect::<Vec<_>>();
2228        self.map_private(DslFunction::Unnest(cols))
2229    }
2230
2231    #[cfg(feature = "merge_sorted")]
2232    pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2233    where
2234        S: Into<PlSmallStr>,
2235    {
2236        let key = key.into();
2237
2238        let lp = DslPlan::MergeSorted {
2239            input_left: Arc::new(self.logical_plan),
2240            input_right: Arc::new(other.logical_plan),
2241            key,
2242        };
2243        Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2244    }
2245}
2246
2247/// Utility struct for lazy group_by operation.
2248#[derive(Clone)]
2249pub struct LazyGroupBy {
2250    pub logical_plan: DslPlan,
2251    opt_state: OptFlags,
2252    keys: Vec<Expr>,
2253    maintain_order: bool,
2254    #[cfg(feature = "dynamic_group_by")]
2255    dynamic_options: Option<DynamicGroupOptions>,
2256    #[cfg(feature = "dynamic_group_by")]
2257    rolling_options: Option<RollingGroupOptions>,
2258}
2259
2260impl From<LazyGroupBy> for LazyFrame {
2261    fn from(lgb: LazyGroupBy) -> Self {
2262        Self {
2263            logical_plan: lgb.logical_plan,
2264            opt_state: lgb.opt_state,
2265            cached_arena: Default::default(),
2266        }
2267    }
2268}
2269
2270impl LazyGroupBy {
2271    /// Group by and aggregate.
2272    ///
2273    /// Select a column with [col] and choose an aggregation.
2274    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2275    ///
2276    /// # Example
2277    ///
2278    /// ```rust
2279    /// use polars_core::prelude::*;
2280    /// use polars_lazy::prelude::*;
2281    ///
2282    /// fn example(df: DataFrame) -> LazyFrame {
2283    ///       df.lazy()
2284    ///        .group_by_stable([col("date")])
2285    ///        .agg([
2286    ///            col("rain").min().alias("min_rain"),
2287    ///            col("rain").sum().alias("sum_rain"),
2288    ///            col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2289    ///        ])
2290    /// }
2291    /// ```
2292    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2293        #[cfg(feature = "dynamic_group_by")]
2294        let lp = DslBuilder::from(self.logical_plan)
2295            .group_by(
2296                self.keys,
2297                aggs,
2298                None,
2299                self.maintain_order,
2300                self.dynamic_options,
2301                self.rolling_options,
2302            )
2303            .build();
2304
2305        #[cfg(not(feature = "dynamic_group_by"))]
2306        let lp = DslBuilder::from(self.logical_plan)
2307            .group_by(self.keys, aggs, None, self.maintain_order)
2308            .build();
2309        LazyFrame::from_logical_plan(lp, self.opt_state)
2310    }
2311
2312    /// Return first n rows of each group
2313    pub fn head(self, n: Option<usize>) -> LazyFrame {
2314        let keys = self
2315            .keys
2316            .iter()
2317            .filter_map(|expr| expr_output_name(expr).ok())
2318            .collect::<Vec<_>>();
2319
2320        self.agg([col(PlSmallStr::from_static("*"))
2321            .exclude(keys.iter().cloned())
2322            .head(n)])
2323            .explode_impl(
2324                [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2325                true,
2326            )
2327    }
2328
2329    /// Return last n rows of each group
2330    pub fn tail(self, n: Option<usize>) -> LazyFrame {
2331        let keys = self
2332            .keys
2333            .iter()
2334            .filter_map(|expr| expr_output_name(expr).ok())
2335            .collect::<Vec<_>>();
2336
2337        self.agg([col(PlSmallStr::from_static("*"))
2338            .exclude(keys.iter().cloned())
2339            .tail(n)])
2340            .explode_impl(
2341                [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2342                true,
2343            )
2344    }
2345
2346    /// Apply a function over the groups as a new DataFrame.
2347    ///
2348    /// **It is not recommended that you use this as materializing the DataFrame is very
2349    /// expensive.**
2350    pub fn apply<F>(self, f: F, schema: SchemaRef) -> LazyFrame
2351    where
2352        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2353    {
2354        #[cfg(feature = "dynamic_group_by")]
2355        let options = GroupbyOptions {
2356            dynamic: self.dynamic_options,
2357            rolling: self.rolling_options,
2358            slice: None,
2359        };
2360
2361        #[cfg(not(feature = "dynamic_group_by"))]
2362        let options = GroupbyOptions { slice: None };
2363
2364        let lp = DslPlan::GroupBy {
2365            input: Arc::new(self.logical_plan),
2366            keys: self.keys,
2367            aggs: vec![],
2368            apply: Some((Arc::new(f), schema)),
2369            maintain_order: self.maintain_order,
2370            options: Arc::new(options),
2371        };
2372        LazyFrame::from_logical_plan(lp, self.opt_state)
2373    }
2374}
2375
2376#[must_use]
2377pub struct JoinBuilder {
2378    lf: LazyFrame,
2379    how: JoinType,
2380    other: Option<LazyFrame>,
2381    left_on: Vec<Expr>,
2382    right_on: Vec<Expr>,
2383    allow_parallel: bool,
2384    force_parallel: bool,
2385    suffix: Option<PlSmallStr>,
2386    validation: JoinValidation,
2387    nulls_equal: bool,
2388    coalesce: JoinCoalesce,
2389    maintain_order: MaintainOrderJoin,
2390}
2391impl JoinBuilder {
2392    /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2393    pub fn new(lf: LazyFrame) -> Self {
2394        Self {
2395            lf,
2396            other: None,
2397            how: JoinType::Inner,
2398            left_on: vec![],
2399            right_on: vec![],
2400            allow_parallel: true,
2401            force_parallel: false,
2402            suffix: None,
2403            validation: Default::default(),
2404            nulls_equal: false,
2405            coalesce: Default::default(),
2406            maintain_order: Default::default(),
2407        }
2408    }
2409
2410    /// The right table in the join.
2411    pub fn with(mut self, other: LazyFrame) -> Self {
2412        self.other = Some(other);
2413        self
2414    }
2415
2416    /// Select the join type.
2417    pub fn how(mut self, how: JoinType) -> Self {
2418        self.how = how;
2419        self
2420    }
2421
2422    pub fn validate(mut self, validation: JoinValidation) -> Self {
2423        self.validation = validation;
2424        self
2425    }
2426
2427    /// The expressions you want to join both tables on.
2428    ///
2429    /// The passed expressions must be valid in both `LazyFrame`s in the join.
2430    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2431        let on = on.as_ref().to_vec();
2432        self.left_on.clone_from(&on);
2433        self.right_on = on;
2434        self
2435    }
2436
2437    /// The expressions you want to join the left table on.
2438    ///
2439    /// The passed expressions must be valid in the left table.
2440    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2441        self.left_on = on.as_ref().to_vec();
2442        self
2443    }
2444
2445    /// The expressions you want to join the right table on.
2446    ///
2447    /// The passed expressions must be valid in the right table.
2448    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2449        self.right_on = on.as_ref().to_vec();
2450        self
2451    }
2452
2453    /// Allow parallel table evaluation.
2454    pub fn allow_parallel(mut self, allow: bool) -> Self {
2455        self.allow_parallel = allow;
2456        self
2457    }
2458
2459    /// Force parallel table evaluation.
2460    pub fn force_parallel(mut self, force: bool) -> Self {
2461        self.force_parallel = force;
2462        self
2463    }
2464
2465    /// Join on null values. By default null values will never produce matches.
2466    pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2467        self.nulls_equal = nulls_equal;
2468        self
2469    }
2470
2471    /// Suffix to add duplicate column names in join.
2472    /// Defaults to `"_right"` if this method is never called.
2473    pub fn suffix<S>(mut self, suffix: S) -> Self
2474    where
2475        S: Into<PlSmallStr>,
2476    {
2477        self.suffix = Some(suffix.into());
2478        self
2479    }
2480
2481    /// Whether to coalesce join columns.
2482    pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2483        self.coalesce = coalesce;
2484        self
2485    }
2486
2487    /// Whether to preserve the row order.
2488    pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2489        self.maintain_order = maintain_order;
2490        self
2491    }
2492
2493    /// Finish builder
2494    pub fn finish(self) -> LazyFrame {
2495        let opt_state = self.lf.opt_state;
2496        let other = self.other.expect("'with' not set in join builder");
2497
2498        let args = JoinArgs {
2499            how: self.how,
2500            validation: self.validation,
2501            suffix: self.suffix,
2502            slice: None,
2503            nulls_equal: self.nulls_equal,
2504            coalesce: self.coalesce,
2505            maintain_order: self.maintain_order,
2506        };
2507
2508        let lp = self
2509            .lf
2510            .get_plan_builder()
2511            .join(
2512                other.logical_plan,
2513                self.left_on,
2514                self.right_on,
2515                JoinOptions {
2516                    allow_parallel: self.allow_parallel,
2517                    force_parallel: self.force_parallel,
2518                    args,
2519                    ..Default::default()
2520                }
2521                .into(),
2522            )
2523            .build();
2524        LazyFrame::from_logical_plan(lp, opt_state)
2525    }
2526
2527    // Finish with join predicates
2528    pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2529        let opt_state = self.lf.opt_state;
2530        let other = self.other.expect("with not set");
2531
2532        // Decompose `And` conjunctions into their component expressions
2533        fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2534            if let Expr::BinaryExpr {
2535                op: Operator::And,
2536                left,
2537                right,
2538            } = predicate
2539            {
2540                decompose_and((*left).clone(), expanded_predicates);
2541                decompose_and((*right).clone(), expanded_predicates);
2542            } else {
2543                expanded_predicates.push(predicate);
2544            }
2545        }
2546        let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2547        for predicate in predicates {
2548            decompose_and(predicate, &mut expanded_predicates);
2549        }
2550        let predicates: Vec<Expr> = expanded_predicates;
2551
2552        // Decompose `is_between` predicates to allow for cleaner expression of range joins
2553        #[cfg(feature = "is_between")]
2554        let predicates: Vec<Expr> = {
2555            let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2556            for predicate in predicates {
2557                if let Expr::Function {
2558                    function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2559                    input,
2560                    ..
2561                } = &predicate
2562                {
2563                    if let [expr, lower, upper] = input.as_slice() {
2564                        match closed {
2565                            ClosedInterval::Both => {
2566                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2567                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2568                            },
2569                            ClosedInterval::Right => {
2570                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2571                                expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2572                            },
2573                            ClosedInterval::Left => {
2574                                expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2575                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2576                            },
2577                            ClosedInterval::None => {
2578                                expanded_predicates.push(expr.clone().gt(lower.clone()));
2579                                expanded_predicates.push(expr.clone().lt(upper.clone()));
2580                            },
2581                        }
2582                        continue;
2583                    }
2584                }
2585                expanded_predicates.push(predicate);
2586            }
2587            expanded_predicates
2588        };
2589
2590        let args = JoinArgs {
2591            how: self.how,
2592            validation: self.validation,
2593            suffix: self.suffix,
2594            slice: None,
2595            nulls_equal: self.nulls_equal,
2596            coalesce: self.coalesce,
2597            maintain_order: self.maintain_order,
2598        };
2599        let options = JoinOptions {
2600            allow_parallel: self.allow_parallel,
2601            force_parallel: self.force_parallel,
2602            args,
2603            ..Default::default()
2604        };
2605
2606        let lp = DslPlan::Join {
2607            input_left: Arc::new(self.lf.logical_plan),
2608            input_right: Arc::new(other.logical_plan),
2609            left_on: Default::default(),
2610            right_on: Default::default(),
2611            predicates,
2612            options: Arc::from(options),
2613        };
2614
2615        LazyFrame::from_logical_plan(lp, opt_state)
2616    }
2617}
2618
2619pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2620    #[cfg(not(feature = "new_streaming"))]
2621    {
2622        None
2623    }
2624    #[cfg(feature = "new_streaming")]
2625    {
2626        Some(streaming_dispatch::build_streaming_query_executor)
2627    }
2628};
2629#[cfg(feature = "new_streaming")]
2630pub use streaming_dispatch::build_streaming_query_executor;
2631
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    //! Adapter that exposes a `polars_stream::StreamingQuery` through the
    //! `Executor` trait.

    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build an `Executor` that runs the plan rooted at `node` as a streaming query.
    ///
    /// The plan is wrapped in an in-memory sink (`SinkTypeIR::Memory`) that is
    /// added to `ir_arena`. When `node` is a scan, its `rechunk` setting is
    /// remembered and applied to the resulting frame; otherwise no rechunking
    /// is done.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `StreamingQuery::build`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Read the flag before the sink node is added on top of `node`.
        let rechunk = if let IR::Scan {
            unified_scan_args, ..
        } = ir_arena.get(node)
        {
            unified_scan_args.rechunk
        } else {
            false
        };

        let sink = IR::Sink {
            input: node,
            payload: SinkTypeIR::Memory,
        };
        let node = ir_arena.add(sink);

        let query = polars_stream::StreamingQuery::build(node, ir_arena, expr_arena)?;
        let boxed: Box<dyn Executor> = Box::new(StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        });
        Ok(boxed)
    }

    // Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
    /// Single-use executor around a streaming query.
    struct StreamingQueryExecutor {
        /// The pending query; taken (leaving `None`) by the first `execute` call.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Whether to collapse the result into a single chunk after execution.
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the stored query and return its single output frame.
        ///
        /// # Panics
        ///
        /// Panics when called from a thread of the rayon `POOL`, when the lock
        /// cannot be acquired, or when the query was already taken by an
        /// earlier call.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // Must not block rayon thread on pending new-streaming future.
            assert!(POOL.current_thread_index().is_none());

            // Keep the guard in its own scope so the lock is released before
            // the query runs.
            let pending = {
                let mut slot = self.executor.try_lock().unwrap();
                slot.take()
            };
            let query = pending.expect("unhandled: execute() more than once");

            let mut df = query.execute()?.unwrap_single();
            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}