1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12#[cfg(any(
13 feature = "parquet",
14 feature = "ipc",
15 feature = "csv",
16 feature = "json"
17))]
18use std::path::Path;
19use std::sync::{Arc, Mutex};
20
21pub use anonymous_scan::*;
22#[cfg(feature = "csv")]
23pub use csv::*;
24#[cfg(not(target_arch = "wasm32"))]
25pub use exitable::*;
26pub use file_list_reader::*;
27#[cfg(feature = "ipc")]
28pub use ipc::*;
29#[cfg(feature = "json")]
30pub use ndjson::*;
31#[cfg(feature = "parquet")]
32pub use parquet::*;
33use polars_compute::rolling::QuantileMethod;
34use polars_core::POOL;
35use polars_core::error::feature_gated;
36use polars_core::prelude::*;
37use polars_expr::{ExpressionConversionState, create_physical_expr};
38use polars_io::RowIndex;
39use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
40use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
41#[cfg(feature = "is_between")]
42use polars_ops::prelude::ClosedInterval;
43pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
44use polars_plan::global::FETCH_ROWS;
45use polars_utils::pl_str::PlSmallStr;
46use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
47
48use crate::frame::cached_arenas::CachedArena;
49#[cfg(feature = "streaming")]
50use crate::physical_plan::streaming::insert_streaming_nodes;
51use crate::prelude::*;
52
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
56
57impl IntoLazy for DataFrame {
58 fn lazy(self) -> LazyFrame {
60 let lp = DslBuilder::from_existing_df(self).build();
61 LazyFrame {
62 logical_plan: lp,
63 opt_state: Default::default(),
64 cached_arena: Default::default(),
65 }
66 }
67}
68
impl IntoLazy for LazyFrame {
    /// Identity conversion: a [`LazyFrame`] is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
74
/// A query under construction: a DSL plan together with the optimizer flags
/// that are applied when the plan is optimized.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The logical (DSL) plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags consulted when the plan is optimized.
    pub(crate) opt_state: OptFlags,
    /// Arena cache behind an `Arc<Mutex<..>>`; `None` by default. Because the
    /// struct derives `Clone`, clones share the same `Arc`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
86
87impl From<DslPlan> for LazyFrame {
88 fn from(plan: DslPlan) -> Self {
89 Self {
90 logical_plan: plan,
91 opt_state: OptFlags::default(),
92 cached_arena: Default::default(),
93 }
94 }
95}
96
97impl LazyFrame {
98 pub(crate) fn from_inner(
99 logical_plan: DslPlan,
100 opt_state: OptFlags,
101 cached_arena: Arc<Mutex<Option<CachedArena>>>,
102 ) -> Self {
103 Self {
104 logical_plan,
105 opt_state,
106 cached_arena,
107 }
108 }
109
110 pub(crate) fn get_plan_builder(self) -> DslBuilder {
111 DslBuilder::from(self.logical_plan)
112 }
113
/// Return a copy of the current optimization flags.
fn get_opt_state(&self) -> OptFlags {
    self.opt_state
}
117
118 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
119 LazyFrame {
120 logical_plan,
121 opt_state,
122 cached_arena: Default::default(),
123 }
124 }
125
126 pub fn get_current_optimizations(&self) -> OptFlags {
128 self.opt_state
129 }
130
131 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
133 self.opt_state = opt_state;
134 self
135 }
136
137 pub fn without_optimizations(self) -> Self {
139 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
140 }
141
142 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
144 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
145 self
146 }
147
148 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
150 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
151 self
152 }
153
154 pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
156 self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
157 self
158 }
159
160 pub fn with_check_order(mut self, toggle: bool) -> Self {
163 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
164 self
165 }
166
167 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
169 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
170 self
171 }
172
173 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
175 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
176 self
177 }
178
179 pub fn with_type_check(mut self, toggle: bool) -> Self {
181 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
182 self
183 }
184
185 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
187 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
188 self
189 }
190
/// Set (`true`) or clear (`false`) the `COMM_SUBPLAN_ELIM` optimization flag.
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
    self.with_optimizations(flags)
}
197
/// Set (`true`) or clear (`false`) the `COMM_SUBEXPR_ELIM` optimization flag.
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
    self.with_optimizations(flags)
}
204
205 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
207 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
208 self
209 }
210
/// Set (`true`) or clear (`false`) the `STREAMING` flag.
#[cfg(feature = "streaming")]
pub fn with_streaming(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::STREAMING, toggle);
    self.with_optimizations(flags)
}
217
/// Set (`true`) or clear (`false`) the `NEW_STREAMING` flag.
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(self, toggle: bool) -> Self {
    let mut flags = self.opt_state;
    flags.set(OptFlags::NEW_STREAMING, toggle);
    self.with_optimizations(flags)
}
223
224 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
226 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
227 self
228 }
229
230 pub fn _with_eager(mut self, toggle: bool) -> Self {
232 self.opt_state.set(OptFlags::EAGER, toggle);
233 self
234 }
235
236 pub fn describe_plan(&self) -> PolarsResult<String> {
238 Ok(self.clone().to_alp()?.describe())
239 }
240
241 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
243 Ok(self.clone().to_alp()?.describe_tree_format())
244 }
245
246 fn _describe_to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
249 let (mut lp_arena, mut expr_arena) = self.get_arenas();
250 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?;
251
252 Ok(IRPlan::new(node, lp_arena, expr_arena))
253 }
254
255 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
259 Ok(self.clone()._describe_to_alp_optimized()?.describe())
260 }
261
262 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
266 Ok(self
267 .clone()
268 ._describe_to_alp_optimized()?
269 .describe_tree_format())
270 }
271
272 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
277 if optimized {
278 self.describe_optimized_plan()
279 } else {
280 self.describe_plan()
281 }
282 }
283
284 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
324 let opt_state = self.get_opt_state();
325 let lp = self
326 .get_plan_builder()
327 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
328 .build();
329 Self::from_logical_plan(lp, opt_state)
330 }
331
332 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
352 self,
353 by_exprs: E,
354 sort_options: SortMultipleOptions,
355 ) -> Self {
356 let by_exprs = by_exprs.as_ref().to_vec();
357 if by_exprs.is_empty() {
358 self
359 } else {
360 let opt_state = self.get_opt_state();
361 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
362 Self::from_logical_plan(lp, opt_state)
363 }
364 }
365
366 pub fn top_k<E: AsRef<[Expr]>>(
367 self,
368 k: IdxSize,
369 by_exprs: E,
370 sort_options: SortMultipleOptions,
371 ) -> Self {
372 self.sort_by_exprs(
374 by_exprs,
375 sort_options.with_order_reversed().with_nulls_last(true),
376 )
377 .slice(0, k)
378 }
379
380 pub fn bottom_k<E: AsRef<[Expr]>>(
381 self,
382 k: IdxSize,
383 by_exprs: E,
384 sort_options: SortMultipleOptions,
385 ) -> Self {
386 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
388 .slice(0, k)
389 }
390
391 pub fn reverse(self) -> Self {
407 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
408 }
409
410 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
418 where
419 I: IntoIterator<Item = T>,
420 J: IntoIterator<Item = S>,
421 T: AsRef<str>,
422 S: AsRef<str>,
423 {
424 let iter = existing.into_iter();
425 let cap = iter.size_hint().0;
426 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
427 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
428
429 for (existing, new) in iter.zip(new) {
432 let existing = existing.as_ref();
433 let new = new.as_ref();
434 if new != existing {
435 existing_vec.push(existing.into());
436 new_vec.push(new.into());
437 }
438 }
439
440 self.map_private(DslFunction::Rename {
441 existing: existing_vec.into(),
442 new: new_vec.into(),
443 strict,
444 })
445 }
446
447 fn _drop<I, T>(self, columns: I, strict: bool) -> Self
454 where
455 I: IntoIterator<Item = T>,
456 T: Into<Selector>,
457 {
458 let to_drop = columns.into_iter().map(|c| c.into()).collect();
459
460 let opt_state = self.get_opt_state();
461 let lp = self.get_plan_builder().drop(to_drop, strict).build();
462 Self::from_logical_plan(lp, opt_state)
463 }
464
465 pub fn drop<I, T>(self, columns: I) -> Self
472 where
473 I: IntoIterator<Item = T>,
474 T: Into<Selector>,
475 {
476 self._drop(columns, true)
477 }
478
479 pub fn drop_no_validate<I, T>(self, columns: I) -> Self
485 where
486 I: IntoIterator<Item = T>,
487 T: Into<Selector>,
488 {
489 self._drop(columns, false)
490 }
491
492 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
497 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
498 }
499
500 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
505 self.select(vec![
506 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
507 ])
508 }
509
510 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
512 let opt_state = self.get_opt_state();
513 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
514 Self::from_logical_plan(lp, opt_state)
515 }
516
517 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
519 let opt_state = self.get_opt_state();
520 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
521 Self::from_logical_plan(lp, opt_state)
522 }
523
524 pub fn cache(self) -> Self {
528 let opt_state = self.get_opt_state();
529 let lp = self.get_plan_builder().cache().build();
530 Self::from_logical_plan(lp, opt_state)
531 }
532
533 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
535 let cast_cols: Vec<Expr> = dtypes
536 .into_iter()
537 .map(|(name, dt)| {
538 let name = PlSmallStr::from_str(name);
539
540 if strict {
541 col(name).strict_cast(dt)
542 } else {
543 col(name).cast(dt)
544 }
545 })
546 .collect();
547
548 if cast_cols.is_empty() {
549 self.clone()
550 } else {
551 self.with_columns(cast_cols)
552 }
553 }
554
555 pub fn cast_all(self, dtype: DataType, strict: bool) -> Self {
557 self.with_columns(vec![if strict {
558 col(PlSmallStr::from_static("*")).strict_cast(dtype)
559 } else {
560 col(PlSmallStr::from_static("*")).cast(dtype)
561 }])
562 }
563
564 pub fn fetch(self, n_rows: usize) -> PolarsResult<DataFrame> {
571 FETCH_ROWS.with(|fetch_rows| fetch_rows.set(Some(n_rows)));
572 let res = self.collect();
573 FETCH_ROWS.with(|fetch_rows| fetch_rows.set(None));
574 res
575 }
576
577 pub fn optimize(
578 self,
579 lp_arena: &mut Arena<IR>,
580 expr_arena: &mut Arena<AExpr>,
581 ) -> PolarsResult<Node> {
582 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
583 }
584
585 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
586 let (mut lp_arena, mut expr_arena) = self.get_arenas();
587 let node =
588 self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
589
590 Ok(IRPlan::new(node, lp_arena, expr_arena))
591 }
592
593 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
594 let (mut lp_arena, mut expr_arena) = self.get_arenas();
595 let node = to_alp(
596 self.logical_plan,
597 &mut expr_arena,
598 &mut lp_arena,
599 &mut self.opt_state,
600 )?;
601 let plan = IRPlan::new(node, lp_arena, expr_arena);
602 Ok(plan)
603 }
604
/// Optimize the logical plan into `lp_arena` / `expr_arena` and return the
/// root node of the optimized plan.
///
/// `scratch` is passed through to `optimize` and, on the old streaming path,
/// to `insert_streaming_nodes`. `enable_fmt` is only forwarded to
/// `insert_streaming_nodes`.
///
/// # Panics
///
/// Panics if the `STREAMING` flag is set but the crate was built without the
/// `streaming` feature.
pub(crate) fn optimize_with_scratch(
    self,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    scratch: &mut Vec<Node>,
    enable_fmt: bool,
) -> PolarsResult<Node> {
    // `mut` is only exercised when the `cse` feature is enabled.
    #[allow(unused_mut)]
    let mut opt_state = self.opt_state;
    let streaming = self.opt_state.contains(OptFlags::STREAMING);
    let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
    // Old streaming (without new streaming): turn off common-subplan elimination.
    #[cfg(feature = "cse")]
    if streaming && !new_streaming {
        opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
    }

    // New streaming: turn off common-subexpression elimination.
    #[cfg(feature = "cse")]
    if new_streaming {
        opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
    }

    let lp_top = optimize(
        self.logical_plan,
        opt_state,
        lp_arena,
        expr_arena,
        scratch,
        // Callback handed to the optimizer: builds a physical expression and
        // converts it to an IO expression; yields `None` if creation fails.
        Some(&|expr, expr_arena, schema| {
            let phys_expr = create_physical_expr(
                expr,
                Context::Default,
                expr_arena,
                schema,
                &mut ExpressionConversionState::new(true, 0),
            )
            .ok()?;
            let io_expr = phys_expr_to_io_expr(phys_expr);
            Some(io_expr)
        }),
    )?;

    if streaming {
        #[cfg(feature = "streaming")]
        {
            insert_streaming_nodes(
                lp_top,
                lp_arena,
                expr_arena,
                scratch,
                enable_fmt,
                true,
                opt_state.contains(OptFlags::ROW_ESTIMATE),
            )?;
        }
        #[cfg(not(feature = "streaming"))]
        {
            // Reference `enable_fmt` so it is not reported as unused in this configuration.
            _ = enable_fmt;
            panic!("activate feature 'streaming'")
        }
    }

    Ok(lp_top)
}
670
671 fn prepare_collect_post_opt<P>(
672 mut self,
673 check_sink: bool,
674 query_start: Option<std::time::Instant>,
675 post_opt: P,
676 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
677 where
678 P: FnOnce(
679 Node,
680 &mut Arena<IR>,
681 &mut Arena<AExpr>,
682 Option<std::time::Duration>,
683 ) -> PolarsResult<()>,
684 {
685 let (mut lp_arena, mut expr_arena) = self.get_arenas();
686
687 let mut scratch = vec![];
688 let lp_top =
689 self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
690
691 post_opt(
692 lp_top,
693 &mut lp_arena,
694 &mut expr_arena,
695 query_start.map(|s| s.elapsed()),
698 )?;
699
700 let no_file_sink = if check_sink {
702 !matches!(
703 lp_arena.get(lp_top),
704 IR::Sink {
705 payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
706 ..
707 }
708 )
709 } else {
710 true
711 };
712 let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
713
714 let state = ExecutionState::new();
715 Ok((state, physical_plan, no_file_sink))
716 }
717
718 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
720 where
721 P: FnOnce(
722 Node,
723 &mut Arena<IR>,
724 &mut Arena<AExpr>,
725 Option<std::time::Duration>,
726 ) -> PolarsResult<()>,
727 {
728 let (mut state, mut physical_plan, _) =
729 self.prepare_collect_post_opt(false, None, post_opt)?;
730 physical_plan.execute(&mut state)
731 }
732
733 #[allow(unused_mut)]
734 fn prepare_collect(
735 self,
736 check_sink: bool,
737 query_start: Option<std::time::Instant>,
738 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
739 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
740 }
741
/// Execute the query on the requested `engine` and return the resulting
/// [`DataFrame`].
///
/// If the plan does not already end in a sink it is wrapped in an in-memory
/// sink first. `Engine::Auto` resolves to `InMemory` for memory sinks and to
/// `Streaming` for file/partition sinks; `Engine::Gpu` is remapped to
/// `InMemory`.
///
/// # Errors
///
/// Returns an error if optimization or execution fails, or if a partition
/// sink is requested on an engine other than the (new) streaming engine.
pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
    // Reuse the payload of an existing sink, or wrap the plan in a memory sink.
    let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
        payload.clone()
    } else {
        self.logical_plan = DslPlan::Sink {
            input: Arc::new(self.logical_plan),
            payload: SinkType::Memory,
        };
        SinkType::Memory
    };

    // Resolve `Auto` based on what the query writes to.
    if engine == Engine::Auto {
        engine = match payload {
            SinkType::Memory => Engine::InMemory,
            SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
        };
    }
    // From here on `engine` is never `Gpu`, so the `Engine::Gpu` arm below
    // cannot be reached.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    // Environment-variable override: may run the query on the new streaming
    // engine regardless of `engine`.
    #[cfg(feature = "new_streaming")]
    {
        if let Some(result) = self.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap());
        }
    }

    // Set the matching streaming flag before optimizing.
    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!("new_streaming", self = self.with_new_streaming(true))
        },
        Engine::OldStreaming => feature_gated!("streaming", self = self.with_streaming(true)),
        _ => {},
    }
    // Clone because the `OldStreaming` arm below still needs `self`.
    let mut alp_plan = self.clone().to_alp_optimized()?;

    match engine {
        Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
            // Hold the string cache for the duration of the streaming run.
            let string_cache_hold = StringCacheHolder::hold();
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            drop(string_cache_hold);
            result.map(|v| v.unwrap())
        }),
        // NOTE(review): the message below reads "supported on for" — looks like a typo.
        _ if matches!(payload, SinkType::Partition { .. }) => Err(polars_err!(
            InvalidOperation: "partition sinks are not supported on for the '{}' engine",
            engine.into_static_str()
        )),
        Engine::Gpu => {
            Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
        },
        Engine::InMemory => {
            let mut physical_plan = create_physical_plan(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            )?;
            let mut state = ExecutionState::new();
            physical_plan.execute(&mut state)
        },
        Engine::OldStreaming => {
            self.opt_state |= OptFlags::STREAMING;
            // The third tuple element is what `prepare_collect_post_opt` calls
            // `no_file_sink`; it is bound here under the name `is_streaming`.
            let (mut state, mut physical_plan, is_streaming) =
                self.prepare_collect(true, None)?;
            polars_ensure!(
                is_streaming,
                ComputeError: format!("cannot run the whole query in a streaming order")
            );
            physical_plan.execute(&mut state)
        },
    }
}
825
826 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
827 let sink_multiple = LazyFrame {
828 logical_plan: DslPlan::SinkMultiple { inputs: plans },
829 opt_state,
830 cached_arena: Default::default(),
831 };
832 sink_multiple.explain(true)
833 }
834
/// Execute several plans together and return one [`DataFrame`] per plan.
///
/// The plans are combined under a single `SinkMultiple` node and optimized as
/// one query. An empty `plans` returns an empty vector without doing any work.
/// `Engine::Auto` and `Engine::Gpu` are both remapped to `InMemory`.
///
/// # Panics
///
/// Panics for `Engine::OldStreaming`.
pub fn collect_all_with_engine(
    plans: Vec<DslPlan>,
    mut engine: Engine,
    opt_state: OptFlags,
) -> PolarsResult<Vec<DataFrame>> {
    if plans.is_empty() {
        return Ok(Vec::new());
    }

    if engine == Engine::Auto {
        engine = Engine::InMemory;
    }
    // After this remap `engine` is never `Gpu`, so the `Engine::Gpu` arm in
    // the final match cannot be reached.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    let mut sink_multiple = LazyFrame {
        logical_plan: DslPlan::SinkMultiple { inputs: plans },
        opt_state,
        cached_arena: Default::default(),
    };

    // Environment-variable override: may run on the new streaming engine.
    // The multi-frame result is carried in the `Err` side of the inner
    // `Result`, hence `unwrap_err`.
    #[cfg(feature = "new_streaming")]
    {
        if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_err());
        }
    }

    // Set the matching streaming flag before optimizing.
    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!(
                "new_streaming",
                sink_multiple = sink_multiple.with_new_streaming(true)
            )
        },
        Engine::OldStreaming => feature_gated!(
            "streaming",
            sink_multiple = sink_multiple.with_streaming(true)
        ),
        _ => {},
    }
    let mut alp_plan = sink_multiple.to_alp_optimized()?;

    if engine == Engine::Streaming {
        feature_gated!("new_streaming", {
            // Hold the string cache for the duration of the streaming run.
            let string_cache_hold = StringCacheHolder::hold();
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            drop(string_cache_hold);
            return result.map(|v| v.unwrap_err());
        });
    }

    // The root was built as `SinkMultiple` above.
    let IR::SinkMultiple { inputs } = alp_plan.root() else {
        unreachable!()
    };

    // `inputs` borrows from `alp_plan`; clone it so the arenas can be
    // borrowed mutably in the same call.
    let mut multiplan = create_multiple_physical_plans(
        inputs.clone().as_slice(),
        &mut alp_plan.lp_arena,
        &mut alp_plan.expr_arena,
    )?;

    match engine {
        Engine::Gpu => polars_bail!(
            InvalidOperation: "collect_all is not supported for the gpu engine"
        ),
        Engine::InMemory => {
            let mut state = ExecutionState::new();
            // Run the cache prefiller (if any) before the individual plans.
            if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                cache_prefiller.execute(&mut state)?;
            }
            let out = POOL.install(|| {
                // Plans run in chunks of `3 * num_threads`; chunks run one
                // after another, plans within a chunk run in parallel.
                multiplan
                    .physical_plans
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                let mut input = std::mem::take(input);
                                let mut state = state.split();
                                // NOTE(review): `idx` is the position within the
                                // current chunk, so values repeat across chunks —
                                // confirm that is intended.
                                state.branch_idx += idx;

                                let df = input.execute(&mut state)?;
                                Ok(df)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            // Flatten the per-chunk results back into one vector, in plan order.
            Ok(out?.into_iter().flatten().collect())
        },
        Engine::OldStreaming => panic!("This is no longer supported"),
        _ => unreachable!(),
    }
}
943
944 pub fn collect(self) -> PolarsResult<DataFrame> {
962 self.collect_with_engine(Engine::InMemory)
963 }
964
965 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
968 where
969 P: FnOnce(
970 Node,
971 &mut Arena<IR>,
972 &mut Arena<AExpr>,
973 Option<std::time::Duration>,
974 ) -> PolarsResult<()>,
975 {
976 let query_start = std::time::Instant::now();
977 let (mut state, mut physical_plan, _) =
978 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
979 state.time_nodes(query_start);
980 let out = physical_plan.execute(&mut state)?;
981 let timer_df = state.finish_timer()?;
982 Ok((out, timer_df))
983 }
984
985 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
993 self._profile_post_opt(|_, _, _, _| Ok(()))
994 }
995
/// Add a Parquet file sink writing to `path` on top of this plan.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "parquet")]
pub fn sink_parquet(
    self,
    path: &dyn AsRef<Path>,
    options: ParquetWriteOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = FileSinkType {
        path: Arc::new(path.as_ref().to_path_buf()),
        sink_options,
        file_type: FileType::Parquet(options),
        cloud_options,
    };
    self.sink(SinkType::File(target))
}
1014
/// Add an IPC file sink writing to `path` on top of this plan.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "ipc")]
pub fn sink_ipc(
    self,
    path: impl AsRef<Path>,
    options: IpcWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = FileSinkType {
        path: Arc::new(path.as_ref().to_path_buf()),
        sink_options,
        file_type: FileType::Ipc(options),
        cloud_options,
    };
    self.sink(SinkType::File(target))
}
1033
/// Add a CSV file sink writing to `path` on top of this plan.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "csv")]
pub fn sink_csv(
    self,
    path: impl AsRef<Path>,
    options: CsvWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = FileSinkType {
        path: Arc::new(path.as_ref().to_path_buf()),
        sink_options,
        file_type: FileType::Csv(options),
        cloud_options,
    };
    self.sink(SinkType::File(target))
}
1052
/// Add a JSON file sink writing to `path` on top of this plan.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "json")]
pub fn sink_json(
    self,
    path: impl AsRef<Path>,
    options: JsonWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = FileSinkType {
        path: Arc::new(path.as_ref().to_path_buf()),
        sink_options,
        file_type: FileType::Json(options),
        cloud_options,
    };
    self.sink(SinkType::File(target))
}
1071
/// Add a partitioned Parquet sink on top of this plan, using `path_f_string`
/// and the partitioning `variant`.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "parquet")]
pub fn sink_parquet_partitioned(
    self,
    path_f_string: impl AsRef<Path>,
    variant: PartitionVariant,
    options: ParquetWriteOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = PartitionSinkType {
        path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
        sink_options,
        variant,
        file_type: FileType::Parquet(options),
        cloud_options,
    };
    self.sink(SinkType::Partition(target))
}
1092
/// Add a partitioned IPC sink on top of this plan, using `path_f_string`
/// and the partitioning `variant`.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "ipc")]
pub fn sink_ipc_partitioned(
    self,
    path_f_string: impl AsRef<Path>,
    variant: PartitionVariant,
    options: IpcWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = PartitionSinkType {
        path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
        sink_options,
        variant,
        file_type: FileType::Ipc(options),
        cloud_options,
    };
    self.sink(SinkType::Partition(target))
}
1113
/// Add a partitioned CSV sink on top of this plan, using `path_f_string`
/// and the partitioning `variant`.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "csv")]
pub fn sink_csv_partitioned(
    self,
    path_f_string: impl AsRef<Path>,
    variant: PartitionVariant,
    options: CsvWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = PartitionSinkType {
        path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
        sink_options,
        variant,
        file_type: FileType::Csv(options),
        cloud_options,
    };
    self.sink(SinkType::Partition(target))
}
1134
/// Add a partitioned JSON sink on top of this plan, using `path_f_string`
/// and the partitioning `variant`.
///
/// # Errors
///
/// Fails if the plan already ends in a sink.
#[cfg(feature = "json")]
pub fn sink_json_partitioned(
    self,
    path_f_string: impl AsRef<Path>,
    variant: PartitionVariant,
    options: JsonWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let target = PartitionSinkType {
        path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
        sink_options,
        variant,
        file_type: FileType::Json(options),
        cloud_options,
    };
    self.sink(SinkType::Partition(target))
}
1155
/// Run the query on the new streaming engine when requested through the
/// environment.
///
/// The engine is tried when `POLARS_AUTO_NEW_STREAMING` or
/// `POLARS_FORCE_NEW_STREAMING` equals `"1"`. Returns `None` when neither is
/// set, or when the "auto" mode caught a "not yet implemented" panic and the
/// caller should fall back to another engine; otherwise returns
/// `Some(result)`.
///
/// # Panics
///
/// Any panic from the streaming engine that does not qualify for the
/// fallback is re-raised.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<Result<DataFrame, Vec<DataFrame>>>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if auto_new_streaming || force_new_streaming {
        // Work on a clone so `self` stays usable for a fallback engine.
        let mut new_stream_lazy = self.clone();
        new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
        new_stream_lazy.opt_state &= !OptFlags::STREAMING;
        let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
            Ok(v) => v,
            Err(e) => return Some(Err(e)),
        };

        // Held until this function returns.
        let _hold = StringCacheHolder::hold();
        let f = || {
            polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            )
        };

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
            Ok(v) => return Some(v),
            Err(e) => {
                // Fall back only in auto (not forced) mode, and only for panics
                // whose payload is a `&str` starting with "not yet implemented".
                // NOTE(review): `String` payloads are not inspected here.
                if !force_new_streaming
                    && auto_new_streaming
                    && e.downcast_ref::<&str>()
                        .map(|s| s.starts_with("not yet implemented"))
                        .unwrap_or(false)
                {
                    if polars_core::config::verbose() {
                        eprintln!(
                            "caught unimplemented error in new streaming engine, falling back to normal engine"
                        );
                    }
                } else {
                    std::panic::resume_unwind(e);
                }
            },
        }
    }

    None
}
1208
1209 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1210 polars_ensure!(
1211 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1212 InvalidOperation: "cannot create a sink on top of another sink"
1213 );
1214 self.logical_plan = DslPlan::Sink {
1215 input: Arc::new(self.logical_plan),
1216 payload: payload.clone(),
1217 };
1218 Ok(self)
1219 }
1220
1221 pub fn filter(self, predicate: Expr) -> Self {
1239 let opt_state = self.get_opt_state();
1240 let lp = self.get_plan_builder().filter(predicate).build();
1241 Self::from_logical_plan(lp, opt_state)
1242 }
1243
1244 pub fn remove(self, predicate: Expr) -> Self {
1262 self.filter(predicate.neq_missing(lit(true)))
1263 }
1264
1265 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1291 let exprs = exprs.as_ref().to_vec();
1292 self.select_impl(
1293 exprs,
1294 ProjectionOptions {
1295 run_parallel: true,
1296 duplicate_check: true,
1297 should_broadcast: true,
1298 },
1299 )
1300 }
1301
1302 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1303 let exprs = exprs.as_ref().to_vec();
1304 self.select_impl(
1305 exprs,
1306 ProjectionOptions {
1307 run_parallel: false,
1308 duplicate_check: true,
1309 should_broadcast: true,
1310 },
1311 )
1312 }
1313
1314 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1315 let opt_state = self.get_opt_state();
1316 let lp = self.get_plan_builder().project(exprs, options).build();
1317 Self::from_logical_plan(lp, opt_state)
1318 }
1319
1320 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1341 let keys = by
1342 .as_ref()
1343 .iter()
1344 .map(|e| e.clone().into())
1345 .collect::<Vec<_>>();
1346 let opt_state = self.get_opt_state();
1347
1348 #[cfg(feature = "dynamic_group_by")]
1349 {
1350 LazyGroupBy {
1351 logical_plan: self.logical_plan,
1352 opt_state,
1353 keys,
1354 maintain_order: false,
1355 dynamic_options: None,
1356 rolling_options: None,
1357 }
1358 }
1359
1360 #[cfg(not(feature = "dynamic_group_by"))]
1361 {
1362 LazyGroupBy {
1363 logical_plan: self.logical_plan,
1364 opt_state,
1365 keys,
1366 maintain_order: false,
1367 }
1368 }
1369 }
1370
/// Start a rolling group-by over `index_column`, additionally grouped by
/// `group_by`.
///
/// If `index_column` is not a plain column, it is first added to the frame
/// via `with_column` and the call is repeated with its output column name.
///
/// # Panics
///
/// Panics if the schema cannot be collected or the output field of a
/// non-column `index_column` cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        index_expr => {
            let schema = self.collect_schema().unwrap();
            let output_field = index_expr.to_field(&schema, Context::Default).unwrap();
            let index_name = output_field.name().clone();
            return self
                .with_column(index_expr)
                .rolling(Expr::Column(index_name), group_by, options);
        },
    }

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1407
/// Start a dynamic group-by over `index_column`, additionally grouped by
/// `group_by`.
///
/// If `index_column` is not a plain column, it is first added to the frame
/// via `with_column` and the call is repeated with its output column name.
///
/// # Panics
///
/// Panics if the schema cannot be collected or the output field of a
/// non-column `index_column` cannot be resolved.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    match index_column {
        Expr::Column(name) => options.index_column = name,
        index_expr => {
            let schema = self.collect_schema().unwrap();
            let output_field = index_expr.to_field(&schema, Context::Default).unwrap();
            let index_name = output_field.name().clone();
            return self.with_column(index_expr).group_by_dynamic(
                Expr::Column(index_name),
                group_by,
                options,
            );
        },
    }

    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        keys: group_by.as_ref().to_vec(),
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1452
1453 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1455 let keys = by
1456 .as_ref()
1457 .iter()
1458 .map(|e| e.clone().into())
1459 .collect::<Vec<_>>();
1460 let opt_state = self.get_opt_state();
1461
1462 #[cfg(feature = "dynamic_group_by")]
1463 {
1464 LazyGroupBy {
1465 logical_plan: self.logical_plan,
1466 opt_state,
1467 keys,
1468 maintain_order: true,
1469 dynamic_options: None,
1470 rolling_options: None,
1471 }
1472 }
1473
1474 #[cfg(not(feature = "dynamic_group_by"))]
1475 {
1476 LazyGroupBy {
1477 logical_plan: self.logical_plan,
1478 opt_state,
1479 keys,
1480 maintain_order: true,
1481 }
1482 }
1483 }
1484
/// Anti join with `other` on a single key pair (`left_on`, `right_on`).
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let left_keys = [left_on.into()];
    let right_keys = [right_on.into()];
    let args = JoinArgs::new(JoinType::Anti);
    self.join(other, left_keys, right_keys, args)
}
1510
/// Cross join with `other` (no join keys), applying the optional `suffix`.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    let no_keys = Vec::new;
    self.join(other, no_keys(), no_keys(), args)
}
1521
1522 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1539 self.join(
1540 other,
1541 [left_on.into()],
1542 [right_on.into()],
1543 JoinArgs::new(JoinType::Left),
1544 )
1545 }
1546
1547 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1564 self.join(
1565 other,
1566 [left_on.into()],
1567 [right_on.into()],
1568 JoinArgs::new(JoinType::Inner),
1569 )
1570 }
1571
1572 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1589 self.join(
1590 other,
1591 [left_on.into()],
1592 [right_on.into()],
1593 JoinArgs::new(JoinType::Full),
1594 )
1595 }
1596
1597 #[cfg(feature = "semi_anti_join")]
1614 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1615 self.join(
1616 other,
1617 [left_on.into()],
1618 [right_on.into()],
1619 JoinArgs::new(JoinType::Semi),
1620 )
1621 }
1622
1623 pub fn join<E: AsRef<[Expr]>>(
1645 self,
1646 other: LazyFrame,
1647 left_on: E,
1648 right_on: E,
1649 args: JoinArgs,
1650 ) -> LazyFrame {
1651 let left_on = left_on.as_ref().to_vec();
1652 let right_on = right_on.as_ref().to_vec();
1653
1654 self._join_impl(other, left_on, right_on, args)
1655 }
1656
1657 fn _join_impl(
1658 self,
1659 other: LazyFrame,
1660 left_on: Vec<Expr>,
1661 right_on: Vec<Expr>,
1662 args: JoinArgs,
1663 ) -> LazyFrame {
1664 let JoinArgs {
1665 how,
1666 validation,
1667 suffix,
1668 slice,
1669 nulls_equal,
1670 coalesce,
1671 maintain_order,
1672 } = args;
1673
1674 if slice.is_some() {
1675 panic!("impl error: slice is not handled")
1676 }
1677
1678 let mut builder = self
1679 .join_builder()
1680 .with(other)
1681 .left_on(left_on)
1682 .right_on(right_on)
1683 .how(how)
1684 .validate(validation)
1685 .join_nulls(nulls_equal)
1686 .coalesce(coalesce)
1687 .maintain_order(maintain_order);
1688
1689 if let Some(suffix) = suffix {
1690 builder = builder.suffix(suffix);
1691 }
1692
1693 builder.finish()
1695 }
1696
    /// Consumes this frame and returns a [`JoinBuilder`] with it as the
    /// left-hand input.
    ///
    /// The right-hand frame, keys and options are set through the builder's
    /// methods before calling `finish` or `join_where`.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1705
