1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::num::NonZeroUsize;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "ipc")]
22pub use ipc::*;
23#[cfg(feature = "json")]
24pub use ndjson::*;
25#[cfg(feature = "parquet")]
26pub use parquet::*;
27use polars_compute::rolling::QuantileMethod;
28use polars_core::POOL;
29use polars_core::error::feature_gated;
30use polars_core::prelude::*;
31use polars_expr::{ExpressionConversionState, create_physical_expr};
32use polars_io::RowIndex;
33use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
34use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
35#[cfg(feature = "is_between")]
36use polars_ops::prelude::ClosedInterval;
37pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
38use polars_utils::pl_str::PlSmallStr;
39use polars_utils::plpath::PlPath;
40use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
41
42use crate::frame::cached_arenas::CachedArena;
43use crate::prelude::*;
44
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consumes `self` and returns the corresponding [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
48
49impl IntoLazy for DataFrame {
50 fn lazy(self) -> LazyFrame {
52 let lp = DslBuilder::from_existing_df(self).build();
53 LazyFrame {
54 logical_plan: lp,
55 opt_state: Default::default(),
56 cached_arena: Default::default(),
57 }
58 }
59}
60
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy, so it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
66
/// A DSL plan bundled with the optimization flags that should be applied to it.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan this frame represents.
    pub logical_plan: DslPlan,
    /// Optimization flags carried along with the plan.
    pub(crate) opt_state: OptFlags,
    /// Shared, mutex-protected slot that may hold a `CachedArena`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
78
79impl From<DslPlan> for LazyFrame {
80 fn from(plan: DslPlan) -> Self {
81 Self {
82 logical_plan: plan,
83 opt_state: OptFlags::default(),
84 cached_arena: Default::default(),
85 }
86 }
87}
88
89impl LazyFrame {
    /// Assembles a `LazyFrame` directly from its three parts, reusing the given
    /// `cached_arena` handle instead of creating a fresh one.
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }
101
102 pub(crate) fn get_plan_builder(self) -> DslBuilder {
103 DslBuilder::from(self.logical_plan)
104 }
105
    /// Returns a copy of the optimization flags stored on this frame.
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
109
110 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
111 LazyFrame {
112 logical_plan,
113 opt_state,
114 cached_arena: Default::default(),
115 }
116 }
117
    /// Returns a copy of the optimization flags currently set on this frame.
    pub fn get_current_optimizations(&self) -> OptFlags {
        self.opt_state
    }
122
123 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
125 self.opt_state = opt_state;
126 self
127 }
128
129 pub fn without_optimizations(self) -> Self {
131 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
132 }
133
134 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
136 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
137 self
138 }
139
140 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
142 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
143 self
144 }
145
146 pub fn with_check_order(mut self, toggle: bool) -> Self {
149 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
150 self
151 }
152
153 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
155 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
156 self
157 }
158
159 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
161 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
162 self
163 }
164
165 pub fn with_type_check(mut self, toggle: bool) -> Self {
167 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
168 self
169 }
170
171 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
173 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
174 self
175 }
176
177 #[cfg(feature = "cse")]
179 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
180 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
181 self
182 }
183
184 #[cfg(feature = "cse")]
186 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
187 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
188 self
189 }
190
191 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
193 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
194 self
195 }
196
197 #[cfg(feature = "new_streaming")]
198 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
199 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
200 self
201 }
202
203 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
205 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
206 self
207 }
208
209 pub fn _with_eager(mut self, toggle: bool) -> Self {
211 self.opt_state.set(OptFlags::EAGER, toggle);
212 self
213 }
214
215 pub fn describe_plan(&self) -> PolarsResult<String> {
217 Ok(self.clone().to_alp()?.describe())
218 }
219
220 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
222 Ok(self.clone().to_alp()?.describe_tree_format())
223 }
224
225 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
229 Ok(self.clone().to_alp_optimized()?.describe())
230 }
231
232 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
236 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
237 }
238
239 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
244 if optimized {
245 self.describe_optimized_plan()
246 } else {
247 self.describe_plan()
248 }
249 }
250
251 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
291 let opt_state = self.get_opt_state();
292 let lp = self
293 .get_plan_builder()
294 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
295 .build();
296 Self::from_logical_plan(lp, opt_state)
297 }
298
299 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
319 self,
320 by_exprs: E,
321 sort_options: SortMultipleOptions,
322 ) -> Self {
323 let by_exprs = by_exprs.as_ref().to_vec();
324 if by_exprs.is_empty() {
325 self
326 } else {
327 let opt_state = self.get_opt_state();
328 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
329 Self::from_logical_plan(lp, opt_state)
330 }
331 }
332
333 pub fn top_k<E: AsRef<[Expr]>>(
334 self,
335 k: IdxSize,
336 by_exprs: E,
337 sort_options: SortMultipleOptions,
338 ) -> Self {
339 self.sort_by_exprs(
341 by_exprs,
342 sort_options.with_order_reversed().with_nulls_last(true),
343 )
344 .slice(0, k)
345 }
346
347 pub fn bottom_k<E: AsRef<[Expr]>>(
348 self,
349 k: IdxSize,
350 by_exprs: E,
351 sort_options: SortMultipleOptions,
352 ) -> Self {
353 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
355 .slice(0, k)
356 }
357
358 pub fn reverse(self) -> Self {
374 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
375 }
376
377 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
385 where
386 I: IntoIterator<Item = T>,
387 J: IntoIterator<Item = S>,
388 T: AsRef<str>,
389 S: AsRef<str>,
390 {
391 let iter = existing.into_iter();
392 let cap = iter.size_hint().0;
393 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
394 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
395
396 for (existing, new) in iter.zip(new) {
399 let existing = existing.as_ref();
400 let new = new.as_ref();
401 if new != existing {
402 existing_vec.push(existing.into());
403 new_vec.push(new.into());
404 }
405 }
406
407 self.map_private(DslFunction::Rename {
408 existing: existing_vec.into(),
409 new: new_vec.into(),
410 strict,
411 })
412 }
413
414 pub fn drop(self, columns: Selector) -> Self {
421 let opt_state = self.get_opt_state();
422 let lp = self.get_plan_builder().drop(columns).build();
423 Self::from_logical_plan(lp, opt_state)
424 }
425
426 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
431 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
432 }
433
434 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
439 self.select(vec![
440 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
441 ])
442 }
443
444 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
446 let opt_state = self.get_opt_state();
447 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
448 Self::from_logical_plan(lp, opt_state)
449 }
450
451 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
453 let opt_state = self.get_opt_state();
454 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
455 Self::from_logical_plan(lp, opt_state)
456 }
457
458 pub fn cache(self) -> Self {
462 let opt_state = self.get_opt_state();
463 let lp = self.get_plan_builder().cache().build();
464 Self::from_logical_plan(lp, opt_state)
465 }
466
467 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
469 let cast_cols: Vec<Expr> = dtypes
470 .into_iter()
471 .map(|(name, dt)| {
472 let name = PlSmallStr::from_str(name);
473
474 if strict {
475 col(name).strict_cast(dt)
476 } else {
477 col(name).cast(dt)
478 }
479 })
480 .collect();
481
482 if cast_cols.is_empty() {
483 self
484 } else {
485 self.with_columns(cast_cols)
486 }
487 }
488
489 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
491 self.with_columns(vec![if strict {
492 col(PlSmallStr::from_static("*")).strict_cast(dtype)
493 } else {
494 col(PlSmallStr::from_static("*")).cast(dtype)
495 }])
496 }
497
498 pub fn optimize(
499 self,
500 lp_arena: &mut Arena<IR>,
501 expr_arena: &mut Arena<AExpr>,
502 ) -> PolarsResult<Node> {
503 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
504 }
505
506 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
507 let (mut lp_arena, mut expr_arena) = self.get_arenas();
508 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
509
510 Ok(IRPlan::new(node, lp_arena, expr_arena))
511 }
512
513 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
514 let (mut lp_arena, mut expr_arena) = self.get_arenas();
515 let node = to_alp(
516 self.logical_plan,
517 &mut expr_arena,
518 &mut lp_arena,
519 &mut self.opt_state,
520 )?;
521 let plan = IRPlan::new(node, lp_arena, expr_arena);
522 Ok(plan)
523 }
524
    /// Runs the optimizer over the plan, writing nodes into the given arenas, and
    /// returns the root node of the optimized plan.
    ///
    /// `scratch` is a caller-provided work buffer handed through to `optimize`.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
    ) -> PolarsResult<Node> {
        // `mut` is only exercised when the `cse` feature is enabled.
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);

        // Common-subexpression elimination is switched off whenever the new
        // streaming engine is requested.
        #[cfg(feature = "cse")]
        if new_streaming {
            opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
        }

        let lp_top = optimize(
            self.logical_plan,
            opt_state,
            lp_arena,
            expr_arena,
            scratch,
            // Callback handed to the optimizer: builds a physical expression and wraps
            // it as an IO expression. A failed conversion yields `None` (via `.ok()?`).
            Some(&|expr, expr_arena, schema| {
                let phys_expr = create_physical_expr(
                    expr,
                    Context::Default,
                    expr_arena,
                    schema,
                    &mut ExpressionConversionState::new(true),
                )
                .ok()?;
                let io_expr = phys_expr_to_io_expr(phys_expr);
                Some(io_expr)
            }),
        )?;

        Ok(lp_top)
    }
