1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "ipc")]
21pub use ipc::*;
22#[cfg(feature = "json")]
23pub use ndjson::*;
24#[cfg(feature = "parquet")]
25pub use parquet::*;
26use polars_compute::rolling::QuantileMethod;
27use polars_core::POOL;
28use polars_core::error::feature_gated;
29use polars_core::prelude::*;
30use polars_expr::{ExpressionConversionState, create_physical_expr};
31use polars_io::RowIndex;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38use polars_utils::plpath::PlPath;
39use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
40
41use crate::frame::cached_arenas::CachedArena;
42use crate::prelude::*;
43
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consumes `self` and returns a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
47
48impl IntoLazy for DataFrame {
49 fn lazy(self) -> LazyFrame {
51 let lp = DslBuilder::from_existing_df(self).build();
52 LazyFrame {
53 logical_plan: lp,
54 opt_state: Default::default(),
55 cached_arena: Default::default(),
56 }
57 }
58}
59
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
65
/// A query described by a DSL plan together with the optimization flags that
/// are applied when the plan is optimized.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan this frame wraps.
    pub logical_plan: DslPlan,
    /// Optimization flags used when this frame is optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared, mutex-guarded slot holding an optional `CachedArena`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
77
78impl From<DslPlan> for LazyFrame {
79 fn from(plan: DslPlan) -> Self {
80 Self {
81 logical_plan: plan,
82 opt_state: OptFlags::default(),
83 cached_arena: Default::default(),
84 }
85 }
86}
87
88impl LazyFrame {
    /// Assembles a `LazyFrame` from its three parts, reusing the given
    /// `cached_arena` handle instead of creating a new one.
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }
100
101 pub(crate) fn get_plan_builder(self) -> DslBuilder {
102 DslBuilder::from(self.logical_plan)
103 }
104
    /// Returns a copy of the frame's optimization flags.
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
108
109 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
110 LazyFrame {
111 logical_plan,
112 opt_state,
113 cached_arena: Default::default(),
114 }
115 }
116
117 pub fn get_current_optimizations(&self) -> OptFlags {
119 self.opt_state
120 }
121
122 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
124 self.opt_state = opt_state;
125 self
126 }
127
128 pub fn without_optimizations(self) -> Self {
130 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
131 }
132
133 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
135 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
136 self
137 }
138
139 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
141 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
142 self
143 }
144
145 pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
147 self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
148 self
149 }
150
151 pub fn with_check_order(mut self, toggle: bool) -> Self {
154 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
155 self
156 }
157
158 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
160 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
161 self
162 }
163
164 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
166 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
167 self
168 }
169
170 pub fn with_type_check(mut self, toggle: bool) -> Self {
172 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
173 self
174 }
175
176 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
178 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
179 self
180 }
181
182 #[cfg(feature = "cse")]
184 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
185 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
186 self
187 }
188
189 #[cfg(feature = "cse")]
191 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
192 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
193 self
194 }
195
196 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
198 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
199 self
200 }
201
202 #[cfg(feature = "new_streaming")]
203 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
204 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
205 self
206 }
207
208 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
210 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
211 self
212 }
213
214 pub fn _with_eager(mut self, toggle: bool) -> Self {
216 self.opt_state.set(OptFlags::EAGER, toggle);
217 self
218 }
219
220 pub fn describe_plan(&self) -> PolarsResult<String> {
222 Ok(self.clone().to_alp()?.describe())
223 }
224
225 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
227 Ok(self.clone().to_alp()?.describe_tree_format())
228 }
229
230 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
234 Ok(self.clone().to_alp_optimized()?.describe())
235 }
236
237 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
241 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
242 }
243
244 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
249 if optimized {
250 self.describe_optimized_plan()
251 } else {
252 self.describe_plan()
253 }
254 }
255
256 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
296 let opt_state = self.get_opt_state();
297 let lp = self
298 .get_plan_builder()
299 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
300 .build();
301 Self::from_logical_plan(lp, opt_state)
302 }
303
304 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
324 self,
325 by_exprs: E,
326 sort_options: SortMultipleOptions,
327 ) -> Self {
328 let by_exprs = by_exprs.as_ref().to_vec();
329 if by_exprs.is_empty() {
330 self
331 } else {
332 let opt_state = self.get_opt_state();
333 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
334 Self::from_logical_plan(lp, opt_state)
335 }
336 }
337
338 pub fn top_k<E: AsRef<[Expr]>>(
339 self,
340 k: IdxSize,
341 by_exprs: E,
342 sort_options: SortMultipleOptions,
343 ) -> Self {
344 self.sort_by_exprs(
346 by_exprs,
347 sort_options.with_order_reversed().with_nulls_last(true),
348 )
349 .slice(0, k)
350 }
351
352 pub fn bottom_k<E: AsRef<[Expr]>>(
353 self,
354 k: IdxSize,
355 by_exprs: E,
356 sort_options: SortMultipleOptions,
357 ) -> Self {
358 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
360 .slice(0, k)
361 }
362
363 pub fn reverse(self) -> Self {
379 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
380 }
381
382 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
390 where
391 I: IntoIterator<Item = T>,
392 J: IntoIterator<Item = S>,
393 T: AsRef<str>,
394 S: AsRef<str>,
395 {
396 let iter = existing.into_iter();
397 let cap = iter.size_hint().0;
398 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
399 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
400
401 for (existing, new) in iter.zip(new) {
404 let existing = existing.as_ref();
405 let new = new.as_ref();
406 if new != existing {
407 existing_vec.push(existing.into());
408 new_vec.push(new.into());
409 }
410 }
411
412 self.map_private(DslFunction::Rename {
413 existing: existing_vec.into(),
414 new: new_vec.into(),
415 strict,
416 })
417 }
418
419 pub fn drop(self, columns: Selector) -> Self {
426 let opt_state = self.get_opt_state();
427 let lp = self.get_plan_builder().drop(columns).build();
428 Self::from_logical_plan(lp, opt_state)
429 }
430
431 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
436 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
437 }
438
439 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
444 self.select(vec![
445 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
446 ])
447 }
448
449 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
451 let opt_state = self.get_opt_state();
452 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
453 Self::from_logical_plan(lp, opt_state)
454 }
455
456 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
458 let opt_state = self.get_opt_state();
459 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
460 Self::from_logical_plan(lp, opt_state)
461 }
462
463 pub fn cache(self) -> Self {
467 let opt_state = self.get_opt_state();
468 let lp = self.get_plan_builder().cache().build();
469 Self::from_logical_plan(lp, opt_state)
470 }
471
472 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
474 let cast_cols: Vec<Expr> = dtypes
475 .into_iter()
476 .map(|(name, dt)| {
477 let name = PlSmallStr::from_str(name);
478
479 if strict {
480 col(name).strict_cast(dt)
481 } else {
482 col(name).cast(dt)
483 }
484 })
485 .collect();
486
487 if cast_cols.is_empty() {
488 self
489 } else {
490 self.with_columns(cast_cols)
491 }
492 }
493
494 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
496 self.with_columns(vec![if strict {
497 col(PlSmallStr::from_static("*")).strict_cast(dtype)
498 } else {
499 col(PlSmallStr::from_static("*")).cast(dtype)
500 }])
501 }
502
503 pub fn optimize(
504 self,
505 lp_arena: &mut Arena<IR>,
506 expr_arena: &mut Arena<AExpr>,
507 ) -> PolarsResult<Node> {
508 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
509 }
510
511 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
512 let (mut lp_arena, mut expr_arena) = self.get_arenas();
513 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
514
515 Ok(IRPlan::new(node, lp_arena, expr_arena))
516 }
517
518 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
519 let (mut lp_arena, mut expr_arena) = self.get_arenas();
520 let node = to_alp(
521 self.logical_plan,
522 &mut expr_arena,
523 &mut lp_arena,
524 &mut self.opt_state,
525 )?;
526 let plan = IRPlan::new(node, lp_arena, expr_arena);
527 Ok(plan)
528 }
529
    /// Runs `optimize` on the plan, writing into the supplied arenas, and
    /// returns the root node.
    ///
    /// `scratch` is passed through to `optimize` as its scratch buffer.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
    ) -> PolarsResult<Node> {
        // `opt_state` is only mutated when the "cse" feature is compiled in.
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);

        // With the new streaming flag set, common-subexpression elimination is
        // switched off for this optimization run.
        #[cfg(feature = "cse")]
        if new_streaming {
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback handed to the optimizer: builds a physical expression and
            // converts it to an IO expression; yields `None` if the physical
            // expression cannot be created.
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        Ok(lp_top)
    }
