1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
42pub trait IntoLazy {
43 fn lazy(self) -> LazyFrame;
44}
45
46impl IntoLazy for DataFrame {
47 fn lazy(self) -> LazyFrame {
49 let lp = DslBuilder::from_existing_df(self).build();
50 LazyFrame {
51 logical_plan: lp,
52 opt_state: Default::default(),
53 cached_arena: Default::default(),
54 }
55 }
56}
57
58impl IntoLazy for LazyFrame {
59 fn lazy(self) -> LazyFrame {
60 self
61 }
62}
63
64#[derive(Clone, Default)]
69#[must_use]
70pub struct LazyFrame {
71 pub logical_plan: DslPlan,
72 pub(crate) opt_state: OptFlags,
73 pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
74}
75
76impl From<DslPlan> for LazyFrame {
77 fn from(plan: DslPlan) -> Self {
78 Self {
79 logical_plan: plan,
80 opt_state: OptFlags::default(),
81 cached_arena: Default::default(),
82 }
83 }
84}
85
86impl LazyFrame {
87 pub(crate) fn from_inner(
88 logical_plan: DslPlan,
89 opt_state: OptFlags,
90 cached_arena: Arc<Mutex<Option<CachedArena>>>,
91 ) -> Self {
92 Self {
93 logical_plan,
94 opt_state,
95 cached_arena,
96 }
97 }
98
99 pub(crate) fn get_plan_builder(self) -> DslBuilder {
100 DslBuilder::from(self.logical_plan)
101 }
102
103 fn get_opt_state(&self) -> OptFlags {
104 self.opt_state
105 }
106
107 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108 LazyFrame {
109 logical_plan,
110 opt_state,
111 cached_arena: Default::default(),
112 }
113 }
114
115 pub fn get_current_optimizations(&self) -> OptFlags {
117 self.opt_state
118 }
119
120 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122 self.opt_state = opt_state;
123 self
124 }
125
126 pub fn without_optimizations(self) -> Self {
128 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129 }
130
131 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134 self
135 }
136
137 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140 self
141 }
142
143 pub fn with_check_order(mut self, toggle: bool) -> Self {
146 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147 self
148 }
149
150 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153 self
154 }
155
156 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159 self
160 }
161
162 pub fn with_type_check(mut self, toggle: bool) -> Self {
164 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165 self
166 }
167
168 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171 self
172 }
173
174 #[cfg(feature = "cse")]
176 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
177 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
178 self
179 }
180
181 #[cfg(feature = "cse")]
183 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
184 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
185 self
186 }
187
188 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191 self
192 }
193
194 #[cfg(feature = "new_streaming")]
195 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
196 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
197 self
198 }
199
200 pub fn with_gpu(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::GPU, toggle);
202 self
203 }
204
205 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
208 self
209 }
210
211 pub fn _with_eager(mut self, toggle: bool) -> Self {
213 self.opt_state.set(OptFlags::EAGER, toggle);
214 self
215 }
216
217 pub fn describe_plan(&self) -> PolarsResult<String> {
219 Ok(self.clone().to_alp()?.describe())
220 }
221
222 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp()?.describe_tree_format())
225 }
226
227 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe())
232 }
233
234 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
238 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
239 }
240
241 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
246 if optimized {
247 self.describe_optimized_plan()
248 } else {
249 self.describe_plan()
250 }
251 }
252
253 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
293 let opt_state = self.get_opt_state();
294 let lp = self
295 .get_plan_builder()
296 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
297 .build();
298 Self::from_logical_plan(lp, opt_state)
299 }
300
301 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
321 self,
322 by_exprs: E,
323 sort_options: SortMultipleOptions,
324 ) -> Self {
325 let by_exprs = by_exprs.as_ref().to_vec();
326 if by_exprs.is_empty() {
327 self
328 } else {
329 let opt_state = self.get_opt_state();
330 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
331 Self::from_logical_plan(lp, opt_state)
332 }
333 }
334
335 pub fn top_k<E: AsRef<[Expr]>>(
336 self,
337 k: IdxSize,
338 by_exprs: E,
339 sort_options: SortMultipleOptions,
340 ) -> Self {
341 self.sort_by_exprs(
343 by_exprs,
344 sort_options.with_order_reversed().with_nulls_last(true),
345 )
346 .slice(0, k)
347 }
348
349 pub fn bottom_k<E: AsRef<[Expr]>>(
350 self,
351 k: IdxSize,
352 by_exprs: E,
353 sort_options: SortMultipleOptions,
354 ) -> Self {
355 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
357 .slice(0, k)
358 }
359
360 pub fn reverse(self) -> Self {
376 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
377 }
378
379 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
387 where
388 I: IntoIterator<Item = T>,
389 J: IntoIterator<Item = S>,
390 T: AsRef<str>,
391 S: AsRef<str>,
392 {
393 let iter = existing.into_iter();
394 let cap = iter.size_hint().0;
395 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
396 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
397
398 for (existing, new) in iter.zip(new) {
401 let existing = existing.as_ref();
402 let new = new.as_ref();
403 if new != existing {
404 existing_vec.push(existing.into());
405 new_vec.push(new.into());
406 }
407 }
408
409 self.map_private(DslFunction::Rename {
410 existing: existing_vec.into(),
411 new: new_vec.into(),
412 strict,
413 })
414 }
415
416 pub fn drop(self, columns: Selector) -> Self {
423 let opt_state = self.get_opt_state();
424 let lp = self.get_plan_builder().drop(columns).build();
425 Self::from_logical_plan(lp, opt_state)
426 }
427
428 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
433 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
434 }
435
436 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
441 self.select(vec![
442 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
443 ])
444 }
445
446 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
455 let opt_state = self.get_opt_state();
456 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
457 Self::from_logical_plan(lp, opt_state)
458 }
459
460 pub fn cache(self) -> Self {
464 let opt_state = self.get_opt_state();
465 let lp = self.get_plan_builder().cache().build();
466 Self::from_logical_plan(lp, opt_state)
467 }
468
469 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
471 let cast_cols: Vec<Expr> = dtypes
472 .into_iter()
473 .map(|(name, dt)| {
474 let name = PlSmallStr::from_str(name);
475
476 if strict {
477 col(name).strict_cast(dt)
478 } else {
479 col(name).cast(dt)
480 }
481 })
482 .collect();
483
484 if cast_cols.is_empty() {
485 self
486 } else {
487 self.with_columns(cast_cols)
488 }
489 }
490
491 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
493 self.with_columns(vec![if strict {
494 col(PlSmallStr::from_static("*")).strict_cast(dtype)
495 } else {
496 col(PlSmallStr::from_static("*")).cast(dtype)
497 }])
498 }
499
500 pub fn optimize(
501 self,
502 lp_arena: &mut Arena<IR>,
503 expr_arena: &mut Arena<AExpr>,
504 ) -> PolarsResult<Node> {
505 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
506 }
507
508 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
511
512 Ok(IRPlan::new(node, lp_arena, expr_arena))
513 }
514
515 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
516 let (mut lp_arena, mut expr_arena) = self.get_arenas();
517 let node = to_alp(
518 self.logical_plan,
519 &mut expr_arena,
520 &mut lp_arena,
521 &mut self.opt_state,
522 )?;
523 let plan = IRPlan::new(node, lp_arena, expr_arena);
524 Ok(plan)
525 }
526
527 pub(crate) fn optimize_with_scratch(
528 self,
529 lp_arena: &mut Arena<IR>,
530 expr_arena: &mut Arena<AExpr>,
531 scratch: &mut Vec<Node>,
532 ) -> PolarsResult<Node> {
533 let lp_top = optimize(
534 self.logical_plan,
535 self.opt_state,
536 lp_arena,
537 expr_arena,
538 scratch,
539 apply_scan_predicate_to_scan_ir,
540 )?;
541
542 Ok(lp_top)
543 }
544
545 fn prepare_collect_post_opt<P>(
546 mut self,
547 check_sink: bool,
548 query_start: Option<std::time::Instant>,
549 post_opt: P,
550 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
551 where
552 P: FnOnce(
553 Node,
554 &mut Arena<IR>,
555 &mut Arena<AExpr>,
556 Option<std::time::Duration>,
557 ) -> PolarsResult<()>,
558 {
559 let (mut lp_arena, mut expr_arena) = self.get_arenas();
560
561 let mut scratch = vec![];
562 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
563
564 post_opt(
565 lp_top,
566 &mut lp_arena,
567 &mut expr_arena,
568 query_start.map(|s| s.elapsed()),
571 )?;
572
573 let no_file_sink = if check_sink {
575 !matches!(
576 lp_arena.get(lp_top),
577 IR::Sink {
578 payload: SinkTypeIR::File { .. },
579 ..
580 }
581 )
582 } else {
583 true
584 };
585 let physical_plan = create_physical_plan(
586 lp_top,
587 &mut lp_arena,
588 &mut expr_arena,
589 BUILD_STREAMING_EXECUTOR,
590 )?;
591
592 let state = ExecutionState::new();
593 Ok((state, physical_plan, no_file_sink))
594 }
595
596 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
598 where
599 P: FnOnce(
600 Node,
601 &mut Arena<IR>,
602 &mut Arena<AExpr>,
603 Option<std::time::Duration>,
604 ) -> PolarsResult<()>,
605 {
606 let (mut state, mut physical_plan, _) =
607 self.prepare_collect_post_opt(false, None, post_opt)?;
608 physical_plan.execute(&mut state)
609 }
610
611 #[allow(unused_mut)]
612 fn prepare_collect(
613 self,
614 check_sink: bool,
615 query_start: Option<std::time::Instant>,
616 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
617 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
618 }
619
620 pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
625 let engine = match engine {
626 Engine::Streaming => Engine::Streaming,
627 _ if std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1") => {
628 Engine::Streaming
629 },
630 Engine::Auto => Engine::InMemory,
631 v => v,
632 };
633
634 if engine != Engine::Streaming
635 && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
636 {
637 feature_gated!("new_streaming", {
638 if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
639 return r;
640 }
641 })
642 }
643 match engine {
644 Engine::Streaming => {
645 feature_gated!("new_streaming", self = self.with_new_streaming(true))
646 },
647 Engine::Gpu => self = self.with_gpu(true),
648 _ => (),
649 }
650
651 let mut ir_plan = self.to_alp_optimized()?;
652
653 ir_plan.ensure_root_node_is_sink();
654
655 match engine {
656 Engine::Streaming => feature_gated!("new_streaming", {
657 polars_stream::run_query(
658 ir_plan.lp_top,
659 &mut ir_plan.lp_arena,
660 &mut ir_plan.expr_arena,
661 )
662 }),
663 Engine::InMemory | Engine::Gpu => {
664 if let IR::SinkMultiple { inputs } = ir_plan.root() {
665 polars_ensure!(
666 engine != Engine::Gpu,
667 InvalidOperation:
668 "collect_all is not supported for the gpu engine"
669 );
670
671 return create_multiple_physical_plans(
672 inputs.clone().as_slice(),
673 &mut ir_plan.lp_arena,
674 &mut ir_plan.expr_arena,
675 BUILD_STREAMING_EXECUTOR,
676 )?
677 .execute()
678 .map(QueryResult::Multiple);
679 }
680
681 let mut physical_plan = create_physical_plan(
682 ir_plan.lp_top,
683 &mut ir_plan.lp_arena,
684 &mut ir_plan.expr_arena,
685 BUILD_STREAMING_EXECUTOR,
686 )?;
687 let mut state = ExecutionState::new();
688 physical_plan.execute(&mut state).map(QueryResult::Single)
689 },
690 Engine::Auto => unreachable!(),
691 }
692 }
693
694 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
695 let sink_multiple = LazyFrame {
696 logical_plan: DslPlan::SinkMultiple { inputs: plans },
697 opt_state,
698 cached_arena: Default::default(),
699 };
700 sink_multiple.explain(true)
701 }
702
703 pub fn collect_all_with_engine(
704 plans: Vec<DslPlan>,
705 engine: Engine,
706 opt_state: OptFlags,
707 ) -> PolarsResult<Vec<DataFrame>> {
708 if plans.is_empty() {
709 return Ok(Vec::new());
710 }
711
712 LazyFrame {
713 logical_plan: DslPlan::SinkMultiple { inputs: plans },
714 opt_state,
715 cached_arena: Default::default(),
716 }
717 .collect_with_engine(engine)
718 .map(|r| r.unwrap_multiple())
719 }
720
721 pub fn collect(self) -> PolarsResult<DataFrame> {
739 self.collect_with_engine(Engine::Auto).map(|r| match r {
740 QueryResult::Single(df) => df,
741 QueryResult::Multiple(_) => DataFrame::empty(),
743 })
744 }
745
746 #[cfg(feature = "async")]
751 pub fn collect_batches(
752 self,
753 engine: Engine,
754 maintain_order: bool,
755 chunk_size: Option<NonZeroUsize>,
756 lazy: bool,
757 ) -> PolarsResult<CollectBatches> {
758 let (send, recv) = sync_channel(1);
759 let runner_send = send.clone();
760 let ldf = self.sink_batches(
761 PlanCallback::new(move |df| {
762 let send_result = send.send(Ok(df));
764 Ok(send_result.is_err())
765 }),
766 maintain_order,
767 chunk_size,
768 )?;
769 let runner = move || {
770 polars_io::pl_async::get_runtime().spawn_blocking(move || {
773 if let Err(e) = ldf.collect_with_engine(engine) {
774 runner_send.send(Err(e)).ok();
775 }
776 });
777 };
778
779 let mut collect_batches = CollectBatches {
780 recv,
781 runner: Some(Box::new(runner)),
782 };
783 if !lazy {
784 collect_batches.start();
785 }
786 Ok(collect_batches)
787 }
788
789 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
792 where
793 P: FnOnce(
794 Node,
795 &mut Arena<IR>,
796 &mut Arena<AExpr>,
797 Option<std::time::Duration>,
798 ) -> PolarsResult<()>,
799 {
800 let query_start = std::time::Instant::now();
801 let (mut state, mut physical_plan, _) =
802 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
803 state.time_nodes(query_start);
804 let out = physical_plan.execute(&mut state)?;
805 let timer_df = state.finish_timer()?;
806 Ok((out, timer_df))
807 }
808
809 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
817 self._profile_post_opt(|_, _, _, _| Ok(()))
818 }
819
820 pub fn sink_batches(
821 mut self,
822 function: PlanCallback<DataFrame, bool>,
823 maintain_order: bool,
824 chunk_size: Option<NonZeroUsize>,
825 ) -> PolarsResult<Self> {
826 use polars_plan::prelude::sink::CallbackSinkType;
827
828 polars_ensure!(
829 !matches!(self.logical_plan, DslPlan::Sink { .. }),
830 InvalidOperation: "cannot create a sink on top of another sink"
831 );
832
833 self.logical_plan = DslPlan::Sink {
834 input: Arc::new(self.logical_plan),
835 payload: SinkType::Callback(CallbackSinkType {
836 function,
837 maintain_order,
838 chunk_size,
839 }),
840 };
841
842 Ok(self)
843 }
844
845 #[cfg(feature = "new_streaming")]
847 fn _collect_with_streaming_suppress_todo_panic(
848 mut self,
849 ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
850 self.opt_state |= OptFlags::NEW_STREAMING;
851 let mut ir_plan = match self.to_alp_optimized() {
852 Ok(v) => v,
853 Err(e) => return Some(Err(e)),
854 };
855
856 ir_plan.ensure_root_node_is_sink();
857
858 let f = || {
859 polars_stream::run_query(
860 ir_plan.lp_top,
861 &mut ir_plan.lp_arena,
862 &mut ir_plan.expr_arena,
863 )
864 };
865
866 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
867 Ok(v) => Some(v),
868 Err(e) => {
869 if e.downcast_ref::<&str>()
872 .is_some_and(|s| s.starts_with("not yet implemented"))
873 {
874 if polars_core::config::verbose() {
875 eprintln!(
876 "caught unimplemented error in new streaming engine, falling back to normal engine"
877 );
878 }
879 None
880 } else {
881 std::panic::resume_unwind(e)
882 }
883 },
884 }
885 }
886
887 pub fn sink(
888 mut self,
889 sink_type: SinkDestination,
890 file_format: FileWriteFormat,
891 unified_sink_args: UnifiedSinkArgs,
892 ) -> PolarsResult<Self> {
893 polars_ensure!(
894 !matches!(self.logical_plan, DslPlan::Sink { .. }),
895 InvalidOperation: "cannot create a sink on top of another sink"
896 );
897
898 self.logical_plan = DslPlan::Sink {
899 input: Arc::new(self.logical_plan),
900 payload: match sink_type {
901 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
902 target,
903 file_format,
904 unified_sink_args,
905 }),
906 SinkDestination::Partitioned {
907 base_path,
908 file_path_provider,
909 partition_strategy,
910 max_rows_per_file,
911 approximate_bytes_per_file,
912 } => SinkType::Partitioned(PartitionedSinkOptions {
913 base_path,
914 file_path_provider,
915 partition_strategy,
916 file_format,
917 unified_sink_args,
918 max_rows_per_file,
919 approximate_bytes_per_file,
920 }),
921 },
922 };
923 Ok(self)
924 }
925
926 pub fn filter(self, predicate: Expr) -> Self {
944 let opt_state = self.get_opt_state();
945 let lp = self.get_plan_builder().filter(predicate).build();
946 Self::from_logical_plan(lp, opt_state)
947 }
948
949 pub fn remove(self, predicate: Expr) -> Self {
967 self.filter(predicate.neq_missing(lit(true)))
968 }
969
970 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
996 let exprs = exprs.as_ref().to_vec();
997 self.select_impl(
998 exprs,
999 ProjectionOptions {
1000 run_parallel: true,
1001 duplicate_check: true,
1002 should_broadcast: true,
1003 },
1004 )
1005 }
1006
1007 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1008 let exprs = exprs.as_ref().to_vec();
1009 self.select_impl(
1010 exprs,
1011 ProjectionOptions {
1012 run_parallel: false,
1013 duplicate_check: true,
1014 should_broadcast: true,
1015 },
1016 )
1017 }
1018
1019 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1020 let opt_state = self.get_opt_state();
1021 let lp = self.get_plan_builder().project(exprs, options).build();
1022 Self::from_logical_plan(lp, opt_state)
1023 }
1024
1025 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1046 let keys = by
1047 .as_ref()
1048 .iter()
1049 .map(|e| e.clone().into())
1050 .collect::<Vec<_>>();
1051 let opt_state = self.get_opt_state();
1052
1053 #[cfg(feature = "dynamic_group_by")]
1054 {
1055 LazyGroupBy {
1056 logical_plan: self.logical_plan,
1057 opt_state,
1058 keys,
1059 predicates: vec![],
1060 maintain_order: false,
1061 dynamic_options: None,
1062 rolling_options: None,
1063 }
1064 }
1065
1066 #[cfg(not(feature = "dynamic_group_by"))]
1067 {
1068 LazyGroupBy {
1069 logical_plan: self.logical_plan,
1070 opt_state,
1071 keys,
1072 predicates: vec![],
1073 maintain_order: false,
1074 }
1075 }
1076 }
1077
1078 #[cfg(feature = "dynamic_group_by")]
1086 pub fn rolling<E: AsRef<[Expr]>>(
1087 mut self,
1088 index_column: Expr,
1089 group_by: E,
1090 mut options: RollingGroupOptions,
1091 ) -> LazyGroupBy {
1092 if let Expr::Column(name) = index_column {
1093 options.index_column = name;
1094 } else {
1095 let output_field = index_column
1096 .to_field(&self.collect_schema().unwrap())
1097 .unwrap();
1098 return self.with_column(index_column).rolling(
1099 Expr::Column(output_field.name().clone()),
1100 group_by,
1101 options,
1102 );
1103 }
1104 let opt_state = self.get_opt_state();
1105 LazyGroupBy {
1106 logical_plan: self.logical_plan,
1107 opt_state,
1108 predicates: vec![],
1109 keys: group_by.as_ref().to_vec(),
1110 maintain_order: true,
1111 dynamic_options: None,
1112 rolling_options: Some(options),
1113 }
1114 }
1115
1116 #[cfg(feature = "dynamic_group_by")]
1132 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1133 mut self,
1134 index_column: Expr,
1135 group_by: E,
1136 mut options: DynamicGroupOptions,
1137 ) -> LazyGroupBy {
1138 if let Expr::Column(name) = index_column {
1139 options.index_column = name;
1140 } else {
1141 let output_field = index_column
1142 .to_field(&self.collect_schema().unwrap())
1143 .unwrap();
1144 return self.with_column(index_column).group_by_dynamic(
1145 Expr::Column(output_field.name().clone()),
1146 group_by,
1147 options,
1148 );
1149 }
1150 let opt_state = self.get_opt_state();
1151 LazyGroupBy {
1152 logical_plan: self.logical_plan,
1153 opt_state,
1154 predicates: vec![],
1155 keys: group_by.as_ref().to_vec(),
1156 maintain_order: true,
1157 dynamic_options: Some(options),
1158 rolling_options: None,
1159 }
1160 }
1161
1162 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1164 let keys = by
1165 .as_ref()
1166 .iter()
1167 .map(|e| e.clone().into())
1168 .collect::<Vec<_>>();
1169 let opt_state = self.get_opt_state();
1170
1171 #[cfg(feature = "dynamic_group_by")]
1172 {
1173 LazyGroupBy {
1174 logical_plan: self.logical_plan,
1175 opt_state,
1176 keys,
1177 predicates: vec![],
1178 maintain_order: true,
1179 dynamic_options: None,
1180 rolling_options: None,
1181 }
1182 }
1183
1184 #[cfg(not(feature = "dynamic_group_by"))]
1185 {
1186 LazyGroupBy {
1187 logical_plan: self.logical_plan,
1188 opt_state,
1189 keys,
1190 predicates: vec![],
1191 maintain_order: true,
1192 }
1193 }
1194 }
1195
1196 #[cfg(feature = "semi_anti_join")]
1213 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1214 self.join(
1215 other,
1216 [left_on.into()],
1217 [right_on.into()],
1218 JoinArgs::new(JoinType::Anti),
1219 )
1220 }
1221
1222 #[cfg(feature = "cross_join")]
1224 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1225 self.join(
1226 other,
1227 vec![],
1228 vec![],
1229 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1230 )
1231 }
1232
1233 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1250 self.join(
1251 other,
1252 [left_on.into()],
1253 [right_on.into()],
1254 JoinArgs::new(JoinType::Left),
1255 )
1256 }
1257
1258 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1275 self.join(
1276 other,
1277 [left_on.into()],
1278 [right_on.into()],
1279 JoinArgs::new(JoinType::Inner),
1280 )
1281 }
1282
1283 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1300 self.join(
1301 other,
1302 [left_on.into()],
1303 [right_on.into()],
1304 JoinArgs::new(JoinType::Full),
1305 )
1306 }
1307
1308 #[cfg(feature = "semi_anti_join")]
1325 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1326 self.join(
1327 other,
1328 [left_on.into()],
1329 [right_on.into()],
1330 JoinArgs::new(JoinType::Semi),
1331 )
1332 }
1333
1334 pub fn join<E: AsRef<[Expr]>>(
1356 self,
1357 other: LazyFrame,
1358 left_on: E,
1359 right_on: E,
1360 args: JoinArgs,
1361 ) -> LazyFrame {
1362 let left_on = left_on.as_ref().to_vec();
1363 let right_on = right_on.as_ref().to_vec();
1364
1365 self._join_impl(other, left_on, right_on, args)
1366 }
1367
1368 fn _join_impl(
1369 self,
1370 other: LazyFrame,
1371 left_on: Vec<Expr>,
1372 right_on: Vec<Expr>,
1373 args: JoinArgs,
1374 ) -> LazyFrame {
1375 let JoinArgs {
1376 how,
1377 validation,
1378 suffix,
1379 slice,
1380 nulls_equal,
1381 coalesce,
1382 maintain_order,
1383 build_side,
1384 } = args;
1385
1386 if slice.is_some() {
1387 panic!("impl error: slice is not handled")
1388 }
1389
1390 let mut builder = self
1391 .join_builder()
1392 .with(other)
1393 .left_on(left_on)
1394 .right_on(right_on)
1395 .how(how)
1396 .validate(validation)
1397 .join_nulls(nulls_equal)
1398 .coalesce(coalesce)
1399 .maintain_order(maintain_order)
1400 .build_side(build_side);
1401
1402 if let Some(suffix) = suffix {
1403 builder = builder.suffix(suffix);
1404 }
1405
1406 builder.finish()
1408 }
1409
1410 pub fn join_builder(self) -> JoinBuilder {
1416 JoinBuilder::new(self)
1417 }
1418
1419 pub fn with_column(self, expr: Expr) -> LazyFrame {
1437 let opt_state = self.get_opt_state();
1438 let lp = self
1439 .get_plan_builder()
1440 .with_columns(
1441 vec![expr],
1442 ProjectionOptions {
1443 run_parallel: false,
1444 duplicate_check: true,
1445 should_broadcast: true,
1446 },
1447 )
1448 .build();
1449 Self::from_logical_plan(lp, opt_state)
1450 }
1451
1452 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1467 let exprs = exprs.as_ref().to_vec();
1468 self.with_columns_impl(
1469 exprs,
1470 ProjectionOptions {
1471 run_parallel: true,
1472 duplicate_check: true,
1473 should_broadcast: true,
1474 },
1475 )
1476 }
1477
1478 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1480 let exprs = exprs.as_ref().to_vec();
1481 self.with_columns_impl(
1482 exprs,
1483 ProjectionOptions {
1484 run_parallel: false,
1485 duplicate_check: true,
1486 should_broadcast: true,
1487 },
1488 )
1489 }
1490
1491 pub fn match_to_schema(
1493 self,
1494 schema: SchemaRef,
1495 per_column: Arc<[MatchToSchemaPerColumn]>,
1496 extra_columns: ExtraColumnsPolicy,
1497 ) -> LazyFrame {
1498 let opt_state = self.get_opt_state();
1499 let lp = self
1500 .get_plan_builder()
1501 .match_to_schema(schema, per_column, extra_columns)
1502 .build();
1503 Self::from_logical_plan(lp, opt_state)
1504 }
1505
1506 pub fn pipe_with_schema(
1507 self,
1508 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1509 ) -> Self {
1510 let opt_state = self.get_opt_state();
1511 let lp = self
1512 .get_plan_builder()
1513 .pipe_with_schema(vec![], callback)
1514 .build();
1515 Self::from_logical_plan(lp, opt_state)
1516 }
1517
1518 pub fn pipe_with_schemas(
1519 self,
1520 others: Vec<LazyFrame>,
1521 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1522 ) -> Self {
1523 let opt_state = self.get_opt_state();
1524 let lp = self
1525 .get_plan_builder()
1526 .pipe_with_schema(
1527 others.into_iter().map(|lf| lf.logical_plan).collect(),
1528 callback,
1529 )
1530 .build();
1531 Self::from_logical_plan(lp, opt_state)
1532 }
1533
1534 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1535 let opt_state = self.get_opt_state();
1536 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1537 Self::from_logical_plan(lp, opt_state)
1538 }
1539
1540 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1541 let contexts = contexts
1542 .as_ref()
1543 .iter()
1544 .map(|lf| lf.logical_plan.clone())
1545 .collect();
1546 let opt_state = self.get_opt_state();
1547 let lp = self.get_plan_builder().with_context(contexts).build();
1548 Self::from_logical_plan(lp, opt_state)
1549 }
1550
1551 pub fn max(self) -> Self {
1555 self.map_private(DslFunction::Stats(StatsFunction::Max))
1556 }
1557
1558 pub fn min(self) -> Self {
1562 self.map_private(DslFunction::Stats(StatsFunction::Min))
1563 }
1564
1565 pub fn sum(self) -> Self {
1575 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1576 }
1577
1578 pub fn mean(self) -> Self {
1583 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1584 }
1585
1586 pub fn median(self) -> Self {
1592 self.map_private(DslFunction::Stats(StatsFunction::Median))
1593 }
1594
1595 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1597 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1598 quantile,
1599 method,
1600 }))
1601 }
1602
1603 pub fn std(self, ddof: u8) -> Self {
1616 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1617 }
1618
1619 pub fn var(self, ddof: u8) -> Self {
1629 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1630 }
1631
1632 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1634 self.explode_impl(columns, options, false)
1635 }
1636
1637 fn explode_impl(
1639 self,
1640 columns: Selector,
1641 options: ExplodeOptions,
1642 allow_empty: bool,
1643 ) -> LazyFrame {
1644 let opt_state = self.get_opt_state();
1645 let lp = self
1646 .get_plan_builder()
1647 .explode(columns, options, allow_empty)
1648 .build();
1649 Self::from_logical_plan(lp, opt_state)
1650 }
1651
1652 pub fn null_count(self) -> LazyFrame {
1654 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1655 }
1656
1657 pub fn unique_stable(
1662 self,
1663 subset: Option<Selector>,
1664 keep_strategy: UniqueKeepStrategy,
1665 ) -> LazyFrame {
1666 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1667 self.unique_stable_generic(subset, keep_strategy)
1668 }
1669
1670 pub fn unique_stable_generic(
1671 self,
1672 subset: Option<Vec<Expr>>,
1673 keep_strategy: UniqueKeepStrategy,
1674 ) -> LazyFrame {
1675 let opt_state = self.get_opt_state();
1676 let options = DistinctOptionsDSL {
1677 subset,
1678 maintain_order: true,
1679 keep_strategy,
1680 };
1681 let lp = self.get_plan_builder().distinct(options).build();
1682 Self::from_logical_plan(lp, opt_state)
1683 }
1684
1685 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1693 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1694 self.unique_generic(subset, keep_strategy)
1695 }
1696
1697 pub fn unique_generic(
1698 self,
1699 subset: Option<Vec<Expr>>,
1700 keep_strategy: UniqueKeepStrategy,
1701 ) -> LazyFrame {
1702 let opt_state = self.get_opt_state();
1703 let options = DistinctOptionsDSL {
1704 subset,
1705 maintain_order: false,
1706 keep_strategy,
1707 };
1708 let lp = self.get_plan_builder().distinct(options).build();
1709 Self::from_logical_plan(lp, opt_state)
1710 }
1711
1712 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1717 let opt_state = self.get_opt_state();
1718 let lp = self.get_plan_builder().drop_nans(subset).build();
1719 Self::from_logical_plan(lp, opt_state)
1720 }
1721
1722 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1727 let opt_state = self.get_opt_state();
1728 let lp = self.get_plan_builder().drop_nulls(subset).build();
1729 Self::from_logical_plan(lp, opt_state)
1730 }
1731
1732 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1742 let opt_state = self.get_opt_state();
1743 let lp = self.get_plan_builder().slice(offset, len).build();
1744 Self::from_logical_plan(lp, opt_state)
1745 }
1746
1747 pub fn clear(self) -> LazyFrame {
1749 self.slice(0, 0)
1750 }
1751
1752 pub fn first(self) -> LazyFrame {
1756 self.slice(0, 1)
1757 }
1758
1759 pub fn last(self) -> LazyFrame {
1763 self.slice(-1, 1)
1764 }
1765
1766 pub fn tail(self, n: IdxSize) -> LazyFrame {
1770 let neg_tail = -(n as i64);
1771 self.slice(neg_tail, n)
1772 }
1773
1774 #[cfg(feature = "pivot")]
1775 #[expect(clippy::too_many_arguments)]
1776 pub fn pivot(
1777 self,
1778 on: Selector,
1779 on_columns: Arc<DataFrame>,
1780 index: Selector,
1781 values: Selector,
1782 agg: Expr,
1783 maintain_order: bool,
1784 separator: PlSmallStr,
1785 column_naming: PivotColumnNaming,
1786 ) -> LazyFrame {
1787 let opt_state = self.get_opt_state();
1788 let lp = self
1789 .get_plan_builder()
1790 .pivot(
1791 on,
1792 on_columns,
1793 index,
1794 values,
1795 agg,
1796 maintain_order,
1797 separator,
1798 column_naming,
1799 )
1800 .build();
1801 Self::from_logical_plan(lp, opt_state)
1802 }
1803
1804 #[cfg(feature = "pivot")]
1808 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1809 let opt_state = self.get_opt_state();
1810 let lp = self.get_plan_builder().unpivot(args).build();
1811 Self::from_logical_plan(lp, opt_state)
1812 }
1813
1814 pub fn limit(self, n: IdxSize) -> LazyFrame {
1816 self.slice(0, n)
1817 }
1818
1819 pub fn map<F>(
1833 self,
1834 function: F,
1835 optimizations: AllowedOptimizations,
1836 schema: Option<Arc<dyn UdfSchema>>,
1837 name: Option<&'static str>,
1838 ) -> LazyFrame
1839 where
1840 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1841 {
1842 let opt_state = self.get_opt_state();
1843 let lp = self
1844 .get_plan_builder()
1845 .map(
1846 function,
1847 optimizations,
1848 schema,
1849 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1850 )
1851 .build();
1852 Self::from_logical_plan(lp, opt_state)
1853 }
1854
1855 #[cfg(feature = "python")]
1856 pub fn map_python(
1857 self,
1858 function: polars_utils::python_function::PythonFunction,
1859 optimizations: AllowedOptimizations,
1860 schema: Option<SchemaRef>,
1861 validate_output: bool,
1862 ) -> LazyFrame {
1863 let opt_state = self.get_opt_state();
1864 let lp = self
1865 .get_plan_builder()
1866 .map_python(function, optimizations, schema, validate_output)
1867 .build();
1868 Self::from_logical_plan(lp, opt_state)
1869 }
1870
1871 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1872 let opt_state = self.get_opt_state();
1873 let lp = self.get_plan_builder().map_private(function).build();
1874 Self::from_logical_plan(lp, opt_state)
1875 }
1876
1877 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1886 where
1887 S: Into<PlSmallStr>,
1888 {
1889 let name = name.into();
1890
1891 match &self.logical_plan {
1892 v @ DslPlan::Scan {
1893 scan_type,
1894 unified_scan_args,
1895 ..
1896 } if unified_scan_args.row_index.is_none()
1897 && !matches!(
1898 &**scan_type,
1899 FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1900 ) =>
1901 {
1902 let DslPlan::Scan {
1903 sources,
1904 mut unified_scan_args,
1905 scan_type,
1906 cached_ir: _,
1907 } = v.clone()
1908 else {
1909 unreachable!()
1910 };
1911
1912 unified_scan_args.row_index = Some(RowIndex {
1913 name,
1914 offset: offset.unwrap_or(0),
1915 });
1916
1917 DslPlan::Scan {
1918 sources,
1919 unified_scan_args,
1920 scan_type,
1921 cached_ir: Default::default(),
1922 }
1923 .into()
1924 },
1925 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1926 }
1927 }
1928
1929 pub fn count(self) -> LazyFrame {
1931 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1932 }
1933
1934 #[cfg(feature = "dtype-struct")]
1937 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1938 self.map_private(DslFunction::Unnest {
1939 columns: cols,
1940 separator,
1941 })
1942 }
1943
1944 #[cfg(feature = "merge_sorted")]
1945 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1946 where
1947 S: Into<PlSmallStr>,
1948 {
1949 let key = key.into();
1950
1951 let lp = DslPlan::MergeSorted {
1952 input_left: Arc::new(self.logical_plan),
1953 input_right: Arc::new(other.logical_plan),
1954 key,
1955 };
1956 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1957 }
1958
1959 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1960 let lp = DslPlan::MapFunction {
1961 input: Arc::new(self.logical_plan),
1962 function: DslFunction::Hint(hint),
1963 };
1964 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1965 }
1966}
1967
/// A pending group-by: an input plan plus the grouping configuration that
/// `agg` / `apply` turn into a group-by plan node.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame that is being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags handed on to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Grouping key expressions.
    keys: Vec<Expr>,
    /// Predicates collected through `having`.
    predicates: Vec<Expr>,
    /// Forwarded unchanged to the group-by node.
    maintain_order: bool,
    /// Dynamic group-by options, forwarded unchanged to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, forwarded unchanged to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
