1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::POOL;
26use polars_core::error::feature_gated;
27use polars_core::prelude::*;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38use crate::frame::cached_arenas::CachedArena;
39use crate::prelude::*;
40
41pub trait IntoLazy {
42 fn lazy(self) -> LazyFrame;
43}
44
45impl IntoLazy for DataFrame {
46 fn lazy(self) -> LazyFrame {
48 let lp = DslBuilder::from_existing_df(self).build();
49 LazyFrame {
50 logical_plan: lp,
51 opt_state: Default::default(),
52 cached_arena: Default::default(),
53 }
54 }
55}
56
57impl IntoLazy for LazyFrame {
58 fn lazy(self) -> LazyFrame {
59 self
60 }
61}
62
63#[derive(Clone, Default)]
68#[must_use]
69pub struct LazyFrame {
70 pub logical_plan: DslPlan,
71 pub(crate) opt_state: OptFlags,
72 pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
73}
74
75impl From<DslPlan> for LazyFrame {
76 fn from(plan: DslPlan) -> Self {
77 Self {
78 logical_plan: plan,
79 opt_state: OptFlags::default(),
80 cached_arena: Default::default(),
81 }
82 }
83}
84
85impl LazyFrame {
86 pub(crate) fn from_inner(
87 logical_plan: DslPlan,
88 opt_state: OptFlags,
89 cached_arena: Arc<Mutex<Option<CachedArena>>>,
90 ) -> Self {
91 Self {
92 logical_plan,
93 opt_state,
94 cached_arena,
95 }
96 }
97
98 pub(crate) fn get_plan_builder(self) -> DslBuilder {
99 DslBuilder::from(self.logical_plan)
100 }
101
102 fn get_opt_state(&self) -> OptFlags {
103 self.opt_state
104 }
105
106 pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107 LazyFrame {
108 logical_plan,
109 opt_state,
110 cached_arena: Default::default(),
111 }
112 }
113
114 pub fn get_current_optimizations(&self) -> OptFlags {
116 self.opt_state
117 }
118
119 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
121 self.opt_state = opt_state;
122 self
123 }
124
125 pub fn without_optimizations(self) -> Self {
127 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128 }
129
130 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133 self
134 }
135
136 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139 self
140 }
141
142 pub fn with_check_order(mut self, toggle: bool) -> Self {
145 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146 self
147 }
148
149 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152 self
153 }
154
155 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158 self
159 }
160
161 pub fn with_type_check(mut self, toggle: bool) -> Self {
163 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164 self
165 }
166
167 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170 self
171 }
172
/// Enable or disable common-subplan elimination (`OptFlags::COMM_SUBPLAN_ELIM`).
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
    self.opt_state = flags;
    self
}
179
/// Enable or disable common-subexpression elimination (`OptFlags::COMM_SUBEXPR_ELIM`).
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
    self.opt_state = flags;
    self
}
186
187 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190 self
191 }
192
/// Enable or disable `OptFlags::NEW_STREAMING` (plan for the new streaming engine).
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(mut self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::NEW_STREAMING, toggle);
    self.opt_state = flags;
    self
}
198
199 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202 self
203 }
204
205 pub fn _with_eager(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::EAGER, toggle);
208 self
209 }
210
211 pub fn describe_plan(&self) -> PolarsResult<String> {
213 Ok(self.clone().to_alp()?.describe())
214 }
215
216 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218 Ok(self.clone().to_alp()?.describe_tree_format())
219 }
220
221 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225 Ok(self.clone().to_alp_optimized()?.describe())
226 }
227
228 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233 }
234
235 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240 if optimized {
241 self.describe_optimized_plan()
242 } else {
243 self.describe_plan()
244 }
245 }
246
247 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287 let opt_state = self.get_opt_state();
288 let lp = self
289 .get_plan_builder()
290 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291 .build();
292 Self::from_logical_plan(lp, opt_state)
293 }
294
295 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315 self,
316 by_exprs: E,
317 sort_options: SortMultipleOptions,
318 ) -> Self {
319 let by_exprs = by_exprs.as_ref().to_vec();
320 if by_exprs.is_empty() {
321 self
322 } else {
323 let opt_state = self.get_opt_state();
324 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325 Self::from_logical_plan(lp, opt_state)
326 }
327 }
328
329 pub fn top_k<E: AsRef<[Expr]>>(
330 self,
331 k: IdxSize,
332 by_exprs: E,
333 sort_options: SortMultipleOptions,
334 ) -> Self {
335 self.sort_by_exprs(
337 by_exprs,
338 sort_options.with_order_reversed().with_nulls_last(true),
339 )
340 .slice(0, k)
341 }
342
343 pub fn bottom_k<E: AsRef<[Expr]>>(
344 self,
345 k: IdxSize,
346 by_exprs: E,
347 sort_options: SortMultipleOptions,
348 ) -> Self {
349 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351 .slice(0, k)
352 }
353
354 pub fn reverse(self) -> Self {
370 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371 }
372
373 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
381 where
382 I: IntoIterator<Item = T>,
383 J: IntoIterator<Item = S>,
384 T: AsRef<str>,
385 S: AsRef<str>,
386 {
387 let iter = existing.into_iter();
388 let cap = iter.size_hint().0;
389 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391
392 for (existing, new) in iter.zip(new) {
395 let existing = existing.as_ref();
396 let new = new.as_ref();
397 if new != existing {
398 existing_vec.push(existing.into());
399 new_vec.push(new.into());
400 }
401 }
402
403 self.map_private(DslFunction::Rename {
404 existing: existing_vec.into(),
405 new: new_vec.into(),
406 strict,
407 })
408 }
409
410 pub fn drop(self, columns: Selector) -> Self {
417 let opt_state = self.get_opt_state();
418 let lp = self.get_plan_builder().drop(columns).build();
419 Self::from_logical_plan(lp, opt_state)
420 }
421
422 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428 }
429
430 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435 self.select(vec![
436 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437 ])
438 }
439
440 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442 let opt_state = self.get_opt_state();
443 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444 Self::from_logical_plan(lp, opt_state)
445 }
446
447 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449 let opt_state = self.get_opt_state();
450 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451 Self::from_logical_plan(lp, opt_state)
452 }
453
454 pub fn cache(self) -> Self {
458 let opt_state = self.get_opt_state();
459 let lp = self.get_plan_builder().cache().build();
460 Self::from_logical_plan(lp, opt_state)
461 }
462
463 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465 let cast_cols: Vec<Expr> = dtypes
466 .into_iter()
467 .map(|(name, dt)| {
468 let name = PlSmallStr::from_str(name);
469
470 if strict {
471 col(name).strict_cast(dt)
472 } else {
473 col(name).cast(dt)
474 }
475 })
476 .collect();
477
478 if cast_cols.is_empty() {
479 self
480 } else {
481 self.with_columns(cast_cols)
482 }
483 }
484
485 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487 self.with_columns(vec![if strict {
488 col(PlSmallStr::from_static("*")).strict_cast(dtype)
489 } else {
490 col(PlSmallStr::from_static("*")).cast(dtype)
491 }])
492 }
493
494 pub fn optimize(
495 self,
496 lp_arena: &mut Arena<IR>,
497 expr_arena: &mut Arena<AExpr>,
498 ) -> PolarsResult<Node> {
499 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
500 }
501
502 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
503 let (mut lp_arena, mut expr_arena) = self.get_arenas();
504 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
505
506 Ok(IRPlan::new(node, lp_arena, expr_arena))
507 }
508
509 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
510 let (mut lp_arena, mut expr_arena) = self.get_arenas();
511 let node = to_alp(
512 self.logical_plan,
513 &mut expr_arena,
514 &mut lp_arena,
515 &mut self.opt_state,
516 )?;
517 let plan = IRPlan::new(node, lp_arena, expr_arena);
518 Ok(plan)
519 }
520
521 pub(crate) fn optimize_with_scratch(
522 self,
523 lp_arena: &mut Arena<IR>,
524 expr_arena: &mut Arena<AExpr>,
525 scratch: &mut Vec<Node>,
526 ) -> PolarsResult<Node> {
527 let lp_top = optimize(
528 self.logical_plan,
529 self.opt_state,
530 lp_arena,
531 expr_arena,
532 scratch,
533 apply_scan_predicate_to_scan_ir,
534 )?;
535
536 Ok(lp_top)
537 }
538
539 fn prepare_collect_post_opt<P>(
540 mut self,
541 check_sink: bool,
542 query_start: Option<std::time::Instant>,
543 post_opt: P,
544 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
545 where
546 P: FnOnce(
547 Node,
548 &mut Arena<IR>,
549 &mut Arena<AExpr>,
550 Option<std::time::Duration>,
551 ) -> PolarsResult<()>,
552 {
553 let (mut lp_arena, mut expr_arena) = self.get_arenas();
554
555 let mut scratch = vec![];
556 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
557
558 post_opt(
559 lp_top,
560 &mut lp_arena,
561 &mut expr_arena,
562 query_start.map(|s| s.elapsed()),
565 )?;
566
567 let no_file_sink = if check_sink {
569 !matches!(
570 lp_arena.get(lp_top),
571 IR::Sink {
572 payload: SinkTypeIR::File { .. },
573 ..
574 }
575 )
576 } else {
577 true
578 };
579 let physical_plan = create_physical_plan(
580 lp_top,
581 &mut lp_arena,
582 &mut expr_arena,
583 BUILD_STREAMING_EXECUTOR,
584 )?;
585
586 let state = ExecutionState::new();
587 Ok((state, physical_plan, no_file_sink))
588 }
589
590 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
592 where
593 P: FnOnce(
594 Node,
595 &mut Arena<IR>,
596 &mut Arena<AExpr>,
597 Option<std::time::Duration>,
598 ) -> PolarsResult<()>,
599 {
600 let (mut state, mut physical_plan, _) =
601 self.prepare_collect_post_opt(false, None, post_opt)?;
602 physical_plan.execute(&mut state)
603 }
604
605 #[allow(unused_mut)]
606 fn prepare_collect(
607 self,
608 check_sink: bool,
609 query_start: Option<std::time::Instant>,
610 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
611 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
612 }
613
614 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
619 let payload = match &self.logical_plan {
620 DslPlan::Sink { payload, .. } => payload.clone(),
621 DslPlan::SinkMultiple { .. } => {
622 polars_ensure!(matches!(engine, Engine::Auto | Engine::Streaming), InvalidOperation: "lazy multisinks only supported on streaming engine");
623 feature_gated!("new_streaming", {
624 let sink_multiple = self.with_new_streaming(true);
625 let mut alp_plan = sink_multiple.to_alp_optimized()?;
626 let result = polars_stream::run_query(
627 alp_plan.lp_top,
628 &mut alp_plan.lp_arena,
629 &mut alp_plan.expr_arena,
630 );
631 return result.map(|_| DataFrame::empty());
632 })
633 },
634 _ => {
635 self.logical_plan = DslPlan::Sink {
636 input: Arc::new(self.logical_plan),
637 payload: SinkType::Memory,
638 };
639 SinkType::Memory
640 },
641 };
642
643 if engine == Engine::Auto {
645 engine = match payload {
646 #[cfg(feature = "new_streaming")]
647 SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
648 _ => Engine::InMemory,
649 };
650 }
651 if engine == Engine::Gpu {
653 engine = Engine::InMemory;
654 }
655
656 #[cfg(feature = "new_streaming")]
657 {
658 if let Some(result) = self.try_new_streaming_if_requested() {
659 return result.map(|v| v.unwrap_single());
660 }
661 }
662
663 match engine {
664 Engine::Auto => unreachable!(),
665 Engine::Streaming => {
666 feature_gated!("new_streaming", self = self.with_new_streaming(true))
667 },
668 _ => {},
669 }
670 let mut alp_plan = self.clone().to_alp_optimized()?;
671
672 match engine {
673 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
674 let result = polars_stream::run_query(
675 alp_plan.lp_top,
676 &mut alp_plan.lp_arena,
677 &mut alp_plan.expr_arena,
678 );
679 result.map(|v| v.unwrap_single())
680 }),
681 Engine::Gpu => {
682 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
683 },
684 Engine::InMemory => {
685 let mut physical_plan = create_physical_plan(
686 alp_plan.lp_top,
687 &mut alp_plan.lp_arena,
688 &mut alp_plan.expr_arena,
689 BUILD_STREAMING_EXECUTOR,
690 )?;
691 let mut state = ExecutionState::new();
692 physical_plan.execute(&mut state)
693 },
694 }
695 }
696
697 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
698 let sink_multiple = LazyFrame {
699 logical_plan: DslPlan::SinkMultiple { inputs: plans },
700 opt_state,
701 cached_arena: Default::default(),
702 };
703 sink_multiple.explain(true)
704 }
705
706 pub fn collect_all_with_engine(
707 plans: Vec<DslPlan>,
708 mut engine: Engine,
709 opt_state: OptFlags,
710 ) -> PolarsResult<Vec<DataFrame>> {
711 if plans.is_empty() {
712 return Ok(Vec::new());
713 }
714
715 if engine == Engine::Auto {
717 engine = Engine::InMemory;
718 }
719 if engine == Engine::Gpu {
721 engine = Engine::InMemory;
722 }
723
724 let mut sink_multiple = LazyFrame {
725 logical_plan: DslPlan::SinkMultiple { inputs: plans },
726 opt_state,
727 cached_arena: Default::default(),
728 };
729
730 #[cfg(feature = "new_streaming")]
731 {
732 if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
733 return result.map(|v| v.unwrap_multiple());
734 }
735 }
736
737 match engine {
738 Engine::Auto => unreachable!(),
739 Engine::Streaming => {
740 feature_gated!(
741 "new_streaming",
742 sink_multiple = sink_multiple.with_new_streaming(true)
743 )
744 },
745 _ => {},
746 }
747 let mut alp_plan = sink_multiple.to_alp_optimized()?;
748
749 if engine == Engine::Streaming {
750 feature_gated!("new_streaming", {
751 let result = polars_stream::run_query(
752 alp_plan.lp_top,
753 &mut alp_plan.lp_arena,
754 &mut alp_plan.expr_arena,
755 );
756 return result.map(|v| v.unwrap_multiple());
757 });
758 }
759
760 let IR::SinkMultiple { inputs } = alp_plan.root() else {
761 unreachable!()
762 };
763
764 let mut multiplan = create_multiple_physical_plans(
765 inputs.clone().as_slice(),
766 &mut alp_plan.lp_arena,
767 &mut alp_plan.expr_arena,
768 BUILD_STREAMING_EXECUTOR,
769 )?;
770
771 match engine {
772 Engine::Gpu => polars_bail!(
773 InvalidOperation: "collect_all is not supported for the gpu engine"
774 ),
775 Engine::InMemory => {
776 let mut state = ExecutionState::new();
780 if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
781 cache_prefiller.execute(&mut state)?;
782 }
783 let out = POOL.install(|| {
784 multiplan
785 .physical_plans
786 .chunks_mut(POOL.current_num_threads() * 3)
787 .map(|chunk| {
788 chunk
789 .into_par_iter()
790 .enumerate()
791 .map(|(idx, input)| {
792 let mut input = std::mem::take(input);
793 let mut state = state.split();
794 state.branch_idx += idx;
795
796 let df = input.execute(&mut state)?;
797 Ok(df)
798 })
799 .collect::<PolarsResult<Vec<_>>>()
800 })
801 .collect::<PolarsResult<Vec<_>>>()
802 });
803 Ok(out?.into_iter().flatten().collect())
804 },
805 _ => unreachable!(),
806 }
807 }
808
809 pub fn collect(self) -> PolarsResult<DataFrame> {
827 self.collect_with_engine(Engine::InMemory)
828 }
829
/// Execute the query on a blocking task and stream the results back as batches.
///
/// Batches are delivered through a bounded channel of capacity 1 held by the returned
/// `CollectBatches`. A query error is sent through the same channel. Unless `lazy` is
/// `true`, the runner is started before returning.
/// NOTE(review): with `lazy = true`, the query presumably starts on `CollectBatches::start` — confirm.
#[cfg(feature = "async")]
pub fn collect_batches(
    self,
    engine: Engine,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PolarsResult<CollectBatches> {
    let (batch_tx, recv) = sync_channel(1);
    let error_tx = batch_tx.clone();

    // The callback returns `true` once the receiving side has been dropped (send fails).
    // NOTE(review): presumably this tells the sink to stop producing batches.
    let ldf = self.sink_batches(
        PlanCallback::new(move |df| Ok(batch_tx.send(Ok(df)).is_err())),
        maintain_order,
        chunk_size,
    )?;

    let runner = move || {
        polars_io::pl_async::get_runtime().spawn_blocking(move || {
            if let Err(e) = ldf.collect_with_engine(engine) {
                let _ = error_tx.send(Err(e));
            }
        });
    };

    let mut batches = CollectBatches {
        recv,
        runner: Some(Box::new(runner)),
    };
    if !lazy {
        batches.start();
    }
    Ok(batches)
}
872
873 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
876 where
877 P: FnOnce(
878 Node,
879 &mut Arena<IR>,
880 &mut Arena<AExpr>,
881 Option<std::time::Duration>,
882 ) -> PolarsResult<()>,
883 {
884 let query_start = std::time::Instant::now();
885 let (mut state, mut physical_plan, _) =
886 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
887 state.time_nodes(query_start);
888 let out = physical_plan.execute(&mut state)?;
889 let timer_df = state.finish_timer()?;
890 Ok((out, timer_df))
891 }
892
893 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
901 self._profile_post_opt(|_, _, _, _| Ok(()))
902 }
903
904 pub fn sink_batches(
905 mut self,
906 function: PlanCallback<DataFrame, bool>,
907 maintain_order: bool,
908 chunk_size: Option<NonZeroUsize>,
909 ) -> PolarsResult<Self> {
910 use polars_plan::prelude::sink::CallbackSinkType;
911
912 polars_ensure!(
913 !matches!(self.logical_plan, DslPlan::Sink { .. }),
914 InvalidOperation: "cannot create a sink on top of another sink"
915 );
916
917 self.logical_plan = DslPlan::Sink {
918 input: Arc::new(self.logical_plan),
919 payload: SinkType::Callback(CallbackSinkType {
920 function,
921 maintain_order,
922 chunk_size,
923 }),
924 };
925
926 Ok(self)
927 }
928
/// Run the query on the new streaming engine when requested through environment variables.
///
/// If `POLARS_AUTO_NEW_STREAMING` or `POLARS_FORCE_NEW_STREAMING` equals `"1"`, a copy of
/// the frame is optimized with `OptFlags::NEW_STREAMING` and executed with
/// `polars_stream::run_query`. `self` is left untouched so callers can fall back.
///
/// Returns `None` when neither variable is set. It also returns `None` when auto mode
/// (without force) caught a panic whose message starts with "not yet implemented".
///
/// # Panics
/// Re-raises every other panic from the streaming engine, and every panic in force mode.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if !(auto_new_streaming || force_new_streaming) {
        return None;
    }

    let mut new_stream_lazy = self.clone();
    new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
    let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
        Ok(v) => v,
        Err(e) => return Some(Err(e)),
    };

    let f = || {
        polars_stream::run_query(
            alp_plan.lp_top,
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
        )
    };

    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
        Ok(v) => Some(v),
        Err(e) => {
            // A bare `todo!()` or a literal panic carries a `&'static str` payload.
            // `todo!("reason")` and other formatted panics carry a `String`; accept both.
            let message = e
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| e.downcast_ref::<String>().map(String::as_str));
            let is_unimplemented =
                message.is_some_and(|s| s.starts_with("not yet implemented"));

            if !force_new_streaming && auto_new_streaming && is_unimplemented {
                if polars_core::config::verbose() {
                    eprintln!(
                        "caught unimplemented error in new streaming engine, falling back to normal engine"
                    );
                }
                None
            } else {
                std::panic::resume_unwind(e);
            }
        },
    }
}
979
980 pub fn sink(
981 mut self,
982 sink_type: SinkDestination,
983 file_format: FileWriteFormat,
984 unified_sink_args: UnifiedSinkArgs,
985 ) -> PolarsResult<Self> {
986 polars_ensure!(
987 !matches!(self.logical_plan, DslPlan::Sink { .. }),
988 InvalidOperation: "cannot create a sink on top of another sink"
989 );
990
991 self.logical_plan = DslPlan::Sink {
992 input: Arc::new(self.logical_plan),
993 payload: match sink_type {
994 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
995 target,
996 file_format,
997 unified_sink_args,
998 }),
999 SinkDestination::Partitioned {
1000 base_path,
1001 file_path_provider,
1002 partition_strategy,
1003 max_rows_per_file,
1004 approximate_bytes_per_file,
1005 } => SinkType::Partitioned(PartitionedSinkOptions {
1006 base_path,
1007 file_path_provider,
1008 partition_strategy,
1009 file_format,
1010 unified_sink_args,
1011 max_rows_per_file,
1012 approximate_bytes_per_file,
1013 }),
1014 },
1015 };
1016 Ok(self)
1017 }
1018
1019 pub fn filter(self, predicate: Expr) -> Self {
1037 let opt_state = self.get_opt_state();
1038 let lp = self.get_plan_builder().filter(predicate).build();
1039 Self::from_logical_plan(lp, opt_state)
1040 }
1041
1042 pub fn remove(self, predicate: Expr) -> Self {
1060 self.filter(predicate.neq_missing(lit(true)))
1061 }
1062
1063 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1089 let exprs = exprs.as_ref().to_vec();
1090 self.select_impl(
1091 exprs,
1092 ProjectionOptions {
1093 run_parallel: true,
1094 duplicate_check: true,
1095 should_broadcast: true,
1096 },
1097 )
1098 }
1099
1100 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1101 let exprs = exprs.as_ref().to_vec();
1102 self.select_impl(
1103 exprs,
1104 ProjectionOptions {
1105 run_parallel: false,
1106 duplicate_check: true,
1107 should_broadcast: true,
1108 },
1109 )
1110 }
1111
1112 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1113 let opt_state = self.get_opt_state();
1114 let lp = self.get_plan_builder().project(exprs, options).build();
1115 Self::from_logical_plan(lp, opt_state)
1116 }
1117
1118 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1139 let keys = by
1140 .as_ref()
1141 .iter()
1142 .map(|e| e.clone().into())
1143 .collect::<Vec<_>>();
1144 let opt_state = self.get_opt_state();
1145
1146 #[cfg(feature = "dynamic_group_by")]
1147 {
1148 LazyGroupBy {
1149 logical_plan: self.logical_plan,
1150 opt_state,
1151 keys,
1152 predicates: vec![],
1153 maintain_order: false,
1154 dynamic_options: None,
1155 rolling_options: None,
1156 }
1157 }
1158
1159 #[cfg(not(feature = "dynamic_group_by"))]
1160 {
1161 LazyGroupBy {
1162 logical_plan: self.logical_plan,
1163 opt_state,
1164 keys,
1165 predicates: vec![],
1166 maintain_order: false,
1167 }
1168 }
1169 }
1170
/// Start a rolling group-by indexed by `index_column`, keyed by `group_by`.
///
/// A non-column index expression is first added with `with_column`, and the call is
/// repeated with a reference to its output column.
///
/// # Panics
/// Panics if the schema cannot be resolved or the index expression has no output field.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    let name = match index_column {
        Expr::Column(name) => name,
        expr => {
            let schema = self.collect_schema().unwrap();
            let output_field = expr.to_field(&schema).unwrap();
            return self.with_column(expr).rolling(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        },
    };
    options.index_column = name;

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1208
/// Start a dynamic (window-based) group-by indexed by `index_column`, keyed by `group_by`.
///
/// A non-column index expression is first added with `with_column`, and the call is
/// repeated with a reference to its output column.
///
/// # Panics
/// Panics if the schema cannot be resolved or the index expression has no output field.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    let name = match index_column {
        Expr::Column(name) => name,
        expr => {
            let schema = self.collect_schema().unwrap();
            let output_field = expr.to_field(&schema).unwrap();
            return self.with_column(expr).group_by_dynamic(
                Expr::Column(output_field.name().clone()),
                group_by,
                options,
            );
        },
    };
    options.index_column = name;

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1254
1255 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1257 let keys = by
1258 .as_ref()
1259 .iter()
1260 .map(|e| e.clone().into())
1261 .collect::<Vec<_>>();
1262 let opt_state = self.get_opt_state();
1263
1264 #[cfg(feature = "dynamic_group_by")]
1265 {
1266 LazyGroupBy {
1267 logical_plan: self.logical_plan,
1268 opt_state,
1269 keys,
1270 predicates: vec![],
1271 maintain_order: true,
1272 dynamic_options: None,
1273 rolling_options: None,
1274 }
1275 }
1276
1277 #[cfg(not(feature = "dynamic_group_by"))]
1278 {
1279 LazyGroupBy {
1280 logical_plan: self.logical_plan,
1281 opt_state,
1282 keys,
1283 predicates: vec![],
1284 maintain_order: true,
1285 }
1286 }
1287 }
1288
/// Anti join with `other` on a single key pair.
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Anti);
    self.join(other, [left_on.into()], [right_on.into()], args)
}
1314
/// Cross join with `other`. `suffix` is forwarded to `JoinArgs::with_suffix`.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    self.join(other, Vec::<Expr>::new(), Vec::<Expr>::new(), args)
}
1325
1326 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1343 self.join(
1344 other,
1345 [left_on.into()],
1346 [right_on.into()],
1347 JoinArgs::new(JoinType::Left),
1348 )
1349 }
1350
1351 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1368 self.join(
1369 other,
1370 [left_on.into()],
1371 [right_on.into()],
1372 JoinArgs::new(JoinType::Inner),
1373 )
1374 }
1375
1376 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1393 self.join(
1394 other,
1395 [left_on.into()],
1396 [right_on.into()],
1397 JoinArgs::new(JoinType::Full),
1398 )
1399 }
1400
/// Semi join with `other` on a single key pair.
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Semi);
    self.join(other, [left_on.into()], [right_on.into()], args)
}
1426
1427 pub fn join<E: AsRef<[Expr]>>(
1449 self,
1450 other: LazyFrame,
1451 left_on: E,
1452 right_on: E,
1453 args: JoinArgs,
1454 ) -> LazyFrame {
1455 let left_on = left_on.as_ref().to_vec();
1456 let right_on = right_on.as_ref().to_vec();
1457
1458 self._join_impl(other, left_on, right_on, args)
1459 }
1460
1461 fn _join_impl(
1462 self,
1463 other: LazyFrame,
1464 left_on: Vec<Expr>,
1465 right_on: Vec<Expr>,
1466 args: JoinArgs,
1467 ) -> LazyFrame {
1468 let JoinArgs {
1469 how,
1470 validation,
1471 suffix,
1472 slice,
1473 nulls_equal,
1474 coalesce,
1475 maintain_order,
1476 } = args;
1477
1478 if slice.is_some() {
1479 panic!("impl error: slice is not handled")
1480 }
1481
1482 let mut builder = self
1483 .join_builder()
1484 .with(other)
1485 .left_on(left_on)
1486 .right_on(right_on)
1487 .how(how)
1488 .validate(validation)
1489 .join_nulls(nulls_equal)
1490 .coalesce(coalesce)
1491 .maintain_order(maintain_order);
1492
1493 if let Some(suffix) = suffix {
1494 builder = builder.suffix(suffix);
1495 }
1496
1497 builder.finish()
1499 }
1500
1501 pub fn join_builder(self) -> JoinBuilder {
1507 JoinBuilder::new(self)
1508 }
1509
1510 pub fn with_column(self, expr: Expr) -> LazyFrame {
1528 let opt_state = self.get_opt_state();
1529 let lp = self
1530 .get_plan_builder()
1531 .with_columns(
1532 vec![expr],
1533 ProjectionOptions {
1534 run_parallel: false,
1535 duplicate_check: true,
1536 should_broadcast: true,
1537 },
1538 )
1539 .build();
1540 Self::from_logical_plan(lp, opt_state)
1541 }
1542
1543 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1558 let exprs = exprs.as_ref().to_vec();
1559 self.with_columns_impl(
1560 exprs,
1561 ProjectionOptions {
1562 run_parallel: true,
1563 duplicate_check: true,
1564 should_broadcast: true,
1565 },
1566 )
1567 }
1568
1569 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1571 let exprs = exprs.as_ref().to_vec();
1572 self.with_columns_impl(
1573 exprs,
1574 ProjectionOptions {
1575 run_parallel: false,
1576 duplicate_check: true,
1577 should_broadcast: true,
1578 },
1579 )
1580 }
1581
1582 pub fn match_to_schema(
1584 self,
1585 schema: SchemaRef,
1586 per_column: Arc<[MatchToSchemaPerColumn]>,
1587 extra_columns: ExtraColumnsPolicy,
1588 ) -> LazyFrame {
1589 let opt_state = self.get_opt_state();
1590 let lp = self
1591 .get_plan_builder()
1592 .match_to_schema(schema, per_column, extra_columns)
1593 .build();
1594 Self::from_logical_plan(lp, opt_state)
1595 }
1596
1597 pub fn pipe_with_schema(
1598 self,
1599 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1600 ) -> Self {
1601 let opt_state = self.get_opt_state();
1602 let lp = self
1603 .get_plan_builder()
1604 .pipe_with_schema(vec![], callback)
1605 .build();
1606 Self::from_logical_plan(lp, opt_state)
1607 }
1608
1609 pub fn pipe_with_schemas(
1610 self,
1611 others: Vec<LazyFrame>,
1612 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1613 ) -> Self {
1614 let opt_state = self.get_opt_state();
1615 let lp = self
1616 .get_plan_builder()
1617 .pipe_with_schema(
1618 others.into_iter().map(|lf| lf.logical_plan).collect(),
1619 callback,
1620 )
1621 .build();
1622 Self::from_logical_plan(lp, opt_state)
1623 }
1624
1625 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1626 let opt_state = self.get_opt_state();
1627 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1628 Self::from_logical_plan(lp, opt_state)
1629 }
1630
1631 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1632 let contexts = contexts
1633 .as_ref()
1634 .iter()
1635 .map(|lf| lf.logical_plan.clone())
1636 .collect();
1637 let opt_state = self.get_opt_state();
1638 let lp = self.get_plan_builder().with_context(contexts).build();
1639 Self::from_logical_plan(lp, opt_state)
1640 }
1641
1642 pub fn max(self) -> Self {
1646 self.map_private(DslFunction::Stats(StatsFunction::Max))
1647 }
1648
1649 pub fn min(self) -> Self {
1653 self.map_private(DslFunction::Stats(StatsFunction::Min))
1654 }
1655
1656 pub fn sum(self) -> Self {
1666 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1667 }
1668
1669 pub fn mean(self) -> Self {
1674 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1675 }
1676
1677 pub fn median(self) -> Self {
1683 self.map_private(DslFunction::Stats(StatsFunction::Median))
1684 }
1685
1686 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1688 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1689 quantile,
1690 method,
1691 }))
1692 }
1693
1694 pub fn std(self, ddof: u8) -> Self {
1707 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1708 }
1709
1710 pub fn var(self, ddof: u8) -> Self {
1720 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1721 }
1722
1723 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1725 self.explode_impl(columns, options, false)
1726 }
1727
1728 fn explode_impl(
1730 self,
1731 columns: Selector,
1732 options: ExplodeOptions,
1733 allow_empty: bool,
1734 ) -> LazyFrame {
1735 let opt_state = self.get_opt_state();
1736 let lp = self
1737 .get_plan_builder()
1738 .explode(columns, options, allow_empty)
1739 .build();
1740 Self::from_logical_plan(lp, opt_state)
1741 }
1742
1743 pub fn null_count(self) -> LazyFrame {
1745 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1746 }
1747
1748 pub fn unique_stable(
1753 self,
1754 subset: Option<Selector>,
1755 keep_strategy: UniqueKeepStrategy,
1756 ) -> LazyFrame {
1757 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1758 self.unique_stable_generic(subset, keep_strategy)
1759 }
1760
1761 pub fn unique_stable_generic(
1762 self,
1763 subset: Option<Vec<Expr>>,
1764 keep_strategy: UniqueKeepStrategy,
1765 ) -> LazyFrame {
1766 let opt_state = self.get_opt_state();
1767 let options = DistinctOptionsDSL {
1768 subset,
1769 maintain_order: true,
1770 keep_strategy,
1771 };
1772 let lp = self.get_plan_builder().distinct(options).build();
1773 Self::from_logical_plan(lp, opt_state)
1774 }
1775
1776 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1784 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1785 self.unique_generic(subset, keep_strategy)
1786 }
1787
1788 pub fn unique_generic(
1789 self,
1790 subset: Option<Vec<Expr>>,
1791 keep_strategy: UniqueKeepStrategy,
1792 ) -> LazyFrame {
1793 let opt_state = self.get_opt_state();
1794 let options = DistinctOptionsDSL {
1795 subset,
1796 maintain_order: false,
1797 keep_strategy,
1798 };
1799 let lp = self.get_plan_builder().distinct(options).build();
1800 Self::from_logical_plan(lp, opt_state)
1801 }
1802
1803 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1808 let opt_state = self.get_opt_state();
1809 let lp = self.get_plan_builder().drop_nans(subset).build();
1810 Self::from_logical_plan(lp, opt_state)
1811 }
1812
1813 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1818 let opt_state = self.get_opt_state();
1819 let lp = self.get_plan_builder().drop_nulls(subset).build();
1820 Self::from_logical_plan(lp, opt_state)
1821 }
1822
1823 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1833 let opt_state = self.get_opt_state();
1834 let lp = self.get_plan_builder().slice(offset, len).build();
1835 Self::from_logical_plan(lp, opt_state)
1836 }
1837
1838 pub fn first(self) -> LazyFrame {
1842 self.slice(0, 1)
1843 }
1844
1845 pub fn last(self) -> LazyFrame {
1849 self.slice(-1, 1)
1850 }
1851
1852 pub fn tail(self, n: IdxSize) -> LazyFrame {
1856 let neg_tail = -(n as i64);
1857 self.slice(neg_tail, n)
1858 }
1859
1860 #[cfg(feature = "pivot")]
1861 #[expect(clippy::too_many_arguments)]
1862 pub fn pivot(
1863 self,
1864 on: Selector,
1865 on_columns: Arc<DataFrame>,
1866 index: Selector,
1867 values: Selector,
1868 agg: Expr,
1869 maintain_order: bool,
1870 separator: PlSmallStr,
1871 ) -> LazyFrame {
1872 let opt_state = self.get_opt_state();
1873 let lp = self
1874 .get_plan_builder()
1875 .pivot(
1876 on,
1877 on_columns,
1878 index,
1879 values,
1880 agg,
1881 maintain_order,
1882 separator,
1883 )
1884 .build();
1885 Self::from_logical_plan(lp, opt_state)
1886 }
1887
1888 #[cfg(feature = "pivot")]
1892 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1893 let opt_state = self.get_opt_state();
1894 let lp = self.get_plan_builder().unpivot(args).build();
1895 Self::from_logical_plan(lp, opt_state)
1896 }
1897
1898 pub fn limit(self, n: IdxSize) -> LazyFrame {
1900 self.slice(0, n)
1901 }
1902
1903 pub fn map<F>(
1917 self,
1918 function: F,
1919 optimizations: AllowedOptimizations,
1920 schema: Option<Arc<dyn UdfSchema>>,
1921 name: Option<&'static str>,
1922 ) -> LazyFrame
1923 where
1924 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1925 {
1926 let opt_state = self.get_opt_state();
1927 let lp = self
1928 .get_plan_builder()
1929 .map(
1930 function,
1931 optimizations,
1932 schema,
1933 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1934 )
1935 .build();
1936 Self::from_logical_plan(lp, opt_state)
1937 }
1938
1939 #[cfg(feature = "python")]
1940 pub fn map_python(
1941 self,
1942 function: polars_utils::python_function::PythonFunction,
1943 optimizations: AllowedOptimizations,
1944 schema: Option<SchemaRef>,
1945 validate_output: bool,
1946 ) -> LazyFrame {
1947 let opt_state = self.get_opt_state();
1948 let lp = self
1949 .get_plan_builder()
1950 .map_python(function, optimizations, schema, validate_output)
1951 .build();
1952 Self::from_logical_plan(lp, opt_state)
1953 }
1954
1955 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1956 let opt_state = self.get_opt_state();
1957 let lp = self.get_plan_builder().map_private(function).build();
1958 Self::from_logical_plan(lp, opt_state)
1959 }
1960
1961 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1970 where
1971 S: Into<PlSmallStr>,
1972 {
1973 let name = name.into();
1974
1975 match &self.logical_plan {
1976 v @ DslPlan::Scan {
1977 scan_type,
1978 unified_scan_args,
1979 ..
1980 } if unified_scan_args.row_index.is_none()
1981 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1982 {
1983 let DslPlan::Scan {
1984 sources,
1985 mut unified_scan_args,
1986 scan_type,
1987 cached_ir: _,
1988 } = v.clone()
1989 else {
1990 unreachable!()
1991 };
1992
1993 unified_scan_args.row_index = Some(RowIndex {
1994 name,
1995 offset: offset.unwrap_or(0),
1996 });
1997
1998 DslPlan::Scan {
1999 sources,
2000 unified_scan_args,
2001 scan_type,
2002 cached_ir: Default::default(),
2003 }
2004 .into()
2005 },
2006 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2007 }
2008 }
2009
2010 pub fn count(self) -> LazyFrame {
2012 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2013 }
2014
2015 #[cfg(feature = "dtype-struct")]
2018 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2019 self.map_private(DslFunction::Unnest {
2020 columns: cols,
2021 separator,
2022 })
2023 }
2024
2025 #[cfg(feature = "merge_sorted")]
2026 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2027 where
2028 S: Into<PlSmallStr>,
2029 {
2030 let key = key.into();
2031
2032 let lp = DslPlan::MergeSorted {
2033 input_left: Arc::new(self.logical_plan),
2034 input_right: Arc::new(other.logical_plan),
2035 key,
2036 };
2037 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2038 }
2039
2040 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2041 let lp = DslPlan::MapFunction {
2042 input: Arc::new(self.logical_plan),
2043 function: DslFunction::Hint(hint),
2044 };
2045 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2046 }
2047}
2048
/// Pending `group_by`: the input plan plus the grouping configuration.
///
/// It is turned into a `LazyFrame` by `agg`, `head`, `tail` or `apply`, or via
/// `From<LazyGroupBy>`, which drops the grouping.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags handed to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Grouping key expressions.
    keys: Vec<Expr>,
    /// Predicates registered through `having`; forwarded by `agg`, rejected by `apply`.
    predicates: Vec<Expr>,
    /// Forwarded as `maintain_order` to the group-by node.
    maintain_order: bool,
    /// Dynamic group-by options forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2062