569
570 fn prepare_collect_post_opt<P>(
571 mut self,
572 check_sink: bool,
573 query_start: Option<std::time::Instant>,
574 post_opt: P,
575 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
576 where
577 P: FnOnce(
578 Node,
579 &mut Arena<IR>,
580 &mut Arena<AExpr>,
581 Option<std::time::Duration>,
582 ) -> PolarsResult<()>,
583 {
584 let (mut lp_arena, mut expr_arena) = self.get_arenas();
585
586 let mut scratch = vec![];
587 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
588
589 post_opt(
590 lp_top,
591 &mut lp_arena,
592 &mut expr_arena,
593 query_start.map(|s| s.elapsed()),
596 )?;
597
598 let no_file_sink = if check_sink {
600 !matches!(
601 lp_arena.get(lp_top),
602 IR::Sink {
603 payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
604 ..
605 }
606 )
607 } else {
608 true
609 };
610 let physical_plan = create_physical_plan(
611 lp_top,
612 &mut lp_arena,
613 &mut expr_arena,
614 BUILD_STREAMING_EXECUTOR,
615 )?;
616
617 let state = ExecutionState::new();
618 Ok((state, physical_plan, no_file_sink))
619 }
620
621 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
623 where
624 P: FnOnce(
625 Node,
626 &mut Arena<IR>,
627 &mut Arena<AExpr>,
628 Option<std::time::Duration>,
629 ) -> PolarsResult<()>,
630 {
631 let (mut state, mut physical_plan, _) =
632 self.prepare_collect_post_opt(false, None, post_opt)?;
633 physical_plan.execute(&mut state)
634 }
635
    /// Same as `prepare_collect_post_opt`, with a post-optimization callback
    /// that does nothing and returns `Ok(())`.
    // NOTE(review): no `mut` binding is visible in this function, so the
    // `unused_mut` allowance looks stale — confirm before removing.
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
644
645 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
650 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
651 payload.clone()
652 } else {
653 self.logical_plan = DslPlan::Sink {
654 input: Arc::new(self.logical_plan),
655 payload: SinkType::Memory,
656 };
657 SinkType::Memory
658 };
659
660 if engine == Engine::Auto {
662 engine = match payload {
663 #[cfg(feature = "new_streaming")]
664 SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
665 _ => Engine::InMemory,
666 };
667 }
668 if engine == Engine::Gpu {
670 engine = Engine::InMemory;
671 }
672
673 #[cfg(feature = "new_streaming")]
674 {
675 if let Some(result) = self.try_new_streaming_if_requested() {
676 return result.map(|v| v.unwrap_single());
677 }
678 }
679
680 match engine {
681 Engine::Auto => unreachable!(),
682 Engine::Streaming => {
683 feature_gated!("new_streaming", self = self.with_new_streaming(true))
684 },
685 _ => {},
686 }
687 let mut alp_plan = self.clone().to_alp_optimized()?;
688
689 match engine {
690 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
691 let result = polars_stream::run_query(
692 alp_plan.lp_top,
693 &mut alp_plan.lp_arena,
694 &mut alp_plan.expr_arena,
695 );
696 result.map(|v| v.unwrap_single())
697 }),
698 Engine::Gpu => {
699 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
700 },
701 Engine::InMemory => {
702 let mut physical_plan = create_physical_plan(
703 alp_plan.lp_top,
704 &mut alp_plan.lp_arena,
705 &mut alp_plan.expr_arena,
706 BUILD_STREAMING_EXECUTOR,
707 )?;
708 let mut state = ExecutionState::new();
709 physical_plan.execute(&mut state)
710 },
711 }
712 }
713
714 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
715 let sink_multiple = LazyFrame {
716 logical_plan: DslPlan::SinkMultiple { inputs: plans },
717 opt_state,
718 cached_arena: Default::default(),
719 };
720 sink_multiple.explain(true)
721 }
722
    /// Executes several plans together and returns one `DataFrame` per plan.
    ///
    /// The plans are combined under a `DslPlan::SinkMultiple` root and
    /// optimized as one query. An empty `plans` yields an empty vector without
    /// doing any work. `Engine::Auto` and `Engine::Gpu` are both executed as
    /// `Engine::InMemory`.
    ///
    /// # Errors
    /// Returns any error raised while optimizing, building the physical plans,
    /// or executing them.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Both Auto and Gpu are dispatched through the in-memory path.
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        // Environment-driven streaming takes precedence when it yields a result.
        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // The root was built as `SinkMultiple` above; anything else is a bug.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`, so it is cloned before the arenas
        // are borrowed mutably.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // NOTE(review): `engine` was rewritten from Gpu to InMemory above,
            // so this arm looks unreachable — confirm.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                let mut state = ExecutionState::new();
                // Run the cache prefiller (if any) before the individual plans.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                // Plans are processed in sequential chunks of 3x the pool's
                // thread count; plans within a chunk run in parallel.
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Take ownership of the executor, leaving a default in its slot.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // `idx` is the position within the current chunk.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into one vector, in plan order.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
825
    /// Executes the query with `Engine::InMemory` and returns the resulting `DataFrame`.
    ///
    /// See `collect_with_engine` for the error conditions.
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
846
847 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
850 where
851 P: FnOnce(
852 Node,
853 &mut Arena<IR>,
854 &mut Arena<AExpr>,
855 Option<std::time::Duration>,
856 ) -> PolarsResult<()>,
857 {
858 let query_start = std::time::Instant::now();
859 let (mut state, mut physical_plan, _) =
860 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
861 state.time_nodes(query_start);
862 let out = physical_plan.execute(&mut state)?;
863 let timer_df = state.finish_timer()?;
864 Ok((out, timer_df))
865 }
866
    /// Executes the query with node timing enabled and returns
    /// `(result, timings)`.
    ///
    /// Equivalent to `_profile_post_opt` with a callback that does nothing.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