1981
1982impl From<LazyGroupBy> for LazyFrame {
1983 fn from(lgb: LazyGroupBy) -> Self {
1984 Self {
1985 logical_plan: lgb.logical_plan,
1986 opt_state: lgb.opt_state,
1987 cached_arena: Default::default(),
1988 }
1989 }
1990}
1991
1992impl LazyGroupBy {
1993 pub fn having(mut self, predicate: Expr) -> Self {
2014 self.predicates.push(predicate);
2015 self
2016 }
2017
2018 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2040 #[cfg(feature = "dynamic_group_by")]
2041 let lp = DslBuilder::from(self.logical_plan)
2042 .group_by(
2043 self.keys,
2044 self.predicates,
2045 aggs,
2046 None,
2047 self.maintain_order,
2048 self.dynamic_options,
2049 self.rolling_options,
2050 )
2051 .build();
2052
2053 #[cfg(not(feature = "dynamic_group_by"))]
2054 let lp = DslBuilder::from(self.logical_plan)
2055 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2056 .build();
2057 LazyFrame::from_logical_plan(lp, self.opt_state)
2058 }
2059
2060 pub fn head(self, n: Option<usize>) -> LazyFrame {
2062 let keys = self
2063 .keys
2064 .iter()
2065 .filter_map(|expr| expr_output_name(expr).ok())
2066 .collect::<Vec<_>>();
2067
2068 self.agg([all().as_expr().head(n)]).explode_impl(
2069 all() - by_name(keys.iter().cloned(), false, false),
2070 ExplodeOptions {
2071 empty_as_null: true,
2072 keep_nulls: true,
2073 },
2074 true,
2075 )
2076 }
2077
2078 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2080 let keys = self
2081 .keys
2082 .iter()
2083 .filter_map(|expr| expr_output_name(expr).ok())
2084 .collect::<Vec<_>>();
2085
2086 self.agg([all().as_expr().tail(n)]).explode_impl(
2087 all() - by_name(keys.iter().cloned(), false, false),
2088 ExplodeOptions {
2089 empty_as_null: true,
2090 keep_nulls: true,
2091 },
2092 true,
2093 )
2094 }
2095
2096 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2101 if !self.predicates.is_empty() {
2102 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2103 }
2104
2105 #[cfg(feature = "dynamic_group_by")]
2106 let options = GroupbyOptions {
2107 dynamic: self.dynamic_options,
2108 rolling: self.rolling_options,
2109 slice: None,
2110 };
2111
2112 #[cfg(not(feature = "dynamic_group_by"))]
2113 let options = GroupbyOptions { slice: None };
2114
2115 let lp = DslPlan::GroupBy {
2116 input: Arc::new(self.logical_plan),
2117 keys: self.keys,
2118 predicates: vec![],
2119 aggs: vec![],
2120 apply: Some((f, schema)),
2121 maintain_order: self.maintain_order,
2122 options: Arc::new(options),
2123 };
2124 LazyFrame::from_logical_plan(lp, self.opt_state)
2125 }
2126}
2127
/// Builder that collects the inputs and options of a join and turns them into
/// a `LazyFrame` via `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Left-hand frame of the join.
    lf: LazyFrame,
    /// Join type, copied into `JoinArgs::how`.
    how: JoinType,
    /// Right-hand frame; must be set through `with` before finishing.
    other: Option<LazyFrame>,
    /// Join key expressions for the left frame.
    left_on: Vec<Expr>,
    /// Join key expressions for the right frame.
    right_on: Vec<Expr>,
    /// Copied into `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Copied into `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Copied into `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Copied into `JoinArgs::validation`.
    validation: JoinValidation,
    /// Copied into `JoinArgs::nulls_equal`.
    nulls_equal: bool,
    /// Copied into `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Copied into `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
    /// Copied into `JoinArgs::build_side`.
    build_side: Option<JoinBuildSide>,
}
2144impl JoinBuilder {
2145 pub fn new(lf: LazyFrame) -> Self {
2147 Self {
2148 lf,
2149 other: None,
2150 how: JoinType::Inner,
2151 left_on: vec![],
2152 right_on: vec![],
2153 allow_parallel: true,
2154 force_parallel: false,
2155 suffix: None,
2156 validation: Default::default(),
2157 nulls_equal: false,
2158 coalesce: Default::default(),
2159 maintain_order: Default::default(),
2160 build_side: None,
2161 }
2162 }
2163
2164 pub fn with(mut self, other: LazyFrame) -> Self {
2166 self.other = Some(other);
2167 self
2168 }
2169
2170 pub fn how(mut self, how: JoinType) -> Self {
2172 self.how = how;
2173 self
2174 }
2175
2176 pub fn validate(mut self, validation: JoinValidation) -> Self {
2177 self.validation = validation;
2178 self
2179 }
2180
2181 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2185 let on = on.as_ref().to_vec();
2186 self.left_on.clone_from(&on);
2187 self.right_on = on;
2188 self
2189 }
2190
2191 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2195 self.left_on = on.as_ref().to_vec();
2196 self
2197 }
2198
2199 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2203 self.right_on = on.as_ref().to_vec();
2204 self
2205 }
2206
2207 pub fn allow_parallel(mut self, allow: bool) -> Self {
2209 self.allow_parallel = allow;
2210 self
2211 }
2212
2213 pub fn force_parallel(mut self, force: bool) -> Self {
2215 self.force_parallel = force;
2216 self
2217 }
2218
2219 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2221 self.nulls_equal = nulls_equal;
2222 self
2223 }
2224
2225 pub fn suffix<S>(mut self, suffix: S) -> Self
2228 where
2229 S: Into<PlSmallStr>,
2230 {
2231 self.suffix = Some(suffix.into());
2232 self
2233 }
2234
2235 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2237 self.coalesce = coalesce;
2238 self
2239 }
2240
2241 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2243 self.maintain_order = maintain_order;
2244 self
2245 }
2246
2247 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2249 self.build_side = build_side;
2250 self
2251 }
2252
2253 pub fn finish(self) -> LazyFrame {
2255 let opt_state = self.lf.opt_state;
2256 let other = self.other.expect("'with' not set in join builder");
2257
2258 let args = JoinArgs {
2259 how: self.how,
2260 validation: self.validation,
2261 suffix: self.suffix,
2262 slice: None,
2263 nulls_equal: self.nulls_equal,
2264 coalesce: self.coalesce,
2265 maintain_order: self.maintain_order,
2266 build_side: self.build_side,
2267 };
2268
2269 let lp = self
2270 .lf
2271 .get_plan_builder()
2272 .join(
2273 other.logical_plan,
2274 self.left_on,
2275 self.right_on,
2276 JoinOptions {
2277 allow_parallel: self.allow_parallel,
2278 force_parallel: self.force_parallel,
2279 args,
2280 }
2281 .into(),
2282 )
2283 .build();
2284 LazyFrame::from_logical_plan(lp, opt_state)
2285 }
2286
2287 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2289 let opt_state = self.lf.opt_state;
2290 let other = self.other.expect("with not set");
2291
2292 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2294 if let Expr::BinaryExpr {
2295 op: Operator::And,
2296 left,
2297 right,
2298 } = predicate
2299 {
2300 decompose_and((*left).clone(), expanded_predicates);
2301 decompose_and((*right).clone(), expanded_predicates);
2302 } else {
2303 expanded_predicates.push(predicate);
2304 }
2305 }
2306 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2307 for predicate in predicates {
2308 decompose_and(predicate, &mut expanded_predicates);
2309 }
2310 let predicates: Vec<Expr> = expanded_predicates;
2311
2312 #[cfg(feature = "is_between")]
2314 let predicates: Vec<Expr> = {
2315 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2316 for predicate in predicates {
2317 if let Expr::Function {
2318 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2319 input,
2320 ..
2321 } = &predicate
2322 {
2323 if let [expr, lower, upper] = input.as_slice() {
2324 match closed {
2325 ClosedInterval::Both => {
2326 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2327 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2328 },
2329 ClosedInterval::Right => {
2330 expanded_predicates.push(expr.clone().gt(lower.clone()));
2331 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2332 },
2333 ClosedInterval::Left => {
2334 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2335 expanded_predicates.push(expr.clone().lt(upper.clone()));
2336 },
2337 ClosedInterval::None => {
2338 expanded_predicates.push(expr.clone().gt(lower.clone()));
2339 expanded_predicates.push(expr.clone().lt(upper.clone()));
2340 },
2341 }
2342 continue;
2343 }
2344 }
2345 expanded_predicates.push(predicate);
2346 }
2347 expanded_predicates
2348 };
2349
2350 let args = JoinArgs {
2351 how: self.how,
2352 validation: self.validation,
2353 suffix: self.suffix,
2354 slice: None,
2355 nulls_equal: self.nulls_equal,
2356 coalesce: self.coalesce,
2357 maintain_order: self.maintain_order,
2358 build_side: self.build_side,
2359 };
2360 let options = JoinOptions {
2361 allow_parallel: self.allow_parallel,
2362 force_parallel: self.force_parallel,
2363 args,
2364 };
2365
2366 let lp = DslPlan::Join {
2367 input_left: Arc::new(self.lf.logical_plan),
2368 input_right: Arc::new(other.logical_plan),
2369 left_on: Default::default(),
2370 right_on: Default::default(),
2371 predicates,
2372 options: Arc::from(options),
2373 };
2374
2375 LazyFrame::from_logical_plan(lp, opt_state)
2376 }
2377}
2378
2379pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2380 #[cfg(not(feature = "new_streaming"))]
2381 {
2382 None
2383 }
2384 #[cfg(feature = "new_streaming")]
2385 {
2386 Some(polars_stream::build_streaming_query_executor)
2387 }
2388};
2389
/// Iterator over result batches that arrive through a channel.
pub struct CollectBatches {
    /// Receiving end of the channel the batches (or errors) arrive on.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// Closure invoked at most once by `start`; `None` once it has been taken.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2394
2395impl CollectBatches {
2396 pub fn start(&mut self) {
2398 if let Some(runner) = self.runner.take() {
2399 runner()
2400 }
2401 }
2402}
2403
2404impl Iterator for CollectBatches {
2405 type Item = PolarsResult<DataFrame>;
2406
2407 fn next(&mut self) -> Option<Self::Item> {
2408 self.start();
2409 self.recv.recv().ok()
2410 }
2411}