2063impl From<LazyGroupBy> for LazyFrame {
2064 fn from(lgb: LazyGroupBy) -> Self {
2065 Self {
2066 logical_plan: lgb.logical_plan,
2067 opt_state: lgb.opt_state,
2068 cached_arena: Default::default(),
2069 }
2070 }
2071}
2072
2073impl LazyGroupBy {
2074 pub fn having(mut self, predicate: Expr) -> Self {
2095 self.predicates.push(predicate);
2096 self
2097 }
2098
2099 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2121 #[cfg(feature = "dynamic_group_by")]
2122 let lp = DslBuilder::from(self.logical_plan)
2123 .group_by(
2124 self.keys,
2125 self.predicates,
2126 aggs,
2127 None,
2128 self.maintain_order,
2129 self.dynamic_options,
2130 self.rolling_options,
2131 )
2132 .build();
2133
2134 #[cfg(not(feature = "dynamic_group_by"))]
2135 let lp = DslBuilder::from(self.logical_plan)
2136 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2137 .build();
2138 LazyFrame::from_logical_plan(lp, self.opt_state)
2139 }
2140
2141 pub fn head(self, n: Option<usize>) -> LazyFrame {
2143 let keys = self
2144 .keys
2145 .iter()
2146 .filter_map(|expr| expr_output_name(expr).ok())
2147 .collect::<Vec<_>>();
2148
2149 self.agg([all().as_expr().head(n)]).explode_impl(
2150 all() - by_name(keys.iter().cloned(), false),
2151 ExplodeOptions {
2152 empty_as_null: true,
2153 keep_nulls: true,
2154 },
2155 true,
2156 )
2157 }
2158
2159 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2161 let keys = self
2162 .keys
2163 .iter()
2164 .filter_map(|expr| expr_output_name(expr).ok())
2165 .collect::<Vec<_>>();
2166
2167 self.agg([all().as_expr().tail(n)]).explode_impl(
2168 all() - by_name(keys.iter().cloned(), false),
2169 ExplodeOptions {
2170 empty_as_null: true,
2171 keep_nulls: true,
2172 },
2173 true,
2174 )
2175 }
2176
2177 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2182 if !self.predicates.is_empty() {
2183 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2184 }
2185
2186 #[cfg(feature = "dynamic_group_by")]
2187 let options = GroupbyOptions {
2188 dynamic: self.dynamic_options,
2189 rolling: self.rolling_options,
2190 slice: None,
2191 };
2192
2193 #[cfg(not(feature = "dynamic_group_by"))]
2194 let options = GroupbyOptions { slice: None };
2195
2196 let lp = DslPlan::GroupBy {
2197 input: Arc::new(self.logical_plan),
2198 keys: self.keys,
2199 predicates: vec![],
2200 aggs: vec![],
2201 apply: Some((f, schema)),
2202 maintain_order: self.maintain_order,
2203 options: Arc::new(options),
2204 };
2205 LazyFrame::from_logical_plan(lp, self.opt_state)
2206 }
2207}
2208
/// Builder for joins between two `LazyFrame`s.
///
/// Configure it with the setter methods, then call `finish` (equi-join on
/// `left_on`/`right_on`) or `join_where` (join on arbitrary predicates).
#[must_use]
pub struct JoinBuilder {
    /// Left input of the join.
    lf: LazyFrame,
    /// Join type; `JoinType::Inner` by default.
    how: JoinType,
    /// Right input; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left input.
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right input.
    right_on: Vec<Expr>,
    /// Forwarded as `JoinOptions::allow_parallel` (default `true`).
    allow_parallel: bool,
    /// Forwarded as `JoinOptions::force_parallel` (default `false`).
    force_parallel: bool,
    /// Optional suffix forwarded in `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Join validation mode forwarded in `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded as `JoinArgs::nulls_equal` (default `false`).
    nulls_equal: bool,
    /// Key-coalescing policy forwarded in `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Output-order policy forwarded in `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2224impl JoinBuilder {
2225 pub fn new(lf: LazyFrame) -> Self {
2227 Self {
2228 lf,
2229 other: None,
2230 how: JoinType::Inner,
2231 left_on: vec![],
2232 right_on: vec![],
2233 allow_parallel: true,
2234 force_parallel: false,
2235 suffix: None,
2236 validation: Default::default(),
2237 nulls_equal: false,
2238 coalesce: Default::default(),
2239 maintain_order: Default::default(),
2240 }
2241 }
2242
2243 pub fn with(mut self, other: LazyFrame) -> Self {
2245 self.other = Some(other);
2246 self
2247 }
2248
2249 pub fn how(mut self, how: JoinType) -> Self {
2251 self.how = how;
2252 self
2253 }
2254
2255 pub fn validate(mut self, validation: JoinValidation) -> Self {
2256 self.validation = validation;
2257 self
2258 }
2259
2260 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264 let on = on.as_ref().to_vec();
2265 self.left_on.clone_from(&on);
2266 self.right_on = on;
2267 self
2268 }
2269
2270 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2274 self.left_on = on.as_ref().to_vec();
2275 self
2276 }
2277
2278 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2282 self.right_on = on.as_ref().to_vec();
2283 self
2284 }
2285
2286 pub fn allow_parallel(mut self, allow: bool) -> Self {
2288 self.allow_parallel = allow;
2289 self
2290 }
2291
2292 pub fn force_parallel(mut self, force: bool) -> Self {
2294 self.force_parallel = force;
2295 self
2296 }
2297
2298 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2300 self.nulls_equal = nulls_equal;
2301 self
2302 }
2303
2304 pub fn suffix<S>(mut self, suffix: S) -> Self
2307 where
2308 S: Into<PlSmallStr>,
2309 {
2310 self.suffix = Some(suffix.into());
2311 self
2312 }
2313
2314 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2316 self.coalesce = coalesce;
2317 self
2318 }
2319
2320 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2322 self.maintain_order = maintain_order;
2323 self
2324 }
2325
2326 pub fn finish(self) -> LazyFrame {
2328 let opt_state = self.lf.opt_state;
2329 let other = self.other.expect("'with' not set in join builder");
2330
2331 let args = JoinArgs {
2332 how: self.how,
2333 validation: self.validation,
2334 suffix: self.suffix,
2335 slice: None,
2336 nulls_equal: self.nulls_equal,
2337 coalesce: self.coalesce,
2338 maintain_order: self.maintain_order,
2339 };
2340
2341 let lp = self
2342 .lf
2343 .get_plan_builder()
2344 .join(
2345 other.logical_plan,
2346 self.left_on,
2347 self.right_on,
2348 JoinOptions {
2349 allow_parallel: self.allow_parallel,
2350 force_parallel: self.force_parallel,
2351 args,
2352 }
2353 .into(),
2354 )
2355 .build();
2356 LazyFrame::from_logical_plan(lp, opt_state)
2357 }
2358
2359 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2361 let opt_state = self.lf.opt_state;
2362 let other = self.other.expect("with not set");
2363
2364 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2366 if let Expr::BinaryExpr {
2367 op: Operator::And,
2368 left,
2369 right,
2370 } = predicate
2371 {
2372 decompose_and((*left).clone(), expanded_predicates);
2373 decompose_and((*right).clone(), expanded_predicates);
2374 } else {
2375 expanded_predicates.push(predicate);
2376 }
2377 }
2378 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2379 for predicate in predicates {
2380 decompose_and(predicate, &mut expanded_predicates);
2381 }
2382 let predicates: Vec<Expr> = expanded_predicates;
2383
2384 #[cfg(feature = "is_between")]
2386 let predicates: Vec<Expr> = {
2387 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2388 for predicate in predicates {
2389 if let Expr::Function {
2390 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2391 input,
2392 ..
2393 } = &predicate
2394 {
2395 if let [expr, lower, upper] = input.as_slice() {
2396 match closed {
2397 ClosedInterval::Both => {
2398 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2399 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2400 },
2401 ClosedInterval::Right => {
2402 expanded_predicates.push(expr.clone().gt(lower.clone()));
2403 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2404 },
2405 ClosedInterval::Left => {
2406 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2407 expanded_predicates.push(expr.clone().lt(upper.clone()));
2408 },
2409 ClosedInterval::None => {
2410 expanded_predicates.push(expr.clone().gt(lower.clone()));
2411 expanded_predicates.push(expr.clone().lt(upper.clone()));
2412 },
2413 }
2414 continue;
2415 }
2416 }
2417 expanded_predicates.push(predicate);
2418 }
2419 expanded_predicates
2420 };
2421
2422 let args = JoinArgs {
2423 how: self.how,
2424 validation: self.validation,
2425 suffix: self.suffix,
2426 slice: None,
2427 nulls_equal: self.nulls_equal,
2428 coalesce: self.coalesce,
2429 maintain_order: self.maintain_order,
2430 };
2431 let options = JoinOptions {
2432 allow_parallel: self.allow_parallel,
2433 force_parallel: self.force_parallel,
2434 args,
2435 };
2436
2437 let lp = DslPlan::Join {
2438 input_left: Arc::new(self.lf.logical_plan),
2439 input_right: Arc::new(other.logical_plan),
2440 left_on: Default::default(),
2441 right_on: Default::default(),
2442 predicates,
2443 options: Arc::from(options),
2444 };
2445
2446 LazyFrame::from_logical_plan(lp, opt_state)
2447 }
2448}
2449
2450pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2451 #[cfg(not(feature = "new_streaming"))]
2452 {
2453 None
2454 }
2455 #[cfg(feature = "new_streaming")]
2456 {
2457 Some(polars_stream::build_streaming_query_executor)
2458 }
2459};
2460
/// Iterator over result batches of a query, delivered through a channel.
pub struct CollectBatches {
    /// Receiving end of the channel the batches (or errors) arrive on.
    recv: Receiver<PolarsResult<DataFrame>>,
    /// One-shot closure run by `start`; presumably it drives the query that feeds
    /// `recv` (confirm at the construction site). `None` once it has been started.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
2465
2466impl CollectBatches {
2467 pub fn start(&mut self) {
2469 if let Some(runner) = self.runner.take() {
2470 runner()
2471 }
2472 }
2473}
2474
2475impl Iterator for CollectBatches {
2476 type Item = PolarsResult<DataFrame>;
2477
2478 fn next(&mut self) -> Option<Self::Item> {
2479 self.start();
2480 self.recv.recv().ok()
2481 }
2482}