1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13 BinaryOperator as SQLBinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct,
14 ExcludeSelectItem, Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident,
15 JoinConstraint, JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName,
16 ObjectType, OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
17 SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18 TableFactor, TableWithJoins, Truncate, UnaryOperator as SQLUnaryOperator, Value as SQLValue,
19 ValueWithSpan, Values, Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29 QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30 expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
35#[derive(Clone)]
36pub struct TableInfo {
37 pub(crate) frame: LazyFrame,
38 pub(crate) name: PlSmallStr,
39 pub(crate) schema: Arc<Schema>,
40}
41
42struct SelectModifiers {
43 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
48impl SelectModifiers {
49 fn matches_ilike(&self, s: &str) -> bool {
50 match &self.ilike {
51 Some(rx) => rx.is_match(s),
52 None => true,
53 }
54 }
55 fn renamed_cols(&self) -> Vec<Expr> {
56 self.rename
57 .iter()
58 .map(|(before, after)| col(before.clone()).alias(after.clone()))
59 .collect()
60 }
61}
62
63enum ProjectionItem {
65 QualifiedExprs(PlSmallStr, Vec<Expr>),
66 Exprs(Vec<Expr>),
67}
68
69fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
71 match expr {
72 Expr::Column(name) | Expr::Alias(_, name) => Some(name),
73 _ => None,
74 }
75}
76
77fn disambiguate_projection_cols(
79 items: Vec<ProjectionItem>,
80 schema: &Schema,
81) -> PolarsResult<Vec<Expr>> {
82 let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
84 let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
85 for item in &items {
86 match item {
87 ProjectionItem::QualifiedExprs(_, exprs) => {
88 for expr in exprs {
89 if let Some(name) = expr_output_name(expr) {
90 *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
91 }
92 }
93 },
94 ProjectionItem::Exprs(exprs) => {
95 for expr in exprs {
96 if let Some(name) = expr_output_name(expr) {
97 other_names.insert(name.clone());
98 }
99 }
100 },
101 }
102 }
103
104 let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
106 .into_iter()
107 .filter(|(name, count)| *count > 1 || other_names.contains(name))
108 .map(|(name, _)| name)
109 .collect();
110
111 let mut result: Vec<Expr> = Vec::new();
113 for item in items {
114 match item {
115 ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
116 for expr in exprs {
117 if let Some(name) = expr_output_name(&expr) {
118 if needs_suffix.contains(name) {
119 let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
120 if schema.contains(suffixed.as_str()) {
121 result.push(col(suffixed));
122 continue;
123 }
124 if other_names.contains(name) {
125 polars_bail!(
126 SQLInterface:
127 "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
128 );
129 }
130 }
131 }
132 result.push(expr);
133 }
134 },
135 ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
136 result.extend(exprs);
137 },
138 }
139 }
140 Ok(result)
141}
142
143#[derive(Clone)]
145pub struct SQLContext {
146 pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
147 pub(crate) function_registry: Arc<dyn FunctionRegistry>,
148 pub(crate) lp_arena: Arena<IR>,
149 pub(crate) expr_arena: Arena<AExpr>,
150
151 cte_map: PlHashMap<String, LazyFrame>,
152 table_aliases: PlHashMap<String, String>,
153 joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
154 pub(crate) named_windows: PlHashMap<String, WindowSpec>,
155}
156
157impl Default for SQLContext {
158 fn default() -> Self {
159 Self {
160 function_registry: Arc::new(DefaultFunctionRegistry {}),
161 table_map: Default::default(),
162 cte_map: Default::default(),
163 table_aliases: Default::default(),
164 joined_aliases: Default::default(),
165 named_windows: Default::default(),
166 lp_arena: Default::default(),
167 expr_arena: Default::default(),
168 }
169 }
170}
171
172impl SQLContext {
173 pub fn new() -> Self {
181 Self::default()
182 }
183
184 pub fn get_tables(&self) -> Vec<String> {
186 let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
187 tables.sort_unstable();
188 tables
189 }
190
191 pub fn register(&self, name: &str, lf: LazyFrame) {
207 self.table_map.write().unwrap().insert(name.to_owned(), lf);
208 }
209
210 pub fn unregister(&self, name: &str) {
212 self.table_map.write().unwrap().remove(&name.to_owned());
213 }
214
215 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
234 let mut parser = Parser::new(&GenericDialect);
235 parser = parser.with_options(ParserOptions {
236 trailing_commas: true,
237 ..Default::default()
238 });
239
240 let ast = parser
241 .try_with_sql(query)
242 .map_err(to_sql_interface_err)?
243 .parse_statements()
244 .map_err(to_sql_interface_err)?;
245
246 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
247 let res = self.execute_statement(ast.first().unwrap())?;
248
249 let lp_arena = std::mem::take(&mut self.lp_arena);
252 let expr_arena = std::mem::take(&mut self.expr_arena);
253 res.set_cached_arena(lp_arena, expr_arena);
254
255 self.cte_map.clear();
257 self.table_aliases.clear();
258 self.joined_aliases.clear();
259 self.named_windows.clear();
260
261 Ok(res)
262 }
263
264 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
267 self.function_registry = function_registry;
268 self
269 }
270
271 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
273 &self.function_registry
274 }
275
276 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
278 Arc::get_mut(&mut self.function_registry).unwrap()
279 }
280}
281
282impl SQLContext {
283 fn isolated(&self) -> Self {
284 Self {
285 table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
287 named_windows: self.named_windows.clone(),
288 cte_map: self.cte_map.clone(),
289
290 ..Default::default()
291 }
292 }
293
294 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
295 let ast = stmt;
296 Ok(match ast {
297 Statement::Query(query) => self.execute_query(query)?,
298 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
299 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
300 stmt @ Statement::Drop {
301 object_type: ObjectType::Table,
302 ..
303 } => self.execute_drop_table(stmt)?,
304 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
305 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
306 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
307 _ => polars_bail!(
308 SQLInterface: "statement type is not supported:\n{:?}", ast,
309 ),
310 })
311 }
312
313 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
314 self.register_ctes(query)?;
315 self.execute_query_no_ctes(query)
316 }
317
318 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
319 self.validate_query(query)?;
320
321 let lf = self.process_query(&query.body, query)?;
322 self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
323 }
324
325 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
326 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
327 }
328
329 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
330 self.table_map
334 .read()
335 .unwrap()
336 .get(name)
337 .cloned()
338 .or_else(|| self.cte_map.get(name).cloned())
339 .or_else(|| {
340 self.table_aliases.get(name).and_then(|alias| {
341 self.table_map
342 .read()
343 .unwrap()
344 .get(alias.as_str())
345 .or_else(|| self.cte_map.get(alias.as_str()))
346 .cloned()
347 })
348 })
349 }
350
351 pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
355 where
356 F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
357 {
358 let mut ctx = self.isolated();
359
360 let lf = query(&mut ctx)?;
362
363 lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
365
366 Ok(lf)
367 }
368
369 fn expr_or_ordinal(
370 &mut self,
371 e: &SQLExpr,
372 exprs: &[Expr],
373 selected: Option<&[Expr]>,
374 schema: Option<&Schema>,
375 clause: &str,
376 ) -> PolarsResult<Expr> {
377 match e {
378 SQLExpr::UnaryOp {
379 op: SQLUnaryOperator::Minus,
380 expr,
381 } if matches!(
382 **expr,
383 SQLExpr::Value(ValueWithSpan {
384 value: SQLValue::Number(_, _),
385 ..
386 })
387 ) =>
388 {
389 if let SQLExpr::Value(ValueWithSpan {
390 value: SQLValue::Number(ref idx, _),
391 ..
392 }) = **expr
393 {
394 Err(polars_err!(
395 SQLSyntax:
396 "negative ordinal values are invalid for {}; found -{}",
397 clause,
398 idx
399 ))
400 } else {
401 unreachable!()
402 }
403 },
404 SQLExpr::Value(ValueWithSpan {
405 value: SQLValue::Number(idx, _),
406 ..
407 }) => {
408 let idx = idx.parse::<usize>().map_err(|_| {
410 polars_err!(
411 SQLSyntax:
412 "negative ordinal values are invalid for {}; found {}",
413 clause,
414 idx
415 )
416 })?;
417 let cols = if let Some(cols) = selected {
420 cols
421 } else {
422 exprs
423 };
424 Ok(cols
425 .get(idx - 1)
426 .ok_or_else(|| {
427 polars_err!(
428 SQLInterface:
429 "{} ordinal value must refer to a valid column; found {}",
430 clause,
431 idx
432 )
433 })?
434 .clone())
435 },
436 SQLExpr::Value(v) => Err(polars_err!(
437 SQLSyntax:
438 "{} requires a valid expression or positive ordinal; found {}", clause, v,
439 )),
440 _ => {
441 let mut expr = parse_sql_expr(e, self, schema)?;
444 if matches!(e, SQLExpr::CompoundIdentifier(_)) {
445 if let Some(schema) = schema {
446 expr = expr.map_expr(|ex| match &ex {
447 Expr::Column(name) => {
448 let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
449 if schema.contains(prefixed.as_str()) {
450 col(prefixed)
451 } else {
452 ex
453 }
454 },
455 _ => ex,
456 });
457 }
458 }
459 Ok(expr)
460 },
461 }
462 }
463
464 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
465 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
466 if let Some(name) = aliases.get(column_name) {
467 return name.to_string();
468 }
469 }
470 column_name.to_string()
471 }
472
473 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
474 match expr {
475 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
476 SetExpr::Query(nested_query) => {
477 let lf = self.execute_query_no_ctes(nested_query)?;
478 self.process_order_by(lf, &query.order_by, None)
479 },
480 SetExpr::SetOperation {
481 op: SetOperator::Union,
482 set_quantifier,
483 left,
484 right,
485 } => self.process_union(left, right, set_quantifier, query),
486
487 #[cfg(feature = "semi_anti_join")]
488 SetExpr::SetOperation {
489 op: SetOperator::Intersect | SetOperator::Except,
490 set_quantifier,
491 left,
492 right,
493 } => self.process_except_intersect(left, right, set_quantifier, query),
494
495 SetExpr::Values(Values {
496 explicit_row: _,
497 rows,
498 value_keyword: _,
499 }) => self.process_values(rows),
500
501 SetExpr::Table(tbl) => {
502 if let Some(table_name) = tbl.table_name.as_ref() {
503 self.get_table_from_current_scope(table_name)
504 .ok_or_else(|| {
505 polars_err!(
506 SQLInterface: "no table or alias named '{}' found",
507 tbl
508 )
509 })
510 } else {
511 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
512 }
513 },
514 op => {
515 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
516 },
517 }
518 }
519
520 #[cfg(feature = "semi_anti_join")]
521 fn process_except_intersect(
522 &mut self,
523 left: &SetExpr,
524 right: &SetExpr,
525 quantifier: &SetQuantifier,
526 query: &Query,
527 ) -> PolarsResult<LazyFrame> {
528 let (join_type, op_name) = match *query.body {
529 SetExpr::SetOperation {
530 op: SetOperator::Except,
531 ..
532 } => (JoinType::Anti, "EXCEPT"),
533 _ => (JoinType::Semi, "INTERSECT"),
534 };
535
536 let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
539 let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
540 let lf_schema = self.get_frame_schema(&mut lf)?;
541
542 let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
543 let rf_cols = match quantifier {
544 SetQuantifier::ByName => None,
545 SetQuantifier::Distinct | SetQuantifier::None => {
546 let rf_schema = self.get_frame_schema(&mut rf)?;
547 let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
548 if lf_cols.len() != rf_cols.len() {
549 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
550 }
551 Some(rf_cols)
552 },
553 _ => {
554 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
555 },
556 };
557 let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
558 let joined_tbl = match rf_cols {
559 Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
560 None => join.on(lf_cols).finish(),
561 };
562 let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
563 self.process_order_by(lf, &query.order_by, None)
564 }
565
566 fn process_union(
567 &mut self,
568 left: &SetExpr,
569 right: &SetExpr,
570 quantifier: &SetQuantifier,
571 query: &Query,
572 ) -> PolarsResult<LazyFrame> {
573 let quantifier = *quantifier;
574
575 let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
578 let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
579
580 let opts = UnionArgs {
581 parallel: true,
582 to_supertypes: true,
583 maintain_order: false,
584 ..Default::default()
585 };
586 let lf = match quantifier {
587 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
589 let lf_schema = self.get_frame_schema(&mut lf)?;
590 let rf_schema = self.get_frame_schema(&mut rf)?;
591 if lf_schema.len() != rf_schema.len() {
592 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
593 }
594 if lf_schema.iter_names().ne(rf_schema.iter_names()) {
597 rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
598 }
599 let concatenated = concat(vec![lf, rf], opts);
600 match quantifier {
601 SetQuantifier::Distinct | SetQuantifier::None => {
602 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
603 },
604 _ => concatenated,
605 }
606 },
607 #[cfg(feature = "diagonal_concat")]
609 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
610 #[cfg(feature = "diagonal_concat")]
612 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
613 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
614 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
615 },
616 #[allow(unreachable_patterns)]
617 _ => {
618 polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
619 },
620 }?;
621
622 self.process_order_by(lf, &query.order_by, None)
623 }
624
625 fn process_unnest_lateral(
628 &self,
629 lf: LazyFrame,
630 alias: &Option<TableAlias>,
631 array_exprs: &[SQLExpr],
632 with_offset: bool,
633 ) -> PolarsResult<LazyFrame> {
634 let alias = alias
635 .as_ref()
636 .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
637 polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
638
639 let (mut explode_cols, mut rename_from, mut rename_to) = (
640 Vec::with_capacity(array_exprs.len()),
641 Vec::with_capacity(array_exprs.len()),
642 Vec::with_capacity(array_exprs.len()),
643 );
644 let is_single_col = array_exprs.len() == 1;
645
646 for (i, arr_expr) in array_exprs.iter().enumerate() {
647 let col_name = match arr_expr {
648 SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
649 SQLExpr::CompoundIdentifier(parts) => {
650 PlSmallStr::from_str(&parts.last().unwrap().value)
651 },
652 SQLExpr::Array(_) => polars_bail!(
653 SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
654 ),
655 other => polars_bail!(
656 SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
657 ),
658 };
659 if let Some(name) = alias
661 .columns
662 .get(i)
663 .map(|c| c.name.value.as_str())
664 .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
665 .filter(|name| !name.is_empty() && *name != col_name.as_str())
666 {
667 rename_from.push(col_name.clone());
668 rename_to.push(PlSmallStr::from_str(name));
669 }
670 explode_cols.push(col_name);
671 }
672
673 let mut lf = lf.explode(
674 Selector::ByName {
675 names: Arc::from(explode_cols),
676 strict: true,
677 },
678 ExplodeOptions {
679 empty_as_null: true,
680 keep_nulls: true,
681 },
682 );
683 if !rename_from.is_empty() {
684 lf = lf.rename(rename_from, rename_to, true);
685 }
686 Ok(lf)
687 }
688
689 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
690 let frame_rows: Vec<Row> = values.iter().map(|row| {
691 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
692 let expr = parse_sql_expr(expr, self, None)?;
693 match expr {
694 Expr::Literal(value) => {
695 value.to_any_value()
696 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
697 .map(|av| av.into_static())
698 },
699 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
700 }
701 }).collect();
702 row_data.map(Row::new)
703 }).collect::<Result<_, _>>()?;
704
705 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
706 }
707
708 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
710 match stmt {
711 Statement::Explain { statement, .. } => {
712 let lf = self.execute_statement(statement)?;
713 let plan = lf.describe_optimized_plan()?;
714 let plan = plan
715 .split('\n')
716 .collect::<Series>()
717 .with_name(PlSmallStr::from_static("Logical Plan"))
718 .into_column();
719 let df = DataFrame::new_infer_height(vec![plan])?;
720 Ok(df.lazy())
721 },
722 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
723 }
724 }
725
726 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
728 let tables = Column::new("name".into(), self.get_tables());
729 let df = DataFrame::new_infer_height(vec![tables])?;
730 Ok(df.lazy())
731 }
732
733 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
735 match stmt {
736 Statement::Drop { names, .. } => {
737 names.iter().for_each(|name| {
738 self.table_map.write().unwrap().remove(&name.to_string());
739 });
740 Ok(DataFrame::empty().lazy())
741 },
742 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
743 }
744 }
745
746 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
748 if let Statement::Delete(Delete {
749 tables,
750 from,
751 using,
752 selection,
753 returning,
754 order_by,
755 limit,
756 delete_token: _,
757 }) = stmt
758 {
759 let error_message: Option<&'static str> = if !tables.is_empty() {
760 Some("DELETE expects exactly one table name")
761 } else if using.is_some() {
762 Some("DELETE does not support the USING clause")
763 } else if returning.is_some() {
764 Some("DELETE does not support the RETURNING clause")
765 } else if limit.is_some() {
766 Some("DELETE does not support the LIMIT clause")
767 } else if !order_by.is_empty() {
768 Some("DELETE does not support the ORDER BY clause")
769 } else {
770 None
771 };
772
773 if let Some(msg) = error_message {
774 polars_bail!(SQLInterface: msg);
775 }
776
777 let from_tables = match &from {
778 FromTable::WithFromKeyword(from) => from,
779 FromTable::WithoutKeyword(from) => from,
780 };
781 if from_tables.len() > 1 {
782 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
783 }
784 let tbl_expr = from_tables.first().unwrap();
785 if !tbl_expr.joins.is_empty() {
786 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
787 }
788 let (_, lf) = self.get_table(&tbl_expr.relation)?;
789 if selection.is_none() {
790 Ok(lf.clear())
792 } else {
793 Ok(self.process_where(lf.clone(), selection, true, None)?)
795 }
796 } else {
797 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
798 }
799 }
800
801 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
803 if let Statement::Truncate(Truncate {
804 table_names,
805 partitions,
806 ..
807 }) = stmt
808 {
809 match partitions {
810 None => {
811 if table_names.len() != 1 {
812 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
813 }
814 let tbl = table_names[0].name.to_string();
815 if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
816 *lf = lf.clone().clear();
817 Ok(lf.clone())
818 } else {
819 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
820 }
821 },
822 _ => {
823 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
824 },
825 }
826 } else {
827 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
828 }
829 }
830
831 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
832 self.cte_map.insert(name.to_owned(), lf);
833 }
834
835 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
836 if let Some(with) = &query.with {
837 if with.recursive {
838 polars_bail!(SQLInterface: "recursive CTEs are not supported")
839 }
840 for cte in &with.cte_tables {
841 let cte_name = cte.alias.name.value.clone();
842 let mut lf = self.execute_query(&cte.query)?;
843 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
844 self.register_cte(&cte_name, lf);
845 }
846 }
847 Ok(())
848 }
849
850 fn register_named_windows(
851 &mut self,
852 named_windows: &[NamedWindowDefinition],
853 ) -> PolarsResult<()> {
854 for NamedWindowDefinition(name, expr) in named_windows {
855 let spec = match expr {
856 NamedWindowExpr::NamedWindow(ref_name) => self
857 .named_windows
858 .get(&ref_name.value)
859 .ok_or_else(|| {
860 polars_err!(
861 SQLInterface:
862 "named window '{}' references undefined window '{}'",
863 name.value, ref_name.value
864 )
865 })?
866 .clone(),
867 NamedWindowExpr::WindowSpec(spec) => spec.clone(),
868 };
869 self.named_windows.insert(name.value.clone(), spec);
870 }
871 Ok(())
872 }
873
874 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
876 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
877 if !tbl_expr.joins.is_empty() {
878 for join in &tbl_expr.joins {
879 if let (
881 JoinOperator::CrossJoin(JoinConstraint::None),
882 TableFactor::UNNEST {
883 alias,
884 array_exprs,
885 with_offset,
886 ..
887 },
888 ) = (&join.join_operator, &join.relation)
889 {
890 if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
891 lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
892 continue;
893 }
894 }
895
896 let (r_name, mut rf) = self.get_table(&join.relation)?;
897 if r_name.is_empty() {
898 polars_bail!(
900 SQLInterface:
901 "cannot JOIN on unnamed relation; please provide an alias"
902 )
903 }
904 let left_schema = self.get_frame_schema(&mut lf)?;
905 let right_schema = self.get_frame_schema(&mut rf)?;
906
907 lf = match &join.join_operator {
908 op @ (JoinOperator::Join(constraint) | JoinOperator::FullOuter(constraint)
910 | JoinOperator::Left(constraint)
911 | JoinOperator::LeftOuter(constraint)
912 | JoinOperator::Right(constraint)
913 | JoinOperator::RightOuter(constraint)
914 | JoinOperator::Inner(constraint)
915 | JoinOperator::Anti(constraint)
916 | JoinOperator::Semi(constraint)
917 | JoinOperator::LeftAnti(constraint)
918 | JoinOperator::LeftSemi(constraint)
919 | JoinOperator::RightAnti(constraint)
920 | JoinOperator::RightSemi(constraint)) => {
921 let (lf, rf) = match op {
922 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
923 _ => (lf, rf),
924 };
925 self.process_join(
926 &TableInfo {
927 frame: lf,
928 name: (&l_name).into(),
929 schema: left_schema.clone(),
930 },
931 &TableInfo {
932 frame: rf,
933 name: (&r_name).into(),
934 schema: right_schema.clone(),
935 },
936 constraint,
937 match op {
938 JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
939 JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
940 JoinType::Left
941 },
942 JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
943 JoinType::Right
944 },
945 JoinOperator::FullOuter(_) => JoinType::Full,
946 #[cfg(feature = "semi_anti_join")]
947 JoinOperator::Anti(_)
948 | JoinOperator::LeftAnti(_)
949 | JoinOperator::RightAnti(_) => JoinType::Anti,
950 #[cfg(feature = "semi_anti_join")]
951 JoinOperator::Semi(_)
952 | JoinOperator::LeftSemi(_)
953 | JoinOperator::RightSemi(_) => JoinType::Semi,
954 join_type => polars_bail!(
955 SQLInterface:
956 "join type '{:?}' not currently supported",
957 join_type
958 ),
959 },
960 )?
961 },
962 JoinOperator::CrossJoin(JoinConstraint::None) => {
963 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
964 },
965 JoinOperator::CrossJoin(constraint) => {
966 polars_bail!(
967 SQLInterface:
968 "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
969 constraint
970 )
971 },
972 join_type => {
973 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
974 },
975 };
976
977 let joined_schema = self.get_frame_schema(&mut lf)?;
979
980 self.joined_aliases.insert(
981 r_name.clone(),
982 right_schema
983 .iter_names()
984 .filter_map(|name| {
985 let aliased_name = format!("{name}:{r_name}");
987 if left_schema.contains(name)
988 && joined_schema.contains(aliased_name.as_str())
989 {
990 Some((name.to_string(), aliased_name))
991 } else {
992 None
993 }
994 })
995 .collect::<PlHashMap<String, String>>(),
996 );
997 }
998 };
999 Ok(lf)
1000 }
1001
1002 fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1004 let Select {
1007 distinct: _,
1009 from: _,
1010 group_by: _,
1011 having: _,
1012 named_window: _,
1013 projection: _,
1014 qualify: _,
1015 selection: _,
1016
1017 flavor: _,
1019 select_token: _,
1020 top_before_distinct: _,
1021 window_before_qualify: _,
1022
1023 ref cluster_by,
1025 ref connect_by,
1026 ref distribute_by,
1027 ref exclude,
1028 ref into,
1029 ref lateral_views,
1030 ref prewhere,
1031 ref sort_by,
1032 ref top,
1033 ref value_table_mode,
1034 } = *select_stmt;
1035
1036 polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1038 polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1039 polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1040 polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1041 polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1042 polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1043 polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1044 polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1045 polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1046 polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1047
1048 Ok(())
1049 }
1050
1051 fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1053 let Query {
1055 with: _,
1057 body: _,
1058 order_by: _,
1059 limit_clause: _,
1060 fetch,
1061
1062 for_clause,
1064 format_clause,
1065 locks,
1066 pipe_operators,
1067 settings,
1068 } = query;
1069
1070 polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1072 polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1073 polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1074 polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1075 polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1076
1077 if let Some(Fetch {
1079 quantity: _, percent,
1081 with_ties,
1082 }) = fetch
1083 {
1084 polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1085 polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1086 }
1087 Ok(())
1088 }
1089
1090 fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1092 self.validate_select(select_stmt)?;
1094
1095 self.register_named_windows(&select_stmt.named_window)?;
1097
1098 let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1100 (DataFrame::empty().lazy(), None)
1101 } else {
1102 let from = select_stmt.clone().from;
1105 if from.len() > 1 {
1106 polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1107 }
1108 let tbl_expr = from.first().unwrap();
1109 let lf = self.execute_from_statement(tbl_expr)?;
1110 let base_name = get_table_name(&tbl_expr.relation);
1111 (lf, base_name)
1112 };
1113
1114 if let Some(ref base_name) = base_table_name {
1116 if !self.joined_aliases.is_empty() {
1117 let using_cols: PlHashSet<String> = select_stmt
1119 .from
1120 .first()
1121 .into_iter()
1122 .flat_map(|t| t.joins.iter())
1123 .filter_map(|join| get_using_cols(&join.join_operator))
1124 .flatten()
1125 .collect();
1126
1127 let check_expr = |e| {
1129 check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1130 };
1131 for item in &select_stmt.projection {
1132 match item {
1133 SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1134 check_expr(e)?
1135 },
1136 _ => {},
1137 }
1138 }
1139 if let Some(ref where_expr) = select_stmt.selection {
1140 check_expr(where_expr)?;
1141 }
1142 }
1143 }
1144
1145 let mut schema = self.get_frame_schema(&mut lf)?;
1147 lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1148
1149 let mut select_modifiers = SelectModifiers {
1151 ilike: None,
1152 exclude: PlHashSet::new(),
1153 rename: PlHashMap::new(),
1154 replace: vec![],
1155 };
1156
1157 let window_fn_columns = if select_stmt.qualify.is_some() {
1160 select_stmt
1161 .projection
1162 .iter()
1163 .filter_map(|item| match item {
1164 SelectItem::ExprWithAlias { expr, alias }
1165 if expr_has_window_functions(expr) =>
1166 {
1167 Some(alias.value.clone())
1168 },
1169 _ => None,
1170 })
1171 .collect::<PlHashSet<_>>()
1172 } else {
1173 PlHashSet::new()
1174 };
1175
1176 let mut projections =
1177 self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1178
1179 let mut explode_names = Vec::new();
1181 let mut explode_exprs = Vec::new();
1182 let mut explode_lookup = PlHashMap::new();
1183
1184 for expr in &projections {
1185 for e in expr {
1186 if let Expr::Explode { input, .. } = e {
1187 match input.as_ref() {
1188 Expr::Column(name) => explode_names.push(name.clone()),
1189 other_expr => {
1190 if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1192 let temp_name =
1193 format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1194 explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1195 explode_lookup.insert(other_expr.clone(), temp_name.clone());
1196 explode_names.push(temp_name);
1197 }
1198 },
1199 }
1200 }
1201 }
1202 }
1203 if !explode_names.is_empty() {
1204 if !explode_exprs.is_empty() {
1205 lf = lf.with_columns(explode_exprs);
1206 }
1207 lf = lf.explode(
1208 Selector::ByName {
1209 names: Arc::from(explode_names),
1210 strict: true,
1211 },
1212 ExplodeOptions {
1213 empty_as_null: true,
1214 keep_nulls: true,
1215 },
1216 );
1217 projections = projections
1218 .into_iter()
1219 .map(|p| {
1220 p.map_expr(|e| match e {
1222 Expr::Explode { input, .. } => explode_lookup
1223 .get(input.as_ref())
1224 .map(|name| Expr::Column(name.clone()))
1225 .unwrap_or_else(|| input.as_ref().clone()),
1226 _ => e,
1227 })
1228 })
1229 .collect();
1230
1231 schema = self.get_frame_schema(&mut lf)?;
1232 }
1233
1234 let mut group_by_keys: Vec<Expr> = Vec::new();
1236 match &select_stmt.group_by {
1237 GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1239 if !modifiers.is_empty() {
1240 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1241 }
1242 group_by_keys = group_by_exprs
1244 .iter()
1245 .map(|e| match e {
1246 SQLExpr::Identifier(ident) => {
1247 resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1248 || {
1249 self.expr_or_ordinal(
1250 e,
1251 &projections,
1252 None,
1253 Some(&schema),
1254 "GROUP BY",
1255 )
1256 },
1257 Ok,
1258 )
1259 },
1260 _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1261 })
1262 .collect::<PolarsResult<_>>()?
1263 },
1264 GroupByExpr::All(modifiers) => {
1267 if !modifiers.is_empty() {
1268 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1269 }
1270 projections.iter().for_each(|expr| match expr {
1271 Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1273 Expr::Column(_) => group_by_keys.push(expr.clone()),
1274 Expr::Alias(e, _)
1275 if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1276 Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1277 if let Expr::Column(name) = &**e {
1278 group_by_keys.push(col(name.clone()));
1279 }
1280 },
1281 _ => {
1282 if !has_expr(expr, |e| {
1284 matches!(e, Expr::Agg(_))
1285 || matches!(e, Expr::Len)
1286 || matches!(e, Expr::Over { .. })
1287 || {
1288 #[cfg(feature = "dynamic_group_by")]
1289 {
1290 matches!(e, Expr::Rolling { .. })
1291 }
1292 #[cfg(not(feature = "dynamic_group_by"))]
1293 {
1294 false
1295 }
1296 }
1297 }) {
1298 group_by_keys.push(expr.clone())
1299 }
1300 },
1301 });
1302 },
1303 };
1304
1305 lf = if group_by_keys.is_empty() {
1306 if select_stmt.having.is_some() {
1308 polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1309 };
1310
1311 let mut retained_cols = Vec::with_capacity(projections.len());
1313 let mut retained_names = Vec::with_capacity(projections.len());
1314 let have_order_by = query.order_by.is_some();
1315
1316 let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1318
1319 for p in projections.iter() {
1324 let name = p.to_field(schema.deref())?.name.to_string();
1325 if select_modifiers.matches_ilike(&name)
1326 && !select_modifiers.exclude.contains(&name)
1327 {
1328 projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1329
1330 retained_cols.push(if have_order_by {
1331 col(name.as_str())
1332 } else {
1333 p.clone()
1334 });
1335 retained_names.push(col(name));
1336 }
1337 }
1338
1339 if have_order_by {
1341 if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1346 || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1347 {
1348 lf = lf.with_columns(projections);
1349 } else {
1350 const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1359 lf = lf
1360 .clone()
1361 .select(projections)
1362 .with_row_index(NAME, None)
1363 .join(
1364 lf.with_row_index(NAME, None),
1365 [col(NAME)],
1366 [col(NAME)],
1367 JoinArgs {
1368 how: JoinType::Left,
1369 validation: Default::default(),
1370 suffix: None,
1371 slice: None,
1372 nulls_equal: false,
1373 coalesce: Default::default(),
1374 maintain_order: MaintainOrderJoin::Left,
1375 build_side: None,
1376 },
1377 );
1378 }
1379 }
1380 if !select_modifiers.replace.is_empty() {
1381 lf = lf.with_columns(&select_modifiers.replace);
1382 }
1383 if !select_modifiers.rename.is_empty() {
1384 lf = lf.with_columns(select_modifiers.renamed_cols());
1385 }
1386 lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1387
1388 if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1390 && !have_order_by
1391 {
1392 lf = lf.with_columns(retained_cols).select(retained_names);
1394 } else {
1395 lf = lf.select(retained_cols);
1396 }
1397 if !select_modifiers.rename.is_empty() {
1398 lf = lf.rename(
1399 select_modifiers.rename.keys(),
1400 select_modifiers.rename.values(),
1401 true,
1402 );
1403 };
1404 lf
1405 } else {
1406 let having = select_stmt
1407 .having
1408 .as_ref()
1409 .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1410 .transpose()?;
1411 lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1412 lf = self.process_order_by(lf, &query.order_by, None)?;
1413
1414 let output_cols: Vec<_> = projections
1416 .iter()
1417 .map(|p| p.to_field(&schema))
1418 .collect::<PolarsResult<Vec<_>>>()?
1419 .into_iter()
1420 .map(|f| col(f.name))
1421 .collect();
1422
1423 lf.select(&output_cols)
1424 };
1425
1426 lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1428
1429 lf = match &select_stmt.distinct {
1431 Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1432 Some(Distinct::On(exprs)) => {
1433 let schema = Some(self.get_frame_schema(&mut lf)?);
1435 let cols = exprs
1436 .iter()
1437 .map(|e| {
1438 let expr = parse_sql_expr(e, self, schema.as_deref())?;
1439 if let Expr::Column(name) = expr {
1440 Ok(name)
1441 } else {
1442 Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1443 }
1444 })
1445 .collect::<PolarsResult<Vec<_>>>()?;
1446
1447 lf = self.process_order_by(lf, &query.order_by, None)?;
1449 return Ok(lf.unique_stable(
1450 Some(Selector::ByName {
1451 names: cols.into(),
1452 strict: true,
1453 }),
1454 UniqueKeepStrategy::First,
1455 ));
1456 },
1457 None => lf,
1458 };
1459 Ok(lf)
1460 }
1461
1462 fn column_projections(
1463 &mut self,
1464 select_stmt: &Select,
1465 schema: &SchemaRef,
1466 select_modifiers: &mut SelectModifiers,
1467 ) -> PolarsResult<Vec<Expr>> {
1468 if select_stmt.projection.is_empty()
1469 && select_stmt.flavor == SelectFlavor::FromFirstNoSelect
1470 {
1471 return Ok(schema.iter_names().map(|name| col(name.clone())).collect());
1473 }
1474 let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1475 let mut has_qualified_wildcard = false;
1476
1477 for select_item in &select_stmt.projection {
1478 match select_item {
1479 SelectItem::UnnamedExpr(expr) => {
1480 items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1481 expr,
1482 self,
1483 Some(schema),
1484 )?]));
1485 },
1486 SelectItem::ExprWithAlias { expr, alias } => {
1487 let expr = parse_sql_expr(expr, self, Some(schema))?;
1488 items.push(ProjectionItem::Exprs(vec![
1489 expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1490 ]));
1491 },
1492 SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1493 SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1494 let tbl_name = obj_name
1495 .0
1496 .last()
1497 .and_then(|p| p.as_ident())
1498 .map(|i| PlSmallStr::from_str(&i.value))
1499 .unwrap_or_default();
1500 let exprs = self.process_qualified_wildcard(
1501 obj_name,
1502 wildcard_options,
1503 select_modifiers,
1504 Some(schema),
1505 )?;
1506 items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1507 has_qualified_wildcard = true;
1508 },
1509 SelectItemQualifiedWildcardKind::Expr(_) => {
1510 polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1511 },
1512 },
1513 SelectItem::Wildcard(wildcard_options) => {
1514 let cols = schema.iter_names().map(|name| col(name.clone())).collect();
1515 items.push(ProjectionItem::Exprs(
1516 self.process_wildcard_additional_options(
1517 cols,
1518 wildcard_options,
1519 select_modifiers,
1520 Some(schema),
1521 )?,
1522 ));
1523 },
1524 }
1525 }
1526
1527 let exprs = if has_qualified_wildcard {
1529 disambiguate_projection_cols(items, schema)?
1530 } else {
1531 items
1532 .into_iter()
1533 .flat_map(|item| match item {
1534 ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1535 exprs
1536 },
1537 })
1538 .collect()
1539 };
1540 let flattened_exprs = exprs
1541 .into_iter()
1542 .flat_map(|expr| expand_exprs(expr, schema))
1543 .collect();
1544
1545 Ok(flattened_exprs)
1546 }
1547
1548 fn process_where(
1549 &mut self,
1550 mut lf: LazyFrame,
1551 expr: &Option<SQLExpr>,
1552 invert_filter: bool,
1553 schema: Option<SchemaRef>,
1554 ) -> PolarsResult<LazyFrame> {
1555 if let Some(expr) = expr {
1556 let schema = match schema {
1557 None => self.get_frame_schema(&mut lf)?,
1558 Some(s) => s,
1559 };
1560
1561 let (all_true, all_false) = match expr {
1563 SQLExpr::Value(ValueWithSpan {
1564 value: SQLValue::Boolean(b),
1565 ..
1566 }) => (*b, !*b),
1567 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1568 (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::Eq) => {
1569 (a.value == b.value, a.value != b.value)
1570 },
1571 (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::NotEq) => {
1572 (a.value != b.value, a.value == b.value)
1573 },
1574 _ => (false, false),
1575 },
1576 _ => (false, false),
1577 };
1578 if (all_true && !invert_filter) || (all_false && invert_filter) {
1579 return Ok(lf);
1580 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1581 return Ok(lf.clear());
1582 }
1583
1584 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1586 if filter_expression.clone().meta().has_multiple_outputs() {
1587 filter_expression = all_horizontal([filter_expression])?;
1588 }
1589 lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1590 lf = if invert_filter {
1591 lf.remove(filter_expression)
1592 } else {
1593 lf.filter(filter_expression)
1594 };
1595 }
1596 Ok(lf)
1597 }
1598
1599 pub(super) fn process_join(
1600 &mut self,
1601 tbl_left: &TableInfo,
1602 tbl_right: &TableInfo,
1603 constraint: &JoinConstraint,
1604 join_type: JoinType,
1605 ) -> PolarsResult<LazyFrame> {
1606 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1607 let coalesce_type = match constraint {
1608 JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1610 _ => JoinCoalesce::KeepColumns,
1611 };
1612 let joined = tbl_left
1613 .frame
1614 .clone()
1615 .join_builder()
1616 .with(tbl_right.frame.clone())
1617 .left_on(left_on)
1618 .right_on(right_on)
1619 .how(join_type)
1620 .suffix(format!(":{}", tbl_right.name))
1621 .coalesce(coalesce_type)
1622 .finish();
1623
1624 Ok(joined)
1625 }
1626
1627 fn process_qualify(
1628 &mut self,
1629 mut lf: LazyFrame,
1630 qualify_expr: &Option<SQLExpr>,
1631 window_fn_columns: &PlHashSet<String>,
1632 ) -> PolarsResult<LazyFrame> {
1633 if let Some(expr) = qualify_expr {
1634 let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1637 let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1638 if !has_window_fns && !references_window_alias {
1639 polars_bail!(
1640 SQLSyntax:
1641 "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1642 );
1643 }
1644 let schema = self.get_frame_schema(&mut lf)?;
1645 let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1646 if filter_expression.clone().meta().has_multiple_outputs() {
1647 filter_expression = all_horizontal([filter_expression])?;
1648 }
1649 lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1650 lf = lf.filter(filter_expression);
1651 }
1652 Ok(lf)
1653 }
1654
1655 fn process_subqueries(
1656 &mut self,
1657 lf: LazyFrame,
1658 exprs: Vec<&mut Expr>,
1659 ) -> PolarsResult<LazyFrame> {
1660 let mut subplans = vec![];
1661
1662 for e in exprs {
1663 *e = e.clone().try_map_expr(|e| {
1664 if let Expr::SubPlan(lp, names) = e {
1665 assert_eq!(
1666 names.len(),
1667 1,
1668 "multiple columns in subqueries not yet supported"
1669 );
1670
1671 let select_expr = names[0].1.clone();
1672 let mut lf = LazyFrame::from((**lp).clone());
1673 let schema = self.get_frame_schema(&mut lf)?;
1674 polars_ensure!(schema.len() == 1, SQLSyntax: "SQL subquery returns more than one column");
1675 let lf = lf.select([select_expr.clone()]);
1676
1677 subplans.push(lf);
1678 Ok(Expr::Column(names[0].0.clone()).first())
1679 } else {
1680 Ok(e)
1681 }
1682 })?;
1683 }
1684
1685 if subplans.is_empty() {
1686 Ok(lf)
1687 } else {
1688 subplans.insert(0, lf);
1689 concat_lf_horizontal(
1690 subplans,
1691 HConcatOptions {
1692 broadcast_unit_length: true,
1693 ..Default::default()
1694 },
1695 )
1696 }
1697 }
1698
1699 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1700 if let Statement::CreateTable(CreateTable {
1701 if_not_exists,
1702 name,
1703 query,
1704 columns,
1705 like,
1706 ..
1707 }) = stmt
1708 {
1709 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1710 if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1711 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1712 }
1713 let lf = match (query, columns.is_empty(), like) {
1714 (Some(query), true, None) => {
1715 self.execute_query(query)?
1719 },
1720 (None, false, None) => {
1721 let mut schema = Schema::with_capacity(columns.len());
1725 for col in columns {
1726 let col_name = col.name.value.as_str();
1727 let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1728 schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1729 }
1730 DataFrame::empty_with_schema(&schema).lazy()
1731 },
1732 (None, true, Some(like_kind)) => {
1733 let like_name = match like_kind {
1737 CreateTableLikeKind::Plain(like)
1738 | CreateTableLikeKind::Parenthesized(like) => &like.name,
1739 };
1740 let like_table = like_name
1741 .0
1742 .first()
1743 .unwrap()
1744 .as_ident()
1745 .unwrap()
1746 .value
1747 .as_str();
1748 if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1749 table.clear()
1750 } else {
1751 polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1752 }
1753 },
1754 (None, true, None) => {
1756 polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1757 },
1758 _ => {
1760 polars_bail!(
1761 SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1762 query,
1763 columns,
1764 like,
1765 )
1766 },
1767 };
1768 self.register(tbl_name, lf);
1769
1770 let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1771 Ok(df_created.unwrap().lazy())
1772 } else {
1773 unreachable!()
1774 }
1775 }
1776
1777 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1778 match relation {
1779 TableFactor::Table {
1780 name, alias, args, ..
1781 } => {
1782 if let Some(args) = args {
1783 return self.execute_table_function(name, alias, &args.args);
1784 }
1785 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1786 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1787 match alias {
1788 Some(alias) => {
1789 self.table_aliases
1790 .insert(alias.name.value.clone(), tbl_name.to_string());
1791 Ok((alias.name.value.clone(), lf))
1792 },
1793 None => Ok((tbl_name.to_string(), lf)),
1794 }
1795 } else {
1796 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1797 }
1798 },
1799 TableFactor::Derived {
1800 lateral,
1801 subquery,
1802 alias,
1803 } => {
1804 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1805 if let Some(alias) = alias {
1806 let mut lf = self.execute_query_no_ctes(subquery)?;
1807 lf = self.rename_columns_from_table_alias(lf, alias)?;
1808 self.table_map
1809 .write()
1810 .unwrap()
1811 .insert(alias.name.value.clone(), lf.clone());
1812 Ok((alias.name.value.clone(), lf))
1813 } else {
1814 let lf = self.execute_query_no_ctes(subquery)?;
1815 Ok(("".to_string(), lf))
1816 }
1817 },
1818 TableFactor::UNNEST {
1819 alias,
1820 array_exprs,
1821 with_offset,
1822 with_offset_alias: _,
1823 ..
1824 } => {
1825 if let Some(alias) = alias {
1826 let column_names: Vec<Option<PlSmallStr>> = alias
1827 .columns
1828 .iter()
1829 .map(|c| {
1830 if c.name.value.is_empty() {
1831 None
1832 } else {
1833 Some(PlSmallStr::from_str(c.name.value.as_str()))
1834 }
1835 })
1836 .collect();
1837
1838 let column_values: Vec<Series> = array_exprs
1839 .iter()
1840 .map(|arr| parse_sql_array(arr, self))
1841 .collect::<Result<_, _>>()?;
1842
1843 polars_ensure!(!column_names.is_empty(),
1844 SQLSyntax:
1845 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1846 );
1847 if column_names.len() != column_values.len() {
1848 let plural = if column_values.len() > 1 { "s" } else { "" };
1849 polars_bail!(
1850 SQLSyntax:
1851 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1852 );
1853 }
1854 let column_series: Vec<Column> = column_values
1855 .into_iter()
1856 .zip(column_names)
1857 .map(|(s, name)| {
1858 if let Some(name) = name {
1859 s.with_name(name)
1860 } else {
1861 s
1862 }
1863 })
1864 .map(Column::from)
1865 .collect();
1866
1867 let lf = DataFrame::new_infer_height(column_series)?.lazy();
1868
1869 if *with_offset {
1870 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1872 }
1873 let table_name = alias.name.value.clone();
1874 self.table_map
1875 .write()
1876 .unwrap()
1877 .insert(table_name.clone(), lf.clone());
1878 Ok((table_name, lf))
1879 } else {
1880 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1881 }
1882 },
1883 TableFactor::NestedJoin {
1884 table_with_joins,
1885 alias,
1886 } => {
1887 let lf = self.execute_from_statement(table_with_joins)?;
1888 match alias {
1889 Some(a) => Ok((a.name.value.clone(), lf)),
1890 None => Ok(("".to_string(), lf)),
1891 }
1892 },
1893 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1895 }
1896 }
1897
1898 fn execute_table_function(
1899 &mut self,
1900 name: &ObjectName,
1901 alias: &Option<TableAlias>,
1902 args: &[FunctionArg],
1903 ) -> PolarsResult<(String, LazyFrame)> {
1904 let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1905 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1906 let (tbl_name, lf) = read_fn.execute(args)?;
1907 #[allow(clippy::useless_asref)]
1908 let tbl_name = alias
1909 .as_ref()
1910 .map(|a| a.name.value.clone())
1911 .unwrap_or_else(|| tbl_name.to_string());
1912
1913 self.table_map
1914 .write()
1915 .unwrap()
1916 .insert(tbl_name.clone(), lf.clone());
1917 Ok((tbl_name, lf))
1918 }
1919
1920 fn process_order_by(
1921 &mut self,
1922 mut lf: LazyFrame,
1923 order_by: &Option<OrderBy>,
1924 selected: Option<&[Expr]>,
1925 ) -> PolarsResult<LazyFrame> {
1926 if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1927 OrderByKind::Expressions(exprs) => exprs.is_empty(),
1928 OrderByKind::All(_) => false,
1929 }) {
1930 return Ok(lf);
1931 }
1932 let schema = self.get_frame_schema(&mut lf)?;
1933 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1934 let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1935 OrderByKind::Expressions(exprs) => {
1936 if exprs.len() == 1
1939 && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1940 if ident.value.to_uppercase() == "ALL"
1941 && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1942 {
1943 let n_cols = if let Some(selected) = selected {
1945 selected.len()
1946 } else {
1947 schema.len()
1948 };
1949 (vec![], Some(&exprs[0].options), n_cols)
1950 } else {
1951 (exprs.clone(), None, exprs.len())
1952 }
1953 },
1954 OrderByKind::All(opts) => {
1955 let n_cols = if let Some(selected) = selected {
1956 selected.len()
1957 } else {
1958 schema.len()
1959 };
1960 (vec![], Some(opts), n_cols)
1961 },
1962 };
1963 let mut descending = Vec::with_capacity(n_order_cols);
1964 let mut nulls_last = Vec::with_capacity(n_order_cols);
1965 let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1966
1967 if let Some(opts) = order_by_all {
1968 if let Some(selected) = selected {
1969 by.extend(selected.iter().cloned());
1970 } else {
1971 by.extend(columns_iter);
1972 };
1973 let desc_order = !opts.asc.unwrap_or(true);
1974 nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1975 descending.resize(by.len(), desc_order);
1976 } else {
1977 let columns = &columns_iter.collect::<Vec<_>>();
1978 for ob in order_by {
1979 let desc_order = !ob.options.asc.unwrap_or(true);
1982 nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
1983 descending.push(desc_order);
1984
1985 by.push(self.expr_or_ordinal(
1987 &ob.expr,
1988 columns,
1989 selected,
1990 Some(&schema),
1991 "ORDER BY",
1992 )?)
1993 }
1994 }
1995 Ok(lf.sort_by_exprs(
1996 &by,
1997 SortMultipleOptions::default()
1998 .with_order_descending_multi(descending)
1999 .with_nulls_last_multi(nulls_last),
2000 ))
2001 }
2002
/// Apply a GROUP BY (with optional HAVING) to `lf` and project the result.
///
/// * `group_by_keys` – the expressions to group on.
/// * `projections` – the SELECT-list expressions.
/// * `having` – optional, already-parsed HAVING predicate.
///
/// The projections are split into group keys and aggregations, the frame is
/// grouped and aggregated, the HAVING predicate (rewritten to refer to the
/// aggregated columns) is applied, and finally the projected columns are
/// selected. Group-key columns that are missing from, or shadowed in, the
/// projection are appended to the output as extra columns.
///
/// # Errors
/// * Duplicate output names among the group keys or among the projections.
/// * `SQLSyntax` when a plain column / struct field in the projection is
///   neither a group key nor part of an aggregation.
/// * Schema-resolution and `to_field` errors are propagated.
fn process_group_by(
    &mut self,
    mut lf: LazyFrame,
    group_by_keys: &[Expr],
    projections: &[Expr],
    having: Option<Expr>,
) -> PolarsResult<LazyFrame> {
    let schema_before = self.get_frame_schema(&mut lf)?;
    let group_by_keys_schema =
        expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
            format!("group_by keys contained duplicate output name '{duplicate_name}'")
        })?;

    // output name -> temporary name, for aggregations whose output name
    // collides with a group key
    let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
    // expressions handed to `.agg(..)`
    let mut aggregation_projection = Vec::with_capacity(projections.len());
    // alias -> replacement expression, for aliased struct field lookups
    let mut projection_overrides = PlHashMap::with_capacity(projections.len());
    // aliases of non-aggregate projections that are not group-key names
    let mut projection_aliases = PlHashSet::new();
    // aliases of projections that are simple (renamed) column projections
    let mut group_key_aliases = PlHashSet::new();

    // for every group key: its expression without the outer alias, and its
    // output name (if it can be resolved against the input schema)
    let group_key_data: Vec<_> = group_by_keys
        .iter()
        .map(|gk| {
            (
                strip_outer_alias(gk),
                gk.to_field(&schema_before).ok().map(|f| f.name),
            )
        })
        .collect();

    // a projection "matches" a group key when both the un-aliased expression
    // and the output name are equal to those of some key
    let projection_matches_group_key: Vec<bool> = projections
        .iter()
        .map(|p| {
            let p_stripped = strip_outer_alias(p);
            let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
            group_key_data
                .iter()
                .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
        })
        .collect();

    for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
        // the projection must be computed in the aggregation context if it is
        // not a group key and contains an aggregate, `len`, window (or rolling)
        // expression, or a non-struct function over a non-key column
        let is_non_group_key_expr = !matches_group_key
            && has_expr(e, |e| {
                match e {
                    Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                    #[cfg(feature = "dynamic_group_by")]
                    Expr::Rolling { .. } => true,
                    Expr::Function { function: func, .. }
                        if !matches!(func, FunctionExpr::StructExpr(_)) =>
                    {
                        // true if the function refers to any column that is
                        // not one of the group keys
                        has_expr(e, |e| match e {
                            Expr::Column(name) => !group_by_keys_schema.contains(name),
                            _ => false,
                        })
                    },
                    _ => false,
                }
            });

        // `e_inner` is the expression used to resolve the output field; the
        // alias is stripped only for simple projections
        let mut e_inner = e;
        if let Expr::Alias(expr, alias) = e {
            if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                group_key_aliases.insert(alias.as_ref());
                e_inner = expr
            } else if let Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                ..
            } = expr.deref()
            {
                // aliased struct field: project the field name under the alias
                projection_overrides
                    .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
            } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                projection_aliases.insert(alias.as_ref());
            }
        }
        let field = e_inner.to_field(&schema_before)?;
        if is_non_group_key_expr {
            let mut e = e.clone();
            // unwrap an (optionally aliased) `implode` aggregation to its input;
            // presumably because the group-by aggregation already collects the
            // values into a list — TODO confirm
            if let Expr::Agg(AggExpr::Implode {
                input: expr,
                maintain_order: _,
            }) = &e
            {
                e = (**expr).clone();
            } else if let Expr::Alias(expr, name) = &e {
                if let Expr::Agg(AggExpr::Implode {
                    input: expr,
                    maintain_order: _,
                }) = expr.as_ref()
                {
                    e = (**expr).clone().alias(name.clone());
                }
            }
            // the aggregation's output name clashes with a group key: compute
            // it under a temporary name and restore the name in the final select
            if group_by_keys_schema.get(&field.name).is_some() {
                let alias_name = format!("__POLARS_AGG_{}", field.name);
                e = e.alias(alias_name.as_str());
                aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
            }
            aggregation_projection.push(e);
        } else if !matches_group_key {
            // a bare column / struct field that is neither aggregated nor a
            // group key is a user error
            if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e_inner
            {
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
    }

    // rewrite the HAVING predicate so that each aggregate in it refers to a
    // column of the aggregated frame
    let having_filter = if let Some(having_expr) = having {
        // aggregates already present in the projection, with their output names
        let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
            .iter()
            .filter_map(|p| match p {
                Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
                    Some((inner.as_ref().clone(), name.clone()))
                },
                e @ (Expr::Agg(_) | Expr::Len) => Some((
                    e.clone(),
                    e.to_field(&schema_before)
                        .map(|f| f.name)
                        .unwrap_or_default(),
                )),
                _ => None,
            })
            .collect();

        let mut n_having_aggs = 0;
        let updated_having = having_expr.map_expr(|e| {
            if !matches!(&e, Expr::Agg(_) | Expr::Len) {
                return e;
            }
            // reuse a matching projected aggregate, otherwise add the aggregate
            // to the aggregation list under a generated name
            let name = agg_to_name
                .iter()
                .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
                .unwrap_or_else(|| {
                    let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
                    aggregation_projection.push(e.clone().alias(n.clone()));
                    agg_to_name.push((e.clone(), n.clone()));
                    n_having_aggs += 1;
                    n
                });
            col(name)
        });
        Some(updated_having)
    } else {
        None
    };

    // group + aggregate, then apply the rewritten HAVING predicate
    let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
    if let Some(filter_expr) = having_filter {
        aggregated = aggregated.filter(filter_expr);
    }

    let projection_schema =
        expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
            format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
        })?;

    // build the post-aggregation select, one expression per projected name
    let final_projection = projection_schema
        .iter_names()
        .zip(projections.iter().zip(&projection_matches_group_key))
        .map(|(name, (projection_expr, &matches_group_key))| {
            if let Some(expr) = projection_overrides.get(name.as_str()) {
                expr.clone()
            } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                // restore the original name of a temporarily-renamed aggregation
                col(aliased_name.clone()).alias(name.clone())
            } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
                col(name.clone())
            } else if group_by_keys_schema.get(name).is_some()
                || projection_aliases.contains(name.as_str())
                || group_key_aliases.contains(name.as_str())
            {
                // expressions containing aggregates/windows were already
                // computed by `.agg(..)`; anything else is evaluated now
                if has_expr(projection_expr, |e| {
                    matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                }) {
                    col(name.clone())
                } else {
                    projection_expr.clone()
                }
            } else {
                col(name.clone())
            }
        })
        .collect::<Vec<_>>();

    let mut output_projection = final_projection;
    for key_name in group_by_keys_schema.iter_names() {
        if !projection_schema.contains(key_name) {
            // group key that was not projected: keep it as an extra column
            output_projection.push(col(key_name.clone()));
        } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
            // the key's name is also produced by a projection that is not a
            // plain reference to the key column
            let is_cross_aliased = projections.iter().any(|p| {
                p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
                    && !is_simple_col_ref(p, key_name)
            });
            if is_cross_aliased {
                // keep the original key values under an internal name
                let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
                output_projection.push(col(key_name.clone()).alias(internal_name));
            }
        }
    }
    Ok(aggregated.select(&output_projection))
}
2229
2230 fn process_limit_offset(
2231 &self,
2232 lf: LazyFrame,
2233 limit_clause: &Option<LimitClause>,
2234 fetch: &Option<Fetch>,
2235 ) -> PolarsResult<LazyFrame> {
2236 let (limit, offset) = match limit_clause {
2238 Some(LimitClause::LimitOffset {
2239 limit,
2240 offset,
2241 limit_by,
2242 }) => {
2243 if !limit_by.is_empty() {
2244 polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2247 }
2248 (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2249 },
2250 Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2251 None => (None, None),
2252 };
2253
2254 let limit = match (fetch, limit) {
2256 (Some(fetch), None) => fetch.quantity.as_ref(),
2257 (Some(_), Some(_)) => {
2258 polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2259 },
2260 (None, limit) => limit,
2261 };
2262
2263 match (offset, limit) {
2265 (
2266 Some(SQLExpr::Value(ValueWithSpan {
2267 value: SQLValue::Number(offset, _),
2268 ..
2269 })),
2270 Some(SQLExpr::Value(ValueWithSpan {
2271 value: SQLValue::Number(limit, _),
2272 ..
2273 })),
2274 ) => Ok(lf.slice(
2275 offset
2276 .parse()
2277 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2278 limit.parse().map_err(
2279 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2280 )?,
2281 )),
2282 (
2283 Some(SQLExpr::Value(ValueWithSpan {
2284 value: SQLValue::Number(offset, _),
2285 ..
2286 })),
2287 None,
2288 ) => Ok(lf.slice(
2289 offset
2290 .parse()
2291 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2292 IdxSize::MAX,
2293 )),
2294 (
2295 None,
2296 Some(SQLExpr::Value(ValueWithSpan {
2297 value: SQLValue::Number(limit, _),
2298 ..
2299 })),
2300 ) => {
2301 Ok(lf.limit(limit.parse().map_err(
2302 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2303 )?))
2304 },
2305 (None, None) => Ok(lf),
2306 _ => polars_bail!(
2307 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2308 ),
2309 }
2310 }
2311
2312 fn process_qualified_wildcard(
2313 &mut self,
2314 ObjectName(idents): &ObjectName,
2315 options: &WildcardAdditionalOptions,
2316 modifiers: &mut SelectModifiers,
2317 schema: Option<&Schema>,
2318 ) -> PolarsResult<Vec<Expr>> {
2319 let mut idents_with_wildcard: Vec<Ident> = idents
2320 .iter()
2321 .filter_map(|p| p.as_ident().cloned())
2322 .collect();
2323 idents_with_wildcard.push(Ident::new("*"));
2324
2325 let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2326 self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2327 }
2328
2329 fn process_wildcard_additional_options(
2330 &mut self,
2331 exprs: Vec<Expr>,
2332 options: &WildcardAdditionalOptions,
2333 modifiers: &mut SelectModifiers,
2334 schema: Option<&Schema>,
2335 ) -> PolarsResult<Vec<Expr>> {
2336 if options.opt_except.is_some() && options.opt_exclude.is_some() {
2337 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2338 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2339 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2340 }
2341
2342 if let Some(items) = &options.opt_exclude {
2344 match items {
2345 ExcludeSelectItem::Single(ident) => {
2346 modifiers.exclude.insert(ident.value.clone());
2347 },
2348 ExcludeSelectItem::Multiple(idents) => {
2349 modifiers
2350 .exclude
2351 .extend(idents.iter().map(|i| i.value.clone()));
2352 },
2353 };
2354 }
2355
2356 if let Some(items) = &options.opt_except {
2358 modifiers.exclude.insert(items.first_element.value.clone());
2359 modifiers
2360 .exclude
2361 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2362 }
2363
2364 if let Some(item) = &options.opt_ilike {
2366 let rx = regex::escape(item.pattern.as_str())
2367 .replace('%', ".*")
2368 .replace('_', ".");
2369
2370 modifiers.ilike = Some(
2371 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2372 );
2373 }
2374
2375 if let Some(items) = &options.opt_rename {
2377 let renames = match items {
2378 RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2379 RenameSelectItem::Multiple(renames) => renames.as_slice(),
2380 };
2381 for rn in renames {
2382 let before = PlSmallStr::from_str(rn.ident.value.as_str());
2383 let after = PlSmallStr::from_str(rn.alias.value.as_str());
2384 if before != after {
2385 modifiers.rename.insert(before, after);
2386 }
2387 }
2388 }
2389
2390 if let Some(replacements) = &options.opt_replace {
2392 for rp in &replacements.items {
2393 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2394 modifiers
2395 .replace
2396 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2397 }
2398 }
2399 Ok(exprs)
2400 }
2401
2402 fn rename_columns_from_table_alias(
2403 &mut self,
2404 mut lf: LazyFrame,
2405 alias: &TableAlias,
2406 ) -> PolarsResult<LazyFrame> {
2407 if alias.columns.is_empty() {
2408 Ok(lf)
2409 } else {
2410 let schema = self.get_frame_schema(&mut lf)?;
2411 if alias.columns.len() != schema.len() {
2412 polars_bail!(
2413 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2414 alias.columns.len(), alias.name.value, schema.len()
2415 )
2416 } else {
2417 let existing_columns: Vec<_> = schema.iter_names().collect();
2418 let new_columns: Vec<_> =
2419 alias.columns.iter().map(|c| c.name.value.clone()).collect();
2420 Ok(lf.rename(existing_columns, new_columns, true))
2421 }
2422 }
2423 }
2424}
2425
2426impl SQLContext {
2427 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2429 Self {
2430 table_map: Arc::new(RwLock::new(table_map)),
2431 ..Default::default()
2432 }
2433 }
2434}
2435
2436fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2437 match expr {
2438 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2439 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2440 schema
2441 .iter_names()
2442 .filter(|name| re.is_match(name))
2443 .map(|name| col(name.clone()))
2444 .collect::<Vec<_>>()
2445 },
2446 Expr::Selector(s) => s
2447 .into_columns(schema, &Default::default())
2448 .unwrap()
2449 .into_iter()
2450 .map(col)
2451 .collect::<Vec<_>>(),
2452 _ => vec![expr],
2453 }
2454}
2455
/// Returns `true` when `nm` looks like an anchored regex, i.e. it starts with
/// `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so matching on the raw bytes is equivalent to
    // matching on characters; a lone "^" or "$" can never satisfy both ends.
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
2459
2460fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2462 use JoinOperator::*;
2463 match op {
2464 Join(JoinConstraint::Using(cols))
2465 | Inner(JoinConstraint::Using(cols))
2466 | Left(JoinConstraint::Using(cols))
2467 | LeftOuter(JoinConstraint::Using(cols))
2468 | Right(JoinConstraint::Using(cols))
2469 | RightOuter(JoinConstraint::Using(cols))
2470 | FullOuter(JoinConstraint::Using(cols))
2471 | Semi(JoinConstraint::Using(cols))
2472 | Anti(JoinConstraint::Using(cols))
2473 | LeftSemi(JoinConstraint::Using(cols))
2474 | LeftAnti(JoinConstraint::Using(cols))
2475 | RightSemi(JoinConstraint::Using(cols))
2476 | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2477 c.0.first()
2478 .and_then(|p| p.as_ident())
2479 .map(|i| i.value.clone())
2480 })),
2481 _ => None,
2482 }
2483}
2484
2485fn get_table_name(factor: &TableFactor) -> Option<String> {
2487 match factor {
2488 TableFactor::Table { name, alias, .. } => {
2489 alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2490 name.0
2491 .last()
2492 .and_then(|p| p.as_ident())
2493 .map(|i| i.value.clone())
2494 })
2495 },
2496 TableFactor::Derived { alias, .. }
2497 | TableFactor::NestedJoin { alias, .. }
2498 | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2499 _ => None,
2500 }
2501}
2502
2503fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2505 match expr {
2506 Expr::Column(n) => n == col_name,
2507 Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2508 _ => false,
2509 }
2510}
2511
2512fn strip_outer_alias(expr: &Expr) -> Expr {
2514 if let Expr::Alias(inner, _) = expr {
2515 inner.as_ref().clone()
2516 } else {
2517 expr.clone()
2518 }
2519}
2520
2521fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2526 if schema.contains(name) {
2528 return None;
2529 }
2530 projections.iter().find_map(|p| match p {
2532 Expr::Alias(inner, alias) if alias.as_str() == name => {
2533 Some(inner.as_ref().clone().alias(alias.clone()))
2534 },
2535 _ => None,
2536 })
2537}
2538
2539fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2541 let mut found_cols = false;
2542 let mut all_in_schema = true;
2543 for e in expr.into_iter() {
2544 if let Expr::Column(name) = e {
2545 found_cols = true;
2546 if !schema.contains(name.as_str()) {
2547 all_in_schema = false;
2548 break;
2549 }
2550 }
2551 }
2552 found_cols && all_in_schema
2553}
2554
2555fn determine_left_right_join_on(
2563 ctx: &mut SQLContext,
2564 expr_left: &SQLExpr,
2565 expr_right: &SQLExpr,
2566 tbl_left: &TableInfo,
2567 tbl_right: &TableInfo,
2568 join_schema: &Schema,
2569) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2570 let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2573 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2574 e => e,
2575 };
2576 let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2577 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2578 e => e,
2579 };
2580
2581 let left_refs = (
2585 expr_refers_to_table(expr_left, &tbl_left.name),
2586 expr_refers_to_table(expr_left, &tbl_right.name),
2587 );
2588 let right_refs = (
2589 expr_refers_to_table(expr_right, &tbl_left.name),
2590 expr_refers_to_table(expr_right, &tbl_right.name),
2591 );
2592 match (left_refs, right_refs) {
2594 ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2596 ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2598 ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2600 polars_bail!(
2601 SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2602 if left_refs.0 && left_refs.1 {
2603 "left"
2604 } else {
2605 "right"
2606 }, tbl_left.name, tbl_right.name
2607 )
2608 },
2609 _ => {},
2611 }
2612
2613 let left_on_cols_in = (
2618 expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2619 expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2620 );
2621 let right_on_cols_in = (
2622 expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2623 expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2624 );
2625 match (left_on_cols_in, right_on_cols_in) {
2626 ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2628 ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2629 ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2631 ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2632 ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2633 ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2634 _ => Ok((vec![left_on], vec![right_on])),
2636 }
2637}
2638
2639fn process_join_on(
2640 ctx: &mut SQLContext,
2641 sql_expr: &SQLExpr,
2642 tbl_left: &TableInfo,
2643 tbl_right: &TableInfo,
2644) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2645 match sql_expr {
2646 SQLExpr::BinaryOp { left, op, right } => match op {
2647 SQLBinaryOperator::And => {
2648 let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2649 let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2650 left_i.append(&mut left_j);
2651 right_i.append(&mut right_j);
2652 Ok((left_i, right_i))
2653 },
2654 SQLBinaryOperator::Eq => {
2655 let mut join_schema =
2658 Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2659 for (name, dtype) in tbl_left.schema.iter() {
2660 join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2661 }
2662 for (name, dtype) in tbl_right.schema.iter() {
2663 if !join_schema.contains(name) {
2664 join_schema.insert_at_index(
2665 join_schema.len(),
2666 name.clone(),
2667 dtype.clone(),
2668 )?;
2669 }
2670 }
2671 determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2672 },
2673 _ => polars_bail!(
2674 SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2676 ),
2677 },
2678 SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2679 _ => polars_bail!(
2680 SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2681 ),
2682 }
2683}
2684
2685fn process_join_constraint(
2686 constraint: &JoinConstraint,
2687 tbl_left: &TableInfo,
2688 tbl_right: &TableInfo,
2689 ctx: &mut SQLContext,
2690) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2691 match constraint {
2692 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2693 process_join_on(ctx, expr, tbl_left, tbl_right)
2694 },
2695 JoinConstraint::Using(idents) if !idents.is_empty() => {
2696 let using: Vec<Expr> = idents
2697 .iter()
2698 .map(|ObjectName(parts)| {
2699 if parts.len() != 1 {
2700 polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2701 }
2702 match parts[0].as_ident() {
2703 Some(ident) => Ok(col(ident.value.as_str())),
2704 None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2705 }
2706 })
2707 .collect::<PolarsResult<Vec<_>>>()?;
2708 Ok((using.clone(), using))
2709 },
2710 JoinConstraint::Natural => {
2711 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2712 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2713 let on: Vec<Expr> = left_names
2714 .intersection(&right_names)
2715 .map(|&name| col(name.clone()))
2716 .collect();
2717 if on.is_empty() {
2718 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2719 }
2720 Ok((on.clone(), on))
2721 },
2722 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2723 }
2724}
2725
2726pub fn extract_table_identifiers(
2730 query: &str,
2731 include_schema: bool,
2732 unique: bool,
2733) -> PolarsResult<Vec<String>> {
2734 let mut parser = Parser::new(&GenericDialect);
2735 parser = parser.with_options(ParserOptions {
2736 trailing_commas: true,
2737 ..Default::default()
2738 });
2739 let ast = parser
2740 .try_with_sql(query)
2741 .map_err(to_sql_interface_err)?
2742 .parse_statements()
2743 .map_err(to_sql_interface_err)?;
2744
2745 let mut collector = TableIdentifierCollector {
2746 include_schema,
2747 ..Default::default()
2748 };
2749 for stmt in &ast {
2750 let _ = stmt.visit(&mut collector);
2751 }
2752 Ok(if unique {
2753 collector
2754 .tables
2755 .into_iter()
2756 .collect::<PlIndexSet<_>>()
2757 .into_iter()
2758 .collect()
2759 } else {
2760 collector.tables
2761 })
2762}
2763
2764bitflags::bitflags! {
2765 #[derive(PartialEq)]
2770 struct ExprSqlProjectionHeightBehavior: u8 {
2771 const MaintainsColumn = 1 << 0;
2773 const Independent = 1 << 1;
2777 const InheritsContext = 1 << 2;
2780 }
2781}
2782
2783impl ExprSqlProjectionHeightBehavior {
2784 fn identify_from_expr(expr: &Expr) -> Self {
2785 let mut has_column = false;
2786 let mut has_independent = false;
2787
2788 for e in expr.into_iter() {
2789 use Expr::*;
2790 has_column |= matches!(e, Column(_) | Selector(_));
2791 has_independent |= match e {
2792 AnonymousFunction { options, .. } => {
2794 options.returns_scalar() || !options.is_length_preserving()
2795 },
2796 Literal(v) => !v.is_scalar(),
2797 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2798 Agg { .. } | Len => true,
2799 _ => false,
2800 }
2801 }
2802 if has_independent {
2803 Self::Independent
2804 } else if has_column {
2805 Self::MaintainsColumn
2806 } else {
2807 Self::InheritsContext
2808 }
2809 }
2810}