564
    /// Optimizes the plan, runs the `post_opt` hook on the optimized IR and builds
    /// the physical plan.
    ///
    /// Returns a fresh `ExecutionState`, the executor, and a flag that is `false`
    /// only when `check_sink` is set and the optimized root is a file or partition
    /// sink.
    ///
    /// `post_opt` receives the root node, both arenas and, when `query_start` is
    /// given, the time elapsed since that instant.
    fn prepare_collect_post_opt<P>(
        mut self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
        post_opt: P,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
    where
        P: FnOnce(
            Node,
            &mut Arena<IR>,
            &mut Arena<AExpr>,
            Option<std::time::Duration>,
        ) -> PolarsResult<()>,
    {
        let (mut lp_arena, mut expr_arena) = self.get_arenas();

        let mut scratch = vec![];
        let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;

        // Give the caller a chance to inspect or rewrite the optimized plan.
        post_opt(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            query_start.map(|s| s.elapsed()),
        )?;

        // Without `check_sink` the flag is unconditionally `true`.
        let no_file_sink = if check_sink {
            !matches!(
                lp_arena.get(lp_top),
                IR::Sink {
                    payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
                    ..
                }
            )
        } else {
            true
        };
        let physical_plan = create_physical_plan(
            lp_top,
            &mut lp_arena,
            &mut expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        let state = ExecutionState::new();
        Ok((state, physical_plan, no_file_sink))
    }
615
616 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
618 where
619 P: FnOnce(
620 Node,
621 &mut Arena<IR>,
622 &mut Arena<AExpr>,
623 Option<std::time::Duration>,
624 ) -> PolarsResult<()>,
625 {
626 let (mut state, mut physical_plan, _) =
627 self.prepare_collect_post_opt(false, None, post_opt)?;
628 physical_plan.execute(&mut state)
629 }
630
    /// Same as `prepare_collect_post_opt`, with a post-optimization hook that does
    /// nothing and always succeeds.
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
639
640 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
645 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
646 payload.clone()
647 } else {
648 self.logical_plan = DslPlan::Sink {
649 input: Arc::new(self.logical_plan),
650 payload: SinkType::Memory,
651 };
652 SinkType::Memory
653 };
654
655 if engine == Engine::Auto {
657 engine = match payload {
658 #[cfg(feature = "new_streaming")]
659 SinkType::Callback { .. } | SinkType::File { .. } | SinkType::Partition { .. } => {
660 Engine::Streaming
661 },
662 _ => Engine::InMemory,
663 };
664 }
665 if engine == Engine::Gpu {
667 engine = Engine::InMemory;
668 }
669
670 #[cfg(feature = "new_streaming")]
671 {
672 if let Some(result) = self.try_new_streaming_if_requested() {
673 return result.map(|v| v.unwrap_single());
674 }
675 }
676
677 match engine {
678 Engine::Auto => unreachable!(),
679 Engine::Streaming => {
680 feature_gated!("new_streaming", self = self.with_new_streaming(true))
681 },
682 _ => {},
683 }
684 let mut alp_plan = self.clone().to_alp_optimized()?;
685
686 match engine {
687 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
688 let result = polars_stream::run_query(
689 alp_plan.lp_top,
690 &mut alp_plan.lp_arena,
691 &mut alp_plan.expr_arena,
692 );
693 result.map(|v| v.unwrap_single())
694 }),
695 Engine::Gpu => {
696 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
697 },
698 Engine::InMemory => {
699 let mut physical_plan = create_physical_plan(
700 alp_plan.lp_top,
701 &mut alp_plan.lp_arena,
702 &mut alp_plan.expr_arena,
703 BUILD_STREAMING_EXECUTOR,
704 )?;
705 let mut state = ExecutionState::new();
706 physical_plan.execute(&mut state)
707 },
708 }
709 }
710
711 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
712 let sink_multiple = LazyFrame {
713 logical_plan: DslPlan::SinkMultiple { inputs: plans },
714 opt_state,
715 cached_arena: Default::default(),
716 };
717 sink_multiple.explain(true)
718 }
719
    /// Executes several plans together and returns one `DataFrame` per plan.
    ///
    /// The plans are combined under a `DslPlan::SinkMultiple` node and optimized as
    /// one unit. An empty `plans` yields an empty vector. `Engine::Auto` and
    /// `Engine::Gpu` are both mapped to `Engine::InMemory` in this function.
    ///
    /// # Errors
    /// Propagates optimization, physical-planning and execution errors.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // `Auto` defaults to the in-memory engine here.
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // GPU requests are executed through the in-memory path.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`; it is cloned so the arenas can be
        // borrowed mutably in the same call.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            // Kept for exhaustiveness; `Gpu` was remapped to `InMemory` above.
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                let mut state = ExecutionState::new();
                // Run the cache prefiller, if any, before the individual plans.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                // Plans are processed in sequential chunks of three times the pool's
                // thread count; the plans inside one chunk are executed in parallel.
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Move the executor out of the slice, leaving a
                                    // default value in its place.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // `idx` is the position within the current chunk.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
