1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex};
12
13pub use anonymous_scan::*;
14#[cfg(feature = "csv")]
15pub use csv::*;
16#[cfg(not(target_arch = "wasm32"))]
17pub use exitable::*;
18pub use file_list_reader::*;
19#[cfg(feature = "json")]
20pub use ndjson::*;
21#[cfg(feature = "parquet")]
22pub use parquet::*;
23use polars_compute::rolling::QuantileMethod;
24use polars_core::POOL;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_io::RowIndex;
28use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
29use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
30use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
31#[cfg(feature = "is_between")]
32use polars_ops::prelude::ClosedInterval;
33pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
34use polars_utils::pl_str::PlSmallStr;
35use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
40pub trait IntoLazy {
41 fn lazy(self) -> LazyFrame;
42}
43
44impl IntoLazy for DataFrame {
45 fn lazy(self) -> LazyFrame {
47 let lp = DslBuilder::from_existing_df(self).build();
48 LazyFrame {
49 logical_plan: lp,
50 opt_state: Default::default(),
51 cached_arena: Default::default(),
52 }
53 }
54}
55
56impl IntoLazy for LazyFrame {
57 fn lazy(self) -> LazyFrame {
58 self
59 }
60}
61
62#[derive(Clone, Default)]
67#[must_use]
68pub struct LazyFrame {
69 pub logical_plan: DslPlan,
70 pub(crate) opt_state: OptFlags,
71 pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
72}
73
74impl From<DslPlan> for LazyFrame {
75 fn from(plan: DslPlan) -> Self {
76 Self {
77 logical_plan: plan,
78 opt_state: OptFlags::default(),
79 cached_arena: Default::default(),
80 }
81 }
82}
83
84impl LazyFrame {
85 pub(crate) fn from_inner(
86 logical_plan: DslPlan,
87 opt_state: OptFlags,
88 cached_arena: Arc<Mutex<Option<CachedArena>>>,
89 ) -> Self {
90 Self {
91 logical_plan,
92 opt_state,
93 cached_arena,
94 }
95 }
96
97 pub(crate) fn get_plan_builder(self) -> DslBuilder {
98 DslBuilder::from(self.logical_plan)
99 }
100
101 fn get_opt_state(&self) -> OptFlags {
102 self.opt_state
103 }
104
105 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106 LazyFrame {
107 logical_plan,
108 opt_state,
109 cached_arena: Default::default(),
110 }
111 }
112
113 pub fn get_current_optimizations(&self) -> OptFlags {
115 self.opt_state
116 }
117
118 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120 self.opt_state = opt_state;
121 self
122 }
123
124 pub fn without_optimizations(self) -> Self {
126 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127 }
128
129 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132 self
133 }
134
135 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138 self
139 }
140
141 pub fn with_check_order(mut self, toggle: bool) -> Self {
144 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145 self
146 }
147
148 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151 self
152 }
153
154 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157 self
158 }
159
160 pub fn with_type_check(mut self, toggle: bool) -> Self {
162 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163 self
164 }
165
166 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169 self
170 }
171
/// Enable or disable the `COMM_SUBPLAN_ELIM` optimization flag (requires the `cse` feature).
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(self, toggle: bool) -> Self {
    let mut flags = self.get_opt_state();
    flags.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
    self.with_optimizations(flags)
}
178
/// Enable or disable the `COMM_SUBEXPR_ELIM` optimization flag (requires the `cse` feature).
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(self, toggle: bool) -> Self {
    let mut flags = self.get_opt_state();
    flags.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
    self.with_optimizations(flags)
}
185
186 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189 self
190 }
191
/// Enable or disable the `NEW_STREAMING` flag, which selects the new streaming engine.
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(self, toggle: bool) -> Self {
    let mut flags = self.get_opt_state();
    flags.set(OptFlags::NEW_STREAMING, toggle);
    self.with_optimizations(flags)
}
197
198 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201 self
202 }
203
204 pub fn _with_eager(mut self, toggle: bool) -> Self {
206 self.opt_state.set(OptFlags::EAGER, toggle);
207 self
208 }
209
210 pub fn describe_plan(&self) -> PolarsResult<String> {
212 Ok(self.clone().to_alp()?.describe())
213 }
214
215 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217 Ok(self.clone().to_alp()?.describe_tree_format())
218 }
219
220 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp_optimized()?.describe())
225 }
226
227 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232 }
233
234 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239 if optimized {
240 self.describe_optimized_plan()
241 } else {
242 self.describe_plan()
243 }
244 }
245
246 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286 let opt_state = self.get_opt_state();
287 let lp = self
288 .get_plan_builder()
289 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290 .build();
291 Self::from_logical_plan(lp, opt_state)
292 }
293
294 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314 self,
315 by_exprs: E,
316 sort_options: SortMultipleOptions,
317 ) -> Self {
318 let by_exprs = by_exprs.as_ref().to_vec();
319 if by_exprs.is_empty() {
320 self
321 } else {
322 let opt_state = self.get_opt_state();
323 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324 Self::from_logical_plan(lp, opt_state)
325 }
326 }
327
328 pub fn top_k<E: AsRef<[Expr]>>(
329 self,
330 k: IdxSize,
331 by_exprs: E,
332 sort_options: SortMultipleOptions,
333 ) -> Self {
334 self.sort_by_exprs(
336 by_exprs,
337 sort_options.with_order_reversed().with_nulls_last(true),
338 )
339 .slice(0, k)
340 }
341
342 pub fn bottom_k<E: AsRef<[Expr]>>(
343 self,
344 k: IdxSize,
345 by_exprs: E,
346 sort_options: SortMultipleOptions,
347 ) -> Self {
348 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350 .slice(0, k)
351 }
352
353 pub fn reverse(self) -> Self {
369 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370 }
371
372 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380 where
381 I: IntoIterator<Item = T>,
382 J: IntoIterator<Item = S>,
383 T: AsRef<str>,
384 S: AsRef<str>,
385 {
386 let iter = existing.into_iter();
387 let cap = iter.size_hint().0;
388 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391 for (existing, new) in iter.zip(new) {
394 let existing = existing.as_ref();
395 let new = new.as_ref();
396 if new != existing {
397 existing_vec.push(existing.into());
398 new_vec.push(new.into());
399 }
400 }
401
402 self.map_private(DslFunction::Rename {
403 existing: existing_vec.into(),
404 new: new_vec.into(),
405 strict,
406 })
407 }
408
409 pub fn drop(self, columns: Selector) -> Self {
416 let opt_state = self.get_opt_state();
417 let lp = self.get_plan_builder().drop(columns).build();
418 Self::from_logical_plan(lp, opt_state)
419 }
420
421 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427 }
428
429 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434 self.select(vec![
435 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436 ])
437 }
438
439 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441 let opt_state = self.get_opt_state();
442 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443 Self::from_logical_plan(lp, opt_state)
444 }
445
446 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn cache(self) -> Self {
457 let opt_state = self.get_opt_state();
458 let lp = self.get_plan_builder().cache().build();
459 Self::from_logical_plan(lp, opt_state)
460 }
461
462 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464 let cast_cols: Vec<Expr> = dtypes
465 .into_iter()
466 .map(|(name, dt)| {
467 let name = PlSmallStr::from_str(name);
468
469 if strict {
470 col(name).strict_cast(dt)
471 } else {
472 col(name).cast(dt)
473 }
474 })
475 .collect();
476
477 if cast_cols.is_empty() {
478 self
479 } else {
480 self.with_columns(cast_cols)
481 }
482 }
483
484 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486 self.with_columns(vec![if strict {
487 col(PlSmallStr::from_static("*")).strict_cast(dtype)
488 } else {
489 col(PlSmallStr::from_static("*")).cast(dtype)
490 }])
491 }
492
493 pub fn optimize(
494 self,
495 lp_arena: &mut Arena<IR>,
496 expr_arena: &mut Arena<AExpr>,
497 ) -> PolarsResult<Node> {
498 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499 }
500
501 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502 let (mut lp_arena, mut expr_arena) = self.get_arenas();
503 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505 Ok(IRPlan::new(node, lp_arena, expr_arena))
506 }
507
508 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = to_alp(
511 self.logical_plan,
512 &mut expr_arena,
513 &mut lp_arena,
514 &mut self.opt_state,
515 )?;
516 let plan = IRPlan::new(node, lp_arena, expr_arena);
517 Ok(plan)
518 }
519
520 pub(crate) fn optimize_with_scratch(
521 self,
522 lp_arena: &mut Arena<IR>,
523 expr_arena: &mut Arena<AExpr>,
524 scratch: &mut Vec<Node>,
525 ) -> PolarsResult<Node> {
526 #[allow(unused_mut)]
527 let mut opt_state = self.opt_state;
528 let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
529
530 #[cfg(feature = "cse")]
531 if new_streaming {
532 opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
535 }
536
537 let lp_top = optimize(
538 self.logical_plan,
539 opt_state,
540 lp_arena,
541 expr_arena,
542 scratch,
543 apply_scan_predicate_to_scan_ir,
544 )?;
545
546 Ok(lp_top)
547 }
548
549 fn prepare_collect_post_opt<P>(
550 mut self,
551 check_sink: bool,
552 query_start: Option<std::time::Instant>,
553 post_opt: P,
554 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
555 where
556 P: FnOnce(
557 Node,
558 &mut Arena<IR>,
559 &mut Arena<AExpr>,
560 Option<std::time::Duration>,
561 ) -> PolarsResult<()>,
562 {
563 let (mut lp_arena, mut expr_arena) = self.get_arenas();
564
565 let mut scratch = vec![];
566 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
567
568 post_opt(
569 lp_top,
570 &mut lp_arena,
571 &mut expr_arena,
572 query_start.map(|s| s.elapsed()),
575 )?;
576
577 let no_file_sink = if check_sink {
579 !matches!(
580 lp_arena.get(lp_top),
581 IR::Sink {
582 payload: SinkTypeIR::File { .. },
583 ..
584 }
585 )
586 } else {
587 true
588 };
589 let physical_plan = create_physical_plan(
590 lp_top,
591 &mut lp_arena,
592 &mut expr_arena,
593 BUILD_STREAMING_EXECUTOR,
594 )?;
595
596 let state = ExecutionState::new();
597 Ok((state, physical_plan, no_file_sink))
598 }
599
600 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
602 where
603 P: FnOnce(
604 Node,
605 &mut Arena<IR>,
606 &mut Arena<AExpr>,
607 Option<std::time::Duration>,
608 ) -> PolarsResult<()>,
609 {
610 let (mut state, mut physical_plan, _) =
611 self.prepare_collect_post_opt(false, None, post_opt)?;
612 physical_plan.execute(&mut state)
613 }
614
615 #[allow(unused_mut)]
616 fn prepare_collect(
617 self,
618 check_sink: bool,
619 query_start: Option<std::time::Instant>,
620 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
621 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
622 }
623
624 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
629 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
630 payload.clone()
631 } else {
632 self.logical_plan = DslPlan::Sink {
633 input: Arc::new(self.logical_plan),
634 payload: SinkType::Memory,
635 };
636 SinkType::Memory
637 };
638
639 if engine == Engine::Auto {
641 engine = match payload {
642 #[cfg(feature = "new_streaming")]
643 SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
644 _ => Engine::InMemory,
645 };
646 }
647 if engine == Engine::Gpu {
649 engine = Engine::InMemory;
650 }
651
652 #[cfg(feature = "new_streaming")]
653 {
654 if let Some(result) = self.try_new_streaming_if_requested() {
655 return result.map(|v| v.unwrap_single());
656 }
657 }
658
659 match engine {
660 Engine::Auto => unreachable!(),
661 Engine::Streaming => {
662 feature_gated!("new_streaming", self = self.with_new_streaming(true))
663 },
664 _ => {},
665 }
666 let mut alp_plan = self.clone().to_alp_optimized()?;
667
668 match engine {
669 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
670 let result = polars_stream::run_query(
671 alp_plan.lp_top,
672 &mut alp_plan.lp_arena,
673 &mut alp_plan.expr_arena,
674 );
675 result.map(|v| v.unwrap_single())
676 }),
677 Engine::Gpu => {
678 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
679 },
680 Engine::InMemory => {
681 let mut physical_plan = create_physical_plan(
682 alp_plan.lp_top,
683 &mut alp_plan.lp_arena,
684 &mut alp_plan.expr_arena,
685 BUILD_STREAMING_EXECUTOR,
686 )?;
687 let mut state = ExecutionState::new();
688 physical_plan.execute(&mut state)
689 },
690 }
691 }
692
693 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
694 let sink_multiple = LazyFrame {
695 logical_plan: DslPlan::SinkMultiple { inputs: plans },
696 opt_state,
697 cached_arena: Default::default(),
698 };
699 sink_multiple.explain(true)
700 }
701
702 pub fn collect_all_with_engine(
703 plans: Vec<DslPlan>,
704 mut engine: Engine,
705 opt_state: OptFlags,
706 ) -> PolarsResult<Vec<DataFrame>> {
707 if plans.is_empty() {
708 return Ok(Vec::new());
709 }
710
711 if engine == Engine::Auto {
713 engine = Engine::InMemory;
714 }
715 if engine == Engine::Gpu {
717 engine = Engine::InMemory;
718 }
719
720 let mut sink_multiple = LazyFrame {
721 logical_plan: DslPlan::SinkMultiple { inputs: plans },
722 opt_state,
723 cached_arena: Default::default(),
724 };
725
726 #[cfg(feature = "new_streaming")]
727 {
728 if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
729 return result.map(|v| v.unwrap_multiple());
730 }
731 }
732
733 match engine {
734 Engine::Auto => unreachable!(),
735 Engine::Streaming => {
736 feature_gated!(
737 "new_streaming",
738 sink_multiple = sink_multiple.with_new_streaming(true)
739 )
740 },
741 _ => {},
742 }
743 let mut alp_plan = sink_multiple.to_alp_optimized()?;
744
745 if engine == Engine::Streaming {
746 feature_gated!("new_streaming", {
747 let result = polars_stream::run_query(
748 alp_plan.lp_top,
749 &mut alp_plan.lp_arena,
750 &mut alp_plan.expr_arena,
751 );
752 return result.map(|v| v.unwrap_multiple());
753 });
754 }
755
756 let IR::SinkMultiple { inputs } = alp_plan.root() else {
757 unreachable!()
758 };
759
760 let mut multiplan = create_multiple_physical_plans(
761 inputs.clone().as_slice(),
762 &mut alp_plan.lp_arena,
763 &mut alp_plan.expr_arena,
764 BUILD_STREAMING_EXECUTOR,
765 )?;
766
767 match engine {
768 Engine::Gpu => polars_bail!(
769 InvalidOperation: "collect_all is not supported for the gpu engine"
770 ),
771 Engine::InMemory => {
772 let mut state = ExecutionState::new();
776 if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
777 cache_prefiller.execute(&mut state)?;
778 }
779 let out = POOL.install(|| {
780 multiplan
781 .physical_plans
782 .chunks_mut(POOL.current_num_threads() * 3)
783 .map(|chunk| {
784 chunk
785 .into_par_iter()
786 .enumerate()
787 .map(|(idx, input)| {
788 let mut input = std::mem::take(input);
789 let mut state = state.split();
790 state.branch_idx += idx;
791
792 let df = input.execute(&mut state)?;
793 Ok(df)
794 })
795 .collect::<PolarsResult<Vec<_>>>()
796 })
797 .collect::<PolarsResult<Vec<_>>>()
798 });
799 Ok(out?.into_iter().flatten().collect())
800 },
801 _ => unreachable!(),
802 }
803 }
804
805 pub fn collect(self) -> PolarsResult<DataFrame> {
823 self.collect_with_engine(Engine::InMemory)
824 }
825
826 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
829 where
830 P: FnOnce(
831 Node,
832 &mut Arena<IR>,
833 &mut Arena<AExpr>,
834 Option<std::time::Duration>,
835 ) -> PolarsResult<()>,
836 {
837 let query_start = std::time::Instant::now();
838 let (mut state, mut physical_plan, _) =
839 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
840 state.time_nodes(query_start);
841 let out = physical_plan.execute(&mut state)?;
842 let timer_df = state.finish_timer()?;
843 Ok((out, timer_df))
844 }
845
846 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
854 self._profile_post_opt(|_, _, _, _| Ok(()))
855 }
856
857 pub fn sink_batches(
858 mut self,
859 function: PlanCallback<DataFrame, bool>,
860 maintain_order: bool,
861 chunk_size: Option<NonZeroUsize>,
862 ) -> PolarsResult<Self> {
863 use polars_plan::prelude::sink::CallbackSinkType;
864
865 polars_ensure!(
866 !matches!(self.logical_plan, DslPlan::Sink { .. }),
867 InvalidOperation: "cannot create a sink on top of another sink"
868 );
869
870 self.logical_plan = DslPlan::Sink {
871 input: Arc::new(self.logical_plan),
872 payload: SinkType::Callback(CallbackSinkType {
873 function,
874 maintain_order,
875 chunk_size,
876 }),
877 };
878
879 Ok(self)
880 }
881
/// Run the query on the new streaming engine when this is requested through environment
/// variables.
///
/// * `POLARS_FORCE_NEW_STREAMING=1`: always run on the new streaming engine; any panic
///   it raises is re-raised.
/// * `POLARS_AUTO_NEW_STREAMING=1`: try the new streaming engine. If it panics with a
///   message starting with `"not yet implemented"`, return `None` so the caller falls
///   back to another engine. Any other panic is re-raised.
///
/// Returns `None` when neither variable is `"1"`. Returns `Some(Err(_))` when plan
/// conversion fails.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    /// `true` if a panic payload carries a "not yet implemented" message. `todo!()`
    /// yields a `&'static str` payload while formatted panics such as `todo!("...")`
    /// yield a `String`, so both payload types are checked.
    fn is_not_yet_implemented(payload: &(dyn std::any::Any + Send)) -> bool {
        const PREFIX: &str = "not yet implemented";
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.starts_with(PREFIX)
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.starts_with(PREFIX)
        } else {
            false
        }
    }

    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if !(auto_new_streaming || force_new_streaming) {
        return None;
    }

    let mut new_stream_lazy = self.clone();
    new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
    let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
        Ok(v) => v,
        Err(e) => return Some(Err(e)),
    };

    let f = || {
        polars_stream::run_query(
            alp_plan.lp_top,
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
        )
    };

    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
        Ok(v) => Some(v),
        Err(e) => {
            // Deref the box (`&*e`): passing `&e` would coerce the `Box` itself to
            // `dyn Any`, and every downcast would then fail.
            let fall_back =
                !force_new_streaming && auto_new_streaming && is_not_yet_implemented(&*e);
            if !fall_back {
                std::panic::resume_unwind(e);
            }
            if polars_core::config::verbose() {
                eprintln!(
                    "caught unimplemented error in new streaming engine, falling back to normal engine"
                );
            }
            None
        },
    }
}
932
933 pub fn sink(
934 mut self,
935 sink_type: SinkDestination,
936 file_format: impl Into<Arc<FileType>>,
937 unified_sink_args: UnifiedSinkArgs,
938 ) -> PolarsResult<Self> {
939 polars_ensure!(
940 !matches!(self.logical_plan, DslPlan::Sink { .. }),
941 InvalidOperation: "cannot create a sink on top of another sink"
942 );
943
944 self.logical_plan = DslPlan::Sink {
945 input: Arc::new(self.logical_plan),
946 payload: match sink_type {
947 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
948 target,
949 file_format: file_format.into(),
950 unified_sink_args,
951 }),
952 SinkDestination::Partitioned {
953 base_path,
954 file_path_provider,
955 partition_strategy,
956 finish_callback,
957 max_rows_per_file,
958 approximate_bytes_per_file,
959 } => SinkType::Partitioned(PartitionedSinkOptions {
960 base_path,
961 file_path_provider,
962 partition_strategy,
963 finish_callback,
964 file_format: file_format.into(),
965 unified_sink_args,
966 max_rows_per_file,
967 approximate_bytes_per_file,
968 }),
969 },
970 };
971 Ok(self)
972 }
973
974 pub fn filter(self, predicate: Expr) -> Self {
992 let opt_state = self.get_opt_state();
993 let lp = self.get_plan_builder().filter(predicate).build();
994 Self::from_logical_plan(lp, opt_state)
995 }
996
997 pub fn remove(self, predicate: Expr) -> Self {
1015 self.filter(predicate.neq_missing(lit(true)))
1016 }
1017
1018 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1044 let exprs = exprs.as_ref().to_vec();
1045 self.select_impl(
1046 exprs,
1047 ProjectionOptions {
1048 run_parallel: true,
1049 duplicate_check: true,
1050 should_broadcast: true,
1051 },
1052 )
1053 }
1054
1055 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1056 let exprs = exprs.as_ref().to_vec();
1057 self.select_impl(
1058 exprs,
1059 ProjectionOptions {
1060 run_parallel: false,
1061 duplicate_check: true,
1062 should_broadcast: true,
1063 },
1064 )
1065 }
1066
1067 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1068 let opt_state = self.get_opt_state();
1069 let lp = self.get_plan_builder().project(exprs, options).build();
1070 Self::from_logical_plan(lp, opt_state)
1071 }
1072
1073 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1094 let keys = by
1095 .as_ref()
1096 .iter()
1097 .map(|e| e.clone().into())
1098 .collect::<Vec<_>>();
1099 let opt_state = self.get_opt_state();
1100
1101 #[cfg(feature = "dynamic_group_by")]
1102 {
1103 LazyGroupBy {
1104 logical_plan: self.logical_plan,
1105 opt_state,
1106 keys,
1107 predicates: vec![],
1108 maintain_order: false,
1109 dynamic_options: None,
1110 rolling_options: None,
1111 }
1112 }
1113
1114 #[cfg(not(feature = "dynamic_group_by"))]
1115 {
1116 LazyGroupBy {
1117 logical_plan: self.logical_plan,
1118 opt_state,
1119 keys,
1120 predicates: vec![],
1121 maintain_order: false,
1122 }
1123 }
1124 }
1125
/// Start a rolling group-by over `index_column`, additionally grouped by `group_by`.
///
/// A non-column index expression is first added to the frame with `with_column` and
/// then referenced by its output name.
///
/// # Panics
/// If the schema cannot be resolved or the index expression's output field cannot be
/// computed.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => {
            options.index_column = name;
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state: self.opt_state,
                predicates: vec![],
                keys: group_by.as_ref().to_vec(),
                maintain_order: true,
                dynamic_options: None,
                rolling_options: Some(options),
            }
        },
        index_expr => {
            let schema = self.collect_schema().unwrap();
            let field = index_expr.to_field(&schema).unwrap();
            let index_name = field.name().clone();
            self.with_column(index_expr)
                .rolling(Expr::Column(index_name), group_by, options)
        },
    }
}
1163
/// Start a dynamic (window-based) group-by over `index_column`, additionally grouped by
/// `group_by`.
///
/// A non-column index expression is first added to the frame with `with_column` and
/// then referenced by its output name.
///
/// # Panics
/// If the schema cannot be resolved or the index expression's output field cannot be
/// computed.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => {
            options.index_column = name;
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state: self.opt_state,
                predicates: vec![],
                keys: group_by.as_ref().to_vec(),
                maintain_order: true,
                dynamic_options: Some(options),
                rolling_options: None,
            }
        },
        index_expr => {
            let schema = self.collect_schema().unwrap();
            let field = index_expr.to_field(&schema).unwrap();
            let index_name = field.name().clone();
            self.with_column(index_expr)
                .group_by_dynamic(Expr::Column(index_name), group_by, options)
        },
    }
}
1209
1210 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1212 let keys = by
1213 .as_ref()
1214 .iter()
1215 .map(|e| e.clone().into())
1216 .collect::<Vec<_>>();
1217 let opt_state = self.get_opt_state();
1218
1219 #[cfg(feature = "dynamic_group_by")]
1220 {
1221 LazyGroupBy {
1222 logical_plan: self.logical_plan,
1223 opt_state,
1224 keys,
1225 predicates: vec![],
1226 maintain_order: true,
1227 dynamic_options: None,
1228 rolling_options: None,
1229 }
1230 }
1231
1232 #[cfg(not(feature = "dynamic_group_by"))]
1233 {
1234 LazyGroupBy {
1235 logical_plan: self.logical_plan,
1236 opt_state,
1237 keys,
1238 predicates: vec![],
1239 maintain_order: true,
1240 }
1241 }
1242 }
1243
/// Join with `JoinType::Anti` on the single key pair `left_on` / `right_on`.
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let (left_key, right_key) = (left_on.into(), right_on.into());
    self.join(other, [left_key], [right_key], JoinArgs::new(JoinType::Anti))
}
1269
/// Join with `JoinType::Cross` (no key columns), using `suffix` for clashing names.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    self.join(other, Vec::<Expr>::new(), Vec::<Expr>::new(), args)
}
1280
1281 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1298 self.join(
1299 other,
1300 [left_on.into()],
1301 [right_on.into()],
1302 JoinArgs::new(JoinType::Left),
1303 )
1304 }
1305
1306 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1323 self.join(
1324 other,
1325 [left_on.into()],
1326 [right_on.into()],
1327 JoinArgs::new(JoinType::Inner),
1328 )
1329 }
1330
1331 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1348 self.join(
1349 other,
1350 [left_on.into()],
1351 [right_on.into()],
1352 JoinArgs::new(JoinType::Full),
1353 )
1354 }
1355
/// Join with `JoinType::Semi` on the single key pair `left_on` / `right_on`.
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let (left_key, right_key) = (left_on.into(), right_on.into());
    self.join(other, [left_key], [right_key], JoinArgs::new(JoinType::Semi))
}
1381
1382 pub fn join<E: AsRef<[Expr]>>(
1404 self,
1405 other: LazyFrame,
1406 left_on: E,
1407 right_on: E,
1408 args: JoinArgs,
1409 ) -> LazyFrame {
1410 let left_on = left_on.as_ref().to_vec();
1411 let right_on = right_on.as_ref().to_vec();
1412
1413 self._join_impl(other, left_on, right_on, args)
1414 }
1415
1416 fn _join_impl(
1417 self,
1418 other: LazyFrame,
1419 left_on: Vec<Expr>,
1420 right_on: Vec<Expr>,
1421 args: JoinArgs,
1422 ) -> LazyFrame {
1423 let JoinArgs {
1424 how,
1425 validation,
1426 suffix,
1427 slice,
1428 nulls_equal,
1429 coalesce,
1430 maintain_order,
1431 } = args;
1432
1433 if slice.is_some() {
1434 panic!("impl error: slice is not handled")
1435 }
1436
1437 let mut builder = self
1438 .join_builder()
1439 .with(other)
1440 .left_on(left_on)
1441 .right_on(right_on)
1442 .how(how)
1443 .validate(validation)
1444 .join_nulls(nulls_equal)
1445 .coalesce(coalesce)
1446 .maintain_order(maintain_order);
1447
1448 if let Some(suffix) = suffix {
1449 builder = builder.suffix(suffix);
1450 }
1451
1452 builder.finish()
1454 }
1455
1456 pub fn join_builder(self) -> JoinBuilder {
1462 JoinBuilder::new(self)
1463 }
1464
1465 pub fn with_column(self, expr: Expr) -> LazyFrame {
1483 let opt_state = self.get_opt_state();
1484 let lp = self
1485 .get_plan_builder()
1486 .with_columns(
1487 vec![expr],
1488 ProjectionOptions {
1489 run_parallel: false,
1490 duplicate_check: true,
1491 should_broadcast: true,
1492 },
1493 )
1494 .build();
1495 Self::from_logical_plan(lp, opt_state)
1496 }
1497
1498 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1513 let exprs = exprs.as_ref().to_vec();
1514 self.with_columns_impl(
1515 exprs,
1516 ProjectionOptions {
1517 run_parallel: true,
1518 duplicate_check: true,
1519 should_broadcast: true,
1520 },
1521 )
1522 }
1523
1524 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1526 let exprs = exprs.as_ref().to_vec();
1527 self.with_columns_impl(
1528 exprs,
1529 ProjectionOptions {
1530 run_parallel: false,
1531 duplicate_check: true,
1532 should_broadcast: true,
1533 },
1534 )
1535 }
1536
1537 pub fn match_to_schema(
1539 self,
1540 schema: SchemaRef,
1541 per_column: Arc<[MatchToSchemaPerColumn]>,
1542 extra_columns: ExtraColumnsPolicy,
1543 ) -> LazyFrame {
1544 let opt_state = self.get_opt_state();
1545 let lp = self
1546 .get_plan_builder()
1547 .match_to_schema(schema, per_column, extra_columns)
1548 .build();
1549 Self::from_logical_plan(lp, opt_state)
1550 }
1551
1552 pub fn pipe_with_schema(
1553 self,
1554 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1555 ) -> Self {
1556 let opt_state = self.get_opt_state();
1557 let lp = self
1558 .get_plan_builder()
1559 .pipe_with_schema(vec![], callback)
1560 .build();
1561 Self::from_logical_plan(lp, opt_state)
1562 }
1563
1564 pub fn pipe_with_schemas(
1565 self,
1566 others: Vec<LazyFrame>,
1567 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1568 ) -> Self {
1569 let opt_state = self.get_opt_state();
1570 let lp = self
1571 .get_plan_builder()
1572 .pipe_with_schema(
1573 others.into_iter().map(|lf| lf.logical_plan).collect(),
1574 callback,
1575 )
1576 .build();
1577 Self::from_logical_plan(lp, opt_state)
1578 }
1579
1580 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1581 let opt_state = self.get_opt_state();
1582 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1583 Self::from_logical_plan(lp, opt_state)
1584 }
1585
1586 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1587 let contexts = contexts
1588 .as_ref()
1589 .iter()
1590 .map(|lf| lf.logical_plan.clone())
1591 .collect();
1592 let opt_state = self.get_opt_state();
1593 let lp = self.get_plan_builder().with_context(contexts).build();
1594 Self::from_logical_plan(lp, opt_state)
1595 }
1596
1597 pub fn max(self) -> Self {
1601 self.map_private(DslFunction::Stats(StatsFunction::Max))
1602 }
1603
1604 pub fn min(self) -> Self {
1608 self.map_private(DslFunction::Stats(StatsFunction::Min))
1609 }
1610
1611 pub fn sum(self) -> Self {
1621 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1622 }
1623
1624 pub fn mean(self) -> Self {
1629 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1630 }
1631
1632 pub fn median(self) -> Self {
1638 self.map_private(DslFunction::Stats(StatsFunction::Median))
1639 }
1640
1641 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1643 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1644 quantile,
1645 method,
1646 }))
1647 }
1648
1649 pub fn std(self, ddof: u8) -> Self {
1662 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1663 }
1664
1665 pub fn var(self, ddof: u8) -> Self {
1675 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1676 }
1677
1678 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1680 self.explode_impl(columns, options, false)
1681 }
1682
1683 fn explode_impl(
1685 self,
1686 columns: Selector,
1687 options: ExplodeOptions,
1688 allow_empty: bool,
1689 ) -> LazyFrame {
1690 let opt_state = self.get_opt_state();
1691 let lp = self
1692 .get_plan_builder()
1693 .explode(columns, options, allow_empty)
1694 .build();
1695 Self::from_logical_plan(lp, opt_state)
1696 }
1697
1698 pub fn null_count(self) -> LazyFrame {
1700 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1701 }
1702
1703 pub fn unique_stable(
1708 self,
1709 subset: Option<Selector>,
1710 keep_strategy: UniqueKeepStrategy,
1711 ) -> LazyFrame {
1712 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1713 self.unique_stable_generic(subset, keep_strategy)
1714 }
1715
1716 pub fn unique_stable_generic(
1717 self,
1718 subset: Option<Vec<Expr>>,
1719 keep_strategy: UniqueKeepStrategy,
1720 ) -> LazyFrame {
1721 let opt_state = self.get_opt_state();
1722 let options = DistinctOptionsDSL {
1723 subset,
1724 maintain_order: true,
1725 keep_strategy,
1726 };
1727 let lp = self.get_plan_builder().distinct(options).build();
1728 Self::from_logical_plan(lp, opt_state)
1729 }
1730
1731 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1739 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1740 self.unique_generic(subset, keep_strategy)
1741 }
1742
1743 pub fn unique_generic(
1744 self,
1745 subset: Option<Vec<Expr>>,
1746 keep_strategy: UniqueKeepStrategy,
1747 ) -> LazyFrame {
1748 let opt_state = self.get_opt_state();
1749 let options = DistinctOptionsDSL {
1750 subset,
1751 maintain_order: false,
1752 keep_strategy,
1753 };
1754 let lp = self.get_plan_builder().distinct(options).build();
1755 Self::from_logical_plan(lp, opt_state)
1756 }
1757
1758 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1763 let opt_state = self.get_opt_state();
1764 let lp = self.get_plan_builder().drop_nans(subset).build();
1765 Self::from_logical_plan(lp, opt_state)
1766 }
1767
1768 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1773 let opt_state = self.get_opt_state();
1774 let lp = self.get_plan_builder().drop_nulls(subset).build();
1775 Self::from_logical_plan(lp, opt_state)
1776 }
1777
1778 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1788 let opt_state = self.get_opt_state();
1789 let lp = self.get_plan_builder().slice(offset, len).build();
1790 Self::from_logical_plan(lp, opt_state)
1791 }
1792
1793 pub fn first(self) -> LazyFrame {
1797 self.slice(0, 1)
1798 }
1799
1800 pub fn last(self) -> LazyFrame {
1804 self.slice(-1, 1)
1805 }
1806
1807 pub fn tail(self, n: IdxSize) -> LazyFrame {
1811 let neg_tail = -(n as i64);
1812 self.slice(neg_tail, n)
1813 }
1814
1815 #[cfg(feature = "pivot")]
1816 #[expect(clippy::too_many_arguments)]
1817 pub fn pivot(
1818 self,
1819 on: Selector,
1820 on_columns: Arc<DataFrame>,
1821 index: Selector,
1822 values: Selector,
1823 agg: Expr,
1824 maintain_order: bool,
1825 separator: PlSmallStr,
1826 ) -> LazyFrame {
1827 let opt_state = self.get_opt_state();
1828 let lp = self
1829 .get_plan_builder()
1830 .pivot(
1831 on,
1832 on_columns,
1833 index,
1834 values,
1835 agg,
1836 maintain_order,
1837 separator,
1838 )
1839 .build();
1840 Self::from_logical_plan(lp, opt_state)
1841 }
1842
1843 #[cfg(feature = "pivot")]
1847 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1848 let opt_state = self.get_opt_state();
1849 let lp = self.get_plan_builder().unpivot(args).build();
1850 Self::from_logical_plan(lp, opt_state)
1851 }
1852
1853 pub fn limit(self, n: IdxSize) -> LazyFrame {
1855 self.slice(0, n)
1856 }
1857
1858 pub fn map<F>(
1872 self,
1873 function: F,
1874 optimizations: AllowedOptimizations,
1875 schema: Option<Arc<dyn UdfSchema>>,
1876 name: Option<&'static str>,
1877 ) -> LazyFrame
1878 where
1879 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1880 {
1881 let opt_state = self.get_opt_state();
1882 let lp = self
1883 .get_plan_builder()
1884 .map(
1885 function,
1886 optimizations,
1887 schema,
1888 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1889 )
1890 .build();
1891 Self::from_logical_plan(lp, opt_state)
1892 }
1893
1894 #[cfg(feature = "python")]
1895 pub fn map_python(
1896 self,
1897 function: polars_utils::python_function::PythonFunction,
1898 optimizations: AllowedOptimizations,
1899 schema: Option<SchemaRef>,
1900 validate_output: bool,
1901 ) -> LazyFrame {
1902 let opt_state = self.get_opt_state();
1903 let lp = self
1904 .get_plan_builder()
1905 .map_python(function, optimizations, schema, validate_output)
1906 .build();
1907 Self::from_logical_plan(lp, opt_state)
1908 }
1909
1910 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1911 let opt_state = self.get_opt_state();
1912 let lp = self.get_plan_builder().map_private(function).build();
1913 Self::from_logical_plan(lp, opt_state)
1914 }
1915
1916 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1925 where
1926 S: Into<PlSmallStr>,
1927 {
1928 let name = name.into();
1929
1930 match &self.logical_plan {
1931 v @ DslPlan::Scan {
1932 scan_type,
1933 unified_scan_args,
1934 ..
1935 } if unified_scan_args.row_index.is_none()
1936 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1937 {
1938 let DslPlan::Scan {
1939 sources,
1940 mut unified_scan_args,
1941 scan_type,
1942 cached_ir: _,
1943 } = v.clone()
1944 else {
1945 unreachable!()
1946 };
1947
1948 unified_scan_args.row_index = Some(RowIndex {
1949 name,
1950 offset: offset.unwrap_or(0),
1951 });
1952
1953 DslPlan::Scan {
1954 sources,
1955 unified_scan_args,
1956 scan_type,
1957 cached_ir: Default::default(),
1958 }
1959 .into()
1960 },
1961 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1962 }
1963 }
1964
1965 pub fn count(self) -> LazyFrame {
1967 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1968 }
1969
1970 #[cfg(feature = "dtype-struct")]
1973 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
1974 self.map_private(DslFunction::Unnest {
1975 columns: cols,
1976 separator,
1977 })
1978 }
1979
1980 #[cfg(feature = "merge_sorted")]
1981 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1982 where
1983 S: Into<PlSmallStr>,
1984 {
1985 let key = key.into();
1986
1987 let lp = DslPlan::MergeSorted {
1988 input_left: Arc::new(self.logical_plan),
1989 input_right: Arc::new(other.logical_plan),
1990 key,
1991 };
1992 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1993 }
1994
1995 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1996 let lp = DslPlan::MapFunction {
1997 input: Arc::new(self.logical_plan),
1998 function: DslFunction::Hint(hint),
1999 };
2000 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2001 }
2002}
2003
2004#[derive(Clone)]
2006pub struct LazyGroupBy {
2007 pub logical_plan: DslPlan,
2008 opt_state: OptFlags,
2009 keys: Vec<Expr>,
2010 predicates: Vec<Expr>,
2011 maintain_order: bool,
2012 #[cfg(feature = "dynamic_group_by")]
2013 dynamic_options: Option<DynamicGroupOptions>,
2014 #[cfg(feature = "dynamic_group_by")]
2015 rolling_options: Option<RollingGroupOptions>,
2016}
2017
2018impl From<LazyGroupBy> for LazyFrame {
2019 fn from(lgb: LazyGroupBy) -> Self {
2020 Self {
2021 logical_plan: lgb.logical_plan,
2022 opt_state: lgb.opt_state,
2023 cached_arena: Default::default(),
2024 }
2025 }
2026}
2027
2028impl LazyGroupBy {
2029 pub fn having(mut self, predicate: Expr) -> Self {
2050 self.predicates.push(predicate);
2051 self
2052 }
2053
2054 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2076 #[cfg(feature = "dynamic_group_by")]
2077 let lp = DslBuilder::from(self.logical_plan)
2078 .group_by(
2079 self.keys,
2080 self.predicates,
2081 aggs,
2082 None,
2083 self.maintain_order,
2084 self.dynamic_options,
2085 self.rolling_options,
2086 )
2087 .build();
2088
2089 #[cfg(not(feature = "dynamic_group_by"))]
2090 let lp = DslBuilder::from(self.logical_plan)
2091 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2092 .build();
2093 LazyFrame::from_logical_plan(lp, self.opt_state)
2094 }
2095
2096 pub fn head(self, n: Option<usize>) -> LazyFrame {
2098 let keys = self
2099 .keys
2100 .iter()
2101 .filter_map(|expr| expr_output_name(expr).ok())
2102 .collect::<Vec<_>>();
2103
2104 self.agg([all().as_expr().head(n)]).explode_impl(
2105 all() - by_name(keys.iter().cloned(), false),
2106 ExplodeOptions {
2107 empty_as_null: true,
2108 keep_nulls: true,
2109 },
2110 true,
2111 )
2112 }
2113
2114 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2116 let keys = self
2117 .keys
2118 .iter()
2119 .filter_map(|expr| expr_output_name(expr).ok())
2120 .collect::<Vec<_>>();
2121
2122 self.agg([all().as_expr().tail(n)]).explode_impl(
2123 all() - by_name(keys.iter().cloned(), false),
2124 ExplodeOptions {
2125 empty_as_null: true,
2126 keep_nulls: true,
2127 },
2128 true,
2129 )
2130 }
2131
2132 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2137 if !self.predicates.is_empty() {
2138 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2139 }
2140
2141 #[cfg(feature = "dynamic_group_by")]
2142 let options = GroupbyOptions {
2143 dynamic: self.dynamic_options,
2144 rolling: self.rolling_options,
2145 slice: None,
2146 };
2147
2148 #[cfg(not(feature = "dynamic_group_by"))]
2149 let options = GroupbyOptions { slice: None };
2150
2151 let lp = DslPlan::GroupBy {
2152 input: Arc::new(self.logical_plan),
2153 keys: self.keys,
2154 predicates: vec![],
2155 aggs: vec![],
2156 apply: Some((f, schema)),
2157 maintain_order: self.maintain_order,
2158 options: Arc::new(options),
2159 };
2160 LazyFrame::from_logical_plan(lp, self.opt_state)
2161 }
2162}
2163
/// Builder for joining two `LazyFrame`s.
///
/// Created with `JoinBuilder::new(left)`, configured with the setter methods, and finished
/// with `finish` (key-based join) or `join_where` (predicate-based join).
#[must_use]
pub struct JoinBuilder {
    /// Left input of the join.
    lf: LazyFrame,
    /// Join type; `JoinType::Inner` by default.
    how: JoinType,
    /// Right input; set by `with`. `finish`/`join_where` panic if it is still `None`.
    other: Option<LazyFrame>,
    /// Key expressions for the left input.
    left_on: Vec<Expr>,
    /// Key expressions for the right input.
    right_on: Vec<Expr>,
    /// Forwarded as `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Forwarded as `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Forwarded as `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded as `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded as `JoinArgs::nulls_equal`; set by `join_nulls`.
    nulls_equal: bool,
    /// Forwarded as `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded as `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2179impl JoinBuilder {
2180 pub fn new(lf: LazyFrame) -> Self {
2182 Self {
2183 lf,
2184 other: None,
2185 how: JoinType::Inner,
2186 left_on: vec![],
2187 right_on: vec![],
2188 allow_parallel: true,
2189 force_parallel: false,
2190 suffix: None,
2191 validation: Default::default(),
2192 nulls_equal: false,
2193 coalesce: Default::default(),
2194 maintain_order: Default::default(),
2195 }
2196 }
2197
2198 pub fn with(mut self, other: LazyFrame) -> Self {
2200 self.other = Some(other);
2201 self
2202 }
2203
2204 pub fn how(mut self, how: JoinType) -> Self {
2206 self.how = how;
2207 self
2208 }
2209
2210 pub fn validate(mut self, validation: JoinValidation) -> Self {
2211 self.validation = validation;
2212 self
2213 }
2214
2215 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2219 let on = on.as_ref().to_vec();
2220 self.left_on.clone_from(&on);
2221 self.right_on = on;
2222 self
2223 }
2224
2225 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2229 self.left_on = on.as_ref().to_vec();
2230 self
2231 }
2232
2233 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2237 self.right_on = on.as_ref().to_vec();
2238 self
2239 }
2240
2241 pub fn allow_parallel(mut self, allow: bool) -> Self {
2243 self.allow_parallel = allow;
2244 self
2245 }
2246
2247 pub fn force_parallel(mut self, force: bool) -> Self {
2249 self.force_parallel = force;
2250 self
2251 }
2252
2253 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2255 self.nulls_equal = nulls_equal;
2256 self
2257 }
2258
2259 pub fn suffix<S>(mut self, suffix: S) -> Self
2262 where
2263 S: Into<PlSmallStr>,
2264 {
2265 self.suffix = Some(suffix.into());
2266 self
2267 }
2268
2269 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2271 self.coalesce = coalesce;
2272 self
2273 }
2274
2275 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2277 self.maintain_order = maintain_order;
2278 self
2279 }
2280
2281 pub fn finish(self) -> LazyFrame {
2283 let opt_state = self.lf.opt_state;
2284 let other = self.other.expect("'with' not set in join builder");
2285
2286 let args = JoinArgs {
2287 how: self.how,
2288 validation: self.validation,
2289 suffix: self.suffix,
2290 slice: None,
2291 nulls_equal: self.nulls_equal,
2292 coalesce: self.coalesce,
2293 maintain_order: self.maintain_order,
2294 };
2295
2296 let lp = self
2297 .lf
2298 .get_plan_builder()
2299 .join(
2300 other.logical_plan,
2301 self.left_on,
2302 self.right_on,
2303 JoinOptions {
2304 allow_parallel: self.allow_parallel,
2305 force_parallel: self.force_parallel,
2306 args,
2307 }
2308 .into(),
2309 )
2310 .build();
2311 LazyFrame::from_logical_plan(lp, opt_state)
2312 }
2313
2314 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2316 let opt_state = self.lf.opt_state;
2317 let other = self.other.expect("with not set");
2318
2319 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2321 if let Expr::BinaryExpr {
2322 op: Operator::And,
2323 left,
2324 right,
2325 } = predicate
2326 {
2327 decompose_and((*left).clone(), expanded_predicates);
2328 decompose_and((*right).clone(), expanded_predicates);
2329 } else {
2330 expanded_predicates.push(predicate);
2331 }
2332 }
2333 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2334 for predicate in predicates {
2335 decompose_and(predicate, &mut expanded_predicates);
2336 }
2337 let predicates: Vec<Expr> = expanded_predicates;
2338
2339 #[cfg(feature = "is_between")]
2341 let predicates: Vec<Expr> = {
2342 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2343 for predicate in predicates {
2344 if let Expr::Function {
2345 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2346 input,
2347 ..
2348 } = &predicate
2349 {
2350 if let [expr, lower, upper] = input.as_slice() {
2351 match closed {
2352 ClosedInterval::Both => {
2353 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2354 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2355 },
2356 ClosedInterval::Right => {
2357 expanded_predicates.push(expr.clone().gt(lower.clone()));
2358 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2359 },
2360 ClosedInterval::Left => {
2361 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2362 expanded_predicates.push(expr.clone().lt(upper.clone()));
2363 },
2364 ClosedInterval::None => {
2365 expanded_predicates.push(expr.clone().gt(lower.clone()));
2366 expanded_predicates.push(expr.clone().lt(upper.clone()));
2367 },
2368 }
2369 continue;
2370 }
2371 }
2372 expanded_predicates.push(predicate);
2373 }
2374 expanded_predicates
2375 };
2376
2377 let args = JoinArgs {
2378 how: self.how,
2379 validation: self.validation,
2380 suffix: self.suffix,
2381 slice: None,
2382 nulls_equal: self.nulls_equal,
2383 coalesce: self.coalesce,
2384 maintain_order: self.maintain_order,
2385 };
2386 let options = JoinOptions {
2387 allow_parallel: self.allow_parallel,
2388 force_parallel: self.force_parallel,
2389 args,
2390 };
2391
2392 let lp = DslPlan::Join {
2393 input_left: Arc::new(self.lf.logical_plan),
2394 input_right: Arc::new(other.logical_plan),
2395 left_on: Default::default(),
2396 right_on: Default::default(),
2397 predicates,
2398 options: Arc::from(options),
2399 };
2400
2401 LazyFrame::from_logical_plan(lp, opt_state)
2402 }
2403}
2404
2405pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2406 #[cfg(not(feature = "new_streaming"))]
2407 {
2408 None
2409 }
2410 #[cfg(feature = "new_streaming")]
2411 {
2412 Some(polars_stream::build_streaming_query_executor)
2413 }
2414};