1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::mpsc::{Receiver, sync_channel};
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "json")]
21pub use ndjson::*;
22#[cfg(feature = "parquet")]
23pub use parquet::*;
24use polars_compute::rolling::QuantileMethod;
25use polars_core::POOL;
26use polars_core::error::feature_gated;
27use polars_core::prelude::*;
28use polars_io::RowIndex;
29use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
32#[cfg(feature = "is_between")]
33use polars_ops::prelude::ClosedInterval;
34pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35use polars_utils::pl_str::PlSmallStr;
36use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38use crate::frame::cached_arenas::CachedArena;
39use crate::prelude::*;
40
41pub trait IntoLazy {
42 fn lazy(self) -> LazyFrame;
43}
44
45impl IntoLazy for DataFrame {
46 fn lazy(self) -> LazyFrame {
48 let lp = DslBuilder::from_existing_df(self).build();
49 LazyFrame {
50 logical_plan: lp,
51 opt_state: Default::default(),
52 cached_arena: Default::default(),
53 }
54 }
55}
56
57impl IntoLazy for LazyFrame {
58 fn lazy(self) -> LazyFrame {
59 self
60 }
61}
62
/// A lazily evaluated `DataFrame`: a logical query plan plus the optimization flags
/// that are applied when the plan is eventually executed.
///
/// Methods such as `filter` or `select` only extend `logical_plan`. Execution happens
/// in the `collect*` / `profile` family of methods.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL-level plan describing the query built so far.
    pub logical_plan: DslPlan,
    /// Optimization flags consulted when the plan is lowered and optimized.
    pub(crate) opt_state: OptFlags,
    /// Optional cached arenas (see the `cached_arenas` module). This sits behind an
    /// `Arc`, so the derived `Clone` shares the cache rather than copying it.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
