1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
42pub trait IntoLazy {
43 fn lazy(self) -> LazyFrame;
44}
45
46impl IntoLazy for DataFrame {
47 fn lazy(self) -> LazyFrame {
49 let lp = DslBuilder::from_existing_df(self).build();
50 LazyFrame {
51 logical_plan: lp,
52 opt_state: Default::default(),
53 cached_arena: Default::default(),
54 }
55 }
56}
57
58impl IntoLazy for LazyFrame {
59 fn lazy(self) -> LazyFrame {
60 self
61 }
62}
63
64#[derive(Clone, Default)]
69#[must_use]
70pub struct LazyFrame {
71 pub logical_plan: DslPlan,
72 pub(crate) opt_state: OptFlags,
73 pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
74}
75
76impl From<DslPlan> for LazyFrame {
77 fn from(plan: DslPlan) -> Self {
78 Self {
79 logical_plan: plan,
80 opt_state: OptFlags::default(),
81 cached_arena: Default::default(),
82 }
83 }
84}
85
86impl LazyFrame {
87 pub(crate) fn from_inner(
88 logical_plan: DslPlan,
89 opt_state: OptFlags,
90 cached_arena: Arc<Mutex<Option<CachedArena>>>,
91 ) -> Self {
92 Self {
93 logical_plan,
94 opt_state,
95 cached_arena,
96 }
97 }
98
99 pub(crate) fn get_plan_builder(self) -> DslBuilder {
100 DslBuilder::from(self.logical_plan)
101 }
102
103 fn get_opt_state(&self) -> OptFlags {
104 self.opt_state
105 }
106
107 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108 LazyFrame {
109 logical_plan,
110 opt_state,
111 cached_arena: Default::default(),
112 }
113 }
114
115 pub fn get_current_optimizations(&self) -> OptFlags {
117 self.opt_state
118 }
119
120 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122 self.opt_state = opt_state;
123 self
124 }
125
126 pub fn without_optimizations(self) -> Self {
128 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129 }
130
131 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134 self
135 }
136
137 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140 self
141 }
142
143 pub fn with_check_order(mut self, toggle: bool) -> Self {
146 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147 self
148 }
149
150 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153 self
154 }
155
156 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159 self
160 }
161
162 pub fn with_type_check(mut self, toggle: bool) -> Self {
164 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165 self
166 }
167
168 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171 self
172 }
173
174 #[cfg(feature = "cse")]
176 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
177 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
178 self
179 }
180
181 #[cfg(feature = "cse")]
183 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
184 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
185 self
186 }
187
188 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191 self
192 }
193
194 #[cfg(feature = "new_streaming")]
195 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
196 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
197 self
198 }
199
200 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
202 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
203 self
204 }
205
206 pub fn _with_eager(mut self, toggle: bool) -> Self {
208 self.opt_state.set(OptFlags::EAGER, toggle);
209 self
210 }
211
212 pub fn describe_plan(&self) -> PolarsResult<String> {
214 Ok(self.clone().to_alp()?.describe())
215 }
216
217 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
219 Ok(self.clone().to_alp()?.describe_tree_format())
220 }
221
222 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
226 Ok(self.clone().to_alp_optimized()?.describe())
227 }
228
229 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
233 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
234 }
235
236 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
241 if optimized {
242 self.describe_optimized_plan()
243 } else {
244 self.describe_plan()
245 }
246 }
247
248 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
288 let opt_state = self.get_opt_state();
289 let lp = self
290 .get_plan_builder()
291 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
292 .build();
293 Self::from_logical_plan(lp, opt_state)
294 }
295
296 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
316 self,
317 by_exprs: E,
318 sort_options: SortMultipleOptions,
319 ) -> Self {
320 let by_exprs = by_exprs.as_ref().to_vec();
321 if by_exprs.is_empty() {
322 self
323 } else {
324 let opt_state = self.get_opt_state();
325 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
326 Self::from_logical_plan(lp, opt_state)
327 }
328 }
329
330 pub fn top_k<E: AsRef<[Expr]>>(
331 self,
332 k: IdxSize,
333 by_exprs: E,
334 sort_options: SortMultipleOptions,
335 ) -> Self {
336 self.sort_by_exprs(
338 by_exprs,
339 sort_options.with_order_reversed().with_nulls_last(true),
340 )
341 .slice(0, k)
342 }
343
344 pub fn bottom_k<E: AsRef<[Expr]>>(
345 self,
346 k: IdxSize,
347 by_exprs: E,
348 sort_options: SortMultipleOptions,
349 ) -> Self {
350 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
352 .slice(0, k)
353 }
354
355 pub fn reverse(self) -> Self {
371 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
372 }
373
374 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
382 where
383 I: IntoIterator<Item = T>,
384 J: IntoIterator<Item = S>,
385 T: AsRef<str>,
386 S: AsRef<str>,
387 {
388 let iter = existing.into_iter();
389 let cap = iter.size_hint().0;
390 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
392
393 for (existing, new) in iter.zip(new) {
396 let existing = existing.as_ref();
397 let new = new.as_ref();
398 if new != existing {
399 existing_vec.push(existing.into());
400 new_vec.push(new.into());
401 }
402 }
403
404 self.map_private(DslFunction::Rename {
405 existing: existing_vec.into(),
406 new: new_vec.into(),
407 strict,
408 })
409 }
410
411 pub fn drop(self, columns: Selector) -> Self {
418 let opt_state = self.get_opt_state();
419 let lp = self.get_plan_builder().drop(columns).build();
420 Self::from_logical_plan(lp, opt_state)
421 }
422
423 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
428 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
429 }
430
431 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
436 self.select(vec![
437 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
438 ])
439 }
440
441 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
443 let opt_state = self.get_opt_state();
444 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
445 Self::from_logical_plan(lp, opt_state)
446 }
447
448 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
450 let opt_state = self.get_opt_state();
451 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
452 Self::from_logical_plan(lp, opt_state)
453 }
454
455 pub fn cache(self) -> Self {
459 let opt_state = self.get_opt_state();
460 let lp = self.get_plan_builder().cache().build();
461 Self::from_logical_plan(lp, opt_state)
462 }
463
464 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
466 let cast_cols: Vec<Expr> = dtypes
467 .into_iter()
468 .map(|(name, dt)| {
469 let name = PlSmallStr::from_str(name);
470
471 if strict {
472 col(name).strict_cast(dt)
473 } else {
474 col(name).cast(dt)
475 }
476 })
477 .collect();
478
479 if cast_cols.is_empty() {
480 self
481 } else {
482 self.with_columns(cast_cols)
483 }
484 }
485
486 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
488 self.with_columns(vec![if strict {
489 col(PlSmallStr::from_static("*")).strict_cast(dtype)
490 } else {
491 col(PlSmallStr::from_static("*")).cast(dtype)
492 }])
493 }
494
495 pub fn optimize(
496 self,
497 lp_arena: &mut Arena<IR>,
498 expr_arena: &mut Arena<AExpr>,
499 ) -> PolarsResult<Node> {
500 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
501 }
502
503 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
504 let (mut lp_arena, mut expr_arena) = self.get_arenas();
505 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
506
507 Ok(IRPlan::new(node, lp_arena, expr_arena))
508 }
509
510 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
511 let (mut lp_arena, mut expr_arena) = self.get_arenas();
512 let node = to_alp(
513 self.logical_plan,
514 &mut expr_arena,
515 &mut lp_arena,
516 &mut self.opt_state,
517 )?;
518 let plan = IRPlan::new(node, lp_arena, expr_arena);
519 Ok(plan)
520 }
521
522 pub(crate) fn optimize_with_scratch(
523 self,
524 lp_arena: &mut Arena<IR>,
525 expr_arena: &mut Arena<AExpr>,
526 scratch: &mut Vec<Node>,
527 ) -> PolarsResult<Node> {
528 let lp_top = optimize(
529 self.logical_plan,
530 self.opt_state,
531 lp_arena,
532 expr_arena,
533 scratch,
534 apply_scan_predicate_to_scan_ir,
535 )?;
536
537 Ok(lp_top)
538 }
539
540 fn prepare_collect_post_opt<P>(
541 mut self,
542 check_sink: bool,
543 query_start: Option<std::time::Instant>,
544 post_opt: P,
545 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
546 where
547 P: FnOnce(
548 Node,
549 &mut Arena<IR>,
550 &mut Arena<AExpr>,
551 Option<std::time::Duration>,
552 ) -> PolarsResult<()>,
553 {
554 let (mut lp_arena, mut expr_arena) = self.get_arenas();
555
556 let mut scratch = vec![];
557 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
558
559 post_opt(
560 lp_top,
561 &mut lp_arena,
562 &mut expr_arena,
563 query_start.map(|s| s.elapsed()),
566 )?;
567
568 let no_file_sink = if check_sink {
570 !matches!(
571 lp_arena.get(lp_top),
572 IR::Sink {
573 payload: SinkTypeIR::File { .. },
574 ..
575 }
576 )
577 } else {
578 true
579 };
580 let physical_plan = create_physical_plan(
581 lp_top,
582 &mut lp_arena,
583 &mut expr_arena,
584 BUILD_STREAMING_EXECUTOR,
585 )?;
586
587 let state = ExecutionState::new();
588 Ok((state, physical_plan, no_file_sink))
589 }
590
591 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
593 where
594 P: FnOnce(
595 Node,
596 &mut Arena<IR>,
597 &mut Arena<AExpr>,
598 Option<std::time::Duration>,
599 ) -> PolarsResult<()>,
600 {
601 let (mut state, mut physical_plan, _) =
602 self.prepare_collect_post_opt(false, None, post_opt)?;
603 physical_plan.execute(&mut state)
604 }
605
606 #[allow(unused_mut)]
607 fn prepare_collect(
608 self,
609 check_sink: bool,
610 query_start: Option<std::time::Instant>,
611 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
612 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
613 }
614
615 pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
620 let engine = match engine {
621 Engine::Streaming => Engine::Streaming,
622 _ if std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1") => {
623 Engine::Streaming
624 },
625 Engine::Auto => Engine::InMemory,
626 v => v,
627 };
628
629 if engine != Engine::Streaming
630 && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
631 {
632 feature_gated!("new_streaming", {
633 if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
634 return r;
635 }
636 })
637 }
638
639 if let Engine::Streaming = engine {
640 feature_gated!("new_streaming", self = self.with_new_streaming(true))
641 }
642
643 let mut ir_plan = self.to_alp_optimized()?;
644
645 ir_plan.ensure_root_node_is_sink();
646
647 match engine {
648 Engine::Streaming => feature_gated!("new_streaming", {
649 polars_stream::run_query(
650 ir_plan.lp_top,
651 &mut ir_plan.lp_arena,
652 &mut ir_plan.expr_arena,
653 )
654 }),
655 Engine::InMemory | Engine::Gpu => {
656 if let IR::SinkMultiple { inputs } = ir_plan.root() {
657 polars_ensure!(
658 engine != Engine::Gpu,
659 InvalidOperation:
660 "collect_all is not supported for the gpu engine"
661 );
662
663 return create_multiple_physical_plans(
664 inputs.clone().as_slice(),
665 &mut ir_plan.lp_arena,
666 &mut ir_plan.expr_arena,
667 BUILD_STREAMING_EXECUTOR,
668 )?
669 .execute()
670 .map(QueryResult::Multiple);
671 }
672
673 let mut physical_plan = create_physical_plan(
674 ir_plan.lp_top,
675 &mut ir_plan.lp_arena,
676 &mut ir_plan.expr_arena,
677 BUILD_STREAMING_EXECUTOR,
678 )?;
679 let mut state = ExecutionState::new();
680 physical_plan.execute(&mut state).map(QueryResult::Single)
681 },
682 Engine::Auto => unreachable!(),
683 }
684 }
685
686 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
687 let sink_multiple = LazyFrame {
688 logical_plan: DslPlan::SinkMultiple { inputs: plans },
689 opt_state,
690 cached_arena: Default::default(),
691 };
692 sink_multiple.explain(true)
693 }
694
695 pub fn collect_all_with_engine(
696 plans: Vec<DslPlan>,
697 engine: Engine,
698 opt_state: OptFlags,
699 ) -> PolarsResult<Vec<DataFrame>> {
700 if plans.is_empty() {
701 return Ok(Vec::new());
702 }
703
704 LazyFrame {
705 logical_plan: DslPlan::SinkMultiple { inputs: plans },
706 opt_state,
707 cached_arena: Default::default(),
708 }
709 .collect_with_engine(engine)
710 .map(|r| r.unwrap_multiple())
711 }
712
713 pub fn collect(self) -> PolarsResult<DataFrame> {
731 self.collect_with_engine(Engine::Auto).map(|r| match r {
732 QueryResult::Single(df) => df,
733 QueryResult::Multiple(_) => DataFrame::empty(),
735 })
736 }
737
738 #[cfg(feature = "async")]
743 pub fn collect_batches(
744 self,
745 engine: Engine,
746 maintain_order: bool,
747 chunk_size: Option<NonZeroUsize>,
748 lazy: bool,
749 ) -> PolarsResult<CollectBatches> {
750 let (send, recv) = sync_channel(1);
751 let runner_send = send.clone();
752 let ldf = self.sink_batches(
753 PlanCallback::new(move |df| {
754 let send_result = send.send(Ok(df));
756 Ok(send_result.is_err())
757 }),
758 maintain_order,
759 chunk_size,
760 )?;
761 let runner = move || {
762 polars_io::pl_async::get_runtime().spawn_blocking(move || {
765 if let Err(e) = ldf.collect_with_engine(engine) {
766 runner_send.send(Err(e)).ok();
767 }
768 });
769 };
770
771 let mut collect_batches = CollectBatches {
772 recv,
773 runner: Some(Box::new(runner)),
774 };
775 if !lazy {
776 collect_batches.start();
777 }
778 Ok(collect_batches)
779 }
780
781 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
784 where
785 P: FnOnce(
786 Node,
787 &mut Arena<IR>,
788 &mut Arena<AExpr>,
789 Option<std::time::Duration>,
790 ) -> PolarsResult<()>,
791 {
792 let query_start = std::time::Instant::now();
793 let (mut state, mut physical_plan, _) =
794 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
795 state.time_nodes(query_start);
796 let out = physical_plan.execute(&mut state)?;
797 let timer_df = state.finish_timer()?;
798 Ok((out, timer_df))
799 }
800
801 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
809 self._profile_post_opt(|_, _, _, _| Ok(()))
810 }
811
812 pub fn sink_batches(
813 mut self,
814 function: PlanCallback<DataFrame, bool>,
815 maintain_order: bool,
816 chunk_size: Option<NonZeroUsize>,
817 ) -> PolarsResult<Self> {
818 use polars_plan::prelude::sink::CallbackSinkType;
819
820 polars_ensure!(
821 !matches!(self.logical_plan, DslPlan::Sink { .. }),
822 InvalidOperation: "cannot create a sink on top of another sink"
823 );
824
825 self.logical_plan = DslPlan::Sink {
826 input: Arc::new(self.logical_plan),
827 payload: SinkType::Callback(CallbackSinkType {
828 function,
829 maintain_order,
830 chunk_size,
831 }),
832 };
833
834 Ok(self)
835 }
836
837 #[cfg(feature = "new_streaming")]
839 fn _collect_with_streaming_suppress_todo_panic(
840 mut self,
841 ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
842 self.opt_state |= OptFlags::NEW_STREAMING;
843 let mut ir_plan = match self.to_alp_optimized() {
844 Ok(v) => v,
845 Err(e) => return Some(Err(e)),
846 };
847
848 ir_plan.ensure_root_node_is_sink();
849
850 let f = || {
851 polars_stream::run_query(
852 ir_plan.lp_top,
853 &mut ir_plan.lp_arena,
854 &mut ir_plan.expr_arena,
855 )
856 };
857
858 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
859 Ok(v) => Some(v),
860 Err(e) => {
861 if e.downcast_ref::<&str>()
864 .is_some_and(|s| s.starts_with("not yet implemented"))
865 {
866 if polars_core::config::verbose() {
867 eprintln!(
868 "caught unimplemented error in new streaming engine, falling back to normal engine"
869 );
870 }
871 None
872 } else {
873 std::panic::resume_unwind(e)
874 }
875 },
876 }
877 }
878
879 pub fn sink(
880 mut self,
881 sink_type: SinkDestination,
882 file_format: FileWriteFormat,
883 unified_sink_args: UnifiedSinkArgs,
884 ) -> PolarsResult<Self> {
885 polars_ensure!(
886 !matches!(self.logical_plan, DslPlan::Sink { .. }),
887 InvalidOperation: "cannot create a sink on top of another sink"
888 );
889
890 self.logical_plan = DslPlan::Sink {
891 input: Arc::new(self.logical_plan),
892 payload: match sink_type {
893 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
894 target,
895 file_format,
896 unified_sink_args,
897 }),
898 SinkDestination::Partitioned {
899 base_path,
900 file_path_provider,
901 partition_strategy,
902 max_rows_per_file,
903 approximate_bytes_per_file,
904 } => SinkType::Partitioned(PartitionedSinkOptions {
905 base_path,
906 file_path_provider,
907 partition_strategy,
908 file_format,
909 unified_sink_args,
910 max_rows_per_file,
911 approximate_bytes_per_file,
912 }),
913 },
914 };
915 Ok(self)
916 }
917
918 pub fn filter(self, predicate: Expr) -> Self {
936 let opt_state = self.get_opt_state();
937 let lp = self.get_plan_builder().filter(predicate).build();
938 Self::from_logical_plan(lp, opt_state)
939 }
940
941 pub fn remove(self, predicate: Expr) -> Self {
959 self.filter(predicate.neq_missing(lit(true)))
960 }
961
962 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
988 let exprs = exprs.as_ref().to_vec();
989 self.select_impl(
990 exprs,
991 ProjectionOptions {
992 run_parallel: true,
993 duplicate_check: true,
994 should_broadcast: true,
995 },
996 )
997 }
998
999 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1000 let exprs = exprs.as_ref().to_vec();
1001 self.select_impl(
1002 exprs,
1003 ProjectionOptions {
1004 run_parallel: false,
1005 duplicate_check: true,
1006 should_broadcast: true,
1007 },
1008 )
1009 }
1010
1011 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1012 let opt_state = self.get_opt_state();
1013 let lp = self.get_plan_builder().project(exprs, options).build();
1014 Self::from_logical_plan(lp, opt_state)
1015 }
1016
1017 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1038 let keys = by
1039 .as_ref()
1040 .iter()
1041 .map(|e| e.clone().into())
1042 .collect::<Vec<_>>();
1043 let opt_state = self.get_opt_state();
1044
1045 #[cfg(feature = "dynamic_group_by")]
1046 {
1047 LazyGroupBy {
1048 logical_plan: self.logical_plan,
1049 opt_state,
1050 keys,
1051 predicates: vec![],
1052 maintain_order: false,
1053 dynamic_options: None,
1054 rolling_options: None,
1055 }
1056 }
1057
1058 #[cfg(not(feature = "dynamic_group_by"))]
1059 {
1060 LazyGroupBy {
1061 logical_plan: self.logical_plan,
1062 opt_state,
1063 keys,
1064 predicates: vec![],
1065 maintain_order: false,
1066 }
1067 }
1068 }
1069
1070 #[cfg(feature = "dynamic_group_by")]
1078 pub fn rolling<E: AsRef<[Expr]>>(
1079 mut self,
1080 index_column: Expr,
1081 group_by: E,
1082 mut options: RollingGroupOptions,
1083 ) -> LazyGroupBy {
1084 if let Expr::Column(name) = index_column {
1085 options.index_column = name;
1086 } else {
1087 let output_field = index_column
1088 .to_field(&self.collect_schema().unwrap())
1089 .unwrap();
1090 return self.with_column(index_column).rolling(
1091 Expr::Column(output_field.name().clone()),
1092 group_by,
1093 options,
1094 );
1095 }
1096 let opt_state = self.get_opt_state();
1097 LazyGroupBy {
1098 logical_plan: self.logical_plan,
1099 opt_state,
1100 predicates: vec![],
1101 keys: group_by.as_ref().to_vec(),
1102 maintain_order: true,
1103 dynamic_options: None,
1104 rolling_options: Some(options),
1105 }
1106 }
1107
1108 #[cfg(feature = "dynamic_group_by")]
1124 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1125 mut self,
1126 index_column: Expr,
1127 group_by: E,
1128 mut options: DynamicGroupOptions,
1129 ) -> LazyGroupBy {
1130 if let Expr::Column(name) = index_column {
1131 options.index_column = name;
1132 } else {
1133 let output_field = index_column
1134 .to_field(&self.collect_schema().unwrap())
1135 .unwrap();
1136 return self.with_column(index_column).group_by_dynamic(
1137 Expr::Column(output_field.name().clone()),
1138 group_by,
1139 options,
1140 );
1141 }
1142 let opt_state = self.get_opt_state();
1143 LazyGroupBy {
1144 logical_plan: self.logical_plan,
1145 opt_state,
1146 predicates: vec![],
1147 keys: group_by.as_ref().to_vec(),
1148 maintain_order: true,
1149 dynamic_options: Some(options),
1150 rolling_options: None,
1151 }
1152 }
1153
1154 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1156 let keys = by
1157 .as_ref()
1158 .iter()
1159 .map(|e| e.clone().into())
1160 .collect::<Vec<_>>();
1161 let opt_state = self.get_opt_state();
1162
1163 #[cfg(feature = "dynamic_group_by")]
1164 {
1165 LazyGroupBy {
1166 logical_plan: self.logical_plan,
1167 opt_state,
1168 keys,
1169 predicates: vec![],
1170 maintain_order: true,
1171 dynamic_options: None,
1172 rolling_options: None,
1173 }
1174 }
1175
1176 #[cfg(not(feature = "dynamic_group_by"))]
1177 {
1178 LazyGroupBy {
1179 logical_plan: self.logical_plan,
1180 opt_state,
1181 keys,
1182 predicates: vec![],
1183 maintain_order: true,
1184 }
1185 }
1186 }
1187
1188 #[cfg(feature = "semi_anti_join")]
1205 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1206 self.join(
1207 other,
1208 [left_on.into()],
1209 [right_on.into()],
1210 JoinArgs::new(JoinType::Anti),
1211 )
1212 }
1213
1214 #[cfg(feature = "cross_join")]
1216 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1217 self.join(
1218 other,
1219 vec![],
1220 vec![],
1221 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1222 )
1223 }
1224
1225 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1242 self.join(
1243 other,
1244 [left_on.into()],
1245 [right_on.into()],
1246 JoinArgs::new(JoinType::Left),
1247 )
1248 }
1249
1250 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1267 self.join(
1268 other,
1269 [left_on.into()],
1270 [right_on.into()],
1271 JoinArgs::new(JoinType::Inner),
1272 )
1273 }
1274
1275 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1292 self.join(
1293 other,
1294 [left_on.into()],
1295 [right_on.into()],
1296 JoinArgs::new(JoinType::Full),
1297 )
1298 }
1299
1300 #[cfg(feature = "semi_anti_join")]
1317 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1318 self.join(
1319 other,
1320 [left_on.into()],
1321 [right_on.into()],
1322 JoinArgs::new(JoinType::Semi),
1323 )
1324 }
1325
1326 pub fn join<E: AsRef<[Expr]>>(
1348 self,
1349 other: LazyFrame,
1350 left_on: E,
1351 right_on: E,
1352 args: JoinArgs,
1353 ) -> LazyFrame {
1354 let left_on = left_on.as_ref().to_vec();
1355 let right_on = right_on.as_ref().to_vec();
1356
1357 self._join_impl(other, left_on, right_on, args)
1358 }
1359
1360 fn _join_impl(
1361 self,
1362 other: LazyFrame,
1363 left_on: Vec<Expr>,
1364 right_on: Vec<Expr>,
1365 args: JoinArgs,
1366 ) -> LazyFrame {
1367 let JoinArgs {
1368 how,
1369 validation,
1370 suffix,
1371 slice,
1372 nulls_equal,
1373 coalesce,
1374 maintain_order,
1375 build_side,
1376 } = args;
1377
1378 if slice.is_some() {
1379 panic!("impl error: slice is not handled")
1380 }
1381
1382 let mut builder = self
1383 .join_builder()
1384 .with(other)
1385 .left_on(left_on)
1386 .right_on(right_on)
1387 .how(how)
1388 .validate(validation)
1389 .join_nulls(nulls_equal)
1390 .coalesce(coalesce)
1391 .maintain_order(maintain_order)
1392 .build_side(build_side);
1393
1394 if let Some(suffix) = suffix {
1395 builder = builder.suffix(suffix);
1396 }
1397
1398 builder.finish()
1400 }
1401
1402 pub fn join_builder(self) -> JoinBuilder {
1408 JoinBuilder::new(self)
1409 }
1410
1411 pub fn with_column(self, expr: Expr) -> LazyFrame {
1429 let opt_state = self.get_opt_state();
1430 let lp = self
1431 .get_plan_builder()
1432 .with_columns(
1433 vec![expr],
1434 ProjectionOptions {
1435 run_parallel: false,
1436 duplicate_check: true,
1437 should_broadcast: true,
1438 },
1439 )
1440 .build();
1441 Self::from_logical_plan(lp, opt_state)
1442 }
1443
1444 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1459 let exprs = exprs.as_ref().to_vec();
1460 self.with_columns_impl(
1461 exprs,
1462 ProjectionOptions {
1463 run_parallel: true,
1464 duplicate_check: true,
1465 should_broadcast: true,
1466 },
1467 )
1468 }
1469
1470 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1472 let exprs = exprs.as_ref().to_vec();
1473 self.with_columns_impl(
1474 exprs,
1475 ProjectionOptions {
1476 run_parallel: false,
1477 duplicate_check: true,
1478 should_broadcast: true,
1479 },
1480 )
1481 }
1482
1483 pub fn match_to_schema(
1485 self,
1486 schema: SchemaRef,
1487 per_column: Arc<[MatchToSchemaPerColumn]>,
1488 extra_columns: ExtraColumnsPolicy,
1489 ) -> LazyFrame {
1490 let opt_state = self.get_opt_state();
1491 let lp = self
1492 .get_plan_builder()
1493 .match_to_schema(schema, per_column, extra_columns)
1494 .build();
1495 Self::from_logical_plan(lp, opt_state)
1496 }
1497
1498 pub fn pipe_with_schema(
1499 self,
1500 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1501 ) -> Self {
1502 let opt_state = self.get_opt_state();
1503 let lp = self
1504 .get_plan_builder()
1505 .pipe_with_schema(vec![], callback)
1506 .build();
1507 Self::from_logical_plan(lp, opt_state)
1508 }
1509
1510 pub fn pipe_with_schemas(
1511 self,
1512 others: Vec<LazyFrame>,
1513 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1514 ) -> Self {
1515 let opt_state = self.get_opt_state();
1516 let lp = self
1517 .get_plan_builder()
1518 .pipe_with_schema(
1519 others.into_iter().map(|lf| lf.logical_plan).collect(),
1520 callback,
1521 )
1522 .build();
1523 Self::from_logical_plan(lp, opt_state)
1524 }
1525
1526 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1527 let opt_state = self.get_opt_state();
1528 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1529 Self::from_logical_plan(lp, opt_state)
1530 }
1531
1532 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1533 let contexts = contexts
1534 .as_ref()
1535 .iter()
1536 .map(|lf| lf.logical_plan.clone())
1537 .collect();
1538 let opt_state = self.get_opt_state();
1539 let lp = self.get_plan_builder().with_context(contexts).build();
1540 Self::from_logical_plan(lp, opt_state)
1541 }
1542
1543 pub fn max(self) -> Self {
1547 self.map_private(DslFunction::Stats(StatsFunction::Max))
1548 }
1549
1550 pub fn min(self) -> Self {
1554 self.map_private(DslFunction::Stats(StatsFunction::Min))
1555 }
1556
1557 pub fn sum(self) -> Self {
1567 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1568 }
1569
1570 pub fn mean(self) -> Self {
1575 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1576 }
1577
1578 pub fn median(self) -> Self {
1584 self.map_private(DslFunction::Stats(StatsFunction::Median))
1585 }
1586
1587 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1589 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1590 quantile,
1591 method,
1592 }))
1593 }
1594
1595 pub fn std(self, ddof: u8) -> Self {
1608 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1609 }
1610
1611 pub fn var(self, ddof: u8) -> Self {
1621 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1622 }
1623
1624 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1626 self.explode_impl(columns, options, false)
1627 }
1628
1629 fn explode_impl(
1631 self,
1632 columns: Selector,
1633 options: ExplodeOptions,
1634 allow_empty: bool,
1635 ) -> LazyFrame {
1636 let opt_state = self.get_opt_state();
1637 let lp = self
1638 .get_plan_builder()
1639 .explode(columns, options, allow_empty)
1640 .build();
1641 Self::from_logical_plan(lp, opt_state)
1642 }
1643
1644 pub fn null_count(self) -> LazyFrame {
1646 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1647 }
1648
1649 pub fn unique_stable(
1654 self,
1655 subset: Option<Selector>,
1656 keep_strategy: UniqueKeepStrategy,
1657 ) -> LazyFrame {
1658 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1659 self.unique_stable_generic(subset, keep_strategy)
1660 }
1661
1662 pub fn unique_stable_generic(
1663 self,
1664 subset: Option<Vec<Expr>>,
1665 keep_strategy: UniqueKeepStrategy,
1666 ) -> LazyFrame {
1667 let opt_state = self.get_opt_state();
1668 let options = DistinctOptionsDSL {
1669 subset,
1670 maintain_order: true,
1671 keep_strategy,
1672 };
1673 let lp = self.get_plan_builder().distinct(options).build();
1674 Self::from_logical_plan(lp, opt_state)
1675 }
1676
1677 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1685 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1686 self.unique_generic(subset, keep_strategy)
1687 }
1688
1689 pub fn unique_generic(
1690 self,
1691 subset: Option<Vec<Expr>>,
1692 keep_strategy: UniqueKeepStrategy,
1693 ) -> LazyFrame {
1694 let opt_state = self.get_opt_state();
1695 let options = DistinctOptionsDSL {
1696 subset,
1697 maintain_order: false,
1698 keep_strategy,
1699 };
1700 let lp = self.get_plan_builder().distinct(options).build();
1701 Self::from_logical_plan(lp, opt_state)
1702 }
1703
1704 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1709 let opt_state = self.get_opt_state();
1710 let lp = self.get_plan_builder().drop_nans(subset).build();
1711 Self::from_logical_plan(lp, opt_state)
1712 }
1713
1714 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1719 let opt_state = self.get_opt_state();
1720 let lp = self.get_plan_builder().drop_nulls(subset).build();
1721 Self::from_logical_plan(lp, opt_state)
1722 }
1723
1724 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1734 let opt_state = self.get_opt_state();
1735 let lp = self.get_plan_builder().slice(offset, len).build();
1736 Self::from_logical_plan(lp, opt_state)
1737 }
1738
1739 pub fn clear(self) -> LazyFrame {
1741 self.slice(0, 0)
1742 }
1743
1744 pub fn first(self) -> LazyFrame {
1748 self.slice(0, 1)
1749 }
1750
1751 pub fn last(self) -> LazyFrame {
1755 self.slice(-1, 1)
1756 }
1757
1758 pub fn tail(self, n: IdxSize) -> LazyFrame {
1762 let neg_tail = -(n as i64);
1763 self.slice(neg_tail, n)
1764 }
1765
/// Append a pivot step.
///
/// All arguments are forwarded unchanged to the DSL plan builder's `pivot`;
/// the current optimization flags are kept on the result.
#[cfg(feature = "pivot")]
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
    column_naming: PivotColumnNaming,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder = self.get_plan_builder().pivot(
        on,
        on_columns,
        index,
        values,
        agg,
        maintain_order,
        separator,
        column_naming,
    );
    Self::from_logical_plan(builder.build(), opt_state)
}
1795
/// Append an unpivot (wide-to-long) step described by `args`, keeping the
/// current optimization flags.
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder = self.get_plan_builder().unpivot(args);
    Self::from_logical_plan(builder.build(), opt_state)
}
1805
1806 pub fn limit(self, n: IdxSize) -> LazyFrame {
1808 self.slice(0, n)
1809 }
1810
1811 pub fn map<F>(
1825 self,
1826 function: F,
1827 optimizations: AllowedOptimizations,
1828 schema: Option<Arc<dyn UdfSchema>>,
1829 name: Option<&'static str>,
1830 ) -> LazyFrame
1831 where
1832 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1833 {
1834 let opt_state = self.get_opt_state();
1835 let lp = self
1836 .get_plan_builder()
1837 .map(
1838 function,
1839 optimizations,
1840 schema,
1841 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1842 )
1843 .build();
1844 Self::from_logical_plan(lp, opt_state)
1845 }
1846
/// Apply a Python callable to the whole `DataFrame`.
///
/// `optimizations`, `schema` and `validate_output` are forwarded unchanged to
/// the plan builder's `map_python`; the current optimization flags are kept.
#[cfg(feature = "python")]
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder =
        self.get_plan_builder()
            .map_python(function, optimizations, schema, validate_output);
    Self::from_logical_plan(builder.build(), opt_state)
}
1862
1863 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1864 let opt_state = self.get_opt_state();
1865 let lp = self.get_plan_builder().map_private(function).build();
1866 Self::from_logical_plan(lp, opt_state)
1867 }
1868
1869 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1878 where
1879 S: Into<PlSmallStr>,
1880 {
1881 let name = name.into();
1882
1883 match &self.logical_plan {
1884 v @ DslPlan::Scan {
1885 scan_type,
1886 unified_scan_args,
1887 ..
1888 } if unified_scan_args.row_index.is_none()
1889 && !matches!(
1890 &**scan_type,
1891 FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1892 ) =>
1893 {
1894 let DslPlan::Scan {
1895 sources,
1896 mut unified_scan_args,
1897 scan_type,
1898 cached_ir: _,
1899 } = v.clone()
1900 else {
1901 unreachable!()
1902 };
1903
1904 unified_scan_args.row_index = Some(RowIndex {
1905 name,
1906 offset: offset.unwrap_or(0),
1907 });
1908
1909 DslPlan::Scan {
1910 sources,
1911 unified_scan_args,
1912 scan_type,
1913 cached_ir: Default::default(),
1914 }
1915 .into()
1916 },
1917 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1918 }
1919 }
1920
1921 pub fn count(self) -> LazyFrame {
1923 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1924 }
1925
/// Unnest the struct columns matched by `cols`.
///
/// `separator` is forwarded unchanged in the `DslFunction::Unnest` request.
#[cfg(feature = "dtype-struct")]
pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
    let function = DslFunction::Unnest {
        columns: cols,
        separator,
    };
    self.map_private(function)
}
1935
/// Merge this frame with `other` into a `DslPlan::MergeSorted` node on `key`.
///
/// This frame's optimization flags are kept. The current implementation
/// always returns `Ok`.
#[cfg(feature = "merge_sorted")]
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let key = key.into();
    let opt_state = self.opt_state;
    let input_left = Arc::new(self.logical_plan);
    let input_right = Arc::new(other.logical_plan);

    let merged = DslPlan::MergeSorted {
        input_left,
        input_right,
        key,
    };
    Ok(LazyFrame::from_logical_plan(merged, opt_state))
}
1950
1951 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1952 let lp = DslPlan::MapFunction {
1953 input: Arc::new(self.logical_plan),
1954 function: DslFunction::Hint(hint),
1955 };
1956 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1957 }
1958}
1959
1960#[derive(Clone)]
1962pub struct LazyGroupBy {
1963 pub logical_plan: DslPlan,
1964 opt_state: OptFlags,
1965 keys: Vec<Expr>,
1966 predicates: Vec<Expr>,
1967 maintain_order: bool,
1968 #[cfg(feature = "dynamic_group_by")]
1969 dynamic_options: Option<DynamicGroupOptions>,
1970 #[cfg(feature = "dynamic_group_by")]
1971 rolling_options: Option<RollingGroupOptions>,
1972}
1973
1974impl From<LazyGroupBy> for LazyFrame {
1975 fn from(lgb: LazyGroupBy) -> Self {
1976 Self {
1977 logical_plan: lgb.logical_plan,
1978 opt_state: lgb.opt_state,
1979 cached_arena: Default::default(),
1980 }
1981 }
1982}
1983
1984impl LazyGroupBy {
1985 pub fn having(mut self, predicate: Expr) -> Self {
2006 self.predicates.push(predicate);
2007 self
2008 }
2009
2010 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2032 #[cfg(feature = "dynamic_group_by")]
2033 let lp = DslBuilder::from(self.logical_plan)
2034 .group_by(
2035 self.keys,
2036 self.predicates,
2037 aggs,
2038 None,
2039 self.maintain_order,
2040 self.dynamic_options,
2041 self.rolling_options,
2042 )
2043 .build();
2044
2045 #[cfg(not(feature = "dynamic_group_by"))]
2046 let lp = DslBuilder::from(self.logical_plan)
2047 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2048 .build();
2049 LazyFrame::from_logical_plan(lp, self.opt_state)
2050 }
2051
2052 pub fn head(self, n: Option<usize>) -> LazyFrame {
2054 let keys = self
2055 .keys
2056 .iter()
2057 .filter_map(|expr| expr_output_name(expr).ok())
2058 .collect::<Vec<_>>();
2059
2060 self.agg([all().as_expr().head(n)]).explode_impl(
2061 all() - by_name(keys.iter().cloned(), false, false),
2062 ExplodeOptions {
2063 empty_as_null: true,
2064 keep_nulls: true,
2065 },
2066 true,
2067 )
2068 }
2069
2070 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2072 let keys = self
2073 .keys
2074 .iter()
2075 .filter_map(|expr| expr_output_name(expr).ok())
2076 .collect::<Vec<_>>();
2077
2078 self.agg([all().as_expr().tail(n)]).explode_impl(
2079 all() - by_name(keys.iter().cloned(), false, false),
2080 ExplodeOptions {
2081 empty_as_null: true,
2082 keep_nulls: true,
2083 },
2084 true,
2085 )
2086 }
2087
2088 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2093 if !self.predicates.is_empty() {
2094 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2095 }
2096
2097 #[cfg(feature = "dynamic_group_by")]
2098 let options = GroupbyOptions {
2099 dynamic: self.dynamic_options,
2100 rolling: self.rolling_options,
2101 slice: None,
2102 };
2103
2104 #[cfg(not(feature = "dynamic_group_by"))]
2105 let options = GroupbyOptions { slice: None };
2106
2107 let lp = DslPlan::GroupBy {
2108 input: Arc::new(self.logical_plan),
2109 keys: self.keys,
2110 predicates: vec![],
2111 aggs: vec![],
2112 apply: Some((f, schema)),
2113 maintain_order: self.maintain_order,
2114 options: Arc::new(options),
2115 };
2116 LazyFrame::from_logical_plan(lp, self.opt_state)
2117 }
2118}
2119
2120#[must_use]
2121pub struct JoinBuilder {
2122 lf: LazyFrame,
2123 how: JoinType,
2124 other: Option<LazyFrame>,
2125 left_on: Vec<Expr>,
2126 right_on: Vec<Expr>,
2127 allow_parallel: bool,
2128 force_parallel: bool,
2129 suffix: Option<PlSmallStr>,
2130 validation: JoinValidation,
2131 nulls_equal: bool,
2132 coalesce: JoinCoalesce,
2133 maintain_order: MaintainOrderJoin,
2134 build_side: Option<JoinBuildSide>,
2135}
2136impl JoinBuilder {
2137 pub fn new(lf: LazyFrame) -> Self {
2139 Self {
2140 lf,
2141 other: None,
2142 how: JoinType::Inner,
2143 left_on: vec![],
2144 right_on: vec![],
2145 allow_parallel: true,
2146 force_parallel: false,
2147 suffix: None,
2148 validation: Default::default(),
2149 nulls_equal: false,
2150 coalesce: Default::default(),
2151 maintain_order: Default::default(),
2152 build_side: None,
2153 }
2154 }
2155
2156 pub fn with(mut self, other: LazyFrame) -> Self {
2158 self.other = Some(other);
2159 self
2160 }
2161
2162 pub fn how(mut self, how: JoinType) -> Self {
2164 self.how = how;
2165 self
2166 }
2167
2168 pub fn validate(mut self, validation: JoinValidation) -> Self {
2169 self.validation = validation;
2170 self
2171 }
2172
2173 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2177 let on = on.as_ref().to_vec();
2178 self.left_on.clone_from(&on);
2179 self.right_on = on;
2180 self
2181 }
2182
2183 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2187 self.left_on = on.as_ref().to_vec();
2188 self
2189 }
2190
2191 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2195 self.right_on = on.as_ref().to_vec();
2196 self
2197 }
2198
2199 pub fn allow_parallel(mut self, allow: bool) -> Self {
2201 self.allow_parallel = allow;
2202 self
2203 }
2204
2205 pub fn force_parallel(mut self, force: bool) -> Self {
2207 self.force_parallel = force;
2208 self
2209 }
2210
2211 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2213 self.nulls_equal = nulls_equal;
2214 self
2215 }
2216
2217 pub fn suffix<S>(mut self, suffix: S) -> Self
2220 where
2221 S: Into<PlSmallStr>,
2222 {
2223 self.suffix = Some(suffix.into());
2224 self
2225 }
2226
2227 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2229 self.coalesce = coalesce;
2230 self
2231 }
2232
2233 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2235 self.maintain_order = maintain_order;
2236 self
2237 }
2238
2239 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2241 self.build_side = build_side;
2242 self
2243 }
2244
2245 pub fn finish(self) -> LazyFrame {
2247 let opt_state = self.lf.opt_state;
2248 let other = self.other.expect("'with' not set in join builder");
2249
2250 let args = JoinArgs {
2251 how: self.how,
2252 validation: self.validation,
2253 suffix: self.suffix,
2254 slice: None,
2255 nulls_equal: self.nulls_equal,
2256 coalesce: self.coalesce,
2257 maintain_order: self.maintain_order,
2258 build_side: self.build_side,
2259 };
2260
2261 let lp = self
2262 .lf
2263 .get_plan_builder()
2264 .join(
2265 other.logical_plan,
2266 self.left_on,
2267 self.right_on,
2268 JoinOptions {
2269 allow_parallel: self.allow_parallel,
2270 force_parallel: self.force_parallel,
2271 args,
2272 }
2273 .into(),
2274 )
2275 .build();
2276 LazyFrame::from_logical_plan(lp, opt_state)
2277 }
2278
2279 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2281 let opt_state = self.lf.opt_state;
2282 let other = self.other.expect("with not set");
2283
2284 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2286 if let Expr::BinaryExpr {
2287 op: Operator::And,
2288 left,
2289 right,
2290 } = predicate
2291 {
2292 decompose_and((*left).clone(), expanded_predicates);
2293 decompose_and((*right).clone(), expanded_predicates);
2294 } else {
2295 expanded_predicates.push(predicate);
2296 }
2297 }
2298 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2299 for predicate in predicates {
2300 decompose_and(predicate, &mut expanded_predicates);
2301 }
2302 let predicates: Vec<Expr> = expanded_predicates;
2303
2304 #[cfg(feature = "is_between")]
2306 let predicates: Vec<Expr> = {
2307 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2308 for predicate in predicates {
2309 if let Expr::Function {
2310 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2311 input,
2312 ..
2313 } = &predicate
2314 {
2315 if let [expr, lower, upper] = input.as_slice() {
2316 match closed {
2317 ClosedInterval::Both => {
2318 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2319 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2320 },
2321 ClosedInterval::Right => {
2322 expanded_predicates.push(expr.clone().gt(lower.clone()));
2323 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2324 },
2325 ClosedInterval::Left => {
2326 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2327 expanded_predicates.push(expr.clone().lt(upper.clone()));
2328 },
2329 ClosedInterval::None => {
2330 expanded_predicates.push(expr.clone().gt(lower.clone()));
2331 expanded_predicates.push(expr.clone().lt(upper.clone()));
2332 },
2333 }
2334 continue;
2335 }
2336 }
2337 expanded_predicates.push(predicate);
2338 }
2339 expanded_predicates
2340 };
2341
2342 let args = JoinArgs {
2343 how: self.how,
2344 validation: self.validation,
2345 suffix: self.suffix,
2346 slice: None,
2347 nulls_equal: self.nulls_equal,
2348 coalesce: self.coalesce,
2349 maintain_order: self.maintain_order,
2350 build_side: self.build_side,
2351 };
2352 let options = JoinOptions {
2353 allow_parallel: self.allow_parallel,
2354 force_parallel: self.force_parallel,
2355 args,
2356 };
2357
2358 let lp = DslPlan::Join {
2359 input_left: Arc::new(self.lf.logical_plan),
2360 input_right: Arc::new(other.logical_plan),
2361 left_on: Default::default(),
2362 right_on: Default::default(),
2363 predicates,
2364 options: Arc::from(options),
2365 };
2366
2367 LazyFrame::from_logical_plan(lp, opt_state)
2368 }
2369}
2370
2371pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2372 #[cfg(not(feature = "new_streaming"))]
2373 {
2374 None
2375 }
2376 #[cfg(feature = "new_streaming")]
2377 {
2378 Some(polars_stream::build_streaming_query_executor)
2379 }
2380};
2381
2382pub struct CollectBatches {
2383 recv: Receiver<PolarsResult<DataFrame>>,
2384 runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2385}
2386
2387impl CollectBatches {
2388 pub fn start(&mut self) {
2390 if let Some(runner) = self.runner.take() {
2391 runner()
2392 }
2393 }
2394}
2395
2396impl Iterator for CollectBatches {
2397 type Item = PolarsResult<DataFrame>;
2398
2399 fn next(&mut self) -> Option<Self::Item> {
2400 self.start();
2401 self.recv.recv().ok()
2402 }
2403}