822
    /// Executes the plan with the in-memory engine and returns the resulting
    /// `DataFrame`.
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
843
844 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
847 where
848 P: FnOnce(
849 Node,
850 &mut Arena<IR>,
851 &mut Arena<AExpr>,
852 Option<std::time::Duration>,
853 ) -> PolarsResult<()>,
854 {
855 let query_start = std::time::Instant::now();
856 let (mut state, mut physical_plan, _) =
857 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
858 state.time_nodes(query_start);
859 let out = physical_plan.execute(&mut state)?;
860 let timer_df = state.finish_timer()?;
861 Ok((out, timer_df))
862 }
863
    /// Executes the plan with node timing enabled and returns
    /// `(result, timings)`; equivalent to `_profile_post_opt` with a no-op hook.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
874
875 #[cfg(feature = "parquet")]
879 pub fn sink_parquet(
880 self,
881 target: SinkTarget,
882 options: ParquetWriteOptions,
883 cloud_options: Option<polars_io::cloud::CloudOptions>,
884 sink_options: SinkOptions,
885 ) -> PolarsResult<Self> {
886 self.sink(SinkType::File(FileSinkType {
887 target,
888 sink_options,
889 file_type: FileType::Parquet(options),
890 cloud_options,
891 }))
892 }
893
894 #[cfg(feature = "ipc")]
898 pub fn sink_ipc(
899 self,
900 target: SinkTarget,
901 options: IpcWriterOptions,
902 cloud_options: Option<polars_io::cloud::CloudOptions>,
903 sink_options: SinkOptions,
904 ) -> PolarsResult<Self> {
905 self.sink(SinkType::File(FileSinkType {
906 target,
907 sink_options,
908 file_type: FileType::Ipc(options),
909 cloud_options,
910 }))
911 }
912
913 #[cfg(feature = "csv")]
917 pub fn sink_csv(
918 self,
919 target: SinkTarget,
920 options: CsvWriterOptions,
921 cloud_options: Option<polars_io::cloud::CloudOptions>,
922 sink_options: SinkOptions,
923 ) -> PolarsResult<Self> {
924 self.sink(SinkType::File(FileSinkType {
925 target,
926 sink_options,
927 file_type: FileType::Csv(options),
928 cloud_options,
929 }))
930 }
931
932 #[cfg(feature = "json")]
936 pub fn sink_json(
937 self,
938 target: SinkTarget,
939 options: JsonWriterOptions,
940 cloud_options: Option<polars_io::cloud::CloudOptions>,
941 sink_options: SinkOptions,
942 ) -> PolarsResult<Self> {
943 self.sink(SinkType::File(FileSinkType {
944 target,
945 sink_options,
946 file_type: FileType::Json(options),
947 cloud_options,
948 }))
949 }
950
951 #[cfg(feature = "parquet")]
955 #[allow(clippy::too_many_arguments)]
956 pub fn sink_parquet_partitioned(
957 self,
958 base_path: Arc<PlPath>,
959 file_path_cb: Option<PartitionTargetCallback>,
960 variant: PartitionVariant,
961 options: ParquetWriteOptions,
962 cloud_options: Option<polars_io::cloud::CloudOptions>,
963 sink_options: SinkOptions,
964 per_partition_sort_by: Option<Vec<SortColumn>>,
965 finish_callback: Option<SinkFinishCallback>,
966 ) -> PolarsResult<Self> {
967 self.sink(SinkType::Partition(PartitionSinkType {
968 base_path,
969 file_path_cb,
970 sink_options,
971 variant,
972 file_type: FileType::Parquet(options),
973 cloud_options,
974 per_partition_sort_by,
975 finish_callback,
976 }))
977 }
978
979 #[cfg(feature = "ipc")]
983 #[allow(clippy::too_many_arguments)]
984 pub fn sink_ipc_partitioned(
985 self,
986 base_path: Arc<PlPath>,
987 file_path_cb: Option<PartitionTargetCallback>,
988 variant: PartitionVariant,
989 options: IpcWriterOptions,
990 cloud_options: Option<polars_io::cloud::CloudOptions>,
991 sink_options: SinkOptions,
992 per_partition_sort_by: Option<Vec<SortColumn>>,
993 finish_callback: Option<SinkFinishCallback>,
994 ) -> PolarsResult<Self> {
995 self.sink(SinkType::Partition(PartitionSinkType {
996 base_path,
997 file_path_cb,
998 sink_options,
999 variant,
1000 file_type: FileType::Ipc(options),
1001 cloud_options,
1002 per_partition_sort_by,
1003 finish_callback,
1004 }))
1005 }
1006
1007 #[cfg(feature = "csv")]
1011 #[allow(clippy::too_many_arguments)]
1012 pub fn sink_csv_partitioned(
1013 self,
1014 base_path: Arc<PlPath>,
1015 file_path_cb: Option<PartitionTargetCallback>,
1016 variant: PartitionVariant,
1017 options: CsvWriterOptions,
1018 cloud_options: Option<polars_io::cloud::CloudOptions>,
1019 sink_options: SinkOptions,
1020 per_partition_sort_by: Option<Vec<SortColumn>>,
1021 finish_callback: Option<SinkFinishCallback>,
1022 ) -> PolarsResult<Self> {
1023 self.sink(SinkType::Partition(PartitionSinkType {
1024 base_path,
1025 file_path_cb,
1026 sink_options,
1027 variant,
1028 file_type: FileType::Csv(options),
1029 cloud_options,
1030 per_partition_sort_by,
1031 finish_callback,
1032 }))
1033 }
1034
1035 #[cfg(feature = "json")]
1039 #[allow(clippy::too_many_arguments)]
1040 pub fn sink_json_partitioned(
1041 self,
1042 base_path: Arc<PlPath>,
1043 file_path_cb: Option<PartitionTargetCallback>,
1044 variant: PartitionVariant,
1045 options: JsonWriterOptions,
1046 cloud_options: Option<polars_io::cloud::CloudOptions>,
1047 sink_options: SinkOptions,
1048 per_partition_sort_by: Option<Vec<SortColumn>>,
1049 finish_callback: Option<SinkFinishCallback>,
1050 ) -> PolarsResult<Self> {
1051 self.sink(SinkType::Partition(PartitionSinkType {
1052 base_path,
1053 file_path_cb,
1054 sink_options,
1055 variant,
1056 file_type: FileType::Json(options),
1057 cloud_options,
1058 per_partition_sort_by,
1059 finish_callback,
1060 }))
1061 }
1062
1063 pub fn sink_batches(
1064 self,
1065 function: PlanCallback<DataFrame, bool>,
1066 maintain_order: bool,
1067 chunk_size: Option<NonZeroUsize>,
1068 ) -> PolarsResult<Self> {
1069 self.sink(SinkType::Callback(CallbackSinkType {
1070 function,
1071 maintain_order,
1072 chunk_size,
1073 }))
1074 }
1075
1076 #[cfg(feature = "new_streaming")]
1077 pub fn try_new_streaming_if_requested(
1078 &mut self,
1079 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1080 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1081 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1082
1083 if auto_new_streaming || force_new_streaming {
1084 let mut new_stream_lazy = self.clone();
1087 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1088 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1089 Ok(v) => v,
1090 Err(e) => return Some(Err(e)),
1091 };
1092
1093 let f = || {
1094 polars_stream::run_query(
1095 alp_plan.lp_top,
1096 &mut alp_plan.lp_arena,
1097 &mut alp_plan.expr_arena,
1098 )
1099 };
1100
1101 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1102 Ok(v) => return Some(v),
1103 Err(e) => {
1104 if !force_new_streaming
1107 && auto_new_streaming
1108 && e.downcast_ref::<&str>()
1109 .map(|s| s.starts_with("not yet implemented"))
1110 .unwrap_or(false)
1111 {
1112 if polars_core::config::verbose() {
1113 eprintln!(
1114 "caught unimplemented error in new streaming engine, falling back to normal engine"
1115 );
1116 }
1117 } else {
1118 std::panic::resume_unwind(e);
1119 }
1120 },
1121 }
1122 }
1123
1124 None
1125 }
1126
1127 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1128 polars_ensure!(
1129 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1130 InvalidOperation: "cannot create a sink on top of another sink"
1131 );
1132 self.logical_plan = DslPlan::Sink {
1133 input: Arc::new(self.logical_plan),
1134 payload,
1135 };
1136 Ok(self)
1137 }
1138
1139 pub fn filter(self, predicate: Expr) -> Self {
1157 let opt_state = self.get_opt_state();
1158 let lp = self.get_plan_builder().filter(predicate).build();
1159 Self::from_logical_plan(lp, opt_state)
1160 }
1161
1162 pub fn remove(self, predicate: Expr) -> Self {
1180 self.filter(predicate.neq_missing(lit(true)))
1181 }
1182
1183 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1209 let exprs = exprs.as_ref().to_vec();
1210 self.select_impl(
1211 exprs,
1212 ProjectionOptions {
1213 run_parallel: true,
1214 duplicate_check: true,
1215 should_broadcast: true,
1216 },
1217 )
1218 }
1219
1220 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1221 let exprs = exprs.as_ref().to_vec();
1222 self.select_impl(
1223 exprs,
1224 ProjectionOptions {
1225 run_parallel: false,
1226 duplicate_check: true,
1227 should_broadcast: true,
1228 },
1229 )
1230 }
1231
1232 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1233 let opt_state = self.get_opt_state();
1234 let lp = self.get_plan_builder().project(exprs, options).build();
1235 Self::from_logical_plan(lp, opt_state)
1236 }
1237
1238 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1259 let keys = by
1260 .as_ref()
1261 .iter()
1262 .map(|e| e.clone().into())
1263 .collect::<Vec<_>>();
1264 let opt_state = self.get_opt_state();
1265
1266 #[cfg(feature = "dynamic_group_by")]
1267 {
1268 LazyGroupBy {
1269 logical_plan: self.logical_plan,
1270 opt_state,
1271 keys,
1272 maintain_order: false,
1273 dynamic_options: None,
1274 rolling_options: None,
1275 }
1276 }
1277
1278 #[cfg(not(feature = "dynamic_group_by"))]
1279 {
1280 LazyGroupBy {
1281 logical_plan: self.logical_plan,
1282 opt_state,
1283 keys,
1284 maintain_order: false,
1285 }
1286 }
1287 }
1288
1289 #[cfg(feature = "dynamic_group_by")]
1297 pub fn rolling<E: AsRef<[Expr]>>(
1298 mut self,
1299 index_column: Expr,
1300 group_by: E,
1301 mut options: RollingGroupOptions,
1302 ) -> LazyGroupBy {
1303 if let Expr::Column(name) = index_column {
1304 options.index_column = name;
1305 } else {
1306 let output_field = index_column
1307 .to_field(&self.collect_schema().unwrap())
1308 .unwrap();
1309 return self.with_column(index_column).rolling(
1310 Expr::Column(output_field.name().clone()),
1311 group_by,
1312 options,
1313 );
1314 }
1315 let opt_state = self.get_opt_state();
1316 LazyGroupBy {
1317 logical_plan: self.logical_plan,
1318 opt_state,
1319 keys: group_by.as_ref().to_vec(),
1320 maintain_order: true,
1321 dynamic_options: None,
1322 rolling_options: Some(options),
1323 }
1324 }
1325
1326 #[cfg(feature = "dynamic_group_by")]
1342 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1343 mut self,
1344 index_column: Expr,
1345 group_by: E,
1346 mut options: DynamicGroupOptions,
1347 ) -> LazyGroupBy {
1348 if let Expr::Column(name) = index_column {
1349 options.index_column = name;
1350 } else {
1351 let output_field = index_column
1352 .to_field(&self.collect_schema().unwrap())
1353 .unwrap();
1354 return self.with_column(index_column).group_by_dynamic(
1355 Expr::Column(output_field.name().clone()),
1356 group_by,
1357 options,
1358 );
1359 }
1360 let opt_state = self.get_opt_state();
1361 LazyGroupBy {
1362 logical_plan: self.logical_plan,
1363 opt_state,
1364 keys: group_by.as_ref().to_vec(),
1365 maintain_order: true,
1366 dynamic_options: Some(options),
1367 rolling_options: None,
1368 }
1369 }
1370
1371 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1373 let keys = by
1374 .as_ref()
1375 .iter()
1376 .map(|e| e.clone().into())
1377 .collect::<Vec<_>>();
1378 let opt_state = self.get_opt_state();
1379
1380 #[cfg(feature = "dynamic_group_by")]
1381 {
1382 LazyGroupBy {
1383 logical_plan: self.logical_plan,
1384 opt_state,
1385 keys,
1386 maintain_order: true,
1387 dynamic_options: None,
1388 rolling_options: None,
1389 }
1390 }
1391
1392 #[cfg(not(feature = "dynamic_group_by"))]
1393 {
1394 LazyGroupBy {
1395 logical_plan: self.logical_plan,
1396 opt_state,
1397 keys,
1398 maintain_order: true,
1399 }
1400 }
1401 }
1402
1403 #[cfg(feature = "semi_anti_join")]
1420 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1421 self.join(
1422 other,
1423 [left_on.into()],
1424 [right_on.into()],
1425 JoinArgs::new(JoinType::Anti),
1426 )
1427 }
1428
1429 #[cfg(feature = "cross_join")]
1431 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1432 self.join(
1433 other,
1434 vec![],
1435 vec![],
1436 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1437 )
1438 }
1439
1440 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1457 self.join(
1458 other,
1459 [left_on.into()],
1460 [right_on.into()],
1461 JoinArgs::new(JoinType::Left),
1462 )
1463 }
1464
1465 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1482 self.join(
1483 other,
1484 [left_on.into()],
1485 [right_on.into()],
1486 JoinArgs::new(JoinType::Inner),
1487 )
1488 }
1489
1490 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1507 self.join(
1508 other,
1509 [left_on.into()],
1510 [right_on.into()],
1511 JoinArgs::new(JoinType::Full),
1512 )
1513 }
1514
1515 #[cfg(feature = "semi_anti_join")]
1532 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1533 self.join(
1534 other,
1535 [left_on.into()],
1536 [right_on.into()],
1537 JoinArgs::new(JoinType::Semi),
1538 )
1539 }
1540
1541 pub fn join<E: AsRef<[Expr]>>(
1563 self,
1564 other: LazyFrame,
1565 left_on: E,
1566 right_on: E,
1567 args: JoinArgs,
1568 ) -> LazyFrame {
1569 let left_on = left_on.as_ref().to_vec();
1570 let right_on = right_on.as_ref().to_vec();
1571
1572 self._join_impl(other, left_on, right_on, args)
1573 }
1574
1575 fn _join_impl(
1576 self,
1577 other: LazyFrame,
1578 left_on: Vec<Expr>,
1579 right_on: Vec<Expr>,
1580 args: JoinArgs,
1581 ) -> LazyFrame {
1582 let JoinArgs {
1583 how,
1584 validation,
1585 suffix,
1586 slice,
1587 nulls_equal,
1588 coalesce,
1589 maintain_order,
1590 } = args;
1591
1592 if slice.is_some() {
1593 panic!("impl error: slice is not handled")
1594 }
1595
1596 let mut builder = self
1597 .join_builder()
1598 .with(other)
1599 .left_on(left_on)
1600 .right_on(right_on)
1601 .how(how)
1602 .validate(validation)
1603 .join_nulls(nulls_equal)
1604 .coalesce(coalesce)
1605 .maintain_order(maintain_order);
1606
1607 if let Some(suffix) = suffix {
1608 builder = builder.suffix(suffix);
1609 }
1610
1611 builder.finish()
1613 }
1614
    /// Consumes the frame and returns a `JoinBuilder` constructed from it.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1623