877
878 #[cfg(feature = "parquet")]
882 pub fn sink_parquet(
883 self,
884 target: SinkTarget,
885 options: ParquetWriteOptions,
886 cloud_options: Option<polars_io::cloud::CloudOptions>,
887 sink_options: SinkOptions,
888 ) -> PolarsResult<Self> {
889 self.sink(SinkType::File(FileSinkType {
890 target,
891 sink_options,
892 file_type: FileType::Parquet(options),
893 cloud_options,
894 }))
895 }
896
897 #[cfg(feature = "ipc")]
901 pub fn sink_ipc(
902 self,
903 target: SinkTarget,
904 options: IpcWriterOptions,
905 cloud_options: Option<polars_io::cloud::CloudOptions>,
906 sink_options: SinkOptions,
907 ) -> PolarsResult<Self> {
908 self.sink(SinkType::File(FileSinkType {
909 target,
910 sink_options,
911 file_type: FileType::Ipc(options),
912 cloud_options,
913 }))
914 }
915
916 #[cfg(feature = "csv")]
920 pub fn sink_csv(
921 self,
922 target: SinkTarget,
923 options: CsvWriterOptions,
924 cloud_options: Option<polars_io::cloud::CloudOptions>,
925 sink_options: SinkOptions,
926 ) -> PolarsResult<Self> {
927 self.sink(SinkType::File(FileSinkType {
928 target,
929 sink_options,
930 file_type: FileType::Csv(options),
931 cloud_options,
932 }))
933 }
934
935 #[cfg(feature = "json")]
939 pub fn sink_json(
940 self,
941 target: SinkTarget,
942 options: JsonWriterOptions,
943 cloud_options: Option<polars_io::cloud::CloudOptions>,
944 sink_options: SinkOptions,
945 ) -> PolarsResult<Self> {
946 self.sink(SinkType::File(FileSinkType {
947 target,
948 sink_options,
949 file_type: FileType::Json(options),
950 cloud_options,
951 }))
952 }
953
954 #[cfg(feature = "parquet")]
958 #[allow(clippy::too_many_arguments)]
959 pub fn sink_parquet_partitioned(
960 self,
961 base_path: Arc<PlPath>,
962 file_path_cb: Option<PartitionTargetCallback>,
963 variant: PartitionVariant,
964 options: ParquetWriteOptions,
965 cloud_options: Option<polars_io::cloud::CloudOptions>,
966 sink_options: SinkOptions,
967 per_partition_sort_by: Option<Vec<SortColumn>>,
968 finish_callback: Option<SinkFinishCallback>,
969 ) -> PolarsResult<Self> {
970 self.sink(SinkType::Partition(PartitionSinkType {
971 base_path,
972 file_path_cb,
973 sink_options,
974 variant,
975 file_type: FileType::Parquet(options),
976 cloud_options,
977 per_partition_sort_by,
978 finish_callback,
979 }))
980 }
981
982 #[cfg(feature = "ipc")]
986 #[allow(clippy::too_many_arguments)]
987 pub fn sink_ipc_partitioned(
988 self,
989 base_path: Arc<PlPath>,
990 file_path_cb: Option<PartitionTargetCallback>,
991 variant: PartitionVariant,
992 options: IpcWriterOptions,
993 cloud_options: Option<polars_io::cloud::CloudOptions>,
994 sink_options: SinkOptions,
995 per_partition_sort_by: Option<Vec<SortColumn>>,
996 finish_callback: Option<SinkFinishCallback>,
997 ) -> PolarsResult<Self> {
998 self.sink(SinkType::Partition(PartitionSinkType {
999 base_path,
1000 file_path_cb,
1001 sink_options,
1002 variant,
1003 file_type: FileType::Ipc(options),
1004 cloud_options,
1005 per_partition_sort_by,
1006 finish_callback,
1007 }))
1008 }
1009
1010 #[cfg(feature = "csv")]
1014 #[allow(clippy::too_many_arguments)]
1015 pub fn sink_csv_partitioned(
1016 self,
1017 base_path: Arc<PlPath>,
1018 file_path_cb: Option<PartitionTargetCallback>,
1019 variant: PartitionVariant,
1020 options: CsvWriterOptions,
1021 cloud_options: Option<polars_io::cloud::CloudOptions>,
1022 sink_options: SinkOptions,
1023 per_partition_sort_by: Option<Vec<SortColumn>>,
1024 finish_callback: Option<SinkFinishCallback>,
1025 ) -> PolarsResult<Self> {
1026 self.sink(SinkType::Partition(PartitionSinkType {
1027 base_path,
1028 file_path_cb,
1029 sink_options,
1030 variant,
1031 file_type: FileType::Csv(options),
1032 cloud_options,
1033 per_partition_sort_by,
1034 finish_callback,
1035 }))
1036 }
1037
1038 #[cfg(feature = "json")]
1042 #[allow(clippy::too_many_arguments)]
1043 pub fn sink_json_partitioned(
1044 self,
1045 base_path: Arc<PlPath>,
1046 file_path_cb: Option<PartitionTargetCallback>,
1047 variant: PartitionVariant,
1048 options: JsonWriterOptions,
1049 cloud_options: Option<polars_io::cloud::CloudOptions>,
1050 sink_options: SinkOptions,
1051 per_partition_sort_by: Option<Vec<SortColumn>>,
1052 finish_callback: Option<SinkFinishCallback>,
1053 ) -> PolarsResult<Self> {
1054 self.sink(SinkType::Partition(PartitionSinkType {
1055 base_path,
1056 file_path_cb,
1057 sink_options,
1058 variant,
1059 file_type: FileType::Json(options),
1060 cloud_options,
1061 per_partition_sort_by,
1062 finish_callback,
1063 }))
1064 }
1065
1066 #[cfg(feature = "new_streaming")]
1067 pub fn try_new_streaming_if_requested(
1068 &mut self,
1069 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1070 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1071 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1072
1073 if auto_new_streaming || force_new_streaming {
1074 let mut new_stream_lazy = self.clone();
1077 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1078 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1079 Ok(v) => v,
1080 Err(e) => return Some(Err(e)),
1081 };
1082
1083 let f = || {
1084 polars_stream::run_query(
1085 alp_plan.lp_top,
1086 &mut alp_plan.lp_arena,
1087 &mut alp_plan.expr_arena,
1088 )
1089 };
1090
1091 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1092 Ok(v) => return Some(v),
1093 Err(e) => {
1094 if !force_new_streaming
1097 && auto_new_streaming
1098 && e.downcast_ref::<&str>()
1099 .map(|s| s.starts_with("not yet implemented"))
1100 .unwrap_or(false)
1101 {
1102 if polars_core::config::verbose() {
1103 eprintln!(
1104 "caught unimplemented error in new streaming engine, falling back to normal engine"
1105 );
1106 }
1107 } else {
1108 std::panic::resume_unwind(e);
1109 }
1110 },
1111 }
1112 }
1113
1114 None
1115 }
1116
1117 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1118 polars_ensure!(
1119 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1120 InvalidOperation: "cannot create a sink on top of another sink"
1121 );
1122 self.logical_plan = DslPlan::Sink {
1123 input: Arc::new(self.logical_plan),
1124 payload,
1125 };
1126 Ok(self)
1127 }
1128
1129 pub fn filter(self, predicate: Expr) -> Self {
1147 let opt_state = self.get_opt_state();
1148 let lp = self.get_plan_builder().filter(predicate).build();
1149 Self::from_logical_plan(lp, opt_state)
1150 }
1151
1152 pub fn remove(self, predicate: Expr) -> Self {
1170 self.filter(predicate.neq_missing(lit(true)))
1171 }
1172
1173 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1199 let exprs = exprs.as_ref().to_vec();
1200 self.select_impl(
1201 exprs,
1202 ProjectionOptions {
1203 run_parallel: true,
1204 duplicate_check: true,
1205 should_broadcast: true,
1206 },
1207 )
1208 }
1209
1210 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1211 let exprs = exprs.as_ref().to_vec();
1212 self.select_impl(
1213 exprs,
1214 ProjectionOptions {
1215 run_parallel: false,
1216 duplicate_check: true,
1217 should_broadcast: true,
1218 },
1219 )
1220 }
1221
1222 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1223 let opt_state = self.get_opt_state();
1224 let lp = self.get_plan_builder().project(exprs, options).build();
1225 Self::from_logical_plan(lp, opt_state)
1226 }
1227
1228 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1249 let keys = by
1250 .as_ref()
1251 .iter()
1252 .map(|e| e.clone().into())
1253 .collect::<Vec<_>>();
1254 let opt_state = self.get_opt_state();
1255
1256 #[cfg(feature = "dynamic_group_by")]
1257 {
1258 LazyGroupBy {
1259 logical_plan: self.logical_plan,
1260 opt_state,
1261 keys,
1262 maintain_order: false,
1263 dynamic_options: None,
1264 rolling_options: None,
1265 }
1266 }
1267
1268 #[cfg(not(feature = "dynamic_group_by"))]
1269 {
1270 LazyGroupBy {
1271 logical_plan: self.logical_plan,
1272 opt_state,
1273 keys,
1274 maintain_order: false,
1275 }
1276 }
1277 }
1278
1279 #[cfg(feature = "dynamic_group_by")]
1287 pub fn rolling<E: AsRef<[Expr]>>(
1288 mut self,
1289 index_column: Expr,
1290 group_by: E,
1291 mut options: RollingGroupOptions,
1292 ) -> LazyGroupBy {
1293 if let Expr::Column(name) = index_column {
1294 options.index_column = name;
1295 } else {
1296 let output_field = index_column
1297 .to_field(&self.collect_schema().unwrap())
1298 .unwrap();
1299 return self.with_column(index_column).rolling(
1300 Expr::Column(output_field.name().clone()),
1301 group_by,
1302 options,
1303 );
1304 }
1305 let opt_state = self.get_opt_state();
1306 LazyGroupBy {
1307 logical_plan: self.logical_plan,
1308 opt_state,
1309 keys: group_by.as_ref().to_vec(),
1310 maintain_order: true,
1311 dynamic_options: None,
1312 rolling_options: Some(options),
1313 }
1314 }
1315
1316 #[cfg(feature = "dynamic_group_by")]
1332 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1333 mut self,
1334 index_column: Expr,
1335 group_by: E,
1336 mut options: DynamicGroupOptions,
1337 ) -> LazyGroupBy {
1338 if let Expr::Column(name) = index_column {
1339 options.index_column = name;
1340 } else {
1341 let output_field = index_column
1342 .to_field(&self.collect_schema().unwrap())
1343 .unwrap();
1344 return self.with_column(index_column).group_by_dynamic(
1345 Expr::Column(output_field.name().clone()),
1346 group_by,
1347 options,
1348 );
1349 }
1350 let opt_state = self.get_opt_state();
1351 LazyGroupBy {
1352 logical_plan: self.logical_plan,
1353 opt_state,
1354 keys: group_by.as_ref().to_vec(),
1355 maintain_order: true,
1356 dynamic_options: Some(options),
1357 rolling_options: None,
1358 }
1359 }
1360
1361 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1363 let keys = by
1364 .as_ref()
1365 .iter()
1366 .map(|e| e.clone().into())
1367 .collect::<Vec<_>>();
1368 let opt_state = self.get_opt_state();
1369
1370 #[cfg(feature = "dynamic_group_by")]
1371 {
1372 LazyGroupBy {
1373 logical_plan: self.logical_plan,
1374 opt_state,
1375 keys,
1376 maintain_order: true,
1377 dynamic_options: None,
1378 rolling_options: None,
1379 }
1380 }
1381
1382 #[cfg(not(feature = "dynamic_group_by"))]
1383 {
1384 LazyGroupBy {
1385 logical_plan: self.logical_plan,
1386 opt_state,
1387 keys,
1388 maintain_order: true,
1389 }
1390 }
1391 }
1392
1393 #[cfg(feature = "semi_anti_join")]
1410 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1411 self.join(
1412 other,
1413 [left_on.into()],
1414 [right_on.into()],
1415 JoinArgs::new(JoinType::Anti),
1416 )
1417 }
1418
1419 #[cfg(feature = "cross_join")]
1421 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1422 self.join(
1423 other,
1424 vec![],
1425 vec![],
1426 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1427 )
1428 }
1429
1430 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1447 self.join(
1448 other,
1449 [left_on.into()],
1450 [right_on.into()],
1451 JoinArgs::new(JoinType::Left),
1452 )
1453 }
1454
1455 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1472 self.join(
1473 other,
1474 [left_on.into()],
1475 [right_on.into()],
1476 JoinArgs::new(JoinType::Inner),
1477 )
1478 }
1479
1480 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1497 self.join(
1498 other,
1499 [left_on.into()],
1500 [right_on.into()],
1501 JoinArgs::new(JoinType::Full),
1502 )
1503 }
1504
1505 #[cfg(feature = "semi_anti_join")]
1522 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1523 self.join(
1524 other,
1525 [left_on.into()],
1526 [right_on.into()],
1527 JoinArgs::new(JoinType::Semi),
1528 )
1529 }
1530
1531 pub fn join<E: AsRef<[Expr]>>(
1553 self,
1554 other: LazyFrame,
1555 left_on: E,
1556 right_on: E,
1557 args: JoinArgs,
1558 ) -> LazyFrame {
1559 let left_on = left_on.as_ref().to_vec();
1560 let right_on = right_on.as_ref().to_vec();
1561
1562 self._join_impl(other, left_on, right_on, args)
1563 }
1564
1565 fn _join_impl(
1566 self,
1567 other: LazyFrame,
1568 left_on: Vec<Expr>,
1569 right_on: Vec<Expr>,
1570 args: JoinArgs,
1571 ) -> LazyFrame {
1572 let JoinArgs {
1573 how,
1574 validation,
1575 suffix,
1576 slice,
1577 nulls_equal,
1578 coalesce,
1579 maintain_order,
1580 } = args;
1581
1582 if slice.is_some() {
1583 panic!("impl error: slice is not handled")
1584 }
1585
1586 let mut builder = self
1587 .join_builder()
1588 .with(other)
1589 .left_on(left_on)
1590 .right_on(right_on)
1591 .how(how)
1592 .validate(validation)
1593 .join_nulls(nulls_equal)
1594 .coalesce(coalesce)
1595 .maintain_order(maintain_order);
1596
1597 if let Some(suffix) = suffix {
1598 builder = builder.suffix(suffix);
1599 }
1600
1601 builder.finish()
1603 }
1604
    /// Consumes the frame and returns a `JoinBuilder` with it as the left side.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1613
