1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::sync::{Arc, Mutex};
13
14pub use anonymous_scan::*;
15#[cfg(feature = "csv")]
16pub use csv::*;
17#[cfg(not(target_arch = "wasm32"))]
18pub use exitable::*;
19pub use file_list_reader::*;
20#[cfg(feature = "ipc")]
21pub use ipc::*;
22#[cfg(feature = "json")]
23pub use ndjson::*;
24#[cfg(feature = "parquet")]
25pub use parquet::*;
26use polars_compute::rolling::QuantileMethod;
27use polars_core::POOL;
28use polars_core::error::feature_gated;
29use polars_core::prelude::*;
30use polars_expr::{ExpressionConversionState, create_physical_expr};
31use polars_io::RowIndex;
32use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
34#[cfg(feature = "is_between")]
35use polars_ops::prelude::ClosedInterval;
36pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37use polars_utils::pl_str::PlSmallStr;
38use polars_utils::plpath::PlPath;
39use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
40
41use crate::frame::cached_arenas::CachedArena;
42use crate::prelude::*;
43
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
47
48impl IntoLazy for DataFrame {
49 fn lazy(self) -> LazyFrame {
51 let lp = DslBuilder::from_existing_df(self).build();
52 LazyFrame {
53 logical_plan: lp,
54 opt_state: Default::default(),
55 cached_arena: Default::default(),
56 }
57 }
58}
59
60impl IntoLazy for LazyFrame {
61 fn lazy(self) -> LazyFrame {
62 self
63 }
64}
65
/// A lazily evaluated query.
///
/// Holds the DSL logical plan together with the optimization flags that are applied when
/// the plan is converted to IR and optimized (see `optimize_with_scratch`).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL-level logical plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags consulted when the plan is optimized / executed.
    pub(crate) opt_state: OptFlags,
    /// Shared arena cache handle; `None` inside the mutex when empty.
    /// NOTE(review): filled/consumed by `get_arenas` (defined elsewhere) — confirm semantics there.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