1624 pub fn with_column(self, expr: Expr) -> LazyFrame {
1642 let opt_state = self.get_opt_state();
1643 let lp = self
1644 .get_plan_builder()
1645 .with_columns(
1646 vec![expr],
1647 ProjectionOptions {
1648 run_parallel: false,
1649 duplicate_check: true,
1650 should_broadcast: true,
1651 },
1652 )
1653 .build();
1654 Self::from_logical_plan(lp, opt_state)
1655 }
1656
1657 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1672 let exprs = exprs.as_ref().to_vec();
1673 self.with_columns_impl(
1674 exprs,
1675 ProjectionOptions {
1676 run_parallel: true,
1677 duplicate_check: true,
1678 should_broadcast: true,
1679 },
1680 )
1681 }
1682
1683 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1685 let exprs = exprs.as_ref().to_vec();
1686 self.with_columns_impl(
1687 exprs,
1688 ProjectionOptions {
1689 run_parallel: false,
1690 duplicate_check: true,
1691 should_broadcast: true,
1692 },
1693 )
1694 }
1695
1696 pub fn match_to_schema(
1698 self,
1699 schema: SchemaRef,
1700 per_column: Arc<[MatchToSchemaPerColumn]>,
1701 extra_columns: ExtraColumnsPolicy,
1702 ) -> LazyFrame {
1703 let opt_state = self.get_opt_state();
1704 let lp = self
1705 .get_plan_builder()
1706 .match_to_schema(schema, per_column, extra_columns)
1707 .build();
1708 Self::from_logical_plan(lp, opt_state)
1709 }
1710
1711 pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1712 let opt_state = self.get_opt_state();
1713 let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1714 Self::from_logical_plan(lp, opt_state)
1715 }
1716
1717 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1718 let opt_state = self.get_opt_state();
1719 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1720 Self::from_logical_plan(lp, opt_state)
1721 }
1722
1723 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1724 let contexts = contexts
1725 .as_ref()
1726 .iter()
1727 .map(|lf| lf.logical_plan.clone())
1728 .collect();
1729 let opt_state = self.get_opt_state();
1730 let lp = self.get_plan_builder().with_context(contexts).build();
1731 Self::from_logical_plan(lp, opt_state)
1732 }
1733
1734 pub fn max(self) -> Self {
1738 self.map_private(DslFunction::Stats(StatsFunction::Max))
1739 }
1740
1741 pub fn min(self) -> Self {
1745 self.map_private(DslFunction::Stats(StatsFunction::Min))
1746 }
1747
1748 pub fn sum(self) -> Self {
1758 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1759 }
1760
1761 pub fn mean(self) -> Self {
1766 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1767 }
1768
1769 pub fn median(self) -> Self {
1775 self.map_private(DslFunction::Stats(StatsFunction::Median))
1776 }
1777
1778 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1780 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1781 quantile,
1782 method,
1783 }))
1784 }
1785
1786 pub fn std(self, ddof: u8) -> Self {
1799 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1800 }
1801
1802 pub fn var(self, ddof: u8) -> Self {
1812 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1813 }
1814
1815 pub fn explode(self, columns: Selector) -> LazyFrame {
1817 self.explode_impl(columns, false)
1818 }
1819
1820 fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1822 let opt_state = self.get_opt_state();
1823 let lp = self
1824 .get_plan_builder()
1825 .explode(columns, allow_empty)
1826 .build();
1827 Self::from_logical_plan(lp, opt_state)
1828 }
1829
1830 pub fn null_count(self) -> LazyFrame {
1832 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1833 }
1834
    /// Drop duplicate rows with `maintain_order: true` in the distinct options.
    ///
    /// `subset` optionally restricts which columns are considered and `keep_strategy` is
    /// forwarded as-is. Thin wrapper around [`LazyFrame::unique_stable_generic`].
    pub fn unique_stable(
        self,
        subset: Option<Selector>,
        keep_strategy: UniqueKeepStrategy,
    ) -> LazyFrame {
        self.unique_stable_generic(subset, keep_strategy)
    }
