1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13 BinaryOperator as SQLBinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct,
14 ExcludeSelectItem, Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident,
15 JoinConstraint, JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName,
16 ObjectType, OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
17 SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18 TableFactor, TableWithJoins, Truncate, UnaryOperator as SQLUnaryOperator, Value as SQLValue,
19 ValueWithSpan, Values, Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29 QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30 expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
/// Describes one side of a join: the frame itself, the name it is referred to by,
/// and the schema resolved for that frame.
#[derive(Clone)]
pub struct TableInfo {
    /// The (lazy) data of the table.
    pub(crate) frame: LazyFrame,
    /// The table name or alias under which the frame is referenced.
    pub(crate) name: PlSmallStr,
    /// The schema associated with `frame`.
    pub(crate) schema: Arc<Schema>,
}
41
/// Per-SELECT wildcard modifier state; created empty by `execute_select` and
/// handed (mutably) to `column_projections`.
struct SelectModifiers {
    // Column names to leave out — presumably from `EXCLUDE`/`EXCEPT`; TODO confirm
    // against `column_projections`.
    exclude: PlHashSet<String>,
    // Optional pattern consulted by `matches_ilike`; `None` means "match everything".
    ilike: Option<regex::Regex>,
    // Mapping of original column name -> new column name (see `renamed_cols`).
    rename: PlHashMap<PlSmallStr, PlSmallStr>,
    // Replacement expressions — presumably from `REPLACE`; TODO confirm.
    replace: Vec<Expr>,
}
48impl SelectModifiers {
49 fn matches_ilike(&self, s: &str) -> bool {
50 match &self.ilike {
51 Some(rx) => rx.is_match(s),
52 None => true,
53 }
54 }
55 fn renamed_cols(&self) -> Vec<Expr> {
56 self.rename
57 .iter()
58 .map(|(before, after)| col(before.clone()).alias(after.clone()))
59 .collect()
60 }
61}
62
/// A group of projected expressions, as consumed by `disambiguate_projection_cols`.
enum ProjectionItem {
    /// Expressions tied to a specific table name (the first field); these are the
    /// candidates for "name:table" disambiguation.
    QualifiedExprs(PlSmallStr, Vec<Expr>),
    /// Any other expressions; passed through unchanged.
    Exprs(Vec<Expr>),
}
68
69fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
71 match expr {
72 Expr::Column(name) | Expr::Alias(_, name) => Some(name),
73 _ => None,
74 }
75}
76
77fn disambiguate_projection_cols(
79 items: Vec<ProjectionItem>,
80 schema: &Schema,
81) -> PolarsResult<Vec<Expr>> {
82 let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
84 let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
85 for item in &items {
86 match item {
87 ProjectionItem::QualifiedExprs(_, exprs) => {
88 for expr in exprs {
89 if let Some(name) = expr_output_name(expr) {
90 *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
91 }
92 }
93 },
94 ProjectionItem::Exprs(exprs) => {
95 for expr in exprs {
96 if let Some(name) = expr_output_name(expr) {
97 other_names.insert(name.clone());
98 }
99 }
100 },
101 }
102 }
103
104 let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
106 .into_iter()
107 .filter(|(name, count)| *count > 1 || other_names.contains(name))
108 .map(|(name, _)| name)
109 .collect();
110
111 let mut result: Vec<Expr> = Vec::new();
113 for item in items {
114 match item {
115 ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
116 for expr in exprs {
117 if let Some(name) = expr_output_name(&expr) {
118 if needs_suffix.contains(name) {
119 let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
120 if schema.contains(suffixed.as_str()) {
121 result.push(col(suffixed));
122 continue;
123 }
124 if other_names.contains(name) {
125 polars_bail!(
126 SQLInterface:
127 "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
128 );
129 }
130 }
131 }
132 result.push(expr);
133 }
134 },
135 ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
136 result.extend(exprs);
137 },
138 }
139 }
140 Ok(result)
141}
142
/// Selects how a WHERE-style predicate is applied by `process_where`.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum FilterMode {
    /// Used by `execute_select` when applying the WHERE clause of a SELECT.
    KeepTrue,
    /// Used by `execute_delete_from_table` when applying the WHERE clause of a DELETE.
    RemoveTrue,
}
153
/// Execution context for SQL statements: holds the registered tables, the
/// function registry, and the per-statement resolution state.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by name; shared (via `Arc`) between clones of the context.
    pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
    /// Registry consulted for SQL function resolution.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Plan arena handed to schema resolution and cached on the executed result.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena handed to schema resolution and cached on the executed result.
    pub(crate) expr_arena: Arena<AExpr>,

    // Statement-level state; all four maps below are cleared by `execute`.
    // CTE name -> frame.
    cte_map: PlHashMap<String, LazyFrame>,
    // Table alias -> underlying table/CTE name (see `get_table_from_current_scope`).
    table_aliases: PlHashMap<String, String>,
    // Joined (right) table name -> { column name -> "column:table" name in the join result }.
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
    /// Named window specifications (from the WINDOW clause), keyed by window name.
    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
}
167
168impl Default for SQLContext {
169 fn default() -> Self {
170 Self {
171 function_registry: Arc::new(DefaultFunctionRegistry {}),
172 table_map: Default::default(),
173 cte_map: Default::default(),
174 table_aliases: Default::default(),
175 joined_aliases: Default::default(),
176 named_windows: Default::default(),
177 lp_arena: Default::default(),
178 expr_arena: Default::default(),
179 }
180 }
181}
182
183impl SQLContext {
184 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn get_tables(&self) -> Vec<String> {
197 let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
198 tables.sort_unstable();
199 tables
200 }
201
202 pub fn register(&self, name: &str, lf: LazyFrame) {
218 self.table_map.write().unwrap().insert(name.to_owned(), lf);
219 }
220
221 pub fn unregister(&self, name: &str) {
223 self.table_map.write().unwrap().remove(&name.to_owned());
224 }
225
226 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
245 let mut parser = Parser::new(&GenericDialect);
246 parser = parser.with_options(ParserOptions {
247 trailing_commas: true,
248 ..Default::default()
249 });
250
251 let ast = parser
252 .try_with_sql(query)
253 .map_err(to_sql_interface_err)?
254 .parse_statements()
255 .map_err(to_sql_interface_err)?;
256
257 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
258 let res = self.execute_statement(ast.first().unwrap())?;
259
260 let lp_arena = std::mem::take(&mut self.lp_arena);
263 let expr_arena = std::mem::take(&mut self.expr_arena);
264 res.set_cached_arena(lp_arena, expr_arena);
265
266 self.cte_map.clear();
268 self.table_aliases.clear();
269 self.joined_aliases.clear();
270 self.named_windows.clear();
271
272 Ok(res)
273 }
274
275 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
278 self.function_registry = function_registry;
279 self
280 }
281
282 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
284 &self.function_registry
285 }
286
287 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
289 Arc::get_mut(&mut self.function_registry).unwrap()
290 }
291}
292
293impl SQLContext {
294 pub(crate) fn isolated(&self) -> Self {
295 Self {
296 table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
298 named_windows: self.named_windows.clone(),
299 cte_map: self.cte_map.clone(),
300
301 ..Default::default()
302 }
303 }
304
305 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
306 let ast = stmt;
307 Ok(match ast {
308 Statement::Query(query) => self.execute_query(query)?,
309 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
310 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
311 stmt @ Statement::Drop {
312 object_type: ObjectType::Table,
313 ..
314 } => self.execute_drop_table(stmt)?,
315 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
316 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
317 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
318 _ => polars_bail!(
319 SQLInterface: "statement type is not supported:\n{:?}", ast,
320 ),
321 })
322 }
323
324 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
325 self.register_ctes(query)?;
326 self.execute_query_no_ctes(query)
327 }
328
329 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
330 self.validate_query(query)?;
331
332 let lf = self.process_query(&query.body, query)?;
333 self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
334 }
335
336 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
337 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
338 }
339
340 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
341 self.table_map
345 .read()
346 .unwrap()
347 .get(name)
348 .cloned()
349 .or_else(|| self.cte_map.get(name).cloned())
350 .or_else(|| {
351 self.table_aliases.get(name).and_then(|alias| {
352 self.table_map
353 .read()
354 .unwrap()
355 .get(alias.as_str())
356 .or_else(|| self.cte_map.get(alias.as_str()))
357 .cloned()
358 })
359 })
360 }
361
362 pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
366 where
367 F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
368 {
369 let mut ctx = self.isolated();
370
371 let lf = query(&mut ctx)?;
373
374 lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
376
377 Ok(lf)
378 }
379
380 fn expr_or_ordinal(
381 &mut self,
382 e: &SQLExpr,
383 exprs: &[Expr],
384 selected: Option<&[Expr]>,
385 schema: Option<&Schema>,
386 clause: &str,
387 ) -> PolarsResult<Expr> {
388 match e {
389 SQLExpr::UnaryOp {
390 op: SQLUnaryOperator::Minus,
391 expr,
392 } if matches!(
393 **expr,
394 SQLExpr::Value(ValueWithSpan {
395 value: SQLValue::Number(_, _),
396 ..
397 })
398 ) =>
399 {
400 if let SQLExpr::Value(ValueWithSpan {
401 value: SQLValue::Number(ref idx, _),
402 ..
403 }) = **expr
404 {
405 Err(polars_err!(
406 SQLSyntax:
407 "negative ordinal values are invalid for {}; found -{}",
408 clause,
409 idx
410 ))
411 } else {
412 unreachable!()
413 }
414 },
415 SQLExpr::Value(ValueWithSpan {
416 value: SQLValue::Number(idx, _),
417 ..
418 }) => {
419 let idx = idx.parse::<usize>().map_err(|_| {
421 polars_err!(
422 SQLSyntax:
423 "negative ordinal values are invalid for {}; found {}",
424 clause,
425 idx
426 )
427 })?;
428 let cols = if let Some(cols) = selected {
431 cols
432 } else {
433 exprs
434 };
435 Ok(cols
436 .get(idx - 1)
437 .ok_or_else(|| {
438 polars_err!(
439 SQLInterface:
440 "{} ordinal value must refer to a valid column; found {}",
441 clause,
442 idx
443 )
444 })?
445 .clone())
446 },
447 SQLExpr::Value(v) => Err(polars_err!(
448 SQLSyntax:
449 "{} requires a valid expression or positive ordinal; found {}", clause, v,
450 )),
451 _ => {
452 let mut expr = parse_sql_expr(e, self, schema)?;
455 if matches!(e, SQLExpr::CompoundIdentifier(_)) {
456 if let Some(schema) = schema {
457 expr = expr.map_expr(|ex| match &ex {
458 Expr::Column(name) => {
459 let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
460 if schema.contains(prefixed.as_str()) {
461 col(prefixed)
462 } else {
463 ex
464 }
465 },
466 _ => ex,
467 });
468 }
469 }
470 Ok(expr)
471 },
472 }
473 }
474
475 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
476 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
477 if let Some(name) = aliases.get(column_name) {
478 return name.to_string();
479 }
480 }
481 column_name.to_string()
482 }
483
484 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
485 match expr {
486 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
487 SetExpr::Query(nested_query) => {
488 let lf = self.execute_query_no_ctes(nested_query)?;
489 self.process_order_by(lf, &query.order_by, None)
490 },
491 SetExpr::SetOperation {
492 op: SetOperator::Union,
493 set_quantifier,
494 left,
495 right,
496 } => self.process_union(left, right, set_quantifier, query),
497
498 #[cfg(feature = "semi_anti_join")]
499 SetExpr::SetOperation {
500 op: SetOperator::Intersect | SetOperator::Except,
501 set_quantifier,
502 left,
503 right,
504 } => self.process_except_intersect(left, right, set_quantifier, query),
505
506 SetExpr::Values(Values {
507 explicit_row: _,
508 rows,
509 value_keyword: _,
510 }) => self.process_values(rows),
511
512 SetExpr::Table(tbl) => {
513 if let Some(table_name) = tbl.table_name.as_ref() {
514 self.get_table_from_current_scope(table_name)
515 .ok_or_else(|| {
516 polars_err!(
517 SQLInterface: "no table or alias named '{}' found",
518 tbl
519 )
520 })
521 } else {
522 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
523 }
524 },
525 op => {
526 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
527 },
528 }
529 }
530
/// Executes an EXCEPT (anti-join) or INTERSECT (semi-join) of `left` and
/// `right`, de-duplicates the result and applies the query's ORDER BY.
///
/// The operation is derived from `query.body`: EXCEPT if the body is an
/// EXCEPT set operation, INTERSECT otherwise.
///
/// # Errors
/// Fails for unsupported quantifiers, or (unless `BY NAME` is used) when the
/// two sides do not have the same number of columns.
#[cfg(feature = "semi_anti_join")]
fn process_except_intersect(
    &mut self,
    left: &SetExpr,
    right: &SetExpr,
    quantifier: &SetQuantifier,
    query: &Query,
) -> PolarsResult<LazyFrame> {
    let is_except = matches!(
        &*query.body,
        SetExpr::SetOperation {
            op: SetOperator::Except,
            ..
        }
    );
    let (join_type, op_name) = if is_except {
        (JoinType::Anti, "EXCEPT")
    } else {
        (JoinType::Semi, "INTERSECT")
    };

    // both operands run in isolated contexts
    let mut lhs = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
    let mut rhs = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
    let lhs_schema = self.get_frame_schema(&mut lhs)?;

    let lhs_cols: Vec<_> = lhs_schema.iter_names_cloned().map(col).collect();
    let rhs_cols = match quantifier {
        // BY NAME: join on the left-hand column names
        SetQuantifier::ByName => None,
        // otherwise the columns are paired up by position
        SetQuantifier::Distinct | SetQuantifier::None => {
            let rhs_schema = self.get_frame_schema(&mut rhs)?;
            let rhs_cols: Vec<_> = rhs_schema.iter_names_cloned().map(col).collect();
            if lhs_cols.len() != rhs_cols.len() {
                polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
            }
            Some(rhs_cols)
        },
        _ => {
            polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
        },
    };

    let builder = lhs.join_builder().with(rhs).how(join_type).join_nulls(true);
    let joined = if let Some(rhs_cols) = rhs_cols {
        builder.left_on(lhs_cols).right_on(rhs_cols).finish()
    } else {
        builder.on(lhs_cols).finish()
    };
    let deduplicated = joined.unique(None, UniqueKeepStrategy::Any);
    self.process_order_by(deduplicated, &query.order_by, None)
}
576
577 fn process_union(
578 &mut self,
579 left: &SetExpr,
580 right: &SetExpr,
581 quantifier: &SetQuantifier,
582 query: &Query,
583 ) -> PolarsResult<LazyFrame> {
584 let quantifier = *quantifier;
585
586 let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
589 let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
590
591 let opts = UnionArgs {
592 parallel: true,
593 to_supertypes: true,
594 maintain_order: false,
595 ..Default::default()
596 };
597 let lf = match quantifier {
598 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
600 let lf_schema = self.get_frame_schema(&mut lf)?;
601 let rf_schema = self.get_frame_schema(&mut rf)?;
602 if lf_schema.len() != rf_schema.len() {
603 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
604 }
605 if lf_schema.iter_names().ne(rf_schema.iter_names()) {
608 rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
609 }
610 let concatenated = concat(vec![lf, rf], opts);
611 match quantifier {
612 SetQuantifier::Distinct | SetQuantifier::None => {
613 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
614 },
615 _ => concatenated,
616 }
617 },
618 #[cfg(feature = "diagonal_concat")]
620 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
621 #[cfg(feature = "diagonal_concat")]
623 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
624 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
625 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
626 },
627 #[allow(unreachable_patterns)]
628 _ => {
629 polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
630 },
631 }?;
632
633 self.process_order_by(lf, &query.order_by, None)
634 }
635
636 fn process_unnest_lateral(
639 &self,
640 lf: LazyFrame,
641 alias: &Option<TableAlias>,
642 array_exprs: &[SQLExpr],
643 with_offset: bool,
644 ) -> PolarsResult<LazyFrame> {
645 let alias = alias
646 .as_ref()
647 .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
648 polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
649
650 let (mut explode_cols, mut rename_from, mut rename_to) = (
651 Vec::with_capacity(array_exprs.len()),
652 Vec::with_capacity(array_exprs.len()),
653 Vec::with_capacity(array_exprs.len()),
654 );
655 let is_single_col = array_exprs.len() == 1;
656
657 for (i, arr_expr) in array_exprs.iter().enumerate() {
658 let col_name = match arr_expr {
659 SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
660 SQLExpr::CompoundIdentifier(parts) => {
661 PlSmallStr::from_str(&parts.last().unwrap().value)
662 },
663 SQLExpr::Array(_) => polars_bail!(
664 SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
665 ),
666 other => polars_bail!(
667 SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
668 ),
669 };
670 if let Some(name) = alias
672 .columns
673 .get(i)
674 .map(|c| c.name.value.as_str())
675 .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
676 .filter(|name| !name.is_empty() && *name != col_name.as_str())
677 {
678 rename_from.push(col_name.clone());
679 rename_to.push(PlSmallStr::from_str(name));
680 }
681 explode_cols.push(col_name);
682 }
683
684 let mut lf = lf.explode(
685 Selector::ByName {
686 names: Arc::from(explode_cols),
687 strict: true,
688 },
689 ExplodeOptions {
690 empty_as_null: true,
691 keep_nulls: true,
692 },
693 );
694 if !rename_from.is_empty() {
695 lf = lf.rename(rename_from, rename_to, true);
696 }
697 Ok(lf)
698 }
699
700 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
701 let frame_rows: Vec<Row> = values.iter().map(|row| {
702 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
703 let expr = parse_sql_expr(expr, self, None)?;
704 match expr {
705 Expr::Literal(value) => {
706 value.to_any_value()
707 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
708 .map(|av| av.into_static())
709 },
710 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
711 }
712 }).collect();
713 row_data.map(Row::new)
714 }).collect::<Result<_, _>>()?;
715
716 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
717 }
718
719 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
721 match stmt {
722 Statement::Explain { statement, .. } => {
723 let lf = self.execute_statement(statement)?;
724 let plan = lf.describe_optimized_plan()?;
725 let plan = plan
726 .split('\n')
727 .collect::<Series>()
728 .with_name(PlSmallStr::from_static("Logical Plan"))
729 .into_column();
730 let df = DataFrame::new_infer_height(vec![plan])?;
731 Ok(df.lazy())
732 },
733 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
734 }
735 }
736
737 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
739 let tables = Column::new("name".into(), self.get_tables());
740 let df = DataFrame::new_infer_height(vec![tables])?;
741 Ok(df.lazy())
742 }
743
744 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
746 match stmt {
747 Statement::Drop { names, .. } => {
748 names.iter().for_each(|name| {
749 self.table_map.write().unwrap().remove(&name.to_string());
750 });
751 Ok(DataFrame::empty().lazy())
752 },
753 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
754 }
755 }
756
757 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
759 if let Statement::Delete(Delete {
760 tables,
761 from,
762 using,
763 selection,
764 returning,
765 order_by,
766 limit,
767 delete_token: _,
768 }) = stmt
769 {
770 let error_message: Option<&'static str> = if !tables.is_empty() {
771 Some("DELETE expects exactly one table name")
772 } else if using.is_some() {
773 Some("DELETE does not support the USING clause")
774 } else if returning.is_some() {
775 Some("DELETE does not support the RETURNING clause")
776 } else if limit.is_some() {
777 Some("DELETE does not support the LIMIT clause")
778 } else if !order_by.is_empty() {
779 Some("DELETE does not support the ORDER BY clause")
780 } else {
781 None
782 };
783
784 if let Some(msg) = error_message {
785 polars_bail!(SQLInterface: msg);
786 }
787
788 let from_tables = match &from {
789 FromTable::WithFromKeyword(from) => from,
790 FromTable::WithoutKeyword(from) => from,
791 };
792 if from_tables.len() > 1 {
793 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
794 }
795 let tbl_expr = from_tables.first().unwrap();
796 if !tbl_expr.joins.is_empty() {
797 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
798 }
799 let (_, lf) = self.get_table(&tbl_expr.relation)?;
800 if selection.is_none() {
801 Ok(lf.clear())
803 } else {
804 Ok(self.process_where(lf.clone(), selection, FilterMode::RemoveTrue, None)?)
806 }
807 } else {
808 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
809 }
810 }
811
812 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
814 if let Statement::Truncate(Truncate {
815 table_names,
816 partitions,
817 ..
818 }) = stmt
819 {
820 match partitions {
821 None => {
822 if table_names.len() != 1 {
823 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
824 }
825 let tbl = table_names[0].name.to_string();
826 if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
827 *lf = lf.clone().clear();
828 Ok(lf.clone())
829 } else {
830 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
831 }
832 },
833 _ => {
834 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
835 },
836 }
837 } else {
838 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
839 }
840 }
841
842 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
843 self.cte_map.insert(name.to_owned(), lf);
844 }
845
846 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
847 if let Some(with) = &query.with {
848 if with.recursive {
849 polars_bail!(SQLInterface: "recursive CTEs are not supported")
850 }
851 for cte in &with.cte_tables {
852 let cte_name = cte.alias.name.value.clone();
854 let mut lf = self.execute_isolated(|ctx| ctx.execute_query(&cte.query))?;
855 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
856 self.register_cte(&cte_name, lf);
857 }
858 }
859 Ok(())
860 }
861
862 fn register_named_windows(
863 &mut self,
864 named_windows: &[NamedWindowDefinition],
865 ) -> PolarsResult<()> {
866 for NamedWindowDefinition(name, expr) in named_windows {
867 let spec = match expr {
868 NamedWindowExpr::NamedWindow(ref_name) => self
869 .named_windows
870 .get(&ref_name.value)
871 .ok_or_else(|| {
872 polars_err!(
873 SQLInterface:
874 "named window '{}' references undefined window '{}'",
875 name.value, ref_name.value
876 )
877 })?
878 .clone(),
879 NamedWindowExpr::WindowSpec(spec) => spec.clone(),
880 };
881 self.named_windows.insert(name.value.clone(), spec);
882 }
883 Ok(())
884 }
885
/// Executes one entry of the FROM clause: resolves the base relation, then
/// applies each of its joins in order, returning the combined frame.
///
/// For every joined (right-hand) table, the columns whose names clash with
/// the left-hand side — and that show up as "name:table" in the join result —
/// are recorded in `joined_aliases`, keyed by the right-hand table name.
///
/// # Errors
/// Fails if a relation cannot be resolved, if a joined relation has no name,
/// or if the join type or constraint is not supported.
pub(crate) fn execute_from_statement(
    &mut self,
    tbl_expr: &TableWithJoins,
) -> PolarsResult<LazyFrame> {
    let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
    if !tbl_expr.joins.is_empty() {
        for join in &tbl_expr.joins {
            // CROSS JOIN UNNEST(...) where at least one argument is not an array
            // literal: handled as an explode of the existing frame, not as a join
            if let (
                JoinOperator::CrossJoin(JoinConstraint::None),
                TableFactor::UNNEST {
                    alias,
                    array_exprs,
                    with_offset,
                    ..
                },
            ) = (&join.join_operator, &join.relation)
            {
                if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
                    lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
                    continue;
                }
            }

            let (r_name, mut rf) = self.get_table(&join.relation)?;
            if r_name.is_empty() {
                // the right-hand name is needed to build the "name:table" suffixes
                polars_bail!(
                    SQLInterface:
                    "cannot JOIN on unnamed relation; please provide an alias"
                )
            }
            let left_schema = self.get_frame_schema(&mut lf)?;
            let right_schema = self.get_frame_schema(&mut rf)?;

            lf = match &join.join_operator {
                op @ (JoinOperator::Join(constraint)
                | JoinOperator::FullOuter(constraint)
                | JoinOperator::Left(constraint)
                | JoinOperator::LeftOuter(constraint)
                | JoinOperator::Right(constraint)
                | JoinOperator::RightOuter(constraint)
                | JoinOperator::Inner(constraint)
                | JoinOperator::Anti(constraint)
                | JoinOperator::Semi(constraint)
                | JoinOperator::LeftAnti(constraint)
                | JoinOperator::LeftSemi(constraint)
                | JoinOperator::RightAnti(constraint)
                | JoinOperator::RightSemi(constraint)) => {
                    // right anti/semi joins are expressed by swapping the two frames
                    // NOTE(review): only the frames are swapped; the names and schemas
                    // passed below remain those of the original left/right sides —
                    // confirm that `process_join` expects this.
                    let (lf, rf) = match op {
                        JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                        _ => (lf, rf),
                    };
                    self.process_join(
                        &TableInfo {
                            frame: lf,
                            // NOTE(review): always the name of the base relation, also
                            // when `lf` is already the result of an earlier join
                            name: (&l_name).into(),
                            schema: left_schema.clone(),
                        },
                        &TableInfo {
                            frame: rf,
                            name: (&r_name).into(),
                            schema: right_schema.clone(),
                        },
                        constraint,
                        match op {
                            JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
                            JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
                                JoinType::Left
                            },
                            JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
                                JoinType::Right
                            },
                            JoinOperator::FullOuter(_) => JoinType::Full,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Anti(_)
                            | JoinOperator::LeftAnti(_)
                            | JoinOperator::RightAnti(_) => JoinType::Anti,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Semi(_)
                            | JoinOperator::LeftSemi(_)
                            | JoinOperator::RightSemi(_) => JoinType::Semi,
                            // anything not mapped above (e.g. semi/anti joins when the
                            // `semi_anti_join` feature is disabled)
                            join_type => polars_bail!(
                                SQLInterface:
                                "join type '{:?}' not currently supported",
                                join_type
                            ),
                        },
                    )?
                },
                JoinOperator::CrossJoin(JoinConstraint::None) => {
                    // clashing right-hand columns get a ":<right table name>" suffix
                    lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                },
                JoinOperator::CrossJoin(constraint) => {
                    polars_bail!(
                        SQLInterface:
                        "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
                        constraint
                    )
                },
                join_type => {
                    polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                },
            };

            // record which right-hand columns were given an aliased name by the join
            let joined_schema = self.get_frame_schema(&mut lf)?;

            self.joined_aliases.insert(
                r_name.clone(),
                right_schema
                    .iter_names()
                    .filter_map(|name| {
                        // only columns present on both sides, and for which the
                        // "name:table" column exists in the joined result
                        let aliased_name = format!("{name}:{r_name}");
                        if left_schema.contains(name)
                            && joined_schema.contains(aliased_name.as_str())
                        {
                            Some((name.to_string(), aliased_name))
                        } else {
                            None
                        }
                    })
                    .collect::<PlHashMap<String, String>>(),
            );
        }
    };
    Ok(lf)
}
1016
/// Rejects SELECT statements that use clauses which are not supported.
///
/// # Errors
/// Returns an `SQLInterface` error naming the first unsupported clause found.
fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
    // Destructure exhaustively (no `..`), so that every field of `Select` has
    // to be listed here explicitly, either as ignored or as checked.
    let Select {
        // fields that are not checked here
        distinct: _,
        from: _,
        group_by: _,
        having: _,
        named_window: _,
        projection: _,
        qualify: _,
        selection: _,

        // further fields that are not checked here
        flavor: _,
        select_token: _,
        top_before_distinct: _,
        window_before_qualify: _,

        // unsupported clauses; each must be absent/empty (checked below)
        ref cluster_by,
        ref connect_by,
        ref distribute_by,
        ref exclude,
        ref into,
        ref lateral_views,
        ref prewhere,
        ref sort_by,
        ref top,
        ref value_table_mode,
    } = *select_stmt;

    // the first failing check determines the error that is returned
    polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
    polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
    polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
    polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
    polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
    polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
    polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
    polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
    polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
    polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");

    Ok(())
}
1065
/// Rejects queries that use clauses or FETCH options which are not supported.
///
/// # Errors
/// Returns an `SQLInterface` error naming the first unsupported construct found.
fn validate_query(&self, query: &Query) -> PolarsResult<()> {
    // Destructure exhaustively (no `..`), so that every field of `Query` has
    // to be listed here explicitly, either as ignored or as checked.
    let Query {
        // fields that are not checked here
        with: _,
        body: _,
        order_by: _,
        limit_clause: _,
        // only the `percent` / `with_ties` options of FETCH are checked (below)
        fetch,

        // unsupported clauses; each must be absent/empty (checked below)
        for_clause,
        format_clause,
        locks,
        pipe_operators,
        settings,
    } = query;

    // the first failing check determines the error that is returned
    polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
    polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
    polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
    polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
    polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");

    if let Some(Fetch {
        quantity: _,
        percent,
        with_ties,
    }) = fetch
    {
        polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
        polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
    }
    Ok(())
}
1104
    /// Execute a single `SELECT` statement and return the resulting `LazyFrame`.
    ///
    /// The steps below run in this order: statement validation and named-window
    /// registration, FROM resolution (including comma-separated implicit joins),
    /// ambiguous-column checks, WHERE, projection construction (with `Explode`
    /// expressions applied to the frame up front), GROUP BY or plain projection
    /// together with ORDER BY, then QUALIFY and finally DISTINCT.
    ///
    /// `query` is the enclosing query; only its `order_by` clause is read here.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        self.validate_select(select_stmt)?;

        self.register_named_windows(&select_stmt.named_window)?;

        // Residual WHERE expression handed back by `process_implicit_joins`, if any.
        let mut implicit_join_filter: Option<SQLExpr> = None;
        // Without a FROM clause we start from an empty frame and have no base table name.
        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
            (DataFrame::empty().lazy(), None)
        } else {
            let from = &select_stmt.from;
            let first = from.first().unwrap();
            let mut lf = self.execute_from_statement(first)?;
            let base_name = get_table_name(&first.relation);
            // More than one FROM item means comma-separated tables (implicit joins).
            if from.len() > 1 {
                implicit_join_filter =
                    self.process_implicit_joins(&mut lf, from, &select_stmt.selection)?;
            }
            (lf, base_name)
        };

        // Ambiguity checks only run when there is a named base table and at least one
        // join has recorded column aliases in `joined_aliases`.
        if let Some(ref base_name) = base_table_name {
            if !self.joined_aliases.is_empty() {
                // Columns reported by `get_using_cols` for the joins of the first FROM item.
                let using_cols: PlHashSet<String> = select_stmt
                    .from
                    .first()
                    .into_iter()
                    .flat_map(|t| t.joins.iter())
                    .filter_map(|join| get_using_cols(&join.join_operator))
                    .flatten()
                    .collect();

                let check_expr = |e| {
                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
                };
                // Only plain/aliased expressions are checked; wildcards are skipped.
                for item in &select_stmt.projection {
                    match item {
                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
                            check_expr(e)?
                        },
                        _ => {},
                    }
                }
                if let Some(ref where_expr) = select_stmt.selection {
                    check_expr(where_expr)?;
                }
            }
        }

        // Prefer the residual filter returned by implicit-join processing; when there is
        // none, the statement's original WHERE clause is used.
        // NOTE(review): a `None` residual also falls back to the full original WHERE, which
        // presumably re-applies predicates already consumed as join conditions — confirm
        // this is intended.
        let effective_where = if implicit_join_filter.is_some() {
            &implicit_join_filter
        } else {
            &select_stmt.selection
        };
        let mut schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(
            lf,
            effective_where,
            FilterMode::KeepTrue,
            Some(schema.clone()),
        )?;

        // Starts empty and is passed mutably to `column_projections` below.
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        // When QUALIFY is present, collect the aliases of projection items whose
        // expression contains a window function; `process_qualify` receives this set.
        let window_fn_columns = if select_stmt.qualify.is_some() {
            select_stmt
                .projection
                .iter()
                .filter_map(|item| match item {
                    SelectItem::ExprWithAlias { expr, alias }
                        if expr_has_window_functions(expr) =>
                    {
                        Some(alias.value.clone())
                    },
                    _ => None,
                })
                .collect::<PlHashSet<_>>()
        } else {
            PlHashSet::new()
        };

        let mut projections =
            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        // Names of the columns to explode on the frame itself.
        let mut explode_names = Vec::new();
        // Non-column explode inputs, materialised under temporary names first.
        let mut explode_exprs = Vec::new();
        // Maps each materialised explode input to its temporary column name.
        let mut explode_lookup = PlHashMap::new();

        // Walk every sub-expression of every projection looking for `Explode` nodes.
        for expr in &projections {
            for e in expr {
                if let Expr::Explode { input, .. } = e {
                    match input.as_ref() {
                        Expr::Column(name) => explode_names.push(name.clone()),
                        other_expr => {
                            // Inputs containing an aggregation or `Len` are not materialised.
                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
                                let temp_name =
                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
                                explode_names.push(temp_name);
                            }
                        },
                    }
                }
            }
        }
        if !explode_names.is_empty() {
            if !explode_exprs.is_empty() {
                lf = lf.with_columns(explode_exprs);
            }
            lf = lf.explode(
                Selector::ByName {
                    names: Arc::from(explode_names),
                    strict: true,
                },
                ExplodeOptions {
                    empty_as_null: true,
                    keep_nulls: true,
                },
            );
            // The frame has been exploded, so strip the `Explode` nodes from the
            // projections: use the temporary column when one was created, otherwise
            // the explode's own input expression.
            projections = projections
                .into_iter()
                .map(|p| {
                    p.map_expr(|e| match e {
                        Expr::Explode { input, .. } => explode_lookup
                            .get(input.as_ref())
                            .map(|name| Expr::Column(name.clone()))
                            .unwrap_or_else(|| input.as_ref().clone()),
                        _ => e,
                    })
                })
                .collect();

            // Refresh the schema after the frame was modified above.
            schema = self.get_frame_schema(&mut lf)?;
        }

        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            // Explicit `GROUP BY <exprs>`.
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| match e {
                        // A bare identifier is first tried as a SELECT alias; if that
                        // does not resolve, it is handled like any other key expression.
                        SQLExpr::Identifier(ident) => {
                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
                                || {
                                    self.expr_or_ordinal(
                                        e,
                                        &projections,
                                        None,
                                        Some(&schema),
                                        "GROUP BY",
                                    )
                                },
                                Ok,
                            )
                        },
                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
                    })
                    .collect::<PolarsResult<_>>()?
            },
            // `GROUP BY ALL`: derive the keys from the projections.
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                projections.iter().for_each(|expr| match expr {
                    // Aggregations, `Len` and literals are never keys.
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    // An aliased column contributes the underlying (un-aliased) column.
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    // Any other expression is a key unless it contains an aggregation,
                    // `Len`, a window (`Over`) or, when compiled in, a `Rolling` expression.
                    _ => {
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Over { .. })
                                || {
                                    #[cfg(feature = "dynamic_group_by")]
                                    {
                                        matches!(e, Expr::Rolling { .. })
                                    }
                                    #[cfg(not(feature = "dynamic_group_by"))]
                                    {
                                        false
                                    }
                                }
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // No grouping keys: HAVING is rejected and the projections are applied directly.
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            // Expressions used for the final select (column references when ORDER BY is
            // present, the projection expressions themselves otherwise).
            let mut retained_cols = Vec::with_capacity(projections.len());
            // Plain column references to the retained output names.
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();

            // Accumulated height behaviour of the retained projections.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            for p in projections.iter() {
                let name = p.to_field(schema.deref())?.name.to_string();
                // Keep only outputs that pass the ILIKE filter and are not excluded.
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            if have_order_by {
                // With ORDER BY, the projections are evaluated before sorting so that the
                // sort can reference both the projected and the original columns.
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    lf = lf.with_columns(projections);
                } else {
                    // Otherwise select the projections on their own and left-join the
                    // original frame back on via a temporary row index column.
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: MaintainOrderJoin::Left,
                                build_side: None,
                            },
                        );
                }
            }
            // Apply wildcard REPLACE expressions and RENAME columns before sorting.
            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }
            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            // Final projection down to the retained outputs.
            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            // Grouped query: HAVING is parsed against the pre-aggregation schema and
            // handed to `process_group_by` together with the keys and projections.
            let having = select_stmt
                .having
                .as_ref()
                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
                .transpose()?;
            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            // After sorting, keep only the projected output columns (by name).
            let output_cols: Vec<_> = projections
                .iter()
                .map(|p| p.to_field(&schema))
                .collect::<PolarsResult<Vec<_>>>()?
                .into_iter()
                .map(|f| col(f.name))
                .collect();

            lf.select(&output_cols)
        };

        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;

        lf = match &select_stmt.distinct {
            // Plain DISTINCT: de-duplicate over all columns, preserving order.
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                let schema = Some(self.get_frame_schema(&mut lf)?);
                // DISTINCT ON accepts plain column names only.
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name)
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // Sort first so that keeping the first row per key follows ORDER BY.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(
                    Some(Selector::ByName {
                        names: cols.into(),
                        strict: true,
                    }),
                    UniqueKeepStrategy::First,
                ));
            },
            None => lf,
        };
        Ok(lf)
    }
