1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_core::query_result::QueryResult;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
/// Conversion into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
43
44impl IntoLazy for DataFrame {
45 fn lazy(self) -> LazyFrame {
47 let lp = DslBuilder::from_existing_df(self).build();
48 LazyFrame {
49 logical_plan: lp,
50 opt_state: Default::default(),
51 cached_arena: Default::default(),
52 }
53 }
54}
55
impl IntoLazy for LazyFrame {
    /// A `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
61
/// A query described by a DSL plan together with the optimization flags that
/// are applied when the plan is optimized.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags passed to the optimizer for this frame.
    pub(crate) opt_state: OptFlags,
    /// Shared, mutex-protected slot that may hold a `CachedArena`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
73
74impl From<DslPlan> for LazyFrame {
75 fn from(plan: DslPlan) -> Self {
76 Self {
77 logical_plan: plan,
78 opt_state: OptFlags::default(),
79 cached_arena: Default::default(),
80 }
81 }
82}
83
84impl LazyFrame {
    /// Assemble a `LazyFrame` directly from its three fields, reusing the
    /// given `cached_arena` handle instead of creating a new one.
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }
96
97 pub(crate) fn get_plan_builder(self) -> DslBuilder {
98 DslBuilder::from(self.logical_plan)
99 }
100
    /// Return a copy of this frame's optimization flags.
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
104
105 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106 LazyFrame {
107 logical_plan,
108 opt_state,
109 cached_arena: Default::default(),
110 }
111 }
112
113 pub fn get_current_optimizations(&self) -> OptFlags {
115 self.opt_state
116 }
117
118 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120 self.opt_state = opt_state;
121 self
122 }
123
124 pub fn without_optimizations(self) -> Self {
126 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127 }
128
129 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132 self
133 }
134
135 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138 self
139 }
140
141 pub fn with_check_order(mut self, toggle: bool) -> Self {
144 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145 self
146 }
147
148 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151 self
152 }
153
154 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157 self
158 }
159
160 pub fn with_type_check(mut self, toggle: bool) -> Self {
162 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163 self
164 }
165
166 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169 self
170 }
171
172 #[cfg(feature = "cse")]
174 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
175 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
176 self
177 }
178
179 #[cfg(feature = "cse")]
181 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
182 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
183 self
184 }
185
186 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189 self
190 }
191
192 #[cfg(feature = "new_streaming")]
193 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
194 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
195 self
196 }
197
198 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201 self
202 }
203
204 pub fn _with_eager(mut self, toggle: bool) -> Self {
206 self.opt_state.set(OptFlags::EAGER, toggle);
207 self
208 }
209
210 pub fn describe_plan(&self) -> PolarsResult<String> {
212 Ok(self.clone().to_alp()?.describe())
213 }
214
215 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217 Ok(self.clone().to_alp()?.describe_tree_format())
218 }
219
220 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp_optimized()?.describe())
225 }
226
227 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232 }
233
234 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239 if optimized {
240 self.describe_optimized_plan()
241 } else {
242 self.describe_plan()
243 }
244 }
245
246 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286 let opt_state = self.get_opt_state();
287 let lp = self
288 .get_plan_builder()
289 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290 .build();
291 Self::from_logical_plan(lp, opt_state)
292 }
293
294 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314 self,
315 by_exprs: E,
316 sort_options: SortMultipleOptions,
317 ) -> Self {
318 let by_exprs = by_exprs.as_ref().to_vec();
319 if by_exprs.is_empty() {
320 self
321 } else {
322 let opt_state = self.get_opt_state();
323 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324 Self::from_logical_plan(lp, opt_state)
325 }
326 }
327
328 pub fn top_k<E: AsRef<[Expr]>>(
329 self,
330 k: IdxSize,
331 by_exprs: E,
332 sort_options: SortMultipleOptions,
333 ) -> Self {
334 self.sort_by_exprs(
336 by_exprs,
337 sort_options.with_order_reversed().with_nulls_last(true),
338 )
339 .slice(0, k)
340 }
341
342 pub fn bottom_k<E: AsRef<[Expr]>>(
343 self,
344 k: IdxSize,
345 by_exprs: E,
346 sort_options: SortMultipleOptions,
347 ) -> Self {
348 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350 .slice(0, k)
351 }
352
353 pub fn reverse(self) -> Self {
369 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370 }
371
372 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380 where
381 I: IntoIterator<Item = T>,
382 J: IntoIterator<Item = S>,
383 T: AsRef<str>,
384 S: AsRef<str>,
385 {
386 let iter = existing.into_iter();
387 let cap = iter.size_hint().0;
388 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391 for (existing, new) in iter.zip(new) {
394 let existing = existing.as_ref();
395 let new = new.as_ref();
396 if new != existing {
397 existing_vec.push(existing.into());
398 new_vec.push(new.into());
399 }
400 }
401
402 self.map_private(DslFunction::Rename {
403 existing: existing_vec.into(),
404 new: new_vec.into(),
405 strict,
406 })
407 }
408
409 pub fn drop(self, columns: Selector) -> Self {
416 let opt_state = self.get_opt_state();
417 let lp = self.get_plan_builder().drop(columns).build();
418 Self::from_logical_plan(lp, opt_state)
419 }
420
421 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427 }
428
429 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434 self.select(vec![
435 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436 ])
437 }
438
439 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441 let opt_state = self.get_opt_state();
442 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443 Self::from_logical_plan(lp, opt_state)
444 }
445
446 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn cache(self) -> Self {
457 let opt_state = self.get_opt_state();
458 let lp = self.get_plan_builder().cache().build();
459 Self::from_logical_plan(lp, opt_state)
460 }
461
462 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464 let cast_cols: Vec<Expr> = dtypes
465 .into_iter()
466 .map(|(name, dt)| {
467 let name = PlSmallStr::from_str(name);
468
469 if strict {
470 col(name).strict_cast(dt)
471 } else {
472 col(name).cast(dt)
473 }
474 })
475 .collect();
476
477 if cast_cols.is_empty() {
478 self
479 } else {
480 self.with_columns(cast_cols)
481 }
482 }
483
484 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486 self.with_columns(vec![if strict {
487 col(PlSmallStr::from_static("*")).strict_cast(dtype)
488 } else {
489 col(PlSmallStr::from_static("*")).cast(dtype)
490 }])
491 }
492
493 pub fn optimize(
494 self,
495 lp_arena: &mut Arena<IR>,
496 expr_arena: &mut Arena<AExpr>,
497 ) -> PolarsResult<Node> {
498 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499 }
500
501 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502 let (mut lp_arena, mut expr_arena) = self.get_arenas();
503 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505 Ok(IRPlan::new(node, lp_arena, expr_arena))
506 }
507
508 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = to_alp(
511 self.logical_plan,
512 &mut expr_arena,
513 &mut lp_arena,
514 &mut self.opt_state,
515 )?;
516 let plan = IRPlan::new(node, lp_arena, expr_arena);
517 Ok(plan)
518 }
519
520 pub(crate) fn optimize_with_scratch(
521 self,
522 lp_arena: &mut Arena<IR>,
523 expr_arena: &mut Arena<AExpr>,
524 scratch: &mut Vec<Node>,
525 ) -> PolarsResult<Node> {
526 let lp_top = optimize(
527 self.logical_plan,
528 self.opt_state,
529 lp_arena,
530 expr_arena,
531 scratch,
532 apply_scan_predicate_to_scan_ir,
533 )?;
534
535 Ok(lp_top)
536 }
537
538 fn prepare_collect_post_opt<P>(
539 mut self,
540 check_sink: bool,
541 query_start: Option<std::time::Instant>,
542 post_opt: P,
543 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
544 where
545 P: FnOnce(
546 Node,
547 &mut Arena<IR>,
548 &mut Arena<AExpr>,
549 Option<std::time::Duration>,
550 ) -> PolarsResult<()>,
551 {
552 let (mut lp_arena, mut expr_arena) = self.get_arenas();
553
554 let mut scratch = vec![];
555 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
556
557 post_opt(
558 lp_top,
559 &mut lp_arena,
560 &mut expr_arena,
561 query_start.map(|s| s.elapsed()),
564 )?;
565
566 let no_file_sink = if check_sink {
568 !matches!(
569 lp_arena.get(lp_top),
570 IR::Sink {
571 payload: SinkTypeIR::File { .. },
572 ..
573 }
574 )
575 } else {
576 true
577 };
578 let physical_plan = create_physical_plan(
579 lp_top,
580 &mut lp_arena,
581 &mut expr_arena,
582 BUILD_STREAMING_EXECUTOR,
583 )?;
584
585 let state = ExecutionState::new();
586 Ok((state, physical_plan, no_file_sink))
587 }
588
589 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
591 where
592 P: FnOnce(
593 Node,
594 &mut Arena<IR>,
595 &mut Arena<AExpr>,
596 Option<std::time::Duration>,
597 ) -> PolarsResult<()>,
598 {
599 let (mut state, mut physical_plan, _) =
600 self.prepare_collect_post_opt(false, None, post_opt)?;
601 physical_plan.execute(&mut state)
602 }
603
604 #[allow(unused_mut)]
605 fn prepare_collect(
606 self,
607 check_sink: bool,
608 query_start: Option<std::time::Instant>,
609 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
610 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
611 }
612
    /// Optimize and execute the query on the requested `engine`.
    ///
    /// Returns `QueryResult::Multiple` when the optimized root is an
    /// `IR::SinkMultiple` run by the non-streaming branch, and
    /// `QueryResult::Single` for the other non-streaming plans. The streaming
    /// branch returns whatever `polars_stream::run_query` yields.
    ///
    /// # Errors
    /// Propagates optimization, planning and execution errors. Returns an
    /// `InvalidOperation` error when a `SinkMultiple` plan is requested on
    /// `Engine::Gpu`.
    pub fn collect_with_engine(mut self, engine: Engine) -> PolarsResult<QueryResult> {
        // With the `new_streaming` feature, environment variables may route
        // the query to the streaming engine first (see
        // `try_new_streaming_if_requested`).
        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = self.try_new_streaming_if_requested() {
                return result;
            }
        }

        // NOTE(review): `feature_gated!` presumably fails when the
        // `new_streaming` feature is compiled out; confirm in polars_core.
        if let Engine::Streaming = engine {
            feature_gated!("new_streaming", self = self.with_new_streaming(true))
        }

        let mut ir_plan = self.to_alp_optimized()?;

        ir_plan.ensure_root_node_is_sink();

        match engine {
            Engine::Streaming => feature_gated!("new_streaming", {
                polars_stream::run_query(
                    ir_plan.lp_top,
                    &mut ir_plan.lp_arena,
                    &mut ir_plan.expr_arena,
                )
            }),
            Engine::Auto | Engine::InMemory | Engine::Gpu => {
                if let IR::SinkMultiple { inputs } = ir_plan.root() {
                    polars_ensure!(
                        engine != Engine::Gpu,
                        InvalidOperation:
                        "collect_all is not supported for the gpu engine"
                    );

                    // `inputs` borrows from `ir_plan`; clone it so the arenas
                    // can be borrowed mutably below.
                    return create_multiple_physical_plans(
                        inputs.clone().as_slice(),
                        &mut ir_plan.lp_arena,
                        &mut ir_plan.expr_arena,
                        BUILD_STREAMING_EXECUTOR,
                    )?
                    .execute()
                    .map(QueryResult::Multiple);
                }

                let mut physical_plan = create_physical_plan(
                    ir_plan.lp_top,
                    &mut ir_plan.lp_arena,
                    &mut ir_plan.expr_arena,
                    BUILD_STREAMING_EXECUTOR,
                )?;
                let mut state = ExecutionState::new();
                physical_plan.execute(&mut state).map(QueryResult::Single)
            },
        }
    }