1846
1847 pub fn unique_stable_generic(
1848 self,
1849 subset: Option<Selector>,
1850 keep_strategy: UniqueKeepStrategy,
1851 ) -> LazyFrame {
1852 let opt_state = self.get_opt_state();
1853 let options = DistinctOptionsDSL {
1854 subset,
1855 maintain_order: true,
1856 keep_strategy,
1857 };
1858 let lp = self.get_plan_builder().distinct(options).build();
1859 Self::from_logical_plan(lp, opt_state)
1860 }
1861
    /// Drop duplicate rows with `maintain_order: false` in the distinct options.
    ///
    /// `subset` optionally restricts which columns are considered and `keep_strategy` is
    /// forwarded as-is. Thin wrapper around [`LazyFrame::unique_generic`].
    pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
        self.unique_generic(subset, keep_strategy)
    }
1872
1873 pub fn unique_generic(
1874 self,
1875 subset: Option<Selector>,
1876 keep_strategy: UniqueKeepStrategy,
1877 ) -> LazyFrame {
1878 let opt_state = self.get_opt_state();
1879 let options = DistinctOptionsDSL {
1880 subset,
1881 maintain_order: false,
1882 keep_strategy,
1883 };
1884 let lp = self.get_plan_builder().distinct(options).build();
1885 Self::from_logical_plan(lp, opt_state)
1886 }
1887
1888 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1893 let opt_state = self.get_opt_state();
1894 let lp = self.get_plan_builder().drop_nans(subset).build();
1895 Self::from_logical_plan(lp, opt_state)
1896 }
1897
1898 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1903 let opt_state = self.get_opt_state();
1904 let lp = self.get_plan_builder().drop_nulls(subset).build();
1905 Self::from_logical_plan(lp, opt_state)
1906 }
1907
1908 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1918 let opt_state = self.get_opt_state();
1919 let lp = self.get_plan_builder().slice(offset, len).build();
1920 Self::from_logical_plan(lp, opt_state)
1921 }
1922
1923 pub fn first(self) -> LazyFrame {
1927 self.slice(0, 1)
1928 }
1929
1930 pub fn last(self) -> LazyFrame {
1934 self.slice(-1, 1)
1935 }
1936
1937 pub fn tail(self, n: IdxSize) -> LazyFrame {
1941 let neg_tail = -(n as i64);
1942 self.slice(neg_tail, n)
1943 }
1944
1945 #[cfg(feature = "pivot")]
1949 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1950 let opt_state = self.get_opt_state();
1951 let lp = self.get_plan_builder().unpivot(args).build();
1952 Self::from_logical_plan(lp, opt_state)
1953 }
1954
1955 pub fn limit(self, n: IdxSize) -> LazyFrame {
1957 self.slice(0, n)
1958 }
1959
1960 pub fn map<F>(
1974 self,
1975 function: F,
1976 optimizations: AllowedOptimizations,
1977 schema: Option<Arc<dyn UdfSchema>>,
1978 name: Option<&'static str>,
1979 ) -> LazyFrame
1980 where
1981 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1982 {
1983 let opt_state = self.get_opt_state();
1984 let lp = self
1985 .get_plan_builder()
1986 .map(
1987 function,
1988 optimizations,
1989 schema,
1990 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1991 )
1992 .build();
1993 Self::from_logical_plan(lp, opt_state)
1994 }
1995
1996 #[cfg(feature = "python")]
1997 pub fn map_python(
1998 self,
1999 function: polars_utils::python_function::PythonFunction,
2000 optimizations: AllowedOptimizations,
2001 schema: Option<SchemaRef>,
2002 validate_output: bool,
2003 ) -> LazyFrame {
2004 let opt_state = self.get_opt_state();
2005 let lp = self
2006 .get_plan_builder()
2007 .map_python(function, optimizations, schema, validate_output)
2008 .build();
2009 Self::from_logical_plan(lp, opt_state)
2010 }
2011
2012 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2013 let opt_state = self.get_opt_state();
2014 let lp = self.get_plan_builder().map_private(function).build();
2015 Self::from_logical_plan(lp, opt_state)
2016 }
2017
2018 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2027 where
2028 S: Into<PlSmallStr>,
2029 {
2030 let name = name.into();
2031
2032 match &self.logical_plan {
2033 v @ DslPlan::Scan { scan_type, .. }
2034 if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2035 {
2036 let DslPlan::Scan {
2037 sources,
2038 mut unified_scan_args,
2039 scan_type,
2040 cached_ir: _,
2041 } = v.clone()
2042 else {
2043 unreachable!()
2044 };
2045
2046 unified_scan_args.row_index = Some(RowIndex {
2047 name,
2048 offset: offset.unwrap_or(0),
2049 });
2050
2051 DslPlan::Scan {
2052 sources,
2053 unified_scan_args,
2054 scan_type,
2055 cached_ir: Default::default(),
2056 }
2057 .into()
2058 },
2059 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2060 }
2061 }
2062
2063 pub fn count(self) -> LazyFrame {
2065 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2066 }
2067
2068 #[cfg(feature = "dtype-struct")]
2071 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2072 self.map_private(DslFunction::Unnest {
2073 columns: cols,
2074 separator,
2075 })
2076 }
2077
2078 #[cfg(feature = "merge_sorted")]
2079 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2080 where
2081 S: Into<PlSmallStr>,
2082 {
2083 let key = key.into();
2084
2085 let lp = DslPlan::MergeSorted {
2086 input_left: Arc::new(self.logical_plan),
2087 input_right: Arc::new(other.logical_plan),
2088 key,
2089 };
2090 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2091 }
2092}
2093
/// A pending group-by: the input plan plus the grouping configuration.
///
/// It is turned into a [`LazyFrame`] by `agg`, `head`, `tail` or `apply`.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan of the frame being grouped; used as the input of the group-by node.
    pub logical_plan: DslPlan,
    /// Optimization flags handed on to the resulting [`LazyFrame`].
    opt_state: OptFlags,
    /// Expressions to group by.
    keys: Vec<Expr>,
    /// Forwarded to the group-by node as its `maintain_order` setting.
    maintain_order: bool,
    /// Dynamic group-by options, forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, forwarded to the group-by node.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2106