77
78impl From<DslPlan> for LazyFrame {
79 fn from(plan: DslPlan) -> Self {
80 Self {
81 logical_plan: plan,
82 opt_state: OptFlags::default(),
83 cached_arena: Default::default(),
84 }
85 }
86}
87
88impl LazyFrame {
89 pub(crate) fn from_inner(
90 logical_plan: DslPlan,
91 opt_state: OptFlags,
92 cached_arena: Arc<Mutex<Option<CachedArena>>>,
93 ) -> Self {
94 Self {
95 logical_plan,
96 opt_state,
97 cached_arena,
98 }
99 }
100
101 pub(crate) fn get_plan_builder(self) -> DslBuilder {
102 DslBuilder::from(self.logical_plan)
103 }
104
105 fn get_opt_state(&self) -> OptFlags {
106 self.opt_state
107 }
108
109 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
110 LazyFrame {
111 logical_plan,
112 opt_state,
113 cached_arena: Default::default(),
114 }
115 }
116
117 pub fn get_current_optimizations(&self) -> OptFlags {
119 self.opt_state
120 }
121
122 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
124 self.opt_state = opt_state;
125 self
126 }
127
128 pub fn without_optimizations(self) -> Self {
130 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
131 }
132
133 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
135 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
136 self
137 }
138
139 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
141 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
142 self
143 }
144
145 pub fn with_check_order(mut self, toggle: bool) -> Self {
148 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
149 self
150 }
151
152 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
154 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
155 self
156 }
157
158 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
160 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
161 self
162 }
163
164 pub fn with_type_check(mut self, toggle: bool) -> Self {
166 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
167 self
168 }
169
170 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
172 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
173 self
174 }
175
176 #[cfg(feature = "cse")]
178 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
179 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
180 self
181 }
182
183 #[cfg(feature = "cse")]
185 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
186 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
187 self
188 }
189
190 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
192 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
193 self
194 }
195
196 #[cfg(feature = "new_streaming")]
197 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
198 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
199 self
200 }
201
202 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
204 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
205 self
206 }
207
208 pub fn _with_eager(mut self, toggle: bool) -> Self {
210 self.opt_state.set(OptFlags::EAGER, toggle);
211 self
212 }
213
214 pub fn describe_plan(&self) -> PolarsResult<String> {
216 Ok(self.clone().to_alp()?.describe())
217 }
218
219 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
221 Ok(self.clone().to_alp()?.describe_tree_format())
222 }
223
224 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
228 Ok(self.clone().to_alp_optimized()?.describe())
229 }
230
231 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
235 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
236 }
237
238 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
243 if optimized {
244 self.describe_optimized_plan()
245 } else {
246 self.describe_plan()
247 }
248 }
249
250 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
290 let opt_state = self.get_opt_state();
291 let lp = self
292 .get_plan_builder()
293 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
294 .build();
295 Self::from_logical_plan(lp, opt_state)
296 }
297
298 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
318 self,
319 by_exprs: E,
320 sort_options: SortMultipleOptions,
321 ) -> Self {
322 let by_exprs = by_exprs.as_ref().to_vec();
323 if by_exprs.is_empty() {
324 self
325 } else {
326 let opt_state = self.get_opt_state();
327 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
328 Self::from_logical_plan(lp, opt_state)
329 }
330 }
331
332 pub fn top_k<E: AsRef<[Expr]>>(
333 self,
334 k: IdxSize,
335 by_exprs: E,
336 sort_options: SortMultipleOptions,
337 ) -> Self {
338 self.sort_by_exprs(
340 by_exprs,
341 sort_options.with_order_reversed().with_nulls_last(true),
342 )
343 .slice(0, k)
344 }
345
346 pub fn bottom_k<E: AsRef<[Expr]>>(
347 self,
348 k: IdxSize,
349 by_exprs: E,
350 sort_options: SortMultipleOptions,
351 ) -> Self {
352 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
354 .slice(0, k)
355 }
356
357 pub fn reverse(self) -> Self {
373 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
374 }
375
376 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
384 where
385 I: IntoIterator<Item = T>,
386 J: IntoIterator<Item = S>,
387 T: AsRef<str>,
388 S: AsRef<str>,
389 {
390 let iter = existing.into_iter();
391 let cap = iter.size_hint().0;
392 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
393 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
394
395 for (existing, new) in iter.zip(new) {
398 let existing = existing.as_ref();
399 let new = new.as_ref();
400 if new != existing {
401 existing_vec.push(existing.into());
402 new_vec.push(new.into());
403 }
404 }
405
406 self.map_private(DslFunction::Rename {
407 existing: existing_vec.into(),
408 new: new_vec.into(),
409 strict,
410 })
411 }
412
413 pub fn drop(self, columns: Selector) -> Self {
420 let opt_state = self.get_opt_state();
421 let lp = self.get_plan_builder().drop(columns).build();
422 Self::from_logical_plan(lp, opt_state)
423 }
424
425 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
430 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
431 }
432
433 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
438 self.select(vec![
439 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
440 ])
441 }
442
443 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
445 let opt_state = self.get_opt_state();
446 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
447 Self::from_logical_plan(lp, opt_state)
448 }
449
450 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
452 let opt_state = self.get_opt_state();
453 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
454 Self::from_logical_plan(lp, opt_state)
455 }
456
457 pub fn cache(self) -> Self {
461 let opt_state = self.get_opt_state();
462 let lp = self.get_plan_builder().cache().build();
463 Self::from_logical_plan(lp, opt_state)
464 }
465
466 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
468 let cast_cols: Vec<Expr> = dtypes
469 .into_iter()
470 .map(|(name, dt)| {
471 let name = PlSmallStr::from_str(name);
472
473 if strict {
474 col(name).strict_cast(dt)
475 } else {
476 col(name).cast(dt)
477 }
478 })
479 .collect();
480
481 if cast_cols.is_empty() {
482 self
483 } else {
484 self.with_columns(cast_cols)
485 }
486 }
487
488 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
490 self.with_columns(vec![if strict {
491 col(PlSmallStr::from_static("*")).strict_cast(dtype)
492 } else {
493 col(PlSmallStr::from_static("*")).cast(dtype)
494 }])
495 }
496
497 pub fn optimize(
498 self,
499 lp_arena: &mut Arena<IR>,
500 expr_arena: &mut Arena<AExpr>,
501 ) -> PolarsResult<Node> {
502 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
503 }
504
505 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
506 let (mut lp_arena, mut expr_arena) = self.get_arenas();
507 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
508
509 Ok(IRPlan::new(node, lp_arena, expr_arena))
510 }
511
512 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
513 let (mut lp_arena, mut expr_arena) = self.get_arenas();
514 let node = to_alp(
515 self.logical_plan,
516 &mut expr_arena,
517 &mut lp_arena,
518 &mut self.opt_state,
519 )?;
520 let plan = IRPlan::new(node, lp_arena, expr_arena);
521 Ok(plan)
522 }
523
524 pub(crate) fn optimize_with_scratch(
525 self,
526 lp_arena: &mut Arena<IR>,
527 expr_arena: &mut Arena<AExpr>,
528 scratch: &mut Vec<Node>,
529 ) -> PolarsResult<Node> {
530 #[allow(unused_mut)]
531 let mut opt_state = self.opt_state;
532 let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
533
534 #[cfg(feature = "cse")]
535 if new_streaming {
536 opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
539 }
540
541 let lp_top = optimize(
542 self.logical_plan,
543 opt_state,
544 lp_arena,
545 expr_arena,
546 scratch,
547 Some(&|expr, expr_arena, schema| {
548 let phys_expr = create_physical_expr(
549 expr,
550 Context::Default,
551 expr_arena,
552 schema,
553 &mut ExpressionConversionState::new(true),
554 )
555 .ok()?;
556 let io_expr = phys_expr_to_io_expr(phys_expr);
557 Some(io_expr)
558 }),
559 )?;
560
561 Ok(lp_top)
562 }
563
564 fn prepare_collect_post_opt<P>(
565 mut self,
566 check_sink: bool,
567 query_start: Option<std::time::Instant>,
568 post_opt: P,
569 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
570 where
571 P: FnOnce(
572 Node,
573 &mut Arena<IR>,
574 &mut Arena<AExpr>,
575 Option<std::time::Duration>,
576 ) -> PolarsResult<()>,
577 {
578 let (mut lp_arena, mut expr_arena) = self.get_arenas();
579
580 let mut scratch = vec![];
581 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
582
583 post_opt(
584 lp_top,
585 &mut lp_arena,
586 &mut expr_arena,
587 query_start.map(|s| s.elapsed()),
590 )?;
591
592 let no_file_sink = if check_sink {
594 !matches!(
595 lp_arena.get(lp_top),
596 IR::Sink {
597 payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
598 ..
599 }
600 )
601 } else {
602 true
603 };
604 let physical_plan = create_physical_plan(
605 lp_top,
606 &mut lp_arena,
607 &mut expr_arena,
608 BUILD_STREAMING_EXECUTOR,
609 )?;
610
611 let state = ExecutionState::new();
612 Ok((state, physical_plan, no_file_sink))
613 }
614
615 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
617 where
618 P: FnOnce(
619 Node,
620 &mut Arena<IR>,
621 &mut Arena<AExpr>,
622 Option<std::time::Duration>,
623 ) -> PolarsResult<()>,
624 {
625 let (mut state, mut physical_plan, _) =
626 self.prepare_collect_post_opt(false, None, post_opt)?;
627 physical_plan.execute(&mut state)
628 }
629
630 #[allow(unused_mut)]
631 fn prepare_collect(
632 self,
633 check_sink: bool,
634 query_start: Option<std::time::Instant>,
635 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
636 self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
637 }
638
639 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
644 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
645 payload.clone()
646 } else {
647 self.logical_plan = DslPlan::Sink {
648 input: Arc::new(self.logical_plan),
649 payload: SinkType::Memory,
650 };
651 SinkType::Memory
652 };
653
654 if engine == Engine::Auto {
656 engine = match payload {
657 #[cfg(feature = "new_streaming")]
658 SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
659 _ => Engine::InMemory,
660 };
661 }
662 if engine == Engine::Gpu {
664 engine = Engine::InMemory;
665 }
666
667 #[cfg(feature = "new_streaming")]
668 {
669 if let Some(result) = self.try_new_streaming_if_requested() {
670 return result.map(|v| v.unwrap_single());
671 }
672 }
673
674 match engine {
675 Engine::Auto => unreachable!(),
676 Engine::Streaming => {
677 feature_gated!("new_streaming", self = self.with_new_streaming(true))
678 },
679 _ => {},
680 }
681 let mut alp_plan = self.clone().to_alp_optimized()?;
682
683 match engine {
684 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
685 let result = polars_stream::run_query(
686 alp_plan.lp_top,
687 &mut alp_plan.lp_arena,
688 &mut alp_plan.expr_arena,
689 );
690 result.map(|v| v.unwrap_single())
691 }),
692 Engine::Gpu => {
693 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
694 },
695 Engine::InMemory => {
696 let mut physical_plan = create_physical_plan(
697 alp_plan.lp_top,
698 &mut alp_plan.lp_arena,
699 &mut alp_plan.expr_arena,
700 BUILD_STREAMING_EXECUTOR,
701 )?;
702 let mut state = ExecutionState::new();
703 physical_plan.execute(&mut state)
704 },
705 }
706 }
707
708 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
709 let sink_multiple = LazyFrame {
710 logical_plan: DslPlan::SinkMultiple { inputs: plans },
711 opt_state,
712 cached_arena: Default::default(),
713 };
714 sink_multiple.explain(true)
715 }
716
    /// Execute several plans together and return one `DataFrame` per plan, in input order.
    ///
    /// The plans are wrapped in a single `DslPlan::SinkMultiple` so they are optimized as one
    /// query. An empty `plans` returns an empty `Vec` immediately.
    ///
    /// `Engine::Auto` and `Engine::Gpu` are both mapped to `Engine::InMemory`. With
    /// `Engine::Streaming` (requires `new_streaming`) the combined plan is run by
    /// `polars_stream::run_query`. Otherwise one physical plan is built per input and the
    /// plans are executed on `POOL`.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Auto resolves to the in-memory engine for multiple plans.
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // GPU is also executed with the in-memory engine, which makes the
        // `Engine::Gpu` arm of the final match below unreachable.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // The optimized root is expected to still be the SinkMultiple built above.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` is cloned to end the borrow of `alp_plan` before its arenas are
        // borrowed mutably.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                let mut state = ExecutionState::new();
                // The prefiller, if any, runs on the shared state before any per-plan
                // executor. NOTE(review): presumably it populates caches shared between
                // the plans; confirm in `create_multiple_physical_plans`.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                // Plans run in chunks of `3 * threads`. Chunks are processed one after
                // another (sequential iterator); plans within a chunk run in parallel.
                // The first error aborts the collection.
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out (leaving a default in its slot)
                                    // so it can be executed by value on this worker.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // NOTE(review): `idx` restarts at 0 for every chunk, so
                                    // branch indices repeat across chunks — confirm intended.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten per-chunk results back into one Vec in input order.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
