1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::POOL;
26use polars_core::error::feature_gated;
27use polars_core::prelude::*;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38use crate::frame::cached_arenas::CachedArena;
39use crate::prelude::*;
40
/// Conversion of a value into a [`LazyFrame`], the entry point of the lazy query API.
pub trait IntoLazy {
    /// Consumes `self` and returns it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
44
45impl IntoLazy for DataFrame {
46 fn lazy(self) -> LazyFrame {
48 let lp = DslBuilder::from_existing_df(self).build();
49 LazyFrame {
50 logical_plan: lp,
51 opt_state: Default::default(),
52 cached_arena: Default::default(),
53 }
54 }
55}
56
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
62
/// A lazy, not-yet-executed query.
///
/// A `LazyFrame` only records a [`DslPlan`]. Nothing is computed until the plan is
/// collected (for example with [`LazyFrame::collect`]). Most transformations consume
/// the frame and return a new one that wraps the extended plan.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The logical (DSL) plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags applied when the plan is lowered and optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared cache of conversion arenas.
    /// NOTE(review): presumably reused by `get_arenas` to avoid reallocating IR arenas;
    /// confirm in the `cached_arenas` module.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
74
75impl From<DslPlan> for LazyFrame {
76 fn from(plan: DslPlan) -> Self {
77 Self {
78 logical_plan: plan,
79 opt_state: OptFlags::default(),
80 cached_arena: Default::default(),
81 }
82 }
83}
84
85impl LazyFrame {
86 pub(crate) fn from_inner(
87 logical_plan: DslPlan,
88 opt_state: OptFlags,
89 cached_arena: Arc<Mutex<Option<CachedArena>>>,
90 ) -> Self {
91 Self {
92 logical_plan,
93 opt_state,
94 cached_arena,
95 }
96 }
97
98 pub(crate) fn get_plan_builder(self) -> DslBuilder {
99 DslBuilder::from(self.logical_plan)
100 }
101
102 fn get_opt_state(&self) -> OptFlags {
103 self.opt_state
104 }
105
106 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107 LazyFrame {
108 logical_plan,
109 opt_state,
110 cached_arena: Default::default(),
111 }
112 }
113
114 pub fn get_current_optimizations(&self) -> OptFlags {
116 self.opt_state
117 }
118
119 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
121 self.opt_state = opt_state;
122 self
123 }
124
125 pub fn without_optimizations(self) -> Self {
127 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128 }
129
130 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133 self
134 }
135
136 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139 self
140 }
141
142 pub fn with_check_order(mut self, toggle: bool) -> Self {
145 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146 self
147 }
148
149 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152 self
153 }
154
155 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158 self
159 }
160
161 pub fn with_type_check(mut self, toggle: bool) -> Self {
163 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164 self
165 }
166
167 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170 self
171 }
172
/// Enables or disables common-subplan elimination (`OptFlags::COMM_SUBPLAN_ELIM`).
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
    self.with_optimizations(flags)
}
179
/// Enables or disables common-subexpression elimination (`OptFlags::COMM_SUBEXPR_ELIM`).
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
    self.with_optimizations(flags)
}
186
187 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190 self
191 }
192
/// Sets or clears `OptFlags::NEW_STREAMING`, which selects the new streaming engine.
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::NEW_STREAMING, toggle);
    self.with_optimizations(flags)
}
198
199 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202 self
203 }
204
205 pub fn _with_eager(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::EAGER, toggle);
208 self
209 }
210
211 pub fn describe_plan(&self) -> PolarsResult<String> {
213 Ok(self.clone().to_alp()?.describe())
214 }
215
216 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218 Ok(self.clone().to_alp()?.describe_tree_format())
219 }
220
221 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225 Ok(self.clone().to_alp_optimized()?.describe())
226 }
227
228 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233 }
234
235 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240 if optimized {
241 self.describe_optimized_plan()
242 } else {
243 self.describe_plan()
244 }
245 }
246
247 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287 let opt_state = self.get_opt_state();
288 let lp = self
289 .get_plan_builder()
290 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291 .build();
292 Self::from_logical_plan(lp, opt_state)
293 }
294
295 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315 self,
316 by_exprs: E,
317 sort_options: SortMultipleOptions,
318 ) -> Self {
319 let by_exprs = by_exprs.as_ref().to_vec();
320 if by_exprs.is_empty() {
321 self
322 } else {
323 let opt_state = self.get_opt_state();
324 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325 Self::from_logical_plan(lp, opt_state)
326 }
327 }
328
329 pub fn top_k<E: AsRef<[Expr]>>(
330 self,
331 k: IdxSize,
332 by_exprs: E,
333 sort_options: SortMultipleOptions,
334 ) -> Self {
335 self.sort_by_exprs(
337 by_exprs,
338 sort_options.with_order_reversed().with_nulls_last(true),
339 )
340 .slice(0, k)
341 }
342
343 pub fn bottom_k<E: AsRef<[Expr]>>(
344 self,
345 k: IdxSize,
346 by_exprs: E,
347 sort_options: SortMultipleOptions,
348 ) -> Self {
349 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351 .slice(0, k)
352 }
353
354 pub fn reverse(self) -> Self {
370 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371 }
372
373 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
381 where
382 I: IntoIterator<Item = T>,
383 J: IntoIterator<Item = S>,
384 T: AsRef<str>,
385 S: AsRef<str>,
386 {
387 let iter = existing.into_iter();
388 let cap = iter.size_hint().0;
389 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391
392 for (existing, new) in iter.zip(new) {
395 let existing = existing.as_ref();
396 let new = new.as_ref();
397 if new != existing {
398 existing_vec.push(existing.into());
399 new_vec.push(new.into());
400 }
401 }
402
403 self.map_private(DslFunction::Rename {
404 existing: existing_vec.into(),
405 new: new_vec.into(),
406 strict,
407 })
408 }
409
410 pub fn drop(self, columns: Selector) -> Self {
417 let opt_state = self.get_opt_state();
418 let lp = self.get_plan_builder().drop(columns).build();
419 Self::from_logical_plan(lp, opt_state)
420 }
421
422 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428 }
429
430 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435 self.select(vec![
436 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437 ])
438 }
439
440 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442 let opt_state = self.get_opt_state();
443 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444 Self::from_logical_plan(lp, opt_state)
445 }
446
447 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449 let opt_state = self.get_opt_state();
450 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451 Self::from_logical_plan(lp, opt_state)
452 }
453
454 pub fn cache(self) -> Self {
458 let opt_state = self.get_opt_state();
459 let lp = self.get_plan_builder().cache().build();
460 Self::from_logical_plan(lp, opt_state)
461 }
462
463 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465 let cast_cols: Vec<Expr> = dtypes
466 .into_iter()
467 .map(|(name, dt)| {
468 let name = PlSmallStr::from_str(name);
469
470 if strict {
471 col(name).strict_cast(dt)
472 } else {
473 col(name).cast(dt)
474 }
475 })
476 .collect();
477
478 if cast_cols.is_empty() {
479 self
480 } else {
481 self.with_columns(cast_cols)
482 }
483 }
484
485 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487 self.with_columns(vec![if strict {
488 col(PlSmallStr::from_static("*")).strict_cast(dtype)
489 } else {
490 col(PlSmallStr::from_static("*")).cast(dtype)
491 }])
492 }
493
494 pub fn optimize(
495 self,
496 lp_arena: &mut Arena<IR>,
497 expr_arena: &mut Arena<AExpr>,
498 ) -> PolarsResult<Node> {
499 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
500 }
501
502 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
503 let (mut lp_arena, mut expr_arena) = self.get_arenas();
504 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
505
506 Ok(IRPlan::new(node, lp_arena, expr_arena))
507 }
508
509 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
510 let (mut lp_arena, mut expr_arena) = self.get_arenas();
511 let node = to_alp(
512 self.logical_plan,
513 &mut expr_arena,
514 &mut lp_arena,
515 &mut self.opt_state,
516 )?;
517 let plan = IRPlan::new(node, lp_arena, expr_arena);
518 Ok(plan)
519 }
520
521 pub(crate) fn optimize_with_scratch(
522 self,
523 lp_arena: &mut Arena<IR>,
524 expr_arena: &mut Arena<AExpr>,
525 scratch: &mut Vec<Node>,
526 ) -> PolarsResult<Node> {
527 let lp_top = optimize(
528 self.logical_plan,
529 self.opt_state,
530 lp_arena,
531 expr_arena,
532 scratch,
533 apply_scan_predicate_to_scan_ir,
534 )?;
535
536 Ok(lp_top)
537 }
538
539 fn prepare_collect_post_opt<P>(
540 mut self,
541 check_sink: bool,
542 query_start: Option<std::time::Instant>,
543 post_opt: P,
544 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
545 where
546 P: FnOnce(
547 Node,
548 &mut Arena<IR>,
549 &mut Arena<AExpr>,
550 Option<std::time::Duration>,
551 ) -> PolarsResult<()>,
552 {
553 let (mut lp_arena, mut expr_arena) = self.get_arenas();
554
555 let mut scratch = vec![];
556 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
557
558 post_opt(
559 lp_top,
560 &mut lp_arena,
561 &mut expr_arena,
562 query_start.map(|s| s.elapsed()),
565 )?;
566
567 let no_file_sink = if check_sink {
569 !matches!(
570 lp_arena.get(lp_top),
571 IR::Sink {
572 payload: SinkTypeIR::File { .. },
573 ..
574 }
575 )
576 } else {
577 true
578 };
579 let physical_plan = create_physical_plan(
580 lp_top,
581 &mut lp_arena,
582 &mut expr_arena,
583 BUILD_STREAMING_EXECUTOR,
584 )?;
585
586 let state = ExecutionState::new();
587 Ok((state, physical_plan, no_file_sink))
588 }
589
590 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
592 where
593 P: FnOnce(
594 Node,
595 &mut Arena<IR>,
596 &mut Arena<AExpr>,
597 Option<std::time::Duration>,
598 ) -> PolarsResult<()>,
599 {
600 let (mut state, mut physical_plan, _) =
601 self.prepare_collect_post_opt(false, None, post_opt)?;
602 physical_plan.execute(&mut state)
603 }
604
605 #[allow(unused_mut)]
606 fn prepare_collect(
607 self,
608 check_sink: bool,
609 query_start: Option<std::time::Instant>,
610 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
611 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
612 }
613
614 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
619 let payload = match &self.logical_plan {
620 DslPlan::Sink { payload, .. } => payload.clone(),
621 DslPlan::SinkMultiple { .. } => {
622 polars_ensure!(matches!(engine, Engine::Auto | Engine::Streaming), InvalidOperation: "lazy multisinks only supported on streaming engine");
623 feature_gated!("new_streaming", {
624 let sink_multiple = self.with_new_streaming(true);
625 let mut alp_plan = sink_multiple.to_alp_optimized()?;
626 let result = polars_stream::run_query(
627 alp_plan.lp_top,
628 &mut alp_plan.lp_arena,
629 &mut alp_plan.expr_arena,
630 );
631 return result.map(|_| DataFrame::empty());
632 })
633 },
634 _ => {
635 self.logical_plan = DslPlan::Sink {
636 input: Arc::new(self.logical_plan),
637 payload: SinkType::Memory,
638 };
639 SinkType::Memory
640 },
641 };
642
643 if engine == Engine::Auto {
645 engine = match payload {
646 #[cfg(feature = "new_streaming")]
647 SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
648 _ => Engine::InMemory,
649 };
650 }
651 if engine == Engine::Gpu {
653 engine = Engine::InMemory;
654 }
655
656 #[cfg(feature = "new_streaming")]
657 {
658 if let Some(result) = self.try_new_streaming_if_requested() {
659 return result.map(|v| v.unwrap_single());
660 }
661 }
662
663 match engine {
664 Engine::Auto => unreachable!(),
665 Engine::Streaming => {
666 feature_gated!("new_streaming", self = self.with_new_streaming(true))
667 },
668 _ => {},
669 }
670 let mut alp_plan = self.clone().to_alp_optimized()?;
671
672 match engine {
673 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
674 let result = polars_stream::run_query(
675 alp_plan.lp_top,
676 &mut alp_plan.lp_arena,
677 &mut alp_plan.expr_arena,
678 );
679 result.map(|v| v.unwrap_single())
680 }),
681 Engine::Gpu => {
682 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
683 },
684 Engine::InMemory => {
685 let mut physical_plan = create_physical_plan(
686 alp_plan.lp_top,
687 &mut alp_plan.lp_arena,
688 &mut alp_plan.expr_arena,
689 BUILD_STREAMING_EXECUTOR,
690 )?;
691 let mut state = ExecutionState::new();
692 physical_plan.execute(&mut state)
693 },
694 }
695 }
696
697 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
698 let sink_multiple = LazyFrame {
699 logical_plan: DslPlan::SinkMultiple { inputs: plans },
700 opt_state,
701 cached_arena: Default::default(),
702 };
703 sink_multiple.explain(true)
704 }
705
706 pub fn collect_all_with_engine(
707 plans: Vec<DslPlan>,
708 mut engine: Engine,
709 opt_state: OptFlags,
710 ) -> PolarsResult<Vec<DataFrame>> {
711 if plans.is_empty() {
712 return Ok(Vec::new());
713 }
714
715 if engine == Engine::Auto {
717 engine = Engine::InMemory;
718 }
719 if engine == Engine::Gpu {
721 engine = Engine::InMemory;
722 }
723
724 let mut sink_multiple = LazyFrame {
725 logical_plan: DslPlan::SinkMultiple { inputs: plans },
726 opt_state,
727 cached_arena: Default::default(),
728 };
729
730 #[cfg(feature = "new_streaming")]
731 {
732 if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
733 return result.map(|v| v.unwrap_multiple());
734 }
735 }
736
737 match engine {
738 Engine::Auto => unreachable!(),
739 Engine::Streaming => {
740 feature_gated!(
741 "new_streaming",
742 sink_multiple = sink_multiple.with_new_streaming(true)
743 )
744 },
745 _ => {},
746 }
747 let mut alp_plan = sink_multiple.to_alp_optimized()?;
748
749 if engine == Engine::Streaming {
750 feature_gated!("new_streaming", {
751 let result = polars_stream::run_query(
752 alp_plan.lp_top,
753 &mut alp_plan.lp_arena,
754 &mut alp_plan.expr_arena,
755 );
756 return result.map(|v| v.unwrap_multiple());
757 });
758 }
759
760 let IR::SinkMultiple { inputs } = alp_plan.root() else {
761 unreachable!()
762 };
763
764 let mut multiplan = create_multiple_physical_plans(
765 inputs.clone().as_slice(),
766 &mut alp_plan.lp_arena,
767 &mut alp_plan.expr_arena,
768 BUILD_STREAMING_EXECUTOR,
769 )?;
770
771 match engine {
772 Engine::Gpu => polars_bail!(
773 InvalidOperation: "collect_all is not supported for the gpu engine"
774 ),
775 Engine::InMemory => {
776 let mut state = ExecutionState::new();
780 if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
781 cache_prefiller.execute(&mut state)?;
782 }
783 let out = POOL.install(|| {
784 multiplan
785 .physical_plans
786 .chunks_mut(POOL.current_num_threads() * 3)
787 .map(|chunk| {
788 chunk
789 .into_par_iter()
790 .enumerate()
791 .map(|(idx, input)| {
792 let mut input = std::mem::take(input);
793 let mut state = state.split();
794 state.branch_idx += idx;
795
796 let df = input.execute(&mut state)?;
797 Ok(df)
798 })
799 .collect::<PolarsResult<Vec<_>>>()
800 })
801 .collect::<PolarsResult<Vec<_>>>()
802 });
803 Ok(out?.into_iter().flatten().collect())
804 },
805 _ => unreachable!(),
806 }
807 }
808
809 pub fn collect(self) -> PolarsResult<DataFrame> {
827 self.collect_with_engine(Engine::InMemory)
828 }
829
/// Executes the query and streams its output as batches through a bounded channel.
///
/// The query is turned into a callback sink and runs on a blocking task of the async
/// runtime. Batches, or a final error, arrive on a channel with capacity 1. When
/// `lazy` is `false` the runner starts immediately; otherwise it starts when the
/// returned [`CollectBatches`] is started.
///
/// # Errors
/// Fails if the frame already ends in a sink.
#[cfg(feature = "async")]
pub fn collect_batches(
    self,
    engine: Engine,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PolarsResult<CollectBatches> {
    let (tx, rx) = sync_channel(1);
    let error_tx = tx.clone();
    let ldf = self.sink_batches(
        PlanCallback::new(move |df| {
            // A failed send means the receiver is gone; returning `true` presumably
            // tells the sink to stop producing.
            Ok(tx.send(Ok(df)).is_err())
        }),
        maintain_order,
        chunk_size,
    )?;

    let runner = move || {
        polars_io::pl_async::get_runtime().spawn_blocking(move || {
            if let Err(e) = ldf.collect_with_engine(engine) {
                error_tx.send(Err(e)).ok();
            }
        });
    };

    let mut handle = CollectBatches {
        recv: rx,
        runner: Some(Box::new(runner)),
    };
    if !lazy {
        handle.start();
    }
    Ok(handle)
}
872
873 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
876 where
877 P: FnOnce(
878 Node,
879 &mut Arena<IR>,
880 &mut Arena<AExpr>,
881 Option<std::time::Duration>,
882 ) -> PolarsResult<()>,
883 {
884 let query_start = std::time::Instant::now();
885 let (mut state, mut physical_plan, _) =
886 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
887 state.time_nodes(query_start);
888 let out = physical_plan.execute(&mut state)?;
889 let timer_df = state.finish_timer()?;
890 Ok((out, timer_df))
891 }
892
893 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
901 self._profile_post_opt(|_, _, _, _| Ok(()))
902 }
903
904 pub fn sink_batches(
905 mut self,
906 function: PlanCallback<DataFrame, bool>,
907 maintain_order: bool,
908 chunk_size: Option<NonZeroUsize>,
909 ) -> PolarsResult<Self> {
910 use polars_plan::prelude::sink::CallbackSinkType;
911
912 polars_ensure!(
913 !matches!(self.logical_plan, DslPlan::Sink { .. }),
914 InvalidOperation: "cannot create a sink on top of another sink"
915 );
916
917 self.logical_plan = DslPlan::Sink {
918 input: Arc::new(self.logical_plan),
919 payload: SinkType::Callback(CallbackSinkType {
920 function,
921 maintain_order,
922 chunk_size,
923 }),
924 };
925
926 Ok(self)
927 }
928
/// Runs the query on the new streaming engine if requested through the environment.
///
/// If `POLARS_FORCE_NEW_STREAMING=1` or `POLARS_AUTO_NEW_STREAMING=1` is set, a copy of
/// the plan is optimized with `OptFlags::NEW_STREAMING` and executed by the streaming
/// engine. The method returns:
/// * `Some(result)` when the streaming engine ran or the optimization failed;
/// * `None` when neither variable is set;
/// * `None` in *auto* mode (and not forced) when the engine panicked with a
///   "not yet implemented" message, so the caller can fall back to another engine.
///
/// # Panics
/// Re-raises any other panic from the streaming engine.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if auto_new_streaming || force_new_streaming {
        // Work on a copy so `self` keeps its original flags for a possible fallback.
        let mut new_stream_lazy = self.clone();
        new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
        let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
            Ok(v) => v,
            Err(e) => return Some(Err(e)),
        };

        let f = || {
            polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            )
        };

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
            Ok(v) => return Some(v),
            Err(e) => {
                // Panic payloads are `&'static str` for `todo!()` and literal `panic!`s,
                // but `String` for formatted messages such as `todo!("reason")`.
                // Inspect both forms so formatted "not yet implemented" panics also
                // trigger the fallback.
                let is_not_yet_implemented = e
                    .downcast_ref::<&str>()
                    .map(|s| s.starts_with("not yet implemented"))
                    .or_else(|| {
                        e.downcast_ref::<String>()
                            .map(|s| s.starts_with("not yet implemented"))
                    })
                    .unwrap_or(false);

                // Only fall back in auto mode; a forced run must surface the panic.
                if !force_new_streaming && auto_new_streaming && is_not_yet_implemented {
                    if polars_core::config::verbose() {
                        eprintln!(
                            "caught unimplemented error in new streaming engine, falling back to normal engine"
                        );
                    }
                } else {
                    std::panic::resume_unwind(e);
                }
            },
        }
    }

    None
}
979
980 pub fn sink(
981 mut self,
982 sink_type: SinkDestination,
983 file_format: FileWriteFormat,
984 unified_sink_args: UnifiedSinkArgs,
985 ) -> PolarsResult<Self> {
986 polars_ensure!(
987 !matches!(self.logical_plan, DslPlan::Sink { .. }),
988 InvalidOperation: "cannot create a sink on top of another sink"
989 );
990
991 self.logical_plan = DslPlan::Sink {
992 input: Arc::new(self.logical_plan),
993 payload: match sink_type {
994 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
995 target,
996 file_format,
997 unified_sink_args,
998 }),
999 SinkDestination::Partitioned {
1000 base_path,
1001 file_path_provider,
1002 partition_strategy,
1003 max_rows_per_file,
1004 approximate_bytes_per_file,
1005 } => SinkType::Partitioned(PartitionedSinkOptions {
1006 base_path,
1007 file_path_provider,
1008 partition_strategy,
1009 file_format,
1010 unified_sink_args,
1011 max_rows_per_file,
1012 approximate_bytes_per_file,
1013 }),
1014 },
1015 };
1016 Ok(self)
1017 }
1018
1019 pub fn filter(self, predicate: Expr) -> Self {
1037 let opt_state = self.get_opt_state();
1038 let lp = self.get_plan_builder().filter(predicate).build();
1039 Self::from_logical_plan(lp, opt_state)
1040 }
1041
1042 pub fn remove(self, predicate: Expr) -> Self {
1060 self.filter(predicate.neq_missing(lit(true)))
1061 }
1062
1063 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1089 let exprs = exprs.as_ref().to_vec();
1090 self.select_impl(
1091 exprs,
1092 ProjectionOptions {
1093 run_parallel: true,
1094 duplicate_check: true,
1095 should_broadcast: true,
1096 },
1097 )
1098 }
1099
1100 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1101 let exprs = exprs.as_ref().to_vec();
1102 self.select_impl(
1103 exprs,
1104 ProjectionOptions {
1105 run_parallel: false,
1106 duplicate_check: true,
1107 should_broadcast: true,
1108 },
1109 )
1110 }
1111
1112 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1113 let opt_state = self.get_opt_state();
1114 let lp = self.get_plan_builder().project(exprs, options).build();
1115 Self::from_logical_plan(lp, opt_state)
1116 }
1117
1118 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1139 let keys = by
1140 .as_ref()
1141 .iter()
1142 .map(|e| e.clone().into())
1143 .collect::<Vec<_>>();
1144 let opt_state = self.get_opt_state();
1145
1146 #[cfg(feature = "dynamic_group_by")]
1147 {
1148 LazyGroupBy {
1149 logical_plan: self.logical_plan,
1150 opt_state,
1151 keys,
1152 predicates: vec![],
1153 maintain_order: false,
1154 dynamic_options: None,
1155 rolling_options: None,
1156 }
1157 }
1158
1159 #[cfg(not(feature = "dynamic_group_by"))]
1160 {
1161 LazyGroupBy {
1162 logical_plan: self.logical_plan,
1163 opt_state,
1164 keys,
1165 predicates: vec![],
1166 maintain_order: false,
1167 }
1168 }
1169 }
1170
/// Starts a rolling group-by over `index_column`, optionally also keyed by `group_by`.
///
/// If `index_column` is not a plain column, it is first added with `with_column` and
/// then referenced by its output name. Group order is maintained.
///
/// # Panics
/// Panics if the schema cannot be resolved or the index expression's output field
/// cannot be determined.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    let name = match index_column {
        Expr::Column(name) => name,
        computed => {
            let output_field = computed
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(computed).rolling(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        },
    };
    options.index_column = name;

    LazyGroupBy {
        opt_state: self.get_opt_state(),
        logical_plan: self.logical_plan,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1208
/// Starts a dynamic (window-based) group-by over `index_column`, optionally also keyed
/// by `group_by`.
///
/// If `index_column` is not a plain column, it is first added with `with_column` and
/// then referenced by its output name. Group order is maintained.
///
/// # Panics
/// Panics if the schema cannot be resolved or the index expression's output field
/// cannot be determined.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    let name = match index_column {
        Expr::Column(name) => name,
        computed => {
            let output_field = computed
                .to_field(&self.collect_schema().unwrap())
                .unwrap();
            return self.with_column(computed).group_by_dynamic(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        },
    };
    options.index_column = name;

    LazyGroupBy {
        opt_state: self.get_opt_state(),
        logical_plan: self.logical_plan,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1254
1255 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1257 let keys = by
1258 .as_ref()
1259 .iter()
1260 .map(|e| e.clone().into())
1261 .collect::<Vec<_>>();
1262 let opt_state = self.get_opt_state();
1263
1264 #[cfg(feature = "dynamic_group_by")]
1265 {
1266 LazyGroupBy {
1267 logical_plan: self.logical_plan,
1268 opt_state,
1269 keys,
1270 predicates: vec![],
1271 maintain_order: true,
1272 dynamic_options: None,
1273 rolling_options: None,
1274 }
1275 }
1276
1277 #[cfg(not(feature = "dynamic_group_by"))]
1278 {
1279 LazyGroupBy {
1280 logical_plan: self.logical_plan,
1281 opt_state,
1282 keys,
1283 predicates: vec![],
1284 maintain_order: true,
1285 }
1286 }
1287 }
1288
/// Anti join: keeps the rows of `self` whose `left_on` key has no match in
/// `other.right_on`.
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let (left_key, right_key) = (left_on.into(), right_on.into());
    self.join(other, [left_key], [right_key], JoinArgs::new(JoinType::Anti))
}
1314
/// Cartesian product of `self` and `other`.
///
/// `suffix`, if given, disambiguates clashing column names.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    self.join(other, Vec::<Expr>::new(), Vec::<Expr>::new(), args)
}
1325
1326 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1343 self.join(
1344 other,
1345 [left_on.into()],
1346 [right_on.into()],
1347 JoinArgs::new(JoinType::Left),
1348 )
1349 }
1350
1351 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1368 self.join(
1369 other,
1370 [left_on.into()],
1371 [right_on.into()],
1372 JoinArgs::new(JoinType::Inner),
1373 )
1374 }
1375
1376 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1393 self.join(
1394 other,
1395 [left_on.into()],
1396 [right_on.into()],
1397 JoinArgs::new(JoinType::Full),
1398 )
1399 }
1400
/// Semi join: keeps the rows of `self` whose `left_on` key has a match in
/// `other.right_on`.
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let (left_key, right_key) = (left_on.into(), right_on.into());
    self.join(other, [left_key], [right_key], JoinArgs::new(JoinType::Semi))
}
1426
1427 pub fn join<E: AsRef<[Expr]>>(
1449 self,
1450 other: LazyFrame,
1451 left_on: E,
1452 right_on: E,
1453 args: JoinArgs,
1454 ) -> LazyFrame {
1455 let left_on = left_on.as_ref().to_vec();
1456 let right_on = right_on.as_ref().to_vec();
1457
1458 self._join_impl(other, left_on, right_on, args)
1459 }
1460
1461 fn _join_impl(
1462 self,
1463 other: LazyFrame,
1464 left_on: Vec<Expr>,
1465 right_on: Vec<Expr>,
1466 args: JoinArgs,
1467 ) -> LazyFrame {
1468 let JoinArgs {
1469 how,
1470 validation,
1471 suffix,
1472 slice,
1473 nulls_equal,
1474 coalesce,
1475 maintain_order,
1476 build_side,
1477 } = args;
1478
1479 if slice.is_some() {
1480 panic!("impl error: slice is not handled")
1481 }
1482
1483 let mut builder = self
1484 .join_builder()
1485 .with(other)
1486 .left_on(left_on)
1487 .right_on(right_on)
1488 .how(how)
1489 .validate(validation)
1490 .join_nulls(nulls_equal)
1491 .coalesce(coalesce)
1492 .maintain_order(maintain_order)
1493 .build_side(build_side);
1494
1495 if let Some(suffix) = suffix {
1496 builder = builder.suffix(suffix);
1497 }
1498
1499 builder.finish()
1501 }
1502
/// Returns a [`JoinBuilder`] with `self` as the left side, for configuring a join
/// step by step.
pub fn join_builder(self) -> JoinBuilder {
    JoinBuilder::new(self)
}
1511
1512 pub fn with_column(self, expr: Expr) -> LazyFrame {
1530 let opt_state = self.get_opt_state();
1531 let lp = self
1532 .get_plan_builder()
1533 .with_columns(
1534 vec![expr],
1535 ProjectionOptions {
1536 run_parallel: false,
1537 duplicate_check: true,
1538 should_broadcast: true,
1539 },
1540 )
1541 .build();
1542 Self::from_logical_plan(lp, opt_state)
1543 }
1544
1545 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1560 let exprs = exprs.as_ref().to_vec();
1561 self.with_columns_impl(
1562 exprs,
1563 ProjectionOptions {
1564 run_parallel: true,
1565 duplicate_check: true,
1566 should_broadcast: true,
1567 },
1568 )
1569 }
1570
1571 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1573 let exprs = exprs.as_ref().to_vec();
1574 self.with_columns_impl(
1575 exprs,
1576 ProjectionOptions {
1577 run_parallel: false,
1578 duplicate_check: true,
1579 should_broadcast: true,
1580 },
1581 )
1582 }
1583
1584 pub fn match_to_schema(
1586 self,
1587 schema: SchemaRef,
1588 per_column: Arc<[MatchToSchemaPerColumn]>,
1589 extra_columns: ExtraColumnsPolicy,
1590 ) -> LazyFrame {
1591 let opt_state = self.get_opt_state();
1592 let lp = self
1593 .get_plan_builder()
1594 .match_to_schema(schema, per_column, extra_columns)
1595 .build();
1596 Self::from_logical_plan(lp, opt_state)
1597 }
1598
1599 pub fn pipe_with_schema(
1600 self,
1601 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1602 ) -> Self {
1603 let opt_state = self.get_opt_state();
1604 let lp = self
1605 .get_plan_builder()
1606 .pipe_with_schema(vec![], callback)
1607 .build();
1608 Self::from_logical_plan(lp, opt_state)
1609 }
1610
1611 pub fn pipe_with_schemas(
1612 self,
1613 others: Vec<LazyFrame>,
1614 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1615 ) -> Self {
1616 let opt_state = self.get_opt_state();
1617 let lp = self
1618 .get_plan_builder()
1619 .pipe_with_schema(
1620 others.into_iter().map(|lf| lf.logical_plan).collect(),
1621 callback,
1622 )
1623 .build();
1624 Self::from_logical_plan(lp, opt_state)
1625 }
1626
1627 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1628 let opt_state = self.get_opt_state();
1629 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1630 Self::from_logical_plan(lp, opt_state)
1631 }
1632
1633 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1634 let contexts = contexts
1635 .as_ref()
1636 .iter()
1637 .map(|lf| lf.logical_plan.clone())
1638 .collect();
1639 let opt_state = self.get_opt_state();
1640 let lp = self.get_plan_builder().with_context(contexts).build();
1641 Self::from_logical_plan(lp, opt_state)
1642 }
1643
1644 pub fn max(self) -> Self {
1648 self.map_private(DslFunction::Stats(StatsFunction::Max))
1649 }
1650
1651 pub fn min(self) -> Self {
1655 self.map_private(DslFunction::Stats(StatsFunction::Min))
1656 }
1657
1658 pub fn sum(self) -> Self {
1668 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1669 }
1670
1671 pub fn mean(self) -> Self {
1676 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1677 }
1678
1679 pub fn median(self) -> Self {
1685 self.map_private(DslFunction::Stats(StatsFunction::Median))
1686 }
1687
1688 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1690 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1691 quantile,
1692 method,
1693 }))
1694 }
1695
1696 pub fn std(self, ddof: u8) -> Self {
1709 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1710 }
1711
1712 pub fn var(self, ddof: u8) -> Self {
1722 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1723 }
1724
1725 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1727 self.explode_impl(columns, options, false)
1728 }
1729
1730 fn explode_impl(
1732 self,
1733 columns: Selector,
1734 options: ExplodeOptions,
1735 allow_empty: bool,
1736 ) -> LazyFrame {
1737 let opt_state = self.get_opt_state();
1738 let lp = self
1739 .get_plan_builder()
1740 .explode(columns, options, allow_empty)
1741 .build();
1742 Self::from_logical_plan(lp, opt_state)
1743 }
1744
1745 pub fn null_count(self) -> LazyFrame {
1747 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1748 }
1749
1750 pub fn unique_stable(
1755 self,
1756 subset: Option<Selector>,
1757 keep_strategy: UniqueKeepStrategy,
1758 ) -> LazyFrame {
1759 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1760 self.unique_stable_generic(subset, keep_strategy)
1761 }
1762
1763 pub fn unique_stable_generic(
1764 self,
1765 subset: Option<Vec<Expr>>,
1766 keep_strategy: UniqueKeepStrategy,
1767 ) -> LazyFrame {
1768 let opt_state = self.get_opt_state();
1769 let options = DistinctOptionsDSL {
1770 subset,
1771 maintain_order: true,
1772 keep_strategy,
1773 };
1774 let lp = self.get_plan_builder().distinct(options).build();
1775 Self::from_logical_plan(lp, opt_state)
1776 }
1777
1778 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1786 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1787 self.unique_generic(subset, keep_strategy)
1788 }
1789
1790 pub fn unique_generic(
1791 self,
1792 subset: Option<Vec<Expr>>,
1793 keep_strategy: UniqueKeepStrategy,
1794 ) -> LazyFrame {
1795 let opt_state = self.get_opt_state();
1796 let options = DistinctOptionsDSL {
1797 subset,
1798 maintain_order: false,
1799 keep_strategy,
1800 };
1801 let lp = self.get_plan_builder().distinct(options).build();
1802 Self::from_logical_plan(lp, opt_state)
1803 }
1804
1805 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1810 let opt_state = self.get_opt_state();
1811 let lp = self.get_plan_builder().drop_nans(subset).build();
1812 Self::from_logical_plan(lp, opt_state)
1813 }
1814
1815 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1820 let opt_state = self.get_opt_state();
1821 let lp = self.get_plan_builder().drop_nulls(subset).build();
1822 Self::from_logical_plan(lp, opt_state)
1823 }
1824
1825 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1835 let opt_state = self.get_opt_state();
1836 let lp = self.get_plan_builder().slice(offset, len).build();
1837 Self::from_logical_plan(lp, opt_state)
1838 }
1839
1840 pub fn first(self) -> LazyFrame {
1844 self.slice(0, 1)
1845 }
1846
1847 pub fn last(self) -> LazyFrame {
1851 self.slice(-1, 1)
1852 }
1853
1854 pub fn tail(self, n: IdxSize) -> LazyFrame {
1858 let neg_tail = -(n as i64);
1859 self.slice(neg_tail, n)
1860 }
1861
1862 #[cfg(feature = "pivot")]
1863 #[expect(clippy::too_many_arguments)]
1864 pub fn pivot(
1865 self,
1866 on: Selector,
1867 on_columns: Arc<DataFrame>,
1868 index: Selector,
1869 values: Selector,
1870 agg: Expr,
1871 maintain_order: bool,
1872 separator: PlSmallStr,
1873 ) -> LazyFrame {
1874 let opt_state = self.get_opt_state();
1875 let lp = self
1876 .get_plan_builder()
1877 .pivot(
1878 on,
1879 on_columns,
1880 index,
1881 values,
1882 agg,
1883 maintain_order,
1884 separator,
1885 )
1886 .build();
1887 Self::from_logical_plan(lp, opt_state)
1888 }
1889
1890 #[cfg(feature = "pivot")]
1894 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1895 let opt_state = self.get_opt_state();
1896 let lp = self.get_plan_builder().unpivot(args).build();
1897 Self::from_logical_plan(lp, opt_state)
1898 }
1899
1900 pub fn limit(self, n: IdxSize) -> LazyFrame {
1902 self.slice(0, n)
1903 }
1904
1905 pub fn map<F>(
1919 self,
1920 function: F,
1921 optimizations: AllowedOptimizations,
1922 schema: Option<Arc<dyn UdfSchema>>,
1923 name: Option<&'static str>,
1924 ) -> LazyFrame
1925 where
1926 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1927 {
1928 let opt_state = self.get_opt_state();
1929 let lp = self
1930 .get_plan_builder()
1931 .map(
1932 function,
1933 optimizations,
1934 schema,
1935 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1936 )
1937 .build();
1938 Self::from_logical_plan(lp, opt_state)
1939 }
1940
1941 #[cfg(feature = "python")]
1942 pub fn map_python(
1943 self,
1944 function: polars_utils::python_function::PythonFunction,
1945 optimizations: AllowedOptimizations,
1946 schema: Option<SchemaRef>,
1947 validate_output: bool,
1948 ) -> LazyFrame {
1949 let opt_state = self.get_opt_state();
1950 let lp = self
1951 .get_plan_builder()
1952 .map_python(function, optimizations, schema, validate_output)
1953 .build();
1954 Self::from_logical_plan(lp, opt_state)
1955 }
1956
1957 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1958 let opt_state = self.get_opt_state();
1959 let lp = self.get_plan_builder().map_private(function).build();
1960 Self::from_logical_plan(lp, opt_state)
1961 }
1962
1963 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1972 where
1973 S: Into<PlSmallStr>,
1974 {
1975 let name = name.into();
1976
1977 match &self.logical_plan {
1978 v @ DslPlan::Scan {
1979 scan_type,
1980 unified_scan_args,
1981 ..
1982 } if unified_scan_args.row_index.is_none()
1983 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1984 {
1985 let DslPlan::Scan {
1986 sources,
1987 mut unified_scan_args,
1988 scan_type,
1989 cached_ir: _,
1990 } = v.clone()
1991 else {
1992 unreachable!()
1993 };
1994
1995 unified_scan_args.row_index = Some(RowIndex {
1996 name,
1997 offset: offset.unwrap_or(0),
1998 });
1999
2000 DslPlan::Scan {
2001 sources,
2002 unified_scan_args,
2003 scan_type,
2004 cached_ir: Default::default(),
2005 }
2006 .into()
2007 },
2008 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2009 }
2010 }
2011
2012 pub fn count(self) -> LazyFrame {
2014 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2015 }
2016
2017 #[cfg(feature = "dtype-struct")]
2020 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2021 self.map_private(DslFunction::Unnest {
2022 columns: cols,
2023 separator,
2024 })
2025 }
2026
2027 #[cfg(feature = "merge_sorted")]
2028 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2029 where
2030 S: Into<PlSmallStr>,
2031 {
2032 let key = key.into();
2033
2034 let lp = DslPlan::MergeSorted {
2035 input_left: Arc::new(self.logical_plan),
2036 input_right: Arc::new(other.logical_plan),
2037 key,
2038 };
2039 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2040 }
2041
2042 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2043 let lp = DslPlan::MapFunction {
2044 input: Arc::new(self.logical_plan),
2045 function: DslFunction::Hint(hint),
2046 };
2047 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2048 }
2049}
2050
/// Intermediate state of a lazy group-by: the input plan plus the grouping keys and
/// options collected so far. It becomes a `LazyFrame` again through `agg`, `head`,
/// `tail` or `apply`.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags inherited from the source frame and passed to the result.
    opt_state: OptFlags,
    /// Group-by key expressions.
    keys: Vec<Expr>,
    /// Predicates accumulated by `having`, forwarded to the group-by node in `agg`.
    predicates: Vec<Expr>,
    /// Forwarded as `maintain_order` to the group-by node.
    maintain_order: bool,
    /// Dynamic (window) group-by options, forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2064
