1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex};
12
13pub use anonymous_scan::*;
14#[cfg(feature = "csv")]
15pub use csv::*;
16#[cfg(not(target_arch = "wasm32"))]
17pub use exitable::*;
18pub use file_list_reader::*;
19#[cfg(feature = "json")]
20pub use ndjson::*;
21#[cfg(feature = "parquet")]
22pub use parquet::*;
23use polars_compute::rolling::QuantileMethod;
24use polars_core::POOL;
25use polars_core::error::feature_gated;
26use polars_core::prelude::*;
27use polars_io::RowIndex;
28use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
29use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
30use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
31#[cfg(feature = "is_between")]
32use polars_ops::prelude::ClosedInterval;
33pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
34use polars_utils::pl_str::PlSmallStr;
35use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
36
37use crate::frame::cached_arenas::CachedArena;
38use crate::prelude::*;
39
40pub trait IntoLazy {
41 fn lazy(self) -> LazyFrame;
42}
43
44impl IntoLazy for DataFrame {
45 fn lazy(self) -> LazyFrame {
47 let lp = DslBuilder::from_existing_df(self).build();
48 LazyFrame {
49 logical_plan: lp,
50 opt_state: Default::default(),
51 cached_arena: Default::default(),
52 }
53 }
54}
55
56impl IntoLazy for LazyFrame {
57 fn lazy(self) -> LazyFrame {
58 self
59 }
60}
61
/// Lazy query handle: a DSL (logical) plan plus the optimizer flags used
/// when that plan is lowered, optimized and executed.
///
/// Every builder method consumes the frame and returns a new one wrapping the
/// extended plan, hence `#[must_use]`.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan accumulated by the chained operations (not yet optimized).
    pub logical_plan: DslPlan,
    /// Optimization toggles passed to the optimizer when the plan is resolved.
    pub(crate) opt_state: OptFlags,
    /// Shared arena cache (see the `cached_arenas` module).
    /// NOTE(review): presumably lets repeated lowering reuse arenas — confirm
    /// against `get_arenas`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