2107impl From<LazyGroupBy> for LazyFrame {
2108 fn from(lgb: LazyGroupBy) -> Self {
2109 Self {
2110 logical_plan: lgb.logical_plan,
2111 opt_state: lgb.opt_state,
2112 cached_arena: Default::default(),
2113 }
2114 }
2115}
2116
2117impl LazyGroupBy {
2118 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2140 #[cfg(feature = "dynamic_group_by")]
2141 let lp = DslBuilder::from(self.logical_plan)
2142 .group_by(
2143 self.keys,
2144 aggs,
2145 None,
2146 self.maintain_order,
2147 self.dynamic_options,
2148 self.rolling_options,
2149 )
2150 .build();
2151
2152 #[cfg(not(feature = "dynamic_group_by"))]
2153 let lp = DslBuilder::from(self.logical_plan)
2154 .group_by(self.keys, aggs, None, self.maintain_order)
2155 .build();
2156 LazyFrame::from_logical_plan(lp, self.opt_state)
2157 }
2158
2159 pub fn head(self, n: Option<usize>) -> LazyFrame {
2161 let keys = self
2162 .keys
2163 .iter()
2164 .filter_map(|expr| expr_output_name(expr).ok())
2165 .collect::<Vec<_>>();
2166
2167 self.agg([all().as_expr().head(n)])
2168 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2169 }
2170
2171 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2173 let keys = self
2174 .keys
2175 .iter()
2176 .filter_map(|expr| expr_output_name(expr).ok())
2177 .collect::<Vec<_>>();
2178
2179 self.agg([all().as_expr().tail(n)])
2180 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2181 }
2182
2183 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2188 #[cfg(feature = "dynamic_group_by")]
2189 let options = GroupbyOptions {
2190 dynamic: self.dynamic_options,
2191 rolling: self.rolling_options,
2192 slice: None,
2193 };
2194
2195 #[cfg(not(feature = "dynamic_group_by"))]
2196 let options = GroupbyOptions { slice: None };
2197
2198 let lp = DslPlan::GroupBy {
2199 input: Arc::new(self.logical_plan),
2200 keys: self.keys,
2201 aggs: vec![],
2202 apply: Some((f, schema)),
2203 maintain_order: self.maintain_order,
2204 options: Arc::new(options),
2205 };
2206 LazyFrame::from_logical_plan(lp, self.opt_state)
2207 }
2208}
2209
/// Step-by-step configuration of a join between two [`LazyFrame`]s.
///
/// Created with [`JoinBuilder::new`] (or `LazyFrame::join_builder`) and consumed by
/// `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Left-hand frame of the join.
    lf: LazyFrame,
    /// Join type; `JoinType::Inner` until changed with `how`.
    how: JoinType,
    /// Right-hand frame; must be set with `with` before the join is built.
    other: Option<LazyFrame>,
    /// Key expressions for the left frame.
    left_on: Vec<Expr>,
    /// Key expressions for the right frame.
    right_on: Vec<Expr>,
    /// Forwarded to `JoinOptions::allow_parallel`.
    allow_parallel: bool,
    /// Forwarded to `JoinOptions::force_parallel`.
    force_parallel: bool,
    /// Optional suffix forwarded to `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Forwarded to `JoinArgs::validation`.
    validation: JoinValidation,
    /// Forwarded to `JoinArgs::nulls_equal`.
    nulls_equal: bool,
    /// Forwarded to `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Forwarded to `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2225impl JoinBuilder {
