polars_sql/
context.rs

1use std::ops::{ControlFlow, Deref};
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11    BinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct, ExcludeSelectItem,
12    Expr as SQLExpr, FromTable, FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator,
13    LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName, ObjectNamePart, ObjectType,
14    OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
15    SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
16    TableFactor, TableWithJoins, Truncate, UnaryOperator, Value as SQLValue, ValueWithSpan, Values,
17    Visit, Visitor as SQLVisitor, WildcardAdditionalOptions, WindowSpec,
18};
19use sqlparser::dialect::GenericDialect;
20use sqlparser::parser::{Parser, ParserOptions};
21
22use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
23use crate::sql_expr::{
24    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
25};
26use crate::table_functions::PolarsTableFunctions;
27use crate::types::map_sql_dtype_to_polars;
28
/// A table's lazy frame bundled together with its name and schema.
#[derive(Clone)]
pub struct TableInfo {
    /// The table's data, as a `LazyFrame`.
    pub(crate) frame: LazyFrame,
    /// The table's name.
    pub(crate) name: PlSmallStr,
    /// The table's schema (shared behind an `Arc`).
    pub(crate) schema: Arc<Schema>,
}
35
36struct SelectModifiers {
37    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
38    ilike: Option<regex::Regex>,               // SELECT * ILIKE
39    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
40    replace: Vec<Expr>,                        // SELECT * REPLACE
41}
42impl SelectModifiers {
43    fn matches_ilike(&self, s: &str) -> bool {
44        match &self.ilike {
45            Some(rx) => rx.is_match(s),
46            None => true,
47        }
48    }
49    fn renamed_cols(&self) -> Vec<Expr> {
50        self.rename
51            .iter()
52            .map(|(before, after)| col(before.clone()).alias(after.clone()))
53            .collect()
54    }
55}
56
/// The SQLContext is the main entry point for executing SQL queries.
///
/// It holds the registered tables, the function registry and the arenas used for
/// schema resolution, plus statement-level lookups that `execute` clears once a
/// statement has been executed.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by table name (see `register` / `unregister`).
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry consulted for SQL functions; replaceable via `with_function_registry`.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Arena of `IR` nodes, passed to `schema_with_arenas` when resolving schemas.
    pub(crate) lp_arena: Arena<IR>,
    /// Arena of `AExpr` nodes, passed to `schema_with_arenas` when resolving schemas.
    pub(crate) expr_arena: Arena<AExpr>,

    /// Statement-level: CTE frames, keyed by CTE name.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Statement-level: maps a table alias to the table/CTE name it refers to.
    table_aliases: PlHashMap<String, String>,
    /// Statement-level: per table name, maps a column name to its resolved name
    /// (consulted by `resolve_name`).
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
    /// Statement-level: named window specifications, keyed by window name.
    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
}
70
71impl Default for SQLContext {
72    fn default() -> Self {
73        Self {
74            function_registry: Arc::new(DefaultFunctionRegistry {}),
75            table_map: Default::default(),
76            cte_map: Default::default(),
77            table_aliases: Default::default(),
78            joined_aliases: Default::default(),
79            named_windows: Default::default(),
80            lp_arena: Default::default(),
81            expr_arena: Default::default(),
82        }
83    }
84}
85
86impl SQLContext {
87    /// Create a new SQLContext.
88    /// ```rust
89    /// # use polars_sql::SQLContext;
90    /// # fn main() {
91    /// let ctx = SQLContext::new();
92    /// # }
93    /// ```
94    pub fn new() -> Self {
95        Self::default()
96    }
97
98    /// Get the names of all registered tables, in sorted order.
99    pub fn get_tables(&self) -> Vec<String> {
100        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
101        tables.sort_unstable();
102        tables
103    }
104
105    /// Register a [`LazyFrame`] as a table in the SQLContext.
106    /// ```rust
107    /// # use polars_sql::SQLContext;
108    /// # use polars_core::prelude::*;
109    /// # use polars_lazy::prelude::*;
110    /// # fn main() {
111    ///
112    /// let mut ctx = SQLContext::new();
113    /// let df = df! {
114    ///    "a" =>  [1, 2, 3],
115    /// }.unwrap().lazy();
116    ///
117    /// ctx.register("df", df);
118    /// # }
119    ///```
120    pub fn register(&mut self, name: &str, lf: LazyFrame) {
121        self.table_map.insert(name.to_owned(), lf);
122    }
123
124    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
125    pub fn unregister(&mut self, name: &str) {
126        self.table_map.remove(&name.to_owned());
127    }
128
129    /// Execute a SQL query, returning a [`LazyFrame`].
130    /// ```rust
131    /// # use polars_sql::SQLContext;
132    /// # use polars_core::prelude::*;
133    /// # use polars_lazy::prelude::*;
134    /// # fn main() {
135    ///
136    /// let mut ctx = SQLContext::new();
137    /// let df = df! {
138    ///    "a" =>  [1, 2, 3],
139    /// }
140    /// .unwrap();
141    ///
142    /// ctx.register("df", df.clone().lazy());
143    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
144    /// assert!(sql_df.equals(&df));
145    /// # }
146    ///```
147    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
148        let mut parser = Parser::new(&GenericDialect);
149        parser = parser.with_options(ParserOptions {
150            trailing_commas: true,
151            ..Default::default()
152        });
153
154        let ast = parser
155            .try_with_sql(query)
156            .map_err(to_sql_interface_err)?
157            .parse_statements()
158            .map_err(to_sql_interface_err)?;
159
160        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
161        let res = self.execute_statement(ast.first().unwrap())?;
162
163        // Ensure the result uses the proper arenas.
164        // This will instantiate new arenas with a new version.
165        let lp_arena = std::mem::take(&mut self.lp_arena);
166        let expr_arena = std::mem::take(&mut self.expr_arena);
167        res.set_cached_arena(lp_arena, expr_arena);
168
169        // Every execution should clear the statement-level maps.
170        self.cte_map.clear();
171        self.table_aliases.clear();
172        self.joined_aliases.clear();
173        self.named_windows.clear();
174
175        Ok(res)
176    }
177
178    /// Add a function registry to the SQLContext.
179    /// The registry provides the ability to add custom functions to the SQLContext.
180    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
181        self.function_registry = function_registry;
182        self
183    }
184
185    /// Get the function registry of the SQLContext
186    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
187        &self.function_registry
188    }
189
190    /// Get a mutable reference to the function registry of the SQLContext
191    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
192        Arc::get_mut(&mut self.function_registry).unwrap()
193    }
194}
195
196impl SQLContext {
197    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
198        let ast = stmt;
199        Ok(match ast {
200            Statement::Query(query) => self.execute_query(query)?,
201            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
202            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
203            stmt @ Statement::Drop {
204                object_type: ObjectType::Table,
205                ..
206            } => self.execute_drop_table(stmt)?,
207            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
208            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
209            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
210            _ => polars_bail!(
211                SQLInterface: "statement type is not supported:\n{:?}", ast,
212            ),
213        })
214    }
215
216    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217        self.register_ctes(query)?;
218        self.execute_query_no_ctes(query)
219    }
220
221    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
222        let lf = self.process_query(&query.body, query)?;
223        let (limit, offset) = match &query.limit_clause {
224            Some(LimitClause::LimitOffset {
225                limit,
226                offset,
227                limit_by,
228            }) => {
229                if !limit_by.is_empty() {
230                    // specialised clickhouse syntax
231                    polars_bail!(SQLSyntax: "LIMIT BY clause is not supported");
232                }
233                (limit.as_ref(), offset.as_ref().map(|o| &o.value))
234            },
235            Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
236            None => (None, None),
237        };
238        self.process_limit_offset(lf, limit, offset)
239    }
240
241    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
242        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
243    }
244
245    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
246        // Resolve the table name in the current scope; multi-stage fallback
247        // * table name → cte name
248        // * table alias → cte alias
249        self.table_map
250            .get(name)
251            .or_else(|| self.cte_map.get(name))
252            .or_else(|| {
253                self.table_aliases.get(name).and_then(|alias| {
254                    self.table_map
255                        .get(alias.as_str())
256                        .or_else(|| self.cte_map.get(alias.as_str()))
257                })
258            })
259            .cloned()
260    }
261
262    /// Execute a query in an isolated context. This prevents subqueries from mutating
263    /// arenas and other context state. Returns both the LazyFrame *and* its associated
264    /// Schema (so that the correct arenas are used when determining schema).
265    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<(LazyFrame, SchemaRef)>
266    where
267        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
268    {
269        // Save key state (arenas and lookups)
270        let (joined_aliases, table_aliases, lp_arena, expr_arena, table_map) = (
271            // "take" to ensure subqueries start with clean state
272            std::mem::take(&mut self.joined_aliases),
273            std::mem::take(&mut self.table_aliases),
274            std::mem::take(&mut self.lp_arena),
275            std::mem::take(&mut self.expr_arena),
276            // "clone" to allow subqueries to see registered tables
277            self.table_map.clone(),
278        );
279
280        // Execute query with clean state (eg: nested/subquery)
281        let mut lf = query(self)?;
282        let schema = self.get_frame_schema(&mut lf)?;
283
284        // Restore saved state
285        lf.set_cached_arena(
286            std::mem::replace(&mut self.lp_arena, lp_arena),
287            std::mem::replace(&mut self.expr_arena, expr_arena),
288        );
289        self.joined_aliases = joined_aliases;
290        self.table_aliases = table_aliases;
291        self.table_map = table_map;
292
293        Ok((lf, schema))
294    }
295
296    fn expr_or_ordinal(
297        &mut self,
298        e: &SQLExpr,
299        exprs: &[Expr],
300        selected: Option<&[Expr]>,
301        schema: Option<&Schema>,
302        clause: &str,
303    ) -> PolarsResult<Expr> {
304        match e {
305            SQLExpr::UnaryOp {
306                op: UnaryOperator::Minus,
307                expr,
308            } if matches!(
309                **expr,
310                SQLExpr::Value(ValueWithSpan {
311                    value: SQLValue::Number(_, _),
312                    ..
313                })
314            ) =>
315            {
316                if let SQLExpr::Value(ValueWithSpan {
317                    value: SQLValue::Number(ref idx, _),
318                    ..
319                }) = **expr
320                {
321                    Err(polars_err!(
322                    SQLSyntax:
323                    "negative ordinal values are invalid for {}; found -{}",
324                    clause,
325                    idx
326                    ))
327                } else {
328                    unreachable!()
329                }
330            },
331            SQLExpr::Value(ValueWithSpan {
332                value: SQLValue::Number(idx, _),
333                ..
334            }) => {
335                // note: sql queries are 1-indexed
336                let idx = idx.parse::<usize>().map_err(|_| {
337                    polars_err!(
338                        SQLSyntax:
339                        "negative ordinal values are invalid for {}; found {}",
340                        clause,
341                        idx
342                    )
343                })?;
344                // note: "selected" cols represent final projection order, so we use those for
345                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
346                let cols = if let Some(cols) = selected {
347                    cols
348                } else {
349                    exprs
350                };
351                Ok(cols
352                    .get(idx - 1)
353                    .ok_or_else(|| {
354                        polars_err!(
355                            SQLInterface:
356                            "{} ordinal value must refer to a valid column; found {}",
357                            clause,
358                            idx
359                        )
360                    })?
361                    .clone())
362            },
363            SQLExpr::Value(v) => Err(polars_err!(
364                SQLSyntax:
365                "{} requires a valid expression or positive ordinal; found {}", clause, v,
366            )),
367            _ => {
368                // Handle qualified cross-aliasing in ORDER BY clauses
369                // (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
370                let mut expr = parse_sql_expr(e, self, schema)?;
371                if matches!(e, SQLExpr::CompoundIdentifier(_)) {
372                    if let Some(schema) = schema {
373                        expr = expr.map_expr(|ex| match &ex {
374                            Expr::Column(name) => {
375                                let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
376                                if schema.contains(prefixed.as_str()) {
377                                    col(prefixed)
378                                } else {
379                                    ex
380                                }
381                            },
382                            _ => ex,
383                        });
384                    }
385                }
386                Ok(expr)
387            },
388        }
389    }
390
391    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
392        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
393            if let Some(name) = aliases.get(column_name) {
394                return name.to_string();
395            }
396        }
397        column_name.to_string()
398    }
399
400    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
401        match expr {
402            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
403            SetExpr::Query(query) => self.execute_query_no_ctes(query),
404            SetExpr::SetOperation {
405                op: SetOperator::Union,
406                set_quantifier,
407                left,
408                right,
409            } => self.process_union(left, right, set_quantifier, query),
410
411            #[cfg(feature = "semi_anti_join")]
412            SetExpr::SetOperation {
413                op: SetOperator::Intersect | SetOperator::Except,
414                set_quantifier,
415                left,
416                right,
417            } => self.process_except_intersect(left, right, set_quantifier, query),
418
419            SetExpr::Values(Values {
420                explicit_row: _,
421                rows,
422                value_keyword: _,
423            }) => self.process_values(rows),
424
425            SetExpr::Table(tbl) => {
426                if tbl.table_name.is_some() {
427                    let table_name = tbl.table_name.as_ref().unwrap();
428                    self.get_table_from_current_scope(table_name)
429                        .ok_or_else(|| {
430                            polars_err!(
431                                SQLInterface: "no table or alias named '{}' found",
432                                tbl
433                            )
434                        })
435                } else {
436                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
437                }
438            },
439            op => {
440                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
441            },
442        }
443    }
444
445    #[cfg(feature = "semi_anti_join")]
446    fn process_except_intersect(
447        &mut self,
448        left: &SetExpr,
449        right: &SetExpr,
450        quantifier: &SetQuantifier,
451        query: &Query,
452    ) -> PolarsResult<LazyFrame> {
453        let (join_type, op_name) = match *query.body {
454            SetExpr::SetOperation {
455                op: SetOperator::Except,
456                ..
457            } => (JoinType::Anti, "EXCEPT"),
458            _ => (JoinType::Semi, "INTERSECT"),
459        };
460        let mut lf = self.process_query(left, query)?;
461        let mut rf = self.process_query(right, query)?;
462        let lf_schema = self.get_frame_schema(&mut lf)?;
463
464        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
465        let rf_cols = match quantifier {
466            SetQuantifier::ByName => None,
467            SetQuantifier::Distinct | SetQuantifier::None => {
468                let rf_schema = self.get_frame_schema(&mut rf)?;
469                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
470                if lf_cols.len() != rf_cols.len() {
471                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
472                }
473                Some(rf_cols)
474            },
475            _ => {
476                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
477            },
478        };
479        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
480        let joined_tbl = match rf_cols {
481            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
482            None => join.on(lf_cols).finish(),
483        };
484        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
485    }
486
    /// Process a `UNION` set operation between `left` and `right`.
    ///
    /// Both sides are turned into `LazyFrame`s up front; how they are combined is
    /// decided inside a `PlanCallback` that receives the plans and their schemas
    /// (via `pipe_with_schemas`). Errors raised inside the callback (column-count
    /// mismatch, unsupported quantifier) are returned from the callback itself
    /// rather than from this function.
    fn process_union(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        // copy the quantifier so that it can be moved into the callback
        let quantifier = *quantifier;
        let lf = self.process_query(left, query)?;
        let rf = self.process_query(right, query)?;

        let cb = PlanCallback::new(
            move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
                // plans are popped from the back: the last entry is treated as the
                // right-hand frame and the one before it as the left-hand frame
                let mut rf = LazyFrame::from(plans.pop().unwrap());
                let lf = LazyFrame::from(plans.pop().unwrap());

                let opts = UnionArgs {
                    parallel: true,
                    to_supertypes: true,
                    ..Default::default()
                };
                let out = match quantifier {
                    // UNION [ALL | DISTINCT]
                    SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
                        let lf_schema = &schemas[0];
                        let rf_schema = &schemas[1];
                        if lf_schema.len() != rf_schema.len() {
                            polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
                        }
                        // rename `rf` columns to match `lf` if they differ; SQL behaves
                        // positionally on UNION ops (unless using the "BY NAME" qualifier)
                        if lf_schema.iter_names().ne(rf_schema.iter_names()) {
                            rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
                        }
                        let concatenated = concat(vec![lf, rf], opts);
                        match quantifier {
                            // only `UNION ALL` keeps duplicate rows
                            SetQuantifier::Distinct | SetQuantifier::None => {
                                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                            },
                            _ => concatenated,
                        }
                    },
                    // UNION ALL BY NAME
                    #[cfg(feature = "diagonal_concat")]
                    SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
                    // UNION [DISTINCT] BY NAME
                    #[cfg(feature = "diagonal_concat")]
                    SetQuantifier::ByName | SetQuantifier::DistinctByName => {
                        let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                    },
                    // the catch-all may be unreachable depending on which
                    // cfg-gated arms above are compiled in
                    #[allow(unreachable_patterns)]
                    _ => {
                        polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
                    },
                };

                // hand the combined frame's plan back to the caller
                out.map(|lf| lf.logical_plan)
            },
        );

        Ok(lf.pipe_with_schemas(vec![rf], cb))
    }
