1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::path::PathBuf;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "ipc")]
22pub use ipc::*;
23#[cfg(feature = "json")]
24pub use ndjson::*;
25#[cfg(feature = "parquet")]
26pub use parquet::*;
27use polars_compute::rolling::QuantileMethod;
28use polars_core::POOL;
29#[cfg(all(feature = "new_streaming", feature = "dtype-categorical"))]
30use polars_core::StringCacheHolder;
31use polars_core::error::feature_gated;
32use polars_core::prelude::*;
33use polars_expr::{ExpressionConversionState, create_physical_expr};
34use polars_io::RowIndex;
35use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
36use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
37#[cfg(feature = "is_between")]
38use polars_ops::prelude::ClosedInterval;
39pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
40use polars_plan::global::FETCH_ROWS;
41use polars_utils::pl_str::PlSmallStr;
42use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
43
44use crate::frame::cached_arenas::CachedArena;
45#[cfg(feature = "streaming")]
46use crate::physical_plan::streaming::insert_streaming_nodes;
47use crate::prelude::*;
48
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return a [`LazyFrame`] representing it.
    fn lazy(self) -> LazyFrame;
}
52
53impl IntoLazy for DataFrame {
54 fn lazy(self) -> LazyFrame {
56 let lp = DslBuilder::from_existing_df(self).build();
57 LazyFrame {
58 logical_plan: lp,
59 opt_state: Default::default(),
60 cached_arena: Default::default(),
61 }
62 }
63}
64
65impl IntoLazy for LazyFrame {
66 fn lazy(self) -> LazyFrame {
67 self
68 }
69}
70
/// A lazily evaluated query.
///
/// A `LazyFrame` stores an unoptimized logical plan ([`DslPlan`]) together with the
/// optimization flags to apply. The plan is only optimized and executed when the
/// frame is collected (e.g. via `collect`/`collect_with_engine`).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The unoptimized logical plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags used when the plan is lowered and optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared cache of IR/expression arenas.
    /// NOTE(review): presumably populated/consumed by `get_arenas` (defined outside
    /// this view) — confirm.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