73
74impl From<DslPlan> for LazyFrame {
75 fn from(plan: DslPlan) -> Self {
76 Self {
77 logical_plan: plan,
78 opt_state: OptFlags::default(),
79 cached_arena: Default::default(),
80 }
81 }
82}
83
84impl LazyFrame {
85 pub(crate) fn from_inner(
86 logical_plan: DslPlan,
87 opt_state: OptFlags,
88 cached_arena: Arc<Mutex<Option<CachedArena>>>,
89 ) -> Self {
90 Self {
91 logical_plan,
92 opt_state,
93 cached_arena,
94 }
95 }
96
97 pub(crate) fn get_plan_builder(self) -> DslBuilder {
98 DslBuilder::from(self.logical_plan)
99 }
100
101 fn get_opt_state(&self) -> OptFlags {
102 self.opt_state
103 }
104
105 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
106 LazyFrame {
107 logical_plan,
108 opt_state,
109 cached_arena: Default::default(),
110 }
111 }
112
113 pub fn get_current_optimizations(&self) -> OptFlags {
115 self.opt_state
116 }
117
118 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
120 self.opt_state = opt_state;
121 self
122 }
123
124 pub fn without_optimizations(self) -> Self {
126 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
127 }
128
129 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
131 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
132 self
133 }
134
135 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
137 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
138 self
139 }
140
141 pub fn with_check_order(mut self, toggle: bool) -> Self {
144 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
145 self
146 }
147
148 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
150 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
151 self
152 }
153
154 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
156 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
157 self
158 }
159
160 pub fn with_type_check(mut self, toggle: bool) -> Self {
162 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
163 self
164 }
165
166 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
168 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
169 self
170 }
171
172 #[cfg(feature = "cse")]
174 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
175 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
176 self
177 }
178
179 #[cfg(feature = "cse")]
181 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
182 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
183 self
184 }
185
186 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
188 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
189 self
190 }
191
192 #[cfg(feature = "new_streaming")]
193 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
194 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
195 self
196 }
197
198 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
200 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
201 self
202 }
203
204 pub fn _with_eager(mut self, toggle: bool) -> Self {
206 self.opt_state.set(OptFlags::EAGER, toggle);
207 self
208 }
209
210 pub fn describe_plan(&self) -> PolarsResult<String> {
212 Ok(self.clone().to_alp()?.describe())
213 }
214
215 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
217 Ok(self.clone().to_alp()?.describe_tree_format())
218 }
219
220 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
224 Ok(self.clone().to_alp_optimized()?.describe())
225 }
226
227 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
231 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
232 }
233
234 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
239 if optimized {
240 self.describe_optimized_plan()
241 } else {
242 self.describe_plan()
243 }
244 }
245
246 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
286 let opt_state = self.get_opt_state();
287 let lp = self
288 .get_plan_builder()
289 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
290 .build();
291 Self::from_logical_plan(lp, opt_state)
292 }
293
294 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
314 self,
315 by_exprs: E,
316 sort_options: SortMultipleOptions,
317 ) -> Self {
318 let by_exprs = by_exprs.as_ref().to_vec();
319 if by_exprs.is_empty() {
320 self
321 } else {
322 let opt_state = self.get_opt_state();
323 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
324 Self::from_logical_plan(lp, opt_state)
325 }
326 }
327
328 pub fn top_k<E: AsRef<[Expr]>>(
329 self,
330 k: IdxSize,
331 by_exprs: E,
332 sort_options: SortMultipleOptions,
333 ) -> Self {
334 self.sort_by_exprs(
336 by_exprs,
337 sort_options.with_order_reversed().with_nulls_last(true),
338 )
339 .slice(0, k)
340 }
341
342 pub fn bottom_k<E: AsRef<[Expr]>>(
343 self,
344 k: IdxSize,
345 by_exprs: E,
346 sort_options: SortMultipleOptions,
347 ) -> Self {
348 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
350 .slice(0, k)
351 }
352
353 pub fn reverse(self) -> Self {
369 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
370 }
371
372 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
380 where
381 I: IntoIterator<Item = T>,
382 J: IntoIterator<Item = S>,
383 T: AsRef<str>,
384 S: AsRef<str>,
385 {
386 let iter = existing.into_iter();
387 let cap = iter.size_hint().0;
388 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
389 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
390
391 for (existing, new) in iter.zip(new) {
394 let existing = existing.as_ref();
395 let new = new.as_ref();
396 if new != existing {
397 existing_vec.push(existing.into());
398 new_vec.push(new.into());
399 }
400 }
401
402 self.map_private(DslFunction::Rename {
403 existing: existing_vec.into(),
404 new: new_vec.into(),
405 strict,
406 })
407 }
408
409 pub fn drop(self, columns: Selector) -> Self {
416 let opt_state = self.get_opt_state();
417 let lp = self.get_plan_builder().drop(columns).build();
418 Self::from_logical_plan(lp, opt_state)
419 }
420
421 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
426 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
427 }
428
429 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
434 self.select(vec![
435 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
436 ])
437 }
438
439 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
441 let opt_state = self.get_opt_state();
442 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
443 Self::from_logical_plan(lp, opt_state)
444 }
445
446 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
448 let opt_state = self.get_opt_state();
449 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
450 Self::from_logical_plan(lp, opt_state)
451 }
452
453 pub fn cache(self) -> Self {
457 let opt_state = self.get_opt_state();
458 let lp = self.get_plan_builder().cache().build();
459 Self::from_logical_plan(lp, opt_state)
460 }
461
462 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
464 let cast_cols: Vec<Expr> = dtypes
465 .into_iter()
466 .map(|(name, dt)| {
467 let name = PlSmallStr::from_str(name);
468
469 if strict {
470 col(name).strict_cast(dt)
471 } else {
472 col(name).cast(dt)
473 }
474 })
475 .collect();
476
477 if cast_cols.is_empty() {
478 self
479 } else {
480 self.with_columns(cast_cols)
481 }
482 }
483
484 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
486 self.with_columns(vec![if strict {
487 col(PlSmallStr::from_static("*")).strict_cast(dtype)
488 } else {
489 col(PlSmallStr::from_static("*")).cast(dtype)
490 }])
491 }
492
493 pub fn optimize(
494 self,
495 lp_arena: &mut Arena<IR>,
496 expr_arena: &mut Arena<AExpr>,
497 ) -> PolarsResult<Node> {
498 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
499 }
500
501 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
502 let (mut lp_arena, mut expr_arena) = self.get_arenas();
503 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
504
505 Ok(IRPlan::new(node, lp_arena, expr_arena))
506 }
507
508 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
509 let (mut lp_arena, mut expr_arena) = self.get_arenas();
510 let node = to_alp(
511 self.logical_plan,
512 &mut expr_arena,
513 &mut lp_arena,
514 &mut self.opt_state,
515 )?;
516 let plan = IRPlan::new(node, lp_arena, expr_arena);
517 Ok(plan)
518 }
519
520 pub(crate) fn optimize_with_scratch(
521 self,
522 lp_arena: &mut Arena<IR>,
523 expr_arena: &mut Arena<AExpr>,
524 scratch: &mut Vec<Node>,
525 ) -> PolarsResult<Node> {
526 #[allow(unused_mut)]
527 let mut opt_state = self.opt_state;
528 let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
529
530 #[cfg(feature = "cse")]
531 if new_streaming {
532 opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
535 }
536
537 let lp_top = optimize(
538 self.logical_plan,
539 opt_state,
540 lp_arena,
541 expr_arena,
542 scratch,
543 apply_scan_predicate_to_scan_ir,
544 )?;
545
546 Ok(lp_top)
547 }
548
549 fn prepare_collect_post_opt<P>(
550 mut self,
551 check_sink: bool,
552 query_start: Option<std::time::Instant>,
553 post_opt: P,
554 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
555 where
556 P: FnOnce(
557 Node,
558 &mut Arena<IR>,
559 &mut Arena<AExpr>,
560 Option<std::time::Duration>,
561 ) -> PolarsResult<()>,
562 {
563 let (mut lp_arena, mut expr_arena) = self.get_arenas();
564
565 let mut scratch = vec![];
566 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
567
568 post_opt(
569 lp_top,
570 &mut lp_arena,
571 &mut expr_arena,
572 query_start.map(|s| s.elapsed()),
575 )?;
576
577 let no_file_sink = if check_sink {
579 !matches!(
580 lp_arena.get(lp_top),
581 IR::Sink {
582 payload: SinkTypeIR::File { .. },
583 ..
584 }
585 )
586 } else {
587 true
588 };
589 let physical_plan = create_physical_plan(
590 lp_top,
591 &mut lp_arena,
592 &mut expr_arena,
593 BUILD_STREAMING_EXECUTOR,
594 )?;
595
596 let state = ExecutionState::new();
597 Ok((state, physical_plan, no_file_sink))
598 }
599
600 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
602 where
603 P: FnOnce(
604 Node,
605 &mut Arena<IR>,
606 &mut Arena<AExpr>,
607 Option<std::time::Duration>,
608 ) -> PolarsResult<()>,
609 {
610 let (mut state, mut physical_plan, _) =
611 self.prepare_collect_post_opt(false, None, post_opt)?;
612 physical_plan.execute(&mut state)
613 }
614
615 #[allow(unused_mut)]
616 fn prepare_collect(
617 self,
618 check_sink: bool,
619 query_start: Option<std::time::Instant>,
620 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
621 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
622 }
623
624 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
629 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
630 payload.clone()
631 } else {
632 self.logical_plan = DslPlan::Sink {
633 input: Arc::new(self.logical_plan),
634 payload: SinkType::Memory,
635 };
636 SinkType::Memory
637 };
638
639 if engine == Engine::Auto {
641 engine = match payload {
642 #[cfg(feature = "new_streaming")]
643 SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
644 _ => Engine::InMemory,
645 };
646 }
647 if engine == Engine::Gpu {
649 engine = Engine::InMemory;
650 }
651
652 #[cfg(feature = "new_streaming")]
653 {
654 if let Some(result) = self.try_new_streaming_if_requested() {
655 return result.map(|v| v.unwrap_single());
656 }
657 }
658
659 match engine {
660 Engine::Auto => unreachable!(),
661 Engine::Streaming => {
662 feature_gated!("new_streaming", self = self.with_new_streaming(true))
663 },
664 _ => {},
665 }
666 let mut alp_plan = self.clone().to_alp_optimized()?;
667
668 match engine {
669 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
670 let result = polars_stream::run_query(
671 alp_plan.lp_top,
672 &mut alp_plan.lp_arena,
673 &mut alp_plan.expr_arena,
674 );
675 result.map(|v| v.unwrap_single())
676 }),
677 Engine::Gpu => {
678 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
679 },
680 Engine::InMemory => {
681 let mut physical_plan = create_physical_plan(
682 alp_plan.lp_top,
683 &mut alp_plan.lp_arena,
684 &mut alp_plan.expr_arena,
685 BUILD_STREAMING_EXECUTOR,
686 )?;
687 let mut state = ExecutionState::new();
688 physical_plan.execute(&mut state)
689 },
690 }
691 }
692
693 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
694 let sink_multiple = LazyFrame {
695 logical_plan: DslPlan::SinkMultiple { inputs: plans },
696 opt_state,
697 cached_arena: Default::default(),
698 };
699 sink_multiple.explain(true)
700 }
701
702 pub fn collect_all_with_engine(
703 plans: Vec<DslPlan>,
704 mut engine: Engine,
705 opt_state: OptFlags,
706 ) -> PolarsResult<Vec<DataFrame>> {
707 if plans.is_empty() {
708 return Ok(Vec::new());
709 }
710
711 if engine == Engine::Auto {
713 engine = Engine::InMemory;
714 }
715 if engine == Engine::Gpu {
717 engine = Engine::InMemory;
718 }
719
720 let mut sink_multiple = LazyFrame {
721 logical_plan: DslPlan::SinkMultiple { inputs: plans },
722 opt_state,
723 cached_arena: Default::default(),
724 };
725
726 #[cfg(feature = "new_streaming")]
727 {
728 if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
729 return result.map(|v| v.unwrap_multiple());
730 }
731 }
732
733 match engine {
734 Engine::Auto => unreachable!(),
735 Engine::Streaming => {
736 feature_gated!(
737 "new_streaming",
738 sink_multiple = sink_multiple.with_new_streaming(true)
739 )
740 },
741 _ => {},
742 }
743 let mut alp_plan = sink_multiple.to_alp_optimized()?;
744
745 if engine == Engine::Streaming {
746 feature_gated!("new_streaming", {
747 let result = polars_stream::run_query(
748 alp_plan.lp_top,
749 &mut alp_plan.lp_arena,
750 &mut alp_plan.expr_arena,
751 );
752 return result.map(|v| v.unwrap_multiple());
753 });
754 }
755
756 let IR::SinkMultiple { inputs } = alp_plan.root() else {
757 unreachable!()
758 };
759
760 let mut multiplan = create_multiple_physical_plans(
761 inputs.clone().as_slice(),
762 &mut alp_plan.lp_arena,
763 &mut alp_plan.expr_arena,
764 BUILD_STREAMING_EXECUTOR,
765 )?;
766
767 match engine {
768 Engine::Gpu => polars_bail!(
769 InvalidOperation: "collect_all is not supported for the gpu engine"
770 ),
771 Engine::InMemory => {
772 let mut state = ExecutionState::new();
776 if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
777 cache_prefiller.execute(&mut state)?;
778 }
779 let out = POOL.install(|| {
780 multiplan
781 .physical_plans
782 .chunks_mut(POOL.current_num_threads() * 3)
783 .map(|chunk| {
784 chunk
785 .into_par_iter()
786 .enumerate()
787 .map(|(idx, input)| {
788 let mut input = std::mem::take(input);
789 let mut state = state.split();
790 state.branch_idx += idx;
791
792 let df = input.execute(&mut state)?;
793 Ok(df)
794 })
795 .collect::<PolarsResult<Vec<_>>>()
796 })
797 .collect::<PolarsResult<Vec<_>>>()
798 });
799 Ok(out?.into_iter().flatten().collect())
800 },
801 _ => unreachable!(),
802 }
803 }
804
805 pub fn collect(self) -> PolarsResult<DataFrame> {
823 self.collect_with_engine(Engine::InMemory)
824 }
825
826 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
829 where
830 P: FnOnce(
831 Node,
832 &mut Arena<IR>,
833 &mut Arena<AExpr>,
834 Option<std::time::Duration>,
835 ) -> PolarsResult<()>,
836 {
837 let query_start = std::time::Instant::now();
838 let (mut state, mut physical_plan, _) =
839 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
840 state.time_nodes(query_start);
841 let out = physical_plan.execute(&mut state)?;
842 let timer_df = state.finish_timer()?;
843 Ok((out, timer_df))
844 }
845
846 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
854 self._profile_post_opt(|_, _, _, _| Ok(()))
855 }
856
857 pub fn sink_batches(
858 mut self,
859 function: PlanCallback<DataFrame, bool>,
860 maintain_order: bool,
861 chunk_size: Option<NonZeroUsize>,
862 ) -> PolarsResult<Self> {
863 use polars_plan::prelude::sink::CallbackSinkType;
864
865 polars_ensure!(
866 !matches!(self.logical_plan, DslPlan::Sink { .. }),
867 InvalidOperation: "cannot create a sink on top of another sink"
868 );
869
870 self.logical_plan = DslPlan::Sink {
871 input: Arc::new(self.logical_plan),
872 payload: SinkType::Callback(CallbackSinkType {
873 function,
874 maintain_order,
875 chunk_size,
876 }),
877 };
878
879 Ok(self)
880 }
881
882 #[cfg(feature = "new_streaming")]
883 pub fn try_new_streaming_if_requested(
884 &mut self,
885 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
886 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
887 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
888
889 if auto_new_streaming || force_new_streaming {
890 let mut new_stream_lazy = self.clone();
893 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
894 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
895 Ok(v) => v,
896 Err(e) => return Some(Err(e)),
897 };
898
899 let f = || {
900 polars_stream::run_query(
901 alp_plan.lp_top,
902 &mut alp_plan.lp_arena,
903 &mut alp_plan.expr_arena,
904 )
905 };
906
907 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
908 Ok(v) => return Some(v),
909 Err(e) => {
910 if !force_new_streaming
913 && auto_new_streaming
914 && e.downcast_ref::<&str>()
915 .map(|s| s.starts_with("not yet implemented"))
916 .unwrap_or(false)
917 {
918 if polars_core::config::verbose() {
919 eprintln!(
920 "caught unimplemented error in new streaming engine, falling back to normal engine"
921 );
922 }
923 } else {
924 std::panic::resume_unwind(e);
925 }
926 },
927 }
928 }
929
930 None
931 }
932
933 pub fn sink(
934 mut self,
935 sink_type: SinkDestination,
936 file_format: impl Into<Box<FileType>>,
937 unified_sink_args: UnifiedSinkArgs,
938 ) -> PolarsResult<Self> {
939 polars_ensure!(
940 !matches!(self.logical_plan, DslPlan::Sink { .. }),
941 InvalidOperation: "cannot create a sink on top of another sink"
942 );
943
944 self.logical_plan = DslPlan::Sink {
945 input: Arc::new(self.logical_plan),
946 payload: match sink_type {
947 SinkDestination::File { target } => SinkType::File(FileSinkOptions {
948 target,
949 file_format: file_format.into(),
950 unified_sink_args,
951 }),
952 SinkDestination::Partitioned {
953 base_path,
954 file_path_provider,
955 partition_strategy,
956 finish_callback,
957 } => SinkType::Partitioned(PartitionedSinkOptions {
958 base_path,
959 file_path_provider,
960 partition_strategy,
961 finish_callback,
962 file_format: file_format.into(),
963 unified_sink_args,
964 }),
965 },
966 };
967 Ok(self)
968 }
969
970 pub fn filter(self, predicate: Expr) -> Self {
988 let opt_state = self.get_opt_state();
989 let lp = self.get_plan_builder().filter(predicate).build();
990 Self::from_logical_plan(lp, opt_state)
991 }
992
993 pub fn remove(self, predicate: Expr) -> Self {
1011 self.filter(predicate.neq_missing(lit(true)))
1012 }
1013
1014 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1040 let exprs = exprs.as_ref().to_vec();
1041 self.select_impl(
1042 exprs,
1043 ProjectionOptions {
1044 run_parallel: true,
1045 duplicate_check: true,
1046 should_broadcast: true,
1047 },
1048 )
1049 }
1050
1051 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1052 let exprs = exprs.as_ref().to_vec();
1053 self.select_impl(
1054 exprs,
1055 ProjectionOptions {
1056 run_parallel: false,
1057 duplicate_check: true,
1058 should_broadcast: true,
1059 },
1060 )
1061 }
1062
1063 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1064 let opt_state = self.get_opt_state();
1065 let lp = self.get_plan_builder().project(exprs, options).build();
1066 Self::from_logical_plan(lp, opt_state)
1067 }
1068
1069 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1090 let keys = by
1091 .as_ref()
1092 .iter()
1093 .map(|e| e.clone().into())
1094 .collect::<Vec<_>>();
1095 let opt_state = self.get_opt_state();
1096
1097 #[cfg(feature = "dynamic_group_by")]
1098 {
1099 LazyGroupBy {
1100 logical_plan: self.logical_plan,
1101 opt_state,
1102 keys,
1103 predicates: vec![],
1104 maintain_order: false,
1105 dynamic_options: None,
1106 rolling_options: None,
1107 }
1108 }
1109
1110 #[cfg(not(feature = "dynamic_group_by"))]
1111 {
1112 LazyGroupBy {
1113 logical_plan: self.logical_plan,
1114 opt_state,
1115 keys,
1116 predicates: vec![],
1117 maintain_order: false,
1118 }
1119 }
1120 }
1121
1122 #[cfg(feature = "dynamic_group_by")]
1130 pub fn rolling<E: AsRef<[Expr]>>(
1131 mut self,
1132 index_column: Expr,
1133 group_by: E,
1134 mut options: RollingGroupOptions,
1135 ) -> LazyGroupBy {
1136 if let Expr::Column(name) = index_column {
1137 options.index_column = name;
1138 } else {
1139 let output_field = index_column
1140 .to_field(&self.collect_schema().unwrap())
1141 .unwrap();
1142 return self.with_column(index_column).rolling(
1143 Expr::Column(output_field.name().clone()),
1144 group_by,
1145 options,
1146 );
1147 }
1148 let opt_state = self.get_opt_state();
1149 LazyGroupBy {
1150 logical_plan: self.logical_plan,
1151 opt_state,
1152 predicates: vec![],
1153 keys: group_by.as_ref().to_vec(),
1154 maintain_order: true,
1155 dynamic_options: None,
1156 rolling_options: Some(options),
1157 }
1158 }
1159
1160 #[cfg(feature = "dynamic_group_by")]
1176 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1177 mut self,
1178 index_column: Expr,
1179 group_by: E,
1180 mut options: DynamicGroupOptions,
1181 ) -> LazyGroupBy {
1182 if let Expr::Column(name) = index_column {
1183 options.index_column = name;
1184 } else {
1185 let output_field = index_column
1186 .to_field(&self.collect_schema().unwrap())
1187 .unwrap();
1188 return self.with_column(index_column).group_by_dynamic(
1189 Expr::Column(output_field.name().clone()),
1190 group_by,
1191 options,
1192 );
1193 }
1194 let opt_state = self.get_opt_state();
1195 LazyGroupBy {
1196 logical_plan: self.logical_plan,
1197 opt_state,
1198 predicates: vec![],
1199 keys: group_by.as_ref().to_vec(),
1200 maintain_order: true,
1201 dynamic_options: Some(options),
1202 rolling_options: None,
1203 }
1204 }
1205
1206 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1208 let keys = by
1209 .as_ref()
1210 .iter()
1211 .map(|e| e.clone().into())
1212 .collect::<Vec<_>>();
1213 let opt_state = self.get_opt_state();
1214
1215 #[cfg(feature = "dynamic_group_by")]
1216 {
1217 LazyGroupBy {
1218 logical_plan: self.logical_plan,
1219 opt_state,
1220 keys,
1221 predicates: vec![],
1222 maintain_order: true,
1223 dynamic_options: None,
1224 rolling_options: None,
1225 }
1226 }
1227
1228 #[cfg(not(feature = "dynamic_group_by"))]
1229 {
1230 LazyGroupBy {
1231 logical_plan: self.logical_plan,
1232 opt_state,
1233 keys,
1234 predicates: vec![],
1235 maintain_order: true,
1236 }
1237 }
1238 }
1239
1240 #[cfg(feature = "semi_anti_join")]
1257 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1258 self.join(
1259 other,
1260 [left_on.into()],
1261 [right_on.into()],
1262 JoinArgs::new(JoinType::Anti),
1263 )
1264 }
1265
1266 #[cfg(feature = "cross_join")]
1268 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1269 self.join(
1270 other,
1271 vec![],
1272 vec![],
1273 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1274 )
1275 }
1276
1277 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1294 self.join(
1295 other,
1296 [left_on.into()],
1297 [right_on.into()],
1298 JoinArgs::new(JoinType::Left),
1299 )
1300 }
1301
1302 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1319 self.join(
1320 other,
1321 [left_on.into()],
1322 [right_on.into()],
1323 JoinArgs::new(JoinType::Inner),
1324 )
1325 }
1326
1327 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1344 self.join(
1345 other,
1346 [left_on.into()],
1347 [right_on.into()],
1348 JoinArgs::new(JoinType::Full),
1349 )
1350 }
1351
1352 #[cfg(feature = "semi_anti_join")]
1369 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1370 self.join(
1371 other,
1372 [left_on.into()],
1373 [right_on.into()],
1374 JoinArgs::new(JoinType::Semi),
1375 )
1376 }
1377
1378 pub fn join<E: AsRef<[Expr]>>(
1400 self,
1401 other: LazyFrame,
1402 left_on: E,
1403 right_on: E,
1404 args: JoinArgs,
1405 ) -> LazyFrame {
1406 let left_on = left_on.as_ref().to_vec();
1407 let right_on = right_on.as_ref().to_vec();
1408
1409 self._join_impl(other, left_on, right_on, args)
1410 }
1411
1412 fn _join_impl(
1413 self,
1414 other: LazyFrame,
1415 left_on: Vec<Expr>,
1416 right_on: Vec<Expr>,
1417 args: JoinArgs,
1418 ) -> LazyFrame {
1419 let JoinArgs {
1420 how,
1421 validation,
1422 suffix,
1423 slice,
1424 nulls_equal,
1425 coalesce,
1426 maintain_order,
1427 } = args;
1428
1429 if slice.is_some() {
1430 panic!("impl error: slice is not handled")
1431 }
1432
1433 let mut builder = self
1434 .join_builder()
1435 .with(other)
1436 .left_on(left_on)
1437 .right_on(right_on)
1438 .how(how)
1439 .validate(validation)
1440 .join_nulls(nulls_equal)
1441 .coalesce(coalesce)
1442 .maintain_order(maintain_order);
1443
1444 if let Some(suffix) = suffix {
1445 builder = builder.suffix(suffix);
1446 }
1447
1448 builder.finish()
1450 }
1451
1452 pub fn join_builder(self) -> JoinBuilder {
1458 JoinBuilder::new(self)
1459 }
1460
1461 pub fn with_column(self, expr: Expr) -> LazyFrame {
1479 let opt_state = self.get_opt_state();
1480 let lp = self
1481 .get_plan_builder()
1482 .with_columns(
1483 vec![expr],
1484 ProjectionOptions {
1485 run_parallel: false,
1486 duplicate_check: true,
1487 should_broadcast: true,
1488 },
1489 )
1490 .build();
1491 Self::from_logical_plan(lp, opt_state)
1492 }
1493
1494 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1509 let exprs = exprs.as_ref().to_vec();
1510 self.with_columns_impl(
1511 exprs,
1512 ProjectionOptions {
1513 run_parallel: true,
1514 duplicate_check: true,
1515 should_broadcast: true,
1516 },
1517 )
1518 }
1519
1520 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1522 let exprs = exprs.as_ref().to_vec();
1523 self.with_columns_impl(
1524 exprs,
1525 ProjectionOptions {
1526 run_parallel: false,
1527 duplicate_check: true,
1528 should_broadcast: true,
1529 },
1530 )
1531 }
1532
1533 pub fn match_to_schema(
1535 self,
1536 schema: SchemaRef,
1537 per_column: Arc<[MatchToSchemaPerColumn]>,
1538 extra_columns: ExtraColumnsPolicy,
1539 ) -> LazyFrame {
1540 let opt_state = self.get_opt_state();
1541 let lp = self
1542 .get_plan_builder()
1543 .match_to_schema(schema, per_column, extra_columns)
1544 .build();
1545 Self::from_logical_plan(lp, opt_state)
1546 }
1547
1548 pub fn pipe_with_schema(
1549 self,
1550 callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1551 ) -> Self {
1552 let opt_state = self.get_opt_state();
1553 let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1554 Self::from_logical_plan(lp, opt_state)
1555 }
1556
1557 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1558 let opt_state = self.get_opt_state();
1559 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1560 Self::from_logical_plan(lp, opt_state)
1561 }
1562
1563 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1564 let contexts = contexts
1565 .as_ref()
1566 .iter()
1567 .map(|lf| lf.logical_plan.clone())
1568 .collect();
1569 let opt_state = self.get_opt_state();
1570 let lp = self.get_plan_builder().with_context(contexts).build();
1571 Self::from_logical_plan(lp, opt_state)
1572 }
1573
1574 pub fn max(self) -> Self {
1578 self.map_private(DslFunction::Stats(StatsFunction::Max))
1579 }
1580
1581 pub fn min(self) -> Self {
1585 self.map_private(DslFunction::Stats(StatsFunction::Min))
1586 }
1587
1588 pub fn sum(self) -> Self {
1598 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1599 }
1600
1601 pub fn mean(self) -> Self {
1606 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1607 }
1608
1609 pub fn median(self) -> Self {
1615 self.map_private(DslFunction::Stats(StatsFunction::Median))
1616 }
1617
1618 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1620 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1621 quantile,
1622 method,
1623 }))
1624 }
1625
1626 pub fn std(self, ddof: u8) -> Self {
1639 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1640 }
1641
1642 pub fn var(self, ddof: u8) -> Self {
1652 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1653 }
1654
1655 pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
1657 self.explode_impl(columns, options, false)
1658 }
1659
1660 fn explode_impl(
1662 self,
1663 columns: Selector,
1664 options: ExplodeOptions,
1665 allow_empty: bool,
1666 ) -> LazyFrame {
1667 let opt_state = self.get_opt_state();
1668 let lp = self
1669 .get_plan_builder()
1670 .explode(columns, options, allow_empty)
1671 .build();
1672 Self::from_logical_plan(lp, opt_state)
1673 }
1674
1675 pub fn null_count(self) -> LazyFrame {
1677 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1678 }
1679
1680 pub fn unique_stable(
1685 self,
1686 subset: Option<Selector>,
1687 keep_strategy: UniqueKeepStrategy,
1688 ) -> LazyFrame {
1689 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1690 self.unique_stable_generic(subset, keep_strategy)
1691 }
1692
1693 pub fn unique_stable_generic(
1694 self,
1695 subset: Option<Vec<Expr>>,
1696 keep_strategy: UniqueKeepStrategy,
1697 ) -> LazyFrame {
1698 let opt_state = self.get_opt_state();
1699 let options = DistinctOptionsDSL {
1700 subset,
1701 maintain_order: true,
1702 keep_strategy,
1703 };
1704 let lp = self.get_plan_builder().distinct(options).build();
1705 Self::from_logical_plan(lp, opt_state)
1706 }
1707
1708 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1716 let subset = subset.map(|s| vec![Expr::Selector(s)]);
1717 self.unique_generic(subset, keep_strategy)
1718 }
1719
1720 pub fn unique_generic(
1721 self,
1722 subset: Option<Vec<Expr>>,
1723 keep_strategy: UniqueKeepStrategy,
1724 ) -> LazyFrame {
1725 let opt_state = self.get_opt_state();
1726 let options = DistinctOptionsDSL {
1727 subset,
1728 maintain_order: false,
1729 keep_strategy,
1730 };
1731 let lp = self.get_plan_builder().distinct(options).build();
1732 Self::from_logical_plan(lp, opt_state)
1733 }
1734
1735 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1740 let opt_state = self.get_opt_state();
1741 let lp = self.get_plan_builder().drop_nans(subset).build();
1742 Self::from_logical_plan(lp, opt_state)
1743 }
1744
1745 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1750 let opt_state = self.get_opt_state();
1751 let lp = self.get_plan_builder().drop_nulls(subset).build();
1752 Self::from_logical_plan(lp, opt_state)
1753 }
1754
1755 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1765 let opt_state = self.get_opt_state();
1766 let lp = self.get_plan_builder().slice(offset, len).build();
1767 Self::from_logical_plan(lp, opt_state)
1768 }
1769
1770 pub fn first(self) -> LazyFrame {
1774 self.slice(0, 1)
1775 }
1776
1777 pub fn last(self) -> LazyFrame {
1781 self.slice(-1, 1)
1782 }
1783
1784 pub fn tail(self, n: IdxSize) -> LazyFrame {
1788 let neg_tail = -(n as i64);
1789 self.slice(neg_tail, n)
1790 }
1791
/// Append a pivot node to the plan.
///
/// `on`, `index` and `values` select the participating columns. `agg` is the
/// aggregation expression. `maintain_order` and `separator` are forwarded
/// unchanged. `on_columns` is a frame passed through to the builder (presumably
/// the distinct `on` values that become output columns — confirm in the builder).
#[cfg(feature = "pivot")]
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
) -> LazyFrame {
    let flags = self.get_opt_state();
    let builder = self.get_plan_builder().pivot(
        on,
        on_columns,
        index,
        values,
        agg,
        maintain_order,
        separator,
    );
    Self::from_logical_plan(builder.build(), flags)
}
1819
/// Append an unpivot node described by `args`.
///
/// This is compiled under the `pivot` feature.
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let flags = self.get_opt_state();
    let builder = self.get_plan_builder().unpivot(args);
    Self::from_logical_plan(builder.build(), flags)
}
1829
1830 pub fn limit(self, n: IdxSize) -> LazyFrame {
1832 self.slice(0, n)
1833 }
1834
1835 pub fn map<F>(
1849 self,
1850 function: F,
1851 optimizations: AllowedOptimizations,
1852 schema: Option<Arc<dyn UdfSchema>>,
1853 name: Option<&'static str>,
1854 ) -> LazyFrame
1855 where
1856 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1857 {
1858 let opt_state = self.get_opt_state();
1859 let lp = self
1860 .get_plan_builder()
1861 .map(
1862 function,
1863 optimizations,
1864 schema,
1865 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1866 )
1867 .build();
1868 Self::from_logical_plan(lp, opt_state)
1869 }
1870
/// Apply a Python function to the whole `DataFrame` as a plan node.
///
/// `optimizations`, the optional output `schema` and `validate_output` are
/// forwarded unchanged to the plan builder.
#[cfg(feature = "python")]
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let flags = self.get_opt_state();
    let builder =
        self.get_plan_builder()
            .map_python(function, optimizations, schema, validate_output);
    Self::from_logical_plan(builder.build(), flags)
}
1886
1887 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1888 let opt_state = self.get_opt_state();
1889 let lp = self.get_plan_builder().map_private(function).build();
1890 Self::from_logical_plan(lp, opt_state)
1891 }
1892
1893 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
1902 where
1903 S: Into<PlSmallStr>,
1904 {
1905 let name = name.into();
1906
1907 match &self.logical_plan {
1908 v @ DslPlan::Scan {
1909 scan_type,
1910 unified_scan_args,
1911 ..
1912 } if unified_scan_args.row_index.is_none()
1913 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
1914 {
1915 let DslPlan::Scan {
1916 sources,
1917 mut unified_scan_args,
1918 scan_type,
1919 cached_ir: _,
1920 } = v.clone()
1921 else {
1922 unreachable!()
1923 };
1924
1925 unified_scan_args.row_index = Some(RowIndex {
1926 name,
1927 offset: offset.unwrap_or(0),
1928 });
1929
1930 DslPlan::Scan {
1931 sources,
1932 unified_scan_args,
1933 scan_type,
1934 cached_ir: Default::default(),
1935 }
1936 .into()
1937 },
1938 _ => self.map_private(DslFunction::RowIndex { name, offset }),
1939 }
1940 }
1941
1942 pub fn count(self) -> LazyFrame {
1944 self.select(vec![col(PlSmallStr::from_static("*")).count()])
1945 }
1946
/// Unnest the struct columns matched by `cols` through a `DslFunction::Unnest` step.
///
/// `separator` is forwarded unchanged. How it affects the output column names is
/// decided downstream.
#[cfg(feature = "dtype-struct")]
pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
    let function = DslFunction::Unnest {
        separator,
        columns: cols,
    };
    self.map_private(function)
}
1956
/// Build a `MergeSorted` node that merges `self` and `other` on column `key`.
///
/// The result uses this frame's optimization flags; `other`'s flags are ignored.
/// This currently never returns `Err`.
#[cfg(feature = "merge_sorted")]
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let key: PlSmallStr = key.into();
    let flags = self.opt_state;
    let left = Arc::new(self.logical_plan);
    let right = Arc::new(other.logical_plan);

    Ok(LazyFrame::from_logical_plan(
        DslPlan::MergeSorted {
            input_left: left,
            input_right: right,
            key,
        },
        flags,
    ))
}
1971
1972 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
1973 let lp = DslPlan::MapFunction {
1974 input: Arc::new(self.logical_plan),
1975 function: DslFunction::Hint(hint),
1976 };
1977 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
1978 }
1979}
1980
1981#[derive(Clone)]
1983pub struct LazyGroupBy {
1984 pub logical_plan: DslPlan,
1985 opt_state: OptFlags,
1986 keys: Vec<Expr>,
1987 predicates: Vec<Expr>,
1988 maintain_order: bool,
1989 #[cfg(feature = "dynamic_group_by")]
1990 dynamic_options: Option<DynamicGroupOptions>,
1991 #[cfg(feature = "dynamic_group_by")]
1992 rolling_options: Option<RollingGroupOptions>,
1993}
1994
1995impl From<LazyGroupBy> for LazyFrame {
1996 fn from(lgb: LazyGroupBy) -> Self {
1997 Self {
1998 logical_plan: lgb.logical_plan,
1999 opt_state: lgb.opt_state,
2000 cached_arena: Default::default(),
2001 }
2002 }
2003}
2004
2005impl LazyGroupBy {
2006 pub fn having(mut self, predicate: Expr) -> Self {
2027 self.predicates.push(predicate);
2028 self
2029 }
2030
2031 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2053 #[cfg(feature = "dynamic_group_by")]
2054 let lp = DslBuilder::from(self.logical_plan)
2055 .group_by(
2056 self.keys,
2057 self.predicates,
2058 aggs,
2059 None,
2060 self.maintain_order,
2061 self.dynamic_options,
2062 self.rolling_options,
2063 )
2064 .build();
2065
2066 #[cfg(not(feature = "dynamic_group_by"))]
2067 let lp = DslBuilder::from(self.logical_plan)
2068 .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
2069 .build();
2070 LazyFrame::from_logical_plan(lp, self.opt_state)
2071 }
2072
2073 pub fn head(self, n: Option<usize>) -> LazyFrame {
2075 let keys = self
2076 .keys
2077 .iter()
2078 .filter_map(|expr| expr_output_name(expr).ok())
2079 .collect::<Vec<_>>();
2080
2081 self.agg([all().as_expr().head(n)]).explode_impl(
2082 all() - by_name(keys.iter().cloned(), false),
2083 ExplodeOptions {
2084 empty_as_null: true,
2085 keep_nulls: true,
2086 },
2087 true,
2088 )
2089 }
2090
2091 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2093 let keys = self
2094 .keys
2095 .iter()
2096 .filter_map(|expr| expr_output_name(expr).ok())
2097 .collect::<Vec<_>>();
2098
2099 self.agg([all().as_expr().tail(n)]).explode_impl(
2100 all() - by_name(keys.iter().cloned(), false),
2101 ExplodeOptions {
2102 empty_as_null: true,
2103 keep_nulls: true,
2104 },
2105 true,
2106 )
2107 }
2108
2109 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2114 if !self.predicates.is_empty() {
2115 panic!("not yet implemented: `apply` cannot be used with `having` predicates");
2116 }
2117
2118 #[cfg(feature = "dynamic_group_by")]
2119 let options = GroupbyOptions {
2120 dynamic: self.dynamic_options,
2121 rolling: self.rolling_options,
2122 slice: None,
2123 };
2124
2125 #[cfg(not(feature = "dynamic_group_by"))]
2126 let options = GroupbyOptions { slice: None };
2127
2128 let lp = DslPlan::GroupBy {
2129 input: Arc::new(self.logical_plan),
2130 keys: self.keys,
2131 predicates: vec![],
2132 aggs: vec![],
2133 apply: Some((f, schema)),
2134 maintain_order: self.maintain_order,
2135 options: Arc::new(options),
2136 };
2137 LazyFrame::from_logical_plan(lp, self.opt_state)
2138 }
2139}
2140
2141#[must_use]
2142pub struct JoinBuilder {
2143 lf: LazyFrame,
2144 how: JoinType,
2145 other: Option<LazyFrame>,
2146 left_on: Vec<Expr>,
2147 right_on: Vec<Expr>,
2148 allow_parallel: bool,
2149 force_parallel: bool,
2150 suffix: Option<PlSmallStr>,
2151 validation: JoinValidation,
2152 nulls_equal: bool,
2153 coalesce: JoinCoalesce,
2154 maintain_order: MaintainOrderJoin,
2155}
2156impl JoinBuilder {
2157 pub fn new(lf: LazyFrame) -> Self {
2159 Self {
2160 lf,
2161 other: None,
2162 how: JoinType::Inner,
2163 left_on: vec![],
2164 right_on: vec![],
2165 allow_parallel: true,
2166 force_parallel: false,
2167 suffix: None,
2168 validation: Default::default(),
2169 nulls_equal: false,
2170 coalesce: Default::default(),
2171 maintain_order: Default::default(),
2172 }
2173 }
2174
2175 pub fn with(mut self, other: LazyFrame) -> Self {
2177 self.other = Some(other);
2178 self
2179 }
2180
2181 pub fn how(mut self, how: JoinType) -> Self {
2183 self.how = how;
2184 self
2185 }
2186
2187 pub fn validate(mut self, validation: JoinValidation) -> Self {
2188 self.validation = validation;
2189 self
2190 }
2191
2192 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2196 let on = on.as_ref().to_vec();
2197 self.left_on.clone_from(&on);
2198 self.right_on = on;
2199 self
2200 }
2201
2202 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2206 self.left_on = on.as_ref().to_vec();
2207 self
2208 }
2209
2210 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2214 self.right_on = on.as_ref().to_vec();
2215 self
2216 }
2217
2218 pub fn allow_parallel(mut self, allow: bool) -> Self {
2220 self.allow_parallel = allow;
2221 self
2222 }
2223
2224 pub fn force_parallel(mut self, force: bool) -> Self {
2226 self.force_parallel = force;
2227 self
2228 }
2229
2230 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2232 self.nulls_equal = nulls_equal;
2233 self
2234 }
2235
2236 pub fn suffix<S>(mut self, suffix: S) -> Self
2239 where
2240 S: Into<PlSmallStr>,
2241 {
2242 self.suffix = Some(suffix.into());
2243 self
2244 }
2245
2246 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2248 self.coalesce = coalesce;
2249 self
2250 }
2251
2252 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2254 self.maintain_order = maintain_order;
2255 self
2256 }
2257
2258 pub fn finish(self) -> LazyFrame {
2260 let opt_state = self.lf.opt_state;
2261 let other = self.other.expect("'with' not set in join builder");
2262
2263 let args = JoinArgs {
2264 how: self.how,
2265 validation: self.validation,
2266 suffix: self.suffix,
2267 slice: None,
2268 nulls_equal: self.nulls_equal,
2269 coalesce: self.coalesce,
2270 maintain_order: self.maintain_order,
2271 };
2272
2273 let lp = self
2274 .lf
2275 .get_plan_builder()
2276 .join(
2277 other.logical_plan,
2278 self.left_on,
2279 self.right_on,
2280 JoinOptions {
2281 allow_parallel: self.allow_parallel,
2282 force_parallel: self.force_parallel,
2283 args,
2284 }
2285 .into(),
2286 )
2287 .build();
2288 LazyFrame::from_logical_plan(lp, opt_state)
2289 }
2290
2291 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2293 let opt_state = self.lf.opt_state;
2294 let other = self.other.expect("with not set");
2295
2296 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2298 if let Expr::BinaryExpr {
2299 op: Operator::And,
2300 left,
2301 right,
2302 } = predicate
2303 {
2304 decompose_and((*left).clone(), expanded_predicates);
2305 decompose_and((*right).clone(), expanded_predicates);
2306 } else {
2307 expanded_predicates.push(predicate);
2308 }
2309 }
2310 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2311 for predicate in predicates {
2312 decompose_and(predicate, &mut expanded_predicates);
2313 }
2314 let predicates: Vec<Expr> = expanded_predicates;
2315
2316 #[cfg(feature = "is_between")]
2318 let predicates: Vec<Expr> = {
2319 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2320 for predicate in predicates {
2321 if let Expr::Function {
2322 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2323 input,
2324 ..
2325 } = &predicate
2326 {
2327 if let [expr, lower, upper] = input.as_slice() {
2328 match closed {
2329 ClosedInterval::Both => {
2330 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2331 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2332 },
2333 ClosedInterval::Right => {
2334 expanded_predicates.push(expr.clone().gt(lower.clone()));
2335 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2336 },
2337 ClosedInterval::Left => {
2338 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2339 expanded_predicates.push(expr.clone().lt(upper.clone()));
2340 },
2341 ClosedInterval::None => {
2342 expanded_predicates.push(expr.clone().gt(lower.clone()));
2343 expanded_predicates.push(expr.clone().lt(upper.clone()));
2344 },
2345 }
2346 continue;
2347 }
2348 }
2349 expanded_predicates.push(predicate);
2350 }
2351 expanded_predicates
2352 };
2353
2354 let args = JoinArgs {
2355 how: self.how,
2356 validation: self.validation,
2357 suffix: self.suffix,
2358 slice: None,
2359 nulls_equal: self.nulls_equal,
2360 coalesce: self.coalesce,
2361 maintain_order: self.maintain_order,
2362 };
2363 let options = JoinOptions {
2364 allow_parallel: self.allow_parallel,
2365 force_parallel: self.force_parallel,
2366 args,
2367 };
2368
2369 let lp = DslPlan::Join {
2370 input_left: Arc::new(self.lf.logical_plan),
2371 input_right: Arc::new(other.logical_plan),
2372 left_on: Default::default(),
2373 right_on: Default::default(),
2374 predicates,
2375 options: Arc::from(options),
2376 };
2377
2378 LazyFrame::from_logical_plan(lp, opt_state)
2379 }
2380}
2381
2382pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2383 #[cfg(not(feature = "new_streaming"))]
2384 {
2385 None
2386 }
2387 #[cfg(feature = "new_streaming")]
2388 {
2389 Some(polars_stream::build_streaming_query_executor)
2390 }
2391};