1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26#[cfg(feature = "pivot")]
27use polars_core::frame::PivotColumnNaming;
28use polars_core::prelude::*;
29use polars_core::query_result::QueryResult;
30use polars_io::RowIndex;
31use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38
39use crate::frame::cached_arenas::CachedArena;
40use crate::prelude::*;
41
/// Conversion of a value into a [`LazyFrame`] query.
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
45
46impl IntoLazy for DataFrame {
47 fn lazy(self) -> LazyFrame {
49 let lp = DslBuilder::from_existing_df(self).build();
50 LazyFrame {
51 logical_plan: lp,
52 opt_state: Default::default(),
53 cached_arena: Default::default(),
54 }
55 }
56}
57
impl IntoLazy for LazyFrame {
    /// A `LazyFrame` is already lazy: return it unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
63
/// A lazy query: a DSL logical plan plus the optimization flags that are applied
/// when the plan is lowered to IR and executed.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The (unoptimized) DSL plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags used when this plan is optimized/executed.
    pub(crate) opt_state: OptFlags,
    /// Cache slot for IR/expression arenas (type from the `cached_arenas` module).
    /// It sits behind an `Arc`, so cloning a `LazyFrame` shares the same slot.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
75
76impl From<DslPlan> for LazyFrame {
77 fn from(plan: DslPlan) -> Self {
78 Self {
79 logical_plan: plan,
80 opt_state: OptFlags::default(),
81 cached_arena: Default::default(),
82 }
83 }
84}
85
86impl LazyFrame {
87 pub(crate) fn from_inner(
88 logical_plan: DslPlan,
89 opt_state: OptFlags,
90 cached_arena: Arc<Mutex<Option<CachedArena>>>,
91 ) -> Self {
92 Self {
93 logical_plan,
94 opt_state,
95 cached_arena,
96 }
97 }
98
99 pub(crate) fn get_plan_builder(self) -> DslBuilder {
100 DslBuilder::from(self.logical_plan)
101 }
102
103 fn get_opt_state(&self) -> OptFlags {
104 self.opt_state
105 }
106
107 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
108 LazyFrame {
109 logical_plan,
110 opt_state,
111 cached_arena: Default::default(),
112 }
113 }
114
115 pub fn get_current_optimizations(&self) -> OptFlags {
117 self.opt_state
118 }
119
120 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
122 self.opt_state = opt_state;
123 self
124 }
125
126 pub fn without_optimizations(self) -> Self {
128 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
129 }
130
131 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
133 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
134 self
135 }
136
137 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
139 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
140 self
141 }
142
143 pub fn with_check_order(mut self, toggle: bool) -> Self {
146 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
147 self
148 }
149
150 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
152 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
153 self
154 }
155
156 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
158 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
159 self
160 }
161
162 pub fn with_type_check(mut self, toggle: bool) -> Self {
164 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
165 self
166 }
167
168 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
170 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
171 self
172 }
173
174 #[cfg(feature = "cse")]
176 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
177 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
178 self
179 }
180
181 #[cfg(feature = "cse")]
183 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
184 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
185 self
186 }
187
188 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
190 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
191 self
192 }
193
194 #[cfg(feature = "new_streaming")]
195 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
196 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
197 self
198 }
199
200 pub fn with_gpu(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::GPU, toggle);
202 self
203 }
204
205 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
208 self
209 }
210
211 pub fn _with_eager(mut self, toggle: bool) -> Self {
213 self.opt_state.set(OptFlags::EAGER, toggle);
214 self
215 }
216
217 pub fn describe_plan(&self) -> PolarsResult<String> {
219 Ok(self.clone().to_alp()?.describe())
220 }
221
222 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp()?.describe_tree_format())
225 }
226
227 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe())
232 }
233
234 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
238 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
239 }
240
241 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
246 if optimized {
247 self.describe_optimized_plan()
248 } else {
249 self.describe_plan()
250 }
251 }
252
253 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
293 let opt_state = self.get_opt_state();
294 let lp = self
295 .get_plan_builder()
296 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
297 .build();
298 Self::from_logical_plan(lp, opt_state)
299 }
300
301 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
321 self,
322 by_exprs: E,
323 sort_options: SortMultipleOptions,
324 ) -> Self {
325 let by_exprs = by_exprs.as_ref().to_vec();
326 if by_exprs.is_empty() {
327 self
328 } else {
329 let opt_state = self.get_opt_state();
330 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
331 Self::from_logical_plan(lp, opt_state)
332 }
333 }
334
335 pub fn top_k<E: AsRef<[Expr]>>(
336 self,
337 k: IdxSize,
338 by_exprs: E,
339 sort_options: SortMultipleOptions,
340 ) -> Self {
341 self.sort_by_exprs(
343 by_exprs,
344 sort_options.with_order_reversed().with_nulls_last(true),
345 )
346 .slice(0, k)
347 }
348
349 pub fn bottom_k<E: AsRef<[Expr]>>(
350 self,
351 k: IdxSize,
352 by_exprs: E,
353 sort_options: SortMultipleOptions,
354 ) -> Self {
355 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
357 .slice(0, k)
358 }
359
360 pub fn reverse(self) -> Self {
376 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
377 }
378
379 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
387 where
388 I: IntoIterator<Item = T>,
389 J: IntoIterator<Item = S>,
390 T: AsRef<str>,
391 S: AsRef<str>,
392 {
393 let iter = existing.into_iter();
394 let cap = iter.size_hint().0;
395 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
396 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
397
398 for (existing, new) in iter.zip(new) {
401 let existing = existing.as_ref();
402 let new = new.as_ref();
403 if new != existing {
404 existing_vec.push(existing.into());
405 new_vec.push(new.into());
406 }
407 }
408
409 self.map_private(DslFunction::Rename {
410 existing: existing_vec.into(),
411 new: new_vec.into(),
412 strict,
413 })
414 }
415
416 pub fn drop(self, columns: Selector) -> Self {
423 let opt_state = self.get_opt_state();
424 let lp = self.get_plan_builder().drop(columns).build();
425 Self::from_logical_plan(lp, opt_state)
426 }
427
428 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
433 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
434 }
435
436 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
441 self.select(vec![
442 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
443 ])
444 }
445
446 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
455 let opt_state = self.get_opt_state();
456 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
457 Self::from_logical_plan(lp, opt_state)
458 }
459
460 pub fn cache(self) -> Self {
464 let opt_state = self.get_opt_state();
465 let lp = self.get_plan_builder().cache().build();
466 Self::from_logical_plan(lp, opt_state)
467 }
468
469 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
471 let cast_cols: Vec<Expr> = dtypes
472 .into_iter()
473 .map(|(name, dt)| {
474 let name = PlSmallStr::from_str(name);
475
476 if strict {
477 col(name).strict_cast(dt)
478 } else {
479 col(name).cast(dt)
480 }
481 })
482 .collect();
483
484 if cast_cols.is_empty() {
485 self
486 } else {
487 self.with_columns(cast_cols)
488 }
489 }
490
491 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
493 self.with_columns(vec![if strict {
494 col(PlSmallStr::from_static("*")).strict_cast(dtype)
495 } else {
496 col(PlSmallStr::from_static("*")).cast(dtype)
497 }])
498 }
499
500 pub fn optimize(
501 self,
502 lp_arena: &mut Arena<IR>,
503 expr_arena: &mut Arena<AExpr>,
504 ) -> PolarsResult<Node> {
505 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
506 }
507
508 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
511
512 Ok(IRPlan::new(node, lp_arena, expr_arena))
513 }
514
515 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
516 let (mut lp_arena, mut expr_arena) = self.get_arenas();
517 let node = to_alp(
518 self.logical_plan,
519 &mut expr_arena,
520 &mut lp_arena,
521 &mut self.opt_state,
522 )?;
523 let plan = IRPlan::new(node, lp_arena, expr_arena);
524 Ok(plan)
525 }
526
527 pub(crate) fn optimize_with_scratch(
528 self,
529 lp_arena: &mut Arena<IR>,
530 expr_arena: &mut Arena<AExpr>,
531 scratch: &mut Vec<Node>,
532 ) -> PolarsResult<Node> {
533 let lp_top = optimize(
534 self.logical_plan,
535 self.opt_state,
536 lp_arena,
537 expr_arena,
538 scratch,
539 apply_scan_predicate_to_scan_ir,
540 )?;
541
542 Ok(lp_top)
543 }
544
545 fn prepare_collect_post_opt<P>(
546 mut self,
547 check_sink: bool,
548 query_start: Option<std::time::Instant>,
549 post_opt: P,
550 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
551 where
552 P: FnOnce(
553 Node,
554 &mut Arena<IR>,
555 &mut Arena<AExpr>,
556 Option<std::time::Duration>,
557 ) -> PolarsResult<()>,
558 {
559 let (mut lp_arena, mut expr_arena) = self.get_arenas();
560
561 let mut scratch = vec![];
562 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
563
564 post_opt(
565 lp_top,
566 &mut lp_arena,
567 &mut expr_arena,
568 query_start.map(|s| s.elapsed()),
571 )?;
572
573 let no_file_sink = if check_sink {
575 !matches!(
576 lp_arena.get(lp_top),
577 IR::Sink {
578 payload: SinkTypeIR::File { .. },
579 ..
580 }
581 )
582 } else {
583 true
584 };
585 let physical_plan = create_physical_plan(
586 lp_top,
587 &mut lp_arena,
588 &mut expr_arena,
589 BUILD_STREAMING_EXECUTOR,
590 )?;
591
592 let state = ExecutionState::new();
593 Ok((state, physical_plan, no_file_sink))
594 }
595
596 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
598 where
599 P: FnOnce(
600 Node,
601 &mut Arena<IR>,
602 &mut Arena<AExpr>,
603 Option<std::time::Duration>,
604 ) -> PolarsResult<()>,
605 {
606 let (mut state, mut physical_plan, _) =
607 self.prepare_collect_post_opt(false, None, post_opt)?;
608 physical_plan.execute(&mut state)
609 }
610
611 #[allow(unused_mut)]
612 fn prepare_collect(
613 self,
614 check_sink: bool,
615 query_start: Option<std::time::Instant>,
616 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
617 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
618 }
619
    /// Execute the query on the requested engine.
    ///
    /// Engine resolution:
    /// - `Engine::Streaming` stays streaming.
    /// - Any other engine becomes streaming when the env var
    ///   `POLARS_FORCE_NEW_STREAMING` is `"1"`.
    /// - Otherwise `Engine::Auto` becomes `Engine::InMemory`; other engines are kept.
    ///
    /// When the resolved engine is not streaming and `POLARS_AUTO_NEW_STREAMING` is
    /// `"1"`, the new streaming engine is tried first. Its result is returned
    /// unless it panicked with a "not yet implemented" message, in which case
    /// execution falls through to the resolved engine.
    ///
    /// A `SinkMultiple` root on the in-memory engine yields
    /// `QueryResult::Multiple`; on the GPU engine it is an error.
    ///
    /// # Errors
    /// Propagates planning/execution errors. Returns `InvalidOperation` for a
    /// multi-sink query on the GPU engine.
    pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
        let engine = match engine {
            Engine::Streaming => Engine::Streaming,
            _ if std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1") => {
                Engine::Streaming
            },
            Engine::Auto => Engine::InMemory,
            v => v,
        };

        // Opportunistic attempt on the new streaming engine; `None` means "fall back".
        if engine != Engine::Streaming
            && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
        {
            feature_gated!("new_streaming", {
                if let Some(r) = self.clone()._collect_with_streaming_suppress_todo_panic() {
                    return r;
                }
            })
        }
        // Record the chosen engine in the optimization flags before optimizing.
        match engine {
            Engine::Streaming => {
                feature_gated!("new_streaming", self = self.with_new_streaming(true))
            },
            Engine::Gpu => self = self.with_gpu(true),
            _ => (),
        }

        let mut ir_plan = self.to_alp_optimized()?;

        ir_plan.ensure_root_node_is_sink();

        match engine {
            Engine::Streaming => feature_gated!("new_streaming", {
                polars_stream::run_query(
                    ir_plan.lp_top,
                    &mut ir_plan.lp_arena,
                    &mut ir_plan.expr_arena,
                )
            }),
            Engine::InMemory | Engine::Gpu => {
                // Multi-sink queries (e.g. from `collect_all`) produce one frame per input.
                if let IR::SinkMultiple { inputs } = ir_plan.root() {
                    polars_ensure!(
                        engine != Engine::Gpu,
                        InvalidOperation:
                        "collect_all is not supported for the gpu engine"
                    );

                    return create_multiple_physical_plans(
                        inputs.clone().as_slice(),
                        &mut ir_plan.lp_arena,
                        &mut ir_plan.expr_arena,
                        BUILD_STREAMING_EXECUTOR,
                    )?
                    .execute()
                    .map(QueryResult::Multiple);
                }

                let mut physical_plan = create_physical_plan(
                    ir_plan.lp_top,
                    &mut ir_plan.lp_arena,
                    &mut ir_plan.expr_arena,
                    BUILD_STREAMING_EXECUTOR,
                )?;
                let mut state = ExecutionState::new();
                physical_plan.execute(&mut state).map(QueryResult::Single)
            },
            // `Auto` was mapped to a concrete engine at the top of this function.
            Engine::Auto => unreachable!(),
        }
    }