1614 pub fn with_column(self, expr: Expr) -> LazyFrame {
1632 let opt_state = self.get_opt_state();
1633 let lp = self
1634 .get_plan_builder()
1635 .with_columns(
1636 vec![expr],
1637 ProjectionOptions {
1638 run_parallel: false,
1639 duplicate_check: true,
1640 should_broadcast: true,
1641 },
1642 )
1643 .build();
1644 Self::from_logical_plan(lp, opt_state)
1645 }
1646
1647 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1662 let exprs = exprs.as_ref().to_vec();
1663 self.with_columns_impl(
1664 exprs,
1665 ProjectionOptions {
1666 run_parallel: true,
1667 duplicate_check: true,
1668 should_broadcast: true,
1669 },
1670 )
1671 }
1672
1673 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1675 let exprs = exprs.as_ref().to_vec();
1676 self.with_columns_impl(
1677 exprs,
1678 ProjectionOptions {
1679 run_parallel: false,
1680 duplicate_check: true,
1681 should_broadcast: true,
1682 },
1683 )
1684 }
1685
1686 pub fn match_to_schema(
1688 self,
1689 schema: SchemaRef,
1690 per_column: Arc<[MatchToSchemaPerColumn]>,
1691 extra_columns: ExtraColumnsPolicy,
1692 ) -> LazyFrame {
1693 let opt_state = self.get_opt_state();
1694 let lp = self
1695 .get_plan_builder()
1696 .match_to_schema(schema, per_column, extra_columns)
1697 .build();
1698 Self::from_logical_plan(lp, opt_state)
1699 }
1700
1701 pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1702 let opt_state = self.get_opt_state();
1703 let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1704 Self::from_logical_plan(lp, opt_state)
1705 }
1706
1707 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1708 let opt_state = self.get_opt_state();
1709 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1710 Self::from_logical_plan(lp, opt_state)
1711 }
1712
1713 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1714 let contexts = contexts
1715 .as_ref()
1716 .iter()
1717 .map(|lf| lf.logical_plan.clone())
1718 .collect();
1719 let opt_state = self.get_opt_state();
1720 let lp = self.get_plan_builder().with_context(contexts).build();
1721 Self::from_logical_plan(lp, opt_state)
1722 }
1723
1724 pub fn max(self) -> Self {
1728 self.map_private(DslFunction::Stats(StatsFunction::Max))
1729 }
1730
1731 pub fn min(self) -> Self {
1735 self.map_private(DslFunction::Stats(StatsFunction::Min))
1736 }
1737
1738 pub fn sum(self) -> Self {
1748 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1749 }
1750
1751 pub fn mean(self) -> Self {
1756 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1757 }
1758
1759 pub fn median(self) -> Self {
1765 self.map_private(DslFunction::Stats(StatsFunction::Median))
1766 }
1767
1768 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1770 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1771 quantile,
1772 method,
1773 }))
1774 }
1775
1776 pub fn std(self, ddof: u8) -> Self {
1789 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1790 }
1791
1792 pub fn var(self, ddof: u8) -> Self {
1802 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1803 }
1804
1805 pub fn explode(self, columns: Selector) -> LazyFrame {
1807 self.explode_impl(columns, false)
1808 }
1809
1810 fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1812 let opt_state = self.get_opt_state();
1813 let lp = self
1814 .get_plan_builder()
1815 .explode(columns, allow_empty)
1816 .build();
1817 Self::from_logical_plan(lp, opt_state)
1818 }
1819
1820 pub fn null_count(self) -> LazyFrame {
1822 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1823 }
1824
1825 pub fn unique_stable(
1830 self,
1831 subset: Option<Selector>,
1832 keep_strategy: UniqueKeepStrategy,
1833 ) -> LazyFrame {
1834 self.unique_stable_generic(subset, keep_strategy)
1835 }
1836
1837 pub fn unique_stable_generic(
1838 self,
1839 subset: Option<Selector>,
1840 keep_strategy: UniqueKeepStrategy,
1841 ) -> LazyFrame {
1842 let opt_state = self.get_opt_state();
1843 let options = DistinctOptionsDSL {
1844 subset,
1845 maintain_order: true,
1846 keep_strategy,
1847 };
1848 let lp = self.get_plan_builder().distinct(options).build();
1849 Self::from_logical_plan(lp, opt_state)
1850 }
1851
1852 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1860 self.unique_generic(subset, keep_strategy)
1861 }
1862
1863 pub fn unique_generic(
1864 self,
1865 subset: Option<Selector>,
1866 keep_strategy: UniqueKeepStrategy,
1867 ) -> LazyFrame {
1868 let opt_state = self.get_opt_state();
1869 let options = DistinctOptionsDSL {
1870 subset,
1871 maintain_order: false,
1872 keep_strategy,
1873 };
1874 let lp = self.get_plan_builder().distinct(options).build();
1875 Self::from_logical_plan(lp, opt_state)
1876 }
1877
1878 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1883 let opt_state = self.get_opt_state();
1884 let lp = self.get_plan_builder().drop_nans(subset).build();
1885 Self::from_logical_plan(lp, opt_state)
1886 }
1887
1888 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1893 let opt_state = self.get_opt_state();
1894 let lp = self.get_plan_builder().drop_nulls(subset).build();
1895 Self::from_logical_plan(lp, opt_state)
1896 }
1897
1898 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1908 let opt_state = self.get_opt_state();
1909 let lp = self.get_plan_builder().slice(offset, len).build();
1910 Self::from_logical_plan(lp, opt_state)
1911 }
1912
1913 pub fn first(self) -> LazyFrame {
1917 self.slice(0, 1)
1918 }
1919
1920 pub fn last(self) -> LazyFrame {
1924 self.slice(-1, 1)
1925 }
1926
1927 pub fn tail(self, n: IdxSize) -> LazyFrame {
1931 let neg_tail = -(n as i64);
1932 self.slice(neg_tail, n)
1933 }
1934
1935 #[cfg(feature = "pivot")]
1939 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1940 let opt_state = self.get_opt_state();
1941 let lp = self.get_plan_builder().unpivot(args).build();
1942 Self::from_logical_plan(lp, opt_state)
1943 }
1944
1945 pub fn limit(self, n: IdxSize) -> LazyFrame {
1947 self.slice(0, n)
1948 }
1949
1950 pub fn map<F>(
1964 self,
1965 function: F,
1966 optimizations: AllowedOptimizations,
1967 schema: Option<Arc<dyn UdfSchema>>,
1968 name: Option<&'static str>,
1969 ) -> LazyFrame
1970 where
1971 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1972 {
1973 let opt_state = self.get_opt_state();
1974 let lp = self
1975 .get_plan_builder()
1976 .map(
1977 function,
1978 optimizations,
1979 schema,
1980 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1981 )
1982 .build();
1983 Self::from_logical_plan(lp, opt_state)
1984 }
1985
1986 #[cfg(feature = "python")]
1987 pub fn map_python(
1988 self,
1989 function: polars_utils::python_function::PythonFunction,
1990 optimizations: AllowedOptimizations,
1991 schema: Option<SchemaRef>,
1992 validate_output: bool,
1993 ) -> LazyFrame {
1994 let opt_state = self.get_opt_state();
1995 let lp = self
1996 .get_plan_builder()
1997 .map_python(function, optimizations, schema, validate_output)
1998 .build();
1999 Self::from_logical_plan(lp, opt_state)
2000 }
2001
2002 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2003 let opt_state = self.get_opt_state();
2004 let lp = self.get_plan_builder().map_private(function).build();
2005 Self::from_logical_plan(lp, opt_state)
2006 }
2007
2008 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2017 where
2018 S: Into<PlSmallStr>,
2019 {
2020 let name = name.into();
2021
2022 match &self.logical_plan {
2023 v @ DslPlan::Scan { scan_type, .. }
2024 if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2025 {
2026 let DslPlan::Scan {
2027 sources,
2028 mut unified_scan_args,
2029 scan_type,
2030 cached_ir: _,
2031 } = v.clone()
2032 else {
2033 unreachable!()
2034 };
2035
2036 unified_scan_args.row_index = Some(RowIndex {
2037 name,
2038 offset: offset.unwrap_or(0),
2039 });
2040
2041 DslPlan::Scan {
2042 sources,
2043 unified_scan_args,
2044 scan_type,
2045 cached_ir: Default::default(),
2046 }
2047 .into()
2048 },
2049 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2050 }
2051 }
2052
2053 pub fn count(self) -> LazyFrame {
2055 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2056 }
2057
2058 #[cfg(feature = "dtype-struct")]
2061 pub fn unnest(self, cols: Selector) -> Self {
2062 self.map_private(DslFunction::Unnest(cols))
2063 }
2064
2065 #[cfg(feature = "merge_sorted")]
2066 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2067 where
2068 S: Into<PlSmallStr>,
2069 {
2070 let key = key.into();
2071
2072 let lp = DslPlan::MergeSorted {
2073 input_left: Arc::new(self.logical_plan),
2074 input_right: Arc::new(other.logical_plan),
2075 key,
2076 };
2077 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2078 }
2079}
2080
/// A pending group-by: the input plan together with the grouping
/// configuration. It becomes a [`LazyFrame`] through `agg`, `head`, `tail` or
/// `apply`, or through the `From<LazyGroupBy>` conversion.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped.
    pub logical_plan: DslPlan,
    /// Optimization flags carried over to the resulting [`LazyFrame`].
    opt_state: OptFlags,
    /// Key expressions to group by.
    keys: Vec<Expr>,
    /// Forwarded as the `maintain_order` setting of the group-by node.
    maintain_order: bool,
    /// Options forwarded as the `dynamic` group-by setting, if any.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Options forwarded as the `rolling` group-by setting, if any.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2093
