1#[cfg(feature = "python")]
3mod python;
4
5mod cached_arenas;
6mod err;
7#[cfg(not(target_arch = "wasm32"))]
8mod exitable;
9#[cfg(feature = "pivot")]
10pub mod pivot;
11
12use std::num::NonZeroUsize;
13use std::sync::{Arc, Mutex};
14
15pub use anonymous_scan::*;
16#[cfg(feature = "csv")]
17pub use csv::*;
18#[cfg(not(target_arch = "wasm32"))]
19pub use exitable::*;
20pub use file_list_reader::*;
21#[cfg(feature = "json")]
22pub use ndjson::*;
23#[cfg(feature = "parquet")]
24pub use parquet::*;
25use polars_compute::rolling::QuantileMethod;
26use polars_core::POOL;
27use polars_core::error::feature_gated;
28use polars_core::prelude::*;
29use polars_io::RowIndex;
30use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
31use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
32use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
33#[cfg(feature = "is_between")]
34use polars_ops::prelude::ClosedInterval;
35pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
36use polars_utils::pl_str::PlSmallStr;
37use polars_utils::plpath::PlPath;
38use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
39
40use crate::frame::cached_arenas::CachedArena;
41use crate::prelude::*;
42
/// Conversion of a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and return it as a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
46
47impl IntoLazy for DataFrame {
48 fn lazy(self) -> LazyFrame {
50 let lp = DslBuilder::from_existing_df(self).build();
51 LazyFrame {
52 logical_plan: lp,
53 opt_state: Default::default(),
54 cached_arena: Default::default(),
55 }
56 }
57}
58
impl IntoLazy for LazyFrame {
    /// A [`LazyFrame`] is already lazy; it is returned unchanged.
    fn lazy(self) -> LazyFrame {
        self
    }
}
64
/// A not-yet-executed query: a DSL plan plus the optimization flags to apply to it.
///
/// Marked `#[must_use]`: building a `LazyFrame` has no effect until it is executed
/// (for example through [`LazyFrame::collect`]).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL plan describing the query.
    pub logical_plan: DslPlan,
    /// Optimization flags used when the plan is lowered/optimized.
    pub(crate) opt_state: OptFlags,
    /// Shared, lazily populated slot for a cached arena pair.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
76
77impl From<DslPlan> for LazyFrame {
78 fn from(plan: DslPlan) -> Self {
79 Self {
80 logical_plan: plan,
81 opt_state: OptFlags::default(),
82 cached_arena: Default::default(),
83 }
84 }
85}
86
87impl LazyFrame {
    /// Assemble a [`LazyFrame`] directly from its three components, reusing the
    /// given `cached_arena` handle instead of creating a new one.
    pub(crate) fn from_inner(
        logical_plan: DslPlan,
        opt_state: OptFlags,
        cached_arena: Arc<Mutex<Option<CachedArena>>>,
    ) -> Self {
        Self {
            logical_plan,
            opt_state,
            cached_arena,
        }
    }
99
    /// Consume the frame and return a [`DslBuilder`] seeded with its logical plan.
    ///
    /// The optimization flags and arena cache are dropped; callers that need the
    /// flags must read them before calling this.
    pub(crate) fn get_plan_builder(self) -> DslBuilder {
        DslBuilder::from(self.logical_plan)
    }
103
    /// Return a copy of the current optimization flags.
    fn get_opt_state(&self) -> OptFlags {
        self.opt_state
    }
107
108 fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
109 LazyFrame {
110 logical_plan,
111 opt_state,
112 cached_arena: Default::default(),
113 }
114 }
115
    /// Return the optimization flags currently set on this frame.
    pub fn get_current_optimizations(&self) -> OptFlags {
        self.opt_state
    }
