Skip to main content

polars_sql/
context.rs

1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13    BinaryOperator as SQLBinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct,
14    ExcludeSelectItem, Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident,
15    JoinConstraint, JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName,
16    ObjectType, OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
17    SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18    TableFactor, TableWithJoins, Truncate, UnaryOperator as SQLUnaryOperator, Value as SQLValue,
19    ValueWithSpan, Values, Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29    QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30    expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
35#[derive(Clone)]
36pub struct TableInfo {
37    pub(crate) frame: LazyFrame,
38    pub(crate) name: PlSmallStr,
39    pub(crate) schema: Arc<Schema>,
40}
41
42struct SelectModifiers {
43    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
44    ilike: Option<regex::Regex>,               // SELECT * ILIKE
45    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
46    replace: Vec<Expr>,                        // SELECT * REPLACE
47}
48impl SelectModifiers {
49    fn matches_ilike(&self, s: &str) -> bool {
50        match &self.ilike {
51            Some(rx) => rx.is_match(s),
52            None => true,
53        }
54    }
55    fn renamed_cols(&self) -> Vec<Expr> {
56        self.rename
57            .iter()
58            .map(|(before, after)| col(before.clone()).alias(after.clone()))
59            .collect()
60    }
61}
62
63/// For SELECT projection items; helps simplify any required disambiguation.
64enum ProjectionItem {
65    QualifiedExprs(PlSmallStr, Vec<Expr>),
66    Exprs(Vec<Expr>),
67}
68
69/// Extract the output column name from an expression (if it has one).
70fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
71    match expr {
72        Expr::Column(name) | Expr::Alias(_, name) => Some(name),
73        _ => None,
74    }
75}
76
77/// Disambiguate qualified wildcard columns that conflict with each other or other projections.
78fn disambiguate_projection_cols(
79    items: Vec<ProjectionItem>,
80    schema: &Schema,
81) -> PolarsResult<Vec<Expr>> {
82    // Establish qualified wildcard names (with counts), and other expression names
83    let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
84    let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
85    for item in &items {
86        match item {
87            ProjectionItem::QualifiedExprs(_, exprs) => {
88                for expr in exprs {
89                    if let Some(name) = expr_output_name(expr) {
90                        *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
91                    }
92                }
93            },
94            ProjectionItem::Exprs(exprs) => {
95                for expr in exprs {
96                    if let Some(name) = expr_output_name(expr) {
97                        other_names.insert(name.clone());
98                    }
99                }
100            },
101        }
102    }
103
104    // Names requiring disambiguation (duplicates across wildcards, eg: `tbl1.*`,`tbl2.*`)
105    let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
106        .into_iter()
107        .filter(|(name, count)| *count > 1 || other_names.contains(name))
108        .map(|(name, _)| name)
109        .collect();
110
111    // Output, applying suffixes where needed
112    let mut result: Vec<Expr> = Vec::new();
113    for item in items {
114        match item {
115            ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
116                for expr in exprs {
117                    if let Some(name) = expr_output_name(&expr) {
118                        if needs_suffix.contains(name) {
119                            let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
120                            if schema.contains(suffixed.as_str()) {
121                                result.push(col(suffixed));
122                                continue;
123                            }
124                            if other_names.contains(name) {
125                                polars_bail!(
126                                    SQLInterface:
127                                    "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
128                                );
129                            }
130                        }
131                    }
132                    result.push(expr);
133                }
134            },
135            ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
136                result.extend(exprs);
137            },
138        }
139    }
140    Ok(result)
141}
142
/// Selects what happens to rows for which a WHERE predicate evaluates to true.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum FilterMode {
    /// `SELECT ... WHERE` semantics: only rows where the predicate is true are
    /// kept; rows where it is false or NULL are dropped.
    KeepTrue,
    /// `DELETE ... WHERE` semantics: only rows where the predicate is true are
    /// dropped; rows where it is false or NULL are kept.
    RemoveTrue,
}
153
154/// The SQLContext is the main entry point for executing SQL queries.
155#[derive(Clone)]
156pub struct SQLContext {
157    pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
158    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
159    pub(crate) lp_arena: Arena<IR>,
160    pub(crate) expr_arena: Arena<AExpr>,
161
162    cte_map: PlHashMap<String, LazyFrame>,
163    table_aliases: PlHashMap<String, String>,
164    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
165    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
166}
167
168impl Default for SQLContext {
169    fn default() -> Self {
170        Self {
171            function_registry: Arc::new(DefaultFunctionRegistry {}),
172            table_map: Default::default(),
173            cte_map: Default::default(),
174            table_aliases: Default::default(),
175            joined_aliases: Default::default(),
176            named_windows: Default::default(),
177            lp_arena: Default::default(),
178            expr_arena: Default::default(),
179        }
180    }
181}
182
183impl SQLContext {
184    /// Create a new SQLContext.
185    /// ```rust
186    /// # use polars_sql::SQLContext;
187    /// # fn main() {
188    /// let ctx = SQLContext::new();
189    /// # }
190    /// ```
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    /// Get the names of all registered tables, in sorted order.
196    pub fn get_tables(&self) -> Vec<String> {
197        let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
198        tables.sort_unstable();
199        tables
200    }
201
202    /// Register a [`LazyFrame`] as a table in the SQLContext.
203    /// ```rust
204    /// # use polars_sql::SQLContext;
205    /// # use polars_core::prelude::*;
206    /// # use polars_lazy::prelude::*;
207    /// # fn main() {
208    ///
209    /// let mut ctx = SQLContext::new();
210    /// let df = df! {
211    ///    "a" =>  [1, 2, 3],
212    /// }.unwrap().lazy();
213    ///
214    /// ctx.register("df", df);
215    /// # }
216    ///```
217    pub fn register(&self, name: &str, lf: LazyFrame) {
218        self.table_map.write().unwrap().insert(name.to_owned(), lf);
219    }
220
221    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
222    pub fn unregister(&self, name: &str) {
223        self.table_map.write().unwrap().remove(&name.to_owned());
224    }
225
226    /// Execute a SQL query, returning a [`LazyFrame`].
227    /// ```rust
228    /// # use polars_sql::SQLContext;
229    /// # use polars_core::prelude::*;
230    /// # use polars_lazy::prelude::*;
231    /// # fn main() {
232    ///
233    /// let mut ctx = SQLContext::new();
234    /// let df = df! {
235    ///    "a" =>  [1, 2, 3],
236    /// }
237    /// .unwrap();
238    ///
239    /// ctx.register("df", df.clone().lazy());
240    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
241    /// assert!(sql_df.equals(&df));
242    /// # }
243    ///```
244    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
245        let mut parser = Parser::new(&GenericDialect);
246        parser = parser.with_options(ParserOptions {
247            trailing_commas: true,
248            ..Default::default()
249        });
250
251        let ast = parser
252            .try_with_sql(query)
253            .map_err(to_sql_interface_err)?
254            .parse_statements()
255            .map_err(to_sql_interface_err)?;
256
257        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
258        let res = self.execute_statement(ast.first().unwrap())?;
259
260        // Ensure the result uses the proper arenas.
261        // This will instantiate new arenas with a new version.
262        let lp_arena = std::mem::take(&mut self.lp_arena);
263        let expr_arena = std::mem::take(&mut self.expr_arena);
264        res.set_cached_arena(lp_arena, expr_arena);
265
266        // Every execution should clear the statement-level maps.
267        self.cte_map.clear();
268        self.table_aliases.clear();
269        self.joined_aliases.clear();
270        self.named_windows.clear();
271
272        Ok(res)
273    }
274
275    /// Add a function registry to the SQLContext.
276    /// The registry provides the ability to add custom functions to the SQLContext.
277    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
278        self.function_registry = function_registry;
279        self
280    }
281
282    /// Get the function registry of the SQLContext
283    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
284        &self.function_registry
285    }
286
287    /// Get a mutable reference to the function registry of the SQLContext
288    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
289        Arc::get_mut(&mut self.function_registry).unwrap()
290    }
291}
292
293impl SQLContext {
294    pub(crate) fn isolated(&self) -> Self {
295        Self {
296            // Deep clone to isolate
297            table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
298            named_windows: self.named_windows.clone(),
299            cte_map: self.cte_map.clone(),
300
301            ..Default::default()
302        }
303    }
304
305    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
306        let ast = stmt;
307        Ok(match ast {
308            Statement::Query(query) => self.execute_query(query)?,
309            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
310            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
311            stmt @ Statement::Drop {
312                object_type: ObjectType::Table,
313                ..
314            } => self.execute_drop_table(stmt)?,
315            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
316            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
317            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
318            _ => polars_bail!(
319                SQLInterface: "statement type is not supported:\n{:?}", ast,
320            ),
321        })
322    }
323
324    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
325        self.register_ctes(query)?;
326        self.execute_query_no_ctes(query)
327    }
328
329    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
330        self.validate_query(query)?;
331
332        let lf = self.process_query(&query.body, query)?;
333        self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
334    }
335
336    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
337        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
338    }
339
340    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
341        // Resolve the table name in the current scope; multi-stage fallback
342        // * table name → cte name
343        // * table alias → cte alias
344        self.table_map
345            .read()
346            .unwrap()
347            .get(name)
348            .cloned()
349            .or_else(|| self.cte_map.get(name).cloned())
350            .or_else(|| {
351                self.table_aliases.get(name).and_then(|alias| {
352                    self.table_map
353                        .read()
354                        .unwrap()
355                        .get(alias.as_str())
356                        .or_else(|| self.cte_map.get(alias.as_str()))
357                        .cloned()
358                })
359            })
360    }
361
362    /// Execute a query in an isolated context. This prevents subqueries from mutating
363    /// arenas and other context state. Returns both the LazyFrame *and* its associated
364    /// Schema (so that the correct arenas are used when determining schema).
365    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
366    where
367        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
368    {
369        let mut ctx = self.isolated();
370
371        // Execute query with clean state (eg: nested/subquery)
372        let lf = query(&mut ctx)?;
373
374        // Save state
375        lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
376
377        Ok(lf)
378    }
379
380    fn expr_or_ordinal(
381        &mut self,
382        e: &SQLExpr,
383        exprs: &[Expr],
384        selected: Option<&[Expr]>,
385        schema: Option<&Schema>,
386        clause: &str,
387    ) -> PolarsResult<Expr> {
388        match e {
389            SQLExpr::UnaryOp {
390                op: SQLUnaryOperator::Minus,
391                expr,
392            } if matches!(
393                **expr,
394                SQLExpr::Value(ValueWithSpan {
395                    value: SQLValue::Number(_, _),
396                    ..
397                })
398            ) =>
399            {
400                if let SQLExpr::Value(ValueWithSpan {
401                    value: SQLValue::Number(ref idx, _),
402                    ..
403                }) = **expr
404                {
405                    Err(polars_err!(
406                    SQLSyntax:
407                    "negative ordinal values are invalid for {}; found -{}",
408                    clause,
409                    idx
410                    ))
411                } else {
412                    unreachable!()
413                }
414            },
415            SQLExpr::Value(ValueWithSpan {
416                value: SQLValue::Number(idx, _),
417                ..
418            }) => {
419                // note: sql queries are 1-indexed
420                let idx = idx.parse::<usize>().map_err(|_| {
421                    polars_err!(
422                        SQLSyntax:
423                        "negative ordinal values are invalid for {}; found {}",
424                        clause,
425                        idx
426                    )
427                })?;
428                // note: "selected" cols represent final projection order, so we use those for
429                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
430                let cols = if let Some(cols) = selected {
431                    cols
432                } else {
433                    exprs
434                };
435                Ok(cols
436                    .get(idx - 1)
437                    .ok_or_else(|| {
438                        polars_err!(
439                            SQLInterface:
440                            "{} ordinal value must refer to a valid column; found {}",
441                            clause,
442                            idx
443                        )
444                    })?
445                    .clone())
446            },
447            SQLExpr::Value(v) => Err(polars_err!(
448                SQLSyntax:
449                "{} requires a valid expression or positive ordinal; found {}", clause, v,
450            )),
451            _ => {
452                // Handle qualified cross-aliasing in ORDER BY clauses
453                // (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
454                let mut expr = parse_sql_expr(e, self, schema)?;
455                if matches!(e, SQLExpr::CompoundIdentifier(_)) {
456                    if let Some(schema) = schema {
457                        expr = expr.map_expr(|ex| match &ex {
458                            Expr::Column(name) => {
459                                let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
460                                if schema.contains(prefixed.as_str()) {
461                                    col(prefixed)
462                                } else {
463                                    ex
464                                }
465                            },
466                            _ => ex,
467                        });
468                    }
469                }
470                Ok(expr)
471            },
472        }
473    }
474
475    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
476        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
477            if let Some(name) = aliases.get(column_name) {
478                return name.to_string();
479            }
480        }
481        column_name.to_string()
482    }
483
484    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
485        match expr {
486            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
487            SetExpr::Query(nested_query) => {
488                let lf = self.execute_query_no_ctes(nested_query)?;
489                self.process_order_by(lf, &query.order_by, None)
490            },
491            SetExpr::SetOperation {
492                op: SetOperator::Union,
493                set_quantifier,
494                left,
495                right,
496            } => self.process_union(left, right, set_quantifier, query),
497
498            #[cfg(feature = "semi_anti_join")]
499            SetExpr::SetOperation {
500                op: SetOperator::Intersect | SetOperator::Except,
501                set_quantifier,
502                left,
503                right,
504            } => self.process_except_intersect(left, right, set_quantifier, query),
505
506            SetExpr::Values(Values {
507                explicit_row: _,
508                rows,
509                value_keyword: _,
510            }) => self.process_values(rows),
511
512            SetExpr::Table(tbl) => {
513                if let Some(table_name) = tbl.table_name.as_ref() {
514                    self.get_table_from_current_scope(table_name)
515                        .ok_or_else(|| {
516                            polars_err!(
517                                SQLInterface: "no table or alias named '{}' found",
518                                tbl
519                            )
520                        })
521                } else {
522                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
523                }
524            },
525            op => {
526                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
527            },
528        }
529    }
530
531    #[cfg(feature = "semi_anti_join")]
532    fn process_except_intersect(
533        &mut self,
534        left: &SetExpr,
535        right: &SetExpr,
536        quantifier: &SetQuantifier,
537        query: &Query,
538    ) -> PolarsResult<LazyFrame> {
539        let (join_type, op_name) = match *query.body {
540            SetExpr::SetOperation {
541                op: SetOperator::Except,
542                ..
543            } => (JoinType::Anti, "EXCEPT"),
544            _ => (JoinType::Semi, "INTERSECT"),
545        };
546
547        // Note: each side of the EXCEPT/INTERSECT operation should execute
548        // in isolation to prevent context state leakage between them
549        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
550        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
551        let lf_schema = self.get_frame_schema(&mut lf)?;
552
553        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
554        let rf_cols = match quantifier {
555            SetQuantifier::ByName => None,
556            SetQuantifier::Distinct | SetQuantifier::None => {
557                let rf_schema = self.get_frame_schema(&mut rf)?;
558                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
559                if lf_cols.len() != rf_cols.len() {
560                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
561                }
562                Some(rf_cols)
563            },
564            _ => {
565                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
566            },
567        };
568        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
569        let joined_tbl = match rf_cols {
570            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
571            None => join.on(lf_cols).finish(),
572        };
573        let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
574        self.process_order_by(lf, &query.order_by, None)
575    }
576
577    fn process_union(
578        &mut self,
579        left: &SetExpr,
580        right: &SetExpr,
581        quantifier: &SetQuantifier,
582        query: &Query,
583    ) -> PolarsResult<LazyFrame> {
584        let quantifier = *quantifier;
585
586        // Note: each side of the UNION operation should execute
587        // in isolation to prevent context state leakage between them
588        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
589        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
590
591        let opts = UnionArgs {
592            parallel: true,
593            to_supertypes: true,
594            maintain_order: false,
595            ..Default::default()
596        };
597        let lf = match quantifier {
598            // UNION [ALL | DISTINCT]
599            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
600                let lf_schema = self.get_frame_schema(&mut lf)?;
601                let rf_schema = self.get_frame_schema(&mut rf)?;
602                if lf_schema.len() != rf_schema.len() {
603                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
604                }
605                // rename `rf` columns to match `lf` if they differ; SQL behaves
606                // positionally on UNION ops (unless using the "BY NAME" qualifier)
607                if lf_schema.iter_names().ne(rf_schema.iter_names()) {
608                    rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
609                }
610                let concatenated = concat(vec![lf, rf], opts);
611                match quantifier {
612                    SetQuantifier::Distinct | SetQuantifier::None => {
613                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
614                    },
615                    _ => concatenated,
616                }
617            },
618            // UNION ALL BY NAME
619            #[cfg(feature = "diagonal_concat")]
620            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
621            // UNION [DISTINCT] BY NAME
622            #[cfg(feature = "diagonal_concat")]
623            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
624                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
625                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
626            },
627            #[allow(unreachable_patterns)]
628            _ => {
629                polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
630            },
631        }?;
632
633        self.process_order_by(lf, &query.order_by, None)
634    }
635
636    /// Process UNNEST as a lateral operation when it contains column references
637    /// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
638    fn process_unnest_lateral(
639        &self,
640        lf: LazyFrame,
641        alias: &Option<TableAlias>,
642        array_exprs: &[SQLExpr],
643        with_offset: bool,
644    ) -> PolarsResult<LazyFrame> {
645        let alias = alias
646            .as_ref()
647            .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
648        polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
649
650        let (mut explode_cols, mut rename_from, mut rename_to) = (
651            Vec::with_capacity(array_exprs.len()),
652            Vec::with_capacity(array_exprs.len()),
653            Vec::with_capacity(array_exprs.len()),
654        );
655        let is_single_col = array_exprs.len() == 1;
656
657        for (i, arr_expr) in array_exprs.iter().enumerate() {
658            let col_name = match arr_expr {
659                SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
660                SQLExpr::CompoundIdentifier(parts) => {
661                    PlSmallStr::from_str(&parts.last().unwrap().value)
662                },
663                SQLExpr::Array(_) => polars_bail!(
664                    SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
665                ),
666                other => polars_bail!(
667                    SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
668                ),
669            };
670            // alias: column name from "AS t(col)", or table alias
671            if let Some(name) = alias
672                .columns
673                .get(i)
674                .map(|c| c.name.value.as_str())
675                .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
676                .filter(|name| !name.is_empty() && *name != col_name.as_str())
677            {
678                rename_from.push(col_name.clone());
679                rename_to.push(PlSmallStr::from_str(name));
680            }
681            explode_cols.push(col_name);
682        }
683
684        let mut lf = lf.explode(
685            Selector::ByName {
686                names: Arc::from(explode_cols),
687                strict: true,
688            },
689            ExplodeOptions {
690                empty_as_null: true,
691                keep_nulls: true,
692            },
693        );
694        if !rename_from.is_empty() {
695            lf = lf.rename(rename_from, rename_to, true);
696        }
697        Ok(lf)
698    }
699
700    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
701        let frame_rows: Vec<Row> = values.iter().map(|row| {
702            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
703                let expr = parse_sql_expr(expr, self, None)?;
704                match expr {
705                    Expr::Literal(value) => {
706                        value.to_any_value()
707                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
708                            .map(|av| av.into_static())
709                    },
710                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
711                }
712            }).collect();
713            row_data.map(Row::new)
714        }).collect::<Result<_, _>>()?;
715
716        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
717    }
718
719    // EXPLAIN SELECT * FROM DF
720    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
721        match stmt {
722            Statement::Explain { statement, .. } => {
723                let lf = self.execute_statement(statement)?;
724                let plan = lf.describe_optimized_plan()?;
725                let plan = plan
726                    .split('\n')
727                    .collect::<Series>()
728                    .with_name(PlSmallStr::from_static("Logical Plan"))
729                    .into_column();
730                let df = DataFrame::new_infer_height(vec![plan])?;
731                Ok(df.lazy())
732            },
733            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
734        }
735    }
736
737    // SHOW TABLES
738    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
739        let tables = Column::new("name".into(), self.get_tables());
740        let df = DataFrame::new_infer_height(vec![tables])?;
741        Ok(df.lazy())
742    }
743
744    // DROP TABLE <tbl>
745    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
746        match stmt {
747            Statement::Drop { names, .. } => {
748                names.iter().for_each(|name| {
749                    self.table_map.write().unwrap().remove(&name.to_string());
750                });
751                Ok(DataFrame::empty().lazy())
752            },
753            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
754        }
755    }
756
757    // DELETE FROM <tbl> [WHERE ...]
758    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
759        if let Statement::Delete(Delete {
760            tables,
761            from,
762            using,
763            selection,
764            returning,
765            order_by,
766            limit,
767            delete_token: _,
768        }) = stmt
769        {
770            let error_message: Option<&'static str> = if !tables.is_empty() {
771                Some("DELETE expects exactly one table name")
772            } else if using.is_some() {
773                Some("DELETE does not support the USING clause")
774            } else if returning.is_some() {
775                Some("DELETE does not support the RETURNING clause")
776            } else if limit.is_some() {
777                Some("DELETE does not support the LIMIT clause")
778            } else if !order_by.is_empty() {
779                Some("DELETE does not support the ORDER BY clause")
780            } else {
781                None
782            };
783
784            if let Some(msg) = error_message {
785                polars_bail!(SQLInterface: msg);
786            }
787
788            let from_tables = match &from {
789                FromTable::WithFromKeyword(from) => from,
790                FromTable::WithoutKeyword(from) => from,
791            };
792            if from_tables.len() > 1 {
793                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
794            }
795            let tbl_expr = from_tables.first().unwrap();
796            if !tbl_expr.joins.is_empty() {
797                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
798            }
799            let (_, lf) = self.get_table(&tbl_expr.relation)?;
800            if selection.is_none() {
801                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
802                Ok(lf.clear())
803            } else {
804                // apply constraint as inverted filter (drops rows matching the selection)
805                Ok(self.process_where(lf.clone(), selection, FilterMode::RemoveTrue, None)?)
806            }
807        } else {
808            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
809        }
810    }
811
812    // TRUNCATE <tbl>
813    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
814        if let Statement::Truncate(Truncate {
815            table_names,
816            partitions,
817            ..
818        }) = stmt
819        {
820            match partitions {
821                None => {
822                    if table_names.len() != 1 {
823                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
824                    }
825                    let tbl = table_names[0].name.to_string();
826                    if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
827                        *lf = lf.clone().clear();
828                        Ok(lf.clone())
829                    } else {
830                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
831                    }
832                },
833                _ => {
834                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
835                },
836            }
837        } else {
838            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
839        }
840    }
841
842    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
843        self.cte_map.insert(name.to_owned(), lf);
844    }
845
846    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
847        if let Some(with) = &query.with {
848            if with.recursive {
849                polars_bail!(SQLInterface: "recursive CTEs are not supported")
850            }
851            for cte in &with.cte_tables {
852                // Note: isolate CTE execution to prevent context state leakage
853                let cte_name = cte.alias.name.value.clone();
854                let mut lf = self.execute_isolated(|ctx| ctx.execute_query(&cte.query))?;
855                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
856                self.register_cte(&cte_name, lf);
857            }
858        }
859        Ok(())
860    }
861
862    fn register_named_windows(
863        &mut self,
864        named_windows: &[NamedWindowDefinition],
865    ) -> PolarsResult<()> {
866        for NamedWindowDefinition(name, expr) in named_windows {
867            let spec = match expr {
868                NamedWindowExpr::NamedWindow(ref_name) => self
869                    .named_windows
870                    .get(&ref_name.value)
871                    .ok_or_else(|| {
872                        polars_err!(
873                            SQLInterface:
874                            "named window '{}' references undefined window '{}'",
875                            name.value, ref_name.value
876                        )
877                    })?
878                    .clone(),
879                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
880            };
881            self.named_windows.insert(name.value.clone(), spec);
882        }
883        Ok(())
884    }
885
886    /// execute the 'FROM' part of the query
887    pub(crate) fn execute_from_statement(
888        &mut self,
889        tbl_expr: &TableWithJoins,
890    ) -> PolarsResult<LazyFrame> {
891        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
892        if !tbl_expr.joins.is_empty() {
893            for join in &tbl_expr.joins {
894                // Handle "CROSS JOIN UNNEST(col)" as a lateral join op
895                if let (
896                    JoinOperator::CrossJoin(JoinConstraint::None),
897                    TableFactor::UNNEST {
898                        alias,
899                        array_exprs,
900                        with_offset,
901                        ..
902                    },
903                ) = (&join.join_operator, &join.relation)
904                {
905                    if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
906                        lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
907                        continue;
908                    }
909                }
910
911                let (r_name, mut rf) = self.get_table(&join.relation)?;
912                if r_name.is_empty() {
913                    // Require non-empty to avoid duplicate column errors from nested self-joins.
914                    polars_bail!(
915                        SQLInterface:
916                        "cannot JOIN on unnamed relation; please provide an alias"
917                    )
918                }
919                let left_schema = self.get_frame_schema(&mut lf)?;
920                let right_schema = self.get_frame_schema(&mut rf)?;
921
922                lf = match &join.join_operator {
923                    op @ (JoinOperator::Join(constraint)  // note: bare "join" is inner
924                    | JoinOperator::FullOuter(constraint)
925                    | JoinOperator::Left(constraint)
926                    | JoinOperator::LeftOuter(constraint)
927                    | JoinOperator::Right(constraint)
928                    | JoinOperator::RightOuter(constraint)
929                    | JoinOperator::Inner(constraint)
930                    | JoinOperator::Anti(constraint)
931                    | JoinOperator::Semi(constraint)
932                    | JoinOperator::LeftAnti(constraint)
933                    | JoinOperator::LeftSemi(constraint)
934                    | JoinOperator::RightAnti(constraint)
935                    | JoinOperator::RightSemi(constraint)) => {
936                        let (lf, rf) = match op {
937                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
938                            _ => (lf, rf),
939                        };
940                        self.process_join(
941                            &TableInfo {
942                                frame: lf,
943                                name: (&l_name).into(),
944                                schema: left_schema.clone(),
945                            },
946                            &TableInfo {
947                                frame: rf,
948                                name: (&r_name).into(),
949                                schema: right_schema.clone(),
950                            },
951                            constraint,
952                            match op {
953                                JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
954                                JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
955                                    JoinType::Left
956                                },
957                                JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
958                                    JoinType::Right
959                                },
960                                JoinOperator::FullOuter(_) => JoinType::Full,
961                                #[cfg(feature = "semi_anti_join")]
962                                JoinOperator::Anti(_)
963                                | JoinOperator::LeftAnti(_)
964                                | JoinOperator::RightAnti(_) => JoinType::Anti,
965                                #[cfg(feature = "semi_anti_join")]
966                                JoinOperator::Semi(_)
967                                | JoinOperator::LeftSemi(_)
968                                | JoinOperator::RightSemi(_) => JoinType::Semi,
969                                join_type => polars_bail!(
970                                    SQLInterface:
971                                    "join type '{:?}' not currently supported",
972                                    join_type
973                                ),
974                            },
975                        )?
976                    },
977                    JoinOperator::CrossJoin(JoinConstraint::None) => {
978                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
979                    },
980                    JoinOperator::CrossJoin(constraint) => {
981                        polars_bail!(
982                            SQLInterface:
983                            "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
984                            constraint
985                        )
986                    },
987                    join_type => {
988                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
989                    },
990                };
991
992                // track join-aliased columns so we can resolve/check them later
993                let joined_schema = self.get_frame_schema(&mut lf)?;
994
995                self.joined_aliases.insert(
996                    r_name.clone(),
997                    right_schema
998                        .iter_names()
999                        .filter_map(|name| {
1000                            // col exists in both tables and is aliased in the joined result
1001                            let aliased_name = format!("{name}:{r_name}");
1002                            if left_schema.contains(name)
1003                                && joined_schema.contains(aliased_name.as_str())
1004                            {
1005                                Some((name.to_string(), aliased_name))
1006                            } else {
1007                                None
1008                            }
1009                        })
1010                        .collect::<PlHashMap<String, String>>(),
1011                );
1012            }
1013        };
1014        Ok(lf)
1015    }
1016
1017    /// Check that the SELECT statement only contains supported clauses.
1018    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1019        // Destructure "Select" exhaustively; that way if/when new fields are added in
1020        // future sqlparser versions, we'll get a compilation error and can handle them
1021        let Select {
1022            // Supported clauses
1023            distinct: _,
1024            from: _,
1025            group_by: _,
1026            having: _,
1027            named_window: _,
1028            projection: _,
1029            qualify: _,
1030            selection: _,
1031
1032            // Metadata/token fields (can ignore)
1033            flavor: _,
1034            select_token: _,
1035            top_before_distinct: _,
1036            window_before_qualify: _,
1037
1038            // Unsupported clauses
1039            ref cluster_by,
1040            ref connect_by,
1041            ref distribute_by,
1042            ref exclude,
1043            ref into,
1044            ref lateral_views,
1045            ref prewhere,
1046            ref sort_by,
1047            ref top,
1048            ref value_table_mode,
1049        } = *select_stmt;
1050
1051        // Raise specific error messages for unsupported attributes
1052        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1053        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1054        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1055        polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1056        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1057        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1058        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1059        polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1060        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1061        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1062
1063        Ok(())
1064    }
1065
1066    /// Check that the QUERY only contains supported clauses.
1067    fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1068        // As with "Select" validation (above) destructure "Query" exhaustively
1069        let Query {
1070            // Supported clauses
1071            with: _,
1072            body: _,
1073            order_by: _,
1074            limit_clause: _,
1075            fetch,
1076
1077            // Unsupported clauses
1078            for_clause,
1079            format_clause,
1080            locks,
1081            pipe_operators,
1082            settings,
1083        } = query;
1084
1085        // Raise specific error messages for unsupported attributes
1086        polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1087        polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1088        polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1089        polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1090        polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1091
1092        // Validate FETCH clause options (if present)
1093        if let Some(Fetch {
1094            quantity: _, // supported
1095            percent,
1096            with_ties,
1097        }) = fetch
1098        {
1099            polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1100            polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1101        }
1102        Ok(())
1103    }
1104
1105    /// Execute the 'SELECT' part of the query.
1106    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1107        // Check that the statement doesn't contain unsupported SELECT clauses
1108        self.validate_select(select_stmt)?;
1109
1110        // Parse named windows first, as they may be referenced in the SELECT clause
1111        self.register_named_windows(&select_stmt.named_window)?;
1112
1113        // Get `FROM` table/data
1114        let mut implicit_join_filter: Option<SQLExpr> = None;
1115        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1116            (DataFrame::empty().lazy(), None)
1117        } else {
1118            let from = &select_stmt.from;
1119            let first = from.first().unwrap();
1120            let mut lf = self.execute_from_statement(first)?;
1121            let base_name = get_table_name(&first.relation);
1122            if from.len() > 1 {
1123                implicit_join_filter =
1124                    self.process_implicit_joins(&mut lf, from, &select_stmt.selection)?;
1125            }
1126            (lf, base_name)
1127        };
1128
1129        // Check for ambiguous column references in SELECT and WHERE (if there were joins)
1130        if let Some(ref base_name) = base_table_name {
1131            if !self.joined_aliases.is_empty() {
1132                // Extract USING columns from joins (these are coalesced and not ambiguous)
1133                let using_cols: PlHashSet<String> = select_stmt
1134                    .from
1135                    .first()
1136                    .into_iter()
1137                    .flat_map(|t| t.joins.iter())
1138                    .filter_map(|join| get_using_cols(&join.join_operator))
1139                    .flatten()
1140                    .collect();
1141
1142                // Check SELECT and WHERE expressions for ambiguous column references
1143                let check_expr = |e| {
1144                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1145                };
1146                for item in &select_stmt.projection {
1147                    match item {
1148                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1149                            check_expr(e)?
1150                        },
1151                        _ => {},
1152                    }
1153                }
1154                if let Some(ref where_expr) = select_stmt.selection {
1155                    check_expr(where_expr)?;
1156                }
1157            }
1158        }
1159
1160        // Apply `WHERE` constraint (using residual filter for implicit joins)
1161        let effective_where = if implicit_join_filter.is_some() {
1162            &implicit_join_filter
1163        } else {
1164            &select_stmt.selection
1165        };
1166        let mut schema = self.get_frame_schema(&mut lf)?;
1167        lf = self.process_where(
1168            lf,
1169            effective_where,
1170            FilterMode::KeepTrue,
1171            Some(schema.clone()),
1172        )?;
1173
1174        // Determine projections
1175        let mut select_modifiers = SelectModifiers {
1176            ilike: None,
1177            exclude: PlHashSet::new(),
1178            rename: PlHashMap::new(),
1179            replace: vec![],
1180        };
1181
1182        // Collect window function cols if QUALIFY is present (we check at the
1183        // SQL level because empty OVER() clauses don't create Expr::Over)
1184        let window_fn_columns = if select_stmt.qualify.is_some() {
1185            select_stmt
1186                .projection
1187                .iter()
1188                .filter_map(|item| match item {
1189                    SelectItem::ExprWithAlias { expr, alias }
1190                        if expr_has_window_functions(expr) =>
1191                    {
1192                        Some(alias.value.clone())
1193                    },
1194                    _ => None,
1195                })
1196                .collect::<PlHashSet<_>>()
1197        } else {
1198            PlHashSet::new()
1199        };
1200
1201        let mut projections =
1202            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1203
1204        // Apply `UNNEST` expressions
1205        let mut explode_names = Vec::new();
1206        let mut explode_exprs = Vec::new();
1207        let mut explode_lookup = PlHashMap::new();
1208
1209        for expr in &projections {
1210            for e in expr {
1211                if let Expr::Explode { input, .. } = e {
1212                    match input.as_ref() {
1213                        Expr::Column(name) => explode_names.push(name.clone()),
1214                        other_expr => {
1215                            // Note: skip aggregate expressions; those are handled in the GROUP BY phase
1216                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1217                                let temp_name =
1218                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1219                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1220                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
1221                                explode_names.push(temp_name);
1222                            }
1223                        },
1224                    }
1225                }
1226            }
1227        }
1228        if !explode_names.is_empty() {
1229            if !explode_exprs.is_empty() {
1230                lf = lf.with_columns(explode_exprs);
1231            }
1232            lf = lf.explode(
1233                Selector::ByName {
1234                    names: Arc::from(explode_names),
1235                    strict: true,
1236                },
1237                ExplodeOptions {
1238                    empty_as_null: true,
1239                    keep_nulls: true,
1240                },
1241            );
1242            projections = projections
1243                .into_iter()
1244                .map(|p| {
1245                    // Update "projections" with column refs to the now-exploded expressions
1246                    p.map_expr(|e| match e {
1247                        Expr::Explode { input, .. } => explode_lookup
1248                            .get(input.as_ref())
1249                            .map(|name| Expr::Column(name.clone()))
1250                            .unwrap_or_else(|| input.as_ref().clone()),
1251                        _ => e,
1252                    })
1253                })
1254                .collect();
1255
1256            schema = self.get_frame_schema(&mut lf)?;
1257        }
1258
1259        // Check for "GROUP BY ..." (after determining projections)
1260        let mut group_by_keys: Vec<Expr> = Vec::new();
1261        match &select_stmt.group_by {
1262            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1263            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1264                if !modifiers.is_empty() {
1265                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1266                }
1267                // Translate the group expressions, resolving ordinal values and SELECT aliases
1268                group_by_keys = group_by_exprs
1269                    .iter()
1270                    .map(|e| match e {
1271                        SQLExpr::Identifier(ident) => {
1272                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1273                                || {
1274                                    self.expr_or_ordinal(
1275                                        e,
1276                                        &projections,
1277                                        None,
1278                                        Some(&schema),
1279                                        "GROUP BY",
1280                                    )
1281                                },
1282                                Ok,
1283                            )
1284                        },
1285                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1286                    })
1287                    .collect::<PolarsResult<_>>()?
1288            },
1289            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1290            // nested agg/window funcs to the group key (also ignores literals).
1291            GroupByExpr::All(modifiers) => {
1292                if !modifiers.is_empty() {
1293                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1294                }
1295                projections.iter().for_each(|expr| match expr {
1296                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
1297                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1298                    Expr::Column(_) => group_by_keys.push(expr.clone()),
1299                    Expr::Alias(e, _)
1300                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1301                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1302                        if let Expr::Column(name) = &**e {
1303                            group_by_keys.push(col(name.clone()));
1304                        }
1305                    },
1306                    _ => {
1307                        // If not quick-matched, add if no nested agg/window expressions
1308                        if !has_expr(expr, |e| {
1309                            matches!(e, Expr::Agg(_))
1310                                || matches!(e, Expr::Len)
1311                                || matches!(e, Expr::Over { .. })
1312                                || {
1313                                    #[cfg(feature = "dynamic_group_by")]
1314                                    {
1315                                        matches!(e, Expr::Rolling { .. })
1316                                    }
1317                                    #[cfg(not(feature = "dynamic_group_by"))]
1318                                    {
1319                                        false
1320                                    }
1321                                }
1322                        }) {
1323                            group_by_keys.push(expr.clone())
1324                        }
1325                    },
1326                });
1327            },
1328        };
1329
1330        lf = if group_by_keys.is_empty() {
1331            // The 'having' clause is only valid inside 'group by'
1332            if select_stmt.having.is_some() {
1333                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1334            };
1335
1336            // Final/selected cols, accounting for 'SELECT *' modifiers
1337            let mut retained_cols = Vec::with_capacity(projections.len());
1338            let mut retained_names = Vec::with_capacity(projections.len());
1339            let have_order_by = query.order_by.is_some();
1340
1341            // Initialize containing InheritsContext to handle empty projection case.
1342            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1343
1344            // Note: if there is an 'order by' then we project everything (original cols
1345            // and new projections) and *then* select the final cols; the retained cols
1346            // are used to ensure a correct final projection. If there's no 'order by',
1347            // clause then we can project the final column *expressions* directly.
1348            for p in projections.iter() {
1349                let name = p.to_field(schema.deref())?.name.to_string();
1350                if select_modifiers.matches_ilike(&name)
1351                    && !select_modifiers.exclude.contains(&name)
1352                {
1353                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1354
1355                    retained_cols.push(if have_order_by {
1356                        col(name.as_str())
1357                    } else {
1358                        p.clone()
1359                    });
1360                    retained_names.push(col(name));
1361                }
1362            }
1363
1364            // Apply the remaining modifiers and establish the final projection
1365            if have_order_by {
1366                // We can safely use `with_columns()` and avoid a join if:
1367                // * There is already a projection that projects to the table height.
1368                // * All projection heights inherit from context (e.g. all scalar literals that
1369                //   are to be broadcasted to table height).
1370                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1371                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1372                {
1373                    lf = lf.with_columns(projections);
1374                } else {
1375                    // We hit this branch if the output height is not guaranteed to match the table
1376                    // height. E.g.:
1377                    //
1378                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
1379                    //
1380                    // For these cases we truncate / extend the sorting columns with NULLs to match
1381                    // the output height. We do this by projecting independently and then joining
1382                    // back the original frame on the row index.
1383                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1384                    lf = lf
1385                        .clone()
1386                        .select(projections)
1387                        .with_row_index(NAME, None)
1388                        .join(
1389                            lf.with_row_index(NAME, None),
1390                            [col(NAME)],
1391                            [col(NAME)],
1392                            JoinArgs {
1393                                how: JoinType::Left,
1394                                validation: Default::default(),
1395                                suffix: None,
1396                                slice: None,
1397                                nulls_equal: false,
1398                                coalesce: Default::default(),
1399                                maintain_order: MaintainOrderJoin::Left,
1400                                build_side: None,
1401                            },
1402                        );
1403                }
1404            }
1405            if !select_modifiers.replace.is_empty() {
1406                lf = lf.with_columns(&select_modifiers.replace);
1407            }
1408            if !select_modifiers.rename.is_empty() {
1409                lf = lf.with_columns(select_modifiers.renamed_cols());
1410            }
1411            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1412
1413            // Note: If `have_order_by`, with_columns is already done above.
1414            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1415                && !have_order_by
1416            {
1417                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1418                lf = lf.with_columns(retained_cols).select(retained_names);
1419            } else {
1420                lf = lf.select(retained_cols);
1421            }
1422            if !select_modifiers.rename.is_empty() {
1423                lf = lf.rename(
1424                    select_modifiers.rename.keys(),
1425                    select_modifiers.rename.values(),
1426                    true,
1427                );
1428            };
1429            lf
1430        } else {
1431            let having = select_stmt
1432                .having
1433                .as_ref()
1434                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1435                .transpose()?;
1436            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1437            lf = self.process_order_by(lf, &query.order_by, None)?;
1438
1439            // Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1440            let output_cols: Vec<_> = projections
1441                .iter()
1442                .map(|p| p.to_field(&schema))
1443                .collect::<PolarsResult<Vec<_>>>()?
1444                .into_iter()
1445                .map(|f| col(f.name))
1446                .collect();
1447
1448            lf.select(&output_cols)
1449        };
1450
1451        // Apply optional QUALIFY clause (filters on window functions).
1452        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1453
1454        // Apply optional DISTINCT clause.
1455        lf = match &select_stmt.distinct {
1456            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1457            Some(Distinct::On(exprs)) => {
1458                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1459                let schema = Some(self.get_frame_schema(&mut lf)?);
1460                let cols = exprs
1461                    .iter()
1462                    .map(|e| {
1463                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
1464                        if let Expr::Column(name) = expr {
1465                            Ok(name)
1466                        } else {
1467                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1468                        }
1469                    })
1470                    .collect::<PolarsResult<Vec<_>>>()?;
1471
1472                // DISTINCT ON has to apply the ORDER BY before the operation.
1473                lf = self.process_order_by(lf, &query.order_by, None)?;
1474                return Ok(lf.unique_stable(
1475                    Some(Selector::ByName {
1476                        names: cols.into(),
1477                        strict: true,
1478                    }),
1479                    UniqueKeepStrategy::First,
1480                ));
1481            },
1482            None => lf,
1483        };
1484        Ok(lf)
1485    }
1486
1487    fn column_projections(
1488        &mut self,
1489        select_stmt: &Select,
1490        schema: &SchemaRef,
1491        select_modifiers: &mut SelectModifiers,
1492    ) -> PolarsResult<Vec<Expr>> {
1493        if select_stmt.projection.is_empty()
1494            && select_stmt.flavor == SelectFlavor::FromFirstNoSelect
1495        {
1496            // eg: bare "FROM tbl" is equivalent to "SELECT * FROM tbl".
1497            return Ok(schema.iter_names().map(|name| col(name.clone())).collect());
1498        }
1499        let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1500        let mut has_qualified_wildcard = false;
1501
1502        for select_item in &select_stmt.projection {
1503            match select_item {
1504                SelectItem::UnnamedExpr(expr) => {
1505                    items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1506                        expr,
1507                        self,
1508                        Some(schema),
1509                    )?]));
1510                },
1511                SelectItem::ExprWithAlias { expr, alias } => {
1512                    let expr = parse_sql_expr(expr, self, Some(schema))?;
1513                    items.push(ProjectionItem::Exprs(vec![
1514                        expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1515                    ]));
1516                },
1517                SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1518                    SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1519                        let tbl_name = obj_name
1520                            .0
1521                            .last()
1522                            .and_then(|p| p.as_ident())
1523                            .map(|i| PlSmallStr::from_str(&i.value))
1524                            .unwrap_or_default();
1525                        let exprs = self.process_qualified_wildcard(
1526                            obj_name,
1527                            wildcard_options,
1528                            select_modifiers,
1529                            Some(schema),
1530                        )?;
1531                        items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1532                        has_qualified_wildcard = true;
1533                    },
1534                    SelectItemQualifiedWildcardKind::Expr(_) => {
1535                        polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1536                    },
1537                },
1538                SelectItem::Wildcard(wildcard_options) => {
1539                    let cols = schema.iter_names().map(|name| col(name.clone())).collect();
1540                    items.push(ProjectionItem::Exprs(
1541                        self.process_wildcard_additional_options(
1542                            cols,
1543                            wildcard_options,
1544                            select_modifiers,
1545                            Some(schema),
1546                        )?,
1547                    ));
1548                },
1549            }
1550        }
1551
1552        // Disambiguate qualified wildcards (if any) and flatten expressions
1553        let exprs = if has_qualified_wildcard {
1554            disambiguate_projection_cols(items, schema)?
1555        } else {
1556            items
1557                .into_iter()
1558                .flat_map(|item| match item {
1559                    ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1560                        exprs
1561                    },
1562                })
1563                .collect()
1564        };
1565        let flattened_exprs = exprs
1566            .into_iter()
1567            .flat_map(|expr| expand_exprs(expr, schema))
1568            .collect();
1569
1570        Ok(flattened_exprs)
1571    }
1572
1573    fn process_where(
1574        &mut self,
1575        mut lf: LazyFrame,
1576        expr: &Option<SQLExpr>,
1577        filter_mode: FilterMode,
1578        schema: Option<SchemaRef>,
1579    ) -> PolarsResult<LazyFrame> {
1580        if let Some(expr) = expr {
1581            let schema = match schema {
1582                None => self.get_frame_schema(&mut lf)?,
1583                Some(s) => s,
1584            };
1585
1586            // shortcut filter evaluation if given expression is just TRUE or FALSE
1587            let (all_true, all_false) = match expr {
1588                SQLExpr::Value(ValueWithSpan {
1589                    value: SQLValue::Boolean(b),
1590                    ..
1591                }) => (*b, !*b),
1592                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1593                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::Eq) => {
1594                        (a.value == b.value, a.value != b.value)
1595                    },
1596                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::NotEq) => {
1597                        (a.value != b.value, a.value == b.value)
1598                    },
1599                    _ => (false, false),
1600                },
1601                _ => (false, false),
1602            };
1603            let removing = filter_mode == FilterMode::RemoveTrue;
1604            if (all_true && !removing) || (all_false && removing) {
1605                return Ok(lf);
1606            } else if (all_false && !removing) || (all_true && removing) {
1607                return Ok(lf.clear());
1608            }
1609
1610            // Lower eligible `[NOT] EXISTS` / `[NOT] IN (subquery)` conjuncts
1611            // to semi / anti joins; whatever remains goes through the ordinary
1612            // filter path below.
1613            let residual_exprs: Vec<&SQLExpr>;
1614            (lf, residual_exprs) =
1615                self.rewrite_subquery_conjuncts(lf, expr, filter_mode, &schema)?;
1616
1617            let Some(parsed_residual) = residual_exprs
1618                .iter()
1619                .map(|e| parse_sql_expr(e, self, Some(&*schema)))
1620                .reduce(|a, b| Ok(a?.and(b?)))
1621            else {
1622                // Every conjunct was rewritten to a join; nothing left to filter.
1623                return Ok(lf);
1624            };
1625            let mut filter_expression = parsed_residual?;
1626            if filter_expression.clone().meta().has_multiple_outputs() {
1627                filter_expression = all_horizontal([filter_expression])?;
1628            }
1629            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1630            lf = match filter_mode {
1631                FilterMode::KeepTrue => lf.filter(filter_expression),
1632                FilterMode::RemoveTrue => lf.remove(filter_expression),
1633            };
1634        }
1635        Ok(lf)
1636    }
1637
1638    pub(super) fn process_join(
1639        &mut self,
1640        tbl_left: &TableInfo,
1641        tbl_right: &TableInfo,
1642        constraint: &JoinConstraint,
1643        join_type: JoinType,
1644    ) -> PolarsResult<LazyFrame> {
1645        let (left_on, right_on, predicates) =
1646            process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1647        let coalesce_type = match constraint {
1648            // "NATURAL" joins should coalesce; otherwise we disambiguate
1649            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1650            _ => JoinCoalesce::KeepColumns,
1651        };
1652        let suffix = format!(":{}", tbl_right.name);
1653
1654        let joined = if predicates.is_empty() {
1655            // Equi-join: standard left_on/right_on path
1656            tbl_left
1657                .frame
1658                .clone()
1659                .join_builder()
1660                .with(tbl_right.frame.clone())
1661                .left_on(left_on)
1662                .right_on(right_on)
1663                .how(join_type)
1664                .suffix(suffix)
1665                .coalesce(coalesce_type)
1666                .finish()
1667        } else {
1668            // Non-equi conditions: convert to predicates for `join_where`.
1669            // Any equi-conditions become equality predicates with right-side
1670            // columns suffixed to match their merged-schema names.
1671            let mut all_predicates = predicates;
1672            for (l, r) in left_on.into_iter().zip(right_on) {
1673                let r_suffixed = suffix_conflicting_columns(r, tbl_left, tbl_right, &suffix);
1674                all_predicates.push(l.eq(r_suffixed));
1675            }
1676            tbl_left
1677                .frame
1678                .clone()
1679                .join_builder()
1680                .with(tbl_right.frame.clone())
1681                .how(join_type)
1682                .suffix(suffix)
1683                .coalesce(coalesce_type)
1684                .join_where(all_predicates)
1685        };
1686
1687        Ok(joined)
1688    }
1689
1690    /// Process implicit (comma-separated) joins from `FROM t1, t2, ...` syntax.
1691    ///
1692    /// Extracts join predicates from the WHERE clause, joining each additional table with
1693    /// either an INNER JOIN (cross-table predicates found) or a CROSS JOIN (no predicates)
1694    /// Returns the residual WHERE conditions not consumed as join predicates.
1695    fn process_implicit_joins(
1696        &mut self,
1697        lf: &mut LazyFrame,
1698        from: &[TableWithJoins],
1699        where_clause: &Option<SQLExpr>,
1700    ) -> PolarsResult<Option<SQLExpr>> {
1701        let first = from.first().unwrap();
1702        let base_name = get_table_name(&first.relation).unwrap_or_default();
1703        let mut remaining_where = where_clause.clone();
1704        let mut joined_table_names: Vec<String> = vec![base_name];
1705
1706        // Track table names from explicit joins in the first FROM entry
1707        for join in &first.joins {
1708            if let Some(name) = get_table_name(&join.relation) {
1709                joined_table_names.push(name);
1710            }
1711        }
1712        for tbl_expr in from.iter().skip(1) {
1713            let mut rf = self.execute_from_statement(tbl_expr)?;
1714            let r_name = get_table_name(&tbl_expr.relation).unwrap_or_default();
1715            polars_ensure!(
1716                !r_name.is_empty(),
1717                SQLInterface: "implicit joins require named tables; please provide an alias"
1718            );
1719            let left_schema = self.get_frame_schema(lf)?;
1720            let right_schema = self.get_frame_schema(&mut rf)?;
1721            let (join_expr, residual) =
1722                extract_join_predicates(&remaining_where, &joined_table_names, &r_name);
1723
1724            *lf = if let Some(on_expr) = join_expr {
1725                self.process_join(
1726                    &TableInfo {
1727                        frame: lf.clone(),
1728                        name: PlSmallStr::from_str(
1729                            &joined_table_names.first().cloned().unwrap_or_default(),
1730                        ),
1731                        schema: left_schema.clone(),
1732                    },
1733                    &TableInfo {
1734                        frame: rf,
1735                        name: PlSmallStr::from_str(&r_name),
1736                        schema: right_schema.clone(),
1737                    },
1738                    &JoinConstraint::On(on_expr),
1739                    JoinType::Inner,
1740                )?
1741            } else {
1742                lf.clone()
1743                    .cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
1744            };
1745            remaining_where = residual;
1746
1747            // Track join-aliased columns for later resolution
1748            let joined_schema = self.get_frame_schema(lf)?;
1749            self.joined_aliases.insert(
1750                r_name.clone(),
1751                right_schema
1752                    .iter_names()
1753                    .filter_map(|name| {
1754                        let aliased_name = format!("{name}:{r_name}");
1755                        if left_schema.contains(name)
1756                            && joined_schema.contains(aliased_name.as_str())
1757                        {
1758                            Some((name.to_string(), aliased_name))
1759                        } else {
1760                            None
1761                        }
1762                    })
1763                    .collect::<PlHashMap<String, String>>(),
1764            );
1765            joined_table_names.push(r_name);
1766            for join in &tbl_expr.joins {
1767                if let Some(name) = get_table_name(&join.relation) {
1768                    joined_table_names.push(name);
1769                }
1770            }
1771        }
1772        Ok(remaining_where)
1773    }
1774
1775    fn process_qualify(
1776        &mut self,
1777        mut lf: LazyFrame,
1778        qualify_expr: &Option<SQLExpr>,
1779        window_fn_columns: &PlHashSet<String>,
1780    ) -> PolarsResult<LazyFrame> {
1781        if let Some(expr) = qualify_expr {
1782            // Check the QUALIFY expression to identify window functions
1783            // and collect column refs (for looking up aliases from SELECT)
1784            let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1785            let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1786            if !has_window_fns && !references_window_alias {
1787                polars_bail!(
1788                    SQLSyntax:
1789                    "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1790                );
1791            }
1792            let schema = self.get_frame_schema(&mut lf)?;
1793            let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1794            if filter_expression.clone().meta().has_multiple_outputs() {
1795                filter_expression = all_horizontal([filter_expression])?;
1796            }
1797            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1798            lf = lf.filter(filter_expression);
1799        }
1800        Ok(lf)
1801    }
1802
1803    fn process_subqueries(
1804        &mut self,
1805        lf: LazyFrame,
1806        exprs: Vec<&mut Expr>,
1807    ) -> PolarsResult<LazyFrame> {
1808        let mut subplans = vec![];
1809
1810        for e in exprs {
1811            *e = e.clone().try_map_expr(|e| {
1812                if let Expr::SubPlan(lp, names) = e {
1813                    assert_eq!(
1814                        names.len(),
1815                        1,
1816                        "multiple columns in subqueries not yet supported"
1817                    );
1818
1819                    let select_expr = names[0].1.clone();
1820                    let mut lf = LazyFrame::from((**lp).clone());
1821                    let schema = self.get_frame_schema(&mut lf)?;
1822                    polars_ensure!(schema.len() == 1,  SQLSyntax: "SQL subquery returns more than one column");
1823                    let lf = lf.select([select_expr.clone()]);
1824
1825                    subplans.push(lf);
1826                    Ok(Expr::Column(names[0].0.clone()).first())
1827                } else {
1828                    Ok(e)
1829                }
1830            })?;
1831        }
1832
1833        if subplans.is_empty() {
1834            Ok(lf)
1835        } else {
1836            subplans.insert(0, lf);
1837            concat_lf_horizontal(
1838                subplans,
1839                HConcatOptions {
1840                    broadcast_unit_length: true,
1841                    ..Default::default()
1842                },
1843            )
1844        }
1845    }
1846
1847    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1848        if let Statement::CreateTable(CreateTable {
1849            if_not_exists,
1850            name,
1851            query,
1852            columns,
1853            like,
1854            ..
1855        }) = stmt
1856        {
1857            let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1858            if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1859                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1860            }
1861            let lf = match (query, columns.is_empty(), like) {
1862                (Some(query), true, None) => {
1863                    // ----------------------------------------------------
1864                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1865                    // ----------------------------------------------------
1866                    self.execute_query(query)?
1867                },
1868                (None, false, None) => {
1869                    // ----------------------------------------------------
1870                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1871                    // ----------------------------------------------------
1872                    let mut schema = Schema::with_capacity(columns.len());
1873                    for col in columns {
1874                        let col_name = col.name.value.as_str();
1875                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1876                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1877                    }
1878                    DataFrame::empty_with_schema(&schema).lazy()
1879                },
1880                (None, true, Some(like_kind)) => {
1881                    // ----------------------------------------------------
1882                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1883                    // ----------------------------------------------------
1884                    let like_name = match like_kind {
1885                        CreateTableLikeKind::Plain(like)
1886                        | CreateTableLikeKind::Parenthesized(like) => &like.name,
1887                    };
1888                    let like_table = like_name
1889                        .0
1890                        .first()
1891                        .unwrap()
1892                        .as_ident()
1893                        .unwrap()
1894                        .value
1895                        .as_str();
1896                    if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1897                        table.clear()
1898                    } else {
1899                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1900                    }
1901                },
1902                // No valid options provided
1903                (None, true, None) => {
1904                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1905                },
1906                // Mutually exclusive options
1907                _ => {
1908                    polars_bail!(
1909                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1910                        query,
1911                        columns,
1912                        like,
1913                    )
1914                },
1915            };
1916            self.register(tbl_name, lf);
1917
1918            let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1919            Ok(df_created.unwrap().lazy())
1920        } else {
1921            unreachable!()
1922        }
1923    }
1924
1925    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1926        match relation {
1927            TableFactor::Table {
1928                name, alias, args, ..
1929            } => {
1930                if let Some(args) = args {
1931                    return self.execute_table_function(name, alias, &args.args);
1932                }
1933                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1934                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1935                    match alias {
1936                        Some(alias) => {
1937                            self.table_aliases
1938                                .insert(alias.name.value.clone(), tbl_name.to_string());
1939                            Ok((alias.name.value.clone(), lf))
1940                        },
1941                        None => Ok((tbl_name.to_string(), lf)),
1942                    }
1943                } else {
1944                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1945                }
1946            },
1947            TableFactor::Derived {
1948                lateral,
1949                subquery,
1950                alias,
1951            } => {
1952                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1953                // Execute the subquery in isolation so that outer join state
1954                // doesn't leak into it and cause spurious ambiguous-column errors
1955                if let Some(alias) = alias {
1956                    let mut lf =
1957                        self.execute_isolated(|ctx| ctx.execute_query_no_ctes(subquery))?;
1958                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1959                    self.table_map
1960                        .write()
1961                        .unwrap()
1962                        .insert(alias.name.value.clone(), lf.clone());
1963                    Ok((alias.name.value.clone(), lf))
1964                } else {
1965                    let lf = self.execute_isolated(|ctx| ctx.execute_query_no_ctes(subquery))?;
1966                    Ok(("".to_string(), lf))
1967                }
1968            },
1969            TableFactor::UNNEST {
1970                alias,
1971                array_exprs,
1972                with_offset,
1973                with_offset_alias: _,
1974                ..
1975            } => {
1976                if let Some(alias) = alias {
1977                    let column_names: Vec<Option<PlSmallStr>> = alias
1978                        .columns
1979                        .iter()
1980                        .map(|c| {
1981                            if c.name.value.is_empty() {
1982                                None
1983                            } else {
1984                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1985                            }
1986                        })
1987                        .collect();
1988
1989                    let column_values: Vec<Series> = array_exprs
1990                        .iter()
1991                        .map(|arr| parse_sql_array(arr, self))
1992                        .collect::<Result<_, _>>()?;
1993
1994                    polars_ensure!(!column_names.is_empty(),
1995                        SQLSyntax:
1996                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1997                    );
1998                    if column_names.len() != column_values.len() {
1999                        let plural = if column_values.len() > 1 { "s" } else { "" };
2000                        polars_bail!(
2001                            SQLSyntax:
2002                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
2003                        );
2004                    }
2005                    let column_series: Vec<Column> = column_values
2006                        .into_iter()
2007                        .zip(column_names)
2008                        .map(|(s, name)| {
2009                            if let Some(name) = name {
2010                                s.with_name(name)
2011                            } else {
2012                                s
2013                            }
2014                        })
2015                        .map(Column::from)
2016                        .collect();
2017
2018                    let lf = DataFrame::new_infer_height(column_series)?.lazy();
2019
2020                    if *with_offset {
2021                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
2022                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
2023                    }
2024                    let table_name = alias.name.value.clone();
2025                    self.table_map
2026                        .write()
2027                        .unwrap()
2028                        .insert(table_name.clone(), lf.clone());
2029                    Ok((table_name, lf))
2030                } else {
2031                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
2032                }
2033            },
2034            TableFactor::NestedJoin {
2035                table_with_joins,
2036                alias,
2037            } => {
2038                let lf =
2039                    self.execute_isolated(|ctx| ctx.execute_from_statement(table_with_joins))?;
2040                match alias {
2041                    Some(a) => Ok((a.name.value.clone(), lf)),
2042                    None => Ok(("".to_string(), lf)),
2043                }
2044            },
2045            // Support bare table, optionally with an alias, for now
2046            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
2047        }
2048    }
2049
2050    fn execute_table_function(
2051        &mut self,
2052        name: &ObjectName,
2053        alias: &Option<TableAlias>,
2054        args: &[FunctionArg],
2055    ) -> PolarsResult<(String, LazyFrame)> {
2056        let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
2057        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
2058        let (tbl_name, lf) = read_fn.execute(args)?;
2059        #[allow(clippy::useless_asref)]
2060        let tbl_name = alias
2061            .as_ref()
2062            .map(|a| a.name.value.clone())
2063            .unwrap_or_else(|| tbl_name.to_string());
2064
2065        self.table_map
2066            .write()
2067            .unwrap()
2068            .insert(tbl_name.clone(), lf.clone());
2069        Ok((tbl_name, lf))
2070    }
2071
2072    fn process_order_by(
2073        &mut self,
2074        mut lf: LazyFrame,
2075        order_by: &Option<OrderBy>,
2076        selected: Option<&[Expr]>,
2077    ) -> PolarsResult<LazyFrame> {
2078        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
2079            OrderByKind::Expressions(exprs) => exprs.is_empty(),
2080            OrderByKind::All(_) => false,
2081        }) {
2082            return Ok(lf);
2083        }
2084        let schema = self.get_frame_schema(&mut lf)?;
2085        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
2086        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
2087            OrderByKind::Expressions(exprs) => {
2088                // TODO: will look at making an upstream PR that allows us to more easily
2089                //  create a GenericDialect variant supporting "OrderByKind::All" instead
2090                if exprs.len() == 1
2091                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
2092                        if ident.value.to_uppercase() == "ALL"
2093                        && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
2094                {
2095                    // Treat as ORDER BY ALL
2096                    let n_cols = if let Some(selected) = selected {
2097                        selected.len()
2098                    } else {
2099                        schema.len()
2100                    };
2101                    (vec![], Some(&exprs[0].options), n_cols)
2102                } else {
2103                    (exprs.clone(), None, exprs.len())
2104                }
2105            },
2106            OrderByKind::All(opts) => {
2107                let n_cols = if let Some(selected) = selected {
2108                    selected.len()
2109                } else {
2110                    schema.len()
2111                };
2112                (vec![], Some(opts), n_cols)
2113            },
2114        };
2115        let mut descending = Vec::with_capacity(n_order_cols);
2116        let mut nulls_last = Vec::with_capacity(n_order_cols);
2117        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
2118
2119        if let Some(opts) = order_by_all {
2120            if let Some(selected) = selected {
2121                by.extend(selected.iter().cloned());
2122            } else {
2123                by.extend(columns_iter);
2124            };
2125            let desc_order = !opts.asc.unwrap_or(true);
2126            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
2127            descending.resize(by.len(), desc_order);
2128        } else {
2129            let columns = &columns_iter.collect::<Vec<_>>();
2130            for ob in order_by {
2131                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
2132                // https://www.postgresql.org/docs/current/queries-order.html
2133                let desc_order = !ob.options.asc.unwrap_or(true);
2134                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
2135                descending.push(desc_order);
2136
2137                // translate order expression, allowing ordinal values
2138                by.push(self.expr_or_ordinal(
2139                    &ob.expr,
2140                    columns,
2141                    selected,
2142                    Some(&schema),
2143                    "ORDER BY",
2144                )?)
2145            }
2146        }
2147        Ok(lf.sort_by_exprs(
2148            &by,
2149            SortMultipleOptions::default()
2150                .with_order_descending_multi(descending)
2151                .with_nulls_last_multi(nulls_last),
2152        ))
2153    }
2154
2155    fn process_group_by(
2156        &mut self,
2157        mut lf: LazyFrame,
2158        group_by_keys: &[Expr],
2159        projections: &[Expr],
2160        having: Option<Expr>,
2161    ) -> PolarsResult<LazyFrame> {
2162        let schema_before = self.get_frame_schema(&mut lf)?;
2163        let group_by_keys_schema =
2164            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
2165                format!("group_by keys contained duplicate output name '{duplicate_name}'")
2166            })?;
2167
2168        // Note: remove the `group_by` keys as Polars adds those implicitly.
2169        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
2170        let mut aggregation_projection = Vec::with_capacity(projections.len());
2171        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
2172        let mut projection_aliases = PlHashSet::new();
2173        let mut group_key_aliases = PlHashSet::new();
2174
2175        // Pre-compute group key data (alias-stripped expression + aggregated output
2176        // name) to avoid repeated work matching projections against the group keys
2177        let group_key_data: Vec<_> = group_by_keys
2178            .iter()
2179            .map(|gk| {
2180                (
2181                    strip_outer_alias(gk),
2182                    gk.to_field(&schema_before).ok().map(|f| f.name),
2183                )
2184            })
2185            .collect();
2186
2187        let projection_group_key: Vec<Option<PlSmallStr>> = projections
2188            .iter()
2189            .map(|p| {
2190                let p_stripped = strip_outer_alias(p);
2191                group_key_data.iter().find_map(|(gk_stripped, gk_name)| {
2192                    (*gk_stripped == p_stripped)
2193                        .then(|| gk_name.clone())
2194                        .flatten()
2195                })
2196            })
2197            .collect();
2198
2199        for (e, group_key) in projections.iter().zip(&projection_group_key) {
2200            let matches_group_key = group_key.is_some();
2201            // `Len` represents COUNT(*) so we treat as an aggregation here.
2202            let is_non_group_key_expr = !matches_group_key
2203                && has_expr(e, |e| {
2204                    match e {
2205                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
2206                        #[cfg(feature = "dynamic_group_by")]
2207                        Expr::Rolling { .. } => true,
2208                        Expr::AnonymousFunction { options, .. } => options.returns_scalar(),
2209                        Expr::Function { function: func, .. }
2210                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
2211                        {
2212                            // If it's a function call containing a column NOT in the group by keys,
2213                            // we treat it as an aggregation.
2214                            has_expr(e, |e| match e {
2215                                Expr::Column(name) => !group_by_keys_schema.contains(name),
2216                                _ => false,
2217                            })
2218                        },
2219                        _ => false,
2220                    }
2221                });
2222
2223            // Note: if simple aliased expression we defer aliasing until after the group_by.
2224            // Use `e_inner` to track the potentially unwrapped expression for field lookup.
2225            let mut e_inner = e;
2226            if let Expr::Alias(expr, alias) = e {
2227                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
2228                    group_key_aliases.insert(alias.as_ref());
2229                    e_inner = expr
2230                } else if let Expr::Function {
2231                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
2232                    ..
2233                } = expr.deref()
2234                {
2235                    projection_overrides
2236                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
2237                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
2238                    projection_aliases.insert(alias.as_ref());
2239                }
2240            }
2241            let field = e_inner.to_field(&schema_before)?;
2242            if is_non_group_key_expr {
2243                let mut e = e.clone();
2244                if let Expr::Agg(AggExpr::Implode {
2245                    input: expr,
2246                    maintain_order: _,
2247                }) = &e
2248                {
2249                    e = (**expr).clone();
2250                } else if let Expr::Alias(expr, name) = &e {
2251                    if let Expr::Agg(AggExpr::Implode {
2252                        input: expr,
2253                        maintain_order: _,
2254                    }) = expr.as_ref()
2255                    {
2256                        e = (**expr).clone().alias(name.clone());
2257                    }
2258                }
2259                // If aggregation colname conflicts with a group key,
2260                // alias it to avoid duplicate/mis-tracked columns
2261                if group_by_keys_schema.get(&field.name).is_some() {
2262                    let alias_name = format_pl_smallstr!("__POLARS_AGG_{}", field.name);
2263                    e = e.alias(alias_name.clone());
2264                    aliased_aggregations.insert(field.name.clone(), alias_name);
2265                }
2266                aggregation_projection.push(e);
2267            } else if !matches_group_key {
2268                // Non-aggregated columns must be part of the GROUP BY clause
2269                if let Expr::Column(_)
2270                | Expr::Function {
2271                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
2272                    ..
2273                } = e_inner
2274                {
2275                    if !group_by_keys_schema.contains(&field.name) {
2276                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
2277                    }
2278                }
2279            }
2280        }
2281
2282        // Process HAVING clause: identify aggregate expressions, reusing those already
2283        // in projections, or compute as temporary columns and then post-filter/discard
2284        let having_filter = if let Some(having_expr) = having {
2285            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
2286                .iter()
2287                .filter_map(|p| match p {
2288                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
2289                        Some((inner.as_ref().clone(), name.clone()))
2290                    },
2291                    e @ (Expr::Agg(_) | Expr::Len) => Some((
2292                        e.clone(),
2293                        e.to_field(&schema_before)
2294                            .map(|f| f.name)
2295                            .unwrap_or_default(),
2296                    )),
2297                    _ => None,
2298                })
2299                .collect();
2300
2301            let mut n_having_aggs = 0;
2302            let updated_having = having_expr.map_expr(|e| {
2303                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
2304                    return e;
2305                }
2306                let name = agg_to_name
2307                    .iter()
2308                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
2309                    .unwrap_or_else(|| {
2310                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
2311                        aggregation_projection.push(e.clone().alias(n.clone()));
2312                        agg_to_name.push((e.clone(), n.clone()));
2313                        n_having_aggs += 1;
2314                        n
2315                    });
2316                col(name)
2317            });
2318            Some(updated_having)
2319        } else {
2320            None
2321        };
2322
2323        // Apply HAVING filter after aggregation
2324        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
2325        if let Some(filter_expr) = having_filter {
2326            aggregated = aggregated.filter(filter_expr);
2327        }
2328
2329        let projection_schema =
2330            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
2331                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
2332            })?;
2333
2334        // A final projection to get the proper order and any deferred transforms/aliases
2335        // (will also drop any temporary columns created for the HAVING post-filter).
2336        let final_projection = projection_schema
2337            .iter_names()
2338            .zip(projections.iter().zip(&projection_group_key))
2339            .map(|(name, (projection_expr, group_key))| {
2340                if let Some(expr) = projection_overrides.get(name.as_str()) {
2341                    expr.clone()
2342                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
2343                    col(aliased_name.clone()).alias(name.clone())
2344                } else if let Some(key_name) = group_key {
2345                    // projection is a group key; reference aggregated key col rather than
2346                    // re-evaluating against aggregated frame (incorrect for computed keys)
2347                    if key_name == name {
2348                        col(name.clone())
2349                    } else {
2350                        col(key_name.clone()).alias(name.clone())
2351                    }
2352                } else if group_by_keys_schema.get(name).is_some()
2353                    || projection_aliases.contains(name.as_str())
2354                    || group_key_aliases.contains(name.as_str())
2355                {
2356                    if has_expr(projection_expr, |e| {
2357                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
2358                    }) {
2359                        col(name.clone())
2360                    } else {
2361                        projection_expr.clone()
2362                    }
2363                } else {
2364                    col(name.clone())
2365                }
2366            })
2367            .collect::<Vec<_>>();
2368
2369        // Include original GROUP BY columns for ORDER BY access (if aliased).
2370        let mut output_projection = final_projection;
2371        for key_name in group_by_keys_schema.iter_names() {
2372            if !projection_schema.contains(key_name) {
2373                // Original col name not in output - add for ORDER BY access
2374                output_projection.push(col(key_name.clone()));
2375            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
2376                // Original col name in output - check if cross-aliased
2377                let is_cross_aliased = projection_schema
2378                    .iter_names()
2379                    .zip(projections.iter())
2380                    .any(|(name, p)| name == key_name && !is_simple_col_ref(p, key_name));
2381                if is_cross_aliased {
2382                    // Add original name under a prefixed alias for subsequent ORDER BY resolution
2383                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
2384                    output_projection.push(col(key_name.clone()).alias(internal_name));
2385                }
2386            }
2387        }
2388        Ok(aggregated.select(&output_projection))
2389    }
2390
2391    fn process_limit_offset(
2392        &self,
2393        lf: LazyFrame,
2394        limit_clause: &Option<LimitClause>,
2395        fetch: &Option<Fetch>,
2396    ) -> PolarsResult<LazyFrame> {
2397        // Extract limit and offset from LimitClause
2398        let (limit, offset) = match limit_clause {
2399            Some(LimitClause::LimitOffset {
2400                limit,
2401                offset,
2402                limit_by,
2403            }) => {
2404                if !limit_by.is_empty() {
2405                    // TODO: might be able to support as an aggregate `top_k_by` operation?
2406                    //  (https://clickhouse.com/docs/sql-reference/statements/select/limit-by)
2407                    polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2408                }
2409                (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2410            },
2411            Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2412            None => (None, None),
2413        };
2414
2415        // Handle FETCH clause (alternative to LIMIT, mutually exclusive)
2416        let limit = match (fetch, limit) {
2417            (Some(fetch), None) => fetch.quantity.as_ref(),
2418            (Some(_), Some(_)) => {
2419                polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2420            },
2421            (None, limit) => limit,
2422        };
2423
2424        // Apply limit and/or offset
2425        match (offset, limit) {
2426            (
2427                Some(SQLExpr::Value(ValueWithSpan {
2428                    value: SQLValue::Number(offset, _),
2429                    ..
2430                })),
2431                Some(SQLExpr::Value(ValueWithSpan {
2432                    value: SQLValue::Number(limit, _),
2433                    ..
2434                })),
2435            ) => Ok(lf.slice(
2436                offset
2437                    .parse()
2438                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2439                limit.parse().map_err(
2440                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2441                )?,
2442            )),
2443            (
2444                Some(SQLExpr::Value(ValueWithSpan {
2445                    value: SQLValue::Number(offset, _),
2446                    ..
2447                })),
2448                None,
2449            ) => Ok(lf.slice(
2450                offset
2451                    .parse()
2452                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2453                IdxSize::MAX,
2454            )),
2455            (
2456                None,
2457                Some(SQLExpr::Value(ValueWithSpan {
2458                    value: SQLValue::Number(limit, _),
2459                    ..
2460                })),
2461            ) => {
2462                Ok(lf.limit(limit.parse().map_err(
2463                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2464                )?))
2465            },
2466            (None, None) => Ok(lf),
2467            _ => polars_bail!(
2468                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2469            ),
2470        }
2471    }
2472
2473    fn process_qualified_wildcard(
2474        &mut self,
2475        ObjectName(idents): &ObjectName,
2476        options: &WildcardAdditionalOptions,
2477        modifiers: &mut SelectModifiers,
2478        schema: Option<&Schema>,
2479    ) -> PolarsResult<Vec<Expr>> {
2480        let mut idents_with_wildcard: Vec<Ident> = idents
2481            .iter()
2482            .filter_map(|p| p.as_ident().cloned())
2483            .collect();
2484        idents_with_wildcard.push(Ident::new("*"));
2485
2486        let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2487        self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2488    }
2489
2490    fn process_wildcard_additional_options(
2491        &mut self,
2492        exprs: Vec<Expr>,
2493        options: &WildcardAdditionalOptions,
2494        modifiers: &mut SelectModifiers,
2495        schema: Option<&Schema>,
2496    ) -> PolarsResult<Vec<Expr>> {
2497        if options.opt_except.is_some() && options.opt_exclude.is_some() {
2498            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2499        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2500            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2501        }
2502
2503        // SELECT * EXCLUDE
2504        if let Some(items) = &options.opt_exclude {
2505            match items {
2506                ExcludeSelectItem::Single(ident) => {
2507                    modifiers.exclude.insert(ident.value.clone());
2508                },
2509                ExcludeSelectItem::Multiple(idents) => {
2510                    modifiers
2511                        .exclude
2512                        .extend(idents.iter().map(|i| i.value.clone()));
2513                },
2514            };
2515        }
2516
2517        // SELECT * EXCEPT
2518        if let Some(items) = &options.opt_except {
2519            modifiers.exclude.insert(items.first_element.value.clone());
2520            modifiers
2521                .exclude
2522                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2523        }
2524
2525        // SELECT * ILIKE
2526        if let Some(item) = &options.opt_ilike {
2527            let rx = regex::escape(item.pattern.as_str())
2528                .replace('%', ".*")
2529                .replace('_', ".");
2530
2531            modifiers.ilike = Some(
2532                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2533            );
2534        }
2535
2536        // SELECT * RENAME
2537        if let Some(items) = &options.opt_rename {
2538            let renames = match items {
2539                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2540                RenameSelectItem::Multiple(renames) => renames.as_slice(),
2541            };
2542            for rn in renames {
2543                let before = PlSmallStr::from_str(rn.ident.value.as_str());
2544                let after = PlSmallStr::from_str(rn.alias.value.as_str());
2545                if before != after {
2546                    modifiers.rename.insert(before, after);
2547                }
2548            }
2549        }
2550
2551        // SELECT * REPLACE
2552        if let Some(replacements) = &options.opt_replace {
2553            for rp in &replacements.items {
2554                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2555                modifiers
2556                    .replace
2557                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2558            }
2559        }
2560        Ok(exprs)
2561    }
2562
2563    fn rename_columns_from_table_alias(
2564        &mut self,
2565        mut lf: LazyFrame,
2566        alias: &TableAlias,
2567    ) -> PolarsResult<LazyFrame> {
2568        if alias.columns.is_empty() {
2569            Ok(lf)
2570        } else {
2571            let schema = self.get_frame_schema(&mut lf)?;
2572            if alias.columns.len() != schema.len() {
2573                polars_bail!(
2574                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2575                    alias.columns.len(), alias.name.value, schema.len()
2576                )
2577            } else {
2578                let existing_columns: Vec<_> = schema.iter_names().collect();
2579                let new_columns: Vec<_> =
2580                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
2581                Ok(lf.rename(existing_columns, new_columns, true))
2582            }
2583        }
2584    }
2585}
2586
2587impl SQLContext {
2588    /// Create a new SQLContext from a table map. For internal use only
2589    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2590        Self {
2591            table_map: Arc::new(RwLock::new(table_map)),
2592            ..Default::default()
2593        }
2594    }
2595}
2596
2597fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2598    match expr {
2599        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2600            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2601            schema
2602                .iter_names()
2603                .filter(|name| re.is_match(name))
2604                .map(|name| col(name.clone()))
2605                .collect::<Vec<_>>()
2606        },
2607        Expr::Selector(s) => s
2608            .into_columns(schema, &Default::default())
2609            .unwrap()
2610            .into_iter()
2611            .map(col)
2612            .collect::<Vec<_>>(),
2613        _ => vec![expr],
2614    }
2615}
2616
/// Check whether a column name is written as an anchored regex, i.e. it starts
/// with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    match nm.strip_prefix('^') {
        Some(rest) => rest.ends_with('$'),
        None => false,
    }
}
2620
2621/// Extract column names from a USING clause in a JoinOperator (if present).
2622fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2623    use JoinOperator::*;
2624    match op {
2625        Join(JoinConstraint::Using(cols))
2626        | Inner(JoinConstraint::Using(cols))
2627        | Left(JoinConstraint::Using(cols))
2628        | LeftOuter(JoinConstraint::Using(cols))
2629        | Right(JoinConstraint::Using(cols))
2630        | RightOuter(JoinConstraint::Using(cols))
2631        | FullOuter(JoinConstraint::Using(cols))
2632        | Semi(JoinConstraint::Using(cols))
2633        | Anti(JoinConstraint::Using(cols))
2634        | LeftSemi(JoinConstraint::Using(cols))
2635        | LeftAnti(JoinConstraint::Using(cols))
2636        | RightSemi(JoinConstraint::Using(cols))
2637        | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2638            c.0.first()
2639                .and_then(|p| p.as_ident())
2640                .map(|i| i.value.clone())
2641        })),
2642        _ => None,
2643    }
2644}
2645
2646/// Extract the table name (or alias) from a TableFactor.
2647pub(crate) fn get_table_name(factor: &TableFactor) -> Option<String> {
2648    match factor {
2649        TableFactor::Table { name, alias, .. } => {
2650            alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2651                name.0
2652                    .last()
2653                    .and_then(|p| p.as_ident())
2654                    .map(|i| i.value.clone())
2655            })
2656        },
2657        TableFactor::Derived { alias, .. }
2658        | TableFactor::NestedJoin { alias, .. }
2659        | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2660        _ => None,
2661    }
2662}
2663
2664/// Check if an expression is a simple column reference (with optional alias) to the given name.
2665fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2666    match expr {
2667        Expr::Column(n) => n == col_name,
2668        Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2669        _ => false,
2670    }
2671}
2672
2673/// Strip the outer alias from an expression (if present) for expression equality comparison.
2674fn strip_outer_alias(expr: &Expr) -> Expr {
2675    if let Expr::Alias(inner, _) = expr {
2676        inner.as_ref().clone()
2677    } else {
2678        expr.clone()
2679    }
2680}
2681
2682/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2683///
2684/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2685/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2686fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2687    // Original columns take precedence over SELECT aliases
2688    if schema.contains(name) {
2689        return None;
2690    }
2691    // Find a projection with this alias and return its expression (preserving the alias)
2692    projections.iter().find_map(|p| match p {
2693        Expr::Alias(inner, alias) if alias.as_str() == name => {
2694            Some(inner.as_ref().clone().alias(alias.clone()))
2695        },
2696        _ => None,
2697    })
2698}
2699
2700/// Check if all columns referred to in a Polars expression exist in the given Schema.
2701fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2702    let mut found_cols = false;
2703    let mut all_in_schema = true;
2704    for e in expr.into_iter() {
2705        if let Expr::Column(name) = e {
2706            found_cols = true;
2707            if !schema.contains(name.as_str()) {
2708                all_in_schema = false;
2709                break;
2710            }
2711        }
2712    }
2713    found_cols && all_in_schema
2714}
2715
2716/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
2717///
2718/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
2719/// either way round, and in joins with more than two tables you can also join against an earlier
2720/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
2721/// we join `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
2722/// resolve as our native `join` function operates on only two tables at a time.
2723fn determine_left_right_join_on(
2724    ctx: &mut SQLContext,
2725    expr_left: &SQLExpr,
2726    expr_right: &SQLExpr,
2727    tbl_left: &TableInfo,
2728    tbl_right: &TableInfo,
2729    join_schema: &Schema,
2730) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2731    // parse, removing any aliases that may have been added by `resolve_column`
2732    // (called inside `parse_sql_expr`) as we need the actual/underlying col
2733    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2734        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2735        e => e,
2736    };
2737    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2738        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2739        e => e,
2740    };
2741
2742    // ------------------------------------------------------------------
2743    // simple/typical case: can fully resolve SQL-level table references
2744    // ------------------------------------------------------------------
2745    let left_refs = (
2746        expr_refers_to_table(expr_left, &tbl_left.name),
2747        expr_refers_to_table(expr_left, &tbl_right.name),
2748    );
2749    let right_refs = (
2750        expr_refers_to_table(expr_right, &tbl_left.name),
2751        expr_refers_to_table(expr_right, &tbl_right.name),
2752    );
2753    // if the SQL-level references unambiguously indicate table ownership, we're done
2754    match (left_refs, right_refs) {
2755        // standard: left expr → left table, right expr → right table
2756        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2757        // reversed: left expr → right table, right expr → left table
2758        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2759        // unsupported: one side references *both* tables
2760        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2761            polars_bail!(
2762               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2763               if left_refs.0 && left_refs.1 {
2764                    "left"
2765                } else {
2766                    "right"
2767                }, tbl_left.name, tbl_right.name
2768            )
2769        },
2770        // fall through to the more involved col/ref resolution
2771        _ => {},
2772    }
2773
2774    // ------------------------------------------------------------------
2775    // more involved: additionally employ schema-based column resolution
2776    // (applies to unqualified columns and/or chained joins)
2777    // ------------------------------------------------------------------
2778    let left_on_cols_in = (
2779        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2780        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2781    );
2782    let right_on_cols_in = (
2783        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2784        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2785    );
2786    match (left_on_cols_in, right_on_cols_in) {
2787        // each expression's columns exist in exactly one schema
2788        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2789        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2790        // one expression in both, other only in one; prefer the unique one
2791        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2792        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2793        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2794        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2795        // pass through as-is
2796        _ => Ok((vec![left_on], vec![right_on])),
2797    }
2798}
2799
2800/// Returns `(left_on, right_on, join_where_predicates)`.
2801///
2802/// - Equi-conditions (`=`) are returned as paired `left_on`/`right_on` entries.
2803/// - Non-equi conditions (`<`, `<=`, `>`, `>=`, `!=`) are returned as `join_where` predicates
2804///   that reference columns using their merged-schema names (right columns that conflict with the
2805///   left schema are suffixed).
2806fn process_join_on(
2807    ctx: &mut SQLContext,
2808    sql_expr: &SQLExpr,
2809    tbl_left: &TableInfo,
2810    tbl_right: &TableInfo,
2811) -> PolarsResult<(Vec<Expr>, Vec<Expr>, Vec<Expr>)> {
2812    match sql_expr {
2813        SQLExpr::BinaryOp { left, op, right } => match op {
2814            SQLBinaryOperator::And => {
2815                let (mut left_i, mut right_i, mut preds_i) =
2816                    process_join_on(ctx, left, tbl_left, tbl_right)?;
2817                let (mut left_j, mut right_j, mut preds_j) =
2818                    process_join_on(ctx, right, tbl_left, tbl_right)?;
2819                left_i.append(&mut left_j);
2820                right_i.append(&mut right_j);
2821                preds_i.append(&mut preds_j);
2822                Ok((left_i, right_i, preds_i))
2823            },
2824            SQLBinaryOperator::Eq => {
2825                let join_schema = build_join_schema(tbl_left, tbl_right)?;
2826                let (l, r) = determine_left_right_join_on(
2827                    ctx,
2828                    left,
2829                    right,
2830                    tbl_left,
2831                    tbl_right,
2832                    &join_schema,
2833                )?;
2834                Ok((l, r, vec![]))
2835            },
2836            SQLBinaryOperator::Lt
2837            | SQLBinaryOperator::LtEq
2838            | SQLBinaryOperator::Gt
2839            | SQLBinaryOperator::GtEq
2840            | SQLBinaryOperator::NotEq => {
2841                let join_schema = build_join_schema(tbl_left, tbl_right)?;
2842                let suffix = format!(":{}", tbl_right.name);
2843
2844                // Parse both operands and suffix each independently based on whether
2845                // it references the right table (preserving SQL operand order).
2846                let lhs = suffix_if_right_table(
2847                    parse_sql_expr(left, ctx, Some(&join_schema))?,
2848                    left,
2849                    tbl_left,
2850                    tbl_right,
2851                    &suffix,
2852                );
2853                let rhs = suffix_if_right_table(
2854                    parse_sql_expr(right, ctx, Some(&join_schema))?,
2855                    right,
2856                    tbl_left,
2857                    tbl_right,
2858                    &suffix,
2859                );
2860
2861                let polars_op = match op {
2862                    SQLBinaryOperator::Lt => Operator::Lt,
2863                    SQLBinaryOperator::LtEq => Operator::LtEq,
2864                    SQLBinaryOperator::Gt => Operator::Gt,
2865                    SQLBinaryOperator::GtEq => Operator::GtEq,
2866                    SQLBinaryOperator::NotEq => Operator::NotEq,
2867                    _ => unreachable!(),
2868                };
2869                let predicate = Expr::BinaryExpr {
2870                    left: Arc::new(lhs),
2871                    op: polars_op,
2872                    right: Arc::new(rhs),
2873                };
2874                Ok((vec![], vec![], vec![predicate]))
2875            },
2876            _ => polars_bail!(
2877                SQLInterface: "unsupported join constraint operator '{:?}'", op
2878            ),
2879        },
2880        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2881        _ => polars_bail!(
2882            SQLInterface: "unsupported join constraint expression: {:?}", sql_expr
2883        ),
2884    }
2885}
2886
2887/// Build a unified schema from both tables; needed for multi/chained joins where suffixed
2888/// intermediary/joined cols aren't in an existing schema.
2889fn build_join_schema(tbl_left: &TableInfo, tbl_right: &TableInfo) -> PolarsResult<Schema> {
2890    let mut join_schema = Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2891    for (name, dtype) in tbl_left.schema.iter() {
2892        join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2893    }
2894    for (name, dtype) in tbl_right.schema.iter() {
2895        if !join_schema.contains(name) {
2896            join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2897        }
2898    }
2899    Ok(join_schema)
2900}
2901
2902/// Rename columns in `expr` that appear in *both* table schemas to their merged-schema
2903/// (right-side suffixed) names, so they resolve against the joined frame.
2904fn suffix_conflicting_columns(
2905    expr: Expr,
2906    tbl_left: &TableInfo,
2907    tbl_right: &TableInfo,
2908    suffix: &str,
2909) -> Expr {
2910    expr.map_expr(|e| match e {
2911        Expr::Column(ref name)
2912            if tbl_left.schema.contains(name.as_str())
2913                && tbl_right.schema.contains(name.as_str()) =>
2914        {
2915            Expr::Column(PlSmallStr::from_string(format!("{name}{suffix}")))
2916        },
2917        other => other,
2918    })
2919}
2920
2921/// Suffix conflicting column names in `expr` if the SQL-level expression references the right
2922/// table. Uses table qualifiers first, falling back to schema membership when unqualified.
2923fn suffix_if_right_table(
2924    expr: Expr,
2925    sql_expr: &SQLExpr,
2926    tbl_left: &TableInfo,
2927    tbl_right: &TableInfo,
2928    suffix: &str,
2929) -> Expr {
2930    // Strip any alias added by resolve_column
2931    let expr = match expr {
2932        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2933        e => e,
2934    };
2935
2936    let refs_left = expr_refers_to_table(sql_expr, &tbl_left.name);
2937    let refs_right = expr_refers_to_table(sql_expr, &tbl_right.name);
2938
2939    let is_right = if refs_right && !refs_left {
2940        true
2941    } else if refs_left {
2942        false
2943    } else {
2944        // Unqualified: check schema membership
2945        !expr_cols_all_in_schema(&expr, &tbl_left.schema)
2946            && expr_cols_all_in_schema(&expr, &tbl_right.schema)
2947    };
2948
2949    if is_right {
2950        suffix_conflicting_columns(expr, tbl_left, tbl_right, suffix)
2951    } else {
2952        expr
2953    }
2954}
2955
2956fn process_join_constraint(
2957    constraint: &JoinConstraint,
2958    tbl_left: &TableInfo,
2959    tbl_right: &TableInfo,
2960    ctx: &mut SQLContext,
2961) -> PolarsResult<(Vec<Expr>, Vec<Expr>, Vec<Expr>)> {
2962    match constraint {
2963        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2964            process_join_on(ctx, expr, tbl_left, tbl_right)
2965        },
2966        JoinConstraint::Using(idents) if !idents.is_empty() => {
2967            let using: Vec<Expr> = idents
2968                .iter()
2969                .map(|ObjectName(parts)| {
2970                    if parts.len() != 1 {
2971                        polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2972                    }
2973                    match parts[0].as_ident() {
2974                        Some(ident) => Ok(col(ident.value.as_str())),
2975                        None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2976                    }
2977                })
2978                .collect::<PolarsResult<Vec<_>>>()?;
2979            Ok((using.clone(), using, vec![]))
2980        },
2981        JoinConstraint::Natural => {
2982            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2983            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2984            let on: Vec<Expr> = left_names
2985                .intersection(&right_names)
2986                .map(|&name| col(name.clone()))
2987                .collect();
2988            if on.is_empty() {
2989                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2990            }
2991            Ok((on.clone(), on, vec![]))
2992        },
2993        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2994    }
2995}
2996
2997/// Flatten a SQL AND-expression tree into individual leaf conditions.
2998fn flatten_and_conditions(expr: &SQLExpr) -> Vec<&SQLExpr> {
2999    match expr {
3000        SQLExpr::BinaryOp {
3001            left,
3002            op: SQLBinaryOperator::And,
3003            right,
3004        } => {
3005            let mut conditions = flatten_and_conditions(left);
3006            conditions.extend(flatten_and_conditions(right));
3007            conditions
3008        },
3009        SQLExpr::Nested(inner) => flatten_and_conditions(inner),
3010        _ => vec![expr],
3011    }
3012}
3013
3014/// Reconstruct a SQL AND-expression tree from a list of conditions.
3015fn combine_and_conditions(conditions: Vec<SQLExpr>) -> Option<SQLExpr> {
3016    conditions
3017        .into_iter()
3018        .reduce(|left, right| SQLExpr::BinaryOp {
3019            left: Box::new(left),
3020            op: SQLBinaryOperator::And,
3021            right: Box::new(right),
3022        })
3023}
3024
3025/// Check if a SQL expression is a join condition (equi or non-equi comparison) that bridges
3026/// the given left table set and the right table (both sides must be qualified).
3027fn is_join_comparison(expr: &SQLExpr, left_tables: &[String], right_table: &str) -> bool {
3028    if let SQLExpr::BinaryOp {
3029        left,
3030        op:
3031            SQLBinaryOperator::Eq
3032            | SQLBinaryOperator::Lt
3033            | SQLBinaryOperator::LtEq
3034            | SQLBinaryOperator::Gt
3035            | SQLBinaryOperator::GtEq
3036            | SQLBinaryOperator::NotEq,
3037        right,
3038    } = expr
3039    {
3040        let left_refs_right = expr_refers_to_table(left, right_table);
3041        let right_refs_right = expr_refers_to_table(right, right_table);
3042
3043        let left_refs_any_left = left_tables
3044            .iter()
3045            .any(|t| expr_refers_to_table(left, t.as_str()));
3046        let right_refs_any_left = left_tables
3047            .iter()
3048            .any(|t| expr_refers_to_table(right, t.as_str()));
3049
3050        // One side references a left table, the other references the right table
3051        (left_refs_right && right_refs_any_left) || (right_refs_right && left_refs_any_left)
3052    } else {
3053        false
3054    }
3055}
3056
3057/// Partition a WHERE clause into join predicates (bridging left tables and the
3058/// right table) and residual filter conditions.
3059fn extract_join_predicates(
3060    where_expr: &Option<SQLExpr>,
3061    left_tables: &[String],
3062    right_table: &str,
3063) -> (Option<SQLExpr>, Option<SQLExpr>) {
3064    let Some(expr) = where_expr else {
3065        return (None, None);
3066    };
3067    let conditions = flatten_and_conditions(expr);
3068    let mut join_conds = Vec::new();
3069    let mut filter_conds = Vec::new();
3070    for cond in conditions {
3071        if is_join_comparison(cond, left_tables, right_table) {
3072            join_conds.push(cond.clone());
3073        } else {
3074            filter_conds.push(cond.clone());
3075        }
3076    }
3077    (
3078        combine_and_conditions(join_conds),
3079        combine_and_conditions(filter_conds),
3080    )
3081}
3082
3083/// Extract table identifiers referenced in a SQL query; uses a visitor to
3084/// collect all table names that appear in FROM clauses, JOINs, TABLE refs
3085/// in set operations, and subqueries.
3086pub fn extract_table_identifiers(
3087    query: &str,
3088    include_schema: bool,
3089    unique: bool,
3090) -> PolarsResult<Vec<String>> {
3091    let mut parser = Parser::new(&GenericDialect);
3092    parser = parser.with_options(ParserOptions {
3093        trailing_commas: true,
3094        ..Default::default()
3095    });
3096    let ast = parser
3097        .try_with_sql(query)
3098        .map_err(to_sql_interface_err)?
3099        .parse_statements()
3100        .map_err(to_sql_interface_err)?;
3101
3102    let mut collector = TableIdentifierCollector {
3103        include_schema,
3104        ..Default::default()
3105    };
3106    for stmt in &ast {
3107        let _ = stmt.visit(&mut collector);
3108    }
3109    Ok(if unique {
3110        collector
3111            .tables
3112            .into_iter()
3113            .collect::<PlIndexSet<_>>()
3114            .into_iter()
3115            .collect()
3116    } else {
3117        collector.tables
3118    })
3119}
3120
3121bitflags::bitflags! {
3122    /// Bitfield indicating whether there exists a projection with the specified height behavior.
3123    ///
3124    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
3125    /// context.
3126    #[derive(PartialEq)]
3127    struct ExprSqlProjectionHeightBehavior: u8 {
3128        /// Maintains the height of input column(s)
3129        const MaintainsColumn = 1 << 0;
3130        /// Height is independent of input, e.g.:
3131        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
3132        /// * aggregations: count(*), first(), sum() etc.
3133        const Independent = 1 << 1;
3134        /// "Inherits" the height of the context, e.g.:
3135        /// * Scalar literals
3136        const InheritsContext = 1 << 2;
3137    }
3138}
3139
3140impl ExprSqlProjectionHeightBehavior {
3141    fn identify_from_expr(expr: &Expr) -> Self {
3142        let mut has_column = false;
3143        let mut has_independent = false;
3144
3145        for e in expr.into_iter() {
3146            use Expr::*;
3147            has_column |= matches!(e, Column(_) | Selector(_));
3148            has_independent |= match e {
3149                // @TODO: This is broken now with functions.
3150                AnonymousFunction { options, .. } => {
3151                    options.returns_scalar() || !options.is_length_preserving()
3152                },
3153                Literal(v) => !v.is_scalar(),
3154                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
3155                Agg { .. } | Len => true,
3156                _ => false,
3157            }
3158        }
3159        if has_independent {
3160            Self::Independent
3161        } else if has_column {
3162            Self::MaintainsColumn
3163        } else {
3164            Self::InheritsContext
3165        }
3166    }
3167}