2065impl From<LazyGroupBy> for LazyFrame {
2066 fn from(lgb: LazyGroupBy) -> Self {
2067 Self {
2068 logical_plan: lgb.logical_plan,
2069 opt_state: lgb.opt_state,
2070 cached_arena: Default::default(),
2071 }
2072 }
2073}
2074
2075impl LazyGroupBy {
2076 pub fn having(mut self, predicate: Expr) -> Self {
2097 self.predicates.push(predicate);
2098 self
2099 }
2100
2101 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2123 #[cfg(feature = "dynamic_group_by")]
2124 let lp = DslBuilder::from(self.logical_plan)
2125 .group_by(
2126 self.keys,
2127 self.predicates,
2128 aggs,
2129 None,
2130 self.maintain_order,
2131 self.dynamic_options,
2132 self.rolling_options,
2133 )
2134 .build();
2135
2136 #[cfg(not(feature = "dynamic_group_by"))]
2137 let lp = DslBuilder::from(self.logical_plan)
2138 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2139 .build();
2140 LazyFrame::from_logical_plan(lp, self.opt_state)
2141 }
2142
2143 pub fn head(self, n: Option<usize>) -> LazyFrame {
2145 let keys = self
2146 .keys
2147 .iter()
2148 .filter_map(|expr| expr_output_name(expr).ok())
2149 .collect::<Vec<_>>();
2150
2151 self.agg([all().as_expr().head(n)]).explode_impl(
2152 all() - by_name(keys.iter().cloned(), false, false),
2153 ExplodeOptions {
2154 empty_as_null: true,
2155 keep_nulls: true,
2156 },
2157 true,
2158 )
2159 }
2160
2161 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2163 let keys = self
2164 .keys
2165 .iter()
2166 .filter_map(|expr| expr_output_name(expr).ok())
2167 .collect::<Vec<_>>();
2168
2169 self.agg([all().as_expr().tail(n)]).explode_impl(
2170 all() - by_name(keys.iter().cloned(), false, false),
2171 ExplodeOptions {
2172 empty_as_null: true,
2173 keep_nulls: true,
2174 },
2175 true,
2176 )
2177 }
2178
2179 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2184 if !self.predicates.is_empty() {
2185 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2186 }
2187
2188 #[cfg(feature = "dynamic_group_by")]
2189 let options = GroupbyOptions {
2190 dynamic: self.dynamic_options,
2191 rolling: self.rolling_options,
2192 slice: None,
2193 };
2194
2195 #[cfg(not(feature = "dynamic_group_by"))]
2196 let options = GroupbyOptions { slice: None };
2197
2198 let lp = DslPlan::GroupBy {
2199 input: Arc::new(self.logical_plan),
2200 keys: self.keys,
2201 predicates: vec![],
2202 aggs: vec![],
2203 apply: Some((f, schema)),
2204 maintain_order: self.maintain_order,
2205 options: Arc::new(options),
2206 };
2207 LazyFrame::from_logical_plan(lp, self.opt_state)
2208 }
2209}
2210
/// Builder for joining a `LazyFrame` with another one. Start with `JoinBuilder::new`,
/// set the right input with `with`, and finish with `finish` (key-based) or `join_where`
/// (predicate-based).
#[must_use]
pub struct JoinBuilder {
    /// Left input.
    lf: LazyFrame,
    /// Join type (default `JoinType::Inner`).
    how: JoinType,
    /// Right input; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left input (unused by `join_where`).
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right input (unused by `join_where`).
    right_on: Vec<Expr>,
    /// Forwarded to `JoinOptions::allow_parallel` (default `true`).
    allow_parallel: bool,
    /// Forwarded to `JoinOptions::force_parallel` (default `false`).
    force_parallel: bool,
    /// Forwarded to `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded to `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded to `JoinArgs::nulls_equal` (default `false`).
    nulls_equal: bool,
    /// Forwarded to `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded to `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
    /// Forwarded to `JoinArgs::build_side`; `None` leaves the choice unset.
    build_side: Option<JoinBuildSide>,
}
2227impl JoinBuilder {
2228 pub fn new(lf: LazyFrame) -> Self {
2230 Self {
2231 lf,
2232 other: None,
2233 how: JoinType::Inner,
2234 left_on: vec![],
2235 right_on: vec![],
2236 allow_parallel: true,
2237 force_parallel: false,
2238 suffix: None,
2239 validation: Default::default(),
2240 nulls_equal: false,
2241 coalesce: Default::default(),
2242 maintain_order: Default::default(),
2243 build_side: None,
2244 }
2245 }
2246
2247 pub fn with(mut self, other: LazyFrame) -> Self {
2249 self.other = Some(other);
2250 self
2251 }
2252
2253 pub fn how(mut self, how: JoinType) -> Self {
2255 self.how = how;
2256 self
2257 }
2258
2259 pub fn validate(mut self, validation: JoinValidation) -> Self {
2260 self.validation = validation;
2261 self
2262 }
2263
2264 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2268 let on = on.as_ref().to_vec();
2269 self.left_on.clone_from(&on);
2270 self.right_on = on;
2271 self
2272 }
2273
2274 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2278 self.left_on = on.as_ref().to_vec();
2279 self
2280 }
2281
2282 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2286 self.right_on = on.as_ref().to_vec();
2287 self
2288 }
2289
2290 pub fn allow_parallel(mut self, allow: bool) -> Self {
2292 self.allow_parallel = allow;
2293 self
2294 }
2295
2296 pub fn force_parallel(mut self, force: bool) -> Self {
2298 self.force_parallel = force;
2299 self
2300 }
2301
2302 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2304 self.nulls_equal = nulls_equal;
2305 self
2306 }
2307
2308 pub fn suffix<S>(mut self, suffix: S) -> Self
2311 where
2312 S: Into<PlSmallStr>,
2313 {
2314 self.suffix = Some(suffix.into());
2315 self
2316 }
2317
2318 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2320 self.coalesce = coalesce;
2321 self
2322 }
2323
2324 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2326 self.maintain_order = maintain_order;
2327 self
2328 }
2329
2330 pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2332 self.build_side = build_side;
2333 self
2334 }
2335
2336 pub fn finish(self) -> LazyFrame {
2338 let opt_state = self.lf.opt_state;
2339 let other = self.other.expect("'with' not set in join builder");
2340
2341 let args = JoinArgs {
2342 how: self.how,
2343 validation: self.validation,
2344 suffix: self.suffix,
2345 slice: None,
2346 nulls_equal: self.nulls_equal,
2347 coalesce: self.coalesce,
2348 maintain_order: self.maintain_order,
2349 build_side: self.build_side,
2350 };
2351
2352 let lp = self
2353 .lf
2354 .get_plan_builder()
2355 .join(
2356 other.logical_plan,
2357 self.left_on,
2358 self.right_on,
2359 JoinOptions {
2360 allow_parallel: self.allow_parallel,
2361 force_parallel: self.force_parallel,
2362 args,
2363 }
2364 .into(),
2365 )
2366 .build();
2367 LazyFrame::from_logical_plan(lp, opt_state)
2368 }
2369
2370 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2372 let opt_state = self.lf.opt_state;
2373 let other = self.other.expect("with not set");
2374
2375 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2377 if let Expr::BinaryExpr {
2378 op: Operator::And,
2379 left,
2380 right,
2381 } = predicate
2382 {
2383 decompose_and((*left).clone(), expanded_predicates);
2384 decompose_and((*right).clone(), expanded_predicates);
2385 } else {
2386 expanded_predicates.push(predicate);
2387 }
2388 }
2389 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2390 for predicate in predicates {
2391 decompose_and(predicate, &mut expanded_predicates);
2392 }
2393 let predicates: Vec<Expr> = expanded_predicates;
2394
2395 #[cfg(feature = "is_between")]
2397 let predicates: Vec<Expr> = {
2398 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2399 for predicate in predicates {
2400 if let Expr::Function {
2401 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2402 input,
2403 ..
2404 } = &predicate
2405 {
2406 if let [expr, lower, upper] = input.as_slice() {
2407 match closed {
2408 ClosedInterval::Both => {
2409 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2410 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2411 },
2412 ClosedInterval::Right => {
2413 expanded_predicates.push(expr.clone().gt(lower.clone()));
2414 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2415 },
2416 ClosedInterval::Left => {
2417 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2418 expanded_predicates.push(expr.clone().lt(upper.clone()));
2419 },
2420 ClosedInterval::None => {
2421 expanded_predicates.push(expr.clone().gt(lower.clone()));
2422 expanded_predicates.push(expr.clone().lt(upper.clone()));
2423 },
2424 }
2425 continue;
2426 }
2427 }
2428 expanded_predicates.push(predicate);
2429 }
2430 expanded_predicates
2431 };
2432
2433 let args = JoinArgs {
2434 how: self.how,
2435 validation: self.validation,
2436 suffix: self.suffix,
2437 slice: None,
2438 nulls_equal: self.nulls_equal,
2439 coalesce: self.coalesce,
2440 maintain_order: self.maintain_order,
2441 build_side: self.build_side,
2442 };
2443 let options = JoinOptions {
2444 allow_parallel: self.allow_parallel,
2445 force_parallel: self.force_parallel,
2446 args,
2447 };
2448
2449 let lp = DslPlan::Join {
2450 input_left: Arc::new(self.lf.logical_plan),
2451 input_right: Arc::new(other.logical_plan),
2452 left_on: Default::default(),
2453 right_on: Default::default(),
2454 predicates,
2455 options: Arc::from(options),
2456 };
2457
2458 LazyFrame::from_logical_plan(lp, opt_state)
2459 }
2460}
2461
2462pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2463 #[cfg(not(feature = "new_streaming"))]
2464 {
2465 None
2466 }
2467 #[cfg(feature = "new_streaming")]
2468 {
2469 Some(polars_stream::build_streaming_query_executor)
2470 }
2471};
2472
/// Iterator over `DataFrame` batches (or errors) received through a channel.
///
/// Production starts lazily: `runner` is taken and invoked by the first call to
/// `CollectBatches::start`, which `Iterator::next` calls before every receive.
pub struct CollectBatches {
    /// Receiving end from which batches are read.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// One-shot start-up closure; `None` once it has been run.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2477
2478impl CollectBatches {
2479 pub fn start(&mut self) {
2481 if let Some(runner) = self.runner.take() {
2482 runner()
2483 }
2484 }
2485}
2486
2487impl Iterator for CollectBatches {
2488 type Item = PolarsResult<DataFrame>;
2489
2490 fn next(&mut self) -> Option<Self::Item> {
2491 self.start();
2492 self.recv.recv().ok()
2493 }
2494}