693
694 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
695 let sink_multiple = LazyFrame {
696 logical_plan: DslPlan::SinkMultiple { inputs: plans },
697 opt_state,
698 cached_arena: Default::default(),
699 };
700 sink_multiple.explain(true)
701 }
702
703 pub fn collect_all_with_engine(
704 plans: Vec<DslPlan>,
705 engine: Engine,
706 opt_state: OptFlags,
707 ) -> PolarsResult<Vec<DataFrame>> {
708 if plans.is_empty() {
709 return Ok(Vec::new());
710 }
711
712 LazyFrame {
713 logical_plan: DslPlan::SinkMultiple { inputs: plans },
714 opt_state,
715 cached_arena: Default::default(),
716 }
717 .collect_with_engine(engine)
718 .map(|r| r.unwrap_multiple())
719 }
720
721 pub fn collect(self) -> PolarsResult<DataFrame> {
739 self.collect_with_engine(Engine::Auto).map(|r| match r {
740 QueryResult::Single(df) => df,
741 QueryResult::Multiple(_) => DataFrame::empty(),
743 })
744 }
745
746 #[cfg(feature = "async")]
751 pub fn collect_batches(
752 self,
753 engine: Engine,
754 maintain_order: bool,
755 chunk_size: Option<NonZeroUsize>,
756 lazy: bool,
757 ) -> PolarsResult<CollectBatches> {
758 let (send, recv) = sync_channel(1);
759 let runner_send = send.clone();
760 let ldf = self.sink_batches(
761 PlanCallback::new(move |df| {
762 let send_result = send.send(Ok(df));
764 Ok(send_result.is_err())
765 }),
766 maintain_order,
767 chunk_size,
768 )?;
769 let runner = move || {
770 polars_core::runtime::ASYNC.spawn_blocking(move || {
774 if let Err(e) = ldf.collect_with_engine(engine) {
775 runner_send.send(Err(e)).ok();
776 }
777 });
778 };
779
780 let mut collect_batches = CollectBatches {
781 recv,
782 runner: Some(Box::new(runner)),
783 };
784 if !lazy {
785 collect_batches.start();
786 }
787 Ok(collect_batches)
788 }
789
790 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
793 where
794 P: FnOnce(
795 Node,
796 &mut Arena<IR>,
797 &mut Arena<AExpr>,
798 Option<std::time::Duration>,
799 ) -> PolarsResult<()>,
800 {
801 let query_start = std::time::Instant::now();
802 let (mut state, mut physical_plan, _) =
803 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
804 state.time_nodes(query_start);
805 let out = physical_plan.execute(&mut state)?;
806 let timer_df = state.finish_timer()?;
807 Ok((out, timer_df))
808 }
809
810 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
818 self._profile_post_opt(|_, _, _, _| Ok(()))
819 }
820
821 pub fn sink_batches(
822 mut self,
823 function: PlanCallback<DataFrame, bool>,
824 maintain_order: bool,
825 chunk_size: Option<NonZeroUsize>,
826 ) -> PolarsResult<Self> {
827 use polars_plan::prelude::sink::CallbackSinkType;
828
829 polars_ensure!(
830 !matches!(self.logical_plan, DslPlan::Sink { .. }),
831 InvalidOperation: "cannot create a sink on top of another sink"
832 );
833
834 self.logical_plan = DslPlan::Sink {
835 input: Arc::new(self.logical_plan),
836 payload: SinkType::Callback(CallbackSinkType {
837 function,
838 maintain_order,
839 chunk_size,
840 }),
841 };
842
843 Ok(self)
844 }
845
846 #[cfg(feature = "new_streaming")]
848 fn _collect_with_streaming_suppress_todo_panic(
849 mut self,
850 ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
851 self.opt_state |= OptFlags::NEW_STREAMING;
852 let mut ir_plan = match self.to_alp_optimized() {
853 Ok(v) => v,
854 Err(e) => return Some(Err(e)),
855 };
856
857 ir_plan.ensure_root_node_is_sink();
858
859 let f = || {
860 polars_stream::run_query(
861 ir_plan.lp_top,
862 &mut ir_plan.lp_arena,
863 &mut ir_plan.expr_arena,
864 )
865 };
866
867 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
868 Ok(v) => Some(v),
869 Err(e) => {
870 if e.downcast_ref::<&str>()
873 .is_some_and(|s| s.starts_with("not yet implemented"))
874 {
875 if polars_core::config::verbose() {
876 eprintln!(
877 "caught unimplemented error in new streaming engine, falling back to normal engine"
878 );
879 }
880 None
881 } else {
882 std::panic::resume_unwind(e)
883 }
884 },
885 }
886 }
887
888 pub fn sink(
889 mut self,
890 sink_type: SinkDestination,
891 file_format: FileWriteFormat,
892 unified_sink_args: UnifiedSinkArgs,
893 ) -> PolarsResult<Self> {
894 polars_ensure!(
895 !matches!(self.logical_plan, DslPlan::Sink { .. }),
896 InvalidOperation: "cannot create a sink on top of another sink"
897 );
898
899 self.logical_plan = DslPlan::Sink {
900 input: Arc::new(self.logical_plan),
901 payload: match sink_type {
902 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
903 target,
904 file_format,
905 unified_sink_args,
906 }),
907 SinkDestination::Partitioned {
908 base_path,
909 file_path_provider,
910 partition_strategy,
911 max_rows_per_file,
912 approximate_bytes_per_file,
913 } => SinkType::Partitioned(PartitionedSinkOptions {
914 base_path,
915 file_path_provider,
916 partition_strategy,
917 file_format,
918 unified_sink_args,
919 max_rows_per_file,
920 approximate_bytes_per_file,
921 }),
922 },
923 };
924 Ok(self)
925 }
926
927 pub fn filter(self, predicate: Expr) -> Self {
945 let opt_state = self.get_opt_state();
946 let lp = self.get_plan_builder().filter(predicate).build();
947 Self::from_logical_plan(lp, opt_state)
948 }
949
950 pub fn remove(self, predicate: Expr) -> Self {
968 self.filter(predicate.neq_missing(lit(true)))
969 }
970
971 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
997 let exprs = exprs.as_ref().to_vec();
998 self.select_impl(
999 exprs,
1000 ProjectionOptions {
1001 run_parallel: true,
1002 duplicate_check: true,
1003 should_broadcast: true,
1004 },
1005 )
1006 }
1007
1008 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1009 let exprs = exprs.as_ref().to_vec();
1010 self.select_impl(
1011 exprs,
1012 ProjectionOptions {
1013 run_parallel: false,
1014 duplicate_check: true,
1015 should_broadcast: true,
1016 },
1017 )
1018 }
1019
1020 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1021 let opt_state = self.get_opt_state();
1022 let lp = self.get_plan_builder().project(exprs, options).build();
1023 Self::from_logical_plan(lp, opt_state)
1024 }
1025
1026 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1047 let keys = by
1048 .as_ref()
1049 .iter()
1050 .map(|e| e.clone().into())
1051 .collect::<Vec<_>>();
1052 let opt_state = self.get_opt_state();
1053
1054 #[cfg(feature = "dynamic_group_by")]
1055 {
1056 LazyGroupBy {
1057 logical_plan: self.logical_plan,
1058 opt_state,
1059 keys,
1060 predicates: vec![],
1061 maintain_order: false,
1062 dynamic_options: None,
1063 rolling_options: None,
1064 }
1065 }
1066
1067 #[cfg(not(feature = "dynamic_group_by"))]
1068 {
1069 LazyGroupBy {
1070 logical_plan: self.logical_plan,
1071 opt_state,
1072 keys,
1073 predicates: vec![],
1074 maintain_order: false,
1075 }
1076 }
1077 }
1078
1079 #[cfg(feature = "dynamic_group_by")]
1087 pub fn rolling<E: AsRef<[Expr]>>(
1088 mut self,
1089 index_column: Expr,
1090 group_by: E,
1091 mut options: RollingGroupOptions,
1092 ) -> LazyGroupBy {
1093 if let Expr::Column(name) = index_column {
1094 options.index_column = name;
1095 } else {
1096 let output_field = index_column
1097 .to_field(&self.collect_schema().unwrap())
1098 .unwrap();
1099 return self.with_column(index_column).rolling(
1100 Expr::Column(output_field.name().clone()),
1101 group_by,
1102 options,
1103 );
1104 }
1105 let opt_state = self.get_opt_state();
1106 LazyGroupBy {
1107 logical_plan: self.logical_plan,
1108 opt_state,
1109 predicates: vec![],
1110 keys: group_by.as_ref().to_vec(),
1111 maintain_order: true,
1112 dynamic_options: None,
1113 rolling_options: Some(options),
1114 }
1115 }
1116
1117 #[cfg(feature = "dynamic_group_by")]
1133 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1134 mut self,
1135 index_column: Expr,
1136 group_by: E,
1137 mut options: DynamicGroupOptions,
1138 ) -> LazyGroupBy {
1139 if let Expr::Column(name) = index_column {
1140 options.index_column = name;
1141 } else {
1142 let output_field = index_column
1143 .to_field(&self.collect_schema().unwrap())
1144 .unwrap();
1145 return self.with_column(index_column).group_by_dynamic(
1146 Expr::Column(output_field.name().clone()),
1147 group_by,
1148 options,
1149 );
1150 }
1151 let opt_state = self.get_opt_state();
1152 LazyGroupBy {
1153 logical_plan: self.logical_plan,
1154 opt_state,
1155 predicates: vec![],
1156 keys: group_by.as_ref().to_vec(),
1157 maintain_order: true,
1158 dynamic_options: Some(options),
1159 rolling_options: None,
1160 }
1161 }
1162
1163 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1165 let keys = by
1166 .as_ref()
1167 .iter()
1168 .map(|e| e.clone().into())
1169 .collect::<Vec<_>>();
1170 let opt_state = self.get_opt_state();
1171
1172 #[cfg(feature = "dynamic_group_by")]
1173 {
1174 LazyGroupBy {
1175 logical_plan: self.logical_plan,
1176 opt_state,
1177 keys,
1178 predicates: vec![],
1179 maintain_order: true,
1180 dynamic_options: None,
1181 rolling_options: None,
1182 }
1183 }
1184
1185 #[cfg(not(feature = "dynamic_group_by"))]
1186 {
1187 LazyGroupBy {
1188 logical_plan: self.logical_plan,
1189 opt_state,
1190 keys,
1191 predicates: vec![],
1192 maintain_order: true,
1193 }
1194 }
1195 }
1196
1197 #[cfg(feature = "semi_anti_join")]
1214 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1215 self.join(
1216 other,
1217 [left_on.into()],
1218 [right_on.into()],
1219 JoinArgs::new(JoinType::Anti),
1220 )
1221 }
1222
1223 #[cfg(feature = "cross_join")]
1225 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1226 self.join(
1227 other,
1228 vec![],
1229 vec![],
1230 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1231 )
1232 }
1233
1234 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1251 self.join(
1252 other,
1253 [left_on.into()],
1254 [right_on.into()],
1255 JoinArgs::new(JoinType::Left),
1256 )
1257 }
1258
1259 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1276 self.join(
1277 other,
1278 [left_on.into()],
1279 [right_on.into()],
1280 JoinArgs::new(JoinType::Inner),
1281 )
1282 }
1283
1284 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1301 self.join(
1302 other,
1303 [left_on.into()],
1304 [right_on.into()],
1305 JoinArgs::new(JoinType::Full),
1306 )
1307 }
1308
1309 #[cfg(feature = "semi_anti_join")]
1326 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1327 self.join(
1328 other,
1329 [left_on.into()],
1330 [right_on.into()],
1331 JoinArgs::new(JoinType::Semi),
1332 )
1333 }
1334
1335 pub fn join<E: AsRef<[Expr]>>(
1357 self,
1358 other: LazyFrame,
1359 left_on: E,
1360 right_on: E,
1361 args: JoinArgs,
1362 ) -> LazyFrame {
1363 let left_on = left_on.as_ref().to_vec();
1364 let right_on = right_on.as_ref().to_vec();
1365
1366 self._join_impl(other, left_on, right_on, args)
1367 }
1368
1369 fn _join_impl(
1370 self,
1371 other: LazyFrame,
1372 left_on: Vec<Expr>,
1373 right_on: Vec<Expr>,
1374 args: JoinArgs,
1375 ) -> LazyFrame {
1376 let JoinArgs {
1377 how,
1378 validation,
1379 suffix,
1380 slice,
1381 nulls_equal,
1382 coalesce,
1383 maintain_order,
1384 build_side,
1385 } = args;
1386
1387 if slice.is_some() {
1388 panic!("impl error: slice is not handled")
1389 }
1390
1391 let mut builder = self
1392 .join_builder()
1393 .with(other)
1394 .left_on(left_on)
1395 .right_on(right_on)
1396 .how(how)
1397 .validate(validation)
1398 .join_nulls(nulls_equal)
1399 .coalesce(coalesce)
1400 .maintain_order(maintain_order)
1401 .build_side(build_side);
1402
1403 if let Some(suffix) = suffix {
1404 builder = builder.suffix(suffix);
1405 }
1406
1407 builder.finish()
1409 }
1410
    /// Start a `JoinBuilder` with this frame as the left-hand side.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1419