120
121 pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
123 self.opt_state = opt_state;
124 self
125 }
126
127 pub fn without_optimizations(self) -> Self {
129 self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
130 }
131
132 pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
134 self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
135 self
136 }
137
138 pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
140 self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
141 self
142 }
143
144 pub fn with_check_order(mut self, toggle: bool) -> Self {
147 self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
148 self
149 }
150
151 pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
153 self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
154 self
155 }
156
157 pub fn with_type_coercion(mut self, toggle: bool) -> Self {
159 self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
160 self
161 }
162
163 pub fn with_type_check(mut self, toggle: bool) -> Self {
165 self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
166 self
167 }
168
169 pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
171 self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
172 self
173 }
174
175 #[cfg(feature = "cse")]
177 pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
178 self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
179 self
180 }
181
182 #[cfg(feature = "cse")]
184 pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
185 self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
186 self
187 }
188
189 pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
191 self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
192 self
193 }
194
195 #[cfg(feature = "new_streaming")]
196 pub fn with_new_streaming(mut self, toggle: bool) -> Self {
197 self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
198 self
199 }
200
201 pub fn with_row_estimate(mut self, toggle: bool) -> Self {
203 self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
204 self
205 }
206
207 pub fn _with_eager(mut self, toggle: bool) -> Self {
209 self.opt_state.set(OptFlags::EAGER, toggle);
210 self
211 }
212
213 pub fn describe_plan(&self) -> PolarsResult<String> {
215 Ok(self.clone().to_alp()?.describe())
216 }
217
218 pub fn describe_plan_tree(&self) -> PolarsResult<String> {
220 Ok(self.clone().to_alp()?.describe_tree_format())
221 }
222
223 pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
227 Ok(self.clone().to_alp_optimized()?.describe())
228 }
229
230 pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
234 Ok(self.clone().to_alp_optimized()?.describe_tree_format())
235 }
236
237 pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
242 if optimized {
243 self.describe_optimized_plan()
244 } else {
245 self.describe_plan()
246 }
247 }
248
249 pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
289 let opt_state = self.get_opt_state();
290 let lp = self
291 .get_plan_builder()
292 .sort(by.into_vec().into_iter().map(col).collect(), sort_options)
293 .build();
294 Self::from_logical_plan(lp, opt_state)
295 }
296
297 pub fn sort_by_exprs<E: AsRef<[Expr]>>(
317 self,
318 by_exprs: E,
319 sort_options: SortMultipleOptions,
320 ) -> Self {
321 let by_exprs = by_exprs.as_ref().to_vec();
322 if by_exprs.is_empty() {
323 self
324 } else {
325 let opt_state = self.get_opt_state();
326 let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
327 Self::from_logical_plan(lp, opt_state)
328 }
329 }
330
331 pub fn top_k<E: AsRef<[Expr]>>(
332 self,
333 k: IdxSize,
334 by_exprs: E,
335 sort_options: SortMultipleOptions,
336 ) -> Self {
337 self.sort_by_exprs(
339 by_exprs,
340 sort_options.with_order_reversed().with_nulls_last(true),
341 )
342 .slice(0, k)
343 }
344
345 pub fn bottom_k<E: AsRef<[Expr]>>(
346 self,
347 k: IdxSize,
348 by_exprs: E,
349 sort_options: SortMultipleOptions,
350 ) -> Self {
351 self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
353 .slice(0, k)
354 }
355
356 pub fn reverse(self) -> Self {
372 self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
373 }
374
375 pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
383 where
384 I: IntoIterator<Item = T>,
385 J: IntoIterator<Item = S>,
386 T: AsRef<str>,
387 S: AsRef<str>,
388 {
389 let iter = existing.into_iter();
390 let cap = iter.size_hint().0;
391 let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
392 let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
393
394 for (existing, new) in iter.zip(new) {
397 let existing = existing.as_ref();
398 let new = new.as_ref();
399 if new != existing {
400 existing_vec.push(existing.into());
401 new_vec.push(new.into());
402 }
403 }
404
405 self.map_private(DslFunction::Rename {
406 existing: existing_vec.into(),
407 new: new_vec.into(),
408 strict,
409 })
410 }
411
412 pub fn drop(self, columns: Selector) -> Self {
419 let opt_state = self.get_opt_state();
420 let lp = self.get_plan_builder().drop(columns).build();
421 Self::from_logical_plan(lp, opt_state)
422 }
423
424 pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
429 self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
430 }
431
432 pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
437 self.select(vec![
438 col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
439 ])
440 }
441
442 pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
444 let opt_state = self.get_opt_state();
445 let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
446 Self::from_logical_plan(lp, opt_state)
447 }
448
449 pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
451 let opt_state = self.get_opt_state();
452 let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
453 Self::from_logical_plan(lp, opt_state)
454 }
455
456 pub fn cache(self) -> Self {
460 let opt_state = self.get_opt_state();
461 let lp = self.get_plan_builder().cache().build();
462 Self::from_logical_plan(lp, opt_state)
463 }
464
465 pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
467 let cast_cols: Vec<Expr> = dtypes
468 .into_iter()
469 .map(|(name, dt)| {
470 let name = PlSmallStr::from_str(name);
471
472 if strict {
473 col(name).strict_cast(dt)
474 } else {
475 col(name).cast(dt)
476 }
477 })
478 .collect();
479
480 if cast_cols.is_empty() {
481 self
482 } else {
483 self.with_columns(cast_cols)
484 }
485 }
486
487 pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
489 self.with_columns(vec![if strict {
490 col(PlSmallStr::from_static("*")).strict_cast(dtype)
491 } else {
492 col(PlSmallStr::from_static("*")).cast(dtype)
493 }])
494 }
495
496 pub fn optimize(
497 self,
498 lp_arena: &mut Arena<IR>,
499 expr_arena: &mut Arena<AExpr>,
500 ) -> PolarsResult<Node> {
501 self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
502 }
503
504 pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
505 let (mut lp_arena, mut expr_arena) = self.get_arenas();
506 let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
507
508 Ok(IRPlan::new(node, lp_arena, expr_arena))
509 }
510
511 pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
512 let (mut lp_arena, mut expr_arena) = self.get_arenas();
513 let node = to_alp(
514 self.logical_plan,
515 &mut expr_arena,
516 &mut lp_arena,
517 &mut self.opt_state,
518 )?;
519 let plan = IRPlan::new(node, lp_arena, expr_arena);
520 Ok(plan)
521 }
522
523 pub(crate) fn optimize_with_scratch(
524 self,
525 lp_arena: &mut Arena<IR>,
526 expr_arena: &mut Arena<AExpr>,
527 scratch: &mut Vec<Node>,
528 ) -> PolarsResult<Node> {
529 #[allow(unused_mut)]
530 let mut opt_state = self.opt_state;
531 let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
532
533 #[cfg(feature = "cse")]
534 if new_streaming {
535 opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
538 }
539
540 let lp_top = optimize(
541 self.logical_plan,
542 opt_state,
543 lp_arena,
544 expr_arena,
545 scratch,
546 apply_scan_predicate_to_scan_ir,
547 )?;
548
549 Ok(lp_top)
550 }
551
552 fn prepare_collect_post_opt<P>(
553 mut self,
554 check_sink: bool,
555 query_start: Option<std::time::Instant>,
556 post_opt: P,
557 ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
558 where
559 P: FnOnce(
560 Node,
561 &mut Arena<IR>,
562 &mut Arena<AExpr>,
563 Option<std::time::Duration>,
564 ) -> PolarsResult<()>,
565 {
566 let (mut lp_arena, mut expr_arena) = self.get_arenas();
567
568 let mut scratch = vec![];
569 let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;
570
571 post_opt(
572 lp_top,
573 &mut lp_arena,
574 &mut expr_arena,
575 query_start.map(|s| s.elapsed()),
578 )?;
579
580 let no_file_sink = if check_sink {
582 !matches!(
583 lp_arena.get(lp_top),
584 IR::Sink {
585 payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
586 ..
587 }
588 )
589 } else {
590 true
591 };
592 let physical_plan = create_physical_plan(
593 lp_top,
594 &mut lp_arena,
595 &mut expr_arena,
596 BUILD_STREAMING_EXECUTOR,
597 )?;
598
599 let state = ExecutionState::new();
600 Ok((state, physical_plan, no_file_sink))
601 }
602
603 pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
605 where
606 P: FnOnce(
607 Node,
608 &mut Arena<IR>,
609 &mut Arena<AExpr>,
610 Option<std::time::Duration>,
611 ) -> PolarsResult<()>,
612 {
613 let (mut state, mut physical_plan, _) =
614 self.prepare_collect_post_opt(false, None, post_opt)?;
615 physical_plan.execute(&mut state)
616 }
617
    /// Same as [`LazyFrame::prepare_collect_post_opt`] with a post-optimization
    /// hook that does nothing.
    #[allow(unused_mut)]
    fn prepare_collect(
        self,
        check_sink: bool,
        query_start: Option<std::time::Instant>,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
    }
626
627 pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
632 let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
633 payload.clone()
634 } else {
635 self.logical_plan = DslPlan::Sink {
636 input: Arc::new(self.logical_plan),
637 payload: SinkType::Memory,
638 };
639 SinkType::Memory
640 };
641
642 if engine == Engine::Auto {
644 engine = match payload {
645 #[cfg(feature = "new_streaming")]
646 SinkType::Callback { .. } | SinkType::File { .. } | SinkType::Partition { .. } => {
647 Engine::Streaming
648 },
649 _ => Engine::InMemory,
650 };
651 }
652 if engine == Engine::Gpu {
654 engine = Engine::InMemory;
655 }
656
657 #[cfg(feature = "new_streaming")]
658 {
659 if let Some(result) = self.try_new_streaming_if_requested() {
660 return result.map(|v| v.unwrap_single());
661 }
662 }
663
664 match engine {
665 Engine::Auto => unreachable!(),
666 Engine::Streaming => {
667 feature_gated!("new_streaming", self = self.with_new_streaming(true))
668 },
669 _ => {},
670 }
671 let mut alp_plan = self.clone().to_alp_optimized()?;
672
673 match engine {
674 Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
675 let result = polars_stream::run_query(
676 alp_plan.lp_top,
677 &mut alp_plan.lp_arena,
678 &mut alp_plan.expr_arena,
679 );
680 result.map(|v| v.unwrap_single())
681 }),
682 Engine::Gpu => {
683 Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
684 },
685 Engine::InMemory => {
686 let mut physical_plan = create_physical_plan(
687 alp_plan.lp_top,
688 &mut alp_plan.lp_arena,
689 &mut alp_plan.expr_arena,
690 BUILD_STREAMING_EXECUTOR,
691 )?;
692 let mut state = ExecutionState::new();
693 physical_plan.execute(&mut state)
694 },
695 }
696 }
697
698 pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
699 let sink_multiple = LazyFrame {
700 logical_plan: DslPlan::SinkMultiple { inputs: plans },
701 opt_state,
702 cached_arena: Default::default(),
703 };
704 sink_multiple.explain(true)
705 }
706
    /// Execute several plans together and return one [`DataFrame`] per plan.
    ///
    /// The plans are combined under a single `SinkMultiple` node and optimized as
    /// one query. `Engine::Auto` and `Engine::Gpu` are both remapped to
    /// `Engine::InMemory`. An empty `plans` vector yields an empty result without
    /// doing any work.
    ///
    /// # Errors
    /// Propagates errors from optimization, physical planning, and execution.
    pub fn collect_all_with_engine(
        plans: Vec<DslPlan>,
        mut engine: Engine,
        opt_state: OptFlags,
    ) -> PolarsResult<Vec<DataFrame>> {
        if plans.is_empty() {
            return Ok(Vec::new());
        }

        // Default engine for collect_all is in-memory.
        if engine == Engine::Auto {
            engine = Engine::InMemory;
        }
        // NOTE(review): GPU is remapped to in-memory here, which makes the
        // `Engine::Gpu` arm further down unreachable — confirm this is intended.
        if engine == Engine::Gpu {
            engine = Engine::InMemory;
        }

        let mut sink_multiple = LazyFrame {
            logical_plan: DslPlan::SinkMultiple { inputs: plans },
            opt_state,
            cached_arena: Default::default(),
        };

        #[cfg(feature = "new_streaming")]
        {
            // Environment variables may request the new streaming engine.
            if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
                return result.map(|v| v.unwrap_multiple());
            }
        }

        match engine {
            Engine::Auto => unreachable!(),
            Engine::Streaming => {
                feature_gated!(
                    "new_streaming",
                    sink_multiple = sink_multiple.with_new_streaming(true)
                )
            },
            _ => {},
        }
        let mut alp_plan = sink_multiple.to_alp_optimized()?;

        if engine == Engine::Streaming {
            feature_gated!("new_streaming", {
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|v| v.unwrap_multiple());
            });
        }

        // The root was built as `SinkMultiple` above, so any other root is a bug.
        let IR::SinkMultiple { inputs } = alp_plan.root() else {
            unreachable!()
        };

        // `inputs` borrows from `alp_plan`; it is cloned so the arenas can be
        // borrowed mutably in the same call.
        let mut multiplan = create_multiple_physical_plans(
            inputs.clone().as_slice(),
            &mut alp_plan.lp_arena,
            &mut alp_plan.expr_arena,
            BUILD_STREAMING_EXECUTOR,
        )?;

        match engine {
            Engine::Gpu => polars_bail!(
                InvalidOperation: "collect_all is not supported for the gpu engine"
            ),
            Engine::InMemory => {
                let mut state = ExecutionState::new();
                // Run the cache prefiller (when present) before the individual plans.
                if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                    cache_prefiller.execute(&mut state)?;
                }
                let out = POOL.install(|| {
                    multiplan
                        .physical_plans
                        // Plans are processed in chunks of 3x the pool's thread count;
                        // chunks run one after another, plans within a chunk in parallel.
                        .chunks_mut(POOL.current_num_threads() * 3)
                        .map(|chunk| {
                            chunk
                                .into_par_iter()
                                .enumerate()
                                .map(|(idx, input)| {
                                    // Take ownership of the executor, leaving a default in its slot.
                                    let mut input = std::mem::take(input);
                                    let mut state = state.split();
                                    // `idx` is the position within the current chunk.
                                    state.branch_idx += idx;

                                    let df = input.execute(&mut state)?;
                                    Ok(df)
                                })
                                .collect::<PolarsResult<Vec<_>>>()
                        })
                        .collect::<PolarsResult<Vec<_>>>()
                });
                // Flatten the per-chunk results back into one vector, in plan order.
                Ok(out?.into_iter().flatten().collect())
            },
            _ => unreachable!(),
        }
    }