2094impl From<LazyGroupBy> for LazyFrame {
2095 fn from(lgb: LazyGroupBy) -> Self {
2096 Self {
2097 logical_plan: lgb.logical_plan,
2098 opt_state: lgb.opt_state,
2099 cached_arena: Default::default(),
2100 }
2101 }
2102}
2103
2104impl LazyGroupBy {
2105 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2127 #[cfg(feature = "dynamic_group_by")]
2128 let lp = DslBuilder::from(self.logical_plan)
2129 .group_by(
2130 self.keys,
2131 aggs,
2132 None,
2133 self.maintain_order,
2134 self.dynamic_options,
2135 self.rolling_options,
2136 )
2137 .build();
2138
2139 #[cfg(not(feature = "dynamic_group_by"))]
2140 let lp = DslBuilder::from(self.logical_plan)
2141 .group_by(self.keys, aggs, None, self.maintain_order)
2142 .build();
2143 LazyFrame::from_logical_plan(lp, self.opt_state)
2144 }
2145
2146 pub fn head(self, n: Option<usize>) -> LazyFrame {
2148 let keys = self
2149 .keys
2150 .iter()
2151 .filter_map(|expr| expr_output_name(expr).ok())
2152 .collect::<Vec<_>>();
2153
2154 self.agg([all().as_expr().head(n)])
2155 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2156 }
2157
2158 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2160 let keys = self
2161 .keys
2162 .iter()
2163 .filter_map(|expr| expr_output_name(expr).ok())
2164 .collect::<Vec<_>>();
2165
2166 self.agg([all().as_expr().tail(n)])
2167 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2168 }
2169
2170 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2175 #[cfg(feature = "dynamic_group_by")]
2176 let options = GroupbyOptions {
2177 dynamic: self.dynamic_options,
2178 rolling: self.rolling_options,
2179 slice: None,
2180 };
2181
2182 #[cfg(not(feature = "dynamic_group_by"))]
2183 let options = GroupbyOptions { slice: None };
2184
2185 let lp = DslPlan::GroupBy {
2186 input: Arc::new(self.logical_plan),
2187 keys: self.keys,
2188 aggs: vec![],
2189 apply: Some((f, schema)),
2190 maintain_order: self.maintain_order,
2191 options: Arc::new(options),
2192 };
2193 LazyFrame::from_logical_plan(lp, self.opt_state)
2194 }
2195}
2196
/// Builder that collects the settings of a join and produces the joined
/// [`LazyFrame`] through `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Left-hand frame of the join.
    lf: LazyFrame,
    /// Join type; `JoinBuilder::new` starts with `JoinType::Inner`.
    how: JoinType,
    /// Right-hand frame; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions evaluated on the left frame.
    left_on: Vec<Expr>,
    /// Key expressions evaluated on the right frame.
    right_on: Vec<Expr>,
    /// Forwarded as `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Forwarded as `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Optional suffix forwarded as `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded as `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded as `JoinArgs::nulls_equal`.
    nulls_equal: bool,
    /// Forwarded as `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded as `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2212impl JoinBuilder {
