1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consumes `self` and returns it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
45
46impl IntoLazy for DataFrame {
47 fn lazy(self) -> LazyFrame {
49 let lp = DslBuilder::from_existing_df(self).build();
50 LazyFrame {
51 logical_plan: lp,
52 opt_state: Default::default(),
53 cached_arena: Default::default(),
54 }
55 }
56}
57
impl IntoLazy for LazyFrame {
    /// A `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
63
/// A query description: a DSL plan together with the optimization flags that
/// are applied when the plan is optimized.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan describing the query.
    pub logical_plan: DslPlan,
    /// Flags selecting which optimizations are applied to the plan.
    pub(crate) opt_state: OptFlags,
    /// Shared, mutex-guarded slot that may hold a `CachedArena`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
75
76impl From<DslPlan> for LazyFrame {
77 fn from(plan: DslPlan) -> Self {
78 Self {
79 logical_plan: plan,
80 opt_state: OptFlags::default(),
81 cached_arena: Default::default(),
82 }
83 }
84}
85
86impl LazyFrame {
87 pub(crate) fn from_inner(
88 logical_plan: DslPlan,
89 opt_state: OptFlags,
90 cached_arena: Arc<Mutex<Option<CachedArena>>>,
91 ) -> Self {
92 Self {
93 logical_plan,
94 opt_state,
95 cached_arena,
96 }
97 }
98
99 pub(crate) fn get_plan_builder(self) -> DslBuilder {
100 DslBuilder::from(self.logical_plan)
101 }
102
/// Returns a copy of the optimization flags stored on this frame.
fn get_opt_state(&self) -> OptFlags {
    self.opt_state
}
106
107 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108 LazyFrame {
109 logical_plan,
110 opt_state,
111 cached_arena: Default::default(),
112 }
113 }
114
115 pub fn get_current_optimizations(&self) -> OptFlags {
117 self.opt_state
118 }
119
120 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122 self.opt_state = opt_state;
123 self
124 }
125
126 pub fn without_optimizations(self) -> Self {
128 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129 }
130
131 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134 self
135 }
136
137 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140 self
141 }
142
143 pub fn with_check_order(mut self, toggle: bool) -> Self {
146 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147 self
148 }
149
150 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153 self
154 }
155
156 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159 self
160 }
161
162 pub fn with_type_check(mut self, toggle: bool) -> Self {
164 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165 self
166 }
167
168 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171 self
172 }
173
/// Sets or clears the `COMM_SUBPLAN_ELIM` optimization flag.
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
    self.with_optimizations(flags)
}
180
/// Sets or clears the `COMM_SUBEXPR_ELIM` optimization flag.
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
    self.with_optimizations(flags)
}
187
188 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191 self
192 }
193
/// Sets or clears the `STREAMING` flag.
#[cfg(feature = "streaming")]
pub fn with_streaming(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::STREAMING, toggle);
    self.with_optimizations(flags)
}
199
200 pub fn with_gpu(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::GPU, toggle);
202 self
203 }
204
205 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
208 self
209 }
210
211 pub fn _with_eager(mut self, toggle: bool) -> Self {
213 self.opt_state.set(OptFlags::EAGER, toggle);
214 self
215 }
216
217 pub fn describe_plan(&self) -> PolarsResult<String> {
219 Ok(self.clone().to_alp()?.describe())
220 }
221
222 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp()?.describe_tree_format())
225 }
226
227 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe())
232 }
233
234 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
238 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
239 }
240
241 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
246 if optimized {
247 self.describe_optimized_plan()
248 } else {
249 self.describe_plan()
250 }
251 }
252
253 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
293 let opt_state = self.get_opt_state();
294 let lp = self
295 .get_plan_builder()
296 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
297 .build();
298 Self::from_logical_plan(lp, opt_state)
299 }
300
301 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
321 self,
322 by_exprs: E,
323 sort_options: SortMultipleOptions,
324 ) -> Self {
325 let by_exprs = by_exprs.as_ref().to_vec();
326 if by_exprs.is_empty() {
327 self
328 } else {
329 let opt_state = self.get_opt_state();
330 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
331 Self::from_logical_plan(lp, opt_state)
332 }
333 }
334
335 pub fn top_k<E: AsRef<[Expr]>>(
336 self,
337 k: IdxSize,
338 by_exprs: E,
339 sort_options: SortMultipleOptions,
340 ) -> Self {
341 self.sort_by_exprs(
343 by_exprs,
344 sort_options.with_order_reversed().with_nulls_last(true),
345 )
346 .slice(0, k)
347 }
348
349 pub fn bottom_k<E: AsRef<[Expr]>>(
350 self,
351 k: IdxSize,
352 by_exprs: E,
353 sort_options: SortMultipleOptions,
354 ) -> Self {
355 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
357 .slice(0, k)
358 }
359
360 pub fn reverse(self) -> Self {
376 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
377 }
378
379 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
387 where
388 I: IntoIterator<Item = T>,
389 J: IntoIterator<Item = S>,
390 T: AsRef<str>,
391 S: AsRef<str>,
392 {
393 let iter = existing.into_iter();
394 let cap = iter.size_hint().0;
395 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
396 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
397
398 for (existing, new) in iter.zip(new) {
401 let existing = existing.as_ref();
402 let new = new.as_ref();
403 if new != existing {
404 existing_vec.push(existing.into());
405 new_vec.push(new.into());
406 }
407 }
408
409 self.map_private(DslFunction::Rename {
410 existing: existing_vec.into(),
411 new: new_vec.into(),
412 strict,
413 })
414 }
415
416 pub fn drop(self, columns: Selector) -> Self {
423 let opt_state = self.get_opt_state();
424 let lp = self.get_plan_builder().drop(columns).build();
425 Self::from_logical_plan(lp, opt_state)
426 }
427
428 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
433 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
434 }
435
436 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
441 self.select(vec![
442 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
443 ])
444 }
445
446 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
455 let opt_state = self.get_opt_state();
456 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
457 Self::from_logical_plan(lp, opt_state)
458 }
459
460 pub fn cache(self) -> Self {
464 let opt_state = self.get_opt_state();
465 let lp = self.get_plan_builder().cache().build();
466 Self::from_logical_plan(lp, opt_state)
467 }
468
469 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
471 let cast_cols: Vec<Expr> = dtypes
472 .into_iter()
473 .map(|(name, dt)| {
474 let name = PlSmallStr::from_str(name);
475
476 if strict {
477 col(name).strict_cast(dt)
478 } else {
479 col(name).cast(dt)
480 }
481 })
482 .collect();
483
484 if cast_cols.is_empty() {
485 self
486 } else {
487 self.with_columns(cast_cols)
488 }
489 }
490
491 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
493 self.with_columns(vec![if strict {
494 col(PlSmallStr::from_static("*")).strict_cast(dtype)
495 } else {
496 col(PlSmallStr::from_static("*")).cast(dtype)
497 }])
498 }
499
500 pub fn optimize(
501 self,
502 lp_arena: &mut Arena<IR>,
503 expr_arena: &mut Arena<AExpr>,
504 ) -> PolarsResult<Node> {
505 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
506 }
507
508 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
511
512 Ok(IRPlan::new(node, lp_arena, expr_arena))
513 }
514
515 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
516 let (mut lp_arena, mut expr_arena) = self.get_arenas();
517 let node = to_alp(
518 self.logical_plan,
519 &mut expr_arena,
520 &mut lp_arena,
521 &mut self.opt_state,
522 )?;
523 let plan = IRPlan::new(node, lp_arena, expr_arena);
524 Ok(plan)
525 }
526
527 pub(crate) fn optimize_with_scratch(
528 self,
529 lp_arena: &mut Arena<IR>,
530 expr_arena: &mut Arena<AExpr>,
531 scratch: &mut Vec<Node>,
532 ) -> PolarsResult<Node> {
533 let lp_top = optimize(
534 self.logical_plan,
535 self.opt_state,
536 lp_arena,
537 expr_arena,
538 scratch,
539 apply_scan_predicate_to_scan_ir,
540 )?;
541
542 Ok(lp_top)
543 }
544
545 fn prepare_collect_post_opt<P>(
546 mut self,
547 check_sink: bool,
548 query_start: Option<std::time::Instant>,
549 post_opt: P,
550 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
551 where
552 P: FnOnce(
553 Node,
554 &mut Arena<IR>,
555 &mut Arena<AExpr>,
556 Option<std::time::Duration>,
557 ) -> PolarsResult<()>,
558 {
559 let (mut lp_arena, mut expr_arena) = self.get_arenas();
560
561 let mut scratch = vec![];
562 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
563
564 post_opt(
565 lp_top,
566 &mut lp_arena,
567 &mut expr_arena,
568 query_start.map(|s| s.elapsed()),
571 )?;
572
573 let no_file_sink = if check_sink {
575 !matches!(
576 lp_arena.get(lp_top),
577 IR::Sink {
578 payload: SinkTypeIR::File { .. },
579 ..
580 }
581 )
582 } else {
583 true
584 };
585 let physical_plan = create_physical_plan(
586 lp_top,
587 &mut lp_arena,
588 &mut expr_arena,
589 BUILD_STREAMING_EXECUTOR,
590 )?;
591
592 let state = ExecutionState::new();
593 Ok((state, physical_plan, no_file_sink))
594 }
595
596 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
598 where
599 P: FnOnce(
600 Node,
601 &mut Arena<IR>,
602 &mut Arena<AExpr>,
603 Option<std::time::Duration>,
604 ) -> PolarsResult<()>,
605 {
606 let (mut state, mut physical_plan, _) =
607 self.prepare_collect_post_opt(false, None, post_opt)?;
608 physical_plan.execute(&mut state)
609 }
610
611 #[allow(unused_mut)]
612 fn prepare_collect(
613 self,
614 check_sink: bool,
615 query_start: Option<std::time::Instant>,
616 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
617 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
618 }
619
/// Optimizes the plan and executes it on the requested `engine`.
///
/// Engine resolution:
/// - `Engine::Streaming` is always honoured.
/// - Otherwise, if the environment variable `POLARS_FORCE_STREAMING` equals
///   `"1"`, the streaming engine is used.
/// - Otherwise `Engine::Auto` resolves to `Engine::InMemory`; any other value
///   is used as given.
///
/// # Errors
/// Propagates optimization, planning and execution errors. Returns an
/// `InvalidOperation` error when a `SinkMultiple` root is run on the GPU engine.
pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
    let engine = match engine {
        Engine::Streaming => Engine::Streaming,
        _ if std::env::var("POLARS_FORCE_STREAMING").as_deref() == Ok("1") => Engine::Streaming,
        Engine::Auto => Engine::InMemory,
        v => v,
    };

    // With `POLARS_AUTO_STREAMING=1`, first attempt the streaming engine on a
    // clone; a `None` result means it was not usable and we fall through to the
    // engine selected above.
    if engine != Engine::Streaming
        && std::env::var("POLARS_AUTO_STREAMING").as_deref() == Ok("1")
    {
        // NOTE(review): `feature_gated!` presumably rejects this path when the
        // "streaming" feature is not compiled in — confirm against the macro.
        feature_gated!("streaming", {
            if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
                return r;
            }
        })
    }
    // Record the engine choice in the optimization flags before optimizing.
    match engine {
        Engine::Streaming => {
            feature_gated!("streaming", self = self.with_streaming(true))
        },
        Engine::Gpu => self = self.with_gpu(true),
        _ => (),
    }

    let mut ir_plan = self.to_alp_optimized()?;

    ir_plan.ensure_root_node_is_sink();

    match engine {
        Engine::Streaming => feature_gated!("streaming", {
            polars_stream::run_query(
                ir_plan.lp_top,
                &mut ir_plan.lp_arena,
                &mut ir_plan.expr_arena,
            )
        }),
        Engine::InMemory | Engine::Gpu => {
            // A `SinkMultiple` root yields one result per input plan.
            if let IR::SinkMultiple { inputs } = ir_plan.root() {
                polars_ensure!(
                    engine != Engine::Gpu,
                    InvalidOperation:
                    "collect_all is not supported for the gpu engine"
                );

                return create_multiple_physical_plans(
                    inputs.clone().as_slice(),
                    &mut ir_plan.lp_arena,
                    &mut ir_plan.expr_arena,
                    BUILD_STREAMING_EXECUTOR,
                )?
                .execute()
                .map(QueryResult::Multiple);
            }

            let mut physical_plan = create_physical_plan(
                ir_plan.lp_top,
                &mut ir_plan.lp_arena,
                &mut ir_plan.expr_arena,
                BUILD_STREAMING_EXECUTOR,
            )?;
            let mut state = ExecutionState::new();
            physical_plan.execute(&mut state).map(QueryResult::Single)
        },
        // `Auto` was replaced by a concrete engine at the top of the function.
        Engine::Auto => unreachable!(),
    }
}
691
692 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
693 let sink_multiple = LazyFrame {
694 logical_plan: DslPlan::SinkMultiple { inputs: plans },
695 opt_state,
696 cached_arena: Default::default(),
697 };
698 sink_multiple.explain(true)
699 }
700
701 pub fn collect_all_with_engine(
702 plans: Vec<DslPlan>,
703 engine: Engine,
704 opt_state: OptFlags,
705 ) -> PolarsResult<Vec<DataFrame>> {
706 if plans.is_empty() {
707 return Ok(Vec::new());
708 }
709
710 LazyFrame {
711 logical_plan: DslPlan::SinkMultiple { inputs: plans },
712 opt_state,
713 cached_arena: Default::default(),
714 }
715 .collect_with_engine(engine)
716 .map(|r| r.unwrap_multiple())
717 }
718
719 pub fn collect(self) -> PolarsResult<DataFrame> {
737 self.collect_with_engine(Engine::Auto).map(|r| match r {
738 QueryResult::Single(df) => df,
739 QueryResult::Multiple(_) => DataFrame::empty(),
741 })
742 }
743
/// Runs the query and exposes its output as batches received over a channel.
///
/// The query is wrapped in a batch sink whose callback forwards every
/// `DataFrame` into a bounded channel (capacity 1). When `lazy` is `false` the
/// runner is started before returning; otherwise it is left in the returned
/// `CollectBatches` without being started here.
///
/// # Errors
/// Returns the error from `sink_batches` (e.g. when the plan already ends in a
/// sink). Errors raised while the query runs are sent through the channel.
#[cfg(feature = "async")]
pub fn collect_batches(
    self,
    engine: Engine,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PolarsResult<CollectBatches> {
    let (send, recv) = sync_channel(1);
    // Second sender handle, used by the runner to report a query error.
    let runner_send = send.clone();
    let ldf = self.sink_batches(
        PlanCallback::new(move |df| {
            let send_result = send.send(Ok(df));
            // `send` fails only once the receiver is gone; the callback then
            // returns `true`. NOTE(review): presumably `true` tells the sink
            // to stop — confirm against the callback sink contract.
            Ok(send_result.is_err())
        }),
        maintain_order,
        chunk_size,
    )?;
    let runner = move || {
        polars_core::runtime::ASYNC.spawn_blocking(move || {
            if let Err(e) = ldf.collect_with_engine(engine) {
                // Best effort: the receiver may already have been dropped.
                runner_send.send(Err(e)).ok();
            }
        });
    };

    let mut collect_batches = CollectBatches {
        recv,
        runner: Some(Box::new(runner)),
    };
    if !lazy {
        collect_batches.start();
    }
    Ok(collect_batches)
}
785
/// Executes the query with node timing enabled and returns
/// `(result, timings)`.
///
/// `post_opt` is invoked after optimization with the root node, both arenas and
/// the time elapsed since the start of this call.
///
/// # Errors
/// Propagates errors from preparation, execution and from finishing the timer.
pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    // Taken first so that planning/optimization time is part of the measurement.
    let query_start = std::time::Instant::now();
    let (mut state, mut physical_plan, _) =
        self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
    state.time_nodes(query_start);
    let out = physical_plan.execute(&mut state)?;
    let timer_df = state.finish_timer()?;
    Ok((out, timer_df))
}
805
806 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
814 self._profile_post_opt(|_, _, _, _| Ok(()))
815 }
816
817 pub fn sink_batches(
818 mut self,
819 function: PlanCallback<DataFrame, bool>,
820 maintain_order: bool,
821 chunk_size: Option<NonZeroUsize>,
822 ) -> PolarsResult<Self> {
823 use polars_plan::prelude::sink::CallbackSinkType;
824
825 polars_ensure!(
826 !matches!(self.logical_plan, DslPlan::Sink { .. }),
827 InvalidOperation: "cannot create a sink on top of another sink"
828 );
829
830 self.logical_plan = DslPlan::Sink {
831 input: Arc::new(self.logical_plan),
832 payload: SinkType::Callback(CallbackSinkType {
833 function,
834 maintain_order,
835 chunk_size,
836 }),
837 };
838
839 Ok(self)
840 }
841
/// Tries to run the query on the streaming engine, turning a "not yet
/// implemented" panic into `None` so the caller can fall back to another engine.
///
/// Returns `Some(result)` when the streaming engine ran (successfully or with
/// an error) or when optimization failed, and `None` when the engine panicked
/// with a message starting with `"not yet implemented"`. Any other panic is
/// re-raised unchanged.
///
/// Both `&str` and `String` panic payloads are inspected: a bare `todo!()`
/// carries a `&'static str`, while a `todo!` with formatted arguments carries a
/// `String`.
#[cfg(feature = "streaming")]
fn _collect_with_streaming_suppress_todo_panic(
    mut self,
) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
    self.opt_state |= OptFlags::STREAMING;
    let mut ir_plan = match self.to_alp_optimized() {
        Ok(v) => v,
        Err(e) => return Some(Err(e)),
    };

    ir_plan.ensure_root_node_is_sink();

    let f = || {
        polars_stream::run_query(
            ir_plan.lp_top,
            &mut ir_plan.lp_arena,
            &mut ir_plan.expr_arena,
        )
    };

    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
        Ok(v) => Some(v),
        Err(e) => {
            // Extract the panic message regardless of whether the payload is a
            // static string or an owned, formatted one.
            let message: Option<&str> = match e.downcast_ref::<&str>() {
                Some(s) => Some(*s),
                None => e.downcast_ref::<String>().map(String::as_str),
            };
            let is_unimplemented =
                message.is_some_and(|msg| msg.starts_with("not yet implemented"));

            if is_unimplemented {
                if polars_core::config::verbose() {
                    eprintln!(
                        "caught unimplemented error in new streaming engine, falling back to normal engine"
                    );
                }
                None
            } else {
                std::panic::resume_unwind(e)
            }
        },
    }
}
883
884 pub fn sink(
885 mut self,
886 sink_type: SinkDestination,
887 file_format: FileWriteFormat,
888 unified_sink_args: UnifiedSinkArgs,
889 ) -> PolarsResult<Self> {
890 polars_ensure!(
891 !matches!(self.logical_plan, DslPlan::Sink { .. }),
892 InvalidOperation: "cannot create a sink on top of another sink"
893 );
894
895 self.logical_plan = DslPlan::Sink {
896 input: Arc::new(self.logical_plan),
897 payload: match sink_type {
898 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
899 target,
900 file_format,
901 unified_sink_args,
902 }),
903 SinkDestination::Partitioned {
904 base_path,
905 file_path_provider,
906 partition_strategy,
907 max_rows_per_file,
908 approximate_bytes_per_file,
909 } => SinkType::Partitioned(PartitionedSinkOptions {
910 base_path,
911 file_path_provider,
912 partition_strategy,
913 file_format,
914 unified_sink_args,
915 max_rows_per_file,
916 approximate_bytes_per_file,
917 }),
918 },
919 };
920 Ok(self)
921 }
922
923 pub fn filter(self, predicate: Expr) -> Self {
941 let opt_state = self.get_opt_state();
942 let lp = self.get_plan_builder().filter(predicate).build();
943 Self::from_logical_plan(lp, opt_state)
944 }
945
946 pub fn remove(self, predicate: Expr) -> Self {
964 self.filter(predicate.neq_missing(lit(true)))
965 }
966
967 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
993 let exprs = exprs.as_ref().to_vec();
994 self.select_impl(
995 exprs,
996 ProjectionOptions {
997 run_parallel: true,
998 duplicate_check: true,
999 should_broadcast: true,
1000 },
1001 )
1002 }
1003
1004 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1005 let exprs = exprs.as_ref().to_vec();
1006 self.select_impl(
1007 exprs,
1008 ProjectionOptions {
1009 run_parallel: false,
1010 duplicate_check: true,
1011 should_broadcast: true,
1012 },
1013 )
1014 }
1015
1016 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1017 let opt_state = self.get_opt_state();
1018 let lp = self.get_plan_builder().project(exprs, options).build();
1019 Self::from_logical_plan(lp, opt_state)
1020 }
1021
1022 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1043 let keys = by
1044 .as_ref()
1045 .iter()
1046 .map(|e| e.clone().into())
1047 .collect::<Vec<_>>();
1048 let opt_state = self.get_opt_state();
1049
1050 #[cfg(feature = "dynamic_group_by")]
1051 {
1052 LazyGroupBy {
1053 logical_plan: self.logical_plan,
1054 opt_state,
1055 keys,
1056 predicates: vec![],
1057 maintain_order: false,
1058 dynamic_options: None,
1059 rolling_options: None,
1060 }
1061 }
1062
1063 #[cfg(not(feature = "dynamic_group_by"))]
1064 {
1065 LazyGroupBy {
1066 logical_plan: self.logical_plan,
1067 opt_state,
1068 keys,
1069 predicates: vec![],
1070 maintain_order: false,
1071 }
1072 }
1073 }
1074
/// Starts a rolling group-by over `index_column`, grouped by `group_by`.
///
/// When `index_column` is a plain column its name is stored in `options`.
/// Otherwise the expression is first added with `with_column` and the call is
/// repeated using the resulting output column.
///
/// # Panics
/// Panics if the schema cannot be collected or the output field of
/// `index_column` cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        computed => {
            // Materialize the expression as a column, then roll over that column.
            let output_field = computed
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            let index = Expr::Column(output_field.name().clone());
            return self.with_column(computed).rolling(index, group_by, options);
        },
    }

    LazyGroupBy {
        opt_state: self.opt_state,
        logical_plan: self.logical_plan,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1112
/// Starts a dynamic group-by over `index_column`, grouped by `group_by`.
///
/// When `index_column` is a plain column its name is stored in `options`.
/// Otherwise the expression is first added with `with_column` and the call is
/// repeated using the resulting output column.
///
/// # Panics
/// Panics if the schema cannot be collected or the output field of
/// `index_column` cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        computed => {
            // Materialize the expression as a column, then group over that column.
            let output_field = computed
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            let index = Expr::Column(output_field.name().clone());
            return self
                .with_column(computed)
                .group_by_dynamic(index, group_by, options);
        },
    }

    LazyGroupBy {
        opt_state: self.opt_state,
        logical_plan: self.logical_plan,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1158
1159 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1161 let keys = by
1162 .as_ref()
1163 .iter()
1164 .map(|e| e.clone().into())
1165 .collect::<Vec<_>>();
1166 let opt_state = self.get_opt_state();
1167
1168 #[cfg(feature = "dynamic_group_by")]
1169 {
1170 LazyGroupBy {
1171 logical_plan: self.logical_plan,
1172 opt_state,
1173 keys,
1174 predicates: vec![],
1175 maintain_order: true,
1176 dynamic_options: None,
1177 rolling_options: None,
1178 }
1179 }
1180
1181 #[cfg(not(feature = "dynamic_group_by"))]
1182 {
1183 LazyGroupBy {
1184 logical_plan: self.logical_plan,
1185 opt_state,
1186 keys,
1187 predicates: vec![],
1188 maintain_order: true,
1189 }
1190 }
1191 }
1192
/// Joins with `other` using `JoinType::Anti` on a single key expression per
/// side.
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Anti);
    let left_keys: [Expr; 1] = [left_on.into()];
    let right_keys: [Expr; 1] = [right_on.into()];
    self.join(other, left_keys, right_keys, args)
}
1218
/// Joins with `other` using `JoinType::Cross` and no join keys; `suffix` is
/// passed to the join arguments.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    self.join(other, vec![], vec![], args)
}
1229
1230 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1247 self.join(
1248 other,
1249 [left_on.into()],
1250 [right_on.into()],
1251 JoinArgs::new(JoinType::Left),
1252 )
1253 }
1254
1255 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1272 self.join(
1273 other,
1274 [left_on.into()],
1275 [right_on.into()],
1276 JoinArgs::new(JoinType::Inner),
1277 )
1278 }
1279
1280 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1297 self.join(
1298 other,
1299 [left_on.into()],
1300 [right_on.into()],
1301 JoinArgs::new(JoinType::Full),
1302 )
1303 }
1304
/// Joins with `other` using `JoinType::Semi` on a single key expression per
/// side.
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Semi);
    let left_keys: [Expr; 1] = [left_on.into()];
    let right_keys: [Expr; 1] = [right_on.into()];
    self.join(other, left_keys, right_keys, args)
}
1330
1331 pub fn join<E: AsRef<[Expr]>>(
1353 self,
1354 other: LazyFrame,
1355 left_on: E,
1356 right_on: E,
1357 args: JoinArgs,
1358 ) -> LazyFrame {
1359 let left_on = left_on.as_ref().to_vec();
1360 let right_on = right_on.as_ref().to_vec();
1361
1362 self._join_impl(other, left_on, right_on, args)
1363 }
1364
1365 fn _join_impl(
1366 self,
1367 other: LazyFrame,
1368 left_on: Vec<Expr>,
1369 right_on: Vec<Expr>,
1370 args: JoinArgs,
1371 ) -> LazyFrame {
1372 let JoinArgs {
1373 how,
1374 validation,
1375 suffix,
1376 slice,
1377 nulls_equal,
1378 coalesce,
1379 maintain_order,
1380 build_side,
1381 } = args;
1382
1383 if slice.is_some() {
1384 panic!("impl error: slice is not handled")
1385 }
1386
1387 let mut builder = self
1388 .join_builder()
1389 .with(other)
1390 .left_on(left_on)
1391 .right_on(right_on)
1392 .how(how)
1393 .validate(validation)
1394 .join_nulls(nulls_equal)
1395 .coalesce(coalesce)
1396 .maintain_order(maintain_order)
1397 .build_side(build_side);
1398
1399 if let Some(suffix) = suffix {
1400 builder = builder.suffix(suffix);
1401 }
1402
1403 builder.finish()
1405 }
1406
/// Consumes the frame and returns a `JoinBuilder` created from it.
pub fn join_builder(self) -> JoinBuilder {
    JoinBuilder::new(self)
}
1415
1416 pub fn gather(self, idxs: LazyFrame, null_on_oob: bool) -> LazyFrame {
1420 let opt_state = self.get_opt_state();
1421 let lp = self
1422 .get_plan_builder()
1423 .gather(idxs.logical_plan, null_on_oob)
1424 .build();
1425 Self::from_logical_plan(lp, opt_state)
1426 }
1427
1428 pub fn with_column(self, expr: Expr) -> LazyFrame {
1446 let opt_state = self.get_opt_state();
1447 let lp = self
1448 .get_plan_builder()
1449 .with_columns(
1450 vec![expr],
1451 ProjectionOptions {
1452 run_parallel: false,
1453 duplicate_check: true,
1454 should_broadcast: true,
1455 },
1456 )
1457 .build();
1458 Self::from_logical_plan(lp, opt_state)
1459 }
1460
1461 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1476 let exprs = exprs.as_ref().to_vec();
1477 self.with_columns_impl(
1478 exprs,
1479 ProjectionOptions {
1480 run_parallel: true,
1481 duplicate_check: true,
1482 should_broadcast: true,
1483 },
1484 )
1485 }
1486
1487 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1489 let exprs = exprs.as_ref().to_vec();
1490 self.with_columns_impl(
1491 exprs,
1492 ProjectionOptions {
1493 run_parallel: false,
1494 duplicate_check: true,
1495 should_broadcast: true,
1496 },
1497 )
1498 }
1499
1500 pub fn match_to_schema(
1502 self,
1503 schema: SchemaRef,
1504 per_column: Arc<[MatchToSchemaPerColumn]>,
1505 extra_columns: ExtraColumnsPolicy,
1506 ) -> LazyFrame {
1507 let opt_state = self.get_opt_state();
1508 let lp = self
1509 .get_plan_builder()
1510 .match_to_schema(schema, per_column, extra_columns)
1511 .build();
1512 Self::from_logical_plan(lp, opt_state)
1513 }
1514
1515 pub fn pipe_with_schema(
1516 self,
1517 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1518 ) -> Self {
1519 let opt_state = self.get_opt_state();
1520 let lp = self
1521 .get_plan_builder()
1522 .pipe_with_schema(vec![], callback)
1523 .build();
1524 Self::from_logical_plan(lp, opt_state)
1525 }
1526
1527 pub fn pipe_with_schemas(
1528 self,
1529 others: Vec<LazyFrame>,
1530 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1531 ) -> Self {
1532 let opt_state = self.get_opt_state();
1533 let lp = self
1534 .get_plan_builder()
1535 .pipe_with_schema(
1536 others.into_iter().map(|lf| lf.logical_plan).collect(),
1537 callback,
1538 )
1539 .build();
1540 Self::from_logical_plan(lp, opt_state)
1541 }
1542
1543 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1544 let opt_state = self.get_opt_state();
1545 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1546 Self::from_logical_plan(lp, opt_state)
1547 }
1548
1549 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1550 let contexts = contexts
1551 .as_ref()
1552 .iter()
1553 .map(|lf| lf.logical_plan.clone())
1554 .collect();
1555 let opt_state = self.get_opt_state();
1556 let lp = self.get_plan_builder().with_context(contexts).build();
1557 Self::from_logical_plan(lp, opt_state)
1558 }
1559
1560 pub fn max(self) -> Self {
1564 self.map_private(DslFunction::Stats(StatsFunction::Max))
1565 }
1566
1567 pub fn min(self) -> Self {
1571 self.map_private(DslFunction::Stats(StatsFunction::Min))
1572 }
1573
1574 pub fn sum(self) -> Self {
1584 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1585 }
1586
1587 pub fn mean(self) -> Self {
1592 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1593 }
1594
1595 pub fn median(self) -> Self {
1601 self.map_private(DslFunction::Stats(StatsFunction::Median))
1602 }
1603
1604 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1606 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1607 quantile,
1608 method,
1609 }))
1610 }
1611
1612 pub fn std(self, ddof: u8) -> Self {
1625 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1626 }
1627
1628 pub fn var(self, ddof: u8) -> Self {
1638 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1639 }
1640
1641 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1643 self.explode_impl(columns, options, false)
1644 }
1645
1646 fn explode_impl(
1648 self,
1649 columns: Selector,
1650 options: ExplodeOptions,
1651 allow_empty: bool,
1652 ) -> LazyFrame {
1653 let opt_state = self.get_opt_state();
1654 let lp = self
1655 .get_plan_builder()
1656 .explode(columns, options, allow_empty)
1657 .build();
1658 Self::from_logical_plan(lp, opt_state)
1659 }
1660
1661 pub fn null_count(self) -> LazyFrame {
1663 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1664 }
1665
1666 pub fn unique_stable(
1671 self,
1672 subset: Option<Selector>,
1673 keep_strategy: UniqueKeepStrategy,
1674 ) -> LazyFrame {
1675 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1676 self.unique_stable_generic(subset, keep_strategy)
1677 }
1678
1679 pub fn unique_stable_generic(
1680 self,
1681 subset: Option<Vec<Expr>>,
1682 keep_strategy: UniqueKeepStrategy,
1683 ) -> LazyFrame {
1684 let opt_state = self.get_opt_state();
1685 let options = DistinctOptionsDSL {
1686 subset,
1687 maintain_order: true,
1688 keep_strategy,
1689 };
1690 let lp = self.get_plan_builder().distinct(options).build();
1691 Self::from_logical_plan(lp, opt_state)
1692 }
1693
1694 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1702 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1703 self.unique_generic(subset, keep_strategy)
1704 }
1705
1706 pub fn unique_generic(
1707 self,
1708 subset: Option<Vec<Expr>>,
1709 keep_strategy: UniqueKeepStrategy,
1710 ) -> LazyFrame {
1711 let opt_state = self.get_opt_state();
1712 let options = DistinctOptionsDSL {
1713 subset,
1714 maintain_order: false,
1715 keep_strategy,
1716 };
1717 let lp = self.get_plan_builder().distinct(options).build();
1718 Self::from_logical_plan(lp, opt_state)
1719 }
1720
1721 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1726 let opt_state = self.get_opt_state();
1727 let lp = self.get_plan_builder().drop_nans(subset).build();
1728 Self::from_logical_plan(lp, opt_state)
1729 }
1730
1731 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1736 let opt_state = self.get_opt_state();
1737 let lp = self.get_plan_builder().drop_nulls(subset).build();
1738 Self::from_logical_plan(lp, opt_state)
1739 }
1740
1741 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1751 let opt_state = self.get_opt_state();
1752 let lp = self.get_plan_builder().slice(offset, len).build();
1753 Self::from_logical_plan(lp, opt_state)
1754 }
1755
1756 pub fn clear(self) -> LazyFrame {
1758 self.slice(0, 0)
1759 }
1760
1761 pub fn first(self) -> LazyFrame {
1765 self.slice(0, 1)
1766 }
1767
1768 pub fn last(self) -> LazyFrame {
1772 self.slice(-1, 1)
1773 }
1774
1775 pub fn tail(self, n: IdxSize) -> LazyFrame {
1779 let neg_tail = -(n as i64);
1780 self.slice(neg_tail, n)
1781 }
1782
1783 #[cfg(feature = "pivot")]
1784 #[expect(clippy::too_many_arguments)]
1785 pub fn pivot(
1786 self,
1787 on: Selector,
1788 on_columns: Arc<DataFrame>,
1789 index: Selector,
1790 values: Selector,
1791 agg: Expr,
1792 maintain_order: bool,
1793 separator: PlSmallStr,
1794 column_naming: PivotColumnNaming,
1795 ) -> LazyFrame {
1796 let opt_state = self.get_opt_state();
1797 let lp = self
1798 .get_plan_builder()
1799 .pivot(
1800 on,
1801 on_columns,
1802 index,
1803 values,
1804 agg,
1805 maintain_order,
1806 separator,
1807 column_naming,
1808 )
1809 .build();
1810 Self::from_logical_plan(lp, opt_state)
1811 }
1812
1813 #[cfg(feature = "pivot")]
1817 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1818 let opt_state = self.get_opt_state();
1819 let lp = self.get_plan_builder().unpivot(args).build();
1820 Self::from_logical_plan(lp, opt_state)
1821 }
1822
1823 pub fn limit(self, n: IdxSize) -> LazyFrame {
1825 self.slice(0, n)
1826 }
1827
1828 pub fn map<F>(
1842 self,
1843 function: F,
1844 optimizations: AllowedOptimizations,
1845 schema: Option<Arc<dyn UdfSchema>>,
1846 name: Option<&'static str>,
1847 ) -> LazyFrame
1848 where
1849 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1850 {
1851 let opt_state = self.get_opt_state();
1852 let lp = self
1853 .get_plan_builder()
1854 .map(
1855 function,
1856 optimizations,
1857 schema,
1858 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1859 )
1860 .build();
1861 Self::from_logical_plan(lp, opt_state)
1862 }
1863
1864 #[cfg(feature = "python")]
1865 pub fn map_python(
1866 self,
1867 function: polars_utils::python_function::PythonFunction,
1868 optimizations: AllowedOptimizations,
1869 schema: Option<SchemaRef>,
1870 validate_output: bool,
1871 ) -> LazyFrame {
1872 let opt_state = self.get_opt_state();
1873 let lp = self
1874 .get_plan_builder()
1875 .map_python(function, optimizations, schema, validate_output)
1876 .build();
1877 Self::from_logical_plan(lp, opt_state)
1878 }
1879
1880 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1881 let opt_state = self.get_opt_state();
1882 let lp = self.get_plan_builder().map_private(function).build();
1883 Self::from_logical_plan(lp, opt_state)
1884 }
1885
1886 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1895 where
1896 S: Into<PlSmallStr>,
1897 {
1898 let name = name.into();
1899
1900 match &self.logical_plan {
1901 v @ DslPlan::Scan {
1902 scan_type,
1903 unified_scan_args,
1904 ..
1905 } if unified_scan_args.row_index.is_none()
1906 && !matches!(
1907 &**scan_type,
1908 FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1909 ) =>
1910 {
1911 let DslPlan::Scan {
1912 sources,
1913 mut unified_scan_args,
1914 scan_type,
1915 cached_ir: _,
1916 } = v.clone()
1917 else {
1918 unreachable!()
1919 };
1920
1921 unified_scan_args.row_index = Some(RowIndex {
1922 name,
1923 offset: offset.unwrap_or(0),
1924 });
1925
1926 DslPlan::Scan {
1927 sources,
1928 unified_scan_args,
1929 scan_type,
1930 cached_ir: Default::default(),
1931 }
1932 .into()
1933 },
1934 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1935 }
1936 }
1937
1938 pub fn count(self) -> LazyFrame {
1940 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1941 }
1942
1943 #[cfg(feature = "dtype-struct")]
1946 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1947 self.map_private(DslFunction::Unnest {
1948 columns: cols,
1949 separator,
1950 })
1951 }
1952
1953 #[cfg(feature = "merge_sorted")]
1954 pub fn merge_sorted<S>(
1955 self,
1956 other: LazyFrame,
1957 key: S,
1958 maintain_order: bool,
1959 ) -> PolarsResult<LazyFrame>
1960 where
1961 S: Into<PlSmallStr>,
1962 {
1963 let key = key.into();
1964
1965 let lp = DslPlan::MergeSorted {
1966 input_left: Arc::new(self.logical_plan),
1967 input_right: Arc::new(other.logical_plan),
1968 key,
1969 maintain_order,
1970 };
1971 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1972 }
1973
1974 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1975 let lp = DslPlan::MapFunction {
1976 input: Arc::new(self.logical_plan),
1977 function: DslFunction::Hint(hint),
1978 };
1979 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1980 }
1981}
1982
/// Pending group-by: the input plan plus everything needed to build the
/// group-by node once `agg`, `head`, `tail` or `apply` is called.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags carried over to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Expressions to group by.
    keys: Vec<Expr>,
    /// Predicates collected through `having`.
    predicates: Vec<Expr>,
    /// Forwarded to the group-by node as its `maintain_order` setting.
    maintain_order: bool,
    /// Forwarded to the group-by node when the `dynamic_group_by` feature is on.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Forwarded to the group-by node when the `dynamic_group_by` feature is on.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