82
83impl From<DslPlan> for LazyFrame {
84 fn from(plan: DslPlan) -> Self {
85 Self {
86 logical_plan: plan,
87 opt_state: OptFlags::default(),
88 cached_arena: Default::default(),
89 }
90 }
91}
92
93impl LazyFrame {
94 pub(crate) fn from_inner(
95 logical_plan: DslPlan,
96 opt_state: OptFlags,
97 cached_arena: Arc<Mutex<Option<CachedArena>>>,
98 ) -> Self {
99 Self {
100 logical_plan,
101 opt_state,
102 cached_arena,
103 }
104 }
105
106 pub(crate) fn get_plan_builder(self) -> DslBuilder {
107 DslBuilder::from(self.logical_plan)
108 }
109
110 fn get_opt_state(&self) -> OptFlags {
111 self.opt_state
112 }
113
114 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
115 LazyFrame {
116 logical_plan,
117 opt_state,
118 cached_arena: Default::default(),
119 }
120 }
121
122 pub fn get_current_optimizations(&self) -> OptFlags {
124 self.opt_state
125 }
126
127 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
129 self.opt_state = opt_state;
130 self
131 }
132
133 pub fn without_optimizations(self) -> Self {
135 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
136 }
137
138 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
140 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
141 self
142 }
143
144 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
146 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
147 self
148 }
149
150 pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
152 self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
153 self
154 }
155
156 pub fn with_check_order(mut self, toggle: bool) -> Self {
159 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
160 self
161 }
162
163 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
165 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
166 self
167 }
168
169 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
171 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
172 self
173 }
174
175 pub fn with_type_check(mut self, toggle: bool) -> Self {
177 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
178 self
179 }
180
181 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
183 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
184 self
185 }
186
187 #[cfg(feature = "cse")]
189 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
190 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
191 self
192 }
193
194 #[cfg(feature = "cse")]
196 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
197 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
198 self
199 }
200
201 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
203 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
204 self
205 }
206
207 #[cfg(feature = "streaming")]
209 pub fn with_streaming(mut self, toggle: bool) -> Self {
210 self.opt_state.set(OptFlags::STREAMING, toggle);
211 self
212 }
213
214 #[cfg(feature = "new_streaming")]
215 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
216 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
217 self
218 }
219
220 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
222 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
223 self
224 }
225
226 pub fn _with_eager(mut self, toggle: bool) -> Self {
228 self.opt_state.set(OptFlags::EAGER, toggle);
229 self
230 }
231
232 pub fn describe_plan(&self) -> PolarsResult<String> {
234 Ok(self.clone().to_alp()?.describe())
235 }
236
237 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
239 Ok(self.clone().to_alp()?.describe_tree_format())
240 }
241
242 fn _describe_to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
245 let (mut lp_arena, mut expr_arena) = self.get_arenas();
246 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?;
247
248 Ok(IRPlan::new(node, lp_arena, expr_arena))
249 }
250
251 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
255 Ok(self.clone()._describe_to_alp_optimized()?.describe())
256 }
257
258 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
262 Ok(self
263 .clone()
264 ._describe_to_alp_optimized()?
265 .describe_tree_format())
266 }
267
268 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
273 if optimized {
274 self.describe_optimized_plan()
275 } else {
276 self.describe_plan()
277 }
278 }
279
280 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
320 let opt_state = self.get_opt_state();
321 let lp = self
322 .get_plan_builder()
323 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
324 .build();
325 Self::from_logical_plan(lp, opt_state)
326 }
327
328 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
348 self,
349 by_exprs: E,
350 sort_options: SortMultipleOptions,
351 ) -> Self {
352 let by_exprs = by_exprs.as_ref().to_vec();
353 if by_exprs.is_empty() {
354 self
355 } else {
356 let opt_state = self.get_opt_state();
357 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
358 Self::from_logical_plan(lp, opt_state)
359 }
360 }
361
362 pub fn top_k<E: AsRef<[Expr]>>(
363 self,
364 k: IdxSize,
365 by_exprs: E,
366 sort_options: SortMultipleOptions,
367 ) -> Self {
368 self.sort_by_exprs(
370 by_exprs,
371 sort_options.with_order_reversed().with_nulls_last(true),
372 )
373 .slice(0, k)
374 }
375
376 pub fn bottom_k<E: AsRef<[Expr]>>(
377 self,
378 k: IdxSize,
379 by_exprs: E,
380 sort_options: SortMultipleOptions,
381 ) -> Self {
382 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
384 .slice(0, k)
385 }
386
387 pub fn reverse(self) -> Self {
403 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
404 }
405
406 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
414 where
415 I: IntoIterator<Item = T>,
416 J: IntoIterator<Item = S>,
417 T: AsRef<str>,
418 S: AsRef<str>,
419 {
420 let iter = existing.into_iter();
421 let cap = iter.size_hint().0;
422 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
423 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
424
425 for (existing, new) in iter.zip(new) {
428 let existing = existing.as_ref();
429 let new = new.as_ref();
430 if new != existing {
431 existing_vec.push(existing.into());
432 new_vec.push(new.into());
433 }
434 }
435
436 self.map_private(DslFunction::Rename {
437 existing: existing_vec.into(),
438 new: new_vec.into(),
439 strict,
440 })
441 }
442
443 fn _drop<I, T>(self, columns: I, strict: bool) -> Self
450 where
451 I: IntoIterator<Item = T>,
452 T: Into<Selector>,
453 {
454 let to_drop = columns.into_iter().map(|c| c.into()).collect();
455
456 let opt_state = self.get_opt_state();
457 let lp = self.get_plan_builder().drop(to_drop, strict).build();
458 Self::from_logical_plan(lp, opt_state)
459 }
460
461 pub fn drop<I, T>(self, columns: I) -> Self
468 where
469 I: IntoIterator<Item = T>,
470 T: Into<Selector>,
471 {
472 self._drop(columns, true)
473 }
474
475 pub fn drop_no_validate<I, T>(self, columns: I) -> Self
481 where
482 I: IntoIterator<Item = T>,
483 T: Into<Selector>,
484 {
485 self._drop(columns, false)
486 }
487
488 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
493 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
494 }
495
496 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
501 self.select(vec![
502 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
503 ])
504 }
505
506 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
508 let opt_state = self.get_opt_state();
509 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
510 Self::from_logical_plan(lp, opt_state)
511 }
512
513 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
515 let opt_state = self.get_opt_state();
516 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
517 Self::from_logical_plan(lp, opt_state)
518 }
519
520 pub fn cache(self) -> Self {
524 let opt_state = self.get_opt_state();
525 let lp = self.get_plan_builder().cache().build();
526 Self::from_logical_plan(lp, opt_state)
527 }
528
529 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
531 let cast_cols: Vec<Expr> = dtypes
532 .into_iter()
533 .map(|(name, dt)| {
534 let name = PlSmallStr::from_str(name);
535
536 if strict {
537 col(name).strict_cast(dt)
538 } else {
539 col(name).cast(dt)
540 }
541 })
542 .collect();
543
544 if cast_cols.is_empty() {
545 self.clone()
546 } else {
547 self.with_columns(cast_cols)
548 }
549 }
550
551 pub fn cast_all(self, dtype: DataType, strict: bool) -> Self {
553 self.with_columns(vec![if strict {
554 col(PlSmallStr::from_static("*")).strict_cast(dtype)
555 } else {
556 col(PlSmallStr::from_static("*")).cast(dtype)
557 }])
558 }
559
560 pub fn fetch(self, n_rows: usize) -> PolarsResult<DataFrame> {
567 FETCH_ROWS.with(|fetch_rows| fetch_rows.set(Some(n_rows)));
568 let res = self.collect();
569 FETCH_ROWS.with(|fetch_rows| fetch_rows.set(None));
570 res
571 }
572
573 pub fn optimize(
574 self,
575 lp_arena: &mut Arena<IR>,
576 expr_arena: &mut Arena<AExpr>,
577 ) -> PolarsResult<Node> {
578 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
579 }
580
581 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
582 let (mut lp_arena, mut expr_arena) = self.get_arenas();
583 let node =
584 self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
585
586 Ok(IRPlan::new(node, lp_arena, expr_arena))
587 }
588
589 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
590 let (mut lp_arena, mut expr_arena) = self.get_arenas();
591 let node = to_alp(
592 self.logical_plan,
593 &mut expr_arena,
594 &mut lp_arena,
595 &mut self.opt_state,
596 )?;
597 let plan = IRPlan::new(node, lp_arena, expr_arena);
598 Ok(plan)
599 }
600
    /// Run the optimizer on the logical plan, writing IR into `lp_arena`/`expr_arena`,
    /// and return the root node of the optimized plan.
    ///
    /// `scratch` is a reusable node buffer passed to the optimizer and, for the old
    /// streaming engine, to `insert_streaming_nodes`. `enable_fmt` is only forwarded to
    /// `insert_streaming_nodes` (NOTE(review): presumably controls formatting for
    /// explain output — confirm).
    ///
    /// # Panics
    /// Panics if the `STREAMING` flag is set but the crate was built without the
    /// `streaming` feature.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
        enable_fmt: bool,
    ) -> PolarsResult<Node> {
        // `mut` is only exercised when the `cse` feature is enabled.
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let streaming = self.opt_state.contains(OptFlags::STREAMING);
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
        // Common-subplan elimination is switched off for the old streaming engine.
        #[cfg(feature = "cse")]
        if streaming && !new_streaming {
            opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
        }

        // Common-subexpression elimination is switched off for the new streaming engine.
        #[cfg(feature = "cse")]
        if new_streaming {
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback that converts an expression into an IO expression; yields `None`
            // when creating the physical expression fails.
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        if streaming {
            #[cfg(feature = "streaming")]
            {
                insert_streaming_nodes(
                    lp_top,
                    lp_arena,
                    expr_arena,
                    scratch,
                    enable_fmt,
                    true,
                    opt_state.contains(OptFlags::ROW_ESTIMATE),
                )?;
            }
            #[cfg(not(feature = "streaming"))]
            {
                // Silence the unused-variable warning in this configuration.
                _ = enable_fmt;
                panic!("activate feature 'streaming'")
            }
        }

        Ok(lp_top)
    }
666
667 fn prepare_collect_post_opt<P>(
668 mut self,
669 check_sink: bool,
670 query_start: Option<std::time::Instant>,
671 post_opt: P,
672 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
673 where
674 P: FnOnce(
675 Node,
676 &mut Arena<IR>,
677 &mut Arena<AExpr>,
678 Option<std::time::Duration>,
679 ) -> PolarsResult<()>,
680 {
681 let (mut lp_arena, mut expr_arena) = self.get_arenas();
682
683 let mut scratch = vec![];
684 let lp_top =
685 self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
686
687 post_opt(
688 lp_top,
689 &mut lp_arena,
690 &mut expr_arena,
691 query_start.map(|s| s.elapsed()),
694 )?;
695
696 let no_file_sink = if check_sink {
698 !matches!(
699 lp_arena.get(lp_top),
700 IR::Sink {
701 payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
702 ..
703 }
704 )
705 } else {
706 true
707 };
708 let physical_plan = create_physical_plan(
709 lp_top,
710 &mut lp_arena,
711 &mut expr_arena,
712 BUILD_STREAMING_EXECUTOR,
713 )?;
714
715 let state = ExecutionState::new();
716 Ok((state, physical_plan, no_file_sink))
717 }
718
719 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
721 where
722 P: FnOnce(
723 Node,
724 &mut Arena<IR>,
725 &mut Arena<AExpr>,
726 Option<std::time::Duration>,
727 ) -> PolarsResult<()>,
728 {
729 let (mut state, mut physical_plan, _) =
730 self.prepare_collect_post_opt(false, None, post_opt)?;
731 physical_plan.execute(&mut state)
732 }
733
734 #[allow(unused_mut)]
735 fn prepare_collect(
736 self,
737 check_sink: bool,
738 query_start: Option<std::time::Instant>,
739 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
740 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
741 }
742
743 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
748 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
749 payload.clone()
750 } else {
751 self.logical_plan = DslPlan::Sink {
752 input: Arc::new(self.logical_plan),
753 payload: SinkType::Memory,
754 };
755 SinkType::Memory
756 };
757
758 if engine == Engine::Auto {
760 engine = match payload {
761 #[cfg(feature = "new_streaming")]
762 SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
763 _ => Engine::InMemory,
764 };
765 }
766 if engine == Engine::Gpu {
768 engine = Engine::InMemory;
769 }
770
771 #[cfg(feature = "new_streaming")]
772 {
773 if let Some(result) = self.try_new_streaming_if_requested() {
774 return result.map(|v| v.unwrap_single());
775 }
776 }
777
778 match engine {
779 Engine::Auto => unreachable!(),
780 Engine::Streaming => {
781 feature_gated!("new_streaming", self = self.with_new_streaming(true))
782 },
783 Engine::OldStreaming => feature_gated!("streaming", self = self.with_streaming(true)),
784 _ => {},
785 }
786 let mut alp_plan = self.clone().to_alp_optimized()?;
787
788 match engine {
789 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
790 #[cfg(feature = "dtype-categorical")]
791 let string_cache_hold = StringCacheHolder::hold();
792 let result = polars_stream::run_query(
793 alp_plan.lp_top,
794 &mut alp_plan.lp_arena,
795 &mut alp_plan.expr_arena,
796 );
797 #[cfg(feature = "dtype-categorical")]
798 drop(string_cache_hold);
799 result.map(|v| v.unwrap_single())
800 }),
801 _ if matches!(payload, SinkType::Partition { .. }) => Err(polars_err!(
802 InvalidOperation: "partition sinks are not supported on for the '{}' engine",
803 engine.into_static_str()
804 )),
805 Engine::Gpu => {
806 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
807 },
808 Engine::InMemory => {
809 let mut physical_plan = create_physical_plan(
810 alp_plan.lp_top,
811 &mut alp_plan.lp_arena,
812 &mut alp_plan.expr_arena,
813 BUILD_STREAMING_EXECUTOR,
814 )?;
815 let mut state = ExecutionState::new();
816 physical_plan.execute(&mut state)
817 },
818 Engine::OldStreaming => {
819 self.opt_state |= OptFlags::STREAMING;
820 let (mut state, mut physical_plan, is_streaming) =
821 self.prepare_collect(true, None)?;
822 polars_ensure!(
823 is_streaming,
824 ComputeError: format!("cannot run the whole query in a streaming order")
825 );
826 physical_plan.execute(&mut state)
827 },
828 }
829 }
830
831 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
832 let sink_multiple = LazyFrame {
833 logical_plan: DslPlan::SinkMultiple { inputs: plans },
834 opt_state,
835 cached_arena: Default::default(),
836 };
837 sink_multiple.explain(true)
838 }
839
840 pub fn collect_all_with_engine(
841 plans: Vec<DslPlan>,
842 mut engine: Engine,
843 opt_state: OptFlags,
844 ) -> PolarsResult<Vec<DataFrame>> {
845 if plans.is_empty() {
846 return Ok(Vec::new());
847 }
848
849 if engine == Engine::Auto {
851 engine = Engine::InMemory;
852 }
853 if engine == Engine::Gpu {
855 engine = Engine::InMemory;
856 }
857
858 let mut sink_multiple = LazyFrame {
859 logical_plan: DslPlan::SinkMultiple { inputs: plans },
860 opt_state,
861 cached_arena: Default::default(),
862 };
863
864 #[cfg(feature = "new_streaming")]
865 {
866 if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
867 return result.map(|v| v.unwrap_multiple());
868 }
869 }
870
871 match engine {
872 Engine::Auto => unreachable!(),
873 Engine::Streaming => {
874 feature_gated!(
875 "new_streaming",
876 sink_multiple = sink_multiple.with_new_streaming(true)
877 )
878 },
879 Engine::OldStreaming => feature_gated!(
880 "streaming",
881 sink_multiple = sink_multiple.with_streaming(true)
882 ),
883 _ => {},
884 }
885 let mut alp_plan = sink_multiple.to_alp_optimized()?;
886
887 if engine == Engine::Streaming {
888 feature_gated!("new_streaming", {
889 #[cfg(feature = "dtype-categorical")]
890 let string_cache_hold = StringCacheHolder::hold();
891 let result = polars_stream::run_query(
892 alp_plan.lp_top,
893 &mut alp_plan.lp_arena,
894 &mut alp_plan.expr_arena,
895 );
896 #[cfg(feature = "dtype-categorical")]
897 drop(string_cache_hold);
898 return result.map(|v| v.unwrap_multiple());
899 });
900 }
901
902 let IR::SinkMultiple { inputs } = alp_plan.root() else {
903 unreachable!()
904 };
905
906 let mut multiplan = create_multiple_physical_plans(
907 inputs.clone().as_slice(),
908 &mut alp_plan.lp_arena,
909 &mut alp_plan.expr_arena,
910 BUILD_STREAMING_EXECUTOR,
911 )?;
912
913 match engine {
914 Engine::Gpu => polars_bail!(
915 InvalidOperation: "collect_all is not supported for the gpu engine"
916 ),
917 Engine::InMemory => {
918 let mut state = ExecutionState::new();
922 if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
923 cache_prefiller.execute(&mut state)?;
924 }
925 let out = POOL.install(|| {
926 multiplan
927 .physical_plans
928 .chunks_mut(POOL.current_num_threads() * 3)
929 .map(|chunk| {
930 chunk
931 .into_par_iter()
932 .enumerate()
933 .map(|(idx, input)| {
934 let mut input = std::mem::take(input);
935 let mut state = state.split();
936 state.branch_idx += idx;
937
938 let df = input.execute(&mut state)?;
939 Ok(df)
940 })
941 .collect::<PolarsResult<Vec<_>>>()
942 })
943 .collect::<PolarsResult<Vec<_>>>()
944 });
945 Ok(out?.into_iter().flatten().collect())
946 },
947 Engine::OldStreaming => panic!("This is no longer supported"),
948 _ => unreachable!(),
949 }
950 }
951
952 pub fn collect(self) -> PolarsResult<DataFrame> {
970 self.collect_with_engine(Engine::InMemory)
971 }
972
973 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
976 where
977 P: FnOnce(
978 Node,
979 &mut Arena<IR>,
980 &mut Arena<AExpr>,
981 Option<std::time::Duration>,
982 ) -> PolarsResult<()>,
983 {
984 let query_start = std::time::Instant::now();
985 let (mut state, mut physical_plan, _) =
986 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
987 state.time_nodes(query_start);
988 let out = physical_plan.execute(&mut state)?;
989 let timer_df = state.finish_timer()?;
990 Ok((out, timer_df))
991 }
992
993 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
1001 self._profile_post_opt(|_, _, _, _| Ok(()))
1002 }
1003
1004 #[cfg(feature = "parquet")]
1008 pub fn sink_parquet(
1009 self,
1010 target: SinkTarget,
1011 options: ParquetWriteOptions,
1012 cloud_options: Option<polars_io::cloud::CloudOptions>,
1013 sink_options: SinkOptions,
1014 ) -> PolarsResult<Self> {
1015 self.sink(SinkType::File(FileSinkType {
1016 target,
1017 sink_options,
1018 file_type: FileType::Parquet(options),
1019 cloud_options,
1020 }))
1021 }
1022
1023 #[cfg(feature = "ipc")]
1027 pub fn sink_ipc(
1028 self,
1029 target: SinkTarget,
1030 options: IpcWriterOptions,
1031 cloud_options: Option<polars_io::cloud::CloudOptions>,
1032 sink_options: SinkOptions,
1033 ) -> PolarsResult<Self> {
1034 self.sink(SinkType::File(FileSinkType {
1035 target,
1036 sink_options,
1037 file_type: FileType::Ipc(options),
1038 cloud_options,
1039 }))
1040 }
1041
1042 #[cfg(feature = "csv")]
1046 pub fn sink_csv(
1047 self,
1048 target: SinkTarget,
1049 options: CsvWriterOptions,
1050 cloud_options: Option<polars_io::cloud::CloudOptions>,
1051 sink_options: SinkOptions,
1052 ) -> PolarsResult<Self> {
1053 self.sink(SinkType::File(FileSinkType {
1054 target,
1055 sink_options,
1056 file_type: FileType::Csv(options),
1057 cloud_options,
1058 }))
1059 }
1060
1061 #[cfg(feature = "json")]
1065 pub fn sink_json(
1066 self,
1067 target: SinkTarget,
1068 options: JsonWriterOptions,
1069 cloud_options: Option<polars_io::cloud::CloudOptions>,
1070 sink_options: SinkOptions,
1071 ) -> PolarsResult<Self> {
1072 self.sink(SinkType::File(FileSinkType {
1073 target,
1074 sink_options,
1075 file_type: FileType::Json(options),
1076 cloud_options,
1077 }))
1078 }
1079
1080 #[cfg(feature = "parquet")]
1084 #[allow(clippy::too_many_arguments)]
1085 pub fn sink_parquet_partitioned(
1086 self,
1087 base_path: Arc<PathBuf>,
1088 file_path_cb: Option<PartitionTargetCallback>,
1089 variant: PartitionVariant,
1090 options: ParquetWriteOptions,
1091 cloud_options: Option<polars_io::cloud::CloudOptions>,
1092 sink_options: SinkOptions,
1093 per_partition_sort_by: Option<Vec<SortColumn>>,
1094 finish_callback: Option<SinkFinishCallback>,
1095 ) -> PolarsResult<Self> {
1096 self.sink(SinkType::Partition(PartitionSinkType {
1097 base_path,
1098 file_path_cb,
1099 sink_options,
1100 variant,
1101 file_type: FileType::Parquet(options),
1102 cloud_options,
1103 per_partition_sort_by,
1104 finish_callback,
1105 }))
1106 }
1107
1108 #[cfg(feature = "ipc")]
1112 #[allow(clippy::too_many_arguments)]
1113 pub fn sink_ipc_partitioned(
1114 self,
1115 base_path: Arc<PathBuf>,
1116 file_path_cb: Option<PartitionTargetCallback>,
1117 variant: PartitionVariant,
1118 options: IpcWriterOptions,
1119 cloud_options: Option<polars_io::cloud::CloudOptions>,
1120 sink_options: SinkOptions,
1121 per_partition_sort_by: Option<Vec<SortColumn>>,
1122 finish_callback: Option<SinkFinishCallback>,
1123 ) -> PolarsResult<Self> {
1124 self.sink(SinkType::Partition(PartitionSinkType {
1125 base_path,
1126 file_path_cb,
1127 sink_options,
1128 variant,
1129 file_type: FileType::Ipc(options),
1130 cloud_options,
1131 per_partition_sort_by,
1132 finish_callback,
1133 }))
1134 }
1135
1136 #[cfg(feature = "csv")]
1140 #[allow(clippy::too_many_arguments)]
1141 pub fn sink_csv_partitioned(
1142 self,
1143 base_path: Arc<PathBuf>,
1144 file_path_cb: Option<PartitionTargetCallback>,
1145 variant: PartitionVariant,
1146 options: CsvWriterOptions,
1147 cloud_options: Option<polars_io::cloud::CloudOptions>,
1148 sink_options: SinkOptions,
1149 per_partition_sort_by: Option<Vec<SortColumn>>,
1150 finish_callback: Option<SinkFinishCallback>,
1151 ) -> PolarsResult<Self> {
1152 self.sink(SinkType::Partition(PartitionSinkType {
1153 base_path,
1154 file_path_cb,
1155 sink_options,
1156 variant,
1157 file_type: FileType::Csv(options),
1158 cloud_options,
1159 per_partition_sort_by,
1160 finish_callback,
1161 }))
1162 }
1163
1164 #[cfg(feature = "json")]
1168 #[allow(clippy::too_many_arguments)]
1169 pub fn sink_json_partitioned(
1170 self,
1171 base_path: Arc<PathBuf>,
1172 file_path_cb: Option<PartitionTargetCallback>,
1173 variant: PartitionVariant,
1174 options: JsonWriterOptions,
1175 cloud_options: Option<polars_io::cloud::CloudOptions>,
1176 sink_options: SinkOptions,
1177 per_partition_sort_by: Option<Vec<SortColumn>>,
1178 finish_callback: Option<SinkFinishCallback>,
1179 ) -> PolarsResult<Self> {
1180 self.sink(SinkType::Partition(PartitionSinkType {
1181 base_path,
1182 file_path_cb,
1183 sink_options,
1184 variant,
1185 file_type: FileType::Json(options),
1186 cloud_options,
1187 per_partition_sort_by,
1188 finish_callback,
1189 }))
1190 }
1191
1192 #[cfg(feature = "new_streaming")]
1193 pub fn try_new_streaming_if_requested(
1194 &mut self,
1195 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1196 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1197 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1198
1199 if auto_new_streaming || force_new_streaming {
1200 let mut new_stream_lazy = self.clone();
1203 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1204 new_stream_lazy.opt_state &= !OptFlags::STREAMING;
1205 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1206 Ok(v) => v,
1207 Err(e) => return Some(Err(e)),
1208 };
1209
1210 #[cfg(feature = "dtype-categorical")]
1211 let _hold = StringCacheHolder::hold();
1212 let f = || {
1213 polars_stream::run_query(
1214 alp_plan.lp_top,
1215 &mut alp_plan.lp_arena,
1216 &mut alp_plan.expr_arena,
1217 )
1218 };
1219
1220 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1221 Ok(v) => return Some(v),
1222 Err(e) => {
1223 if !force_new_streaming
1226 && auto_new_streaming
1227 && e.downcast_ref::<&str>()
1228 .map(|s| s.starts_with("not yet implemented"))
1229 .unwrap_or(false)
1230 {
1231 if polars_core::config::verbose() {
1232 eprintln!(
1233 "caught unimplemented error in new streaming engine, falling back to normal engine"
1234 );
1235 }
1236 } else {
1237 std::panic::resume_unwind(e);
1238 }
1239 },
1240 }
1241 }
1242
1243 None
1244 }
1245
1246 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1247 polars_ensure!(
1248 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1249 InvalidOperation: "cannot create a sink on top of another sink"
1250 );
1251 self.logical_plan = DslPlan::Sink {
1252 input: Arc::new(self.logical_plan),
1253 payload: payload.clone(),
1254 };
1255 Ok(self)
1256 }
1257
1258 pub fn filter(self, predicate: Expr) -> Self {
1276 let opt_state = self.get_opt_state();
1277 let lp = self.get_plan_builder().filter(predicate).build();
1278 Self::from_logical_plan(lp, opt_state)
1279 }
1280
1281 pub fn remove(self, predicate: Expr) -> Self {
1299 self.filter(predicate.neq_missing(lit(true)))
1300 }
1301
1302 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1328 let exprs = exprs.as_ref().to_vec();
1329 self.select_impl(
1330 exprs,
1331 ProjectionOptions {
1332 run_parallel: true,
1333 duplicate_check: true,
1334 should_broadcast: true,
1335 },
1336 )
1337 }
1338
1339 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1340 let exprs = exprs.as_ref().to_vec();
1341 self.select_impl(
1342 exprs,
1343 ProjectionOptions {
1344 run_parallel: false,
1345 duplicate_check: true,
1346 should_broadcast: true,
1347 },
1348 )
1349 }
1350
1351 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1352 let opt_state = self.get_opt_state();
1353 let lp = self.get_plan_builder().project(exprs, options).build();
1354 Self::from_logical_plan(lp, opt_state)
1355 }
1356
1357 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1378 let keys = by
1379 .as_ref()
1380 .iter()
1381 .map(|e| e.clone().into())
1382 .collect::<Vec<_>>();
1383 let opt_state = self.get_opt_state();
1384
1385 #[cfg(feature = "dynamic_group_by")]
1386 {
1387 LazyGroupBy {
1388 logical_plan: self.logical_plan,
1389 opt_state,
1390 keys,
1391 maintain_order: false,
1392 dynamic_options: None,
1393 rolling_options: None,
1394 }
1395 }
1396
1397 #[cfg(not(feature = "dynamic_group_by"))]
1398 {
1399 LazyGroupBy {
1400 logical_plan: self.logical_plan,
1401 opt_state,
1402 keys,
1403 maintain_order: false,
1404 }
1405 }
1406 }
1407
1408 #[cfg(feature = "dynamic_group_by")]
1416 pub fn rolling<E: AsRef<[Expr]>>(
1417 mut self,
1418 index_column: Expr,
1419 group_by: E,
1420 mut options: RollingGroupOptions,
1421 ) -> LazyGroupBy {
1422 if let Expr::Column(name) = index_column {
1423 options.index_column = name;
1424 } else {
1425 let output_field = index_column
1426 .to_field(&self.collect_schema().unwrap(), Context::Default)
1427 .unwrap();
1428 return self.with_column(index_column).rolling(
1429 Expr::Column(output_field.name().clone()),
1430 group_by,
1431 options,
1432 );
1433 }
1434 let opt_state = self.get_opt_state();
1435 LazyGroupBy {
1436 logical_plan: self.logical_plan,
1437 opt_state,
1438 keys: group_by.as_ref().to_vec(),
1439 maintain_order: true,
1440 dynamic_options: None,
1441 rolling_options: Some(options),
1442 }
1443 }
1444
1445 #[cfg(feature = "dynamic_group_by")]
1461 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1462 mut self,
1463 index_column: Expr,
1464 group_by: E,
1465 mut options: DynamicGroupOptions,
1466 ) -> LazyGroupBy {
1467 if let Expr::Column(name) = index_column {
1468 options.index_column = name;
1469 } else {
1470 let output_field = index_column
1471 .to_field(&self.collect_schema().unwrap(), Context::Default)
1472 .unwrap();
1473 return self.with_column(index_column).group_by_dynamic(
1474 Expr::Column(output_field.name().clone()),
1475 group_by,
1476 options,
1477 );
1478 }
1479 let opt_state = self.get_opt_state();
1480 LazyGroupBy {
1481 logical_plan: self.logical_plan,
1482 opt_state,
1483 keys: group_by.as_ref().to_vec(),
1484 maintain_order: true,
1485 dynamic_options: Some(options),
1486 rolling_options: None,
1487 }
1488 }
1489
1490 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1492 let keys = by
1493 .as_ref()
1494 .iter()
1495 .map(|e| e.clone().into())
1496 .collect::<Vec<_>>();
1497 let opt_state = self.get_opt_state();
1498
1499 #[cfg(feature = "dynamic_group_by")]
1500 {
1501 LazyGroupBy {
1502 logical_plan: self.logical_plan,
1503 opt_state,
1504 keys,
1505 maintain_order: true,
1506 dynamic_options: None,
1507 rolling_options: None,
1508 }
1509 }
1510
1511 #[cfg(not(feature = "dynamic_group_by"))]
1512 {
1513 LazyGroupBy {
1514 logical_plan: self.logical_plan,
1515 opt_state,
1516 keys,
1517 maintain_order: true,
1518 }
1519 }
1520 }
1521
1522 #[cfg(feature = "semi_anti_join")]
1539 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1540 self.join(
1541 other,
1542 [left_on.into()],
1543 [right_on.into()],
1544 JoinArgs::new(JoinType::Anti),
1545 )
1546 }
1547
1548 #[cfg(feature = "cross_join")]
1550 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1551 self.join(
1552 other,
1553 vec![],
1554 vec![],
1555 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1556 )
1557 }
1558
1559 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1576 self.join(
1577 other,
1578 [left_on.into()],
1579 [right_on.into()],
1580 JoinArgs::new(JoinType::Left),
1581 )
1582 }
1583
1584 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1601 self.join(
1602 other,
1603 [left_on.into()],
1604 [right_on.into()],
1605 JoinArgs::new(JoinType::Inner),
1606 )
1607 }
1608
1609 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1626 self.join(
1627 other,
1628 [left_on.into()],
1629 [right_on.into()],
1630 JoinArgs::new(JoinType::Full),
1631 )
1632 }
1633
1634 #[cfg(feature = "semi_anti_join")]
1651 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1652 self.join(
1653 other,
1654 [left_on.into()],
1655 [right_on.into()],
1656 JoinArgs::new(JoinType::Semi),
1657 )
1658 }
1659
1660 pub fn join<E: AsRef<[Expr]>>(
1682 self,
1683 other: LazyFrame,
1684 left_on: E,
1685 right_on: E,
1686 args: JoinArgs,
1687 ) -> LazyFrame {
1688 let left_on = left_on.as_ref().to_vec();
1689 let right_on = right_on.as_ref().to_vec();
1690
1691 self._join_impl(other, left_on, right_on, args)
1692 }
1693
1694 fn _join_impl(
1695 self,
1696 other: LazyFrame,
1697 left_on: Vec<Expr>,
1698 right_on: Vec<Expr>,
1699 args: JoinArgs,
1700 ) -> LazyFrame {
1701 let JoinArgs {
1702 how,
1703 validation,
1704 suffix,
1705 slice,
1706 nulls_equal,
1707 coalesce,
1708 maintain_order,
1709 } = args;
1710
1711 if slice.is_some() {
1712 panic!("impl error: slice is not handled")
1713 }
1714
1715 let mut builder = self
1716 .join_builder()
1717 .with(other)
1718 .left_on(left_on)
1719 .right_on(right_on)
1720 .how(how)
1721 .validate(validation)
1722 .join_nulls(nulls_equal)
1723 .coalesce(coalesce)
1724 .maintain_order(maintain_order);
1725
1726 if let Some(suffix) = suffix {
1727 builder = builder.suffix(suffix);
1728 }
1729
1730 builder.finish()
1732 }
1733
    /// Starts a [`JoinBuilder`] with `self` as the left-hand side of the join.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1742