1486
1487 fn column_projections(
1488 &mut self,
1489 select_stmt: &Select,
1490 schema: &SchemaRef,
1491 select_modifiers: &mut SelectModifiers,
1492 ) -> PolarsResult<Vec<Expr>> {
1493 if select_stmt.projection.is_empty()
1494 && select_stmt.flavor == SelectFlavor::FromFirstNoSelect
1495 {
1496 return Ok(schema.iter_names().map(|name| col(name.clone())).collect());
1498 }
1499 let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1500 let mut has_qualified_wildcard = false;
1501
1502 for select_item in &select_stmt.projection {
1503 match select_item {
1504 SelectItem::UnnamedExpr(expr) => {
1505 items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1506 expr,
1507 self,
1508 Some(schema),
1509 )?]));
1510 },
1511 SelectItem::ExprWithAlias { expr, alias } => {
1512 let expr = parse_sql_expr(expr, self, Some(schema))?;
1513 items.push(ProjectionItem::Exprs(vec![
1514 expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1515 ]));
1516 },
1517 SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1518 SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1519 let tbl_name = obj_name
1520 .0
1521 .last()
1522 .and_then(|p| p.as_ident())
1523 .map(|i| PlSmallStr::from_str(&i.value))
1524 .unwrap_or_default();
1525 let exprs = self.process_qualified_wildcard(
1526 obj_name,
1527 wildcard_options,
1528 select_modifiers,
1529 Some(schema),
1530 )?;
1531 items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1532 has_qualified_wildcard = true;
1533 },
1534 SelectItemQualifiedWildcardKind::Expr(_) => {
1535 polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1536 },
1537 },
1538 SelectItem::Wildcard(wildcard_options) => {
1539 let cols = schema.iter_names().map(|name| col(name.clone())).collect();
1540 items.push(ProjectionItem::Exprs(
1541 self.process_wildcard_additional_options(
1542 cols,
1543 wildcard_options,
1544 select_modifiers,
1545 Some(schema),
1546 )?,
1547 ));
1548 },
1549 }
1550 }
1551
1552 let exprs = if has_qualified_wildcard {
1554 disambiguate_projection_cols(items, schema)?
1555 } else {
1556 items
1557 .into_iter()
1558 .flat_map(|item| match item {
1559 ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1560 exprs
1561 },
1562 })
1563 .collect()
1564 };
1565 let flattened_exprs = exprs
1566 .into_iter()
1567 .flat_map(|expr| expand_exprs(expr, schema))
1568 .collect();
1569
1570 Ok(flattened_exprs)
1571 }
1572
1573 fn process_where(
1574 &mut self,
1575 mut lf: LazyFrame,
1576 expr: &Option<SQLExpr>,
1577 filter_mode: FilterMode,
1578 schema: Option<SchemaRef>,
1579 ) -> PolarsResult<LazyFrame> {
1580 if let Some(expr) = expr {
1581 let schema = match schema {
1582 None => self.get_frame_schema(&mut lf)?,
1583 Some(s) => s,
1584 };
1585
1586 let (all_true, all_false) = match expr {
1588 SQLExpr::Value(ValueWithSpan {
1589 value: SQLValue::Boolean(b),
1590 ..
1591 }) => (*b, !*b),
1592 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1593 (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::Eq) => {
1594 (a.value == b.value, a.value != b.value)
1595 },
1596 (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::NotEq) => {
1597 (a.value != b.value, a.value == b.value)
1598 },
1599 _ => (false, false),
1600 },
1601 _ => (false, false),
1602 };
1603 let removing = filter_mode == FilterMode::RemoveTrue;
1604 if (all_true && !removing) || (all_false && removing) {
1605 return Ok(lf);
1606 } else if (all_false && !removing) || (all_true && removing) {
1607 return Ok(lf.clear());
1608 }
1609
1610 let residual_exprs: Vec<&SQLExpr>;
1614 (lf, residual_exprs) =
1615 self.rewrite_subquery_conjuncts(lf, expr, filter_mode, &schema)?;
1616
1617 let Some(parsed_residual) = residual_exprs
1618 .iter()
1619 .map(|e| parse_sql_expr(e, self, Some(&*schema)))
1620 .reduce(|a, b| Ok(a?.and(b?)))
1621 else {
1622 return Ok(lf);
1624 };
1625 let mut filter_expression = parsed_residual?;
1626 if filter_expression.clone().meta().has_multiple_outputs() {
1627 filter_expression = all_horizontal([filter_expression])?;
1628 }
1629 lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1630 lf = match filter_mode {
1631 FilterMode::KeepTrue => lf.filter(filter_expression),
1632 FilterMode::RemoveTrue => lf.remove(filter_expression),
1633 };
1634 }
1635 Ok(lf)
1636 }
1637
1638 pub(super) fn process_join(
1639 &mut self,
1640 tbl_left: &TableInfo,
1641 tbl_right: &TableInfo,
1642 constraint: &JoinConstraint,
1643 join_type: JoinType,
1644 ) -> PolarsResult<LazyFrame> {
1645 let (left_on, right_on, predicates) =
1646 process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1647 let coalesce_type = match constraint {
1648 JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1650 _ => JoinCoalesce::KeepColumns,
1651 };
1652 let suffix = format!(":{}", tbl_right.name);
1653
1654 let joined = if predicates.is_empty() {
1655 tbl_left
1657 .frame
1658 .clone()
1659 .join_builder()
1660 .with(tbl_right.frame.clone())
1661 .left_on(left_on)
1662 .right_on(right_on)
1663 .how(join_type)
1664 .suffix(suffix)
1665 .coalesce(coalesce_type)
1666 .finish()
1667 } else {
1668 let mut all_predicates = predicates;
1672 for (l, r) in left_on.into_iter().zip(right_on) {
1673 let r_suffixed = suffix_conflicting_columns(r, tbl_left, tbl_right, &suffix);
1674 all_predicates.push(l.eq(r_suffixed));
1675 }
1676 tbl_left
1677 .frame
1678 .clone()
1679 .join_builder()
1680 .with(tbl_right.frame.clone())
1681 .how(join_type)
1682 .suffix(suffix)
1683 .coalesce(coalesce_type)
1684 .join_where(all_predicates)
1685 };
1686
1687 Ok(joined)
1688 }
1689
1690 fn process_implicit_joins(
1696 &mut self,
1697 lf: &mut LazyFrame,
1698 from: &[TableWithJoins],
1699 where_clause: &Option<SQLExpr>,
1700 ) -> PolarsResult<Option<SQLExpr>> {
1701 let first = from.first().unwrap();
1702 let base_name = get_table_name(&first.relation).unwrap_or_default();
1703 let mut remaining_where = where_clause.clone();
1704 let mut joined_table_names: Vec<String> = vec![base_name];
1705
1706 for join in &first.joins {
1708 if let Some(name) = get_table_name(&join.relation) {
1709 joined_table_names.push(name);
1710 }
1711 }
1712 for tbl_expr in from.iter().skip(1) {
1713 let mut rf = self.execute_from_statement(tbl_expr)?;
1714 let r_name = get_table_name(&tbl_expr.relation).unwrap_or_default();
1715 polars_ensure!(
1716 !r_name.is_empty(),
1717 SQLInterface: "implicit joins require named tables; please provide an alias"
1718 );
1719 let left_schema = self.get_frame_schema(lf)?;
1720 let right_schema = self.get_frame_schema(&mut rf)?;
1721 let (join_expr, residual) =
1722 extract_join_predicates(&remaining_where, &joined_table_names, &r_name);
1723
1724 *lf = if let Some(on_expr) = join_expr {
1725 self.process_join(
1726 &TableInfo {
1727 frame: lf.clone(),
1728 name: PlSmallStr::from_str(
1729 &joined_table_names.first().cloned().unwrap_or_default(),
1730 ),
1731 schema: left_schema.clone(),
1732 },
1733 &TableInfo {
1734 frame: rf,
1735 name: PlSmallStr::from_str(&r_name),
1736 schema: right_schema.clone(),
1737 },
1738 &JoinConstraint::On(on_expr),
1739 JoinType::Inner,
1740 )?
1741 } else {
1742 lf.clone()
1743 .cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
1744 };
1745 remaining_where = residual;
1746
1747 let joined_schema = self.get_frame_schema(lf)?;
1749 self.joined_aliases.insert(
1750 r_name.clone(),
1751 right_schema
1752 .iter_names()
1753 .filter_map(|name| {
1754 let aliased_name = format!("{name}:{r_name}");
1755 if left_schema.contains(name)
1756 && joined_schema.contains(aliased_name.as_str())
1757 {
1758 Some((name.to_string(), aliased_name))
1759 } else {
1760 None
1761 }
1762 })
1763 .collect::<PlHashMap<String, String>>(),
1764 );
1765 joined_table_names.push(r_name);
1766 for join in &tbl_expr.joins {
1767 if let Some(name) = get_table_name(&join.relation) {
1768 joined_table_names.push(name);
1769 }
1770 }
1771 }
1772 Ok(remaining_where)
1773 }
1774
1775 fn process_qualify(
1776 &mut self,
1777 mut lf: LazyFrame,
1778 qualify_expr: &Option<SQLExpr>,
1779 window_fn_columns: &PlHashSet<String>,
1780 ) -> PolarsResult<LazyFrame> {
1781 if let Some(expr) = qualify_expr {
1782 let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1785 let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1786 if !has_window_fns && !references_window_alias {
1787 polars_bail!(
1788 SQLSyntax:
1789 "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1790 );
1791 }
1792 let schema = self.get_frame_schema(&mut lf)?;
1793 let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1794 if filter_expression.clone().meta().has_multiple_outputs() {
1795 filter_expression = all_horizontal([filter_expression])?;
1796 }
1797 lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1798 lf = lf.filter(filter_expression);
1799 }
1800 Ok(lf)
1801 }
1802
1803 fn process_subqueries(
1804 &mut self,
1805 lf: LazyFrame,
1806 exprs: Vec<&mut Expr>,
1807 ) -> PolarsResult<LazyFrame> {
1808 let mut subplans = vec![];
1809
1810 for e in exprs {
1811 *e = e.clone().try_map_expr(|e| {
1812 if let Expr::SubPlan(lp, names) = e {
1813 assert_eq!(
1814 names.len(),
1815 1,
1816 "multiple columns in subqueries not yet supported"
1817 );
1818
1819 let select_expr = names[0].1.clone();
1820 let mut lf = LazyFrame::from((**lp).clone());
1821 let schema = self.get_frame_schema(&mut lf)?;
1822 polars_ensure!(schema.len() == 1, SQLSyntax: "SQL subquery returns more than one column");
1823 let lf = lf.select([select_expr.clone()]);
1824
1825 subplans.push(lf);
1826 Ok(Expr::Column(names[0].0.clone()).first())
1827 } else {
1828 Ok(e)
1829 }
1830 })?;
1831 }
1832
1833 if subplans.is_empty() {
1834 Ok(lf)
1835 } else {
1836 subplans.insert(0, lf);
1837 concat_lf_horizontal(
1838 subplans,
1839 HConcatOptions {
1840 broadcast_unit_length: true,
1841 ..Default::default()
1842 },
1843 )
1844 }
1845 }
1846
1847 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1848 if let Statement::CreateTable(CreateTable {
1849 if_not_exists,
1850 name,
1851 query,
1852 columns,
1853 like,
1854 ..
1855 }) = stmt
1856 {
1857 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1858 if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1859 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1860 }
1861 let lf = match (query, columns.is_empty(), like) {
1862 (Some(query), true, None) => {
1863 self.execute_query(query)?
1867 },
1868 (None, false, None) => {
1869 let mut schema = Schema::with_capacity(columns.len());
1873 for col in columns {
1874 let col_name = col.name.value.as_str();
1875 let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1876 schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1877 }
1878 DataFrame::empty_with_schema(&schema).lazy()
1879 },
1880 (None, true, Some(like_kind)) => {
1881 let like_name = match like_kind {
1885 CreateTableLikeKind::Plain(like)
1886 | CreateTableLikeKind::Parenthesized(like) => &like.name,
1887 };
1888 let like_table = like_name
1889 .0
1890 .first()
1891 .unwrap()
1892 .as_ident()
1893 .unwrap()
1894 .value
1895 .as_str();
1896 if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1897 table.clear()
1898 } else {
1899 polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1900 }
1901 },
1902 (None, true, None) => {
1904 polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1905 },
1906 _ => {
1908 polars_bail!(
1909 SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1910 query,
1911 columns,
1912 like,
1913 )
1914 },
1915 };
1916 self.register(tbl_name, lf);
1917
1918 let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1919 Ok(df_created.unwrap().lazy())
1920 } else {
1921 unreachable!()
1922 }
1923 }
1924
1925 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1926 match relation {
1927 TableFactor::Table {
1928 name, alias, args, ..
1929 } => {
1930 if let Some(args) = args {
1931 return self.execute_table_function(name, alias, &args.args);
1932 }
1933 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1934 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1935 match alias {
1936 Some(alias) => {
1937 self.table_aliases
1938 .insert(alias.name.value.clone(), tbl_name.to_string());
1939 Ok((alias.name.value.clone(), lf))
1940 },
1941 None => Ok((tbl_name.to_string(), lf)),
1942 }
1943 } else {
1944 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1945 }
1946 },
1947 TableFactor::Derived {
1948 lateral,
1949 subquery,
1950 alias,
1951 } => {
1952 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1953 if let Some(alias) = alias {
1956 let mut lf =
1957 self.execute_isolated(|ctx| ctx.execute_query_no_ctes(subquery))?;
1958 lf = self.rename_columns_from_table_alias(lf, alias)?;
1959 self.table_map
1960 .write()
1961 .unwrap()
1962 .insert(alias.name.value.clone(), lf.clone());
1963 Ok((alias.name.value.clone(), lf))
1964 } else {
1965 let lf = self.execute_isolated(|ctx| ctx.execute_query_no_ctes(subquery))?;
1966 Ok(("".to_string(), lf))
1967 }
1968 },
1969 TableFactor::UNNEST {
1970 alias,
1971 array_exprs,
1972 with_offset,
1973 with_offset_alias: _,
1974 ..
1975 } => {
1976 if let Some(alias) = alias {
1977 let column_names: Vec<Option<PlSmallStr>> = alias
1978 .columns
1979 .iter()
1980 .map(|c| {
1981 if c.name.value.is_empty() {
1982 None
1983 } else {
1984 Some(PlSmallStr::from_str(c.name.value.as_str()))
1985 }
1986 })
1987 .collect();
1988
1989 let column_values: Vec<Series> = array_exprs
1990 .iter()
1991 .map(|arr| parse_sql_array(arr, self))
1992 .collect::<Result<_, _>>()?;
1993
1994 polars_ensure!(!column_names.is_empty(),
1995 SQLSyntax:
1996 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1997 );
1998 if column_names.len() != column_values.len() {
1999 let plural = if column_values.len() > 1 { "s" } else { "" };
2000 polars_bail!(
2001 SQLSyntax:
2002 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
2003 );
2004 }
2005 let column_series: Vec<Column> = column_values
2006 .into_iter()
2007 .zip(column_names)
2008 .map(|(s, name)| {
2009 if let Some(name) = name {
2010 s.with_name(name)
2011 } else {
2012 s
2013 }
2014 })
2015 .map(Column::from)
2016 .collect();
2017
2018 let lf = DataFrame::new_infer_height(column_series)?.lazy();
2019
2020 if *with_offset {
2021 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
2023 }
2024 let table_name = alias.name.value.clone();
2025 self.table_map
2026 .write()
2027 .unwrap()
2028 .insert(table_name.clone(), lf.clone());
2029 Ok((table_name, lf))
2030 } else {
2031 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
2032 }
2033 },
2034 TableFactor::NestedJoin {
2035 table_with_joins,
2036 alias,
2037 } => {
2038 let lf =
2039 self.execute_isolated(|ctx| ctx.execute_from_statement(table_with_joins))?;
2040 match alias {
2041 Some(a) => Ok((a.name.value.clone(), lf)),
2042 None => Ok(("".to_string(), lf)),
2043 }
2044 },
2045 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
2047 }
2048 }
2049
2050 fn execute_table_function(
2051 &mut self,
2052 name: &ObjectName,
2053 alias: &Option<TableAlias>,
2054 args: &[FunctionArg],
2055 ) -> PolarsResult<(String, LazyFrame)> {
2056 let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
2057 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
2058 let (tbl_name, lf) = read_fn.execute(args)?;
2059 #[allow(clippy::useless_asref)]
2060 let tbl_name = alias
2061 .as_ref()
2062 .map(|a| a.name.value.clone())
2063 .unwrap_or_else(|| tbl_name.to_string());
2064
2065 self.table_map
2066 .write()
2067 .unwrap()
2068 .insert(tbl_name.clone(), lf.clone());
2069 Ok((tbl_name, lf))
2070 }
2071
2072 fn process_order_by(
2073 &mut self,
2074 mut lf: LazyFrame,
2075 order_by: &Option<OrderBy>,
2076 selected: Option<&[Expr]>,
2077 ) -> PolarsResult<LazyFrame> {
2078 if order_by.as_ref().is_none_or(|ob| match &ob.kind {
2079 OrderByKind::Expressions(exprs) => exprs.is_empty(),
2080 OrderByKind::All(_) => false,
2081 }) {
2082 return Ok(lf);
2083 }
2084 let schema = self.get_frame_schema(&mut lf)?;
2085 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
2086 let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
2087 OrderByKind::Expressions(exprs) => {
2088 if exprs.len() == 1
2091 && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
2092 if ident.value.to_uppercase() == "ALL"
2093 && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
2094 {
2095 let n_cols = if let Some(selected) = selected {
2097 selected.len()
2098 } else {
2099 schema.len()
2100 };
2101 (vec![], Some(&exprs[0].options), n_cols)
2102 } else {
2103 (exprs.clone(), None, exprs.len())
2104 }
2105 },
2106 OrderByKind::All(opts) => {
2107 let n_cols = if let Some(selected) = selected {
2108 selected.len()
2109 } else {
2110 schema.len()
2111 };
2112 (vec![], Some(opts), n_cols)
2113 },
2114 };
2115 let mut descending = Vec::with_capacity(n_order_cols);
2116 let mut nulls_last = Vec::with_capacity(n_order_cols);
2117 let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
2118
2119 if let Some(opts) = order_by_all {
2120 if let Some(selected) = selected {
2121 by.extend(selected.iter().cloned());
2122 } else {
2123 by.extend(columns_iter);
2124 };
2125 let desc_order = !opts.asc.unwrap_or(true);
2126 nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
2127 descending.resize(by.len(), desc_order);
2128 } else {
2129 let columns = &columns_iter.collect::<Vec<_>>();
2130 for ob in order_by {
2131 let desc_order = !ob.options.asc.unwrap_or(true);
2134 nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
2135 descending.push(desc_order);
2136
2137 by.push(self.expr_or_ordinal(
2139 &ob.expr,
2140 columns,
2141 selected,
2142 Some(&schema),
2143 "ORDER BY",
2144 )?)
2145 }
2146 }
2147 Ok(lf.sort_by_exprs(
2148 &by,
2149 SortMultipleOptions::default()
2150 .with_order_descending_multi(descending)
2151 .with_nulls_last_multi(nulls_last),
2152 ))
2153 }
2154
    /// Apply a GROUP BY (with optional HAVING) to `lf`.
    ///
    /// `group_by_keys` are the grouping expressions, `projections` the SELECT
    /// expressions and `having` the already-parsed HAVING predicate. The projections
    /// are split into aggregations (evaluated in `group_by(..).agg(..)`) and
    /// everything else, which is re-applied in a final `select` on the aggregated
    /// frame. Group keys that are not part of the projection are appended to the
    /// output as well.
    ///
    /// Errors if a projected column or struct field is neither a group key nor
    /// aggregated, or if key/projection output names are duplicated.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
        having: Option<Expr>,
    ) -> PolarsResult<LazyFrame> {
        // Schema of the frame before aggregation; used to resolve output names/fields.
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Output name -> temporary `__POLARS_AGG_*` alias, for aggregations whose
        // output name collides with a group key name.
        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
        // Expressions passed to `.agg(..)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Alias -> replacement expression for aliased struct-field projections.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases of non-aggregate, non-simple projections that are not key names.
        let mut projection_aliases = PlHashSet::new();
        // Aliases of simple (column-like) projections.
        let mut group_key_aliases = PlHashSet::new();

        // For each group key: the key with its outer alias stripped, and its output
        // name (when it can be resolved against the schema).
        let group_key_data: Vec<_> = group_by_keys
            .iter()
            .map(|gk| {
                (
                    strip_outer_alias(gk),
                    gk.to_field(&schema_before).ok().map(|f| f.name),
                )
            })
            .collect();

        // For each projection: the output name of the first group key whose stripped
        // expression equals the stripped projection, if any.
        let projection_group_key: Vec<Option<PlSmallStr>> = projections
            .iter()
            .map(|p| {
                let p_stripped = strip_outer_alias(p);
                group_key_data.iter().find_map(|(gk_stripped, gk_name)| {
                    (*gk_stripped == p_stripped)
                        .then(|| gk_name.clone())
                        .flatten()
                })
            })
            .collect();

        for (e, group_key) in projections.iter().zip(&projection_group_key) {
            let matches_group_key = group_key.is_some();
            // A projection is treated as an aggregation when it does not match a group
            // key and contains an aggregate, `Len`, window (`Over`), `Rolling` (when
            // compiled in), a scalar-returning anonymous function, or a non-struct
            // function that references a column outside the group keys.
            let is_non_group_key_expr = !matches_group_key
                && has_expr(e, |e| {
                    match e {
                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                        #[cfg(feature = "dynamic_group_by")]
                        Expr::Rolling { .. } => true,
                        Expr::AnonymousFunction { options, .. } => options.returns_scalar(),
                        Expr::Function { function: func, .. }
                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
                        {
                            has_expr(e, |e| match e {
                                Expr::Column(name) => !group_by_keys_schema.contains(name),
                                _ => false,
                            })
                        },
                        _ => false,
                    }
                });

            // `e_inner` is the expression whose field is validated below; for a simple
            // aliased projection this is the aliased expression itself.
            let mut e_inner = e;
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e_inner = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct field access: later projected as `col(field)`
                    // under the alias.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e_inner.to_field(&schema_before)?;
            if is_non_group_key_expr {
                let mut e = e.clone();
                // Unwrap a top-level (optionally aliased) `Implode` aggregation and
                // aggregate its input directly.
                // NOTE(review): presumably because `.agg(..)` already collects
                // non-scalar results into lists — confirm.
                if let Expr::Agg(AggExpr::Implode {
                    input: expr,
                    maintain_order: _,
                }) = &e
                {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode {
                        input: expr,
                        maintain_order: _,
                    }) = expr.as_ref()
                    {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                // The aggregation's output name collides with a group key name:
                // aggregate under a temporary alias and restore the name afterwards.
                if group_by_keys_schema.get(&field.name).is_some() {
                    let alias_name = format_pl_smallstr!("__POLARS_AGG_{}", field.name);
                    e = e.alias(alias_name.clone());
                    aliased_aggregations.insert(field.name.clone(), alias_name);
                }
                aggregation_projection.push(e);
            } else if !matches_group_key {
                // A bare column / struct field that is neither aggregated nor a group
                // key is an error.
                if let Expr::Column(_)
                | Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                    ..
                } = e_inner
                {
                    if !group_by_keys_schema.contains(&field.name) {
                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                    }
                }
            }
        }

        // Rewrite HAVING so that aggregate sub-expressions become references to
        // aggregated output columns.
        let having_filter = if let Some(having_expr) = having {
            // Aggregations already present in the projection, with their output names.
            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
                .iter()
                .filter_map(|p| match p {
                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
                        Some((inner.as_ref().clone(), name.clone()))
                    },
                    e @ (Expr::Agg(_) | Expr::Len) => Some((
                        e.clone(),
                        e.to_field(&schema_before)
                            .map(|f| f.name)
                            .unwrap_or_default(),
                    )),
                    _ => None,
                })
                .collect();

            let mut n_having_aggs = 0;
            let updated_having = having_expr.map_expr(|e| {
                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
                    return e;
                }
                // Reuse a matching projected aggregation, otherwise add a new
                // aggregation under a `__POLARS_HAVING_<n>` name.
                let name = agg_to_name
                    .iter()
                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
                    .unwrap_or_else(|| {
                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
                        aggregation_projection.push(e.clone().alias(n.clone()));
                        agg_to_name.push((e.clone(), n.clone()));
                        n_having_aggs += 1;
                        n
                    });
                col(name)
            });
            Some(updated_having)
        } else {
            None
        };

        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        // HAVING is applied to the aggregated frame.
        if let Some(filter_expr) = having_filter {
            aggregated = aggregated.filter(filter_expr);
        }

        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // Build the post-aggregation projection, one expression per output name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections.iter().zip(&projection_group_key))
            .map(|(name, (projection_expr, group_key))| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                    // Restore the original name of a temporarily-aliased aggregation.
                    col(aliased_name.clone()).alias(name.clone())
                } else if let Some(key_name) = group_key {
                    // Projection that is itself a group key: reference the key column,
                    // re-aliasing when the output name differs.
                    if key_name == name {
                        col(name.clone())
                    } else {
                        col(key_name.clone()).alias(name.clone())
                    }
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    // Expressions containing an aggregation / window were already
                    // evaluated; anything else is re-evaluated on the aggregated frame.
                    if has_expr(projection_expr, |e| {
                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                    }) {
                        col(name.clone())
                    } else {
                        projection_expr.clone()
                    }
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        let mut output_projection = final_projection;
        for key_name in group_by_keys_schema.iter_names() {
            if !projection_schema.contains(key_name) {
                // Keep group keys that were not projected.
                output_projection.push(col(key_name.clone()));
            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
                // The key's name is also a projection output name, but that projection
                // is not a simple reference to the key: additionally expose the
                // original key column under a `__POLARS_ORIG_*` name.
                let is_cross_aliased = projection_schema
                    .iter_names()
                    .zip(projections.iter())
                    .any(|(name, p)| name == key_name && !is_simple_col_ref(p, key_name));
                if is_cross_aliased {
                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
                    output_projection.push(col(key_name.clone()).alias(internal_name));
                }
            }
        }
        Ok(aggregated.select(&output_projection))
    }