1420 pub fn gather(self, idxs: LazyFrame, null_on_oob: bool) -> LazyFrame {
1424 let opt_state = self.get_opt_state();
1425 let lp = self
1426 .get_plan_builder()
1427 .gather(idxs.logical_plan, null_on_oob)
1428 .build();
1429 Self::from_logical_plan(lp, opt_state)
1430 }
1431
1432 pub fn with_column(self, expr: Expr) -> LazyFrame {
1450 let opt_state = self.get_opt_state();
1451 let lp = self
1452 .get_plan_builder()
1453 .with_columns(
1454 vec![expr],
1455 ProjectionOptions {
1456 run_parallel: false,
1457 duplicate_check: true,
1458 should_broadcast: true,
1459 },
1460 )
1461 .build();
1462 Self::from_logical_plan(lp, opt_state)
1463 }
1464
1465 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1480 let exprs = exprs.as_ref().to_vec();
1481 self.with_columns_impl(
1482 exprs,
1483 ProjectionOptions {
1484 run_parallel: true,
1485 duplicate_check: true,
1486 should_broadcast: true,
1487 },
1488 )
1489 }
1490
1491 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1493 let exprs = exprs.as_ref().to_vec();
1494 self.with_columns_impl(
1495 exprs,
1496 ProjectionOptions {
1497 run_parallel: false,
1498 duplicate_check: true,
1499 should_broadcast: true,
1500 },
1501 )
1502 }
1503
1504 pub fn match_to_schema(
1506 self,
1507 schema: SchemaRef,
1508 per_column: Arc<[MatchToSchemaPerColumn]>,
1509 extra_columns: ExtraColumnsPolicy,
1510 ) -> LazyFrame {
1511 let opt_state = self.get_opt_state();
1512 let lp = self
1513 .get_plan_builder()
1514 .match_to_schema(schema, per_column, extra_columns)
1515 .build();
1516 Self::from_logical_plan(lp, opt_state)
1517 }
1518
1519 pub fn pipe_with_schema(
1520 self,
1521 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1522 ) -> Self {
1523 let opt_state = self.get_opt_state();
1524 let lp = self
1525 .get_plan_builder()
1526 .pipe_with_schema(vec![], callback)
1527 .build();
1528 Self::from_logical_plan(lp, opt_state)
1529 }
1530
1531 pub fn pipe_with_schemas(
1532 self,
1533 others: Vec<LazyFrame>,
1534 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1535 ) -> Self {
1536 let opt_state = self.get_opt_state();
1537 let lp = self
1538 .get_plan_builder()
1539 .pipe_with_schema(
1540 others.into_iter().map(|lf| lf.logical_plan).collect(),
1541 callback,
1542 )
1543 .build();
1544 Self::from_logical_plan(lp, opt_state)
1545 }
1546
1547 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1548 let opt_state = self.get_opt_state();
1549 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1550 Self::from_logical_plan(lp, opt_state)
1551 }
1552
1553 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1554 let contexts = contexts
1555 .as_ref()
1556 .iter()
1557 .map(|lf| lf.logical_plan.clone())
1558 .collect();
1559 let opt_state = self.get_opt_state();
1560 let lp = self.get_plan_builder().with_context(contexts).build();
1561 Self::from_logical_plan(lp, opt_state)
1562 }
1563
1564 pub fn max(self) -> Self {
1568 self.map_private(DslFunction::Stats(StatsFunction::Max))
1569 }
1570
1571 pub fn min(self) -> Self {
1575 self.map_private(DslFunction::Stats(StatsFunction::Min))
1576 }
1577
1578 pub fn sum(self) -> Self {
1588 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1589 }
1590
1591 pub fn mean(self) -> Self {
1596 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1597 }
1598
1599 pub fn median(self) -> Self {
1605 self.map_private(DslFunction::Stats(StatsFunction::Median))
1606 }
1607
1608 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1610 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1611 quantile,
1612 method,
1613 }))
1614 }
1615
1616 pub fn std(self, ddof: u8) -> Self {
1629 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1630 }
1631
1632 pub fn var(self, ddof: u8) -> Self {
1642 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1643 }
1644
1645 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1647 self.explode_impl(columns, options, false)
1648 }
1649
1650 fn explode_impl(
1652 self,
1653 columns: Selector,
1654 options: ExplodeOptions,
1655 allow_empty: bool,
1656 ) -> LazyFrame {
1657 let opt_state = self.get_opt_state();
1658 let lp = self
1659 .get_plan_builder()
1660 .explode(columns, options, allow_empty)
1661 .build();
1662 Self::from_logical_plan(lp, opt_state)
1663 }
1664
1665 pub fn null_count(self) -> LazyFrame {
1667 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1668 }
1669
1670 pub fn unique_stable(
1675 self,
1676 subset: Option<Selector>,
1677 keep_strategy: UniqueKeepStrategy,
1678 ) -> LazyFrame {
1679 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1680 self.unique_stable_generic(subset, keep_strategy)
1681 }
1682
1683 pub fn unique_stable_generic(
1684 self,
1685 subset: Option<Vec<Expr>>,
1686 keep_strategy: UniqueKeepStrategy,
1687 ) -> LazyFrame {
1688 let opt_state = self.get_opt_state();
1689 let options = DistinctOptionsDSL {
1690 subset,
1691 maintain_order: true,
1692 keep_strategy,
1693 };
1694 let lp = self.get_plan_builder().distinct(options).build();
1695 Self::from_logical_plan(lp, opt_state)
1696 }
1697
1698 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1706 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1707 self.unique_generic(subset, keep_strategy)
1708 }
1709
1710 pub fn unique_generic(
1711 self,
1712 subset: Option<Vec<Expr>>,
1713 keep_strategy: UniqueKeepStrategy,
1714 ) -> LazyFrame {
1715 let opt_state = self.get_opt_state();
1716 let options = DistinctOptionsDSL {
1717 subset,
1718 maintain_order: false,
1719 keep_strategy,
1720 };
1721 let lp = self.get_plan_builder().distinct(options).build();
1722 Self::from_logical_plan(lp, opt_state)
1723 }
1724
1725 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1730 let opt_state = self.get_opt_state();
1731 let lp = self.get_plan_builder().drop_nans(subset).build();
1732 Self::from_logical_plan(lp, opt_state)
1733 }
1734
1735 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1740 let opt_state = self.get_opt_state();
1741 let lp = self.get_plan_builder().drop_nulls(subset).build();
1742 Self::from_logical_plan(lp, opt_state)
1743 }
1744
1745 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1755 let opt_state = self.get_opt_state();
1756 let lp = self.get_plan_builder().slice(offset, len).build();
1757 Self::from_logical_plan(lp, opt_state)
1758 }
1759
1760 pub fn clear(self) -> LazyFrame {
1762 self.slice(0, 0)
1763 }
1764
1765 pub fn first(self) -> LazyFrame {
1769 self.slice(0, 1)
1770 }
1771
1772 pub fn last(self) -> LazyFrame {
1776 self.slice(-1, 1)
1777 }
1778
1779 pub fn tail(self, n: IdxSize) -> LazyFrame {
1783 let neg_tail = -(n as i64);
1784 self.slice(neg_tail, n)
1785 }
1786
1787 #[cfg(feature = "pivot")]
1788 #[expect(clippy::too_many_arguments)]
1789 pub fn pivot(
1790 self,
1791 on: Selector,
1792 on_columns: Arc<DataFrame>,
1793 index: Selector,
1794 values: Selector,
1795 agg: Expr,
1796 maintain_order: bool,
1797 separator: PlSmallStr,
1798 column_naming: PivotColumnNaming,
1799 ) -> LazyFrame {
1800 let opt_state = self.get_opt_state();
1801 let lp = self
1802 .get_plan_builder()
1803 .pivot(
1804 on,
1805 on_columns,
1806 index,
1807 values,
1808 agg,
1809 maintain_order,
1810 separator,
1811 column_naming,
1812 )
1813 .build();
1814 Self::from_logical_plan(lp, opt_state)
1815 }
1816
1817 #[cfg(feature = "pivot")]
1821 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1822 let opt_state = self.get_opt_state();
1823 let lp = self.get_plan_builder().unpivot(args).build();
1824 Self::from_logical_plan(lp, opt_state)
1825 }
1826
1827 pub fn limit(self, n: IdxSize) -> LazyFrame {
1829 self.slice(0, n)
1830 }
1831
1832 pub fn map<F>(
1846 self,
1847 function: F,
1848 optimizations: AllowedOptimizations,
1849 schema: Option<Arc<dyn UdfSchema>>,
1850 name: Option<&'static str>,
1851 ) -> LazyFrame
1852 where
1853 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1854 {
1855 let opt_state = self.get_opt_state();
1856 let lp = self
1857 .get_plan_builder()
1858 .map(
1859 function,
1860 optimizations,
1861 schema,
1862 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1863 )
1864 .build();
1865 Self::from_logical_plan(lp, opt_state)
1866 }
1867
1868 #[cfg(feature = "python")]
1869 pub fn map_python(
1870 self,
1871 function: polars_utils::python_function::PythonFunction,
1872 optimizations: AllowedOptimizations,
1873 schema: Option<SchemaRef>,
1874 validate_output: bool,
1875 ) -> LazyFrame {
1876 let opt_state = self.get_opt_state();
1877 let lp = self
1878 .get_plan_builder()
1879 .map_python(function, optimizations, schema, validate_output)
1880 .build();
1881 Self::from_logical_plan(lp, opt_state)
1882 }
1883
1884 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1885 let opt_state = self.get_opt_state();
1886 let lp = self.get_plan_builder().map_private(function).build();
1887 Self::from_logical_plan(lp, opt_state)
1888 }
1889
1890 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1899 where
1900 S: Into<PlSmallStr>,
1901 {
1902 let name = name.into();
1903
1904 match &self.logical_plan {
1905 v @ DslPlan::Scan {
1906 scan_type,
1907 unified_scan_args,
1908 ..
1909 } if unified_scan_args.row_index.is_none()
1910 && !matches!(
1911 &**scan_type,
1912 FileScanDsl::Anonymous { .. } | FileScanDsl::ExpandedPaths { .. }
1913 ) =>
1914 {
1915 let DslPlan::Scan {
1916 sources,
1917 mut unified_scan_args,
1918 scan_type,
1919 cached_ir: _,
1920 } = v.clone()
1921 else {
1922 unreachable!()
1923 };
1924
1925 unified_scan_args.row_index = Some(RowIndex {
1926 name,
1927 offset: offset.unwrap_or(0),
1928 });
1929
1930 DslPlan::Scan {
1931 sources,
1932 unified_scan_args,
1933 scan_type,
1934 cached_ir: Default::default(),
1935 }
1936 .into()
1937 },
1938 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1939 }
1940 }
1941
1942 pub fn count(self) -> LazyFrame {
1944 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1945 }
1946
1947 #[cfg(feature = "dtype-struct")]
1950 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1951 self.map_private(DslFunction::Unnest {
1952 columns: cols,
1953 separator,
1954 })
1955 }
1956
1957 #[cfg(feature = "merge_sorted")]
1958 pub fn merge_sorted<S>(
1959 self,
1960 other: LazyFrame,
1961 key: S,
1962 maintain_order: bool,
1963 ) -> PolarsResult<LazyFrame>
1964 where
1965 S: Into<PlSmallStr>,
1966 {
1967 let key = key.into();
1968
1969 let lp = DslPlan::MergeSorted {
1970 input_left: Arc::new(self.logical_plan),
1971 input_right: Arc::new(other.logical_plan),
1972 key,
1973 maintain_order,
1974 };
1975 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1976 }
1977
1978 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1979 let lp = DslPlan::MapFunction {
1980 input: Arc::new(self.logical_plan),
1981 function: DslFunction::Hint(hint),
1982 };
1983 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1984 }
1985}
1986
#[derive(Clone)]
/// A pending group-by over a lazy plan.
///
/// Turn it back into a [`LazyFrame`] with `agg`, `head`, `tail` or `apply`.
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags carried into the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Group-by key expressions.
    keys: Vec<Expr>,
    /// Group filters registered through `having`.
    predicates: Vec<Expr>,
    /// Forwarded as `maintain_order` to the group-by plan node.
    maintain_order: bool,
    /// Dynamic group-by options forwarded to the plan, if any.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options forwarded to the plan, if any.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2000