1706 pub fn with_column(self, expr: Expr) -> LazyFrame {
1724 let opt_state = self.get_opt_state();
1725 let lp = self
1726 .get_plan_builder()
1727 .with_columns(
1728 vec![expr],
1729 ProjectionOptions {
1730 run_parallel: false,
1731 duplicate_check: true,
1732 should_broadcast: true,
1733 },
1734 )
1735 .build();
1736 Self::from_logical_plan(lp, opt_state)
1737 }
1738
1739 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1754 let exprs = exprs.as_ref().to_vec();
1755 self.with_columns_impl(
1756 exprs,
1757 ProjectionOptions {
1758 run_parallel: true,
1759 duplicate_check: true,
1760 should_broadcast: true,
1761 },
1762 )
1763 }
1764
1765 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1767 let exprs = exprs.as_ref().to_vec();
1768 self.with_columns_impl(
1769 exprs,
1770 ProjectionOptions {
1771 run_parallel: false,
1772 duplicate_check: true,
1773 should_broadcast: true,
1774 },
1775 )
1776 }
1777
1778 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1779 let opt_state = self.get_opt_state();
1780 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1781 Self::from_logical_plan(lp, opt_state)
1782 }
1783
1784 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1785 let contexts = contexts
1786 .as_ref()
1787 .iter()
1788 .map(|lf| lf.logical_plan.clone())
1789 .collect();
1790 let opt_state = self.get_opt_state();
1791 let lp = self.get_plan_builder().with_context(contexts).build();
1792 Self::from_logical_plan(lp, opt_state)
1793 }
1794
1795 pub fn max(self) -> Self {
1799 self.map_private(DslFunction::Stats(StatsFunction::Max))
1800 }
1801
1802 pub fn min(self) -> Self {
1806 self.map_private(DslFunction::Stats(StatsFunction::Min))
1807 }
1808
1809 pub fn sum(self) -> Self {
1819 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1820 }
1821
1822 pub fn mean(self) -> Self {
1827 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1828 }
1829
1830 pub fn median(self) -> Self {
1836 self.map_private(DslFunction::Stats(StatsFunction::Median))
1837 }
1838
1839 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1841 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1842 quantile,
1843 method,
1844 }))
1845 }
1846
1847 pub fn std(self, ddof: u8) -> Self {
1860 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1861 }
1862
1863 pub fn var(self, ddof: u8) -> Self {
1873 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1874 }
1875
1876 pub fn explode<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(self, columns: E) -> LazyFrame {
1878 self.explode_impl(columns, false)
1879 }
1880
1881 fn explode_impl<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1883 self,
1884 columns: E,
1885 allow_empty: bool,
1886 ) -> LazyFrame {
1887 let columns = columns
1888 .as_ref()
1889 .iter()
1890 .map(|e| e.clone().into())
1891 .collect::<Vec<_>>();
1892 let opt_state = self.get_opt_state();
1893 let lp = self
1894 .get_plan_builder()
1895 .explode(columns, allow_empty)
1896 .build();
1897 Self::from_logical_plan(lp, opt_state)
1898 }
1899
1900 pub fn null_count(self) -> LazyFrame {
1902 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1903 }
1904
1905 pub fn unique_stable(
1910 self,
1911 subset: Option<Vec<PlSmallStr>>,
1912 keep_strategy: UniqueKeepStrategy,
1913 ) -> LazyFrame {
1914 self.unique_stable_generic(subset, keep_strategy)
1915 }
1916
1917 pub fn unique_stable_generic<E, IE>(
1918 self,
1919 subset: Option<E>,
1920 keep_strategy: UniqueKeepStrategy,
1921 ) -> LazyFrame
1922 where
1923 E: AsRef<[IE]>,
1924 IE: Into<Selector> + Clone,
1925 {
1926 let subset = subset.map(|s| {
1927 s.as_ref()
1928 .iter()
1929 .map(|e| e.clone().into())
1930 .collect::<Vec<_>>()
1931 });
1932
1933 let opt_state = self.get_opt_state();
1934 let options = DistinctOptionsDSL {
1935 subset,
1936 maintain_order: true,
1937 keep_strategy,
1938 };
1939 let lp = self.get_plan_builder().distinct(options).build();
1940 Self::from_logical_plan(lp, opt_state)
1941 }
1942
1943 pub fn unique(
1951 self,
1952 subset: Option<Vec<String>>,
1953 keep_strategy: UniqueKeepStrategy,
1954 ) -> LazyFrame {
1955 self.unique_generic(subset, keep_strategy)
1956 }
1957
1958 pub fn unique_generic<E: AsRef<[IE]>, IE: Into<Selector> + Clone>(
1959 self,
1960 subset: Option<E>,
1961 keep_strategy: UniqueKeepStrategy,
1962 ) -> LazyFrame {
1963 let subset = subset.map(|s| {
1964 s.as_ref()
1965 .iter()
1966 .map(|e| e.clone().into())
1967 .collect::<Vec<_>>()
1968 });
1969 let opt_state = self.get_opt_state();
1970 let options = DistinctOptionsDSL {
1971 subset,
1972 maintain_order: false,
1973 keep_strategy,
1974 };
1975 let lp = self.get_plan_builder().distinct(options).build();
1976 Self::from_logical_plan(lp, opt_state)
1977 }
1978
1979 pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> LazyFrame {
1984 let opt_state = self.get_opt_state();
1985 let lp = self.get_plan_builder().drop_nans(subset).build();
1986 Self::from_logical_plan(lp, opt_state)
1987 }
1988
1989 pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> LazyFrame {
1994 let opt_state = self.get_opt_state();
1995 let lp = self.get_plan_builder().drop_nulls(subset).build();
1996 Self::from_logical_plan(lp, opt_state)
1997 }
1998
1999 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
2009 let opt_state = self.get_opt_state();
2010 let lp = self.get_plan_builder().slice(offset, len).build();
2011 Self::from_logical_plan(lp, opt_state)
2012 }
2013
2014 pub fn first(self) -> LazyFrame {
2018 self.slice(0, 1)
2019 }
2020
2021 pub fn last(self) -> LazyFrame {
2025 self.slice(-1, 1)
2026 }
2027
2028 pub fn tail(self, n: IdxSize) -> LazyFrame {
2032 let neg_tail = -(n as i64);
2033 self.slice(neg_tail, n)
2034 }
2035
2036 #[cfg(feature = "pivot")]
2040 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
2041 let opt_state = self.get_opt_state();
2042 let lp = self.get_plan_builder().unpivot(args).build();
2043 Self::from_logical_plan(lp, opt_state)
2044 }
2045
2046 pub fn limit(self, n: IdxSize) -> LazyFrame {
2050 self.slice(0, n)
2051 }
2052
2053 pub fn map<F>(
2067 self,
2068 function: F,
2069 optimizations: AllowedOptimizations,
2070 schema: Option<Arc<dyn UdfSchema>>,
2071 name: Option<&'static str>,
2072 ) -> LazyFrame
2073 where
2074 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2075 {
2076 let opt_state = self.get_opt_state();
2077 let lp = self
2078 .get_plan_builder()
2079 .map(
2080 function,
2081 optimizations,
2082 schema,
2083 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
2084 )
2085 .build();
2086 Self::from_logical_plan(lp, opt_state)
2087 }
2088
2089 #[cfg(feature = "python")]
2090 pub fn map_python(
2091 self,
2092 function: polars_utils::python_function::PythonFunction,
2093 optimizations: AllowedOptimizations,
2094 schema: Option<SchemaRef>,
2095 validate_output: bool,
2096 ) -> LazyFrame {
2097 let opt_state = self.get_opt_state();
2098 let lp = self
2099 .get_plan_builder()
2100 .map_python(function, optimizations, schema, validate_output)
2101 .build();
2102 Self::from_logical_plan(lp, opt_state)
2103 }
2104
2105 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2106 let opt_state = self.get_opt_state();
2107 let lp = self.get_plan_builder().map_private(function).build();
2108 Self::from_logical_plan(lp, opt_state)
2109 }
2110
2111 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2120 where
2121 S: Into<PlSmallStr>,
2122 {
2123 let name = name.into();
2124
2125 match &self.logical_plan {
2126 v @ DslPlan::Scan { scan_type, .. }
2127 if !matches!(&**scan_type, FileScan::Anonymous { .. }) =>
2128 {
2129 let DslPlan::Scan {
2130 sources,
2131 mut file_options,
2132 scan_type,
2133 file_info,
2134 cached_ir: _,
2135 } = v.clone()
2136 else {
2137 unreachable!()
2138 };
2139
2140 file_options.row_index = Some(RowIndex {
2141 name,
2142 offset: offset.unwrap_or(0),
2143 });
2144
2145 DslPlan::Scan {
2146 sources,
2147 file_options,
2148 scan_type,
2149 file_info,
2150 cached_ir: Default::default(),
2151 }
2152 .into()
2153 },
2154 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2155 }
2156 }
2157
2158 pub fn count(self) -> LazyFrame {
2160 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2161 }
2162
2163 #[cfg(feature = "dtype-struct")]
2166 pub fn unnest<E, IE>(self, cols: E) -> Self
2167 where
2168 E: AsRef<[IE]>,
2169 IE: Into<Selector> + Clone,
2170 {
2171 let cols = cols
2172 .as_ref()
2173 .iter()
2174 .map(|ie| ie.clone().into())
2175 .collect::<Vec<_>>();
2176 self.map_private(DslFunction::Unnest(cols))
2177 }
2178
2179 #[cfg(feature = "merge_sorted")]
2180 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2181 where
2182 S: Into<PlSmallStr>,
2183 {
2184 let key = key.into();
2185
2186 let lp = DslPlan::MergeSorted {
2187 input_left: Arc::new(self.logical_plan),
2188 input_right: Arc::new(other.logical_plan),
2189 key,
2190 };
2191 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2192 }
2193}
2194
/// Intermediate state of a lazy group-by: the input plan plus the grouping
/// configuration, waiting for an aggregation (`agg`, `head`, `tail`, `apply`).
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags taken from the originating `LazyFrame`; handed to
    /// the frame produced by the aggregation.
    opt_state: OptFlags,
    /// Key expressions to group by.
    keys: Vec<Expr>,
    /// Forwarded to the group-by node as its `maintain_order` setting.
    maintain_order: bool,
    /// Set when the group-by was created by a dynamic group-by, `None` otherwise.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, if any; the constructors visible in this
    /// part of the file always set `None`.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2207