1743 pub fn with_column(self, expr: Expr) -> LazyFrame {
1761 let opt_state = self.get_opt_state();
1762 let lp = self
1763 .get_plan_builder()
1764 .with_columns(
1765 vec![expr],
1766 ProjectionOptions {
1767 run_parallel: false,
1768 duplicate_check: true,
1769 should_broadcast: true,
1770 },
1771 )
1772 .build();
1773 Self::from_logical_plan(lp, opt_state)
1774 }
1775
1776 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1791 let exprs = exprs.as_ref().to_vec();
1792 self.with_columns_impl(
1793 exprs,
1794 ProjectionOptions {
1795 run_parallel: true,
1796 duplicate_check: true,
1797 should_broadcast: true,
1798 },
1799 )
1800 }
1801
1802 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1804 let exprs = exprs.as_ref().to_vec();
1805 self.with_columns_impl(
1806 exprs,
1807 ProjectionOptions {
1808 run_parallel: false,
1809 duplicate_check: true,
1810 should_broadcast: true,
1811 },
1812 )
1813 }
1814
1815 pub fn match_to_schema(
1817 self,
1818 schema: SchemaRef,
1819 per_column: Arc<[MatchToSchemaPerColumn]>,
1820 extra_columns: ExtraColumnsPolicy,
1821 ) -> LazyFrame {
1822 let opt_state = self.get_opt_state();
1823 let lp = self
1824 .get_plan_builder()
1825 .match_to_schema(schema, per_column, extra_columns)
1826 .build();
1827 Self::from_logical_plan(lp, opt_state)
1828 }
1829
1830 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1831 let opt_state = self.get_opt_state();
1832 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1833 Self::from_logical_plan(lp, opt_state)
1834 }
1835
1836 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1837 let contexts = contexts
1838 .as_ref()
1839 .iter()
1840 .map(|lf| lf.logical_plan.clone())
1841 .collect();
1842 let opt_state = self.get_opt_state();
1843 let lp = self.get_plan_builder().with_context(contexts).build();
1844 Self::from_logical_plan(lp, opt_state)
1845 }
1846
1847 pub fn max(self) -> Self {
1851 self.map_private(DslFunction::Stats(StatsFunction::Max))
1852 }
1853
1854 pub fn min(self) -> Self {
1858 self.map_private(DslFunction::Stats(StatsFunction::Min))
1859 }
1860
1861 pub fn sum(self) -> Self {
1871 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1872 }
1873
1874 pub fn mean(self) -> Self {
1879 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1880 }
1881
1882 pub fn median(self) -> Self {
1888 self.map_private(DslFunction::Stats(StatsFunction::Median))
1889 }
1890
1891 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1893 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1894 quantile,
1895 method,
1896 }))
1897 }
1898
1899 pub fn std(self, ddof: u8) -> Self {
1912 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1913 }
1914
1915 pub fn var(self, ddof: u8) -> Self {
1925 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1926 }
1927
1928 pub fn explode<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(self, columns: E) -> LazyFrame {
1930 self.explode_impl(columns, false)
1931 }
1932
1933 fn explode_impl<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1935 self,
1936 columns: E,
1937 allow_empty: bool,
1938 ) -> LazyFrame {
1939 let columns = columns
1940 .as_ref()
1941 .iter()
1942 .map(|e| e.clone().into())
1943 .collect::<Vec<_>>();
1944 let opt_state = self.get_opt_state();
1945 let lp = self
1946 .get_plan_builder()
1947 .explode(columns, allow_empty)
1948 .build();
1949 Self::from_logical_plan(lp, opt_state)
1950 }
1951
1952 pub fn null_count(self) -> LazyFrame {
1954 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1955 }
1956
1957 pub fn unique_stable(
1962 self,
1963 subset: Option<Vec<PlSmallStr>>,
1964 keep_strategy: UniqueKeepStrategy,
1965 ) -> LazyFrame {
1966 self.unique_stable_generic(subset, keep_strategy)
1967 }
1968
1969 pub fn unique_stable_generic<E, IE>(
1970 self,
1971 subset: Option<E>,
1972 keep_strategy: UniqueKeepStrategy,
1973 ) -> LazyFrame
1974 where
1975 E: AsRef<[IE]>,
1976 IE: Into<Selector> + Clone,
1977 {
1978 let subset = subset.map(|s| {
1979 s.as_ref()
1980 .iter()
1981 .map(|e| e.clone().into())
1982 .collect::<Vec<_>>()
1983 });
1984
1985 let opt_state = self.get_opt_state();
1986 let options = DistinctOptionsDSL {
1987 subset,
1988 maintain_order: true,
1989 keep_strategy,
1990 };
1991 let lp = self.get_plan_builder().distinct(options).build();
1992 Self::from_logical_plan(lp, opt_state)
1993 }
1994
1995 pub fn unique(
2003 self,
2004 subset: Option<Vec<String>>,
2005 keep_strategy: UniqueKeepStrategy,
2006 ) -> LazyFrame {
2007 self.unique_generic(subset, keep_strategy)
2008 }
2009
2010 pub fn unique_generic<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
2011 self,
2012 subset: Option<E>,
2013 keep_strategy: UniqueKeepStrategy,
2014 ) -> LazyFrame {
2015 let subset = subset.map(|s| {
2016 s.as_ref()
2017 .iter()
2018 .map(|e| e.clone().into())
2019 .collect::<Vec<_>>()
2020 });
2021 let opt_state = self.get_opt_state();
2022 let options = DistinctOptionsDSL {
2023 subset,
2024 maintain_order: false,
2025 keep_strategy,
2026 };
2027 let lp = self.get_plan_builder().distinct(options).build();
2028 Self::from_logical_plan(lp, opt_state)
2029 }
2030
2031 pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> LazyFrame {
2036 let opt_state = self.get_opt_state();
2037 let lp = self.get_plan_builder().drop_nans(subset).build();
2038 Self::from_logical_plan(lp, opt_state)
2039 }
2040
2041 pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> LazyFrame {
2046 let opt_state = self.get_opt_state();
2047 let lp = self.get_plan_builder().drop_nulls(subset).build();
2048 Self::from_logical_plan(lp, opt_state)
2049 }
2050
2051 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
2061 let opt_state = self.get_opt_state();
2062 let lp = self.get_plan_builder().slice(offset, len).build();
2063 Self::from_logical_plan(lp, opt_state)
2064 }
2065
2066 pub fn first(self) -> LazyFrame {
2070 self.slice(0, 1)
2071 }
2072
2073 pub fn last(self) -> LazyFrame {
2077 self.slice(-1, 1)
2078 }
2079
2080 pub fn tail(self, n: IdxSize) -> LazyFrame {
2084 let neg_tail = -(n as i64);
2085 self.slice(neg_tail, n)
2086 }
2087
2088 #[cfg(feature = "pivot")]
2092 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
2093 let opt_state = self.get_opt_state();
2094 let lp = self.get_plan_builder().unpivot(args).build();
2095 Self::from_logical_plan(lp, opt_state)
2096 }
2097
2098 pub fn limit(self, n: IdxSize) -> LazyFrame {
2102 self.slice(0, n)
2103 }
2104
2105 pub fn map<F>(
2119 self,
2120 function: F,
2121 optimizations: AllowedOptimizations,
2122 schema: Option<Arc<dyn UdfSchema>>,
2123 name: Option<&'static str>,
2124 ) -> LazyFrame
2125 where
2126 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2127 {
2128 let opt_state = self.get_opt_state();
2129 let lp = self
2130 .get_plan_builder()
2131 .map(
2132 function,
2133 optimizations,
2134 schema,
2135 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
2136 )
2137 .build();
2138 Self::from_logical_plan(lp, opt_state)
2139 }
2140
2141 #[cfg(feature = "python")]
2142 pub fn map_python(
2143 self,
2144 function: polars_utils::python_function::PythonFunction,
2145 optimizations: AllowedOptimizations,
2146 schema: Option<SchemaRef>,
2147 validate_output: bool,
2148 ) -> LazyFrame {
2149 let opt_state = self.get_opt_state();
2150 let lp = self
2151 .get_plan_builder()
2152 .map_python(function, optimizations, schema, validate_output)
2153 .build();
2154 Self::from_logical_plan(lp, opt_state)
2155 }
2156
2157 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2158 let opt_state = self.get_opt_state();
2159 let lp = self.get_plan_builder().map_private(function).build();
2160 Self::from_logical_plan(lp, opt_state)
2161 }
2162
2163 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2172 where
2173 S: Into<PlSmallStr>,
2174 {
2175 let name = name.into();
2176
2177 match &self.logical_plan {
2178 v @ DslPlan::Scan { scan_type, .. }
2179 if !matches!(&**scan_type, FileScan::Anonymous { .. }) =>
2180 {
2181 let DslPlan::Scan {
2182 sources,
2183 mut unified_scan_args,
2184 scan_type,
2185 file_info,
2186 cached_ir: _,
2187 } = v.clone()
2188 else {
2189 unreachable!()
2190 };
2191
2192 unified_scan_args.row_index = Some(RowIndex {
2193 name,
2194 offset: offset.unwrap_or(0),
2195 });
2196
2197 DslPlan::Scan {
2198 sources,
2199 unified_scan_args,
2200 scan_type,
2201 file_info,
2202 cached_ir: Default::default(),
2203 }
2204 .into()
2205 },
2206 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2207 }
2208 }
2209
2210 pub fn count(self) -> LazyFrame {
2212 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2213 }
2214
2215 #[cfg(feature = "dtype-struct")]
2218 pub fn unnest<E, IE>(self, cols: E) -> Self
2219 where
2220 E: AsRef<[IE]>,
2221 IE: Into<Selector> + Clone,
2222 {
2223 let cols = cols
2224 .as_ref()
2225 .iter()
2226 .map(|ie| ie.clone().into())
2227 .collect::<Vec<_>>();
2228 self.map_private(DslFunction::Unnest(cols))
2229 }
2230
2231 #[cfg(feature = "merge_sorted")]
2232 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2233 where
2234 S: Into<PlSmallStr>,
2235 {
2236 let key = key.into();
2237
2238 let lp = DslPlan::MergeSorted {
2239 input_left: Arc::new(self.logical_plan),
2240 input_right: Arc::new(other.logical_plan),
2241 key,
2242 };
2243 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2244 }
2245}
2246
#[derive(Clone)]
/// A pending group-by on a [`LazyFrame`]; it becomes a frame again via `agg`, `head`, `tail`,
/// `apply`, or `From<LazyGroupBy> for LazyFrame`.
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags carried over from the source frame.
    opt_state: OptFlags,
    /// Grouping key expressions.
    keys: Vec<Expr>,
    /// Forwarded as `maintain_order` to the group-by node.
    maintain_order: bool,
    /// Window options set by `group_by_dynamic`, if any.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Window options set by `rolling`, if any.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2259