1996
1997impl From<LazyGroupBy> for LazyFrame {
1998 fn from(lgb: LazyGroupBy) -> Self {
1999 Self {
2000 logical_plan: lgb.logical_plan,
2001 opt_state: lgb.opt_state,
2002 cached_arena: Default::default(),
2003 }
2004 }
2005}
2006
2007impl LazyGroupBy {
2008 pub fn having(mut self, predicate: Expr) -> Self {
2029 self.predicates.push(predicate);
2030 self
2031 }
2032
2033 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2055 #[cfg(feature = "dynamic_group_by")]
2056 let lp = DslBuilder::from(self.logical_plan)
2057 .group_by(
2058 self.keys,
2059 self.predicates,
2060 aggs,
2061 None,
2062 self.maintain_order,
2063 self.dynamic_options,
2064 self.rolling_options,
2065 )
2066 .build();
2067
2068 #[cfg(not(feature = "dynamic_group_by"))]
2069 let lp = DslBuilder::from(self.logical_plan)
2070 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2071 .build();
2072 LazyFrame::from_logical_plan(lp, self.opt_state)
2073 }
2074
2075 pub fn head(self, n: Option<usize>) -> LazyFrame {
2077 let keys = self
2078 .keys
2079 .iter()
2080 .filter_map(|expr| expr_output_name(expr).ok())
2081 .collect::<Vec<_>>();
2082
2083 self.agg([all().as_expr().head(n)]).explode_impl(
2084 all() - by_name(keys.iter().cloned(), false, false),
2085 ExplodeOptions {
2086 empty_as_null: true,
2087 keep_nulls: true,
2088 },
2089 true,
2090 )
2091 }
2092
2093 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2095 let keys = self
2096 .keys
2097 .iter()
2098 .filter_map(|expr| expr_output_name(expr).ok())
2099 .collect::<Vec<_>>();
2100
2101 self.agg([all().as_expr().tail(n)]).explode_impl(
2102 all() - by_name(keys.iter().cloned(), false, false),
2103 ExplodeOptions {
2104 empty_as_null: true,
2105 keep_nulls: true,
2106 },
2107 true,
2108 )
2109 }
2110
2111 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2116 if !self.predicates.is_empty() {
2117 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2118 }
2119
2120 #[cfg(feature = "dynamic_group_by")]
2121 let options = GroupbyOptions {
2122 dynamic: self.dynamic_options,
2123 rolling: self.rolling_options,
2124 slice: None,
2125 };
2126
2127 #[cfg(not(feature = "dynamic_group_by"))]
2128 let options = GroupbyOptions { slice: None };
2129
2130 let lp = DslPlan::GroupBy {
2131 input: Arc::new(self.logical_plan),
2132 keys: self.keys,
2133 predicates: vec![],
2134 aggs: vec![],
2135 apply: Some((f, schema)),
2136 maintain_order: self.maintain_order,
2137 options: Arc::new(options),
2138 };
2139 LazyFrame::from_logical_plan(lp, self.opt_state)
2140 }
2141}
2142
/// Builder that collects the inputs and options of a join and turns them
/// into a `LazyFrame` through `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Left side of the join.
    lf: LazyFrame,
    /// Join type; `new` starts with `JoinType::Inner`.
    how: JoinType,
    /// Right side of the join; must be set with `with` before finishing.
    other: Option<LazyFrame>,
    /// Join key expressions evaluated on the left side.
    left_on: Vec<Expr>,
    /// Join key expressions evaluated on the right side.
    right_on: Vec<Expr>,
    /// Copied into `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Copied into `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Copied into `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Copied into `JoinArgs::validation`.
    validation: JoinValidation,
    /// Copied into `JoinArgs::nulls_equal`; set through `join_nulls`.
    nulls_equal: bool,
    /// Copied into `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Copied into `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
    /// Copied into `JoinArgs::build_side`.
    build_side: Option<JoinBuildSide>,
}
2159impl JoinBuilder {
2160 pub fn new(lf: LazyFrame) -> Self {
2162 Self {
2163 lf,
2164 other: None,
2165 how: JoinType::Inner,
2166 left_on: vec![],
2167 right_on: vec![],
2168 allow_parallel: true,
2169 force_parallel: false,
2170 suffix: None,
2171 validation: Default::default(),
2172 nulls_equal: false,
2173 coalesce: Default::default(),
2174 maintain_order: Default::default(),
2175 build_side: None,
2176 }
2177 }
2178
2179 pub fn with(mut self, other: LazyFrame) -> Self {
2181 self.other = Some(other);
2182 self
2183 }
2184
2185 pub fn how(mut self, how: JoinType) -> Self {
2187 self.how = how;
2188 self
2189 }
2190
2191 pub fn validate(mut self, validation: JoinValidation) -> Self {
2192 self.validation = validation;
2193 self
2194 }
2195
2196 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2200 let on = on.as_ref().to_vec();
2201 self.left_on.clone_from(&on);
2202 self.right_on = on;
2203 self
2204 }
2205
2206 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2210 self.left_on = on.as_ref().to_vec();
2211 self
2212 }
2213
2214 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2218 self.right_on = on.as_ref().to_vec();
2219 self
2220 }
2221
2222 pub fn allow_parallel(mut self, allow: bool) -> Self {
2224 self.allow_parallel = allow;
2225 self
2226 }
2227
2228 pub fn force_parallel(mut self, force: bool) -> Self {
2230 self.force_parallel = force;
2231 self
2232 }
2233
2234 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2236 self.nulls_equal = nulls_equal;
2237 self
2238 }
2239
2240 pub fn suffix<S>(mut self, suffix: S) -> Self
2243 where
2244 S: Into<PlSmallStr>,
2245 {
2246 self.suffix = Some(suffix.into());
2247 self
2248 }
2249
2250 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2252 self.coalesce = coalesce;
2253 self
2254 }
2255
2256 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2258 self.maintain_order = maintain_order;
2259 self
2260 }
2261
2262 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2264 self.build_side = build_side;
2265 self
2266 }
2267
2268 pub fn finish(self) -> LazyFrame {
2270 let opt_state = self.lf.opt_state;
2271 let other = self.other.expect("'with' not set in join builder");
2272
2273 let args = JoinArgs {
2274 how: self.how,
2275 validation: self.validation,
2276 suffix: self.suffix,
2277 slice: None,
2278 nulls_equal: self.nulls_equal,
2279 coalesce: self.coalesce,
2280 maintain_order: self.maintain_order,
2281 build_side: self.build_side,
2282 };
2283
2284 let lp = self
2285 .lf
2286 .get_plan_builder()
2287 .join(
2288 other.logical_plan,
2289 self.left_on,
2290 self.right_on,
2291 JoinOptions {
2292 allow_parallel: self.allow_parallel,
2293 force_parallel: self.force_parallel,
2294 args,
2295 }
2296 .into(),
2297 )
2298 .build();
2299 LazyFrame::from_logical_plan(lp, opt_state)
2300 }
2301
2302 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2304 let opt_state = self.lf.opt_state;
2305 let other = self.other.expect("with not set");
2306
2307 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2309 if let Expr::BinaryExpr {
2310 op: Operator::And,
2311 left,
2312 right,
2313 } = predicate
2314 {
2315 decompose_and((*left).clone(), expanded_predicates);
2316 decompose_and((*right).clone(), expanded_predicates);
2317 } else {
2318 expanded_predicates.push(predicate);
2319 }
2320 }
2321 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2322 for predicate in predicates {
2323 decompose_and(predicate, &mut expanded_predicates);
2324 }
2325 let predicates: Vec<Expr> = expanded_predicates;
2326
2327 #[cfg(feature = "is_between")]
2329 let predicates: Vec<Expr> = {
2330 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2331 for predicate in predicates {
2332 if let Expr::Function {
2333 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2334 input,
2335 ..
2336 } = &predicate
2337 {
2338 if let [expr, lower, upper] = input.as_slice() {
2339 match closed {
2340 ClosedInterval::Both => {
2341 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2342 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2343 },
2344 ClosedInterval::Right => {
2345 expanded_predicates.push(expr.clone().gt(lower.clone()));
2346 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2347 },
2348 ClosedInterval::Left => {
2349 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2350 expanded_predicates.push(expr.clone().lt(upper.clone()));
2351 },
2352 ClosedInterval::None => {
2353 expanded_predicates.push(expr.clone().gt(lower.clone()));
2354 expanded_predicates.push(expr.clone().lt(upper.clone()));
2355 },
2356 }
2357 continue;
2358 }
2359 }
2360 expanded_predicates.push(predicate);
2361 }
2362 expanded_predicates
2363 };
2364
2365 let args = JoinArgs {
2366 how: self.how,
2367 validation: self.validation,
2368 suffix: self.suffix,
2369 slice: None,
2370 nulls_equal: self.nulls_equal,
2371 coalesce: self.coalesce,
2372 maintain_order: self.maintain_order,
2373 build_side: self.build_side,
2374 };
2375 let options = JoinOptions {
2376 allow_parallel: self.allow_parallel,
2377 force_parallel: self.force_parallel,
2378 args,
2379 };
2380
2381 let lp = DslPlan::Join {
2382 input_left: Arc::new(self.lf.logical_plan),
2383 input_right: Arc::new(other.logical_plan),
2384 left_on: Default::default(),
2385 right_on: Default::default(),
2386 predicates,
2387 options: Arc::from(options),
2388 };
2389
2390 LazyFrame::from_logical_plan(lp, opt_state)
2391 }
2392}
2393
/// Builder for the streaming query executor, selected at compile time.
///
/// `Some(polars_stream::build_streaming_query_executor)` when the
/// `streaming` feature is enabled, `None` otherwise.
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
    #[cfg(not(feature = "streaming"))]
    {
        None
    }
    #[cfg(feature = "streaming")]
    {
        Some(polars_stream::build_streaming_query_executor)
    }
};
2404
/// Iterator over the batches of a query, received through a channel.
pub struct CollectBatches {
    /// Receiving end of the channel the batches (or errors) arrive on.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// One-shot closure run by `start`; `None` once it has been taken.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2409
2410impl CollectBatches {
2411 pub fn start(&mut self) {
2413 if let Some(runner) = self.runner.take() {
2414 runner()
2415 }
2416 }
2417}
2418
2419impl Iterator for CollectBatches {
2420 type Item = PolarsResult<DataFrame>;
2421
2422 fn next(&mut self) -> Option<Self::Item> {
2423 self.start();
2424 self.recv.recv().ok()
2425 }
2426}