670
671 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
672 let sink_multiple = LazyFrame {
673 logical_plan: DslPlan::SinkMultiple { inputs: plans },
674 opt_state,
675 cached_arena: Default::default(),
676 };
677 sink_multiple.explain(true)
678 }
679
680 pub fn collect_all_with_engine(
681 plans: Vec<DslPlan>,
682 engine: Engine,
683 opt_state: OptFlags,
684 ) -> PolarsResult<Vec<DataFrame>> {
685 if plans.is_empty() {
686 return Ok(Vec::new());
687 }
688
689 LazyFrame {
690 logical_plan: DslPlan::SinkMultiple { inputs: plans },
691 opt_state,
692 cached_arena: Default::default(),
693 }
694 .collect_with_engine(engine)
695 .map(|r| r.unwrap_multiple())
696 }
697
698 pub fn collect(self) -> PolarsResult<DataFrame> {
716 self.collect_with_engine(Engine::Auto).map(|r| match r {
717 QueryResult::Single(df) => df,
718 QueryResult::Multiple(_) => DataFrame::empty(),
720 })
721 }
722
723 #[cfg(feature = "async")]
728 pub fn collect_batches(
729 self,
730 engine: Engine,
731 maintain_order: bool,
732 chunk_size: Option<NonZeroUsize>,
733 lazy: bool,
734 ) -> PolarsResult<CollectBatches> {
735 let (send, recv) = sync_channel(1);
736 let runner_send = send.clone();
737 let ldf = self.sink_batches(
738 PlanCallback::new(move |df| {
739 let send_result = send.send(Ok(df));
741 Ok(send_result.is_err())
742 }),
743 maintain_order,
744 chunk_size,
745 )?;
746 let runner = move || {
747 polars_io::pl_async::get_runtime().spawn_blocking(move || {
750 if let Err(e) = ldf.collect_with_engine(engine) {
751 runner_send.send(Err(e)).ok();
752 }
753 });
754 };
755
756 let mut collect_batches = CollectBatches {
757 recv,
758 runner: Some(Box::new(runner)),
759 };
760 if !lazy {
761 collect_batches.start();
762 }
763 Ok(collect_batches)
764 }
765
766 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
769 where
770 P: FnOnce(
771 Node,
772 &mut Arena<IR>,
773 &mut Arena<AExpr>,
774 Option<std::time::Duration>,
775 ) -> PolarsResult<()>,
776 {
777 let query_start = std::time::Instant::now();
778 let (mut state, mut physical_plan, _) =
779 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
780 state.time_nodes(query_start);
781 let out = physical_plan.execute(&mut state)?;
782 let timer_df = state.finish_timer()?;
783 Ok((out, timer_df))
784 }
785
786 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
794 self._profile_post_opt(|_, _, _, _| Ok(()))
795 }
796
797 pub fn sink_batches(
798 mut self,
799 function: PlanCallback<DataFrame, bool>,
800 maintain_order: bool,
801 chunk_size: Option<NonZeroUsize>,
802 ) -> PolarsResult<Self> {
803 use polars_plan::prelude::sink::CallbackSinkType;
804
805 polars_ensure!(
806 !matches!(self.logical_plan, DslPlan::Sink { .. }),
807 InvalidOperation: "cannot create a sink on top of another sink"
808 );
809
810 self.logical_plan = DslPlan::Sink {
811 input: Arc::new(self.logical_plan),
812 payload: SinkType::Callback(CallbackSinkType {
813 function,
814 maintain_order,
815 chunk_size,
816 }),
817 };
818
819 Ok(self)
820 }
821
822 #[cfg(feature = "new_streaming")]
823 pub fn try_new_streaming_if_requested(
824 &mut self,
825 ) -> Option<PolarsResult<polars_core::query_result::QueryResult>> {
826 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
827 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
828
829 if auto_new_streaming || force_new_streaming {
830 let mut new_stream_lazy = self.clone();
833 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
834 let mut ir_plan = match new_stream_lazy.to_alp_optimized() {
835 Ok(v) => v,
836 Err(e) => return Some(Err(e)),
837 };
838
839 ir_plan.ensure_root_node_is_sink();
840
841 let f = || {
842 polars_stream::run_query(
843 ir_plan.lp_top,
844 &mut ir_plan.lp_arena,
845 &mut ir_plan.expr_arena,
846 )
847 };
848
849 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
850 Ok(v) => return Some(v),
851 Err(e) => {
852 if !force_new_streaming
855 && auto_new_streaming
856 && e.downcast_ref::<&str>()
857 .map(|s| s.starts_with("not yet implemented"))
858 .unwrap_or(false)
859 {
860 if polars_core::config::verbose() {
861 eprintln!(
862 "caught unimplemented error in new streaming engine, falling back to normal engine"
863 );
864 }
865 } else {
866 std::panic::resume_unwind(e);
867 }
868 },
869 }
870 }
871
872 None
873 }
874
875 pub fn sink(
876 mut self,
877 sink_type: SinkDestination,
878 file_format: FileWriteFormat,
879 unified_sink_args: UnifiedSinkArgs,
880 ) -> PolarsResult<Self> {
881 polars_ensure!(
882 !matches!(self.logical_plan, DslPlan::Sink { .. }),
883 InvalidOperation: "cannot create a sink on top of another sink"
884 );
885
886 self.logical_plan = DslPlan::Sink {
887 input: Arc::new(self.logical_plan),
888 payload: match sink_type {
889 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
890 target,
891 file_format,
892 unified_sink_args,
893 }),
894 SinkDestination::Partitioned {
895 base_path,
896 file_path_provider,
897 partition_strategy,
898 max_rows_per_file,
899 approximate_bytes_per_file,
900 } => SinkType::Partitioned(PartitionedSinkOptions {
901 base_path,
902 file_path_provider,
903 partition_strategy,
904 file_format,
905 unified_sink_args,
906 max_rows_per_file,
907 approximate_bytes_per_file,
908 }),
909 },
910 };
911 Ok(self)
912 }
913
914 pub fn filter(self, predicate: Expr) -> Self {
932 let opt_state = self.get_opt_state();
933 let lp = self.get_plan_builder().filter(predicate).build();
934 Self::from_logical_plan(lp, opt_state)
935 }
936
937 pub fn remove(self, predicate: Expr) -> Self {
955 self.filter(predicate.neq_missing(lit(true)))
956 }
957
958 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
984 let exprs = exprs.as_ref().to_vec();
985 self.select_impl(
986 exprs,
987 ProjectionOptions {
988 run_parallel: true,
989 duplicate_check: true,
990 should_broadcast: true,
991 },
992 )
993 }
994
995 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
996 let exprs = exprs.as_ref().to_vec();
997 self.select_impl(
998 exprs,
999 ProjectionOptions {
1000 run_parallel: false,
1001 duplicate_check: true,
1002 should_broadcast: true,
1003 },
1004 )
1005 }
1006
1007 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1008 let opt_state = self.get_opt_state();
1009 let lp = self.get_plan_builder().project(exprs, options).build();
1010 Self::from_logical_plan(lp, opt_state)
1011 }
1012
1013 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1034 let keys = by
1035 .as_ref()
1036 .iter()
1037 .map(|e| e.clone().into())
1038 .collect::<Vec<_>>();
1039 let opt_state = self.get_opt_state();
1040
1041 #[cfg(feature = "dynamic_group_by")]
1042 {
1043 LazyGroupBy {
1044 logical_plan: self.logical_plan,
1045 opt_state,
1046 keys,
1047 predicates: vec![],
1048 maintain_order: false,
1049 dynamic_options: None,
1050 rolling_options: None,
1051 }
1052 }
1053
1054 #[cfg(not(feature = "dynamic_group_by"))]
1055 {
1056 LazyGroupBy {
1057 logical_plan: self.logical_plan,
1058 opt_state,
1059 keys,
1060 predicates: vec![],
1061 maintain_order: false,
1062 }
1063 }
1064 }
1065
1066 #[cfg(feature = "dynamic_group_by")]
1074 pub fn rolling<E: AsRef<[Expr]>>(
1075 mut self,
1076 index_column: Expr,
1077 group_by: E,
1078 mut options: RollingGroupOptions,
1079 ) -> LazyGroupBy {
1080 if let Expr::Column(name) = index_column {
1081 options.index_column = name;
1082 } else {
1083 let output_field = index_column
1084 .to_field(&self.collect_schema().unwrap())
1085 .unwrap();
1086 return self.with_column(index_column).rolling(
1087 Expr::Column(output_field.name().clone()),
1088 group_by,
1089 options,
1090 );
1091 }
1092 let opt_state = self.get_opt_state();
1093 LazyGroupBy {
1094 logical_plan: self.logical_plan,
1095 opt_state,
1096 predicates: vec![],
1097 keys: group_by.as_ref().to_vec(),
1098 maintain_order: true,
1099 dynamic_options: None,
1100 rolling_options: Some(options),
1101 }
1102 }
1103
1104 #[cfg(feature = "dynamic_group_by")]
1120 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1121 mut self,
1122 index_column: Expr,
1123 group_by: E,
1124 mut options: DynamicGroupOptions,
1125 ) -> LazyGroupBy {
1126 if let Expr::Column(name) = index_column {
1127 options.index_column = name;
1128 } else {
1129 let output_field = index_column
1130 .to_field(&self.collect_schema().unwrap())
1131 .unwrap();
1132 return self.with_column(index_column).group_by_dynamic(
1133 Expr::Column(output_field.name().clone()),
1134 group_by,
1135 options,
1136 );
1137 }
1138 let opt_state = self.get_opt_state();
1139 LazyGroupBy {
1140 logical_plan: self.logical_plan,
1141 opt_state,
1142 predicates: vec![],
1143 keys: group_by.as_ref().to_vec(),
1144 maintain_order: true,
1145 dynamic_options: Some(options),
1146 rolling_options: None,
1147 }
1148 }
1149
1150 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1152 let keys = by
1153 .as_ref()
1154 .iter()
1155 .map(|e| e.clone().into())
1156 .collect::<Vec<_>>();
1157 let opt_state = self.get_opt_state();
1158
1159 #[cfg(feature = "dynamic_group_by")]
1160 {
1161 LazyGroupBy {
1162 logical_plan: self.logical_plan,
1163 opt_state,
1164 keys,
1165 predicates: vec![],
1166 maintain_order: true,
1167 dynamic_options: None,
1168 rolling_options: None,
1169 }
1170 }
1171
1172 #[cfg(not(feature = "dynamic_group_by"))]
1173 {
1174 LazyGroupBy {
1175 logical_plan: self.logical_plan,
1176 opt_state,
1177 keys,
1178 predicates: vec![],
1179 maintain_order: true,
1180 }
1181 }
1182 }
1183
1184 #[cfg(feature = "semi_anti_join")]
1201 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1202 self.join(
1203 other,
1204 [left_on.into()],
1205 [right_on.into()],
1206 JoinArgs::new(JoinType::Anti),
1207 )
1208 }
1209
1210 #[cfg(feature = "cross_join")]
1212 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1213 self.join(
1214 other,
1215 vec![],
1216 vec![],
1217 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1218 )
1219 }
1220
1221 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1238 self.join(
1239 other,
1240 [left_on.into()],
1241 [right_on.into()],
1242 JoinArgs::new(JoinType::Left),
1243 )
1244 }
1245
1246 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1263 self.join(
1264 other,
1265 [left_on.into()],
1266 [right_on.into()],
1267 JoinArgs::new(JoinType::Inner),
1268 )
1269 }
1270
1271 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1288 self.join(
1289 other,
1290 [left_on.into()],
1291 [right_on.into()],
1292 JoinArgs::new(JoinType::Full),
1293 )
1294 }
1295
1296 #[cfg(feature = "semi_anti_join")]
1313 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1314 self.join(
1315 other,
1316 [left_on.into()],
1317 [right_on.into()],
1318 JoinArgs::new(JoinType::Semi),
1319 )
1320 }
1321
1322 pub fn join<E: AsRef<[Expr]>>(
1344 self,
1345 other: LazyFrame,
1346 left_on: E,
1347 right_on: E,
1348 args: JoinArgs,
1349 ) -> LazyFrame {
1350 let left_on = left_on.as_ref().to_vec();
1351 let right_on = right_on.as_ref().to_vec();
1352
1353 self._join_impl(other, left_on, right_on, args)
1354 }
1355
1356 fn _join_impl(
1357 self,
1358 other: LazyFrame,
1359 left_on: Vec<Expr>,
1360 right_on: Vec<Expr>,
1361 args: JoinArgs,
1362 ) -> LazyFrame {
1363 let JoinArgs {
1364 how,
1365 validation,
1366 suffix,
1367 slice,
1368 nulls_equal,
1369 coalesce,
1370 maintain_order,
1371 build_side,
1372 } = args;
1373
1374 if slice.is_some() {
1375 panic!("impl error: slice is not handled")
1376 }
1377
1378 let mut builder = self
1379 .join_builder()
1380 .with(other)
1381 .left_on(left_on)
1382 .right_on(right_on)
1383 .how(how)
1384 .validate(validation)
1385 .join_nulls(nulls_equal)
1386 .coalesce(coalesce)
1387 .maintain_order(maintain_order)
1388 .build_side(build_side);
1389
1390 if let Some(suffix) = suffix {
1391 builder = builder.suffix(suffix);
1392 }
1393
1394 builder.finish()
1396 }
1397
1398 pub fn join_builder(self) -> JoinBuilder {
1404 JoinBuilder::new(self)
1405 }
1406
1407 pub fn with_column(self, expr: Expr) -> LazyFrame {
1425 let opt_state = self.get_opt_state();
1426 let lp = self
1427 .get_plan_builder()
1428 .with_columns(
1429 vec![expr],
1430 ProjectionOptions {
1431 run_parallel: false,
1432 duplicate_check: true,
1433 should_broadcast: true,
1434 },
1435 )
1436 .build();
1437 Self::from_logical_plan(lp, opt_state)
1438 }
1439
1440 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1455 let exprs = exprs.as_ref().to_vec();
1456 self.with_columns_impl(
1457 exprs,
1458 ProjectionOptions {
1459 run_parallel: true,
1460 duplicate_check: true,
1461 should_broadcast: true,
1462 },
1463 )
1464 }
1465
1466 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1468 let exprs = exprs.as_ref().to_vec();
1469 self.with_columns_impl(
1470 exprs,
1471 ProjectionOptions {
1472 run_parallel: false,
1473 duplicate_check: true,
1474 should_broadcast: true,
1475 },
1476 )
1477 }
1478
1479 pub fn match_to_schema(
1481 self,
1482 schema: SchemaRef,
1483 per_column: Arc<[MatchToSchemaPerColumn]>,
1484 extra_columns: ExtraColumnsPolicy,
1485 ) -> LazyFrame {
1486 let opt_state = self.get_opt_state();
1487 let lp = self
1488 .get_plan_builder()
1489 .match_to_schema(schema, per_column, extra_columns)
1490 .build();
1491 Self::from_logical_plan(lp, opt_state)
1492 }
1493
1494 pub fn pipe_with_schema(
1495 self,
1496 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1497 ) -> Self {
1498 let opt_state = self.get_opt_state();
1499 let lp = self
1500 .get_plan_builder()
1501 .pipe_with_schema(vec![], callback)
1502 .build();
1503 Self::from_logical_plan(lp, opt_state)
1504 }
1505
1506 pub fn pipe_with_schemas(
1507 self,
1508 others: Vec<LazyFrame>,
1509 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1510 ) -> Self {
1511 let opt_state = self.get_opt_state();
1512 let lp = self
1513 .get_plan_builder()
1514 .pipe_with_schema(
1515 others.into_iter().map(|lf| lf.logical_plan).collect(),
1516 callback,
1517 )
1518 .build();
1519 Self::from_logical_plan(lp, opt_state)
1520 }
1521
1522 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1523 let opt_state = self.get_opt_state();
1524 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1525 Self::from_logical_plan(lp, opt_state)
1526 }
1527
1528 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1529 let contexts = contexts
1530 .as_ref()
1531 .iter()
1532 .map(|lf| lf.logical_plan.clone())
1533 .collect();
1534 let opt_state = self.get_opt_state();
1535 let lp = self.get_plan_builder().with_context(contexts).build();
1536 Self::from_logical_plan(lp, opt_state)
1537 }
1538
1539 pub fn max(self) -> Self {
1543 self.map_private(DslFunction::Stats(StatsFunction::Max))
1544 }
1545
1546 pub fn min(self) -> Self {
1550 self.map_private(DslFunction::Stats(StatsFunction::Min))
1551 }
1552
1553 pub fn sum(self) -> Self {
1563 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1564 }
1565
1566 pub fn mean(self) -> Self {
1571 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1572 }
1573
1574 pub fn median(self) -> Self {
1580 self.map_private(DslFunction::Stats(StatsFunction::Median))
1581 }
1582
1583 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1585 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1586 quantile,
1587 method,
1588 }))
1589 }
1590
1591 pub fn std(self, ddof: u8) -> Self {
1604 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1605 }
1606
1607 pub fn var(self, ddof: u8) -> Self {
1617 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1618 }
1619
1620 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1622 self.explode_impl(columns, options, false)
1623 }
1624
1625 fn explode_impl(
1627 self,
1628 columns: Selector,
1629 options: ExplodeOptions,
1630 allow_empty: bool,
1631 ) -> LazyFrame {
1632 let opt_state = self.get_opt_state();
1633 let lp = self
1634 .get_plan_builder()
1635 .explode(columns, options, allow_empty)
1636 .build();
1637 Self::from_logical_plan(lp, opt_state)
1638 }
1639
1640 pub fn null_count(self) -> LazyFrame {
1642 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1643 }
1644
1645 pub fn unique_stable(
1650 self,
1651 subset: Option<Selector>,
1652 keep_strategy: UniqueKeepStrategy,
1653 ) -> LazyFrame {
1654 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1655 self.unique_stable_generic(subset, keep_strategy)
1656 }
1657
1658 pub fn unique_stable_generic(
1659 self,
1660 subset: Option<Vec<Expr>>,
1661 keep_strategy: UniqueKeepStrategy,
1662 ) -> LazyFrame {
1663 let opt_state = self.get_opt_state();
1664 let options = DistinctOptionsDSL {
1665 subset,
1666 maintain_order: true,
1667 keep_strategy,
1668 };
1669 let lp = self.get_plan_builder().distinct(options).build();
1670 Self::from_logical_plan(lp, opt_state)
1671 }
1672
1673 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1681 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1682 self.unique_generic(subset, keep_strategy)
1683 }
1684
1685 pub fn unique_generic(
1686 self,
1687 subset: Option<Vec<Expr>>,
1688 keep_strategy: UniqueKeepStrategy,
1689 ) -> LazyFrame {
1690 let opt_state = self.get_opt_state();
1691 let options = DistinctOptionsDSL {
1692 subset,
1693 maintain_order: false,
1694 keep_strategy,
1695 };
1696 let lp = self.get_plan_builder().distinct(options).build();
1697 Self::from_logical_plan(lp, opt_state)
1698 }
1699
1700 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1705 let opt_state = self.get_opt_state();
1706 let lp = self.get_plan_builder().drop_nans(subset).build();
1707 Self::from_logical_plan(lp, opt_state)
1708 }
1709
1710 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1715 let opt_state = self.get_opt_state();
1716 let lp = self.get_plan_builder().drop_nulls(subset).build();
1717 Self::from_logical_plan(lp, opt_state)
1718 }
1719
1720 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1730 let opt_state = self.get_opt_state();
1731 let lp = self.get_plan_builder().slice(offset, len).build();
1732 Self::from_logical_plan(lp, opt_state)
1733 }
1734
1735 pub fn clear(self) -> LazyFrame {
1737 self.slice(0, 0)
1738 }
1739
1740 pub fn first(self) -> LazyFrame {
1744 self.slice(0, 1)
1745 }
1746
1747 pub fn last(self) -> LazyFrame {
1751 self.slice(-1, 1)
1752 }
1753
1754 pub fn tail(self, n: IdxSize) -> LazyFrame {
1758 let neg_tail = -(n as i64);
1759 self.slice(neg_tail, n)
1760 }
1761
1762 #[cfg(feature = "pivot")]
1763 #[expect(clippy::too_many_arguments)]
1764 pub fn pivot(
1765 self,
1766 on: Selector,
1767 on_columns: Arc<DataFrame>,
1768 index: Selector,
1769 values: Selector,
1770 agg: Expr,
1771 maintain_order: bool,
1772 separator: PlSmallStr,
1773 ) -> LazyFrame {
1774 let opt_state = self.get_opt_state();
1775 let lp = self
1776 .get_plan_builder()
1777 .pivot(
1778 on,
1779 on_columns,
1780 index,
1781 values,
1782 agg,
1783 maintain_order,
1784 separator,
1785 )
1786 .build();
1787 Self::from_logical_plan(lp, opt_state)
1788 }
1789
1790 #[cfg(feature = "pivot")]
1794 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1795 let opt_state = self.get_opt_state();
1796 let lp = self.get_plan_builder().unpivot(args).build();
1797 Self::from_logical_plan(lp, opt_state)
1798 }
1799
1800 pub fn limit(self, n: IdxSize) -> LazyFrame {
1802 self.slice(0, n)
1803 }
1804
1805 pub fn map<F>(
1819 self,
1820 function: F,
1821 optimizations: AllowedOptimizations,
1822 schema: Option<Arc<dyn UdfSchema>>,
1823 name: Option<&'static str>,
1824 ) -> LazyFrame
1825 where
1826 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1827 {
1828 let opt_state = self.get_opt_state();
1829 let lp = self
1830 .get_plan_builder()
1831 .map(
1832 function,
1833 optimizations,
1834 schema,
1835 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1836 )
1837 .build();
1838 Self::from_logical_plan(lp, opt_state)
1839 }
1840
1841 #[cfg(feature = "python")]
1842 pub fn map_python(
1843 self,
1844 function: polars_utils::python_function::PythonFunction,
1845 optimizations: AllowedOptimizations,
1846 schema: Option<SchemaRef>,
1847 validate_output: bool,
1848 ) -> LazyFrame {
1849 let opt_state = self.get_opt_state();
1850 let lp = self
1851 .get_plan_builder()
1852 .map_python(function, optimizations, schema, validate_output)
1853 .build();
1854 Self::from_logical_plan(lp, opt_state)
1855 }
1856
1857 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1858 let opt_state = self.get_opt_state();
1859 let lp = self.get_plan_builder().map_private(function).build();
1860 Self::from_logical_plan(lp, opt_state)
1861 }
1862
1863 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1872 where
1873 S: Into<PlSmallStr>,
1874 {
1875 let name = name.into();
1876
1877 match &self.logical_plan {
1878 v @ DslPlan::Scan {
1879 scan_type,
1880 unified_scan_args,
1881 ..
1882 } if unified_scan_args.row_index.is_none()
1883 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1884 {
1885 let DslPlan::Scan {
1886 sources,
1887 mut unified_scan_args,
1888 scan_type,
1889 cached_ir: _,
1890 } = v.clone()
1891 else {
1892 unreachable!()
1893 };
1894
1895 unified_scan_args.row_index = Some(RowIndex {
1896 name,
1897 offset: offset.unwrap_or(0),
1898 });
1899
1900 DslPlan::Scan {
1901 sources,
1902 unified_scan_args,
1903 scan_type,
1904 cached_ir: Default::default(),
1905 }
1906 .into()
1907 },
1908 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1909 }
1910 }
1911
1912 pub fn count(self) -> LazyFrame {
1914 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1915 }
1916
1917 #[cfg(feature = "dtype-struct")]
1920 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1921 self.map_private(DslFunction::Unnest {
1922 columns: cols,
1923 separator,
1924 })
1925 }
1926
1927 #[cfg(feature = "merge_sorted")]
1928 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1929 where
1930 S: Into<PlSmallStr>,
1931 {
1932 let key = key.into();
1933
1934 let lp = DslPlan::MergeSorted {
1935 input_left: Arc::new(self.logical_plan),
1936 input_right: Arc::new(other.logical_plan),
1937 key,
1938 };
1939 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1940 }
1941
1942 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1943 let lp = DslPlan::MapFunction {
1944 input: Arc::new(self.logical_plan),
1945 function: DslFunction::Hint(hint),
1946 };
1947 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1948 }
1949}
1950
/// Pending group-by on a lazy plan: holds the input plan and the grouping configuration until
/// one of its finishing methods (`agg`, `head`, `tail`, `apply`) turns it into a [`LazyFrame`].
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan that serves as input of the group-by.
    pub logical_plan: DslPlan,
    /// Optimization flags handed on to the resulting [`LazyFrame`].
    opt_state: OptFlags,
    /// Grouping key expressions.
    keys: Vec<Expr>,
    /// Predicates collected through `having`.
    predicates: Vec<Expr>,
    /// Forwarded as the `maintain_order` setting of the group-by node.
    maintain_order: bool,
    /// Dynamic group-by options, forwarded to the group-by node as-is.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, forwarded to the group-by node as-is.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