2260impl From<LazyGroupBy> for LazyFrame {
2261 fn from(lgb: LazyGroupBy) -> Self {
2262 Self {
2263 logical_plan: lgb.logical_plan,
2264 opt_state: lgb.opt_state,
2265 cached_arena: Default::default(),
2266 }
2267 }
2268}
2269
2270impl LazyGroupBy {
2271 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2293 #[cfg(feature = "dynamic_group_by")]
2294 let lp = DslBuilder::from(self.logical_plan)
2295 .group_by(
2296 self.keys,
2297 aggs,
2298 None,
2299 self.maintain_order,
2300 self.dynamic_options,
2301 self.rolling_options,
2302 )
2303 .build();
2304
2305 #[cfg(not(feature = "dynamic_group_by"))]
2306 let lp = DslBuilder::from(self.logical_plan)
2307 .group_by(self.keys, aggs, None, self.maintain_order)
2308 .build();
2309 LazyFrame::from_logical_plan(lp, self.opt_state)
2310 }
2311
2312 pub fn head(self, n: Option<usize>) -> LazyFrame {
2314 let keys = self
2315 .keys
2316 .iter()
2317 .filter_map(|expr| expr_output_name(expr).ok())
2318 .collect::<Vec<_>>();
2319
2320 self.agg([col(PlSmallStr::from_static("*"))
2321 .exclude(keys.iter().cloned())
2322 .head(n)])
2323 .explode_impl(
2324 [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2325 true,
2326 )
2327 }
2328
2329 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2331 let keys = self
2332 .keys
2333 .iter()
2334 .filter_map(|expr| expr_output_name(expr).ok())
2335 .collect::<Vec<_>>();
2336
2337 self.agg([col(PlSmallStr::from_static("*"))
2338 .exclude(keys.iter().cloned())
2339 .tail(n)])
2340 .explode_impl(
2341 [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2342 true,
2343 )
2344 }
2345
2346 pub fn apply<F>(self, f: F, schema: SchemaRef) -> LazyFrame
2351 where
2352 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2353 {
2354 #[cfg(feature = "dynamic_group_by")]
2355 let options = GroupbyOptions {
2356 dynamic: self.dynamic_options,
2357 rolling: self.rolling_options,
2358 slice: None,
2359 };
2360
2361 #[cfg(not(feature = "dynamic_group_by"))]
2362 let options = GroupbyOptions { slice: None };
2363
2364 let lp = DslPlan::GroupBy {
2365 input: Arc::new(self.logical_plan),
2366 keys: self.keys,
2367 aggs: vec![],
2368 apply: Some((Arc::new(f), schema)),
2369 maintain_order: self.maintain_order,
2370 options: Arc::new(options),
2371 };
2372 LazyFrame::from_logical_plan(lp, self.opt_state)
2373 }
2374}
2375
/// Builder for joins, created by `LazyFrame::join_builder` and completed with `finish`
/// (key-based join) or `join_where` (predicate join).
#[must_use]
pub struct JoinBuilder {
    /// Left-hand frame.
    lf: LazyFrame,
    /// Join type (defaults to inner).
    how: JoinType,
    /// Right-hand frame; must be set with `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions for the left frame.
    left_on: Vec<Expr>,
    /// Key expressions for the right frame.
    right_on: Vec<Expr>,
    /// Copied into `JoinOptions::allow_parallel` (defaults to `true`).
    allow_parallel: bool,
    /// Copied into `JoinOptions::force_parallel` (defaults to `false`).
    force_parallel: bool,
    /// Optional column-name suffix stored in the join arguments.
    suffix: Option<PlSmallStr>,
    /// Join-key validation mode.
    validation: JoinValidation,
    /// Whether null keys compare equal (defaults to `false`).
    nulls_equal: bool,
    /// Key-column coalescing mode.
    coalesce: JoinCoalesce,
    /// Output-ordering mode.
    maintain_order: MaintainOrderJoin,
}
2391impl JoinBuilder {
2392 pub fn new(lf: LazyFrame) -> Self {
2394 Self {
2395 lf,
2396 other: None,
2397 how: JoinType::Inner,
2398 left_on: vec![],
2399 right_on: vec![],
2400 allow_parallel: true,
2401 force_parallel: false,
2402 suffix: None,
2403 validation: Default::default(),
2404 nulls_equal: false,
2405 coalesce: Default::default(),
2406 maintain_order: Default::default(),
2407 }
2408 }
2409
2410 pub fn with(mut self, other: LazyFrame) -> Self {
2412 self.other = Some(other);
2413 self
2414 }
2415
2416 pub fn how(mut self, how: JoinType) -> Self {
2418 self.how = how;
2419 self
2420 }
2421
2422 pub fn validate(mut self, validation: JoinValidation) -> Self {
2423 self.validation = validation;
2424 self
2425 }
2426
2427 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2431 let on = on.as_ref().to_vec();
2432 self.left_on.clone_from(&on);
2433 self.right_on = on;
2434 self
2435 }
2436
2437 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2441 self.left_on = on.as_ref().to_vec();
2442 self
2443 }
2444
2445 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2449 self.right_on = on.as_ref().to_vec();
2450 self
2451 }
2452
2453 pub fn allow_parallel(mut self, allow: bool) -> Self {
2455 self.allow_parallel = allow;
2456 self
2457 }
2458
2459 pub fn force_parallel(mut self, force: bool) -> Self {
2461 self.force_parallel = force;
2462 self
2463 }
2464
2465 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2467 self.nulls_equal = nulls_equal;
2468 self
2469 }
2470
2471 pub fn suffix<S>(mut self, suffix: S) -> Self
2474 where
2475 S: Into<PlSmallStr>,
2476 {
2477 self.suffix = Some(suffix.into());
2478 self
2479 }
2480
2481 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2483 self.coalesce = coalesce;
2484 self
2485 }
2486
2487 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2489 self.maintain_order = maintain_order;
2490 self
2491 }
2492
2493 pub fn finish(self) -> LazyFrame {
2495 let opt_state = self.lf.opt_state;
2496 let other = self.other.expect("'with' not set in join builder");
2497
2498 let args = JoinArgs {
2499 how: self.how,
2500 validation: self.validation,
2501 suffix: self.suffix,
2502 slice: None,
2503 nulls_equal: self.nulls_equal,
2504 coalesce: self.coalesce,
2505 maintain_order: self.maintain_order,
2506 };
2507
2508 let lp = self
2509 .lf
2510 .get_plan_builder()
2511 .join(
2512 other.logical_plan,
2513 self.left_on,
2514 self.right_on,
2515 JoinOptions {
2516 allow_parallel: self.allow_parallel,
2517 force_parallel: self.force_parallel,
2518 args,
2519 ..Default::default()
2520 }
2521 .into(),
2522 )
2523 .build();
2524 LazyFrame::from_logical_plan(lp, opt_state)
2525 }
2526
2527 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2529 let opt_state = self.lf.opt_state;
2530 let other = self.other.expect("with not set");
2531
2532 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2534 if let Expr::BinaryExpr {
2535 op: Operator::And,
2536 left,
2537 right,
2538 } = predicate
2539 {
2540 decompose_and((*left).clone(), expanded_predicates);
2541 decompose_and((*right).clone(), expanded_predicates);
2542 } else {
2543 expanded_predicates.push(predicate);
2544 }
2545 }
2546 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2547 for predicate in predicates {
2548 decompose_and(predicate, &mut expanded_predicates);
2549 }
2550 let predicates: Vec<Expr> = expanded_predicates;
2551
2552 #[cfg(feature = "is_between")]
2554 let predicates: Vec<Expr> = {
2555 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2556 for predicate in predicates {
2557 if let Expr::Function {
2558 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2559 input,
2560 ..
2561 } = &predicate
2562 {
2563 if let [expr, lower, upper] = input.as_slice() {
2564 match closed {
2565 ClosedInterval::Both => {
2566 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2567 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2568 },
2569 ClosedInterval::Right => {
2570 expanded_predicates.push(expr.clone().gt(lower.clone()));
2571 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2572 },
2573 ClosedInterval::Left => {
2574 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2575 expanded_predicates.push(expr.clone().lt(upper.clone()));
2576 },
2577 ClosedInterval::None => {
2578 expanded_predicates.push(expr.clone().gt(lower.clone()));
2579 expanded_predicates.push(expr.clone().lt(upper.clone()));
2580 },
2581 }
2582 continue;
2583 }
2584 }
2585 expanded_predicates.push(predicate);
2586 }
2587 expanded_predicates
2588 };
2589
2590 let args = JoinArgs {
2591 how: self.how,
2592 validation: self.validation,
2593 suffix: self.suffix,
2594 slice: None,
2595 nulls_equal: self.nulls_equal,
2596 coalesce: self.coalesce,
2597 maintain_order: self.maintain_order,
2598 };
2599 let options = JoinOptions {
2600 allow_parallel: self.allow_parallel,
2601 force_parallel: self.force_parallel,
2602 args,
2603 ..Default::default()
2604 };
2605
2606 let lp = DslPlan::Join {
2607 input_left: Arc::new(self.lf.logical_plan),
2608 input_right: Arc::new(other.logical_plan),
2609 left_on: Default::default(),
2610 right_on: Default::default(),
2611 predicates,
2612 options: Arc::from(options),
2613 };
2614
2615 LazyFrame::from_logical_plan(lp, opt_state)
2616 }
2617}
2618
2619pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2620 #[cfg(not(feature = "new_streaming"))]
2621 {
2622 None
2623 }
2624 #[cfg(feature = "new_streaming")]
2625 {
2626 Some(streaming_dispatch::build_streaming_query_executor)
2627 }
2628};
2629#[cfg(feature = "new_streaming")]
2630pub use streaming_dispatch::build_streaming_query_executor;
2631
/// Adapter that exposes a query built for the new streaming engine through the in-memory
/// engine's `Executor` trait.
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Puts an in-memory sink on top of the plan rooted at `node`, builds a streaming query
    /// from it, and wraps that query in a one-shot [`Executor`].
    ///
    /// If `node` is a scan with `rechunk` set, the executor rechunks its output.
    ///
    /// # Errors
    /// Propagates any error from `polars_stream::StreamingQuery::build`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Only a scan at the root can request rechunking here; other roots never rechunk.
        let rechunk = matches!(
            ir_arena.get(node),
            IR::Scan { unified_scan_args, .. } if unified_scan_args.rechunk
        );

        let sink_node = ir_arena.add(IR::Sink {
            input: node,
            payload: SinkTypeIR::Memory,
        });
        let query = polars_stream::StreamingQuery::build(sink_node, ir_arena, expr_arena)?;

        let executor: Box<dyn Executor> = Box::new(StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        });
        Ok(executor)
    }

    /// One-shot executor around a built streaming query.
    struct StreamingQueryExecutor {
        /// The query to run; taken (leaving `None`) on the first `execute` call.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Whether to rechunk the resulting frame.
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Runs the streaming query and returns its single in-memory result.
        ///
        /// # Panics
        /// Panics in any of these cases:
        /// - it is called from a thread of the global `POOL`;
        /// - the mutex is contended or poisoned;
        /// - it is called more than once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // NOTE(review): presumably the streaming engine must not be driven from inside a
            // `POOL` worker thread (e.g. to avoid starving/deadlocking the pool) — confirm.
            assert!(POOL.current_thread_index().is_none());

            // Take the query in its own statement so the lock guard is released before
            // `expect` can panic and before the query starts running.
            let taken = self.executor.try_lock().unwrap().take();
            let mut df = taken
                .expect("unhandled: execute() more than once")
                .execute()?
                .unwrap_single();

            if self.rechunk {
                df.as_single_chunk_par();
            }
            Ok(df)
        }
    }
}