73}
74
75impl From<DslPlan> for LazyFrame {
76 fn from(plan: DslPlan) -> Self {
77 Self {
78 logical_plan: plan,
79 opt_state: OptFlags::default(),
80 cached_arena: Default::default(),
81 }
82 }
83}
84
85impl LazyFrame {
86 pub(crate) fn from_inner(
87 logical_plan: DslPlan,
88 opt_state: OptFlags,
89 cached_arena: Arc<Mutex<Option<CachedArena>>>,
90 ) -> Self {
91 Self {
92 logical_plan,
93 opt_state,
94 cached_arena,
95 }
96 }
97
98 pub(crate) fn get_plan_builder(self) -> DslBuilder {
99 DslBuilder::from(self.logical_plan)
100 }
101
102 fn get_opt_state(&self) -> OptFlags {
103 self.opt_state
104 }
105
106 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107 LazyFrame {
108 logical_plan,
109 opt_state,
110 cached_arena: Default::default(),
111 }
112 }
113
114 pub fn get_current_optimizations(&self) -> OptFlags {
116 self.opt_state
117 }
118
119 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
121 self.opt_state = opt_state;
122 self
123 }
124
125 pub fn without_optimizations(self) -> Self {
127 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128 }
129
130 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133 self
134 }
135
136 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139 self
140 }
141
142 pub fn with_check_order(mut self, toggle: bool) -> Self {
145 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146 self
147 }
148
149 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152 self
153 }
154
155 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158 self
159 }
160
161 pub fn with_type_check(mut self, toggle: bool) -> Self {
163 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164 self
165 }
166
167 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170 self
171 }
172
173 #[cfg(feature = "cse")]
175 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
176 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
177 self
178 }
179
180 #[cfg(feature = "cse")]
182 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
183 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
184 self
185 }
186
187 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190 self
191 }
192
193 #[cfg(feature = "new_streaming")]
194 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
195 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
196 self
197 }
198
199 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202 self
203 }
204
205 pub fn _with_eager(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::EAGER, toggle);
208 self
209 }
210
211 pub fn describe_plan(&self) -> PolarsResult<String> {
213 Ok(self.clone().to_alp()?.describe())
214 }
215
216 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218 Ok(self.clone().to_alp()?.describe_tree_format())
219 }
220
221 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225 Ok(self.clone().to_alp_optimized()?.describe())
226 }
227
228 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233 }
234
235 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240 if optimized {
241 self.describe_optimized_plan()
242 } else {
243 self.describe_plan()
244 }
245 }
246
247 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287 let opt_state = self.get_opt_state();
288 let lp = self
289 .get_plan_builder()
290 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291 .build();
292 Self::from_logical_plan(lp, opt_state)
293 }
294
295 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315 self,
316 by_exprs: E,
317 sort_options: SortMultipleOptions,
318 ) -> Self {
319 let by_exprs = by_exprs.as_ref().to_vec();
320 if by_exprs.is_empty() {
321 self
322 } else {
323 let opt_state = self.get_opt_state();
324 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325 Self::from_logical_plan(lp, opt_state)
326 }
327 }
328
329 pub fn top_k<E: AsRef<[Expr]>>(
330 self,
331 k: IdxSize,
332 by_exprs: E,
333 sort_options: SortMultipleOptions,
334 ) -> Self {
335 self.sort_by_exprs(
337 by_exprs,
338 sort_options.with_order_reversed().with_nulls_last(true),
339 )
340 .slice(0, k)
341 }
342
343 pub fn bottom_k<E: AsRef<[Expr]>>(
344 self,
345 k: IdxSize,
346 by_exprs: E,
347 sort_options: SortMultipleOptions,
348 ) -> Self {
349 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351 .slice(0, k)
352 }
353
354 pub fn reverse(self) -> Self {
370 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371 }
372
373 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
381 where
382 I: IntoIterator<Item = T>,
383 J: IntoIterator<Item = S>,
384 T: AsRef<str>,
385 S: AsRef<str>,
386 {
387 let iter = existing.into_iter();
388 let cap = iter.size_hint().0;
389 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
391
392 for (existing, new) in iter.zip(new) {
395 let existing = existing.as_ref();
396 let new = new.as_ref();
397 if new != existing {
398 existing_vec.push(existing.into());
399 new_vec.push(new.into());
400 }
401 }
402
403 self.map_private(DslFunction::Rename {
404 existing: existing_vec.into(),
405 new: new_vec.into(),
406 strict,
407 })
408 }
409
410 pub fn drop(self, columns: Selector) -> Self {
417 let opt_state = self.get_opt_state();
418 let lp = self.get_plan_builder().drop(columns).build();
419 Self::from_logical_plan(lp, opt_state)
420 }
421
422 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428 }
429
430 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435 self.select(vec![
436 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437 ])
438 }
439
440 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442 let opt_state = self.get_opt_state();
443 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444 Self::from_logical_plan(lp, opt_state)
445 }
446
447 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449 let opt_state = self.get_opt_state();
450 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451 Self::from_logical_plan(lp, opt_state)
452 }
453
454 pub fn cache(self) -> Self {
458 let opt_state = self.get_opt_state();
459 let lp = self.get_plan_builder().cache().build();
460 Self::from_logical_plan(lp, opt_state)
461 }
462
463 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465 let cast_cols: Vec<Expr> = dtypes
466 .into_iter()
467 .map(|(name, dt)| {
468 let name = PlSmallStr::from_str(name);
469
470 if strict {
471 col(name).strict_cast(dt)
472 } else {
473 col(name).cast(dt)
474 }
475 })
476 .collect();
477
478 if cast_cols.is_empty() {
479 self
480 } else {
481 self.with_columns(cast_cols)
482 }
483 }
484
485 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487 self.with_columns(vec![if strict {
488 col(PlSmallStr::from_static("*")).strict_cast(dtype)
489 } else {
490 col(PlSmallStr::from_static("*")).cast(dtype)
491 }])
492 }
493
494 pub fn optimize(
495 self,
496 lp_arena: &mut Arena<IR>,
497 expr_arena: &mut Arena<AExpr>,
498 ) -> PolarsResult<Node> {
499 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
500 }
501
502 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
503 let (mut lp_arena, mut expr_arena) = self.get_arenas();
504 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
505
506 Ok(IRPlan::new(node, lp_arena, expr_arena))
507 }
508
509 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
510 let (mut lp_arena, mut expr_arena) = self.get_arenas();
511 let node = to_alp(
512 self.logical_plan,
513 &mut expr_arena,
514 &mut lp_arena,
515 &mut self.opt_state,
516 )?;
517 let plan = IRPlan::new(node, lp_arena, expr_arena);
518 Ok(plan)
519 }
520
521 pub(crate) fn optimize_with_scratch(
522 self,
523 lp_arena: &mut Arena<IR>,
524 expr_arena: &mut Arena<AExpr>,
525 scratch: &mut Vec<Node>,
526 ) -> PolarsResult<Node> {
527 #[allow(unused_mut)]
528 let mut opt_state = self.opt_state;
529 let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
530
531 #[cfg(feature = "cse")]
532 if new_streaming {
533 opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
536 }
537
538 let lp_top = optimize(
539 self.logical_plan,
540 opt_state,
541 lp_arena,
542 expr_arena,
543 scratch,
544 apply_scan_predicate_to_scan_ir,
545 )?;
546
547 Ok(lp_top)
548 }
549
550 fn prepare_collect_post_opt<P>(
551 mut self,
552 check_sink: bool,
553 query_start: Option<std::time::Instant>,
554 post_opt: P,
555 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
556 where
557 P: FnOnce(
558 Node,
559 &mut Arena<IR>,
560 &mut Arena<AExpr>,
561 Option<std::time::Duration>,
562 ) -> PolarsResult<()>,
563 {
564 let (mut lp_arena, mut expr_arena) = self.get_arenas();
565
566 let mut scratch = vec![];
567 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
568
569 post_opt(
570 lp_top,
571 &mut lp_arena,
572 &mut expr_arena,
573 query_start.map(|s| s.elapsed()),
576 )?;
577
578 let no_file_sink = if check_sink {
580 !matches!(
581 lp_arena.get(lp_top),
582 IR::Sink {
583 payload: SinkTypeIR::File { .. },
584 ..
585 }
586 )
587 } else {
588 true
589 };
590 let physical_plan = create_physical_plan(
591 lp_top,
592 &mut lp_arena,
593 &mut expr_arena,
594 BUILD_STREAMING_EXECUTOR,
595 )?;
596
597 let state = ExecutionState::new();
598 Ok((state, physical_plan, no_file_sink))
599 }
600
601 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
603 where
604 P: FnOnce(
605 Node,
606 &mut Arena<IR>,
607 &mut Arena<AExpr>,
608 Option<std::time::Duration>,
609 ) -> PolarsResult<()>,
610 {
611 let (mut state, mut physical_plan, _) =
612 self.prepare_collect_post_opt(false, None, post_opt)?;
613 physical_plan.execute(&mut state)
614 }
615
616 #[allow(unused_mut)]
617 fn prepare_collect(
618 self,
619 check_sink: bool,
620 query_start: Option<std::time::Instant>,
621 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
622 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
623 }
624
625 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
630 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
631 payload.clone()
632 } else {
633 self.logical_plan = DslPlan::Sink {
634 input: Arc::new(self.logical_plan),
635 payload: SinkType::Memory,
636 };
637 SinkType::Memory
638 };
639
640 if engine == Engine::Auto {
642 engine = match payload {
643 #[cfg(feature = "new_streaming")]
644 SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
645 _ => Engine::InMemory,
646 };
647 }
648 if engine == Engine::Gpu {
650 engine = Engine::InMemory;
651 }
652
653 #[cfg(feature = "new_streaming")]
654 {
655 if let Some(result) = self.try_new_streaming_if_requested() {
656 return result.map(|v| v.unwrap_single());
657 }
658 }
659
660 match engine {
661 Engine::Auto => unreachable!(),
662 Engine::Streaming => {
663 feature_gated!("new_streaming", self = self.with_new_streaming(true))
664 },
665 _ => {},
666 }
667 let mut alp_plan = self.clone().to_alp_optimized()?;
668
669 match engine {
670 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
671 let result = polars_stream::run_query(
672 alp_plan.lp_top,
673 &mut alp_plan.lp_arena,
674 &mut alp_plan.expr_arena,
675 );
676 result.map(|v| v.unwrap_single())
677 }),
678 Engine::Gpu => {
679 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
680 },
681 Engine::InMemory => {
682 let mut physical_plan = create_physical_plan(
683 alp_plan.lp_top,
684 &mut alp_plan.lp_arena,
685 &mut alp_plan.expr_arena,
686 BUILD_STREAMING_EXECUTOR,
687 )?;
688 let mut state = ExecutionState::new();
689 physical_plan.execute(&mut state)
690 },
691 }
692 }
693
694 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
695 let sink_multiple = LazyFrame {
696 logical_plan: DslPlan::SinkMultiple { inputs: plans },
697 opt_state,
698 cached_arena: Default::default(),
699 };
700 sink_multiple.explain(true)
701 }
702
/// Optimizes and executes several plans together and returns one `DataFrame` per
/// input plan, in input order.
///
/// The plans are combined under a single `SinkMultiple` root and optimized together.
/// Both `Engine::Auto` and `Engine::Gpu` are mapped to the in-memory engine here.
/// An empty `plans` returns an empty `Vec` without doing any work.
///
/// # Errors
/// Propagates optimization, physical-planning and execution errors.
pub fn collect_all_with_engine(
    plans: Vec<DslPlan>,
    mut engine: Engine,
    opt_state: OptFlags,
) -> PolarsResult<Vec<DataFrame>> {
    if plans.is_empty() {
        return Ok(Vec::new());
    }

    // Auto resolves to the in-memory engine for multi-plan collection.
    if engine == Engine::Auto {
        engine = Engine::InMemory;
    }
    // The GPU engine is not dispatched from here; fall back to in-memory.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    let mut sink_multiple = LazyFrame {
        logical_plan: DslPlan::SinkMultiple { inputs: plans },
        opt_state,
        cached_arena: Default::default(),
    };

    // Environment-variable driven opt-in to the new streaming engine.
    #[cfg(feature = "new_streaming")]
    {
        if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_multiple());
        }
    }

    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!(
                "new_streaming",
                sink_multiple = sink_multiple.with_new_streaming(true)
            )
        },
        _ => {},
    }
    let mut alp_plan = sink_multiple.to_alp_optimized()?;

    if engine == Engine::Streaming {
        feature_gated!("new_streaming", {
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            return result.map(|v| v.unwrap_multiple());
        });
    }

    // The root was constructed as `SinkMultiple` above.
    let IR::SinkMultiple { inputs } = alp_plan.root() else {
        unreachable!()
    };

    // `inputs` is cloned so the borrow of `alp_plan` ends before the arenas are
    // borrowed mutably.
    let mut multiplan = create_multiple_physical_plans(
        inputs.clone().as_slice(),
        &mut alp_plan.lp_arena,
        &mut alp_plan.expr_arena,
        BUILD_STREAMING_EXECUTOR,
    )?;

    match engine {
        // Unreachable in practice: `Gpu` was remapped to `InMemory` above.
        Engine::Gpu => polars_bail!(
            InvalidOperation: "collect_all is not supported for the gpu engine"
        ),
        Engine::InMemory => {
            let mut state = ExecutionState::new();
            // Any cache prefiller runs first, before the individual plans.
            if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                cache_prefiller.execute(&mut state)?;
            }
            // Plans are processed in chunks of 3x the pool's thread count. Chunks run
            // one after another, and the plans within a chunk run in parallel. Results
            // keep input order because both collects are order-preserving.
            let out = POOL.install(|| {
                multiplan
                    .physical_plans
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                // Move the executor out, leaving a default in its slot.
                                let mut input = std::mem::take(input);
                                let mut state = state.split();
                                // NOTE(review): `idx` restarts at 0 in every chunk, so
                                // `branch_idx` offsets repeat across chunks. Confirm
                                // uniqueness is not required here.
                                state.branch_idx += idx;

                                let df = input.execute(&mut state)?;
                                Ok(df)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            Ok(out?.into_iter().flatten().collect())
        },
        _ => unreachable!(),
    }
}
805
806 pub fn collect(self) -> PolarsResult<DataFrame> {
824 self.collect_with_engine(Engine::InMemory)
825 }
826
/// Executes the query through a batch sink and returns a [`CollectBatches`] handle
/// that receives the produced `DataFrame`s over a channel.
///
/// The query runs on a blocking task of the `pl_async` runtime. If `lazy` is `false`,
/// the runner is started immediately; otherwise starting is left to the handle. An
/// execution error is forwarded through the same channel.
#[cfg(feature = "async")]
pub fn collect_batches(
    self,
    engine: Engine,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PolarsResult<CollectBatches> {
    // Bounded channel with capacity 1: at most one batch waits for the consumer.
    let (batch_tx, batch_rx) = sync_channel(1);
    let error_tx = batch_tx.clone();

    let on_batch = PlanCallback::new(move |df| {
        // The callback's `bool` is `true` exactly when the receiver has gone away
        // (presumably telling the sink to stop producing).
        let receiver_gone = batch_tx.send(Ok(df)).is_err();
        Ok(receiver_gone)
    });
    let ldf = self.sink_batches(on_batch, maintain_order, chunk_size)?;

    let runner = move || {
        polars_io::pl_async::get_runtime().spawn_blocking(move || {
            if let Err(e) = ldf.collect_with_engine(engine) {
                // Best effort: the consumer may already be gone.
                let _ = error_tx.send(Err(e));
            }
        });
    };

    let mut collect_batches = CollectBatches {
        recv: batch_rx,
        runner: Some(Box::new(runner)),
    };
    if !lazy {
        collect_batches.start();
    }
    Ok(collect_batches)
}
869
870 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
873 where
874 P: FnOnce(
875 Node,
876 &mut Arena<IR>,
877 &mut Arena<AExpr>,
878 Option<std::time::Duration>,
879 ) -> PolarsResult<()>,
880 {
881 let query_start = std::time::Instant::now();
882 let (mut state, mut physical_plan, _) =
883 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
884 state.time_nodes(query_start);
885 let out = physical_plan.execute(&mut state)?;
886 let timer_df = state.finish_timer()?;
887 Ok((out, timer_df))
888 }
889
890 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
898 self._profile_post_opt(|_, _, _, _| Ok(()))
899 }
900
901 pub fn sink_batches(
902 mut self,
903 function: PlanCallback<DataFrame, bool>,
904 maintain_order: bool,
905 chunk_size: Option<NonZeroUsize>,
906 ) -> PolarsResult<Self> {
907 use polars_plan::prelude::sink::CallbackSinkType;
908
909 polars_ensure!(
910 !matches!(self.logical_plan, DslPlan::Sink { .. }),
911 InvalidOperation: "cannot create a sink on top of another sink"
912 );
913
914 self.logical_plan = DslPlan::Sink {
915 input: Arc::new(self.logical_plan),
916 payload: SinkType::Callback(CallbackSinkType {
917 function,
918 maintain_order,
919 chunk_size,
920 }),
921 };
922
923 Ok(self)
924 }
925
/// Runs the query on the new streaming engine if an environment variable requests it.
///
/// - `POLARS_FORCE_NEW_STREAMING=1`: always use the new streaming engine and re-raise
///   any panic it produces.
/// - `POLARS_AUTO_NEW_STREAMING=1`: try the new streaming engine. If it panics with a
///   message starting with `"not yet implemented"`, return `None` so the caller falls
///   back to its normal engine. Any other panic is re-raised.
///
/// Returns `None` when neither variable is set to `"1"` or when the auto fallback
/// triggers.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if !(auto_new_streaming || force_new_streaming) {
        return None;
    }

    let mut new_stream_lazy = self.clone();
    new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
    let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
        Ok(v) => v,
        Err(e) => return Some(Err(e)),
    };

    let f = || {
        polars_stream::run_query(
            alp_plan.lp_top,
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
        )
    };

    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
        Ok(v) => Some(v),
        Err(e) => {
            // Panic payloads are `&'static str` for plain literals such as `todo!()`.
            // Formatted messages such as `todo!("reason {x}")` produce a `String`.
            // Check both forms so formatted "not yet implemented" panics also fall back.
            let message: Option<&str> = e
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| e.downcast_ref::<String>().map(String::as_str));
            let is_unimplemented = message
                .map(|s| s.starts_with("not yet implemented"))
                .unwrap_or(false);

            if !force_new_streaming && auto_new_streaming && is_unimplemented {
                if polars_core::config::verbose() {
                    eprintln!(
                        "caught unimplemented error in new streaming engine, falling back to normal engine"
                    );
                }
                None
            } else {
                std::panic::resume_unwind(e);
            }
        },
    }
}
976
977 pub fn sink(
978 mut self,
979 sink_type: SinkDestination,
980 file_format: impl Into<Arc<FileType>>,
981 unified_sink_args: UnifiedSinkArgs,
982 ) -> PolarsResult<Self> {
983 polars_ensure!(
984 !matches!(self.logical_plan, DslPlan::Sink { .. }),
985 InvalidOperation: "cannot create a sink on top of another sink"
986 );
987
988 self.logical_plan = DslPlan::Sink {
989 input: Arc::new(self.logical_plan),
990 payload: match sink_type {
991 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
992 target,
993 file_format: file_format.into(),
994 unified_sink_args,
995 }),
996 SinkDestination::Partitioned {
997 base_path,
998 file_path_provider,
999 partition_strategy,
1000 finish_callback,
1001 max_rows_per_file,
1002 approximate_bytes_per_file,
1003 } => SinkType::Partitioned(PartitionedSinkOptions {
1004 base_path,
1005 file_path_provider,
1006 partition_strategy,
1007 finish_callback,
1008 file_format: file_format.into(),
1009 unified_sink_args,
1010 max_rows_per_file,
1011 approximate_bytes_per_file,
1012 }),
1013 },
1014 };
1015 Ok(self)
1016 }
1017
1018 pub fn filter(self, predicate: Expr) -> Self {
1036 let opt_state = self.get_opt_state();
1037 let lp = self.get_plan_builder().filter(predicate).build();
1038 Self::from_logical_plan(lp, opt_state)
1039 }
1040
1041 pub fn remove(self, predicate: Expr) -> Self {
1059 self.filter(predicate.neq_missing(lit(true)))
1060 }
1061
1062 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1088 let exprs = exprs.as_ref().to_vec();
1089 self.select_impl(
1090 exprs,
1091 ProjectionOptions {
1092 run_parallel: true,
1093 duplicate_check: true,
1094 should_broadcast: true,
1095 },
1096 )
1097 }
1098
1099 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1100 let exprs = exprs.as_ref().to_vec();
1101 self.select_impl(
1102 exprs,
1103 ProjectionOptions {
1104 run_parallel: false,
1105 duplicate_check: true,
1106 should_broadcast: true,
1107 },
1108 )
1109 }
1110
1111 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1112 let opt_state = self.get_opt_state();
1113 let lp = self.get_plan_builder().project(exprs, options).build();
1114 Self::from_logical_plan(lp, opt_state)
1115 }
1116
1117 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1138 let keys = by
1139 .as_ref()
1140 .iter()
1141 .map(|e| e.clone().into())
1142 .collect::<Vec<_>>();
1143 let opt_state = self.get_opt_state();
1144
1145 #[cfg(feature = "dynamic_group_by")]
1146 {
1147 LazyGroupBy {
1148 logical_plan: self.logical_plan,
1149 opt_state,
1150 keys,
1151 predicates: vec![],
1152 maintain_order: false,
1153 dynamic_options: None,
1154 rolling_options: None,
1155 }
1156 }
1157
1158 #[cfg(not(feature = "dynamic_group_by"))]
1159 {
1160 LazyGroupBy {
1161 logical_plan: self.logical_plan,
1162 opt_state,
1163 keys,
1164 predicates: vec![],
1165 maintain_order: false,
1166 }
1167 }
1168 }
1169
/// Starts a rolling group-by indexed by `index_column` and keyed by `group_by`.
///
/// If `index_column` is not a plain column reference, it is first added with
/// `with_column` and the call is retried with its output name.
///
/// # Panics
/// Panics if the schema cannot be collected or the index expression's output field
/// cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        computed => {
            let schema = self.collect_schema().unwrap();
            let output_name = computed.to_field(&schema).unwrap().name().clone();
            return self
                .with_column(computed)
                .rolling(Expr::Column(output_name), group_by, options);
        },
    }

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1207
/// Starts a dynamic (window-based) group-by indexed by `index_column` and keyed by
/// `group_by`.
///
/// If `index_column` is not a plain column reference, it is first added with
/// `with_column` and the call is retried with its output name.
///
/// # Panics
/// Panics if the schema cannot be collected or the index expression's output field
/// cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        computed => {
            let schema = self.collect_schema().unwrap();
            let output_name = computed.to_field(&schema).unwrap().name().clone();
            return self
                .with_column(computed)
                .group_by_dynamic(Expr::Column(output_name), group_by, options);
        },
    }

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1253
1254 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1256 let keys = by
1257 .as_ref()
1258 .iter()
1259 .map(|e| e.clone().into())
1260 .collect::<Vec<_>>();
1261 let opt_state = self.get_opt_state();
1262
1263 #[cfg(feature = "dynamic_group_by")]
1264 {
1265 LazyGroupBy {
1266 logical_plan: self.logical_plan,
1267 opt_state,
1268 keys,
1269 predicates: vec![],
1270 maintain_order: true,
1271 dynamic_options: None,
1272 rolling_options: None,
1273 }
1274 }
1275
1276 #[cfg(not(feature = "dynamic_group_by"))]
1277 {
1278 LazyGroupBy {
1279 logical_plan: self.logical_plan,
1280 opt_state,
1281 keys,
1282 predicates: vec![],
1283 maintain_order: true,
1284 }
1285 }
1286 }
1287
1288 #[cfg(feature = "semi_anti_join")]
1305 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1306 self.join(
1307 other,
1308 [left_on.into()],
1309 [right_on.into()],
1310 JoinArgs::new(JoinType::Anti),
1311 )
1312 }
1313
1314 #[cfg(feature = "cross_join")]
1316 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1317 self.join(
1318 other,
1319 vec![],
1320 vec![],
1321 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1322 )
1323 }
1324
1325 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1342 self.join(
1343 other,
1344 [left_on.into()],
1345 [right_on.into()],
1346 JoinArgs::new(JoinType::Left),
1347 )
1348 }
1349
1350 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1367 self.join(
1368 other,
1369 [left_on.into()],
1370 [right_on.into()],
1371 JoinArgs::new(JoinType::Inner),
1372 )
1373 }
1374
1375 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1392 self.join(
1393 other,
1394 [left_on.into()],
1395 [right_on.into()],
1396 JoinArgs::new(JoinType::Full),
1397 )
1398 }
1399
1400 #[cfg(feature = "semi_anti_join")]
1417 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1418 self.join(
1419 other,
1420 [left_on.into()],
1421 [right_on.into()],
1422 JoinArgs::new(JoinType::Semi),
1423 )
1424 }
1425
1426 pub fn join<E: AsRef<[Expr]>>(
1448 self,
1449 other: LazyFrame,
1450 left_on: E,
1451 right_on: E,
1452 args: JoinArgs,
1453 ) -> LazyFrame {
1454 let left_on = left_on.as_ref().to_vec();
1455 let right_on = right_on.as_ref().to_vec();
1456
1457 self._join_impl(other, left_on, right_on, args)
1458 }
1459
1460 fn _join_impl(
1461 self,
1462 other: LazyFrame,
1463 left_on: Vec<Expr>,
1464 right_on: Vec<Expr>,
1465 args: JoinArgs,
1466 ) -> LazyFrame {
1467 let JoinArgs {
1468 how,
1469 validation,
1470 suffix,
1471 slice,
1472 nulls_equal,
1473 coalesce,
1474 maintain_order,
1475 } = args;
1476
1477 if slice.is_some() {
1478 panic!("impl error: slice is not handled")
1479 }
1480
1481 let mut builder = self
1482 .join_builder()
1483 .with(other)
1484 .left_on(left_on)
1485 .right_on(right_on)
1486 .how(how)
1487 .validate(validation)
1488 .join_nulls(nulls_equal)
1489 .coalesce(coalesce)
1490 .maintain_order(maintain_order);
1491
1492 if let Some(suffix) = suffix {
1493 builder = builder.suffix(suffix);
1494 }
1495
1496 builder.finish()
1498 }
1499
1500 pub fn join_builder(self) -> JoinBuilder {
1506 JoinBuilder::new(self)
1507 }
1508
1509 pub fn with_column(self, expr: Expr) -> LazyFrame {
1527 let opt_state = self.get_opt_state();
1528 let lp = self
1529 .get_plan_builder()
1530 .with_columns(
1531 vec![expr],
1532 ProjectionOptions {
1533 run_parallel: false,
1534 duplicate_check: true,
1535 should_broadcast: true,
1536 },
1537 )
1538 .build();
1539 Self::from_logical_plan(lp, opt_state)
1540 }
1541
1542 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1557 let exprs = exprs.as_ref().to_vec();
1558 self.with_columns_impl(
1559 exprs,
1560 ProjectionOptions {
1561 run_parallel: true,
1562 duplicate_check: true,
1563 should_broadcast: true,
1564 },
1565 )
1566 }
1567
1568 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1570 let exprs = exprs.as_ref().to_vec();
1571 self.with_columns_impl(
1572 exprs,
1573 ProjectionOptions {
1574 run_parallel: false,
1575 duplicate_check: true,
1576 should_broadcast: true,
1577 },
1578 )
1579 }
1580
1581 pub fn match_to_schema(
1583 self,
1584 schema: SchemaRef,
1585 per_column: Arc<[MatchToSchemaPerColumn]>,
1586 extra_columns: ExtraColumnsPolicy,
1587 ) -> LazyFrame {
1588 let opt_state = self.get_opt_state();
1589 let lp = self
1590 .get_plan_builder()
1591 .match_to_schema(schema, per_column, extra_columns)
1592 .build();
1593 Self::from_logical_plan(lp, opt_state)
1594 }
1595
1596 pub fn pipe_with_schema(
1597 self,
1598 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1599 ) -> Self {
1600 let opt_state = self.get_opt_state();
1601 let lp = self
1602 .get_plan_builder()
1603 .pipe_with_schema(vec![], callback)
1604 .build();
1605 Self::from_logical_plan(lp, opt_state)
1606 }
1607
1608 pub fn pipe_with_schemas(
1609 self,
1610 others: Vec<LazyFrame>,
1611 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1612 ) -> Self {
1613 let opt_state = self.get_opt_state();
1614 let lp = self
1615 .get_plan_builder()
1616 .pipe_with_schema(
1617 others.into_iter().map(|lf| lf.logical_plan).collect(),
1618 callback,
1619 )
1620 .build();
1621 Self::from_logical_plan(lp, opt_state)
1622 }
1623
1624 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1625 let opt_state = self.get_opt_state();
1626 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1627 Self::from_logical_plan(lp, opt_state)
1628 }
1629
1630 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1631 let contexts = contexts
1632 .as_ref()
1633 .iter()
1634 .map(|lf| lf.logical_plan.clone())
1635 .collect();
1636 let opt_state = self.get_opt_state();
1637 let lp = self.get_plan_builder().with_context(contexts).build();
1638 Self::from_logical_plan(lp, opt_state)
1639 }
1640
1641 pub fn max(self) -> Self {
1645 self.map_private(DslFunction::Stats(StatsFunction::Max))
1646 }
1647
1648 pub fn min(self) -> Self {
1652 self.map_private(DslFunction::Stats(StatsFunction::Min))
1653 }
1654
1655 pub fn sum(self) -> Self {
1665 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1666 }
1667
1668 pub fn mean(self) -> Self {
1673 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1674 }
1675
1676 pub fn median(self) -> Self {
1682 self.map_private(DslFunction::Stats(StatsFunction::Median))
1683 }
1684
1685 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1687 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1688 quantile,
1689 method,
1690 }))
1691 }
1692
1693 pub fn std(self, ddof: u8) -> Self {
1706 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1707 }
1708
1709 pub fn var(self, ddof: u8) -> Self {
1719 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1720 }
1721
1722 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1724 self.explode_impl(columns, options, false)
1725 }
1726
1727 fn explode_impl(
1729 self,
1730 columns: Selector,
1731 options: ExplodeOptions,
1732 allow_empty: bool,
1733 ) -> LazyFrame {
1734 let opt_state = self.get_opt_state();
1735 let lp = self
1736 .get_plan_builder()
1737 .explode(columns, options, allow_empty)
1738 .build();
1739 Self::from_logical_plan(lp, opt_state)
1740 }
1741
1742 pub fn null_count(self) -> LazyFrame {
1744 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1745 }
1746
1747 pub fn unique_stable(
1752 self,
1753 subset: Option<Selector>,
1754 keep_strategy: UniqueKeepStrategy,
1755 ) -> LazyFrame {
1756 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1757 self.unique_stable_generic(subset, keep_strategy)
1758 }
1759
1760 pub fn unique_stable_generic(
1761 self,
1762 subset: Option<Vec<Expr>>,
1763 keep_strategy: UniqueKeepStrategy,
1764 ) -> LazyFrame {
1765 let opt_state = self.get_opt_state();
1766 let options = DistinctOptionsDSL {
1767 subset,
1768 maintain_order: true,
1769 keep_strategy,
1770 };
1771 let lp = self.get_plan_builder().distinct(options).build();
1772 Self::from_logical_plan(lp, opt_state)
1773 }
1774
1775 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1783 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1784 self.unique_generic(subset, keep_strategy)
1785 }
1786
1787 pub fn unique_generic(
1788 self,
1789 subset: Option<Vec<Expr>>,
1790 keep_strategy: UniqueKeepStrategy,
1791 ) -> LazyFrame {
1792 let opt_state = self.get_opt_state();
1793 let options = DistinctOptionsDSL {
1794 subset,
1795 maintain_order: false,
1796 keep_strategy,
1797 };
1798 let lp = self.get_plan_builder().distinct(options).build();
1799 Self::from_logical_plan(lp, opt_state)
1800 }
1801
1802 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1807 let opt_state = self.get_opt_state();
1808 let lp = self.get_plan_builder().drop_nans(subset).build();
1809 Self::from_logical_plan(lp, opt_state)
1810 }
1811
1812 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1817 let opt_state = self.get_opt_state();
1818 let lp = self.get_plan_builder().drop_nulls(subset).build();
1819 Self::from_logical_plan(lp, opt_state)
1820 }
1821
1822 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1832 let opt_state = self.get_opt_state();
1833 let lp = self.get_plan_builder().slice(offset, len).build();
1834 Self::from_logical_plan(lp, opt_state)
1835 }
1836
1837 pub fn first(self) -> LazyFrame {
1841 self.slice(0, 1)
1842 }
1843
1844 pub fn last(self) -> LazyFrame {
1848 self.slice(-1, 1)
1849 }
1850
1851 pub fn tail(self, n: IdxSize) -> LazyFrame {
1855 let neg_tail = -(n as i64);
1856 self.slice(neg_tail, n)
1857 }
1858
#[cfg(feature = "pivot")]
/// Appends a pivot node.
///
/// `on` selects the column(s) whose values become new columns, `on_columns`
/// holds those values up front, `index` and `values` select the row-key and
/// value columns, and `agg` aggregates collisions. `maintain_order` and
/// `separator` are forwarded unchanged to `DslBuilder::pivot` (presumably
/// ordering and column-name joining; TODO confirm there).
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder = self.get_plan_builder();
    let pivoted = builder.pivot(
        on,
        on_columns,
        index,
        values,
        agg,
        maintain_order,
        separator,
    );
    Self::from_logical_plan(pivoted.build(), opt_state)
}
1886
#[cfg(feature = "pivot")]
/// Appends an unpivot (wide-to-long) node described by `args`.
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder = self.get_plan_builder();
    Self::from_logical_plan(builder.unpivot(args).build(), opt_state)
}
1896
1897 pub fn limit(self, n: IdxSize) -> LazyFrame {
1899 self.slice(0, n)
1900 }
1901
1902 pub fn map<F>(
1916 self,
1917 function: F,
1918 optimizations: AllowedOptimizations,
1919 schema: Option<Arc<dyn UdfSchema>>,
1920 name: Option<&'static str>,
1921 ) -> LazyFrame
1922 where
1923 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1924 {
1925 let opt_state = self.get_opt_state();
1926 let lp = self
1927 .get_plan_builder()
1928 .map(
1929 function,
1930 optimizations,
1931 schema,
1932 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1933 )
1934 .build();
1935 Self::from_logical_plan(lp, opt_state)
1936 }
1937
#[cfg(feature = "python")]
/// Appends a Python UDF that maps the whole `DataFrame`.
///
/// `optimizations`, `schema` and `validate_output` are forwarded unchanged to
/// `DslBuilder::map_python`.
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let builder = self.get_plan_builder();
    let plan = builder
        .map_python(function, optimizations, schema, validate_output)
        .build();
    Self::from_logical_plan(plan, opt_state)
}
1953
1954 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1955 let opt_state = self.get_opt_state();
1956 let lp = self.get_plan_builder().map_private(function).build();
1957 Self::from_logical_plan(lp, opt_state)
1958 }
1959
1960 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1969 where
1970 S: Into<PlSmallStr>,
1971 {
1972 let name = name.into();
1973
1974 match &self.logical_plan {
1975 v @ DslPlan::Scan {
1976 scan_type,
1977 unified_scan_args,
1978 ..
1979 } if unified_scan_args.row_index.is_none()
1980 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1981 {
1982 let DslPlan::Scan {
1983 sources,
1984 mut unified_scan_args,
1985 scan_type,
1986 cached_ir: _,
1987 } = v.clone()
1988 else {
1989 unreachable!()
1990 };
1991
1992 unified_scan_args.row_index = Some(RowIndex {
1993 name,
1994 offset: offset.unwrap_or(0),
1995 });
1996
1997 DslPlan::Scan {
1998 sources,
1999 unified_scan_args,
2000 scan_type,
2001 cached_ir: Default::default(),
2002 }
2003 .into()
2004 },
2005 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2006 }
2007 }
2008
2009 pub fn count(self) -> LazyFrame {
2011 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2012 }
2013
#[cfg(feature = "dtype-struct")]
/// Appends an `Unnest` node for the struct columns matched by `cols`.
///
/// `separator` is forwarded unchanged (presumably used to build the names of
/// the unnested fields; TODO confirm in `DslFunction::Unnest`).
pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
    let function = DslFunction::Unnest {
        columns: cols,
        separator,
    };
    Self::map_private(self, function)
}
2023
#[cfg(feature = "merge_sorted")]
/// Builds a `MergeSorted` node combining `self` (left) and `other` (right)
/// on `key`; both inputs are presumably expected to be sorted by `key`.
///
/// Only this frame's optimization flags are kept; `other`'s are dropped.
///
/// # Errors
/// Currently never returns `Err`; the `PolarsResult` is part of the API.
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let opt_state = self.opt_state;
    let merged = DslPlan::MergeSorted {
        input_left: Arc::new(self.logical_plan),
        input_right: Arc::new(other.logical_plan),
        key: key.into(),
    };
    Ok(Self::from_logical_plan(merged, opt_state))
}
2038
2039 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2040 let lp = DslPlan::MapFunction {
2041 input: Arc::new(self.logical_plan),
2042 function: DslFunction::Hint(hint),
2043 };
2044 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2045 }
2046}
2047
2048#[derive(Clone)]
2050pub struct LazyGroupBy {
2051 pub logical_plan: DslPlan,
2052 opt_state: OptFlags,
2053 keys: Vec<Expr>,
2054 predicates: Vec<Expr>,
2055 maintain_order: bool,
2056 #[cfg(feature = "dynamic_group_by")]
2057 dynamic_options: Option<DynamicGroupOptions>,
2058 #[cfg(feature = "dynamic_group_by")]
2059 rolling_options: Option<RollingGroupOptions>,
2060}
2061
2062impl From<LazyGroupBy> for LazyFrame {
2063 fn from(lgb: LazyGroupBy) -> Self {
2064 Self {
2065 logical_plan: lgb.logical_plan,
2066 opt_state: lgb.opt_state,
2067 cached_arena: Default::default(),
2068 }
2069 }
2070}
2071
2072impl LazyGroupBy {
2073 pub fn having(mut self, predicate: Expr) -> Self {
2094 self.predicates.push(predicate);
2095 self
2096 }
2097
2098 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2120 #[cfg(feature = "dynamic_group_by")]
2121 let lp = DslBuilder::from(self.logical_plan)
2122 .group_by(
2123 self.keys,
2124 self.predicates,
2125 aggs,
2126 None,
2127 self.maintain_order,
2128 self.dynamic_options,
2129 self.rolling_options,
2130 )
2131 .build();
2132
2133 #[cfg(not(feature = "dynamic_group_by"))]
2134 let lp = DslBuilder::from(self.logical_plan)
2135 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2136 .build();
2137 LazyFrame::from_logical_plan(lp, self.opt_state)
2138 }
2139
2140 pub fn head(self, n: Option<usize>) -> LazyFrame {
2142 let keys = self
2143 .keys
2144 .iter()
2145 .filter_map(|expr| expr_output_name(expr).ok())
2146 .collect::<Vec<_>>();
2147
2148 self.agg([all().as_expr().head(n)]).explode_impl(
2149 all() - by_name(keys.iter().cloned(), false),
2150 ExplodeOptions {
2151 empty_as_null: true,
2152 keep_nulls: true,
2153 },
2154 true,
2155 )
2156 }
2157
2158 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2160 let keys = self
2161 .keys
2162 .iter()
2163 .filter_map(|expr| expr_output_name(expr).ok())
2164 .collect::<Vec<_>>();
2165
2166 self.agg([all().as_expr().tail(n)]).explode_impl(
2167 all() - by_name(keys.iter().cloned(), false),
2168 ExplodeOptions {
2169 empty_as_null: true,
2170 keep_nulls: true,
2171 },
2172 true,
2173 )
2174 }
2175
2176 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2181 if !self.predicates.is_empty() {
2182 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2183 }
2184
2185 #[cfg(feature = "dynamic_group_by")]
2186 let options = GroupbyOptions {
2187 dynamic: self.dynamic_options,
2188 rolling: self.rolling_options,
2189 slice: None,
2190 };
2191
2192 #[cfg(not(feature = "dynamic_group_by"))]
2193 let options = GroupbyOptions { slice: None };
2194
2195 let lp = DslPlan::GroupBy {
2196 input: Arc::new(self.logical_plan),
2197 keys: self.keys,
2198 predicates: vec![],
2199 aggs: vec![],
2200 apply: Some((f, schema)),
2201 maintain_order: self.maintain_order,
2202 options: Arc::new(options),
2203 };
2204 LazyFrame::from_logical_plan(lp, self.opt_state)
2205 }
2206}
2207
2208#[must_use]
2209pub struct JoinBuilder {
2210 lf: LazyFrame,
2211 how: JoinType,
2212 other: Option<LazyFrame>,
2213 left_on: Vec<Expr>,
2214 right_on: Vec<Expr>,
2215 allow_parallel: bool,
2216 force_parallel: bool,
2217 suffix: Option<PlSmallStr>,
2218 validation: JoinValidation,
2219 nulls_equal: bool,
2220 coalesce: JoinCoalesce,
2221 maintain_order: MaintainOrderJoin,
2222}
2223impl JoinBuilder {
2224 pub fn new(lf: LazyFrame) -> Self {
2226 Self {
2227 lf,
2228 other: None,
2229 how: JoinType::Inner,
2230 left_on: vec![],
2231 right_on: vec![],
2232 allow_parallel: true,
2233 force_parallel: false,
2234 suffix: None,
2235 validation: Default::default(),
2236 nulls_equal: false,
2237 coalesce: Default::default(),
2238 maintain_order: Default::default(),
2239 }
2240 }
2241
2242 pub fn with(mut self, other: LazyFrame) -> Self {
2244 self.other = Some(other);
2245 self
2246 }
2247
2248 pub fn how(mut self, how: JoinType) -> Self {
2250 self.how = how;
2251 self
2252 }
2253
2254 pub fn validate(mut self, validation: JoinValidation) -> Self {
2255 self.validation = validation;
2256 self
2257 }
2258
2259 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2263 let on = on.as_ref().to_vec();
2264 self.left_on.clone_from(&on);
2265 self.right_on = on;
2266 self
2267 }
2268
2269 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2273 self.left_on = on.as_ref().to_vec();
2274 self
2275 }
2276
2277 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2281 self.right_on = on.as_ref().to_vec();
2282 self
2283 }
2284
2285 pub fn allow_parallel(mut self, allow: bool) -> Self {
2287 self.allow_parallel = allow;
2288 self
2289 }
2290
2291 pub fn force_parallel(mut self, force: bool) -> Self {
2293 self.force_parallel = force;
2294 self
2295 }
2296
2297 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2299 self.nulls_equal = nulls_equal;
2300 self
2301 }
2302
2303 pub fn suffix<S>(mut self, suffix: S) -> Self
2306 where
2307 S: Into<PlSmallStr>,
2308 {
2309 self.suffix = Some(suffix.into());
2310 self
2311 }
2312
2313 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2315 self.coalesce = coalesce;
2316 self
2317 }
2318
2319 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2321 self.maintain_order = maintain_order;
2322 self
2323 }
2324
2325 pub fn finish(self) -> LazyFrame {
2327 let opt_state = self.lf.opt_state;
2328 let other = self.other.expect("'with' not set in join builder");
2329
2330 let args = JoinArgs {
2331 how: self.how,
2332 validation: self.validation,
2333 suffix: self.suffix,
2334 slice: None,
2335 nulls_equal: self.nulls_equal,
2336 coalesce: self.coalesce,
2337 maintain_order: self.maintain_order,
2338 };
2339
2340 let lp = self
2341 .lf
2342 .get_plan_builder()
2343 .join(
2344 other.logical_plan,
2345 self.left_on,
2346 self.right_on,
2347 JoinOptions {
2348 allow_parallel: self.allow_parallel,
2349 force_parallel: self.force_parallel,
2350 args,
2351 }
2352 .into(),
2353 )
2354 .build();
2355 LazyFrame::from_logical_plan(lp, opt_state)
2356 }
2357
2358 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2360 let opt_state = self.lf.opt_state;
2361 let other = self.other.expect("with not set");
2362
2363 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2365 if let Expr::BinaryExpr {
2366 op: Operator::And,
2367 left,
2368 right,
2369 } = predicate
2370 {
2371 decompose_and((*left).clone(), expanded_predicates);
2372 decompose_and((*right).clone(), expanded_predicates);
2373 } else {
2374 expanded_predicates.push(predicate);
2375 }
2376 }
2377 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2378 for predicate in predicates {
2379 decompose_and(predicate, &mut expanded_predicates);
2380 }
2381 let predicates: Vec<Expr> = expanded_predicates;
2382
2383 #[cfg(feature = "is_between")]
2385 let predicates: Vec<Expr> = {
2386 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2387 for predicate in predicates {
2388 if let Expr::Function {
2389 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2390 input,
2391 ..
2392 } = &predicate
2393 {
2394 if let [expr, lower, upper] = input.as_slice() {
2395 match closed {
2396 ClosedInterval::Both => {
2397 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2398 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2399 },
2400 ClosedInterval::Right => {
2401 expanded_predicates.push(expr.clone().gt(lower.clone()));
2402 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2403 },
2404 ClosedInterval::Left => {
2405 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2406 expanded_predicates.push(expr.clone().lt(upper.clone()));
2407 },
2408 ClosedInterval::None => {
2409 expanded_predicates.push(expr.clone().gt(lower.clone()));
2410 expanded_predicates.push(expr.clone().lt(upper.clone()));
2411 },
2412 }
2413 continue;
2414 }
2415 }
2416 expanded_predicates.push(predicate);
2417 }
2418 expanded_predicates
2419 };
2420
2421 let args = JoinArgs {
2422 how: self.how,
2423 validation: self.validation,
2424 suffix: self.suffix,
2425 slice: None,
2426 nulls_equal: self.nulls_equal,
2427 coalesce: self.coalesce,
2428 maintain_order: self.maintain_order,
2429 };
2430 let options = JoinOptions {
2431 allow_parallel: self.allow_parallel,
2432 force_parallel: self.force_parallel,
2433 args,
2434 };
2435
2436 let lp = DslPlan::Join {
2437 input_left: Arc::new(self.lf.logical_plan),
2438 input_right: Arc::new(other.logical_plan),
2439 left_on: Default::default(),
2440 right_on: Default::default(),
2441 predicates,
2442 options: Arc::from(options),
2443 };
2444
2445 LazyFrame::from_logical_plan(lp, opt_state)
2446 }
2447}
2448
2449pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2450 #[cfg(not(feature = "new_streaming"))]
2451 {
2452 None
2453 }
2454 #[cfg(feature = "new_streaming")]
2455 {
2456 Some(polars_stream::build_streaming_query_executor)
2457 }
2458};
2459
2460pub struct CollectBatches {
2461 recv: Receiver<PolarsResult<DataFrame>>,
2462 runner: Option<Box<dyn FnOnce() + Send + 'static>>,
2463}
2464
2465impl CollectBatches {
2466 pub fn start(&mut self) {
2468 if let Some(runner) = self.runner.take() {
2469 runner()
2470 }
2471 }
2472}
2473
2474impl Iterator for CollectBatches {
2475 type Item = PolarsResult<DataFrame>;
2476
2477 fn next(&mut self) -> Option<Self::Item> {
2478 self.start();
2479 self.recv.recv().ok()
2480 }
2481}