1964
1965impl From<LazyGroupBy> for LazyFrame {
1966 fn from(lgb: LazyGroupBy) -> Self {
1967 Self {
1968 logical_plan: lgb.logical_plan,
1969 opt_state: lgb.opt_state,
1970 cached_arena: Default::default(),
1971 }
1972 }
1973}
1974
1975impl LazyGroupBy {
1976 pub fn having(mut self, predicate: Expr) -> Self {
1997 self.predicates.push(predicate);
1998 self
1999 }
2000
2001 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2023 #[cfg(feature = "dynamic_group_by")]
2024 let lp = DslBuilder::from(self.logical_plan)
2025 .group_by(
2026 self.keys,
2027 self.predicates,
2028 aggs,
2029 None,
2030 self.maintain_order,
2031 self.dynamic_options,
2032 self.rolling_options,
2033 )
2034 .build();
2035
2036 #[cfg(not(feature = "dynamic_group_by"))]
2037 let lp = DslBuilder::from(self.logical_plan)
2038 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2039 .build();
2040 LazyFrame::from_logical_plan(lp, self.opt_state)
2041 }
2042
2043 pub fn head(self, n: Option<usize>) -> LazyFrame {
2045 let keys = self
2046 .keys
2047 .iter()
2048 .filter_map(|expr| expr_output_name(expr).ok())
2049 .collect::<Vec<_>>();
2050
2051 self.agg([all().as_expr().head(n)]).explode_impl(
2052 all() - by_name(keys.iter().cloned(), false, false),
2053 ExplodeOptions {
2054 empty_as_null: true,
2055 keep_nulls: true,
2056 },
2057 true,
2058 )
2059 }
2060
2061 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2063 let keys = self
2064 .keys
2065 .iter()
2066 .filter_map(|expr| expr_output_name(expr).ok())
2067 .collect::<Vec<_>>();
2068
2069 self.agg([all().as_expr().tail(n)]).explode_impl(
2070 all() - by_name(keys.iter().cloned(), false, false),
2071 ExplodeOptions {
2072 empty_as_null: true,
2073 keep_nulls: true,
2074 },
2075 true,
2076 )
2077 }
2078
2079 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2084 if !self.predicates.is_empty() {
2085 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2086 }
2087
2088 #[cfg(feature = "dynamic_group_by")]
2089 let options = GroupbyOptions {
2090 dynamic: self.dynamic_options,
2091 rolling: self.rolling_options,
2092 slice: None,
2093 };
2094
2095 #[cfg(not(feature = "dynamic_group_by"))]
2096 let options = GroupbyOptions { slice: None };
2097
2098 let lp = DslPlan::GroupBy {
2099 input: Arc::new(self.logical_plan),
2100 keys: self.keys,
2101 predicates: vec![],
2102 aggs: vec![],
2103 apply: Some((f, schema)),
2104 maintain_order: self.maintain_order,
2105 options: Arc::new(options),
2106 };
2107 LazyFrame::from_logical_plan(lp, self.opt_state)
2108 }
2109}
2110
/// Builder that collects the settings of a join on a [`LazyFrame`]; `finish` or `join_where`
/// turns it into the joined [`LazyFrame`].
#[must_use]
pub struct JoinBuilder {
    /// Left-hand frame of the join.
    lf: LazyFrame,
    /// Join type; `new` starts with `JoinType::Inner`.
    how: JoinType,
    /// Right-hand frame, set through `with`. Must be set before finishing.
    other: Option<LazyFrame>,
    /// Join key expressions for the left frame.
    left_on: Vec<Expr>,
    /// Join key expressions for the right frame.
    right_on: Vec<Expr>,
    /// Forwarded as `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Forwarded as `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Forwarded as `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded as `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded as `JoinArgs::nulls_equal`.
    nulls_equal: bool,
    /// Forwarded as `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded as `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
    /// Forwarded as `JoinArgs::build_side`.
    build_side: Option<JoinBuildSide>,
}
2127impl JoinBuilder {
2128 pub fn new(lf: LazyFrame) -> Self {
2130 Self {
2131 lf,
2132 other: None,
2133 how: JoinType::Inner,
2134 left_on: vec![],
2135 right_on: vec![],
2136 allow_parallel: true,
2137 force_parallel: false,
2138 suffix: None,
2139 validation: Default::default(),
2140 nulls_equal: false,
2141 coalesce: Default::default(),
2142 maintain_order: Default::default(),
2143 build_side: None,
2144 }
2145 }
2146
2147 pub fn with(mut self, other: LazyFrame) -> Self {
2149 self.other = Some(other);
2150 self
2151 }
2152
2153 pub fn how(mut self, how: JoinType) -> Self {
2155 self.how = how;
2156 self
2157 }
2158
2159 pub fn validate(mut self, validation: JoinValidation) -> Self {
2160 self.validation = validation;
2161 self
2162 }
2163
2164 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2168 let on = on.as_ref().to_vec();
2169 self.left_on.clone_from(&on);
2170 self.right_on = on;
2171 self
2172 }
2173
2174 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2178 self.left_on = on.as_ref().to_vec();
2179 self
2180 }
2181
2182 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2186 self.right_on = on.as_ref().to_vec();
2187 self
2188 }
2189
2190 pub fn allow_parallel(mut self, allow: bool) -> Self {
2192 self.allow_parallel = allow;
2193 self
2194 }
2195
2196 pub fn force_parallel(mut self, force: bool) -> Self {
2198 self.force_parallel = force;
2199 self
2200 }
2201
2202 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2204 self.nulls_equal = nulls_equal;
2205 self
2206 }
2207
2208 pub fn suffix<S>(mut self, suffix: S) -> Self
2211 where
2212 S: Into<PlSmallStr>,
2213 {
2214 self.suffix = Some(suffix.into());
2215 self
2216 }
2217
2218 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2220 self.coalesce = coalesce;
2221 self
2222 }
2223
2224 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2226 self.maintain_order = maintain_order;
2227 self
2228 }
2229
2230 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2232 self.build_side = build_side;
2233 self
2234 }
2235
2236 pub fn finish(self) -> LazyFrame {
2238 let opt_state = self.lf.opt_state;
2239 let other = self.other.expect("'with' not set in join builder");
2240
2241 let args = JoinArgs {
2242 how: self.how,
2243 validation: self.validation,
2244 suffix: self.suffix,
2245 slice: None,
2246 nulls_equal: self.nulls_equal,
2247 coalesce: self.coalesce,
2248 maintain_order: self.maintain_order,
2249 build_side: self.build_side,
2250 };
2251
2252 let lp = self
2253 .lf
2254 .get_plan_builder()
2255 .join(
2256 other.logical_plan,
2257 self.left_on,
2258 self.right_on,
2259 JoinOptions {
2260 allow_parallel: self.allow_parallel,
2261 force_parallel: self.force_parallel,
2262 args,
2263 }
2264 .into(),
2265 )
2266 .build();
2267 LazyFrame::from_logical_plan(lp, opt_state)
2268 }
2269
2270 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2272 let opt_state = self.lf.opt_state;
2273 let other = self.other.expect("with not set");
2274
2275 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2277 if let Expr::BinaryExpr {
2278 op: Operator::And,
2279 left,
2280 right,
2281 } = predicate
2282 {
2283 decompose_and((*left).clone(), expanded_predicates);
2284 decompose_and((*right).clone(), expanded_predicates);
2285 } else {
2286 expanded_predicates.push(predicate);
2287 }
2288 }
2289 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2290 for predicate in predicates {
2291 decompose_and(predicate, &mut expanded_predicates);
2292 }
2293 let predicates: Vec<Expr> = expanded_predicates;
2294
2295 #[cfg(feature = "is_between")]
2297 let predicates: Vec<Expr> = {
2298 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2299 for predicate in predicates {
2300 if let Expr::Function {
2301 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2302 input,
2303 ..
2304 } = &predicate
2305 {
2306 if let [expr, lower, upper] = input.as_slice() {
2307 match closed {
2308 ClosedInterval::Both => {
2309 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2310 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2311 },
2312 ClosedInterval::Right => {
2313 expanded_predicates.push(expr.clone().gt(lower.clone()));
2314 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2315 },
2316 ClosedInterval::Left => {
2317 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2318 expanded_predicates.push(expr.clone().lt(upper.clone()));
2319 },
2320 ClosedInterval::None => {
2321 expanded_predicates.push(expr.clone().gt(lower.clone()));
2322 expanded_predicates.push(expr.clone().lt(upper.clone()));
2323 },
2324 }
2325 continue;
2326 }
2327 }
2328 expanded_predicates.push(predicate);
2329 }
2330 expanded_predicates
2331 };
2332
2333 let args = JoinArgs {
2334 how: self.how,
2335 validation: self.validation,
2336 suffix: self.suffix,
2337 slice: None,
2338 nulls_equal: self.nulls_equal,
2339 coalesce: self.coalesce,
2340 maintain_order: self.maintain_order,
2341 build_side: self.build_side,
2342 };
2343 let options = JoinOptions {
2344 allow_parallel: self.allow_parallel,
2345 force_parallel: self.force_parallel,
2346 args,
2347 };
2348
2349 let lp = DslPlan::Join {
2350 input_left: Arc::new(self.lf.logical_plan),
2351 input_right: Arc::new(other.logical_plan),
2352 left_on: Default::default(),
2353 right_on: Default::default(),
2354 predicates,
2355 options: Arc::from(options),
2356 };
2357
2358 LazyFrame::from_logical_plan(lp, opt_state)
2359 }
2360}
2361
2362pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2363 #[cfg(not(feature = "new_streaming"))]
2364 {
2365 None
2366 }
2367 #[cfg(feature = "new_streaming")]
2368 {
2369 Some(polars_stream::build_streaming_query_executor)
2370 }
2371};
2372
/// Iterator over result batches received through a channel.
///
/// The stored `runner` is invoked at most once: by an explicit `start` call or by the first
/// call to `next`.
pub struct CollectBatches {
    /// Receiving end of the channel the batches (or errors) arrive on.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// Closure run by `start`; `None` once it has been taken.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2377
2378impl CollectBatches {
2379 pub fn start(&mut self) {
2381 if let Some(runner) = self.runner.take() {
2382 runner()
2383 }
2384 }
2385}
2386
2387impl Iterator for CollectBatches {
2388 type Item = PolarsResult<DataFrame>;
2389
2390 fn next(&mut self) -> Option<Self::Item> {
2391 self.start();
2392 self.recv.recv().ok()
2393 }
2394}