550
551    /// Process UNNEST as a lateral operation when it contains column references
552    /// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
553    fn process_unnest_lateral(
554        &self,
555        lf: LazyFrame,
556        alias: &Option<TableAlias>,
557        array_exprs: &[SQLExpr],
558        with_offset: bool,
559    ) -> PolarsResult<LazyFrame> {
560        let alias = alias
561            .as_ref()
562            .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
563        polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
564
565        let (mut explode_cols, mut rename_from, mut rename_to) = (
566            Vec::with_capacity(array_exprs.len()),
567            Vec::with_capacity(array_exprs.len()),
568            Vec::with_capacity(array_exprs.len()),
569        );
570        let is_single_col = array_exprs.len() == 1;
571
572        for (i, arr_expr) in array_exprs.iter().enumerate() {
573            let col_name = match arr_expr {
574                SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
575                SQLExpr::CompoundIdentifier(parts) => {
576                    PlSmallStr::from_str(&parts.last().unwrap().value)
577                },
578                SQLExpr::Array(_) => polars_bail!(
579                    SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
580                ),
581                other => polars_bail!(
582                    SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
583                ),
584            };
585            // alias: column name from "AS t(col)", or table alias
586            if let Some(name) = alias
587                .columns
588                .get(i)
589                .map(|c| c.name.value.as_str())
590                .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
591                .filter(|name| !name.is_empty() && *name != col_name.as_str())
592            {
593                rename_from.push(col_name.clone());
594                rename_to.push(PlSmallStr::from_str(name));
595            }
596            explode_cols.push(col_name);
597        }
598
599        let mut lf = lf.explode(
600            Selector::ByName {
601                names: Arc::from(explode_cols),
602                strict: true,
603            },
604            ExplodeOptions {
605                empty_as_null: true,
606                keep_nulls: true,
607            },
608        );
609        if !rename_from.is_empty() {
610            lf = lf.rename(rename_from, rename_to, true);
611        }
612        Ok(lf)
613    }
614
615    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
616        let frame_rows: Vec<Row> = values.iter().map(|row| {
617            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
618                let expr = parse_sql_expr(expr, self, None)?;
619                match expr {
620                    Expr::Literal(value) => {
621                        value.to_any_value()
622                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
623                            .map(|av| av.into_static())
624                    },
625                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
626                }
627            }).collect();
628            row_data.map(Row::new)
629        }).collect::<Result<_, _>>()?;
630
631        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
632    }
633
634    // EXPLAIN SELECT * FROM DF
635    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
636        match stmt {
637            Statement::Explain { statement, .. } => {
638                let lf = self.execute_statement(statement)?;
639                let plan = lf.describe_optimized_plan()?;
640                let plan = plan
641                    .split('\n')
642                    .collect::<Series>()
643                    .with_name(PlSmallStr::from_static("Logical Plan"))
644                    .into_column();
645                let df = DataFrame::new(vec![plan])?;
646                Ok(df.lazy())
647            },
648            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
649        }
650    }
651
652    // SHOW TABLES
653    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
654        let tables = Column::new("name".into(), self.get_tables());
655        let df = DataFrame::new(vec![tables])?;
656        Ok(df.lazy())
657    }
658
659    // DROP TABLE <tbl>
660    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
661        match stmt {
662            Statement::Drop { names, .. } => {
663                names.iter().for_each(|name| {
664                    self.table_map.remove(&name.to_string());
665                });
666                Ok(DataFrame::empty().lazy())
667            },
668            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
669        }
670    }
671
672    // DELETE FROM <tbl> [WHERE ...]
673    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
674        if let Statement::Delete(Delete {
675            tables,
676            from,
677            using,
678            selection,
679            returning,
680            order_by,
681            limit,
682            delete_token: _,
683        }) = stmt
684        {
685            if !tables.is_empty()
686                || using.is_some()
687                || returning.is_some()
688                || limit.is_some()
689                || !order_by.is_empty()
690            {
691                let error_message = match () {
692                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
693                    _ if using.is_some() => "DELETE does not support the USING clause",
694                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
695                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
696                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
697                    _ => unreachable!(),
698                };
699                polars_bail!(SQLInterface: error_message);
700            }
701            let from_tables = match &from {
702                FromTable::WithFromKeyword(from) => from,
703                FromTable::WithoutKeyword(from) => from,
704            };
705            if from_tables.len() > 1 {
706                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
707            }
708            let tbl_expr = from_tables.first().unwrap();
709            if !tbl_expr.joins.is_empty() {
710                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
711            }
712            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
713            if selection.is_none() {
714                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
715                Ok(DataFrame::empty_with_schema(
716                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
717                        .unwrap()
718                        .as_ref(),
719                )
720                .lazy())
721            } else {
722                // apply constraint as inverted filter (drops rows matching the selection)
723                Ok(self.process_where(lf.clone(), selection, true, None)?)
724            }
725        } else {
726            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
727        }
728    }
729
730    // TRUNCATE <tbl>
731    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
732        if let Statement::Truncate(Truncate {
733            table_names,
734            partitions,
735            ..
736        }) = stmt
737        {
738            match partitions {
739                None => {
740                    if table_names.len() != 1 {
741                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
742                    }
743                    let tbl = table_names[0].name.to_string();
744                    if let Some(lf) = self.table_map.get_mut(&tbl) {
745                        *lf = DataFrame::empty_with_schema(
746                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
747                                .unwrap()
748                                .as_ref(),
749                        )
750                        .lazy();
751                        Ok(lf.clone())
752                    } else {
753                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
754                    }
755                },
756                _ => {
757                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
758                },
759            }
760        } else {
761            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
762        }
763    }
764
765    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
766        self.cte_map.insert(name.to_owned(), lf);
767    }
768
769    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
770        if let Some(with) = &query.with {
771            if with.recursive {
772                polars_bail!(SQLInterface: "recursive CTEs are not supported")
773            }
774            for cte in &with.cte_tables {
775                let cte_name = cte.alias.name.value.clone();
776                let mut lf = self.execute_query(&cte.query)?;
777                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
778                self.register_cte(&cte_name, lf);
779            }
780        }
781        Ok(())
782    }
783
784    fn register_named_windows(
785        &mut self,
786        named_windows: &[NamedWindowDefinition],
787    ) -> PolarsResult<()> {
788        for NamedWindowDefinition(name, expr) in named_windows {
789            let spec = match expr {
790                NamedWindowExpr::NamedWindow(ref_name) => self
791                    .named_windows
792                    .get(&ref_name.value)
793                    .ok_or_else(|| {
794                        polars_err!(
795                            SQLInterface:
796                            "named window '{}' references undefined window '{}'",
797                            name.value, ref_name.value
798                        )
799                    })?
800                    .clone(),
801                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
802            };
803            self.named_windows.insert(name.value.clone(), spec);
804        }
805        Ok(())
806    }
807
808    /// execute the 'FROM' part of the query
809    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
810        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
811        if !tbl_expr.joins.is_empty() {
812            for join in &tbl_expr.joins {
813                // Handle "CROSS JOIN UNNEST(col)" as a lateral join op
814                if let (
815                    JoinOperator::CrossJoin(JoinConstraint::None),
816                    TableFactor::UNNEST {
817                        alias,
818                        array_exprs,
819                        with_offset,
820                        ..
821                    },
822                ) = (&join.join_operator, &join.relation)
823                {
824                    if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
825                        lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
826                        continue;
827                    }
828                }
829
830                let (r_name, mut rf) = self.get_table(&join.relation)?;
831                if r_name.is_empty() {
832                    // Require non-empty to avoid duplicate column errors from nested self-joins.
833                    polars_bail!(
834                        SQLInterface:
835                        "cannot join on unnamed relation; please provide an alias"
836                    )
837                }
838                let left_schema = self.get_frame_schema(&mut lf)?;
839                let right_schema = self.get_frame_schema(&mut rf)?;
840
841                lf = match &join.join_operator {
842                    op @ (JoinOperator::Join(constraint)  // note: bare "join" is inner
843                    | JoinOperator::FullOuter(constraint)
844                    | JoinOperator::Left(constraint)
845                    | JoinOperator::LeftOuter(constraint)
846                    | JoinOperator::Right(constraint)
847                    | JoinOperator::RightOuter(constraint)
848                    | JoinOperator::Inner(constraint)
849                    | JoinOperator::Anti(constraint)
850                    | JoinOperator::Semi(constraint)
851                    | JoinOperator::LeftAnti(constraint)
852                    | JoinOperator::LeftSemi(constraint)
853                    | JoinOperator::RightAnti(constraint)
854                    | JoinOperator::RightSemi(constraint)) => {
855                        let (lf, rf) = match op {
856                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
857                            _ => (lf, rf),
858                        };
859                        self.process_join(
860                            &TableInfo {
861                                frame: lf,
862                                name: (&l_name).into(),
863                                schema: left_schema.clone(),
864                            },
865                            &TableInfo {
866                                frame: rf,
867                                name: (&r_name).into(),
868                                schema: right_schema.clone(),
869                            },
870                            constraint,
871                            match op {
872                                JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
873                                JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
874                                    JoinType::Left
875                                },
876                                JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
877                                    JoinType::Right
878                                },
879                                JoinOperator::FullOuter(_) => JoinType::Full,
880                                #[cfg(feature = "semi_anti_join")]
881                                JoinOperator::Anti(_)
882                                | JoinOperator::LeftAnti(_)
883                                | JoinOperator::RightAnti(_) => JoinType::Anti,
884                                #[cfg(feature = "semi_anti_join")]
885                                JoinOperator::Semi(_)
886                                | JoinOperator::LeftSemi(_)
887                                | JoinOperator::RightSemi(_) => JoinType::Semi,
888                                join_type => polars_bail!(
889                                    SQLInterface:
890                                    "join type '{:?}' not currently supported",
891                                    join_type
892                                ),
893                            },
894                        )?
895                    },
896                    JoinOperator::CrossJoin(JoinConstraint::None) => {
897                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
898                    },
899                    JoinOperator::CrossJoin(constraint) => {
900                        polars_bail!(
901                            SQLInterface:
902                            "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
903                            constraint
904                        )
905                    },
906                    join_type => {
907                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
908                    },
909                };
910
911                // track join-aliased columns so we can resolve them later
912                let joined_schema = self.get_frame_schema(&mut lf)?;
913
914                self.joined_aliases.insert(
915                    r_name.clone(),
916                    right_schema
917                        .iter_names()
918                        .filter_map(|name| {
919                            // col exists in both tables and is aliased in the joined result
920                            let aliased_name = format!("{name}:{r_name}");
921                            if left_schema.contains(name)
922                                && joined_schema.contains(aliased_name.as_str())
923                            {
924                                Some((name.to_string(), aliased_name))
925                            } else {
926                                None
927                            }
928                        })
929                        .collect::<PlHashMap<String, String>>(),
930                );
931            }
932        };
933        Ok(lf)
934    }
935
936    /// Check that the SELECT statement only contains supported clauses.
937    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
938        // Destructure "Select" exhaustively; that way if/when new fields are added in
939        // future sqlparser versions, we'll get a compilation error and can handle them
940        let Select {
941            // Supported clauses
942            distinct: _,
943            from: _,
944            group_by: _,
945            having: _,
946            named_window: _,
947            projection: _,
948            qualify: _,
949            selection: _,
950
951            // Metadata/token fields (can ignore)
952            select_token: _,
953            top_before_distinct: _,
954            window_before_qualify: _,
955
956            // Unsupported clauses
957            ref cluster_by,
958            ref connect_by,
959            ref distribute_by,
960            ref exclude,
961            ref flavor,
962            ref into,
963            ref lateral_views,
964            ref prewhere,
965            ref sort_by,
966            ref top,
967            ref value_table_mode,
968        } = *select_stmt;
969
970        // Raise specific error messages for unsupported attributes
971        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
972        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
973        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
974        polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
975        polars_ensure!(matches!(flavor, SelectFlavor::Standard), SQLInterface: "this `SELECT` flavor is not supported");
976        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO`clause  is not supported");
977        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
978        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
979        polars_ensure!(sort_by.is_empty(), SQLInterface: "SORT BY` clause is not supported; use `ORDER BY` instead");
980        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
981        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
982
983        Ok(())
984    }
985
986    /// Execute the 'SELECT' part of the query.
987    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
988        // Check that the statement doesn't contain unsupported SELECT clauses
989        self.validate_select(select_stmt)?;
990
991        // Parse named windows first, as they may be referenced in the SELECT clause
992        self.register_named_windows(&select_stmt.named_window)?;
993
994        // Get `FROM` table/data
995        let mut lf = if select_stmt.from.is_empty() {
996            DataFrame::empty().lazy()
997        } else {
998            // Note: implicit joins need more work to support properly,
999            // explicit joins are preferred for now (ref: #16662)
1000            let from = select_stmt.clone().from;
1001            if from.len() > 1 {
1002                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1003            }
1004            self.execute_from_statement(from.first().unwrap())?
1005        };
1006
1007        // Apply `WHERE` constraint
1008        let mut schema = self.get_frame_schema(&mut lf)?;
1009        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1010
1011        // Determine projections
1012        let mut select_modifiers = SelectModifiers {
1013            ilike: None,
1014            exclude: PlHashSet::new(),
1015            rename: PlHashMap::new(),
1016            replace: vec![],
1017        };
1018
1019        // Collect window function cols if QUALIFY is present (we check at the
1020        // SQL level because empty OVER() clauses don't create Expr::Over)
1021        let window_fn_columns = if select_stmt.qualify.is_some() {
1022            select_stmt
1023                .projection
1024                .iter()
1025                .filter_map(|item| match item {
1026                    SelectItem::ExprWithAlias { expr, alias }
1027                        if expr_has_window_functions(expr) =>
1028                    {
1029                        Some(alias.value.clone())
1030                    },
1031                    _ => None,
1032                })
1033                .collect::<PlHashSet<_>>()
1034        } else {
1035            PlHashSet::new()
1036        };
1037
1038        let mut projections =
1039            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1040
1041        // Apply `UNNEST` expressions
1042        let mut explode_names = Vec::new();
1043        let mut explode_exprs = Vec::new();
1044        let mut explode_lookup = PlHashMap::new();
1045
1046        for expr in &projections {
1047            for e in expr {
1048                if let Expr::Explode { input, .. } = e {
1049                    match input.as_ref() {
1050                        Expr::Column(name) => explode_names.push(name.clone()),
1051                        other_expr => {
1052                            // Note: skip aggregate expressions; those are handled in the GROUP BY phase
1053                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1054                                let temp_name =
1055                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1056                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1057                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
1058                                explode_names.push(temp_name);
1059                            }
1060                        },
1061                    }
1062                }
1063            }
1064        }
1065        if !explode_names.is_empty() {
1066            if !explode_exprs.is_empty() {
1067                lf = lf.with_columns(explode_exprs);
1068            }
1069            lf = lf.explode(
1070                Selector::ByName {
1071                    names: Arc::from(explode_names),
1072                    strict: true,
1073                },
1074                ExplodeOptions {
1075                    empty_as_null: true,
1076                    keep_nulls: true,
1077                },
1078            );
1079            projections = projections
1080                .into_iter()
1081                .map(|p| {
1082                    // Update "projections" with column refs to the now-exploded expressions
1083                    p.map_expr(|e| match e {
1084                        Expr::Explode { input, .. } => explode_lookup
1085                            .get(input.as_ref())
1086                            .map(|name| Expr::Column(name.clone()))
1087                            .unwrap_or_else(|| input.as_ref().clone()),
1088                        _ => e,
1089                    })
1090                })
1091                .collect();
1092
1093            schema = self.get_frame_schema(&mut lf)?;
1094        }
1095
1096        // Check for "GROUP BY ..." (after determining projections)
1097        let mut group_by_keys: Vec<Expr> = Vec::new();
1098        match &select_stmt.group_by {
1099            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1100            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1101                if !modifiers.is_empty() {
1102                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1103                }
1104                // Translate the group expressions, resolving ordinal values and SELECT aliases
1105                group_by_keys = group_by_exprs
1106                    .iter()
1107                    .map(|e| match e {
1108                        SQLExpr::Identifier(ident) => {
1109                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1110                                || {
1111                                    self.expr_or_ordinal(
1112                                        e,
1113                                        &projections,
1114                                        None,
1115                                        Some(&schema),
1116                                        "GROUP BY",
1117                                    )
1118                                },
1119                                Ok,
1120                            )
1121                        },
1122                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1123                    })
1124                    .collect::<PolarsResult<_>>()?
1125            },
1126            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1127            // nested agg/window funcs to the group key (also ignores literals).
1128            GroupByExpr::All(modifiers) => {
1129                if !modifiers.is_empty() {
1130                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1131                }
1132                projections.iter().for_each(|expr| match expr {
1133                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
1134                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1135                    Expr::Column(_) => group_by_keys.push(expr.clone()),
1136                    Expr::Alias(e, _)
1137                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1138                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1139                        if let Expr::Column(name) = &**e {
1140                            group_by_keys.push(col(name.clone()));
1141                        }
1142                    },
1143                    _ => {
1144                        // If not quick-matched, add if no nested agg/window expressions
1145                        if !has_expr(expr, |e| {
1146                            matches!(e, Expr::Agg(_))
1147                                || matches!(e, Expr::Len)
1148                                || matches!(e, Expr::Over { .. })
1149                                || {
1150                                    #[cfg(feature = "dynamic_group_by")]
1151                                    {
1152                                        matches!(e, Expr::Rolling { .. })
1153                                    }
1154                                    #[cfg(not(feature = "dynamic_group_by"))]
1155                                    {
1156                                        false
1157                                    }
1158                                }
1159                        }) {
1160                            group_by_keys.push(expr.clone())
1161                        }
1162                    },
1163                });
1164            },
1165        };
1166
1167        lf = if group_by_keys.is_empty() {
1168            // The 'having' clause is only valid inside 'group by'
1169            if select_stmt.having.is_some() {
1170                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1171            };
1172
1173            // Final/selected cols, accounting for 'SELECT *' modifiers
1174            let mut retained_cols = Vec::with_capacity(projections.len());
1175            let mut retained_names = Vec::with_capacity(projections.len());
1176            let have_order_by = query.order_by.is_some();
1177
1178            // Initialize containing InheritsContext to handle empty projection case.
1179            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1180
1181            // Note: if there is an 'order by' then we project everything (original cols
1182            // and new projections) and *then* select the final cols; the retained cols
1183            // are used to ensure a correct final projection. If there's no 'order by',
1184            // clause then we can project the final column *expressions* directly.
1185            for p in projections.iter() {
1186                let name = p.to_field(schema.deref())?.name.to_string();
1187                if select_modifiers.matches_ilike(&name)
1188                    && !select_modifiers.exclude.contains(&name)
1189                {
1190                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1191
1192                    retained_cols.push(if have_order_by {
1193                        col(name.as_str())
1194                    } else {
1195                        p.clone()
1196                    });
1197                    retained_names.push(col(name));
1198                }
1199            }
1200
1201            // Apply the remaining modifiers and establish the final projection
1202            if have_order_by {
1203                // We can safely use `with_columns()` and avoid a join if:
1204                // * There is already a projection that projects to the table height.
1205                // * All projection heights inherit from context (e.g. all scalar literals that
1206                //   are to be broadcasted to table height).
1207                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1208                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1209                {
1210                    lf = lf.with_columns(projections);
1211                } else {
1212                    // We hit this branch if the output height is not guaranteed to match the table
1213                    // height. E.g.:
1214                    //
1215                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
1216                    //
1217                    // For these cases we truncate / extend the sorting columns with NULLs to match
1218                    // the output height. We do this by projecting independently and then joining
1219                    // back the original frame on the row index.
1220                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1221                    lf = lf
1222                        .clone()
1223                        .select(projections)
1224                        .with_row_index(NAME, None)
1225                        .join(
1226                            lf.with_row_index(NAME, None),
1227                            [col(NAME)],
1228                            [col(NAME)],
1229                            JoinArgs {
1230                                how: JoinType::Left,
1231                                validation: Default::default(),
1232                                suffix: None,
1233                                slice: None,
1234                                nulls_equal: false,
1235                                coalesce: Default::default(),
1236                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
1237                            },
1238                        );
1239                }
1240            }
1241            if !select_modifiers.replace.is_empty() {
1242                lf = lf.with_columns(&select_modifiers.replace);
1243            }
1244            if !select_modifiers.rename.is_empty() {
1245                lf = lf.with_columns(select_modifiers.renamed_cols());
1246            }
1247            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1248
1249            // Note: If `have_order_by`, with_columns is already done above.
1250            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1251                && !have_order_by
1252            {
1253                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1254                lf = lf.with_columns(retained_cols).select(retained_names);
1255            } else {
1256                lf = lf.select(retained_cols);
1257            }
1258            if !select_modifiers.rename.is_empty() {
1259                lf = lf.rename(
1260                    select_modifiers.rename.keys(),
1261                    select_modifiers.rename.values(),
1262                    true,
1263                );
1264            };
1265            lf
1266        } else {
1267            let having = select_stmt
1268                .having
1269                .as_ref()
1270                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1271                .transpose()?;
1272            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1273            lf = self.process_order_by(lf, &query.order_by, None)?;
1274
1275            // Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1276            let output_cols: Vec<_> = projections
1277                .iter()
1278                .map(|p| p.to_field(&schema))
1279                .collect::<PolarsResult<Vec<_>>>()?
1280                .into_iter()
1281                .map(|f| col(f.name))
1282                .collect();
1283
1284            lf.select(&output_cols)
1285        };
1286
1287        // Apply optional QUALIFY clause (filters on window functions).
1288        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1289
1290        // Apply optional DISTINCT clause.
1291        lf = match &select_stmt.distinct {
1292            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1293            Some(Distinct::On(exprs)) => {
1294                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1295                let schema = Some(self.get_frame_schema(&mut lf)?);
1296                let cols = exprs
1297                    .iter()
1298                    .map(|e| {
1299                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
1300                        if let Expr::Column(name) = expr {
1301                            Ok(name)
1302                        } else {
1303                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1304                        }
1305                    })
1306                    .collect::<PolarsResult<Vec<_>>>()?;
1307
1308                // DISTINCT ON has to apply the ORDER BY before the operation.
1309                lf = self.process_order_by(lf, &query.order_by, None)?;
1310                return Ok(lf.unique_stable(
1311                    Some(Selector::ByName {
1312                        names: cols.into(),
1313                        strict: true,
1314                    }),
1315                    UniqueKeepStrategy::First,
1316                ));
1317            },
1318            None => lf,
1319        };
1320        Ok(lf)
1321    }
1322
1323    fn column_projections(
1324        &mut self,
1325        select_stmt: &Select,
1326        schema: &SchemaRef,
1327        select_modifiers: &mut SelectModifiers,
1328    ) -> PolarsResult<Vec<Expr>> {
1329        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
1330            .projection
1331            .iter()
1332            .map(|select_item| match select_item {
1333                SelectItem::UnnamedExpr(expr) => {
1334                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
1335                },
1336                SelectItem::ExprWithAlias { expr, alias } => {
1337                    let expr = parse_sql_expr(expr, self, Some(schema))?;
1338                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
1339                },
1340                SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1341                    // eg: `alias.*` or `schema.table.*`
1342                    SelectItemQualifiedWildcardKind::ObjectName(obj_name) => self
1343                        .process_qualified_wildcard(
1344                            obj_name,
1345                            wildcard_options,
1346                            select_modifiers,
1347                            Some(schema),
1348                        ),
1349                    // eg: `STRUCT<STRING>('foo').*`
1350                    SelectItemQualifiedWildcardKind::Expr(_) => {
1351                        polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1352                    },
1353                },
1354                SelectItem::Wildcard(wildcard_options) => {
1355                    let cols = schema
1356                        .iter_names()
1357                        .map(|name| col(name.clone()))
1358                        .collect::<Vec<_>>();
1359
1360                    self.process_wildcard_additional_options(
1361                        cols,
1362                        wildcard_options,
1363                        select_modifiers,
1364                        Some(schema),
1365                    )
1366                },
1367            })
1368            .collect();
1369
1370        let flattened_exprs: Vec<Expr> = parsed_items?
1371            .into_iter()
1372            .flatten()
1373            .flat_map(|expr| expand_exprs(expr, schema))
1374            .collect();
1375
1376        Ok(flattened_exprs)
1377    }
1378
1379    fn process_where(
1380        &mut self,
1381        mut lf: LazyFrame,
1382        expr: &Option<SQLExpr>,
1383        invert_filter: bool,
1384        schema: Option<SchemaRef>,
1385    ) -> PolarsResult<LazyFrame> {
1386        if let Some(expr) = expr {
1387            let schema = match schema {
1388                None => self.get_frame_schema(&mut lf)?,
1389                Some(s) => s,
1390            };
1391
1392            // shortcut filter evaluation if given expression is just TRUE or FALSE
1393            let (all_true, all_false) = match expr {
1394                SQLExpr::Value(ValueWithSpan {
1395                    value: SQLValue::Boolean(b),
1396                    ..
1397                }) => (*b, !*b),
1398                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1399                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => {
1400                        (a.value == b.value, a.value != b.value)
1401                    },
1402                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1403                        (a.value != b.value, a.value == b.value)
1404                    },
1405                    _ => (false, false),
1406                },
1407                _ => (false, false),
1408            };
1409            if (all_true && !invert_filter) || (all_false && invert_filter) {
1410                return Ok(lf);
1411            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1412                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1413            }
1414
1415            // ...otherwise parse and apply the filter as normal
1416            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1417            if filter_expression.clone().meta().has_multiple_outputs() {
1418                filter_expression = all_horizontal([filter_expression])?;
1419            }
1420            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1421            lf = if invert_filter {
1422                lf.remove(filter_expression)
1423            } else {
1424                lf.filter(filter_expression)
1425            };
1426        }
1427        Ok(lf)
1428    }
1429
1430    pub(super) fn process_join(
1431        &mut self,
1432        tbl_left: &TableInfo,
1433        tbl_right: &TableInfo,
1434        constraint: &JoinConstraint,
1435        join_type: JoinType,
1436    ) -> PolarsResult<LazyFrame> {
1437        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1438        let coalesce_type = match constraint {
1439            // "NATURAL" joins should coalesce; otherwise we disambiguate
1440            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1441            _ => JoinCoalesce::KeepColumns,
1442        };
1443        let joined = tbl_left
1444            .frame
1445            .clone()
1446            .join_builder()
1447            .with(tbl_right.frame.clone())
1448            .left_on(left_on)
1449            .right_on(right_on)
1450            .how(join_type)
1451            .suffix(format!(":{}", tbl_right.name))
1452            .coalesce(coalesce_type)
1453            .finish();
1454
1455        Ok(joined)
1456    }
1457
1458    fn process_qualify(
1459        &mut self,
1460        mut lf: LazyFrame,
1461        qualify_expr: &Option<SQLExpr>,
1462        window_fn_columns: &PlHashSet<String>,
1463    ) -> PolarsResult<LazyFrame> {
1464        if let Some(expr) = qualify_expr {
1465            // Check the QUALIFY expression to identify window functions
1466            // and collect column refs (for looking up aliases from SELECT)
1467            let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1468            let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1469            if !has_window_fns && !references_window_alias {
1470                polars_bail!(
1471                    SQLSyntax:
1472                    "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1473                );
1474            }
1475            let schema = self.get_frame_schema(&mut lf)?;
1476            let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1477            if filter_expression.clone().meta().has_multiple_outputs() {
1478                filter_expression = all_horizontal([filter_expression])?;
1479            }
1480            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1481            lf = lf.filter(filter_expression);
1482        }
1483        Ok(lf)
1484    }
1485
1486    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1487        let mut contexts = vec![];
1488        for expr in exprs {
1489            *expr = expr.clone().map_expr(|e| match e {
1490                Expr::SubPlan(lp, names) => {
1491                    contexts.push(<LazyFrame>::from((**lp).clone()));
1492                    if names.len() == 1 {
1493                        Expr::Column(names[0].as_str().into())
1494                    } else {
1495                        Expr::SubPlan(lp, names)
1496                    }
1497                },
1498                e => e,
1499            })
1500        }
1501        if contexts.is_empty() {
1502            lf
1503        } else {
1504            lf.with_context(contexts)
1505        }
1506    }
1507
1508    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1509        if let Statement::CreateTable(CreateTable {
1510            if_not_exists,
1511            name,
1512            query,
1513            columns,
1514            like,
1515            ..
1516        }) = stmt
1517        {
1518            let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1519            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1520                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1521            }
1522            let lf = match (query, columns.is_empty(), like) {
1523                (Some(query), true, None) => {
1524                    // ----------------------------------------------------
1525                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1526                    // ----------------------------------------------------
1527                    self.execute_query(query)?
1528                },
1529                (None, false, None) => {
1530                    // ----------------------------------------------------
1531                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1532                    // ----------------------------------------------------
1533                    let mut schema = Schema::with_capacity(columns.len());
1534                    for col in columns {
1535                        let col_name = col.name.value.as_str();
1536                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1537                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1538                    }
1539                    DataFrame::empty_with_schema(&schema).lazy()
1540                },
1541                (None, true, Some(like_kind)) => {
1542                    // ----------------------------------------------------
1543                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1544                    // ----------------------------------------------------
1545                    let like_name = match like_kind {
1546                        CreateTableLikeKind::Plain(like)
1547                        | CreateTableLikeKind::Parenthesized(like) => &like.name,
1548                    };
1549                    let like_table = like_name
1550                        .0
1551                        .first()
1552                        .unwrap()
1553                        .as_ident()
1554                        .unwrap()
1555                        .value
1556                        .as_str();
1557                    if let Some(mut table) = self.table_map.get(like_table).cloned() {
1558                        let schema = self.get_frame_schema(&mut table)?;
1559                        DataFrame::empty_with_schema(&schema).lazy()
1560                    } else {
1561                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1562                    }
1563                },
1564                // No valid options provided
1565                (None, true, None) => {
1566                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1567                },
1568                // Mutually exclusive options
1569                _ => {
1570                    polars_bail!(
1571                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1572                        query,
1573                        columns,
1574                        like,
1575                    )
1576                },
1577            };
1578            self.register(tbl_name, lf);
1579
1580            let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1581            Ok(df_created.unwrap().lazy())
1582        } else {
1583            unreachable!()
1584        }
1585    }
1586
1587    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1588        match relation {
1589            TableFactor::Table {
1590                name, alias, args, ..
1591            } => {
1592                if let Some(args) = args {
1593                    return self.execute_table_function(name, alias, &args.args);
1594                }
1595                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1596                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1597                    match alias {
1598                        Some(alias) => {
1599                            self.table_aliases
1600                                .insert(alias.name.value.clone(), tbl_name.to_string());
1601                            Ok((alias.name.value.clone(), lf))
1602                        },
1603                        None => Ok((tbl_name.to_string(), lf)),
1604                    }
1605                } else {
1606                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1607                }
1608            },
1609            TableFactor::Derived {
1610                lateral,
1611                subquery,
1612                alias,
1613            } => {
1614                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1615                if let Some(alias) = alias {
1616                    let mut lf = self.execute_query_no_ctes(subquery)?;
1617                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1618                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1619                    Ok((alias.name.value.clone(), lf))
1620                } else {
1621                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1622                }
1623            },
1624            TableFactor::UNNEST {
1625                alias,
1626                array_exprs,
1627                with_offset,
1628                with_offset_alias: _,
1629                ..
1630            } => {
1631                if let Some(alias) = alias {
1632                    let column_names: Vec<Option<PlSmallStr>> = alias
1633                        .columns
1634                        .iter()
1635                        .map(|c| {
1636                            if c.name.value.is_empty() {
1637                                None
1638                            } else {
1639                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1640                            }
1641                        })
1642                        .collect();
1643
1644                    let column_values: Vec<Series> = array_exprs
1645                        .iter()
1646                        .map(|arr| parse_sql_array(arr, self))
1647                        .collect::<Result<_, _>>()?;
1648
1649                    polars_ensure!(!column_names.is_empty(),
1650                        SQLSyntax:
1651                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1652                    );
1653                    if column_names.len() != column_values.len() {
1654                        let plural = if column_values.len() > 1 { "s" } else { "" };
1655                        polars_bail!(
1656                            SQLSyntax:
1657                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1658                        );
1659                    }
1660                    let column_series: Vec<Column> = column_values
1661                        .into_iter()
1662                        .zip(column_names)
1663                        .map(|(s, name)| {
1664                            if let Some(name) = name {
1665                                s.with_name(name)
1666                            } else {
1667                                s
1668                            }
1669                        })
1670                        .map(Column::from)
1671                        .collect();
1672
1673                    let lf = DataFrame::new(column_series)?.lazy();
1674
1675                    if *with_offset {
1676                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1677                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1678                    }
1679                    let table_name = alias.name.value.clone();
1680                    self.table_map.insert(table_name.clone(), lf.clone());
1681                    Ok((table_name, lf))
1682                } else {
1683                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1684                }
1685            },
1686            TableFactor::NestedJoin {
1687                table_with_joins,
1688                alias,
1689            } => {
1690                let lf = self.execute_from_statement(table_with_joins)?;
1691                match alias {
1692                    Some(a) => Ok((a.name.value.clone(), lf)),
1693                    None => Ok(("".to_string(), lf)),
1694                }
1695            },
1696            // Support bare table, optionally with an alias, for now
1697            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1698        }
1699    }
1700
1701    fn execute_table_function(
1702        &mut self,
1703        name: &ObjectName,
1704        alias: &Option<TableAlias>,
1705        args: &[FunctionArg],
1706    ) -> PolarsResult<(String, LazyFrame)> {
1707        let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1708        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1709        let (tbl_name, lf) = read_fn.execute(args)?;
1710        #[allow(clippy::useless_asref)]
1711        let tbl_name = alias
1712            .as_ref()
1713            .map(|a| a.name.value.clone())
1714            .unwrap_or_else(|| tbl_name.to_str().to_string());
1715
1716        self.table_map.insert(tbl_name.clone(), lf.clone());
1717        Ok((tbl_name, lf))
1718    }
1719
1720    fn process_order_by(
1721        &mut self,
1722        mut lf: LazyFrame,
1723        order_by: &Option<OrderBy>,
1724        selected: Option<&[Expr]>,
1725    ) -> PolarsResult<LazyFrame> {
1726        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1727            OrderByKind::Expressions(exprs) => exprs.is_empty(),
1728            OrderByKind::All(_) => false,
1729        }) {
1730            return Ok(lf);
1731        }
1732        let schema = self.get_frame_schema(&mut lf)?;
1733        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1734        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1735            OrderByKind::Expressions(exprs) => {
1736                // TODO: will look at making an upstream PR that allows us to more easily
1737                //  create a GenericDialect variant supporting "OrderByKind::All" instead
1738                if exprs.len() == 1
1739                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1740                        if ident.value.to_uppercase() == "ALL"
1741                        && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1742                {
1743                    // Treat as ORDER BY ALL
1744                    let n_cols = if let Some(selected) = selected {
1745                        selected.len()
1746                    } else {
1747                        schema.len()
1748                    };
1749                    (vec![], Some(&exprs[0].options), n_cols)
1750                } else {
1751                    (exprs.clone(), None, exprs.len())
1752                }
1753            },
1754            OrderByKind::All(opts) => {
1755                let n_cols = if let Some(selected) = selected {
1756                    selected.len()
1757                } else {
1758                    schema.len()
1759                };
1760                (vec![], Some(opts), n_cols)
1761            },
1762        };
1763        let mut descending = Vec::with_capacity(n_order_cols);
1764        let mut nulls_last = Vec::with_capacity(n_order_cols);
1765        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1766
1767        if let Some(opts) = order_by_all {
1768            if let Some(selected) = selected {
1769                by.extend(selected.iter().cloned());
1770            } else {
1771                by.extend(columns_iter);
1772            };
1773            let desc_order = !opts.asc.unwrap_or(true);
1774            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1775            descending.resize(by.len(), desc_order);
1776        } else {
1777            let columns = &columns_iter.collect::<Vec<_>>();
1778            for ob in order_by {
1779                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1780                // https://www.postgresql.org/docs/current/queries-order.html
1781                let desc_order = !ob.options.asc.unwrap_or(true);
1782                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
1783                descending.push(desc_order);
1784
1785                // translate order expression, allowing ordinal values
1786                by.push(self.expr_or_ordinal(
1787                    &ob.expr,
1788                    columns,
1789                    selected,
1790                    Some(&schema),
1791                    "ORDER BY",
1792                )?)
1793            }
1794        }
1795        Ok(lf.sort_by_exprs(
1796            &by,
1797            SortMultipleOptions::default()
1798                .with_order_descending_multi(descending)
1799                .with_nulls_last_multi(nulls_last)
1800                .with_maintain_order(true),
1801        ))
1802    }
1803
    /// Applies a SQL `GROUP BY` (with optional `HAVING`) to `lf`.
    ///
    /// * `group_by_keys` - the key expressions handed to `LazyFrame::group_by`.
    /// * `projections` - the SELECT expressions; those classified as
    ///   aggregations are passed to `agg`, and a final `select` restores the
    ///   projection order and applies deferred aliases/transforms.
    /// * `having` - optional post-aggregation filter; aggregate nodes inside it
    ///   are replaced by references to (existing or temporary) aggregate columns.
    ///
    /// The returned frame may carry extra trailing columns beyond the
    /// projections: group keys missing from the output, and `__POLARS_ORIG_*`
    /// copies of cross-aliased keys (added for later ORDER BY resolution).
    ///
    /// # Errors
    /// Returns a `SQLSyntax` error when a plain column (or struct field access)
    /// is projected without being a group key or part of an aggregation, and
    /// propagates schema/field resolution failures.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
        having: Option<Expr>,
    ) -> PolarsResult<LazyFrame> {
        // Schema of the input frame, and the output schema of the group keys
        // (the closure builds the message used for duplicate key names).
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Note: remove the `group_by` keys as Polars adds those implicitly.
        // Bookkeeping used to assemble the final projection:
        //  - `aliased_aggregations`: output name -> internal `__POLARS_AGG_*` alias
        //  - `aggregation_projection`: expressions passed to `agg`
        //  - `projection_overrides`: alias -> replacement expr (struct field access)
        //  - `projection_aliases` / `group_key_aliases`: aliases whose expression
        //    is (re)applied after aggregation
        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        let mut projection_aliases = PlHashSet::new();
        let mut group_key_aliases = PlHashSet::new();

        // Pre-compute group key data (stripped expression + output name) to avoid repeated work.
        // We check both expression AND output name match to avoid cross-aliasing issues.
        let group_key_data: Vec<_> = group_by_keys
            .iter()
            .map(|gk| {
                (
                    strip_outer_alias(gk),
                    gk.to_field(&schema_before).ok().map(|f| f.name),
                )
            })
            .collect();

        // One flag per projection: true when it is identical (expression and
        // output name) to one of the group keys.
        let projection_matches_group_key: Vec<bool> = projections
            .iter()
            .map(|p| {
                let p_stripped = strip_outer_alias(p);
                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
                group_key_data
                    .iter()
                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
            })
            .collect();

        // Classify each projection: aggregation, group key, or deferred alias.
        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
            // `Len` represents COUNT(*) so we treat as an aggregation here.
            let is_non_group_key_expr = !matches_group_key
                && has_expr(e, |e| {
                    match e {
                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                        #[cfg(feature = "dynamic_group_by")]
                        Expr::Rolling { .. } => true,
                        Expr::Function { function: func, .. }
                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
                        {
                            // If it's a function call containing a column NOT in the group by keys,
                            // we treat it as an aggregation.
                            has_expr(e, |e| match e {
                                Expr::Column(name) => !group_by_keys_schema.contains(name),
                                _ => false,
                            })
                        },
                        _ => false,
                    }
                });

            // Note: if simple aliased expression we defer aliasing until after the group_by.
            // Use `e_inner` to track the potentially unwrapped expression for field lookup.
            let mut e_inner = e;
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e_inner = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // aliased struct field access: select the field's column
                    // under the alias in the final projection
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e_inner.to_field(&schema_before)?;
            if is_non_group_key_expr {
                let mut e = e.clone();
                // Unwrap an (optionally aliased) top-level `Implode` aggregation so
                // that its inner expression is what gets passed to `agg`.
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                // If aggregation colname conflicts with a group key,
                // alias it to avoid duplicate/mis-tracked columns
                if group_by_keys_schema.get(&field.name).is_some() {
                    let alias_name = format!("__POLARS_AGG_{}", field.name);
                    e = e.alias(alias_name.as_str());
                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
                }
                aggregation_projection.push(e);
            } else if !matches_group_key {
                // Non-aggregated columns must be part of the GROUP BY clause
                if let Expr::Column(_)
                | Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                    ..
                } = e_inner
                {
                    if !group_by_keys_schema.contains(&field.name) {
                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                    }
                }
            }
        }

        // Process HAVING clause: identify aggregate expressions, reusing those already
        // in projections, or compute as temporary columns and then post-filter/discard
        let having_filter = if let Some(having_expr) = having {
            // (aggregate expression, output column name) pairs already being computed
            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
                .iter()
                .filter_map(|p| match p {
                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
                        Some((inner.as_ref().clone(), name.clone()))
                    },
                    e @ (Expr::Agg(_) | Expr::Len) => Some((
                        e.clone(),
                        e.to_field(&schema_before)
                            .map(|f| f.name)
                            .unwrap_or_default(),
                    )),
                    _ => None,
                })
                .collect();

            // Replace each aggregate node in HAVING with a column reference; an
            // aggregate not yet computed is added as `__POLARS_HAVING_<n>`.
            let mut n_having_aggs = 0;
            let updated_having = having_expr.map_expr(|e| {
                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
                    return e;
                }
                let name = agg_to_name
                    .iter()
                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
                    .unwrap_or_else(|| {
                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
                        aggregation_projection.push(e.clone().alias(n.clone()));
                        agg_to_name.push((e.clone(), n.clone()));
                        n_having_aggs += 1;
                        n
                    });
                col(name)
            });
            Some(updated_having)
        } else {
            None
        };

        // Apply HAVING filter after aggregation
        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        if let Some(filter_expr) = having_filter {
            aggregated = aggregated.filter(filter_expr);
        }

        // Output schema of the projections as written in the SELECT
        // (resolved against the pre-aggregation schema).
        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // A final projection to get the proper order and any deferred transforms/aliases
        // (will also drop any temporary columns created for the HAVING post-filter).
        let final_projection = projection_schema
            .iter_names()
            .zip(projections.iter().zip(&projection_matches_group_key))
            .map(|(name, (projection_expr, &matches_group_key))| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    // struct field access recorded above
                    expr.clone()
                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                    // aggregation that was computed under an internal alias
                    col(aliased_name.clone()).alias(name.clone())
                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
                    col(name.clone())
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    // re-apply the deferred expression, unless it contains an
                    // aggregate/window node (then select the column by name)
                    if has_expr(projection_expr, |e| {
                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                    }) {
                        col(name.clone())
                    } else {
                        projection_expr.clone()
                    }
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        // Include original GROUP BY columns for ORDER BY access (if aliased).
        let mut output_projection = final_projection;
        for key_name in group_by_keys_schema.iter_names() {
            if !projection_schema.contains(key_name) {
                // Original col name not in output - add for ORDER BY access
                output_projection.push(col(key_name.clone()));
            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
                // Original col name in output - check if cross-aliased
                let is_cross_aliased = projections.iter().any(|p| {
                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
                        && !is_simple_col_ref(p, key_name)
                });
                if is_cross_aliased {
                    // Add original name under a prefixed alias for subsequent ORDER BY resolution
                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
                    output_projection.push(col(key_name.clone()).alias(internal_name));
                }
            }
        }
        Ok(aggregated.select(&output_projection))
    }