2001impl From<LazyGroupBy> for LazyFrame {
2002 fn from(lgb: LazyGroupBy) -> Self {
2003 Self {
2004 logical_plan: lgb.logical_plan,
2005 opt_state: lgb.opt_state,
2006 cached_arena: Default::default(),
2007 }
2008 }
2009}
2010
2011impl LazyGroupBy {
2012 pub fn having(mut self, predicate: Expr) -> Self {
2033 self.predicates.push(predicate);
2034 self
2035 }
2036
2037 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2059 #[cfg(feature = "dynamic_group_by")]
2060 let lp = DslBuilder::from(self.logical_plan)
2061 .group_by(
2062 self.keys,
2063 self.predicates,
2064 aggs,
2065 None,
2066 self.maintain_order,
2067 self.dynamic_options,
2068 self.rolling_options,
2069 )
2070 .build();
2071
2072 #[cfg(not(feature = "dynamic_group_by"))]
2073 let lp = DslBuilder::from(self.logical_plan)
2074 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2075 .build();
2076 LazyFrame::from_logical_plan(lp, self.opt_state)
2077 }
2078
2079 pub fn head(self, n: Option<usize>) -> LazyFrame {
2081 let keys = self
2082 .keys
2083 .iter()
2084 .filter_map(|expr| expr_output_name(expr).ok())
2085 .collect::<Vec<_>>();
2086
2087 self.agg([all().as_expr().head(n)]).explode_impl(
2088 all() - by_name(keys.iter().cloned(), false, false),
2089 ExplodeOptions {
2090 empty_as_null: true,
2091 keep_nulls: true,
2092 },
2093 true,
2094 )
2095 }
2096
2097 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2099 let keys = self
2100 .keys
2101 .iter()
2102 .filter_map(|expr| expr_output_name(expr).ok())
2103 .collect::<Vec<_>>();
2104
2105 self.agg([all().as_expr().tail(n)]).explode_impl(
2106 all() - by_name(keys.iter().cloned(), false, false),
2107 ExplodeOptions {
2108 empty_as_null: true,
2109 keep_nulls: true,
2110 },
2111 true,
2112 )
2113 }
2114
2115 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2120 if !self.predicates.is_empty() {
2121 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2122 }
2123
2124 #[cfg(feature = "dynamic_group_by")]
2125 let options = GroupbyOptions {
2126 dynamic: self.dynamic_options,
2127 rolling: self.rolling_options,
2128 slice: None,
2129 };
2130
2131 #[cfg(not(feature = "dynamic_group_by"))]
2132 let options = GroupbyOptions { slice: None };
2133
2134 let lp = DslPlan::GroupBy {
2135 input: Arc::new(self.logical_plan),
2136 keys: self.keys,
2137 predicates: vec![],
2138 aggs: vec![],
2139 apply: Some((f, schema)),
2140 maintain_order: self.maintain_order,
2141 options: Arc::new(options),
2142 };
2143 LazyFrame::from_logical_plan(lp, self.opt_state)
2144 }
2145}
2146
#[must_use]
/// Builder for a join between a left frame and a right frame supplied via `with`.
///
/// Configure it with the setter methods, then call `finish` (key-based join on
/// `left_on` / `right_on`) or `join_where` (predicate-based join).
pub struct JoinBuilder {
    /// Left-hand input.
    lf: LazyFrame,
    /// Join type; `new` sets `JoinType::Inner`.
    how: JoinType,
    /// Right-hand input; must be set before `finish` / `join_where`.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left input.
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right input.
    right_on: Vec<Expr>,
    /// Forwarded as `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Forwarded as `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Optional suffix forwarded to `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Join validation forwarded to `JoinArgs::validation`.
    validation: JoinValidation,
    /// Whether null keys compare equal (`JoinArgs::nulls_equal`).
    nulls_equal: bool,
    /// Key-column coalescing behaviour forwarded to `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Output-order behaviour forwarded to `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
    /// Optional build-side preference forwarded to `JoinArgs::build_side`.
    build_side: Option<JoinBuildSide>,
}
2163impl JoinBuilder {
2164 pub fn new(lf: LazyFrame) -> Self {
2166 Self {
2167 lf,
2168 other: None,
2169 how: JoinType::Inner,
2170 left_on: vec![],
2171 right_on: vec![],
2172 allow_parallel: true,
2173 force_parallel: false,
2174 suffix: None,
2175 validation: Default::default(),
2176 nulls_equal: false,
2177 coalesce: Default::default(),
2178 maintain_order: Default::default(),
2179 build_side: None,
2180 }
2181 }
2182
2183 pub fn with(mut self, other: LazyFrame) -> Self {
2185 self.other = Some(other);
2186 self
2187 }
2188
2189 pub fn how(mut self, how: JoinType) -> Self {
2191 self.how = how;
2192 self
2193 }
2194
2195 pub fn validate(mut self, validation: JoinValidation) -> Self {
2196 self.validation = validation;
2197 self
2198 }
2199
2200 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2204 let on = on.as_ref().to_vec();
2205 self.left_on.clone_from(&on);
2206 self.right_on = on;
2207 self
2208 }
2209
2210 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2214 self.left_on = on.as_ref().to_vec();
2215 self
2216 }
2217
2218 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2222 self.right_on = on.as_ref().to_vec();
2223 self
2224 }
2225
2226 pub fn allow_parallel(mut self, allow: bool) -> Self {
2228 self.allow_parallel = allow;
2229 self
2230 }
2231
2232 pub fn force_parallel(mut self, force: bool) -> Self {
2234 self.force_parallel = force;
2235 self
2236 }
2237
2238 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2240 self.nulls_equal = nulls_equal;
2241 self
2242 }
2243
2244 pub fn suffix<S>(mut self, suffix: S) -> Self
2247 where
2248 S: Into<PlSmallStr>,
2249 {
2250 self.suffix = Some(suffix.into());
2251 self
2252 }
2253
2254 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2256 self.coalesce = coalesce;
2257 self
2258 }
2259
2260 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2262 self.maintain_order = maintain_order;
2263 self
2264 }
2265
2266 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2268 self.build_side = build_side;
2269 self
2270 }
2271
2272 pub fn finish(self) -> LazyFrame {
2274 let opt_state = self.lf.opt_state;
2275 let other = self.other.expect("'with' not set in join builder");
2276
2277 let args = JoinArgs {
2278 how: self.how,
2279 validation: self.validation,
2280 suffix: self.suffix,
2281 slice: None,
2282 nulls_equal: self.nulls_equal,
2283 coalesce: self.coalesce,
2284 maintain_order: self.maintain_order,
2285 build_side: self.build_side,
2286 };
2287
2288 let lp = self
2289 .lf
2290 .get_plan_builder()
2291 .join(
2292 other.logical_plan,
2293 self.left_on,
2294 self.right_on,
2295 JoinOptions {
2296 allow_parallel: self.allow_parallel,
2297 force_parallel: self.force_parallel,
2298 args,
2299 }
2300 .into(),
2301 )
2302 .build();
2303 LazyFrame::from_logical_plan(lp, opt_state)
2304 }
2305
2306 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2308 let opt_state = self.lf.opt_state;
2309 let other = self.other.expect("with not set");
2310
2311 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2313 if let Expr::BinaryExpr {
2314 op: Operator::And,
2315 left,
2316 right,
2317 } = predicate
2318 {
2319 decompose_and((*left).clone(), expanded_predicates);
2320 decompose_and((*right).clone(), expanded_predicates);
2321 } else {
2322 expanded_predicates.push(predicate);
2323 }
2324 }
2325 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2326 for predicate in predicates {
2327 decompose_and(predicate, &mut expanded_predicates);
2328 }
2329 let predicates: Vec<Expr> = expanded_predicates;
2330
2331 #[cfg(feature = "is_between")]
2333 let predicates: Vec<Expr> = {
2334 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2335 for predicate in predicates {
2336 if let Expr::Function {
2337 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2338 input,
2339 ..
2340 } = &predicate
2341 {
2342 if let [expr, lower, upper] = input.as_slice() {
2343 match closed {
2344 ClosedInterval::Both => {
2345 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2346 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2347 },
2348 ClosedInterval::Right => {
2349 expanded_predicates.push(expr.clone().gt(lower.clone()));
2350 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2351 },
2352 ClosedInterval::Left => {
2353 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2354 expanded_predicates.push(expr.clone().lt(upper.clone()));
2355 },
2356 ClosedInterval::None => {
2357 expanded_predicates.push(expr.clone().gt(lower.clone()));
2358 expanded_predicates.push(expr.clone().lt(upper.clone()));
2359 },
2360 }
2361 continue;
2362 }
2363 }
2364 expanded_predicates.push(predicate);
2365 }
2366 expanded_predicates
2367 };
2368
2369 let args = JoinArgs {
2370 how: self.how,
2371 validation: self.validation,
2372 suffix: self.suffix,
2373 slice: None,
2374 nulls_equal: self.nulls_equal,
2375 coalesce: self.coalesce,
2376 maintain_order: self.maintain_order,
2377 build_side: self.build_side,
2378 };
2379 let options = JoinOptions {
2380 allow_parallel: self.allow_parallel,
2381 force_parallel: self.force_parallel,
2382 args,
2383 };
2384
2385 let lp = DslPlan::Join {
2386 input_left: Arc::new(self.lf.logical_plan),
2387 input_right: Arc::new(other.logical_plan),
2388 left_on: Default::default(),
2389 right_on: Default::default(),
2390 predicates,
2391 options: Arc::from(options),
2392 };
2393
2394 LazyFrame::from_logical_plan(lp, opt_state)
2395 }
2396}
2397
2398pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2399 #[cfg(not(feature = "new_streaming"))]
2400 {
2401 None
2402 }
2403 #[cfg(feature = "new_streaming")]
2404 {
2405 Some(polars_stream::build_streaming_query_executor)
2406 }
2407};
2408
/// Iterator over result batches received from a channel.
///
/// The producing closure is deferred and is run on the first call to `start` (which `next`
/// performs).
pub struct CollectBatches {
    /// Channel on which batches (or errors) arrive.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// Deferred runner closure; taken and invoked at most once by `start`.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2413
2414impl CollectBatches {
2415 pub fn start(&mut self) {
2417 if let Some(runner) = self.runner.take() {
2418 runner()
2419 }
2420 }
2421}
2422
2423impl Iterator for CollectBatches {
2424 type Item = PolarsResult<DataFrame>;
2425
2426 fn next(&mut self) -> Option<Self::Item> {
2427 self.start();
2428 self.recv.recv().ok()
2429 }
2430}