2226 pub fn new(lf: LazyFrame) -> Self {
2228 Self {
2229 lf,
2230 other: None,
2231 how: JoinType::Inner,
2232 left_on: vec![],
2233 right_on: vec![],
2234 allow_parallel: true,
2235 force_parallel: false,
2236 suffix: None,
2237 validation: Default::default(),
2238 nulls_equal: false,
2239 coalesce: Default::default(),
2240 maintain_order: Default::default(),
2241 }
2242 }
2243
2244 pub fn with(mut self, other: LazyFrame) -> Self {
2246 self.other = Some(other);
2247 self
2248 }
2249
2250 pub fn how(mut self, how: JoinType) -> Self {
2252 self.how = how;
2253 self
2254 }
2255
2256 pub fn validate(mut self, validation: JoinValidation) -> Self {
2257 self.validation = validation;
2258 self
2259 }
2260
2261 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2265 let on = on.as_ref().to_vec();
2266 self.left_on.clone_from(&on);
2267 self.right_on = on;
2268 self
2269 }
2270
2271 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2275 self.left_on = on.as_ref().to_vec();
2276 self
2277 }
2278
2279 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2283 self.right_on = on.as_ref().to_vec();
2284 self
2285 }
2286
2287 pub fn allow_parallel(mut self, allow: bool) -> Self {
2289 self.allow_parallel = allow;
2290 self
2291 }
2292
2293 pub fn force_parallel(mut self, force: bool) -> Self {
2295 self.force_parallel = force;
2296 self
2297 }
2298
2299 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2301 self.nulls_equal = nulls_equal;
2302 self
2303 }
2304
2305 pub fn suffix<S>(mut self, suffix: S) -> Self
2308 where
2309 S: Into<PlSmallStr>,
2310 {
2311 self.suffix = Some(suffix.into());
2312 self
2313 }
2314
2315 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2317 self.coalesce = coalesce;
2318 self
2319 }
2320
2321 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2323 self.maintain_order = maintain_order;
2324 self
2325 }
2326
2327 pub fn finish(self) -> LazyFrame {
2329 let opt_state = self.lf.opt_state;
2330 let other = self.other.expect("'with' not set in join builder");
2331
2332 let args = JoinArgs {
2333 how: self.how,
2334 validation: self.validation,
2335 suffix: self.suffix,
2336 slice: None,
2337 nulls_equal: self.nulls_equal,
2338 coalesce: self.coalesce,
2339 maintain_order: self.maintain_order,
2340 };
2341
2342 let lp = self
2343 .lf
2344 .get_plan_builder()
2345 .join(
2346 other.logical_plan,
2347 self.left_on,
2348 self.right_on,
2349 JoinOptions {
2350 allow_parallel: self.allow_parallel,
2351 force_parallel: self.force_parallel,
2352 args,
2353 }
2354 .into(),
2355 )
2356 .build();
2357 LazyFrame::from_logical_plan(lp, opt_state)
2358 }
2359
2360 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2362 let opt_state = self.lf.opt_state;
2363 let other = self.other.expect("with not set");
2364
2365 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2367 if let Expr::BinaryExpr {
2368 op: Operator::And,
2369 left,
2370 right,
2371 } = predicate
2372 {
2373 decompose_and((*left).clone(), expanded_predicates);
2374 decompose_and((*right).clone(), expanded_predicates);
2375 } else {
2376 expanded_predicates.push(predicate);
2377 }
2378 }
2379 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2380 for predicate in predicates {
2381 decompose_and(predicate, &mut expanded_predicates);
2382 }
2383 let predicates: Vec<Expr> = expanded_predicates;
2384
2385 #[cfg(feature = "is_between")]
2387 let predicates: Vec<Expr> = {
2388 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2389 for predicate in predicates {
2390 if let Expr::Function {
2391 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2392 input,
2393 ..
2394 } = &predicate
2395 {
2396 if let [expr, lower, upper] = input.as_slice() {
2397 match closed {
2398 ClosedInterval::Both => {
2399 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2400 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2401 },
2402 ClosedInterval::Right => {
2403 expanded_predicates.push(expr.clone().gt(lower.clone()));
2404 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2405 },
2406 ClosedInterval::Left => {
2407 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2408 expanded_predicates.push(expr.clone().lt(upper.clone()));
2409 },
2410 ClosedInterval::None => {
2411 expanded_predicates.push(expr.clone().gt(lower.clone()));
2412 expanded_predicates.push(expr.clone().lt(upper.clone()));
2413 },
2414 }
2415 continue;
2416 }
2417 }
2418 expanded_predicates.push(predicate);
2419 }
2420 expanded_predicates
2421 };
2422
2423 let args = JoinArgs {
2424 how: self.how,
2425 validation: self.validation,
2426 suffix: self.suffix,
2427 slice: None,
2428 nulls_equal: self.nulls_equal,
2429 coalesce: self.coalesce,
2430 maintain_order: self.maintain_order,
2431 };
2432 let options = JoinOptions {
2433 allow_parallel: self.allow_parallel,
2434 force_parallel: self.force_parallel,
2435 args,
2436 };
2437
2438 let lp = DslPlan::Join {
2439 input_left: Arc::new(self.lf.logical_plan),
2440 input_right: Arc::new(other.logical_plan),
2441 left_on: Default::default(),
2442 right_on: Default::default(),
2443 predicates,
2444 options: Arc::from(options),
2445 };
2446
2447 LazyFrame::from_logical_plan(lp, opt_state)
2448 }
2449}
2450
2451pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2452 #[cfg(not(feature = "new_streaming"))]
2453 {
2454 None
2455 }
2456 #[cfg(feature = "new_streaming")]
2457 {
2458 Some(streaming_dispatch::build_streaming_query_executor)
2459 }
2460};
2461#[cfg(feature = "new_streaming")]
2462pub use streaming_dispatch::build_streaming_query_executor;
2463
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Wrap the plan rooted at `node` in an [`Executor`] backed by a
    /// `polars_stream::StreamingQuery`.
    ///
    /// If the root is not already an `IR::Sink`, an in-memory sink
    /// (`SinkTypeIR::Memory`) is added on top of it. The result is rechunked after
    /// execution only when the root node is an `IR::Scan` whose `unified_scan_args.rechunk`
    /// is set.
    ///
    /// # Panics
    ///
    /// Panics when the root node is an `IR::SinkMultiple`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        let rechunk = if let IR::Scan {
            unified_scan_args, ..
        } = ir_arena.get(node)
        {
            unified_scan_args.rechunk
        } else {
            false
        };

        let sink_node = match ir_arena.get(node) {
            IR::Sink { .. } => node,
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        let query = polars_stream::StreamingQuery::build(sink_node, ir_arena, expr_arena)?;
        let executor: Box<dyn Executor> = Box::new(StreamingQueryExecutor {
            executor: Arc::new(Mutex::new(Some(query))),
            rechunk,
        });
        Ok(executor)
    }

    /// [`Executor`] adapter around a single-use `polars_stream::StreamingQuery`.
    struct StreamingQueryExecutor {
        /// The query to run; taken out (leaving `None`) by the first `execute` call.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        /// Whether the resulting frame is rechunked into a single chunk after execution.
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the streaming query and return its single output frame.
        ///
        /// # Panics
        ///
        /// Panics if `POOL.current_thread_index()` is not `None`, if the mutex cannot be
        /// locked without blocking, or if `execute` has already been called once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            assert!(POOL.current_thread_index().is_none());

            // Take the query out of its slot; the guard is released before the query runs.
            let query = {
                let mut slot = self.executor.try_lock().unwrap();
                slot.take()
            }
            .expect("unhandled: execute() more than once");

            let mut df = query.execute()?.unwrap_single();

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}