809
    /// Execute the query with the in-memory engine and return the resulting
    /// [`DataFrame`].
    ///
    /// Shorthand for [`LazyFrame::collect_with_engine`] with `Engine::InMemory`.
    pub fn collect(self) -> PolarsResult<DataFrame> {
        self.collect_with_engine(Engine::InMemory)
    }
830
831 pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
834 where
835 P: FnOnce(
836 Node,
837 &mut Arena<IR>,
838 &mut Arena<AExpr>,
839 Option<std::time::Duration>,
840 ) -> PolarsResult<()>,
841 {
842 let query_start = std::time::Instant::now();
843 let (mut state, mut physical_plan, _) =
844 self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
845 state.time_nodes(query_start);
846 let out = physical_plan.execute(&mut state)?;
847 let timer_df = state.finish_timer()?;
848 Ok((out, timer_df))
849 }
850
    /// Execute the query with node timing enabled.
    ///
    /// Returns `(result, timings)`; see [`LazyFrame::_profile_post_opt`], which this
    /// calls with a hook that does nothing.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        self._profile_post_opt(|_, _, _, _| Ok(()))
    }
861
862 #[cfg(feature = "parquet")]
866 pub fn sink_parquet(
867 self,
868 target: SinkTarget,
869 options: ParquetWriteOptions,
870 cloud_options: Option<polars_io::cloud::CloudOptions>,
871 sink_options: SinkOptions,
872 ) -> PolarsResult<Self> {
873 self.sink(SinkType::File(FileSinkType {
874 target,
875 sink_options,
876 file_type: FileType::Parquet(options),
877 cloud_options,
878 }))
879 }
880
881 #[cfg(feature = "ipc")]
885 pub fn sink_ipc(
886 self,
887 target: SinkTarget,
888 options: IpcWriterOptions,
889 cloud_options: Option<polars_io::cloud::CloudOptions>,
890 sink_options: SinkOptions,
891 ) -> PolarsResult<Self> {
892 self.sink(SinkType::File(FileSinkType {
893 target,
894 sink_options,
895 file_type: FileType::Ipc(options),
896 cloud_options,
897 }))
898 }
899
900 #[cfg(feature = "csv")]
904 pub fn sink_csv(
905 self,
906 target: SinkTarget,
907 options: CsvWriterOptions,
908 cloud_options: Option<polars_io::cloud::CloudOptions>,
909 sink_options: SinkOptions,
910 ) -> PolarsResult<Self> {
911 self.sink(SinkType::File(FileSinkType {
912 target,
913 sink_options,
914 file_type: FileType::Csv(options),
915 cloud_options,
916 }))
917 }
918
919 #[cfg(feature = "json")]
923 pub fn sink_json(
924 self,
925 target: SinkTarget,
926 options: JsonWriterOptions,
927 cloud_options: Option<polars_io::cloud::CloudOptions>,
928 sink_options: SinkOptions,
929 ) -> PolarsResult<Self> {
930 self.sink(SinkType::File(FileSinkType {
931 target,
932 sink_options,
933 file_type: FileType::Json(options),
934 cloud_options,
935 }))
936 }
937
938 #[cfg(feature = "parquet")]
942 #[allow(clippy::too_many_arguments)]
943 pub fn sink_parquet_partitioned(
944 self,
945 base_path: Arc<PlPath>,
946 file_path_cb: Option<PartitionTargetCallback>,
947 variant: PartitionVariant,
948 options: ParquetWriteOptions,
949 cloud_options: Option<polars_io::cloud::CloudOptions>,
950 sink_options: SinkOptions,
951 per_partition_sort_by: Option<Vec<SortColumn>>,
952 finish_callback: Option<SinkFinishCallback>,
953 ) -> PolarsResult<Self> {
954 self.sink(SinkType::Partition(PartitionSinkType {
955 base_path,
956 file_path_cb,
957 sink_options,
958 variant,
959 file_type: FileType::Parquet(options),
960 cloud_options,
961 per_partition_sort_by,
962 finish_callback,
963 }))
964 }
965
966 #[cfg(feature = "ipc")]
970 #[allow(clippy::too_many_arguments)]
971 pub fn sink_ipc_partitioned(
972 self,
973 base_path: Arc<PlPath>,
974 file_path_cb: Option<PartitionTargetCallback>,
975 variant: PartitionVariant,
976 options: IpcWriterOptions,
977 cloud_options: Option<polars_io::cloud::CloudOptions>,
978 sink_options: SinkOptions,
979 per_partition_sort_by: Option<Vec<SortColumn>>,
980 finish_callback: Option<SinkFinishCallback>,
981 ) -> PolarsResult<Self> {
982 self.sink(SinkType::Partition(PartitionSinkType {
983 base_path,
984 file_path_cb,
985 sink_options,
986 variant,
987 file_type: FileType::Ipc(options),
988 cloud_options,
989 per_partition_sort_by,
990 finish_callback,
991 }))
992 }
993
994 #[cfg(feature = "csv")]
998 #[allow(clippy::too_many_arguments)]
999 pub fn sink_csv_partitioned(
1000 self,
1001 base_path: Arc<PlPath>,
1002 file_path_cb: Option<PartitionTargetCallback>,
1003 variant: PartitionVariant,
1004 options: CsvWriterOptions,
1005 cloud_options: Option<polars_io::cloud::CloudOptions>,
1006 sink_options: SinkOptions,
1007 per_partition_sort_by: Option<Vec<SortColumn>>,
1008 finish_callback: Option<SinkFinishCallback>,
1009 ) -> PolarsResult<Self> {
1010 self.sink(SinkType::Partition(PartitionSinkType {
1011 base_path,
1012 file_path_cb,
1013 sink_options,
1014 variant,
1015 file_type: FileType::Csv(options),
1016 cloud_options,
1017 per_partition_sort_by,
1018 finish_callback,
1019 }))
1020 }
1021
1022 #[cfg(feature = "json")]
1026 #[allow(clippy::too_many_arguments)]
1027 pub fn sink_json_partitioned(
1028 self,
1029 base_path: Arc<PlPath>,
1030 file_path_cb: Option<PartitionTargetCallback>,
1031 variant: PartitionVariant,
1032 options: JsonWriterOptions,
1033 cloud_options: Option<polars_io::cloud::CloudOptions>,
1034 sink_options: SinkOptions,
1035 per_partition_sort_by: Option<Vec<SortColumn>>,
1036 finish_callback: Option<SinkFinishCallback>,
1037 ) -> PolarsResult<Self> {
1038 self.sink(SinkType::Partition(PartitionSinkType {
1039 base_path,
1040 file_path_cb,
1041 sink_options,
1042 variant,
1043 file_type: FileType::Json(options),
1044 cloud_options,
1045 per_partition_sort_by,
1046 finish_callback,
1047 }))
1048 }
1049
1050 pub fn sink_batches(
1051 self,
1052 function: PlanCallback<DataFrame, bool>,
1053 maintain_order: bool,
1054 chunk_size: Option<NonZeroUsize>,
1055 ) -> PolarsResult<Self> {
1056 self.sink(SinkType::Callback(CallbackSinkType {
1057 function,
1058 maintain_order,
1059 chunk_size,
1060 }))
1061 }
1062
1063 #[cfg(feature = "new_streaming")]
1064 pub fn try_new_streaming_if_requested(
1065 &mut self,
1066 ) -> Option<PolarsResult<polars_stream::QueryResult>> {
1067 let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
1068 let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");
1069
1070 if auto_new_streaming || force_new_streaming {
1071 let mut new_stream_lazy = self.clone();
1074 new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
1075 let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
1076 Ok(v) => v,
1077 Err(e) => return Some(Err(e)),
1078 };
1079
1080 let f = || {
1081 polars_stream::run_query(
1082 alp_plan.lp_top,
1083 &mut alp_plan.lp_arena,
1084 &mut alp_plan.expr_arena,
1085 )
1086 };
1087
1088 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
1089 Ok(v) => return Some(v),
1090 Err(e) => {
1091 if !force_new_streaming
1094 && auto_new_streaming
1095 && e.downcast_ref::<&str>()
1096 .map(|s| s.starts_with("not yet implemented"))
1097 .unwrap_or(false)
1098 {
1099 if polars_core::config::verbose() {
1100 eprintln!(
1101 "caught unimplemented error in new streaming engine, falling back to normal engine"
1102 );
1103 }
1104 } else {
1105 std::panic::resume_unwind(e);
1106 }
1107 },
1108 }
1109 }
1110
1111 None
1112 }
1113
1114 fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
1115 polars_ensure!(
1116 !matches!(self.logical_plan, DslPlan::Sink { .. }),
1117 InvalidOperation: "cannot create a sink on top of another sink"
1118 );
1119 self.logical_plan = DslPlan::Sink {
1120 input: Arc::new(self.logical_plan),
1121 payload,
1122 };
1123 Ok(self)
1124 }
1125
1126 pub fn filter(self, predicate: Expr) -> Self {
1144 let opt_state = self.get_opt_state();
1145 let lp = self.get_plan_builder().filter(predicate).build();
1146 Self::from_logical_plan(lp, opt_state)
1147 }
1148
1149 pub fn remove(self, predicate: Expr) -> Self {
1167 self.filter(predicate.neq_missing(lit(true)))
1168 }
1169
1170 pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1196 let exprs = exprs.as_ref().to_vec();
1197 self.select_impl(
1198 exprs,
1199 ProjectionOptions {
1200 run_parallel: true,
1201 duplicate_check: true,
1202 should_broadcast: true,
1203 },
1204 )
1205 }
1206
1207 pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1208 let exprs = exprs.as_ref().to_vec();
1209 self.select_impl(
1210 exprs,
1211 ProjectionOptions {
1212 run_parallel: false,
1213 duplicate_check: true,
1214 should_broadcast: true,
1215 },
1216 )
1217 }
1218
1219 fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1220 let opt_state = self.get_opt_state();
1221 let lp = self.get_plan_builder().project(exprs, options).build();
1222 Self::from_logical_plan(lp, opt_state)
1223 }
1224
1225 pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1246 let keys = by
1247 .as_ref()
1248 .iter()
1249 .map(|e| e.clone().into())
1250 .collect::<Vec<_>>();
1251 let opt_state = self.get_opt_state();
1252
1253 #[cfg(feature = "dynamic_group_by")]
1254 {
1255 LazyGroupBy {
1256 logical_plan: self.logical_plan,
1257 opt_state,
1258 keys,
1259 maintain_order: false,
1260 dynamic_options: None,
1261 rolling_options: None,
1262 }
1263 }
1264
1265 #[cfg(not(feature = "dynamic_group_by"))]
1266 {
1267 LazyGroupBy {
1268 logical_plan: self.logical_plan,
1269 opt_state,
1270 keys,
1271 maintain_order: false,
1272 }
1273 }
1274 }
1275
1276 #[cfg(feature = "dynamic_group_by")]
1284 pub fn rolling<E: AsRef<[Expr]>>(
1285 mut self,
1286 index_column: Expr,
1287 group_by: E,
1288 mut options: RollingGroupOptions,
1289 ) -> LazyGroupBy {
1290 if let Expr::Column(name) = index_column {
1291 options.index_column = name;
1292 } else {
1293 let output_field = index_column
1294 .to_field(&self.collect_schema().unwrap())
1295 .unwrap();
1296 return self.with_column(index_column).rolling(
1297 Expr::Column(output_field.name().clone()),
1298 group_by,
1299 options,
1300 );
1301 }
1302 let opt_state = self.get_opt_state();
1303 LazyGroupBy {
1304 logical_plan: self.logical_plan,
1305 opt_state,
1306 keys: group_by.as_ref().to_vec(),
1307 maintain_order: true,
1308 dynamic_options: None,
1309 rolling_options: Some(options),
1310 }
1311 }
1312
1313 #[cfg(feature = "dynamic_group_by")]
1329 pub fn group_by_dynamic<E: AsRef<[Expr]>>(
1330 mut self,
1331 index_column: Expr,
1332 group_by: E,
1333 mut options: DynamicGroupOptions,
1334 ) -> LazyGroupBy {
1335 if let Expr::Column(name) = index_column {
1336 options.index_column = name;
1337 } else {
1338 let output_field = index_column
1339 .to_field(&self.collect_schema().unwrap())
1340 .unwrap();
1341 return self.with_column(index_column).group_by_dynamic(
1342 Expr::Column(output_field.name().clone()),
1343 group_by,
1344 options,
1345 );
1346 }
1347 let opt_state = self.get_opt_state();
1348 LazyGroupBy {
1349 logical_plan: self.logical_plan,
1350 opt_state,
1351 keys: group_by.as_ref().to_vec(),
1352 maintain_order: true,
1353 dynamic_options: Some(options),
1354 rolling_options: None,
1355 }
1356 }
1357
1358 pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1360 let keys = by
1361 .as_ref()
1362 .iter()
1363 .map(|e| e.clone().into())
1364 .collect::<Vec<_>>();
1365 let opt_state = self.get_opt_state();
1366
1367 #[cfg(feature = "dynamic_group_by")]
1368 {
1369 LazyGroupBy {
1370 logical_plan: self.logical_plan,
1371 opt_state,
1372 keys,
1373 maintain_order: true,
1374 dynamic_options: None,
1375 rolling_options: None,
1376 }
1377 }
1378
1379 #[cfg(not(feature = "dynamic_group_by"))]
1380 {
1381 LazyGroupBy {
1382 logical_plan: self.logical_plan,
1383 opt_state,
1384 keys,
1385 maintain_order: true,
1386 }
1387 }
1388 }
1389
1390 #[cfg(feature = "semi_anti_join")]
1407 pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1408 self.join(
1409 other,
1410 [left_on.into()],
1411 [right_on.into()],
1412 JoinArgs::new(JoinType::Anti),
1413 )
1414 }
1415
1416 #[cfg(feature = "cross_join")]
1418 pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
1419 self.join(
1420 other,
1421 vec![],
1422 vec![],
1423 JoinArgs::new(JoinType::Cross).with_suffix(suffix),
1424 )
1425 }
1426
1427 pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1444 self.join(
1445 other,
1446 [left_on.into()],
1447 [right_on.into()],
1448 JoinArgs::new(JoinType::Left),
1449 )
1450 }
1451
1452 pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1469 self.join(
1470 other,
1471 [left_on.into()],
1472 [right_on.into()],
1473 JoinArgs::new(JoinType::Inner),
1474 )
1475 }
1476
1477 pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1494 self.join(
1495 other,
1496 [left_on.into()],
1497 [right_on.into()],
1498 JoinArgs::new(JoinType::Full),
1499 )
1500 }
1501
1502 #[cfg(feature = "semi_anti_join")]
1519 pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1520 self.join(
1521 other,
1522 [left_on.into()],
1523 [right_on.into()],
1524 JoinArgs::new(JoinType::Semi),
1525 )
1526 }
1527
1528 pub fn join<E: AsRef<[Expr]>>(
1550 self,
1551 other: LazyFrame,
1552 left_on: E,
1553 right_on: E,
1554 args: JoinArgs,
1555 ) -> LazyFrame {
1556 let left_on = left_on.as_ref().to_vec();
1557 let right_on = right_on.as_ref().to_vec();
1558
1559 self._join_impl(other, left_on, right_on, args)
1560 }
1561
1562 fn _join_impl(
1563 self,
1564 other: LazyFrame,
1565 left_on: Vec<Expr>,
1566 right_on: Vec<Expr>,
1567 args: JoinArgs,
1568 ) -> LazyFrame {
1569 let JoinArgs {
1570 how,
1571 validation,
1572 suffix,
1573 slice,
1574 nulls_equal,
1575 coalesce,
1576 maintain_order,
1577 } = args;
1578
1579 if slice.is_some() {
1580 panic!("impl error: slice is not handled")
1581 }
1582
1583 let mut builder = self
1584 .join_builder()
1585 .with(other)
1586 .left_on(left_on)
1587 .right_on(right_on)
1588 .how(how)
1589 .validate(validation)
1590 .join_nulls(nulls_equal)
1591 .coalesce(coalesce)
1592 .maintain_order(maintain_order);
1593
1594 if let Some(suffix) = suffix {
1595 builder = builder.suffix(suffix);
1596 }
1597
1598 builder.finish()
1600 }
1601
    /// Consume the frame and return a [`JoinBuilder`] with it as the left side.
    pub fn join_builder(self) -> JoinBuilder {
        JoinBuilder::new(self)
    }