2022
2023    fn process_limit_offset(
2024        &self,
2025        lf: LazyFrame,
2026        limit: Option<&SQLExpr>,
2027        offset: Option<&SQLExpr>,
2028    ) -> PolarsResult<LazyFrame> {
2029        match (offset, limit) {
2030            (
2031                Some(SQLExpr::Value(ValueWithSpan {
2032                    value: SQLValue::Number(offset, _),
2033                    ..
2034                })),
2035                Some(SQLExpr::Value(ValueWithSpan {
2036                    value: SQLValue::Number(limit, _),
2037                    ..
2038                })),
2039            ) => Ok(lf.slice(
2040                offset
2041                    .parse()
2042                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2043                limit
2044                    .parse()
2045                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
2046            )),
2047            (
2048                Some(SQLExpr::Value(ValueWithSpan {
2049                    value: SQLValue::Number(offset, _),
2050                    ..
2051                })),
2052                None,
2053            ) => Ok(lf.slice(
2054                offset
2055                    .parse()
2056                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2057                IdxSize::MAX,
2058            )),
2059            (
2060                None,
2061                Some(SQLExpr::Value(ValueWithSpan {
2062                    value: SQLValue::Number(limit, _),
2063                    ..
2064                })),
2065            ) => Ok(lf.limit(
2066                limit
2067                    .parse()
2068                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
2069            )),
2070            (None, None) => Ok(lf),
2071            _ => polars_bail!(
2072                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
2073            ),
2074        }
2075    }
2076
2077    fn process_qualified_wildcard(
2078        &mut self,
2079        ObjectName(idents): &ObjectName,
2080        options: &WildcardAdditionalOptions,
2081        modifiers: &mut SelectModifiers,
2082        schema: Option<&Schema>,
2083    ) -> PolarsResult<Vec<Expr>> {
2084        let mut new_idents = idents.clone();
2085        new_idents.push(ObjectNamePart::Identifier(Ident::new("*")));
2086
2087        let ident_refs: Vec<&Ident> = new_idents.iter().filter_map(|p| p.as_ident()).collect();
2088        let expr = resolve_compound_identifier(
2089            self,
2090            &ident_refs.iter().map(|&i| i.clone()).collect::<Vec<_>>(),
2091            schema,
2092        );
2093        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
2094    }
2095
2096    fn process_wildcard_additional_options(
2097        &mut self,
2098        exprs: Vec<Expr>,
2099        options: &WildcardAdditionalOptions,
2100        modifiers: &mut SelectModifiers,
2101        schema: Option<&Schema>,
2102    ) -> PolarsResult<Vec<Expr>> {
2103        if options.opt_except.is_some() && options.opt_exclude.is_some() {
2104            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2105        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2106            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2107        }
2108
2109        // SELECT * EXCLUDE
2110        if let Some(items) = &options.opt_exclude {
2111            match items {
2112                ExcludeSelectItem::Single(ident) => {
2113                    modifiers.exclude.insert(ident.value.clone());
2114                },
2115                ExcludeSelectItem::Multiple(idents) => {
2116                    modifiers
2117                        .exclude
2118                        .extend(idents.iter().map(|i| i.value.clone()));
2119                },
2120            };
2121        }
2122
2123        // SELECT * EXCEPT
2124        if let Some(items) = &options.opt_except {
2125            modifiers.exclude.insert(items.first_element.value.clone());
2126            modifiers
2127                .exclude
2128                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2129        }
2130
2131        // SELECT * ILIKE
2132        if let Some(item) = &options.opt_ilike {
2133            let rx = regex::escape(item.pattern.as_str())
2134                .replace('%', ".*")
2135                .replace('_', ".");
2136
2137            modifiers.ilike = Some(
2138                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2139            );
2140        }
2141
2142        // SELECT * RENAME
2143        if let Some(items) = &options.opt_rename {
2144            let renames = match items {
2145                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2146                RenameSelectItem::Multiple(renames) => renames.as_slice(),
2147            };
2148            for rn in renames {
2149                let before = PlSmallStr::from_str(rn.ident.value.as_str());
2150                let after = PlSmallStr::from_str(rn.alias.value.as_str());
2151                if before != after {
2152                    modifiers.rename.insert(before, after);
2153                }
2154            }
2155        }
2156
2157        // SELECT * REPLACE
2158        if let Some(replacements) = &options.opt_replace {
2159            for rp in &replacements.items {
2160                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2161                modifiers
2162                    .replace
2163                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2164            }
2165        }
2166        Ok(exprs)
2167    }
2168
2169    fn rename_columns_from_table_alias(
2170        &mut self,
2171        mut lf: LazyFrame,
2172        alias: &TableAlias,
2173    ) -> PolarsResult<LazyFrame> {
2174        if alias.columns.is_empty() {
2175            Ok(lf)
2176        } else {
2177            let schema = self.get_frame_schema(&mut lf)?;
2178            if alias.columns.len() != schema.len() {
2179                polars_bail!(
2180                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2181                    alias.columns.len(), alias.name.value, schema.len()
2182                )
2183            } else {
2184                let existing_columns: Vec<_> = schema.iter_names().collect();
2185                let new_columns: Vec<_> =
2186                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
2187                Ok(lf.rename(existing_columns, new_columns, true))
2188            }
2189        }
2190    }
2191}
2192
2193impl SQLContext {
2194    /// Get internal table map. For internal use only.
2195    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
2196        self.table_map.clone()
2197    }
2198
2199    /// Create a new SQLContext from a table map. For internal use only
2200    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2201        Self {
2202            table_map,
2203            ..Default::default()
2204        }
2205    }
2206}
2207
2208fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2209    match expr {
2210        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2211            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2212            schema
2213                .iter_names()
2214                .filter(|name| re.is_match(name))
2215                .map(|name| col(name.clone()))
2216                .collect::<Vec<_>>()
2217        },
2218        Expr::Selector(s) => s
2219            .into_columns(schema, &Default::default())
2220            .unwrap()
2221            .into_iter()
2222            .map(col)
2223            .collect::<Vec<_>>(),
2224        _ => vec![expr],
2225    }
2226}
2227
/// Check whether a column name is written as an anchored regex pattern, i.e. whether it
/// begins with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // both anchors are single-byte ASCII, so matching on the raw bytes is sufficient
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
2231
2232/// Visitor that checks if an expression tree contains a reference to a specific table.
2233struct FindTableIdentifier<'a> {
2234    table_name: &'a str,
2235    found: bool,
2236}
2237
2238impl<'a> SQLVisitor for FindTableIdentifier<'a> {
2239    type Break = ();
2240
2241    fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<Self::Break> {
2242        if let SQLExpr::CompoundIdentifier(idents) = expr {
2243            if idents.len() >= 2 && idents[0].value.as_str() == self.table_name {
2244                self.found = true; // return immediately on first match
2245                return ControlFlow::Break(());
2246            }
2247        }
2248        ControlFlow::Continue(())
2249    }
2250}
2251
2252/// Visitor used to check a SQL expression used in a QUALIFY clause.
2253/// (Confirms window functions are present and collects column refs in one pass).
2254struct QualifyExpression {
2255    has_window_functions: bool,
2256    column_refs: PlHashSet<String>,
2257}
2258
2259impl QualifyExpression {
2260    fn new() -> Self {
2261        Self {
2262            has_window_functions: false,
2263            column_refs: PlHashSet::new(),
2264        }
2265    }
2266
2267    fn analyze(expr: &SQLExpr) -> (bool, PlHashSet<String>) {
2268        let mut analyzer = Self::new();
2269        let _ = expr.visit(&mut analyzer);
2270        (analyzer.has_window_functions, analyzer.column_refs)
2271    }
2272}
2273
2274impl SQLVisitor for QualifyExpression {
2275    type Break = ();
2276
2277    fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<Self::Break> {
2278        match expr {
2279            SQLExpr::Function(func) if func.over.is_some() => {
2280                self.has_window_functions = true;
2281            },
2282            SQLExpr::Identifier(ident) => {
2283                self.column_refs.insert(ident.value.clone());
2284            },
2285            SQLExpr::CompoundIdentifier(idents) if !idents.is_empty() => {
2286                self.column_refs
2287                    .insert(idents.last().unwrap().value.clone());
2288            },
2289            _ => {},
2290        }
2291        ControlFlow::Continue(())
2292    }
2293}
2294
2295/// Check if a SQL expression contains explicit window functions.
2296/// Uses early-exit for efficiency when only the boolean result is needed.
2297fn expr_has_window_functions(expr: &SQLExpr) -> bool {
2298    struct WindowFunctionFinder;
2299    impl SQLVisitor for WindowFunctionFinder {
2300        type Break = ();
2301        fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<()> {
2302            if matches!(expr, SQLExpr::Function(f) if f.over.is_some()) {
2303                ControlFlow::Break(())
2304            } else {
2305                ControlFlow::Continue(())
2306            }
2307        }
2308    }
2309    expr.visit(&mut WindowFunctionFinder).is_break()
2310}
2311
2312/// Check if an expression is a simple column reference (with optional alias) to the given name.
2313fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2314    match expr {
2315        Expr::Column(n) => n == col_name,
2316        Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2317        _ => false,
2318    }
2319}
2320
2321/// Strip the outer alias from an expression (if present) for expression equality comparison.
2322fn strip_outer_alias(expr: &Expr) -> Expr {
2323    if let Expr::Alias(inner, _) = expr {
2324        inner.as_ref().clone()
2325    } else {
2326        expr.clone()
2327    }
2328}
2329
2330/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2331///
2332/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2333/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2334fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2335    // Original columns take precedence over SELECT aliases
2336    if schema.contains(name) {
2337        return None;
2338    }
2339    // Find a projection with this alias and return its expression (preserving the alias)
2340    projections.iter().find_map(|p| match p {
2341        Expr::Alias(inner, alias) if alias.as_str() == name => {
2342            Some(inner.as_ref().clone().alias(alias.clone()))
2343        },
2344        _ => None,
2345    })
2346}
2347
2348/// Check if all columns referred to in a Polars expression exist in the given Schema.
2349fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2350    let mut found_cols = false;
2351    let mut all_in_schema = true;
2352    for e in expr.into_iter() {
2353        if let Expr::Column(name) = e {
2354            found_cols = true;
2355            if !schema.contains(name.as_str()) {
2356                all_in_schema = false;
2357                break;
2358            }
2359        }
2360    }
2361    found_cols && all_in_schema
2362}
2363
2364/// Check if a SQL expression contains a reference to a specific table.
2365fn expr_refers_to_table(expr: &SQLExpr, table_name: &str) -> bool {
2366    let mut table_finder = FindTableIdentifier {
2367        table_name,
2368        found: false,
2369    };
2370    let _ = expr.visit(&mut table_finder);
2371    table_finder.found
2372}
2373
2374/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
2375///
2376/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
2377/// either way round, and in joins with more than two tables you can also join against an earlier
2378/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
2379/// we join `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
2380/// resolve as our native `join` function operates on only two tables at a time.
2381fn determine_left_right_join_on(
2382    ctx: &mut SQLContext,
2383    expr_left: &SQLExpr,
2384    expr_right: &SQLExpr,
2385    tbl_left: &TableInfo,
2386    tbl_right: &TableInfo,
2387    join_schema: &Schema,
2388) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2389    // parse, removing any aliases that may have been added by `resolve_column`
2390    // (called inside `parse_sql_expr`) as we need the actual/underlying col
2391    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2392        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2393        e => e,
2394    };
2395    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2396        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2397        e => e,
2398    };
2399
2400    // ------------------------------------------------------------------
2401    // simple/typical case: can fully resolve SQL-level table references
2402    // ------------------------------------------------------------------
2403    let left_refs = (
2404        expr_refers_to_table(expr_left, &tbl_left.name),
2405        expr_refers_to_table(expr_left, &tbl_right.name),
2406    );
2407    let right_refs = (
2408        expr_refers_to_table(expr_right, &tbl_left.name),
2409        expr_refers_to_table(expr_right, &tbl_right.name),
2410    );
2411    // if the SQL-level references unambiguously indicate table ownership, we're done
2412    match (left_refs, right_refs) {
2413        // standard: left expr → left table, right expr → right table
2414        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2415        // reversed: left expr → right table, right expr → left table
2416        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2417        // unsupported: one side references *both* tables
2418        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2419            polars_bail!(
2420               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2421               if left_refs.0 && left_refs.1 {
2422                    "left"
2423                } else {
2424                    "right"
2425                }, tbl_left.name, tbl_right.name
2426            )
2427        },
2428        // fall through to the more involved col/ref resolution
2429        _ => {},
2430    }
2431
2432    // ------------------------------------------------------------------
2433    // more involved: additionally employ schema-based column resolution
2434    // (applies to unqualified columns and/or chained joins)
2435    // ------------------------------------------------------------------
2436    let left_on_cols_in = (
2437        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2438        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2439    );
2440    let right_on_cols_in = (
2441        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2442        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2443    );
2444    match (left_on_cols_in, right_on_cols_in) {
2445        // each expression's columns exist in exactly one schema
2446        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2447        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2448        // one expression in both, other only in one; prefer the unique one
2449        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2450        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2451        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2452        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2453        // pass through as-is
2454        _ => Ok((vec![left_on], vec![right_on])),
2455    }
2456}
2457
2458fn process_join_on(
2459    ctx: &mut SQLContext,
2460    sql_expr: &SQLExpr,
2461    tbl_left: &TableInfo,
2462    tbl_right: &TableInfo,
2463) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2464    match sql_expr {
2465        SQLExpr::BinaryOp { left, op, right } => match op {
2466            BinaryOperator::And => {
2467                let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2468                let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2469                left_i.append(&mut left_j);
2470                right_i.append(&mut right_j);
2471                Ok((left_i, right_i))
2472            },
2473            BinaryOperator::Eq => {
2474                // establish unified schema with cols from both tables; needed for multi/chained
2475                // joins where suffixed intermediary/joined cols aren't in an existing schema.
2476                let mut join_schema =
2477                    Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2478                for (name, dtype) in tbl_left.schema.iter() {
2479                    join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2480                }
2481                for (name, dtype) in tbl_right.schema.iter() {
2482                    if !join_schema.contains(name) {
2483                        join_schema.insert_at_index(
2484                            join_schema.len(),
2485                            name.clone(),
2486                            dtype.clone(),
2487                        )?;
2488                    }
2489                }
2490                determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2491            },
2492            _ => polars_bail!(
2493                // TODO: should be able to support more operators later (via `join_where`?)
2494                SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2495            ),
2496        },
2497        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2498        _ => polars_bail!(
2499            SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2500        ),
2501    }
2502}
2503
2504fn process_join_constraint(
2505    constraint: &JoinConstraint,
2506    tbl_left: &TableInfo,
2507    tbl_right: &TableInfo,
2508    ctx: &mut SQLContext,
2509) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2510    match constraint {
2511        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2512            process_join_on(ctx, expr, tbl_left, tbl_right)
2513        },
2514        JoinConstraint::Using(idents) if !idents.is_empty() => {
2515            let using: Vec<Expr> = idents
2516                .iter()
2517                .map(|ObjectName(parts)| {
2518                    if parts.len() != 1 {
2519                        polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2520                    }
2521                    match parts[0].as_ident() {
2522                        Some(ident) => Ok(col(ident.value.as_str())),
2523                        None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2524                    }
2525                })
2526                .collect::<PolarsResult<Vec<_>>>()?;
2527            Ok((using.clone(), using))
2528        },
2529        JoinConstraint::Natural => {
2530            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2531            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2532            let on: Vec<Expr> = left_names
2533                .intersection(&right_names)
2534                .map(|&name| col(name.clone()))
2535                .collect();
2536            if on.is_empty() {
2537                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2538            }
2539            Ok((on.clone(), on))
2540        },
2541        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2542    }
2543}
2544
bitflags::bitflags! {
    /// Bitfield indicating whether there exists a projection with the specified height behavior.
    ///
    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
    /// context.
    ///
    /// Each flag occupies its own bit, so the behaviors identified for several projections
    /// can be combined into a single value.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// Maintains the height of input column(s)
        const MaintainsColumn = 1 << 0;
        /// Height is independent of input, e.g.:
        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
        /// * aggregations: count(*), first(), sum() etc.
        const Independent = 1 << 1;
        /// "Inherits" the height of the context, e.g.:
        /// * Scalar literals
        const InheritsContext = 1 << 2;
    }
}
2563
2564impl ExprSqlProjectionHeightBehavior {
2565    fn identify_from_expr(expr: &Expr) -> Self {
2566        let mut has_column = false;
2567        let mut has_independent = false;
2568
2569        for e in expr.into_iter() {
2570            use Expr::*;
2571            has_column |= matches!(e, Column(_) | Selector(_));
2572            has_independent |= match e {
2573                // @TODO: This is broken now with functions.
2574                AnonymousFunction { options, .. } => {
2575                    options.returns_scalar() || !options.is_length_preserving()
2576                },
2577                Literal(v) => !v.is_scalar(),
2578                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2579                Agg { .. } | Len => true,
2580                _ => false,
2581            }
2582        }
2583        if has_independent {
2584            Self::Independent
2585        } else if has_column {
2586            Self::MaintainsColumn
2587        } else {
2588            Self::InheritsContext
2589        }
2590    }
2591}