2208impl From<LazyGroupBy> for LazyFrame {
2209 fn from(lgb: LazyGroupBy) -> Self {
2210 Self {
2211 logical_plan: lgb.logical_plan,
2212 opt_state: lgb.opt_state,
2213 cached_arena: Default::default(),
2214 }
2215 }
2216}
2217
2218impl LazyGroupBy {
2219 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2241 #[cfg(feature = "dynamic_group_by")]
2242 let lp = DslBuilder::from(self.logical_plan)
2243 .group_by(
2244 self.keys,
2245 aggs,
2246 None,
2247 self.maintain_order,
2248 self.dynamic_options,
2249 self.rolling_options,
2250 )
2251 .build();
2252
2253 #[cfg(not(feature = "dynamic_group_by"))]
2254 let lp = DslBuilder::from(self.logical_plan)
2255 .group_by(self.keys, aggs, None, self.maintain_order)
2256 .build();
2257 LazyFrame::from_logical_plan(lp, self.opt_state)
2258 }
2259
2260 pub fn head(self, n: Option<usize>) -> LazyFrame {
2262 let keys = self
2263 .keys
2264 .iter()
2265 .filter_map(|expr| expr_output_name(expr).ok())
2266 .collect::<Vec<_>>();
2267
2268 self.agg([col(PlSmallStr::from_static("*"))
2269 .exclude(keys.iter().cloned())
2270 .head(n)])
2271 .explode_impl(
2272 [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2273 true,
2274 )
2275 }
2276
2277 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2279 let keys = self
2280 .keys
2281 .iter()
2282 .filter_map(|expr| expr_output_name(expr).ok())
2283 .collect::<Vec<_>>();
2284
2285 self.agg([col(PlSmallStr::from_static("*"))
2286 .exclude(keys.iter().cloned())
2287 .tail(n)])
2288 .explode_impl(
2289 [col(PlSmallStr::from_static("*")).exclude(keys.iter().cloned())],
2290 true,
2291 )
2292 }
2293
2294 pub fn apply<F>(self, f: F, schema: SchemaRef) -> LazyFrame
2299 where
2300 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
2301 {
2302 #[cfg(feature = "dynamic_group_by")]
2303 let options = GroupbyOptions {
2304 dynamic: self.dynamic_options,
2305 rolling: self.rolling_options,
2306 slice: None,
2307 };
2308
2309 #[cfg(not(feature = "dynamic_group_by"))]
2310 let options = GroupbyOptions { slice: None };
2311
2312 let lp = DslPlan::GroupBy {
2313 input: Arc::new(self.logical_plan),
2314 keys: self.keys,
2315 aggs: vec![],
2316 apply: Some((Arc::new(f), schema)),
2317 maintain_order: self.maintain_order,
2318 options: Arc::new(options),
2319 };
2320 LazyFrame::from_logical_plan(lp, self.opt_state)
2321 }
2322}
2323
/// Builder that collects the inputs and options of a join before turning
/// them into a `LazyFrame` via `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Left-hand input of the join.
    lf: LazyFrame,
    /// Join type; `new` starts with `JoinType::Inner`.
    how: JoinType,
    /// Right-hand input; must be set with `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left input.
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right input.
    right_on: Vec<Expr>,
    /// Forwarded to `JoinOptions::allow_parallel`; `new` starts with `true`.
    allow_parallel: bool,
    /// Forwarded to `JoinOptions::force_parallel`; `new` starts with `false`.
    force_parallel: bool,
    /// Optional suffix forwarded to `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded to `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded to `JoinArgs::nulls_equal`; `new` starts with `false`.
    nulls_equal: bool,
    /// Forwarded to `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded to `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2339impl JoinBuilder {