1610
1611 pub fn with_column(self, expr: Expr) -> LazyFrame {
1629 let opt_state = self.get_opt_state();
1630 let lp = self
1631 .get_plan_builder()
1632 .with_columns(
1633 vec![expr],
1634 ProjectionOptions {
1635 run_parallel: false,
1636 duplicate_check: true,
1637 should_broadcast: true,
1638 },
1639 )
1640 .build();
1641 Self::from_logical_plan(lp, opt_state)
1642 }
1643
1644 pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1659 let exprs = exprs.as_ref().to_vec();
1660 self.with_columns_impl(
1661 exprs,
1662 ProjectionOptions {
1663 run_parallel: true,
1664 duplicate_check: true,
1665 should_broadcast: true,
1666 },
1667 )
1668 }
1669
1670 pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1672 let exprs = exprs.as_ref().to_vec();
1673 self.with_columns_impl(
1674 exprs,
1675 ProjectionOptions {
1676 run_parallel: false,
1677 duplicate_check: true,
1678 should_broadcast: true,
1679 },
1680 )
1681 }
1682
1683 pub fn match_to_schema(
1685 self,
1686 schema: SchemaRef,
1687 per_column: Arc<[MatchToSchemaPerColumn]>,
1688 extra_columns: ExtraColumnsPolicy,
1689 ) -> LazyFrame {
1690 let opt_state = self.get_opt_state();
1691 let lp = self
1692 .get_plan_builder()
1693 .match_to_schema(schema, per_column, extra_columns)
1694 .build();
1695 Self::from_logical_plan(lp, opt_state)
1696 }
1697
1698 pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1699 let opt_state = self.get_opt_state();
1700 let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1701 Self::from_logical_plan(lp, opt_state)
1702 }
1703
1704 fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1705 let opt_state = self.get_opt_state();
1706 let lp = self.get_plan_builder().with_columns(exprs, options).build();
1707 Self::from_logical_plan(lp, opt_state)
1708 }
1709
1710 pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1711 let contexts = contexts
1712 .as_ref()
1713 .iter()
1714 .map(|lf| lf.logical_plan.clone())
1715 .collect();
1716 let opt_state = self.get_opt_state();
1717 let lp = self.get_plan_builder().with_context(contexts).build();
1718 Self::from_logical_plan(lp, opt_state)
1719 }
1720
1721 pub fn max(self) -> Self {
1725 self.map_private(DslFunction::Stats(StatsFunction::Max))
1726 }
1727
1728 pub fn min(self) -> Self {
1732 self.map_private(DslFunction::Stats(StatsFunction::Min))
1733 }
1734
1735 pub fn sum(self) -> Self {
1745 self.map_private(DslFunction::Stats(StatsFunction::Sum))
1746 }
1747
1748 pub fn mean(self) -> Self {
1753 self.map_private(DslFunction::Stats(StatsFunction::Mean))
1754 }
1755
1756 pub fn median(self) -> Self {
1762 self.map_private(DslFunction::Stats(StatsFunction::Median))
1763 }
1764
1765 pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
1767 self.map_private(DslFunction::Stats(StatsFunction::Quantile {
1768 quantile,
1769 method,
1770 }))
1771 }
1772
1773 pub fn std(self, ddof: u8) -> Self {
1786 self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
1787 }
1788
1789 pub fn var(self, ddof: u8) -> Self {
1799 self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
1800 }
1801
1802 pub fn explode(self, columns: Selector) -> LazyFrame {
1804 self.explode_impl(columns, false)
1805 }
1806
1807 fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1809 let opt_state = self.get_opt_state();
1810 let lp = self
1811 .get_plan_builder()
1812 .explode(columns, allow_empty)
1813 .build();
1814 Self::from_logical_plan(lp, opt_state)
1815 }
1816
1817 pub fn null_count(self) -> LazyFrame {
1819 self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1820 }
1821
1822 pub fn unique_stable(
1827 self,
1828 subset: Option<Selector>,
1829 keep_strategy: UniqueKeepStrategy,
1830 ) -> LazyFrame {
1831 self.unique_stable_generic(subset, keep_strategy)
1832 }
1833
1834 pub fn unique_stable_generic(
1835 self,
1836 subset: Option<Selector>,
1837 keep_strategy: UniqueKeepStrategy,
1838 ) -> LazyFrame {
1839 let opt_state = self.get_opt_state();
1840 let options = DistinctOptionsDSL {
1841 subset,
1842 maintain_order: true,
1843 keep_strategy,
1844 };
1845 let lp = self.get_plan_builder().distinct(options).build();
1846 Self::from_logical_plan(lp, opt_state)
1847 }
1848
1849 pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
1857 self.unique_generic(subset, keep_strategy)
1858 }
1859
1860 pub fn unique_generic(
1861 self,
1862 subset: Option<Selector>,
1863 keep_strategy: UniqueKeepStrategy,
1864 ) -> LazyFrame {
1865 let opt_state = self.get_opt_state();
1866 let options = DistinctOptionsDSL {
1867 subset,
1868 maintain_order: false,
1869 keep_strategy,
1870 };
1871 let lp = self.get_plan_builder().distinct(options).build();
1872 Self::from_logical_plan(lp, opt_state)
1873 }
1874
1875 pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1880 let opt_state = self.get_opt_state();
1881 let lp = self.get_plan_builder().drop_nans(subset).build();
1882 Self::from_logical_plan(lp, opt_state)
1883 }
1884
1885 pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1890 let opt_state = self.get_opt_state();
1891 let lp = self.get_plan_builder().drop_nulls(subset).build();
1892 Self::from_logical_plan(lp, opt_state)
1893 }
1894
1895 pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1905 let opt_state = self.get_opt_state();
1906 let lp = self.get_plan_builder().slice(offset, len).build();
1907 Self::from_logical_plan(lp, opt_state)
1908 }
1909
1910 pub fn first(self) -> LazyFrame {
1914 self.slice(0, 1)
1915 }
1916
1917 pub fn last(self) -> LazyFrame {
1921 self.slice(-1, 1)
1922 }
1923
1924 pub fn tail(self, n: IdxSize) -> LazyFrame {
1928 let neg_tail = -(n as i64);
1929 self.slice(neg_tail, n)
1930 }
1931
1932 #[cfg(feature = "pivot")]
1936 pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
1937 let opt_state = self.get_opt_state();
1938 let lp = self.get_plan_builder().unpivot(args).build();
1939 Self::from_logical_plan(lp, opt_state)
1940 }
1941
1942 pub fn limit(self, n: IdxSize) -> LazyFrame {
1944 self.slice(0, n)
1945 }
1946
1947 pub fn map<F>(
1961 self,
1962 function: F,
1963 optimizations: AllowedOptimizations,
1964 schema: Option<Arc<dyn UdfSchema>>,
1965 name: Option<&'static str>,
1966 ) -> LazyFrame
1967 where
1968 F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
1969 {
1970 let opt_state = self.get_opt_state();
1971 let lp = self
1972 .get_plan_builder()
1973 .map(
1974 function,
1975 optimizations,
1976 schema,
1977 PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
1978 )
1979 .build();
1980 Self::from_logical_plan(lp, opt_state)
1981 }
1982
1983 #[cfg(feature = "python")]
1984 pub fn map_python(
1985 self,
1986 function: polars_utils::python_function::PythonFunction,
1987 optimizations: AllowedOptimizations,
1988 schema: Option<SchemaRef>,
1989 validate_output: bool,
1990 ) -> LazyFrame {
1991 let opt_state = self.get_opt_state();
1992 let lp = self
1993 .get_plan_builder()
1994 .map_python(function, optimizations, schema, validate_output)
1995 .build();
1996 Self::from_logical_plan(lp, opt_state)
1997 }
1998
1999 pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2000 let opt_state = self.get_opt_state();
2001 let lp = self.get_plan_builder().map_private(function).build();
2002 Self::from_logical_plan(lp, opt_state)
2003 }
2004
2005 pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
2014 where
2015 S: Into<PlSmallStr>,
2016 {
2017 let name = name.into();
2018
2019 match &self.logical_plan {
2020 v @ DslPlan::Scan {
2021 scan_type,
2022 unified_scan_args,
2023 ..
2024 } if unified_scan_args.row_index.is_none()
2025 && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
2026 {
2027 let DslPlan::Scan {
2028 sources,
2029 mut unified_scan_args,
2030 scan_type,
2031 cached_ir: _,
2032 } = v.clone()
2033 else {
2034 unreachable!()
2035 };
2036
2037 unified_scan_args.row_index = Some(RowIndex {
2038 name,
2039 offset: offset.unwrap_or(0),
2040 });
2041
2042 DslPlan::Scan {
2043 sources,
2044 unified_scan_args,
2045 scan_type,
2046 cached_ir: Default::default(),
2047 }
2048 .into()
2049 },
2050 _ => self.map_private(DslFunction::RowIndex { name, offset }),
2051 }
2052 }
2053
2054 pub fn count(self) -> LazyFrame {
2056 self.select(vec![col(PlSmallStr::from_static("*")).count()])
2057 }
2058
2059 #[cfg(feature = "dtype-struct")]
2062 pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
2063 self.map_private(DslFunction::Unnest {
2064 columns: cols,
2065 separator,
2066 })
2067 }
2068
2069 #[cfg(feature = "merge_sorted")]
2070 pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
2071 where
2072 S: Into<PlSmallStr>,
2073 {
2074 let key = key.into();
2075
2076 let lp = DslPlan::MergeSorted {
2077 input_left: Arc::new(self.logical_plan),
2078 input_right: Arc::new(other.logical_plan),
2079 key,
2080 };
2081 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2082 }
2083
2084 pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2085 let lp = DslPlan::MapFunction {
2086 input: Arc::new(self.logical_plan),
2087 function: DslFunction::Hint(hint),
2088 };
2089 Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2090 }
2091}
2092
/// A pending group-by: the input plan plus the grouping configuration.
///
/// Turned into a [`LazyFrame`] by `agg`, `head`, `tail` or `apply`.
#[derive(Clone)]
pub struct LazyGroupBy {
    /// Plan that the group-by is applied to.
    pub logical_plan: DslPlan,
    /// Optimization flags carried over to the resulting `LazyFrame`.
    opt_state: OptFlags,
    /// Expressions to group by.
    keys: Vec<Expr>,
    /// Forwarded as the group-by's `maintain_order` setting.
    maintain_order: bool,
    /// Dynamic group-by options, forwarded to the group-by when present.
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    /// Rolling group-by options, forwarded to the group-by when present.
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2105
2106impl From<LazyGroupBy> for LazyFrame {
2107 fn from(lgb: LazyGroupBy) -> Self {
2108 Self {
2109 logical_plan: lgb.logical_plan,
2110 opt_state: lgb.opt_state,
2111 cached_arena: Default::default(),
2112 }
2113 }
2114}
2115
2116impl LazyGroupBy {
2117 pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2139 #[cfg(feature = "dynamic_group_by")]
2140 let lp = DslBuilder::from(self.logical_plan)
2141 .group_by(
2142 self.keys,
2143 aggs,
2144 None,
2145 self.maintain_order,
2146 self.dynamic_options,
2147 self.rolling_options,
2148 )
2149 .build();
2150
2151 #[cfg(not(feature = "dynamic_group_by"))]
2152 let lp = DslBuilder::from(self.logical_plan)
2153 .group_by(self.keys, aggs, None, self.maintain_order)
2154 .build();
2155 LazyFrame::from_logical_plan(lp, self.opt_state)
2156 }
2157
2158 pub fn head(self, n: Option<usize>) -> LazyFrame {
2160 let keys = self
2161 .keys
2162 .iter()
2163 .filter_map(|expr| expr_output_name(expr).ok())
2164 .collect::<Vec<_>>();
2165
2166 self.agg([all().as_expr().head(n)])
2167 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2168 }
2169
2170 pub fn tail(self, n: Option<usize>) -> LazyFrame {
2172 let keys = self
2173 .keys
2174 .iter()
2175 .filter_map(|expr| expr_output_name(expr).ok())
2176 .collect::<Vec<_>>();
2177
2178 self.agg([all().as_expr().tail(n)])
2179 .explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2180 }
2181
2182 pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2187 #[cfg(feature = "dynamic_group_by")]
2188 let options = GroupbyOptions {
2189 dynamic: self.dynamic_options,
2190 rolling: self.rolling_options,
2191 slice: None,
2192 };
2193
2194 #[cfg(not(feature = "dynamic_group_by"))]
2195 let options = GroupbyOptions { slice: None };
2196
2197 let lp = DslPlan::GroupBy {
2198 input: Arc::new(self.logical_plan),
2199 keys: self.keys,
2200 aggs: vec![],
2201 apply: Some((f, schema)),
2202 maintain_order: self.maintain_order,
2203 options: Arc::new(options),
2204 };
2205 LazyFrame::from_logical_plan(lp, self.opt_state)
2206 }
2207}
2208
/// Builder that collects the inputs and options of a join before the plan node is created.
///
/// Created by `JoinBuilder::new`; consumed by `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    /// Frame the join starts from (the left input).
    lf: LazyFrame,
    /// Join type; `new` initialises it to `JoinType::Inner`.
    how: JoinType,
    /// Frame to join with; must be set via `with` before finishing.
    other: Option<LazyFrame>,
    /// Key expressions for the left input.
    left_on: Vec<Expr>,
    /// Key expressions for the right input.
    right_on: Vec<Expr>,
    /// Stored in `JoinOptions::allow_parallel`; `new` initialises it to `true`.
    allow_parallel: bool,
    /// Stored in `JoinOptions::force_parallel`; `new` initialises it to `false`.
    force_parallel: bool,
    /// Optional suffix stored in `JoinArgs::suffix`.
    suffix: Option<PlSmallStr>,
    /// Stored in `JoinArgs::validation`.
    validation: JoinValidation,
    /// Stored in `JoinArgs::nulls_equal`; `new` initialises it to `false`.
    nulls_equal: bool,
    /// Stored in `JoinArgs::coalesce`.
    coalesce: JoinCoalesce,
    /// Stored in `JoinArgs::maintain_order`.
    maintain_order: MaintainOrderJoin,
}
2224impl JoinBuilder {
2225 pub fn new(lf: LazyFrame) -> Self {
2227 Self {
2228 lf,
2229 other: None,
2230 how: JoinType::Inner,
2231 left_on: vec![],
2232 right_on: vec![],
2233 allow_parallel: true,
2234 force_parallel: false,
2235 suffix: None,
2236 validation: Default::default(),
2237 nulls_equal: false,
2238 coalesce: Default::default(),
2239 maintain_order: Default::default(),
2240 }
2241 }
2242
2243 pub fn with(mut self, other: LazyFrame) -> Self {
2245 self.other = Some(other);
2246 self
2247 }
2248
2249 pub fn how(mut self, how: JoinType) -> Self {
2251 self.how = how;
2252 self
2253 }
2254
2255 pub fn validate(mut self, validation: JoinValidation) -> Self {
2256 self.validation = validation;
2257 self
2258 }
2259
2260 pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2264 let on = on.as_ref().to_vec();
2265 self.left_on.clone_from(&on);
2266 self.right_on = on;
2267 self
2268 }
2269
2270 pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2274 self.left_on = on.as_ref().to_vec();
2275 self
2276 }
2277
2278 pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2282 self.right_on = on.as_ref().to_vec();
2283 self
2284 }
2285
2286 pub fn allow_parallel(mut self, allow: bool) -> Self {
2288 self.allow_parallel = allow;
2289 self
2290 }
2291
2292 pub fn force_parallel(mut self, force: bool) -> Self {
2294 self.force_parallel = force;
2295 self
2296 }
2297
2298 pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2300 self.nulls_equal = nulls_equal;
2301 self
2302 }
2303
2304 pub fn suffix<S>(mut self, suffix: S) -> Self
2307 where
2308 S: Into<PlSmallStr>,
2309 {
2310 self.suffix = Some(suffix.into());
2311 self
2312 }
2313
2314 pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2316 self.coalesce = coalesce;
2317 self
2318 }
2319
2320 pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2322 self.maintain_order = maintain_order;
2323 self
2324 }
2325
2326 pub fn finish(self) -> LazyFrame {
2328 let opt_state = self.lf.opt_state;
2329 let other = self.other.expect("'with' not set in join builder");
2330
2331 let args = JoinArgs {
2332 how: self.how,
2333 validation: self.validation,
2334 suffix: self.suffix,
2335 slice: None,
2336 nulls_equal: self.nulls_equal,
2337 coalesce: self.coalesce,
2338 maintain_order: self.maintain_order,
2339 };
2340
2341 let lp = self
2342 .lf
2343 .get_plan_builder()
2344 .join(
2345 other.logical_plan,
2346 self.left_on,
2347 self.right_on,
2348 JoinOptions {
2349 allow_parallel: self.allow_parallel,
2350 force_parallel: self.force_parallel,
2351 args,
2352 }
2353 .into(),
2354 )
2355 .build();
2356 LazyFrame::from_logical_plan(lp, opt_state)
2357 }
2358
2359 pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2361 let opt_state = self.lf.opt_state;
2362 let other = self.other.expect("with not set");
2363
2364 fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2366 if let Expr::BinaryExpr {
2367 op: Operator::And,
2368 left,
2369 right,
2370 } = predicate
2371 {
2372 decompose_and((*left).clone(), expanded_predicates);
2373 decompose_and((*right).clone(), expanded_predicates);
2374 } else {
2375 expanded_predicates.push(predicate);
2376 }
2377 }
2378 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2379 for predicate in predicates {
2380 decompose_and(predicate, &mut expanded_predicates);
2381 }
2382 let predicates: Vec<Expr> = expanded_predicates;
2383
2384 #[cfg(feature = "is_between")]
2386 let predicates: Vec<Expr> = {
2387 let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2388 for predicate in predicates {
2389 if let Expr::Function {
2390 function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2391 input,
2392 ..
2393 } = &predicate
2394 {
2395 if let [expr, lower, upper] = input.as_slice() {
2396 match closed {
2397 ClosedInterval::Both => {
2398 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2399 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2400 },
2401 ClosedInterval::Right => {
2402 expanded_predicates.push(expr.clone().gt(lower.clone()));
2403 expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2404 },
2405 ClosedInterval::Left => {
2406 expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2407 expanded_predicates.push(expr.clone().lt(upper.clone()));
2408 },
2409 ClosedInterval::None => {
2410 expanded_predicates.push(expr.clone().gt(lower.clone()));
2411 expanded_predicates.push(expr.clone().lt(upper.clone()));
2412 },
2413 }
2414 continue;
2415 }
2416 }
2417 expanded_predicates.push(predicate);
2418 }
2419 expanded_predicates
2420 };
2421
2422 let args = JoinArgs {
2423 how: self.how,
2424 validation: self.validation,
2425 suffix: self.suffix,
2426 slice: None,
2427 nulls_equal: self.nulls_equal,
2428 coalesce: self.coalesce,
2429 maintain_order: self.maintain_order,
2430 };
2431 let options = JoinOptions {
2432 allow_parallel: self.allow_parallel,
2433 force_parallel: self.force_parallel,
2434 args,
2435 };
2436
2437 let lp = DslPlan::Join {
2438 input_left: Arc::new(self.lf.logical_plan),
2439 input_right: Arc::new(other.logical_plan),
2440 left_on: Default::default(),
2441 right_on: Default::default(),
2442 predicates,
2443 options: Arc::from(options),
2444 };
2445
2446 LazyFrame::from_logical_plan(lp, opt_state)
2447 }
2448}
2449
2450pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2451 #[cfg(not(feature = "new_streaming"))]
2452 {
2453 None
2454 }
2455 #[cfg(feature = "new_streaming")]
2456 {
2457 Some(polars_stream::build_streaming_query_executor)
2458 }
2459};