2213 pub fn new(lf: LazyFrame) -> Self {
2215 Self {
2216 lf,
2217 other: None,
2218 how: JoinType::Inner,
2219 left_on: vec![],
2220 right_on: vec![],
2221 allow_parallel: true,
2222 force_parallel: false,
2223 suffix: None,
2224 validation: Default::default(),
2225 nulls_equal: false,
2226 coalesce: Default::default(),
2227 maintain_order: Default::default(),
2228 }
2229 }
2230
2231 pub fn with(mut self, other: LazyFrame) -> Self {
2233 self.other = Some(other);
2234 self
2235 }
2236
2237 pub fn how(mut self, how: JoinType) -> Self {
2239 self.how = how;
2240 self
2241 }
2242
2243 pub fn validate(mut self, validation: JoinValidation) -> Self {
2244 self.validation = validation;
2245 self
2246 }
2247
2248 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2252 let on = on.as_ref().to_vec();
2253 self.left_on.clone_from(&on);
2254 self.right_on = on;
2255 self
2256 }
2257
2258 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2262 self.left_on = on.as_ref().to_vec();
2263 self
2264 }
2265
2266 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2270 self.right_on = on.as_ref().to_vec();
2271 self
2272 }
2273
2274 pub fn allow_parallel(mut self, allow: bool) -> Self {
2276 self.allow_parallel = allow;
2277 self
2278 }
2279
2280 pub fn force_parallel(mut self, force: bool) -> Self {
2282 self.force_parallel = force;
2283 self
2284 }
2285
2286 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2288 self.nulls_equal = nulls_equal;
2289 self
2290 }
2291
2292 pub fn suffix<S>(mut self, suffix: S) -> Self
2295 where
2296 S: Into<PlSmallStr>,
2297 {
2298 self.suffix = Some(suffix.into());
2299 self
2300 }
2301
2302 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2304 self.coalesce = coalesce;
2305 self
2306 }
2307
2308 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2310 self.maintain_order = maintain_order;
2311 self
2312 }
2313
2314 pub fn finish(self) -> LazyFrame {
2316 let opt_state = self.lf.opt_state;
2317 let other = self.other.expect("'with' not set in join builder");
2318
2319 let args = JoinArgs {
2320 how: self.how,
2321 validation: self.validation,
2322 suffix: self.suffix,
2323 slice: None,
2324 nulls_equal: self.nulls_equal,
2325 coalesce: self.coalesce,
2326 maintain_order: self.maintain_order,
2327 };
2328
2329 let lp = self
2330 .lf
2331 .get_plan_builder()
2332 .join(
2333 other.logical_plan,
2334 self.left_on,
2335 self.right_on,
2336 JoinOptions {
2337 allow_parallel: self.allow_parallel,
2338 force_parallel: self.force_parallel,
2339 args,
2340 }
2341 .into(),
2342 )
2343 .build();
2344 LazyFrame::from_logical_plan(lp, opt_state)
2345 }
2346
2347 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2349 let opt_state = self.lf.opt_state;
2350 let other = self.other.expect("with not set");
2351
2352 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2354 if let Expr::BinaryExpr {
2355 op: Operator::And,
2356 left,
2357 right,
2358 } = predicate
2359 {
2360 decompose_and((*left).clone(), expanded_predicates);
2361 decompose_and((*right).clone(), expanded_predicates);
2362 } else {
2363 expanded_predicates.push(predicate);
2364 }
2365 }
2366 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2367 for predicate in predicates {
2368 decompose_and(predicate, &mut expanded_predicates);
2369 }
2370 let predicates: Vec<Expr> = expanded_predicates;
2371
2372 #[cfg(feature = "is_between")]
2374 let predicates: Vec<Expr> = {
2375 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2376 for predicate in predicates {
2377 if let Expr::Function {
2378 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2379 input,
2380 ..
2381 } = &predicate
2382 {
2383 if let [expr, lower, upper] = input.as_slice() {
2384 match closed {
2385 ClosedInterval::Both => {
2386 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2387 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2388 },
2389 ClosedInterval::Right => {
2390 expanded_predicates.push(expr.clone().gt(lower.clone()));
2391 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2392 },
2393 ClosedInterval::Left => {
2394 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2395 expanded_predicates.push(expr.clone().lt(upper.clone()));
2396 },
2397 ClosedInterval::None => {
2398 expanded_predicates.push(expr.clone().gt(lower.clone()));
2399 expanded_predicates.push(expr.clone().lt(upper.clone()));
2400 },
2401 }
2402 continue;
2403 }
2404 }
2405 expanded_predicates.push(predicate);
2406 }
2407 expanded_predicates
2408 };
2409
2410 let args = JoinArgs {
2411 how: self.how,
2412 validation: self.validation,
2413 suffix: self.suffix,
2414 slice: None,
2415 nulls_equal: self.nulls_equal,
2416 coalesce: self.coalesce,
2417 maintain_order: self.maintain_order,
2418 };
2419 let options = JoinOptions {
2420 allow_parallel: self.allow_parallel,
2421 force_parallel: self.force_parallel,
2422 args,
2423 };
2424
2425 let lp = DslPlan::Join {
2426 input_left: Arc::new(self.lf.logical_plan),
2427 input_right: Arc::new(other.logical_plan),
2428 left_on: Default::default(),
2429 right_on: Default::default(),
2430 predicates,
2431 options: Arc::from(options),
2432 };
2433
2434 LazyFrame::from_logical_plan(lp, opt_state)
2435 }
2436}
2437
/// Optional constructor for the streaming executor.
///
/// `Some(streaming_dispatch::build_streaming_query_executor)` when the
/// `new_streaming` feature is enabled, `None` otherwise.
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
    // Exactly one of the two cfg-gated blocks below survives compilation and
    // becomes the value of the constant.
    #[cfg(not(feature = "new_streaming"))]
    {
        None
    }
    #[cfg(feature = "new_streaming")]
    {
        Some(streaming_dispatch::build_streaming_query_executor)
    }
};
2448#[cfg(feature = "new_streaming")]
2449pub use streaming_dispatch::build_streaming_query_executor;
2450
/// Glue between the IR plan and the `polars_stream` engine, exposed as an
/// [`Executor`](polars_mem_engine::Executor).
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build a streaming executor for the plan rooted at `node`.
    ///
    /// A root that is not already a sink is wrapped in an in-memory sink. The
    /// rechunk flag is taken from the root when it is a scan and is `false`
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the root is an `IR::SinkMultiple` node.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        let rechunk = matches!(
            ir_arena.get(node),
            IR::Scan { unified_scan_args, .. } if unified_scan_args.rechunk
        );

        let sink_node = match ir_arena.get(node) {
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        let query = polars_stream::StreamingQuery::build(sink_node, ir_arena, expr_arena)?;
        let executor = StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        };
        Ok(Box::new(executor) as Box<dyn Executor>)
    }

    /// Executor wrapping a single-use streaming query.
    struct StreamingQueryExecutor {
        /// The query to run; taken out (left as `None`) by the first `execute`.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Whether to rechunk the result into a single chunk.
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the streaming query and return its single result frame.
        ///
        /// # Panics
        ///
        /// Panics when called from a `POOL` thread, when the lock cannot be
        /// acquired, or when called more than once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            assert!(POOL.current_thread_index().is_none());

            // The lock guard is a temporary of this statement, so it is
            // released before the query runs.
            let query = self
                .executor
                .try_lock()
                .unwrap()
                .take()
                .expect("unhandled: execute() more than once");

            let mut df = query.execute()?.unwrap_single();

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}