2390
2391 fn process_limit_offset(
2392 &self,
2393 lf: LazyFrame,
2394 limit_clause: &Option<LimitClause>,
2395 fetch: &Option<Fetch>,
2396 ) -> PolarsResult<LazyFrame> {
2397 let (limit, offset) = match limit_clause {
2399 Some(LimitClause::LimitOffset {
2400 limit,
2401 offset,
2402 limit_by,
2403 }) => {
2404 if !limit_by.is_empty() {
2405 polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2408 }
2409 (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2410 },
2411 Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2412 None => (None, None),
2413 };
2414
2415 let limit = match (fetch, limit) {
2417 (Some(fetch), None) => fetch.quantity.as_ref(),
2418 (Some(_), Some(_)) => {
2419 polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2420 },
2421 (None, limit) => limit,
2422 };
2423
2424 match (offset, limit) {
2426 (
2427 Some(SQLExpr::Value(ValueWithSpan {
2428 value: SQLValue::Number(offset, _),
2429 ..
2430 })),
2431 Some(SQLExpr::Value(ValueWithSpan {
2432 value: SQLValue::Number(limit, _),
2433 ..
2434 })),
2435 ) => Ok(lf.slice(
2436 offset
2437 .parse()
2438 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2439 limit.parse().map_err(
2440 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2441 )?,
2442 )),
2443 (
2444 Some(SQLExpr::Value(ValueWithSpan {
2445 value: SQLValue::Number(offset, _),
2446 ..
2447 })),
2448 None,
2449 ) => Ok(lf.slice(
2450 offset
2451 .parse()
2452 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2453 IdxSize::MAX,
2454 )),
2455 (
2456 None,
2457 Some(SQLExpr::Value(ValueWithSpan {
2458 value: SQLValue::Number(limit, _),
2459 ..
2460 })),
2461 ) => {
2462 Ok(lf.limit(limit.parse().map_err(
2463 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2464 )?))
2465 },
2466 (None, None) => Ok(lf),
2467 _ => polars_bail!(
2468 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2469 ),
2470 }
2471 }
2472
2473 fn process_qualified_wildcard(
2474 &mut self,
2475 ObjectName(idents): &ObjectName,
2476 options: &WildcardAdditionalOptions,
2477 modifiers: &mut SelectModifiers,
2478 schema: Option<&Schema>,
2479 ) -> PolarsResult<Vec<Expr>> {
2480 let mut idents_with_wildcard: Vec<Ident> = idents
2481 .iter()
2482 .filter_map(|p| p.as_ident().cloned())
2483 .collect();
2484 idents_with_wildcard.push(Ident::new("*"));
2485
2486 let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2487 self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2488 }
2489
2490 fn process_wildcard_additional_options(
2491 &mut self,
2492 exprs: Vec<Expr>,
2493 options: &WildcardAdditionalOptions,
2494 modifiers: &mut SelectModifiers,
2495 schema: Option<&Schema>,
2496 ) -> PolarsResult<Vec<Expr>> {
2497 if options.opt_except.is_some() && options.opt_exclude.is_some() {
2498 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2499 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2500 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2501 }
2502
2503 if let Some(items) = &options.opt_exclude {
2505 match items {
2506 ExcludeSelectItem::Single(ident) => {
2507 modifiers.exclude.insert(ident.value.clone());
2508 },
2509 ExcludeSelectItem::Multiple(idents) => {
2510 modifiers
2511 .exclude
2512 .extend(idents.iter().map(|i| i.value.clone()));
2513 },
2514 };
2515 }
2516
2517 if let Some(items) = &options.opt_except {
2519 modifiers.exclude.insert(items.first_element.value.clone());
2520 modifiers
2521 .exclude
2522 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2523 }
2524
2525 if let Some(item) = &options.opt_ilike {
2527 let rx = regex::escape(item.pattern.as_str())
2528 .replace('%', ".*")
2529 .replace('_', ".");
2530
2531 modifiers.ilike = Some(
2532 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2533 );
2534 }
2535
2536 if let Some(items) = &options.opt_rename {
2538 let renames = match items {
2539 RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2540 RenameSelectItem::Multiple(renames) => renames.as_slice(),
2541 };
2542 for rn in renames {
2543 let before = PlSmallStr::from_str(rn.ident.value.as_str());
2544 let after = PlSmallStr::from_str(rn.alias.value.as_str());
2545 if before != after {
2546 modifiers.rename.insert(before, after);
2547 }
2548 }
2549 }
2550
2551 if let Some(replacements) = &options.opt_replace {
2553 for rp in &replacements.items {
2554 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2555 modifiers
2556 .replace
2557 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2558 }
2559 }
2560 Ok(exprs)
2561 }
2562
2563 fn rename_columns_from_table_alias(
2564 &mut self,
2565 mut lf: LazyFrame,
2566 alias: &TableAlias,
2567 ) -> PolarsResult<LazyFrame> {
2568 if alias.columns.is_empty() {
2569 Ok(lf)
2570 } else {
2571 let schema = self.get_frame_schema(&mut lf)?;
2572 if alias.columns.len() != schema.len() {
2573 polars_bail!(
2574 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2575 alias.columns.len(), alias.name.value, schema.len()
2576 )
2577 } else {
2578 let existing_columns: Vec<_> = schema.iter_names().collect();
2579 let new_columns: Vec<_> =
2580 alias.columns.iter().map(|c| c.name.value.clone()).collect();
2581 Ok(lf.rename(existing_columns, new_columns, true))
2582 }
2583 }
2584 }
2585}
2586
2587impl SQLContext {
2588 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2590 Self {
2591 table_map: Arc::new(RwLock::new(table_map)),
2592 ..Default::default()
2593 }
2594 }
2595}
2596
2597fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2598 match expr {
2599 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2600 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2601 schema
2602 .iter_names()
2603 .filter(|name| re.is_match(name))
2604 .map(|name| col(name.clone()))
2605 .collect::<Vec<_>>()
2606 },
2607 Expr::Selector(s) => s
2608 .into_columns(schema, &Default::default())
2609 .unwrap()
2610 .into_iter()
2611 .map(col)
2612 .collect::<Vec<_>>(),
2613 _ => vec![expr],
2614 }
2615}
2616
/// Return `true` when `nm` is written as an anchored regex, i.e. it begins
/// with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so comparing the first and last bytes is
    // equivalent to comparing the first and last characters.
    let bytes = nm.as_bytes();
    bytes.first() == Some(&b'^') && bytes.last() == Some(&b'$')
}
2620
2621fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2623 use JoinOperator::*;
2624 match op {
2625 Join(JoinConstraint::Using(cols))
2626 | Inner(JoinConstraint::Using(cols))
2627 | Left(JoinConstraint::Using(cols))
2628 | LeftOuter(JoinConstraint::Using(cols))
2629 | Right(JoinConstraint::Using(cols))
2630 | RightOuter(JoinConstraint::Using(cols))
2631 | FullOuter(JoinConstraint::Using(cols))
2632 | Semi(JoinConstraint::Using(cols))
2633 | Anti(JoinConstraint::Using(cols))
2634 | LeftSemi(JoinConstraint::Using(cols))
2635 | LeftAnti(JoinConstraint::Using(cols))
2636 | RightSemi(JoinConstraint::Using(cols))
2637 | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2638 c.0.first()
2639 .and_then(|p| p.as_ident())
2640 .map(|i| i.value.clone())
2641 })),
2642 _ => None,
2643 }
2644}
2645
2646pub(crate) fn get_table_name(factor: &TableFactor) -> Option<String> {
2648 match factor {
2649 TableFactor::Table { name, alias, .. } => {
2650 alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2651 name.0
2652 .last()
2653 .and_then(|p| p.as_ident())
2654 .map(|i| i.value.clone())
2655 })
2656 },
2657 TableFactor::Derived { alias, .. }
2658 | TableFactor::NestedJoin { alias, .. }
2659 | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2660 _ => None,
2661 }
2662}
2663
2664fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2666 match expr {
2667 Expr::Column(n) => n == col_name,
2668 Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2669 _ => false,
2670 }
2671}
2672
2673fn strip_outer_alias(expr: &Expr) -> Expr {
2675 if let Expr::Alias(inner, _) = expr {
2676 inner.as_ref().clone()
2677 } else {
2678 expr.clone()
2679 }
2680}
2681
2682fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2687 if schema.contains(name) {
2689 return None;
2690 }
2691 projections.iter().find_map(|p| match p {
2693 Expr::Alias(inner, alias) if alias.as_str() == name => {
2694 Some(inner.as_ref().clone().alias(alias.clone()))
2695 },
2696 _ => None,
2697 })
2698}
2699
2700fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2702 let mut found_cols = false;
2703 let mut all_in_schema = true;
2704 for e in expr.into_iter() {
2705 if let Expr::Column(name) = e {
2706 found_cols = true;
2707 if !schema.contains(name.as_str()) {
2708 all_in_schema = false;
2709 break;
2710 }
2711 }
2712 }
2713 found_cols && all_in_schema
2714}
2715
2716fn determine_left_right_join_on(
2724 ctx: &mut SQLContext,
2725 expr_left: &SQLExpr,
2726 expr_right: &SQLExpr,
2727 tbl_left: &TableInfo,
2728 tbl_right: &TableInfo,
2729 join_schema: &Schema,
2730) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2731 let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2734 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2735 e => e,
2736 };
2737 let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2738 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2739 e => e,
2740 };
2741
2742 let left_refs = (
2746 expr_refers_to_table(expr_left, &tbl_left.name),
2747 expr_refers_to_table(expr_left, &tbl_right.name),
2748 );
2749 let right_refs = (
2750 expr_refers_to_table(expr_right, &tbl_left.name),
2751 expr_refers_to_table(expr_right, &tbl_right.name),
2752 );
2753 match (left_refs, right_refs) {
2755 ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2757 ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2759 ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2761 polars_bail!(
2762 SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2763 if left_refs.0 && left_refs.1 {
2764 "left"
2765 } else {
2766 "right"
2767 }, tbl_left.name, tbl_right.name
2768 )
2769 },
2770 _ => {},
2772 }
2773
2774 let left_on_cols_in = (
2779 expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2780 expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2781 );
2782 let right_on_cols_in = (
2783 expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2784 expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2785 );
2786 match (left_on_cols_in, right_on_cols_in) {
2787 ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2789 ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2790 ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2792 ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2793 ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2794 ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2795 _ => Ok((vec![left_on], vec![right_on])),
2797 }
2798}
2799
2800fn process_join_on(
2807 ctx: &mut SQLContext,
2808 sql_expr: &SQLExpr,
2809 tbl_left: &TableInfo,
2810 tbl_right: &TableInfo,
2811) -> PolarsResult<(Vec<Expr>, Vec<Expr>, Vec<Expr>)> {
2812 match sql_expr {
2813 SQLExpr::BinaryOp { left, op, right } => match op {
2814 SQLBinaryOperator::And => {
2815 let (mut left_i, mut right_i, mut preds_i) =
2816 process_join_on(ctx, left, tbl_left, tbl_right)?;
2817 let (mut left_j, mut right_j, mut preds_j) =
2818 process_join_on(ctx, right, tbl_left, tbl_right)?;
2819 left_i.append(&mut left_j);
2820 right_i.append(&mut right_j);
2821 preds_i.append(&mut preds_j);
2822 Ok((left_i, right_i, preds_i))
2823 },
2824 SQLBinaryOperator::Eq => {
2825 let join_schema = build_join_schema(tbl_left, tbl_right)?;
2826 let (l, r) = determine_left_right_join_on(
2827 ctx,
2828 left,
2829 right,
2830 tbl_left,
2831 tbl_right,
2832 &join_schema,
2833 )?;
2834 Ok((l, r, vec![]))
2835 },
2836 SQLBinaryOperator::Lt
2837 | SQLBinaryOperator::LtEq
2838 | SQLBinaryOperator::Gt
2839 | SQLBinaryOperator::GtEq
2840 | SQLBinaryOperator::NotEq => {
2841 let join_schema = build_join_schema(tbl_left, tbl_right)?;
2842 let suffix = format!(":{}", tbl_right.name);
2843
2844 let lhs = suffix_if_right_table(
2847 parse_sql_expr(left, ctx, Some(&join_schema))?,
2848 left,
2849 tbl_left,
2850 tbl_right,
2851 &suffix,
2852 );
2853 let rhs = suffix_if_right_table(
2854 parse_sql_expr(right, ctx, Some(&join_schema))?,
2855 right,
2856 tbl_left,
2857 tbl_right,
2858 &suffix,
2859 );
2860
2861 let polars_op = match op {
2862 SQLBinaryOperator::Lt => Operator::Lt,
2863 SQLBinaryOperator::LtEq => Operator::LtEq,
2864 SQLBinaryOperator::Gt => Operator::Gt,
2865 SQLBinaryOperator::GtEq => Operator::GtEq,
2866 SQLBinaryOperator::NotEq => Operator::NotEq,
2867 _ => unreachable!(),
2868 };
2869 let predicate = Expr::BinaryExpr {
2870 left: Arc::new(lhs),
2871 op: polars_op,
2872 right: Arc::new(rhs),
2873 };
2874 Ok((vec![], vec![], vec![predicate]))
2875 },
2876 _ => polars_bail!(
2877 SQLInterface: "unsupported join constraint operator '{:?}'", op
2878 ),
2879 },
2880 SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2881 _ => polars_bail!(
2882 SQLInterface: "unsupported join constraint expression: {:?}", sql_expr
2883 ),
2884 }
2885}
2886
2887fn build_join_schema(tbl_left: &TableInfo, tbl_right: &TableInfo) -> PolarsResult<Schema> {
2890 let mut join_schema = Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2891 for (name, dtype) in tbl_left.schema.iter() {
2892 join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2893 }
2894 for (name, dtype) in tbl_right.schema.iter() {
2895 if !join_schema.contains(name) {
2896 join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2897 }
2898 }
2899 Ok(join_schema)
2900}
2901
2902fn suffix_conflicting_columns(
2905 expr: Expr,
2906 tbl_left: &TableInfo,
2907 tbl_right: &TableInfo,
2908 suffix: &str,
2909) -> Expr {
2910 expr.map_expr(|e| match e {
2911 Expr::Column(ref name)
2912 if tbl_left.schema.contains(name.as_str())
2913 && tbl_right.schema.contains(name.as_str()) =>
2914 {
2915 Expr::Column(PlSmallStr::from_string(format!("{name}{suffix}")))
2916 },
2917 other => other,
2918 })
2919}
2920
2921fn suffix_if_right_table(
2924 expr: Expr,
2925 sql_expr: &SQLExpr,
2926 tbl_left: &TableInfo,
2927 tbl_right: &TableInfo,
2928 suffix: &str,
2929) -> Expr {
2930 let expr = match expr {
2932 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2933 e => e,
2934 };
2935
2936 let refs_left = expr_refers_to_table(sql_expr, &tbl_left.name);
2937 let refs_right = expr_refers_to_table(sql_expr, &tbl_right.name);
2938
2939 let is_right = if refs_right && !refs_left {
2940 true
2941 } else if refs_left {
2942 false
2943 } else {
2944 !expr_cols_all_in_schema(&expr, &tbl_left.schema)
2946 && expr_cols_all_in_schema(&expr, &tbl_right.schema)
2947 };
2948
2949 if is_right {
2950 suffix_conflicting_columns(expr, tbl_left, tbl_right, suffix)
2951 } else {
2952 expr
2953 }
2954}
2955
2956fn process_join_constraint(
2957 constraint: &JoinConstraint,
2958 tbl_left: &TableInfo,
2959 tbl_right: &TableInfo,
2960 ctx: &mut SQLContext,
2961) -> PolarsResult<(Vec<Expr>, Vec<Expr>, Vec<Expr>)> {
2962 match constraint {
2963 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2964 process_join_on(ctx, expr, tbl_left, tbl_right)
2965 },
2966 JoinConstraint::Using(idents) if !idents.is_empty() => {
2967 let using: Vec<Expr> = idents
2968 .iter()
2969 .map(|ObjectName(parts)| {
2970 if parts.len() != 1 {
2971 polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2972 }
2973 match parts[0].as_ident() {
2974 Some(ident) => Ok(col(ident.value.as_str())),
2975 None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2976 }
2977 })
2978 .collect::<PolarsResult<Vec<_>>>()?;
2979 Ok((using.clone(), using, vec![]))
2980 },
2981 JoinConstraint::Natural => {
2982 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2983 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2984 let on: Vec<Expr> = left_names
2985 .intersection(&right_names)
2986 .map(|&name| col(name.clone()))
2987 .collect();
2988 if on.is_empty() {
2989 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2990 }
2991 Ok((on.clone(), on, vec![]))
2992 },
2993 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2994 }
2995}
2996
2997fn flatten_and_conditions(expr: &SQLExpr) -> Vec<&SQLExpr> {
2999 match expr {
3000 SQLExpr::BinaryOp {
3001 left,
3002 op: SQLBinaryOperator::And,
3003 right,
3004 } => {
3005 let mut conditions = flatten_and_conditions(left);
3006 conditions.extend(flatten_and_conditions(right));
3007 conditions
3008 },
3009 SQLExpr::Nested(inner) => flatten_and_conditions(inner),
3010 _ => vec![expr],
3011 }
3012}
3013
3014fn combine_and_conditions(conditions: Vec<SQLExpr>) -> Option<SQLExpr> {
3016 conditions
3017 .into_iter()
3018 .reduce(|left, right| SQLExpr::BinaryOp {
3019 left: Box::new(left),
3020 op: SQLBinaryOperator::And,
3021 right: Box::new(right),
3022 })
3023}
3024
3025fn is_join_comparison(expr: &SQLExpr, left_tables: &[String], right_table: &str) -> bool {
3028 if let SQLExpr::BinaryOp {
3029 left,
3030 op:
3031 SQLBinaryOperator::Eq
3032 | SQLBinaryOperator::Lt
3033 | SQLBinaryOperator::LtEq
3034 | SQLBinaryOperator::Gt
3035 | SQLBinaryOperator::GtEq
3036 | SQLBinaryOperator::NotEq,
3037 right,
3038 } = expr
3039 {
3040 let left_refs_right = expr_refers_to_table(left, right_table);
3041 let right_refs_right = expr_refers_to_table(right, right_table);
3042
3043 let left_refs_any_left = left_tables
3044 .iter()
3045 .any(|t| expr_refers_to_table(left, t.as_str()));
3046 let right_refs_any_left = left_tables
3047 .iter()
3048 .any(|t| expr_refers_to_table(right, t.as_str()));
3049
3050 (left_refs_right && right_refs_any_left) || (right_refs_right && left_refs_any_left)
3052 } else {
3053 false
3054 }
3055}
3056
3057fn extract_join_predicates(
3060 where_expr: &Option<SQLExpr>,
3061 left_tables: &[String],
3062 right_table: &str,
3063) -> (Option<SQLExpr>, Option<SQLExpr>) {
3064 let Some(expr) = where_expr else {
3065 return (None, None);
3066 };
3067 let conditions = flatten_and_conditions(expr);
3068 let mut join_conds = Vec::new();
3069 let mut filter_conds = Vec::new();
3070 for cond in conditions {
3071 if is_join_comparison(cond, left_tables, right_table) {
3072 join_conds.push(cond.clone());
3073 } else {
3074 filter_conds.push(cond.clone());
3075 }
3076 }
3077 (
3078 combine_and_conditions(join_conds),
3079 combine_and_conditions(filter_conds),
3080 )
3081}
3082
3083pub fn extract_table_identifiers(
3087 query: &str,
3088 include_schema: bool,
3089 unique: bool,
3090) -> PolarsResult<Vec<String>> {
3091 let mut parser = Parser::new(&GenericDialect);
3092 parser = parser.with_options(ParserOptions {
3093 trailing_commas: true,
3094 ..Default::default()
3095 });
3096 let ast = parser
3097 .try_with_sql(query)
3098 .map_err(to_sql_interface_err)?
3099 .parse_statements()
3100 .map_err(to_sql_interface_err)?;
3101
3102 let mut collector = TableIdentifierCollector {
3103 include_schema,
3104 ..Default::default()
3105 };
3106 for stmt in &ast {
3107 let _ = stmt.visit(&mut collector);
3108 }
3109 Ok(if unique {
3110 collector
3111 .tables
3112 .into_iter()
3113 .collect::<PlIndexSet<_>>()
3114 .into_iter()
3115 .collect()
3116 } else {
3117 collector.tables
3118 })
3119}
3120
3121bitflags::bitflags! {
3122 #[derive(PartialEq)]
3127 struct ExprSqlProjectionHeightBehavior: u8 {
3128 const MaintainsColumn = 1 << 0;
3130 const Independent = 1 << 1;
3134 const InheritsContext = 1 << 2;
3137 }
3138}
3139
3140impl ExprSqlProjectionHeightBehavior {
3141 fn identify_from_expr(expr: &Expr) -> Self {
3142 let mut has_column = false;
3143 let mut has_independent = false;
3144
3145 for e in expr.into_iter() {
3146 use Expr::*;
3147 has_column |= matches!(e, Column(_) | Selector(_));
3148 has_independent |= match e {
3149 AnonymousFunction { options, .. } => {
3151 options.returns_scalar() || !options.is_length_preserving()
3152 },
3153 Literal(v) => !v.is_scalar(),
3154 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
3155 Agg { .. } | Len => true,
3156 _ => false,
3157 }
3158 }
3159 if has_independent {
3160 Self::Independent
3161 } else if has_column {
3162 Self::MaintainsColumn
3163 } else {
3164 Self::InheritsContext
3165 }
3166 }
3167}