2340 pub fn new(lf: LazyFrame) -> Self {
2342 Self {
2343 lf,
2344 other: None,
2345 how: JoinType::Inner,
2346 left_on: vec![],
2347 right_on: vec![],
2348 allow_parallel: true,
2349 force_parallel: false,
2350 suffix: None,
2351 validation: Default::default(),
2352 nulls_equal: false,
2353 coalesce: Default::default(),
2354 maintain_order: Default::default(),
2355 }
2356 }
2357
2358 pub fn with(mut self, other: LazyFrame) -> Self {
2360 self.other = Some(other);
2361 self
2362 }
2363
2364 pub fn how(mut self, how: JoinType) -> Self {
2366 self.how = how;
2367 self
2368 }
2369
2370 pub fn validate(mut self, validation: JoinValidation) -> Self {
2371 self.validation = validation;
2372 self
2373 }
2374
2375 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2379 let on = on.as_ref().to_vec();
2380 self.left_on.clone_from(&on);
2381 self.right_on = on;
2382 self
2383 }
2384
2385 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2389 self.left_on = on.as_ref().to_vec();
2390 self
2391 }
2392
2393 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2397 self.right_on = on.as_ref().to_vec();
2398 self
2399 }
2400
2401 pub fn allow_parallel(mut self, allow: bool) -> Self {
2403 self.allow_parallel = allow;
2404 self
2405 }
2406
2407 pub fn force_parallel(mut self, force: bool) -> Self {
2409 self.force_parallel = force;
2410 self
2411 }
2412
2413 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2415 self.nulls_equal = nulls_equal;
2416 self
2417 }
2418
2419 pub fn suffix<S>(mut self, suffix: S) -> Self
2422 where
2423 S: Into<PlSmallStr>,
2424 {
2425 self.suffix = Some(suffix.into());
2426 self
2427 }
2428
2429 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2431 self.coalesce = coalesce;
2432 self
2433 }
2434
2435 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2437 self.maintain_order = maintain_order;
2438 self
2439 }
2440
2441 pub fn finish(self) -> LazyFrame {
2443 let opt_state = self.lf.opt_state;
2444 let other = self.other.expect("'with' not set in join builder");
2445
2446 let args = JoinArgs {
2447 how: self.how,
2448 validation: self.validation,
2449 suffix: self.suffix,
2450 slice: None,
2451 nulls_equal: self.nulls_equal,
2452 coalesce: self.coalesce,
2453 maintain_order: self.maintain_order,
2454 };
2455
2456 let lp = self
2457 .lf
2458 .get_plan_builder()
2459 .join(
2460 other.logical_plan,
2461 self.left_on,
2462 self.right_on,
2463 JoinOptions {
2464 allow_parallel: self.allow_parallel,
2465 force_parallel: self.force_parallel,
2466 args,
2467 ..Default::default()
2468 }
2469 .into(),
2470 )
2471 .build();
2472 LazyFrame::from_logical_plan(lp, opt_state)
2473 }
2474
2475 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2477 let opt_state = self.lf.opt_state;
2478 let other = self.other.expect("with not set");
2479
2480 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2482 if let Expr::BinaryExpr {
2483 op: Operator::And,
2484 left,
2485 right,
2486 } = predicate
2487 {
2488 decompose_and((*left).clone(), expanded_predicates);
2489 decompose_and((*right).clone(), expanded_predicates);
2490 } else {
2491 expanded_predicates.push(predicate);
2492 }
2493 }
2494 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2495 for predicate in predicates {
2496 decompose_and(predicate, &mut expanded_predicates);
2497 }
2498 let predicates: Vec<Expr> = expanded_predicates;
2499
2500 #[cfg(feature = "is_between")]
2502 let predicates: Vec<Expr> = {
2503 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2504 for predicate in predicates {
2505 if let Expr::Function {
2506 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2507 input,
2508 ..
2509 } = &predicate
2510 {
2511 if let [expr, lower, upper] = input.as_slice() {
2512 match closed {
2513 ClosedInterval::Both => {
2514 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2515 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2516 },
2517 ClosedInterval::Right => {
2518 expanded_predicates.push(expr.clone().gt(lower.clone()));
2519 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2520 },
2521 ClosedInterval::Left => {
2522 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2523 expanded_predicates.push(expr.clone().lt(upper.clone()));
2524 },
2525 ClosedInterval::None => {
2526 expanded_predicates.push(expr.clone().gt(lower.clone()));
2527 expanded_predicates.push(expr.clone().lt(upper.clone()));
2528 },
2529 }
2530 continue;
2531 }
2532 }
2533 expanded_predicates.push(predicate);
2534 }
2535 expanded_predicates
2536 };
2537
2538 let args = JoinArgs {
2539 how: self.how,
2540 validation: self.validation,
2541 suffix: self.suffix,
2542 slice: None,
2543 nulls_equal: self.nulls_equal,
2544 coalesce: self.coalesce,
2545 maintain_order: self.maintain_order,
2546 };
2547 let options = JoinOptions {
2548 allow_parallel: self.allow_parallel,
2549 force_parallel: self.force_parallel,
2550 args,
2551 ..Default::default()
2552 };
2553
2554 let lp = DslPlan::Join {
2555 input_left: Arc::new(self.lf.logical_plan),
2556 input_right: Arc::new(other.logical_plan),
2557 left_on: Default::default(),
2558 right_on: Default::default(),
2559 predicates,
2560 options: Arc::from(options),
2561 };
2562
2563 LazyFrame::from_logical_plan(lp, opt_state)
2564 }
2565}