819
820 pub fn collect(self) -> PolarsResult<DataFrame> {
838 self.collect_with_engine(Engine::InMemory)
839 }
840
841 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
844 where
845 P: FnOnce(
846 Node,
847 &mut Arena<IR>,
848 &mut Arena<AExpr>,
849 Option<std::time::Duration>,
850 ) -> PolarsResult<()>,
851 {
852 let query_start = std::time::Instant::now();
853 let (mut state, mut physical_plan, _) =
854 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
855 state.time_nodes(query_start);
856 let out = physical_plan.execute(&mut state)?;
857 let timer_df = state.finish_timer()?;
858 Ok((out, timer_df))
859 }
860
861 pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
869 self._profile_post_opt(|_, _, _, _| Ok(()))
870 }
871
872 #[cfg(feature = "parquet")]
876 pub fn sink_parquet(
877 self,
878 target: SinkTarget,
879 options: ParquetWriteOptions,
880 cloud_options: Option<polars_io::cloud::CloudOptions>,
881 sink_options: SinkOptions,
882 ) -> PolarsResult<Self> {
883 self.sink(SinkType::File(FileSinkType {
884 target,
885 sink_options,
886 file_type: FileType::Parquet(options),
887 cloud_options,
888 }))
889 }
890
891 #[cfg(feature = "ipc")]
895 pub fn sink_ipc(
896 self,
897 target: SinkTarget,
898 options: IpcWriterOptions,
899 cloud_options: Option<polars_io::cloud::CloudOptions>,
900 sink_options: SinkOptions,
901 ) -> PolarsResult<Self> {
902 self.sink(SinkType::File(FileSinkType {
903 target,
904 sink_options,
905 file_type: FileType::Ipc(options),
906 cloud_options,
907 }))
908 }
909
910 #[cfg(feature = "csv")]
914 pub fn sink_csv(
915 self,
916 target: SinkTarget,
917 options: CsvWriterOptions,
918 cloud_options: Option<polars_io::cloud::CloudOptions>,
919 sink_options: SinkOptions,
920 ) -> PolarsResult<Self> {
921 self.sink(SinkType::File(FileSinkType {
922 target,
923 sink_options,
924 file_type: FileType::Csv(options),
925 cloud_options,
926 }))
927 }
928
929 #[cfg(feature = "json")]
933 pub fn sink_json(
934 self,
935 target: SinkTarget,
936 options: JsonWriterOptions,
937 cloud_options: Option<polars_io::cloud::CloudOptions>,
938 sink_options: SinkOptions,
939 ) -> PolarsResult<Self> {
940 self.sink(SinkType::File(FileSinkType {
941 target,
942 sink_options,
943 file_type: FileType::Json(options),
944 cloud_options,
945 }))
946 }
947
948 #[cfg(feature = "parquet")]
952 #[allow(clippy::too_many_arguments)]
953 pub fn sink_parquet_partitioned(
954 self,
955 base_path: Arc<PlPath>,
956 file_path_cb: Option<PartitionTargetCallback>,
957 variant: PartitionVariant,
958 options: ParquetWriteOptions,
959 cloud_options: Option<polars_io::cloud::CloudOptions>,
960 sink_options: SinkOptions,
961 per_partition_sort_by: Option<Vec<SortColumn>>,
962 finish_callback: Option<SinkFinishCallback>,
963 ) -> PolarsResult<Self> {
964 self.sink(SinkType::Partition(PartitionSinkType {
965 base_path,
966 file_path_cb,
967 sink_options,
968 variant,
969 file_type: FileType::Parquet(options),
970 cloud_options,
971 per_partition_sort_by,
972 finish_callback,
973 }))
974 }
975
976 #[cfg(feature = "ipc")]
980 #[allow(clippy::too_many_arguments)]
981 pub fn sink_ipc_partitioned(
982 self,
983 base_path: Arc<PlPath>,
984 file_path_cb: Option<PartitionTargetCallback>,
985 variant: PartitionVariant,
986 options: IpcWriterOptions,
987 cloud_options: Option<polars_io::cloud::CloudOptions>,
988 sink_options: SinkOptions,
989 per_partition_sort_by: Option<Vec<SortColumn>>,
990 finish_callback: Option<SinkFinishCallback>,
991 ) -> PolarsResult<Self> {
992 self.sink(SinkType::Partition(PartitionSinkType {
993 base_path,
994 file_path_cb,
995 sink_options,
996 variant,
997 file_type: FileType::Ipc(options),
998 cloud_options,
999 per_partition_sort_by,
1000 finish_callback,
1001 }))
1002 }
1003
1004 #[cfg(feature = "csv")]
1008 #[allow(clippy::too_many_arguments)]
1009 pub fn sink_csv_partitioned(
1010 self,
1011 base_path: Arc<PlPath>,
1012 file_path_cb: Option<PartitionTargetCallback>,
1013 variant: PartitionVariant,
1014 options: CsvWriterOptions,
1015 cloud_options: Option<polars_io::cloud::CloudOptions>,
1016 sink_options: SinkOptions,
1017 per_partition_sort_by: Option<Vec<SortColumn>>,
1018 finish_callback: Option<SinkFinishCallback>,
1019 ) -> PolarsResult<Self> {
1020 self.sink(SinkType::Partition(PartitionSinkType {
1021 base_path,
1022 file_path_cb,
1023 sink_options,
1024 variant,
1025 file_type: FileType::Csv(options),
1026 cloud_options,
1027 per_partition_sort_by,
1028 finish_callback,
1029 }))
1030 }
1031
1032 #[cfg(feature = "json")]
1036 #[allow(clippy::too_many_arguments)]
1037 pub fn sink_json_partitioned(
1038 self,
1039 base_path: Arc<PlPath>,
1040 file_path_cb: Option<PartitionTargetCallback>,
1041 variant: PartitionVariant,
1042 options: JsonWriterOptions,
1043 cloud_options: Option<polars_io::cloud::CloudOptions>,
1044 sink_options: SinkOptions,
1045 per_partition_sort_by: Option<Vec<SortColumn>>,
1046 finish_callback: Option<SinkFinishCallback>,
1047 ) -> PolarsResult<Self> {
1048 self.sink(SinkType::Partition(PartitionSinkType {
1049 base_path,
1050 file_path_cb,
1051 sink_options,
1052 variant,
1053 file_type: FileType::Json(options),
1054 cloud_options,
1055 per_partition_sort_by,
1056 finish_callback,
1057 }))
1058 }
1059
1060 #[cfg(feature = "new_streaming")]
1061 pub fn try_new_streaming_if_requested(
1062 &mut self,
1063 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1064 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1065 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1066
1067 if auto_new_streaming || force_new_streaming {
1068 let mut new_stream_lazy = self.clone();
1071 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1072 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1073 Ok(v) => v,
1074 Err(e) => return Some(Err(e)),
1075 };
1076
1077 let f = || {
1078 polars_stream::run_query(
1079 alp_plan.lp_top,
1080 &mut alp_plan.lp_arena,
1081 &mut alp_plan.expr_arena,
1082 )
1083 };
1084
1085 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1086 Ok(v) => return Some(v),
1087 Err(e) => {
1088 if !force_new_streaming
1091 && auto_new_streaming
1092 && e.downcast_ref::<&str>()
1093 .map(|s| s.starts_with("not yet implemented"))
1094 .unwrap_or(false)
1095 {
1096 if polars_core::config::verbose() {
1097 eprintln!(
1098 "caught unimplemented error in new streaming engine, falling back to normal engine"
1099 );
1100 }
1101 } else {
1102 std::panic::resume_unwind(e);
1103 }
1104 },
1105 }
1106 }
1107
1108 None
1109 }
1110
1111 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1112 polars_ensure!(
1113 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1114 InvalidOperation: "cannot create a sink on top of another sink"
1115 );
1116 self.logical_plan = DslPlan::Sink {
1117 input: Arc::new(self.logical_plan),
1118 payload,
1119 };
1120 Ok(self)
1121 }
1122
1123 pub fn filter(self, predicate: Expr) -> Self {
1141 let opt_state = self.get_opt_state();
1142 let lp = self.get_plan_builder().filter(predicate).build();
1143 Self::from_logical_plan(lp, opt_state)
1144 }
1145
1146 pub fn remove(self, predicate: Expr) -> Self {
1164 self.filter(predicate.neq_missing(lit(true)))
1165 }
1166
1167 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1193 let exprs = exprs.as_ref().to_vec();
1194 self.select_impl(
1195 exprs,
1196 ProjectionOptions {
1197 run_parallel: true,
1198 duplicate_check: true,
1199 should_broadcast: true,
1200 },
1201 )
1202 }
1203
1204 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1205 let exprs = exprs.as_ref().to_vec();
1206 self.select_impl(
1207 exprs,
1208 ProjectionOptions {
1209 run_parallel: false,
1210 duplicate_check: true,
1211 should_broadcast: true,
1212 },
1213 )
1214 }
1215
1216 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1217 let opt_state = self.get_opt_state();
1218 let lp = self.get_plan_builder().project(exprs, options).build();
1219 Self::from_logical_plan(lp, opt_state)
1220 }
1221
1222 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1243 let keys = by
1244 .as_ref()
1245 .iter()
1246 .map(|e| e.clone().into())
1247 .collect::<Vec<_>>();
1248 let opt_state = self.get_opt_state();
1249
1250 #[cfg(feature = "dynamic_group_by")]
1251 {
1252 LazyGroupBy {
1253 logical_plan: self.logical_plan,
1254 opt_state,
1255 keys,
1256 maintain_order: false,
1257 dynamic_options: None,
1258 rolling_options: None,
1259 }
1260 }
1261
1262 #[cfg(not(feature = "dynamic_group_by"))]
1263 {
1264 LazyGroupBy {
1265 logical_plan: self.logical_plan,
1266 opt_state,
1267 keys,
1268 maintain_order: false,
1269 }
1270 }
1271 }
1272
1273 #[cfg(feature = "dynamic_group_by")]
1281 pub fn rolling<E: AsRef<[Expr]>>(
1282 mut self,
1283 index_column: Expr,
1284 group_by: E,
1285 mut options: RollingGroupOptions,
1286 ) -> LazyGroupBy {
1287 if let Expr::Column(name) = index_column {
1288 options.index_column = name;
1289 } else {
1290 let output_field = index_column
1291 .to_field(&self.collect_schema().unwrap())
1292 .unwrap();
1293 return self.with_column(index_column).rolling(
1294 Expr::Column(output_field.name().clone()),
1295 group_by,
1296 options,
1297 );
1298 }
1299 let opt_state = self.get_opt_state();
1300 LazyGroupBy {
1301 logical_plan: self.logical_plan,
1302 opt_state,
1303 keys: group_by.as_ref().to_vec(),
1304 maintain_order: true,
1305 dynamic_options: None,
1306 rolling_options: Some(options),
1307 }
1308 }
1309
1310 #[cfg(feature = "dynamic_group_by")]
1326 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1327 mut self,
1328 index_column: Expr,
1329 group_by: E,
1330 mut options: DynamicGroupOptions,
1331 ) -> LazyGroupBy {
1332 if let Expr::Column(name) = index_column {
1333 options.index_column = name;
1334 } else {
1335 let output_field = index_column
1336 .to_field(&self.collect_schema().unwrap())
1337 .unwrap();
1338 return self.with_column(index_column).group_by_dynamic(
1339 Expr::Column(output_field.name().clone()),
1340 group_by,
1341 options,
1342 );
1343 }
1344 let opt_state = self.get_opt_state();
1345 LazyGroupBy {
1346 logical_plan: self.logical_plan,
1347 opt_state,
1348 keys: group_by.as_ref().to_vec(),
1349 maintain_order: true,
1350 dynamic_options: Some(options),
1351 rolling_options: None,
1352 }
1353 }
1354
1355 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1357 let keys = by
1358 .as_ref()
1359 .iter()
1360 .map(|e| e.clone().into())
1361 .collect::<Vec<_>>();
1362 let opt_state = self.get_opt_state();
1363
1364 #[cfg(feature = "dynamic_group_by")]
1365 {
1366 LazyGroupBy {
1367 logical_plan: self.logical_plan,
1368 opt_state,
1369 keys,
1370 maintain_order: true,
1371 dynamic_options: None,
1372 rolling_options: None,
1373 }
1374 }
1375
1376 #[cfg(not(feature = "dynamic_group_by"))]
1377 {
1378 LazyGroupBy {
1379 logical_plan: self.logical_plan,
1380 opt_state,
1381 keys,
1382 maintain_order: true,
1383 }
1384 }
1385 }
1386
1387 #[cfg(feature = "semi_anti_join")]
1404 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1405 self.join(
1406 other,
1407 [left_on.into()],
1408 [right_on.into()],
1409 JoinArgs::new(JoinType::Anti),
1410 )
1411 }
1412
1413 #[cfg(feature = "cross_join")]
1415 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1416 self.join(
1417 other,
1418 vec![],
1419 vec![],
1420 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1421 )
1422 }
1423
1424 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1441 self.join(
1442 other,
1443 [left_on.into()],
1444 [right_on.into()],
1445 JoinArgs::new(JoinType::Left),
1446 )
1447 }
1448
1449 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1466 self.join(
1467 other,
1468 [left_on.into()],
1469 [right_on.into()],
1470 JoinArgs::new(JoinType::Inner),
1471 )
1472 }
1473
1474 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1491 self.join(
1492 other,
1493 [left_on.into()],
1494 [right_on.into()],
1495 JoinArgs::new(JoinType::Full),
1496 )
1497 }
1498
1499 #[cfg(feature = "semi_anti_join")]
1516 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1517 self.join(
1518 other,
1519 [left_on.into()],
1520 [right_on.into()],
1521 JoinArgs::new(JoinType::Semi),
1522 )
1523 }
1524
1525 pub fn join<E: AsRef<[Expr]>>(
1547 self,
1548 other: LazyFrame,
1549 left_on: E,
1550 right_on: E,
1551 args: JoinArgs,
1552 ) -> LazyFrame {
1553 let left_on = left_on.as_ref().to_vec();
1554 let right_on = right_on.as_ref().to_vec();
1555
1556 self._join_impl(other, left_on, right_on, args)
1557 }
1558
1559 fn _join_impl(
1560 self,
1561 other: LazyFrame,
1562 left_on: Vec<Expr>,
1563 right_on: Vec<Expr>,
1564 args: JoinArgs,
1565 ) -> LazyFrame {
1566 let JoinArgs {
1567 how,
1568 validation,
1569 suffix,
1570 slice,
1571 nulls_equal,
1572 coalesce,
1573 maintain_order,
1574 } = args;
1575
1576 if slice.is_some() {
1577 panic!("impl error: slice is not handled")
1578 }
1579
1580 let mut builder = self
1581 .join_builder()
1582 .with(other)
1583 .left_on(left_on)
1584 .right_on(right_on)
1585 .how(how)
1586 .validate(validation)
1587 .join_nulls(nulls_equal)
1588 .coalesce(coalesce)
1589 .maintain_order(maintain_order);
1590
1591 if let Some(suffix) = suffix {
1592 builder = builder.suffix(suffix);
1593 }
1594
1595 builder.finish()
1597 }
1598
1599 pub fn join_builder(self) -> JoinBuilder {
1605 JoinBuilder::new(self)
1606 }
1607
1608 pub fn with_column(self, expr: Expr) -> LazyFrame {
1626 let opt_state = self.get_opt_state();
1627 let lp = self
1628 .get_plan_builder()
1629 .with_columns(
1630 vec![expr],
1631 ProjectionOptions {
1632 run_parallel: false,
1633 duplicate_check: true,
1634 should_broadcast: true,
1635 },
1636 )
1637 .build();
1638 Self::from_logical_plan(lp, opt_state)
1639 }
1640
1641 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1656 let exprs = exprs.as_ref().to_vec();
1657 self.with_columns_impl(
1658 exprs,
1659 ProjectionOptions {
1660 run_parallel: true,
1661 duplicate_check: true,
1662 should_broadcast: true,
1663 },
1664 )
1665 }
1666
1667 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1669 let exprs = exprs.as_ref().to_vec();
1670 self.with_columns_impl(
1671 exprs,
1672 ProjectionOptions {
1673 run_parallel: false,
1674 duplicate_check: true,
1675 should_broadcast: true,
1676 },
1677 )
1678 }
1679
1680 pub fn match_to_schema(
1682 self,
1683 schema: SchemaRef,
1684 per_column: Arc<[MatchToSchemaPerColumn]>,
1685 extra_columns: ExtraColumnsPolicy,
1686 ) -> LazyFrame {
1687 let opt_state = self.get_opt_state();
1688 let lp = self
1689 .get_plan_builder()
1690 .match_to_schema(schema, per_column, extra_columns)
1691 .build();
1692 Self::from_logical_plan(lp, opt_state)
1693 }
1694
1695 pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1696 let opt_state = self.get_opt_state();
1697 let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1698 Self::from_logical_plan(lp, opt_state)
1699 }
1700
1701 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1702 let opt_state = self.get_opt_state();
1703 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1704 Self::from_logical_plan(lp, opt_state)
1705 }
1706
1707 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1708 let contexts = contexts
1709 .as_ref()
1710 .iter()
1711 .map(|lf| lf.logical_plan.clone())
1712 .collect();
1713 let opt_state = self.get_opt_state();
1714 let lp = self.get_plan_builder().with_context(contexts).build();
1715 Self::from_logical_plan(lp, opt_state)
1716 }
1717
1718 pub fn max(self) -> Self {
1722 self.map_private(DslFunction::Stats(StatsFunction::Max))
1723 }
1724
1725 pub fn min(self) -> Self {
1729 self.map_private(DslFunction::Stats(StatsFunction::Min))
1730 }
1731
1732 pub fn sum(self) -> Self {
1742 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1743 }
1744
1745 pub fn mean(self) -> Self {
1750 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1751 }
1752
1753 pub fn median(self) -> Self {
1759 self.map_private(DslFunction::Stats(StatsFunction::Median))
1760 }
1761
1762 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1764 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1765 quantile,
1766 method,
1767 }))
1768 }
1769
1770 pub fn std(self, ddof: u8) -> Self {
1783 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1784 }
1785
1786 pub fn var(self, ddof: u8) -> Self {
1796 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1797 }
1798
1799 pub fn explode(self, columns: Selector) -> LazyFrame {
1801 self.explode_impl(columns, false)
1802 }
1803
1804 fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1806 let opt_state = self.get_opt_state();
1807 let lp = self
1808 .get_plan_builder()
1809 .explode(columns, allow_empty)
1810 .build();
1811 Self::from_logical_plan(lp, opt_state)
1812 }
1813
1814 pub fn null_count(self) -> LazyFrame {
1816 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1817 }
1818
1819 pub fn unique_stable(
1824 self,
1825 subset: Option<Selector>,
1826 keep_strategy: UniqueKeepStrategy,
1827 ) -> LazyFrame {
1828 self.unique_stable_generic(subset, keep_strategy)
1829 }
1830
1831 pub fn unique_stable_generic(
1832 self,
1833 subset: Option<Selector>,
1834 keep_strategy: UniqueKeepStrategy,
1835 ) -> LazyFrame {
1836 let opt_state = self.get_opt_state();
1837 let options = DistinctOptionsDSL {
1838 subset,
1839 maintain_order: true,
1840 keep_strategy,
1841 };
1842 let lp = self.get_plan_builder().distinct(options).build();
1843 Self::from_logical_plan(lp, opt_state)
1844 }
1845
1846 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1854 self.unique_generic(subset, keep_strategy)
1855 }
1856
1857 pub fn unique_generic(
1858 self,
1859 subset: Option<Selector>,
1860 keep_strategy: UniqueKeepStrategy,
1861 ) -> LazyFrame {
1862 let opt_state = self.get_opt_state();
1863 let options = DistinctOptionsDSL {
1864 subset,
1865 maintain_order: false,
1866 keep_strategy,
1867 };
1868 let lp = self.get_plan_builder().distinct(options).build();
1869 Self::from_logical_plan(lp, opt_state)
1870 }
1871
1872 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1877 let opt_state = self.get_opt_state();
1878 let lp = self.get_plan_builder().drop_nans(subset).build();
1879 Self::from_logical_plan(lp, opt_state)
1880 }
1881
1882 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1887 let opt_state = self.get_opt_state();
1888 let lp = self.get_plan_builder().drop_nulls(subset).build();
1889 Self::from_logical_plan(lp, opt_state)
1890 }
1891
1892 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1902 let opt_state = self.get_opt_state();
1903 let lp = self.get_plan_builder().slice(offset, len).build();
1904 Self::from_logical_plan(lp, opt_state)
1905 }
1906
1907 pub fn first(self) -> LazyFrame {
1911 self.slice(0, 1)
1912 }
1913
1914 pub fn last(self) -> LazyFrame {
1918 self.slice(-1, 1)
1919 }
1920
1921 pub fn tail(self, n: IdxSize) -> LazyFrame {
1925 let neg_tail = -(n as i64);
1926 self.slice(neg_tail, n)
1927 }
1928
1929 #[cfg(feature = "pivot")]
1933 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1934 let opt_state = self.get_opt_state();
1935 let lp = self.get_plan_builder().unpivot(args).build();
1936 Self::from_logical_plan(lp, opt_state)
1937 }
1938
1939 pub fn limit(self, n: IdxSize) -> LazyFrame {
1941 self.slice(0, n)
1942 }
1943
1944 pub fn map<F>(
1958 self,
1959 function: F,
1960 optimizations: AllowedOptimizations,
1961 schema: Option<Arc<dyn UdfSchema>>,
1962 name: Option<&'static str>,
1963 ) -> LazyFrame
1964 where
1965 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1966 {
1967 let opt_state = self.get_opt_state();
1968 let lp = self
1969 .get_plan_builder()
1970 .map(
1971 function,
1972 optimizations,
1973 schema,
1974 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1975 )
1976 .build();
1977 Self::from_logical_plan(lp, opt_state)
1978 }
1979
1980 #[cfg(feature = "python")]
1981 pub fn map_python(
1982 self,
1983 function: polars_utils::python_function::PythonFunction,
1984 optimizations: AllowedOptimizations,
1985 schema: Option<SchemaRef>,
1986 validate_output: bool,
1987 ) -> LazyFrame {
1988 let opt_state = self.get_opt_state();
1989 let lp = self
1990 .get_plan_builder()
1991 .map_python(function, optimizations, schema, validate_output)
1992 .build();
1993 Self::from_logical_plan(lp, opt_state)
1994 }
1995
1996 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1997 let opt_state = self.get_opt_state();
1998 let lp = self.get_plan_builder().map_private(function).build();
1999 Self::from_logical_plan(lp, opt_state)
2000 }
2001
2002 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2011 where
2012 S: Into<PlSmallStr>,
2013 {
2014 let name = name.into();
2015
2016 match &self.logical_plan {
2017 v @ DslPlan::Scan { scan_type, .. }
2018 if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2019 {
2020 let DslPlan::Scan {
2021 sources,
2022 mut unified_scan_args,
2023 scan_type,
2024 cached_ir: _,
2025 } = v.clone()
2026 else {
2027 unreachable!()
2028 };
2029
2030 unified_scan_args.row_index = Some(RowIndex {
2031 name,
2032 offset: offset.unwrap_or(0),
2033 });
2034
2035 DslPlan::Scan {
2036 sources,
2037 unified_scan_args,
2038 scan_type,
2039 cached_ir: Default::default(),
2040 }
2041 .into()
2042 },
2043 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2044 }
2045 }
2046
2047 pub fn count(self) -> LazyFrame {
2049 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2050 }
2051
2052 #[cfg(feature = "dtype-struct")]
2055 pub fn unnest(self, cols: Selector) -> Self {
2056 self.map_private(DslFunction::Unnest(cols))
2057 }
2058
2059 #[cfg(feature = "merge_sorted")]
2060 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2061 where
2062 S: Into<PlSmallStr>,
2063 {
2064 let key = key.into();
2065
2066 let lp = DslPlan::MergeSorted {
2067 input_left: Arc::new(self.logical_plan),
2068 input_right: Arc::new(other.logical_plan),
2069 key,
2070 };
2071 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2072 }
2073}
2074
2075#[derive(Clone)]
2077pub struct LazyGroupBy {
2078 pub logical_plan: DslPlan,
2079 opt_state: OptFlags,
2080 keys: Vec<Expr>,
2081 maintain_order: bool,
2082 #[cfg(feature = "dynamic_group_by")]
2083 dynamic_options: Option<DynamicGroupOptions>,
2084 #[cfg(feature = "dynamic_group_by")]
2085 rolling_options: Option<RollingGroupOptions>,
2086}
2087
2088impl From<LazyGroupBy> for LazyFrame {
2089 fn from(lgb: LazyGroupBy) -> Self {
2090 Self {
2091 logical_plan: lgb.logical_plan,
2092 opt_state: lgb.opt_state,
2093 cached_arena: Default::default(),
2094 }
2095 }
2096}
2097
2098impl LazyGroupBy {
2099 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2121 #[cfg(feature = "dynamic_group_by")]
2122 let lp = DslBuilder::from(self.logical_plan)
2123 .group_by(
2124 self.keys,
2125 aggs,
2126 None,
2127 self.maintain_order,
2128 self.dynamic_options,
2129 self.rolling_options,
2130 )
2131 .build();
2132
2133 #[cfg(not(feature = "dynamic_group_by"))]
2134 let lp = DslBuilder::from(self.logical_plan)
2135 .group_by(self.keys, aggs, None, self.maintain_order)
2136 .build();
2137 LazyFrame::from_logical_plan(lp, self.opt_state)
2138 }
2139
2140 pub fn head(self, n: Option<usize>) -> LazyFrame {
2142 let keys = self
2143 .keys
2144 .iter()
2145 .filter_map(|expr| expr_output_name(expr).ok())
2146 .collect::<Vec<_>>();
2147
2148 self.agg([all().as_expr().head(n)])
2149 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2150 }
2151
2152 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2154 let keys = self
2155 .keys
2156 .iter()
2157 .filter_map(|expr| expr_output_name(expr).ok())
2158 .collect::<Vec<_>>();
2159
2160 self.agg([all().as_expr().tail(n)])
2161 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2162 }
2163
2164 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2169 #[cfg(feature = "dynamic_group_by")]
2170 let options = GroupbyOptions {
2171 dynamic: self.dynamic_options,
2172 rolling: self.rolling_options,
2173 slice: None,
2174 };
2175
2176 #[cfg(not(feature = "dynamic_group_by"))]
2177 let options = GroupbyOptions { slice: None };
2178
2179 let lp = DslPlan::GroupBy {
2180 input: Arc::new(self.logical_plan),
2181 keys: self.keys,
2182 aggs: vec![],
2183 apply: Some((f, schema)),
2184 maintain_order: self.maintain_order,
2185 options: Arc::new(options),
2186 };
2187 LazyFrame::from_logical_plan(lp, self.opt_state)
2188 }
2189}
2190
2191#[must_use]
2192pub struct JoinBuilder {
2193 lf: LazyFrame,
2194 how: JoinType,
2195 other: Option<LazyFrame>,
2196 left_on: Vec<Expr>,
2197 right_on: Vec<Expr>,
2198 allow_parallel: bool,
2199 force_parallel: bool,
2200 suffix: Option<PlSmallStr>,
2201 validation: JoinValidation,
2202 nulls_equal: bool,
2203 coalesce: JoinCoalesce,
2204 maintain_order: MaintainOrderJoin,
2205}
2206impl JoinBuilder {
2207 pub fn new(lf: LazyFrame) -> Self {
2209 Self {
2210 lf,
2211 other: None,
2212 how: JoinType::Inner,
2213 left_on: vec![],
2214 right_on: vec![],
2215 allow_parallel: true,
2216 force_parallel: false,
2217 suffix: None,
2218 validation: Default::default(),
2219 nulls_equal: false,
2220 coalesce: Default::default(),
2221 maintain_order: Default::default(),
2222 }
2223 }
2224
2225 pub fn with(mut self, other: LazyFrame) -> Self {
2227 self.other = Some(other);
2228 self
2229 }
2230
2231 pub fn how(mut self, how: JoinType) -> Self {
2233 self.how = how;
2234 self
2235 }
2236
2237 pub fn validate(mut self, validation: JoinValidation) -> Self {
2238 self.validation = validation;
2239 self
2240 }
2241
2242 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2246 let on = on.as_ref().to_vec();
2247 self.left_on.clone_from(&on);
2248 self.right_on = on;
2249 self
2250 }
2251
2252 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2256 self.left_on = on.as_ref().to_vec();
2257 self
2258 }
2259
2260 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264 self.right_on = on.as_ref().to_vec();
2265 self
2266 }
2267
2268 pub fn allow_parallel(mut self, allow: bool) -> Self {
2270 self.allow_parallel = allow;
2271 self
2272 }
2273
2274 pub fn force_parallel(mut self, force: bool) -> Self {
2276 self.force_parallel = force;
2277 self
2278 }
2279
2280 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2282 self.nulls_equal = nulls_equal;
2283 self
2284 }
2285
2286 pub fn suffix<S>(mut self, suffix: S) -> Self
2289 where
2290 S: Into<PlSmallStr>,
2291 {
2292 self.suffix = Some(suffix.into());
2293 self
2294 }
2295
2296 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2298 self.coalesce = coalesce;
2299 self
2300 }
2301
2302 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2304 self.maintain_order = maintain_order;
2305 self
2306 }
2307
2308 pub fn finish(self) -> LazyFrame {
2310 let opt_state = self.lf.opt_state;
2311 let other = self.other.expect("'with' not set in join builder");
2312
2313 let args = JoinArgs {
2314 how: self.how,
2315 validation: self.validation,
2316 suffix: self.suffix,
2317 slice: None,
2318 nulls_equal: self.nulls_equal,
2319 coalesce: self.coalesce,
2320 maintain_order: self.maintain_order,
2321 };
2322
2323 let lp = self
2324 .lf
2325 .get_plan_builder()
2326 .join(
2327 other.logical_plan,
2328 self.left_on,
2329 self.right_on,
2330 JoinOptions {
2331 allow_parallel: self.allow_parallel,
2332 force_parallel: self.force_parallel,
2333 args,
2334 }
2335 .into(),
2336 )
2337 .build();
2338 LazyFrame::from_logical_plan(lp, opt_state)
2339 }
2340
2341 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2343 let opt_state = self.lf.opt_state;
2344 let other = self.other.expect("with not set");
2345
2346 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2348 if let Expr::BinaryExpr {
2349 op: Operator::And,
2350 left,
2351 right,
2352 } = predicate
2353 {
2354 decompose_and((*left).clone(), expanded_predicates);
2355 decompose_and((*right).clone(), expanded_predicates);
2356 } else {
2357 expanded_predicates.push(predicate);
2358 }
2359 }
2360 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2361 for predicate in predicates {
2362 decompose_and(predicate, &mut expanded_predicates);
2363 }
2364 let predicates: Vec<Expr> = expanded_predicates;
2365
2366 #[cfg(feature = "is_between")]
2368 let predicates: Vec<Expr> = {
2369 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2370 for predicate in predicates {
2371 if let Expr::Function {
2372 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2373 input,
2374 ..
2375 } = &predicate
2376 {
2377 if let [expr, lower, upper] = input.as_slice() {
2378 match closed {
2379 ClosedInterval::Both => {
2380 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2381 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2382 },
2383 ClosedInterval::Right => {
2384 expanded_predicates.push(expr.clone().gt(lower.clone()));
2385 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2386 },
2387 ClosedInterval::Left => {
2388 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2389 expanded_predicates.push(expr.clone().lt(upper.clone()));
2390 },
2391 ClosedInterval::None => {
2392 expanded_predicates.push(expr.clone().gt(lower.clone()));
2393 expanded_predicates.push(expr.clone().lt(upper.clone()));
2394 },
2395 }
2396 continue;
2397 }
2398 }
2399 expanded_predicates.push(predicate);
2400 }
2401 expanded_predicates
2402 };
2403
2404 let args = JoinArgs {
2405 how: self.how,
2406 validation: self.validation,
2407 suffix: self.suffix,
2408 slice: None,
2409 nulls_equal: self.nulls_equal,
2410 coalesce: self.coalesce,
2411 maintain_order: self.maintain_order,
2412 };
2413 let options = JoinOptions {
2414 allow_parallel: self.allow_parallel,
2415 force_parallel: self.force_parallel,
2416 args,
2417 };
2418
2419 let lp = DslPlan::Join {
2420 input_left: Arc::new(self.lf.logical_plan),
2421 input_right: Arc::new(other.logical_plan),
2422 left_on: Default::default(),
2423 right_on: Default::default(),
2424 predicates,
2425 options: Arc::from(options),
2426 };
2427
2428 LazyFrame::from_logical_plan(lp, opt_state)
2429 }
2430}
2431
2432pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2433 #[cfg(not(feature = "new_streaming"))]
2434 {
2435 None
2436 }
2437 #[cfg(feature = "new_streaming")]
2438 {
2439 Some(streaming_dispatch::build_streaming_query_executor)
2440 }
2441};
2442#[cfg(feature = "new_streaming")]
2443pub use streaming_dispatch::build_streaming_query_executor;
2444
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build an [`Executor`] that runs the plan rooted at `node` on the streaming engine.
    ///
    /// * If the root is an `IR::Scan`, its `rechunk` flag is remembered so the final
    ///   result is rechunked.
    /// * A root that is not already an `IR::Sink` is wrapped in an in-memory sink.
    ///
    /// # Errors
    /// Propagates errors from `polars_stream::StreamingQuery::build`.
    ///
    /// # Panics
    /// Panics when the root is an `IR::SinkMultiple`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        let rechunk = if let IR::Scan {
            unified_scan_args, ..
        } = ir_arena.get(node)
        {
            unified_scan_args.rechunk
        } else {
            false
        };

        let sink_node = match ir_arena.get(node) {
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        let query = polars_stream::StreamingQuery::build(sink_node, ir_arena, expr_arena)?;
        let executor: Box<dyn Executor> = Box::new(StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        });
        Ok(executor)
    }

    /// One-shot executor wrapping a built streaming query.
    struct StreamingQueryExecutor {
        /// The query is taken out of the `Option` on first execution.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Rechunk the output into a single chunk (taken from the root scan's arguments).
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the streaming query and return its single output frame.
        ///
        /// # Panics
        /// * Panics when called from a `POOL` worker thread.
        /// * Panics when the lock is contended.
        /// * Panics when called more than once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            assert!(POOL.current_thread_index().is_none());

            // The temporary lock guard is released at the end of this statement, before the
            // query is run.
            let pending = self.executor.try_lock().unwrap().take();
            let query = pending.expect("unhandled: execute() more than once");
            let mut df = query.execute()?.unwrap_single();

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}