Skip to main content

polars_sql/
context.rs

1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13    BinaryOperator as SQLBinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct,
14    ExcludeSelectItem, Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident,
15    JoinConstraint, JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName,
16    ObjectType, OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
17    SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18    TableFactor, TableWithJoins, Truncate, UnaryOperator as SQLUnaryOperator, Value as SQLValue,
19    ValueWithSpan, Values, Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29    QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30    expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
/// A table as seen by the SQL layer: its backing frame, the name it is known by,
/// and an associated schema.
#[derive(Clone)]
pub struct TableInfo {
    /// The lazy frame backing this table.
    pub(crate) frame: LazyFrame,
    /// The name this table is referred to by.
    pub(crate) name: PlSmallStr,
    /// Schema associated with `frame` (presumably its resolved schema; verify at construction sites).
    pub(crate) schema: Arc<Schema>,
}
41
42struct SelectModifiers {
43    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
44    ilike: Option<regex::Regex>,               // SELECT * ILIKE
45    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
46    replace: Vec<Expr>,                        // SELECT * REPLACE
47}
48impl SelectModifiers {
49    fn matches_ilike(&self, s: &str) -> bool {
50        match &self.ilike {
51            Some(rx) => rx.is_match(s),
52            None => true,
53        }
54    }
55    fn renamed_cols(&self) -> Vec<Expr> {
56        self.rename
57            .iter()
58            .map(|(before, after)| col(before.clone()).alias(after.clone()))
59            .collect()
60    }
61}
62
/// For SELECT projection items; helps simplify any required disambiguation.
enum ProjectionItem {
    /// Expressions expanded from a qualified wildcard (eg: `tbl.*`), tagged with the
    /// table name; the name is used to suffix output columns that clash.
    QualifiedExprs(PlSmallStr, Vec<Expr>),
    /// All other projected expressions.
    Exprs(Vec<Expr>),
}
68
69/// Extract the output column name from an expression (if it has one).
70fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
71    match expr {
72        Expr::Column(name) | Expr::Alias(_, name) => Some(name),
73        _ => None,
74    }
75}
76
77/// Disambiguate qualified wildcard columns that conflict with each other or other projections.
78fn disambiguate_projection_cols(
79    items: Vec<ProjectionItem>,
80    schema: &Schema,
81) -> PolarsResult<Vec<Expr>> {
82    // Establish qualified wildcard names (with counts), and other expression names
83    let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
84    let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
85    for item in &items {
86        match item {
87            ProjectionItem::QualifiedExprs(_, exprs) => {
88                for expr in exprs {
89                    if let Some(name) = expr_output_name(expr) {
90                        *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
91                    }
92                }
93            },
94            ProjectionItem::Exprs(exprs) => {
95                for expr in exprs {
96                    if let Some(name) = expr_output_name(expr) {
97                        other_names.insert(name.clone());
98                    }
99                }
100            },
101        }
102    }
103
104    // Names requiring disambiguation (duplicates across wildcards, eg: `tbl1.*`,`tbl2.*`)
105    let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
106        .into_iter()
107        .filter(|(name, count)| *count > 1 || other_names.contains(name))
108        .map(|(name, _)| name)
109        .collect();
110
111    // Output, applying suffixes where needed
112    let mut result: Vec<Expr> = Vec::new();
113    for item in items {
114        match item {
115            ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
116                for expr in exprs {
117                    if let Some(name) = expr_output_name(&expr) {
118                        if needs_suffix.contains(name) {
119                            let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
120                            if schema.contains(suffixed.as_str()) {
121                                result.push(col(suffixed));
122                                continue;
123                            }
124                            if other_names.contains(name) {
125                                polars_bail!(
126                                    SQLInterface:
127                                    "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
128                                );
129                            }
130                        }
131                    }
132                    result.push(expr);
133                }
134            },
135            ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
136                result.extend(exprs);
137            },
138        }
139    }
140    Ok(result)
141}
142
/// The SQLContext is the main entry point for executing SQL queries.
///
/// Note: the derived `Clone` clones the `Arc`s, so clones share the same table map
/// and function registry.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by name.
    pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
    /// Registry used to resolve (custom) functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used for schema resolution; handed to the result frame after
    /// each `execute` call (and replaced by a fresh arena).
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena used alongside `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// CTEs for the current statement; cleared at the end of `execute`.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Table alias → table/CTE name for the current statement; cleared at the end of `execute`.
    table_aliases: PlHashMap<String, String>,
    /// Per-table column-name mappings (table → column → resolved name) consulted by
    /// `resolve_name`; cleared at the end of `execute`.
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
    /// Named window specifications for the current statement; cleared at the end of `execute`.
    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
}
156
157impl Default for SQLContext {
158    fn default() -> Self {
159        Self {
160            function_registry: Arc::new(DefaultFunctionRegistry {}),
161            table_map: Default::default(),
162            cte_map: Default::default(),
163            table_aliases: Default::default(),
164            joined_aliases: Default::default(),
165            named_windows: Default::default(),
166            lp_arena: Default::default(),
167            expr_arena: Default::default(),
168        }
169    }
170}
171
172impl SQLContext {
173    /// Create a new SQLContext.
174    /// ```rust
175    /// # use polars_sql::SQLContext;
176    /// # fn main() {
177    /// let ctx = SQLContext::new();
178    /// # }
179    /// ```
180    pub fn new() -> Self {
181        Self::default()
182    }
183
184    /// Get the names of all registered tables, in sorted order.
185    pub fn get_tables(&self) -> Vec<String> {
186        let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
187        tables.sort_unstable();
188        tables
189    }
190
191    /// Register a [`LazyFrame`] as a table in the SQLContext.
192    /// ```rust
193    /// # use polars_sql::SQLContext;
194    /// # use polars_core::prelude::*;
195    /// # use polars_lazy::prelude::*;
196    /// # fn main() {
197    ///
198    /// let mut ctx = SQLContext::new();
199    /// let df = df! {
200    ///    "a" =>  [1, 2, 3],
201    /// }.unwrap().lazy();
202    ///
203    /// ctx.register("df", df);
204    /// # }
205    ///```
206    pub fn register(&self, name: &str, lf: LazyFrame) {
207        self.table_map.write().unwrap().insert(name.to_owned(), lf);
208    }
209
210    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
211    pub fn unregister(&self, name: &str) {
212        self.table_map.write().unwrap().remove(&name.to_owned());
213    }
214
215    /// Execute a SQL query, returning a [`LazyFrame`].
216    /// ```rust
217    /// # use polars_sql::SQLContext;
218    /// # use polars_core::prelude::*;
219    /// # use polars_lazy::prelude::*;
220    /// # fn main() {
221    ///
222    /// let mut ctx = SQLContext::new();
223    /// let df = df! {
224    ///    "a" =>  [1, 2, 3],
225    /// }
226    /// .unwrap();
227    ///
228    /// ctx.register("df", df.clone().lazy());
229    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
230    /// assert!(sql_df.equals(&df));
231    /// # }
232    ///```
233    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
234        let mut parser = Parser::new(&GenericDialect);
235        parser = parser.with_options(ParserOptions {
236            trailing_commas: true,
237            ..Default::default()
238        });
239
240        let ast = parser
241            .try_with_sql(query)
242            .map_err(to_sql_interface_err)?
243            .parse_statements()
244            .map_err(to_sql_interface_err)?;
245
246        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
247        let res = self.execute_statement(ast.first().unwrap())?;
248
249        // Ensure the result uses the proper arenas.
250        // This will instantiate new arenas with a new version.
251        let lp_arena = std::mem::take(&mut self.lp_arena);
252        let expr_arena = std::mem::take(&mut self.expr_arena);
253        res.set_cached_arena(lp_arena, expr_arena);
254
255        // Every execution should clear the statement-level maps.
256        self.cte_map.clear();
257        self.table_aliases.clear();
258        self.joined_aliases.clear();
259        self.named_windows.clear();
260
261        Ok(res)
262    }
263
264    /// Add a function registry to the SQLContext.
265    /// The registry provides the ability to add custom functions to the SQLContext.
266    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
267        self.function_registry = function_registry;
268        self
269    }
270
271    /// Get the function registry of the SQLContext
272    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
273        &self.function_registry
274    }
275
276    /// Get a mutable reference to the function registry of the SQLContext
277    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
278        Arc::get_mut(&mut self.function_registry).unwrap()
279    }
280}
281
282impl SQLContext {
283    fn isolated(&self) -> Self {
284        Self {
285            // Deep clone to isolate
286            table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
287            named_windows: self.named_windows.clone(),
288            cte_map: self.cte_map.clone(),
289
290            ..Default::default()
291        }
292    }
293
294    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
295        let ast = stmt;
296        Ok(match ast {
297            Statement::Query(query) => self.execute_query(query)?,
298            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
299            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
300            stmt @ Statement::Drop {
301                object_type: ObjectType::Table,
302                ..
303            } => self.execute_drop_table(stmt)?,
304            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
305            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
306            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
307            _ => polars_bail!(
308                SQLInterface: "statement type is not supported:\n{:?}", ast,
309            ),
310        })
311    }
312
313    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
314        self.register_ctes(query)?;
315        self.execute_query_no_ctes(query)
316    }
317
318    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
319        self.validate_query(query)?;
320
321        let lf = self.process_query(&query.body, query)?;
322        self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
323    }
324
325    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
326        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
327    }
328
329    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
330        // Resolve the table name in the current scope; multi-stage fallback
331        // * table name → cte name
332        // * table alias → cte alias
333        self.table_map
334            .read()
335            .unwrap()
336            .get(name)
337            .cloned()
338            .or_else(|| self.cte_map.get(name).cloned())
339            .or_else(|| {
340                self.table_aliases.get(name).and_then(|alias| {
341                    self.table_map
342                        .read()
343                        .unwrap()
344                        .get(alias.as_str())
345                        .or_else(|| self.cte_map.get(alias.as_str()))
346                        .cloned()
347                })
348            })
349    }
350
351    /// Execute a query in an isolated context. This prevents subqueries from mutating
352    /// arenas and other context state. Returns both the LazyFrame *and* its associated
353    /// Schema (so that the correct arenas are used when determining schema).
354    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
355    where
356        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
357    {
358        let mut ctx = self.isolated();
359
360        // Execute query with clean state (eg: nested/subquery)
361        let lf = query(&mut ctx)?;
362
363        // Save state
364        lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
365
366        Ok(lf)
367    }
368
369    fn expr_or_ordinal(
370        &mut self,
371        e: &SQLExpr,
372        exprs: &[Expr],
373        selected: Option<&[Expr]>,
374        schema: Option<&Schema>,
375        clause: &str,
376    ) -> PolarsResult<Expr> {
377        match e {
378            SQLExpr::UnaryOp {
379                op: SQLUnaryOperator::Minus,
380                expr,
381            } if matches!(
382                **expr,
383                SQLExpr::Value(ValueWithSpan {
384                    value: SQLValue::Number(_, _),
385                    ..
386                })
387            ) =>
388            {
389                if let SQLExpr::Value(ValueWithSpan {
390                    value: SQLValue::Number(ref idx, _),
391                    ..
392                }) = **expr
393                {
394                    Err(polars_err!(
395                    SQLSyntax:
396                    "negative ordinal values are invalid for {}; found -{}",
397                    clause,
398                    idx
399                    ))
400                } else {
401                    unreachable!()
402                }
403            },
404            SQLExpr::Value(ValueWithSpan {
405                value: SQLValue::Number(idx, _),
406                ..
407            }) => {
408                // note: sql queries are 1-indexed
409                let idx = idx.parse::<usize>().map_err(|_| {
410                    polars_err!(
411                        SQLSyntax:
412                        "negative ordinal values are invalid for {}; found {}",
413                        clause,
414                        idx
415                    )
416                })?;
417                // note: "selected" cols represent final projection order, so we use those for
418                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
419                let cols = if let Some(cols) = selected {
420                    cols
421                } else {
422                    exprs
423                };
424                Ok(cols
425                    .get(idx - 1)
426                    .ok_or_else(|| {
427                        polars_err!(
428                            SQLInterface:
429                            "{} ordinal value must refer to a valid column; found {}",
430                            clause,
431                            idx
432                        )
433                    })?
434                    .clone())
435            },
436            SQLExpr::Value(v) => Err(polars_err!(
437                SQLSyntax:
438                "{} requires a valid expression or positive ordinal; found {}", clause, v,
439            )),
440            _ => {
441                // Handle qualified cross-aliasing in ORDER BY clauses
442                // (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
443                let mut expr = parse_sql_expr(e, self, schema)?;
444                if matches!(e, SQLExpr::CompoundIdentifier(_)) {
445                    if let Some(schema) = schema {
446                        expr = expr.map_expr(|ex| match &ex {
447                            Expr::Column(name) => {
448                                let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
449                                if schema.contains(prefixed.as_str()) {
450                                    col(prefixed)
451                                } else {
452                                    ex
453                                }
454                            },
455                            _ => ex,
456                        });
457                    }
458                }
459                Ok(expr)
460            },
461        }
462    }
463
464    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
465        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
466            if let Some(name) = aliases.get(column_name) {
467                return name.to_string();
468            }
469        }
470        column_name.to_string()
471    }
472
473    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
474        match expr {
475            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
476            SetExpr::Query(nested_query) => {
477                let lf = self.execute_query_no_ctes(nested_query)?;
478                self.process_order_by(lf, &query.order_by, None)
479            },
480            SetExpr::SetOperation {
481                op: SetOperator::Union,
482                set_quantifier,
483                left,
484                right,
485            } => self.process_union(left, right, set_quantifier, query),
486
487            #[cfg(feature = "semi_anti_join")]
488            SetExpr::SetOperation {
489                op: SetOperator::Intersect | SetOperator::Except,
490                set_quantifier,
491                left,
492                right,
493            } => self.process_except_intersect(left, right, set_quantifier, query),
494
495            SetExpr::Values(Values {
496                explicit_row: _,
497                rows,
498                value_keyword: _,
499            }) => self.process_values(rows),
500
501            SetExpr::Table(tbl) => {
502                if let Some(table_name) = tbl.table_name.as_ref() {
503                    self.get_table_from_current_scope(table_name)
504                        .ok_or_else(|| {
505                            polars_err!(
506                                SQLInterface: "no table or alias named '{}' found",
507                                tbl
508                            )
509                        })
510                } else {
511                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
512                }
513            },
514            op => {
515                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
516            },
517        }
518    }
519
520    #[cfg(feature = "semi_anti_join")]
521    fn process_except_intersect(
522        &mut self,
523        left: &SetExpr,
524        right: &SetExpr,
525        quantifier: &SetQuantifier,
526        query: &Query,
527    ) -> PolarsResult<LazyFrame> {
528        let (join_type, op_name) = match *query.body {
529            SetExpr::SetOperation {
530                op: SetOperator::Except,
531                ..
532            } => (JoinType::Anti, "EXCEPT"),
533            _ => (JoinType::Semi, "INTERSECT"),
534        };
535
536        // Note: each side of the EXCEPT/INTERSECT operation should execute
537        // in isolation to prevent context state leakage between them
538        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
539        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
540        let lf_schema = self.get_frame_schema(&mut lf)?;
541
542        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
543        let rf_cols = match quantifier {
544            SetQuantifier::ByName => None,
545            SetQuantifier::Distinct | SetQuantifier::None => {
546                let rf_schema = self.get_frame_schema(&mut rf)?;
547                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
548                if lf_cols.len() != rf_cols.len() {
549                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
550                }
551                Some(rf_cols)
552            },
553            _ => {
554                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
555            },
556        };
557        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
558        let joined_tbl = match rf_cols {
559            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
560            None => join.on(lf_cols).finish(),
561        };
562        let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
563        self.process_order_by(lf, &query.order_by, None)
564    }
565
566    fn process_union(
567        &mut self,
568        left: &SetExpr,
569        right: &SetExpr,
570        quantifier: &SetQuantifier,
571        query: &Query,
572    ) -> PolarsResult<LazyFrame> {
573        let quantifier = *quantifier;
574
575        // Note: each side of the UNION operation should execute
576        // in isolation to prevent context state leakage between them
577        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
578        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
579
580        let opts = UnionArgs {
581            parallel: true,
582            to_supertypes: true,
583            maintain_order: false,
584            ..Default::default()
585        };
586        let lf = match quantifier {
587            // UNION [ALL | DISTINCT]
588            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
589                let lf_schema = self.get_frame_schema(&mut lf)?;
590                let rf_schema = self.get_frame_schema(&mut rf)?;
591                if lf_schema.len() != rf_schema.len() {
592                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
593                }
594                // rename `rf` columns to match `lf` if they differ; SQL behaves
595                // positionally on UNION ops (unless using the "BY NAME" qualifier)
596                if lf_schema.iter_names().ne(rf_schema.iter_names()) {
597                    rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
598                }
599                let concatenated = concat(vec![lf, rf], opts);
600                match quantifier {
601                    SetQuantifier::Distinct | SetQuantifier::None => {
602                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
603                    },
604                    _ => concatenated,
605                }
606            },
607            // UNION ALL BY NAME
608            #[cfg(feature = "diagonal_concat")]
609            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
610            // UNION [DISTINCT] BY NAME
611            #[cfg(feature = "diagonal_concat")]
612            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
613                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
614                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
615            },
616            #[allow(unreachable_patterns)]
617            _ => {
618                polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
619            },
620        }?;
621
622        self.process_order_by(lf, &query.order_by, None)
623    }
624
625    /// Process UNNEST as a lateral operation when it contains column references
626    /// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
627    fn process_unnest_lateral(
628        &self,
629        lf: LazyFrame,
630        alias: &Option<TableAlias>,
631        array_exprs: &[SQLExpr],
632        with_offset: bool,
633    ) -> PolarsResult<LazyFrame> {
634        let alias = alias
635            .as_ref()
636            .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
637        polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
638
639        let (mut explode_cols, mut rename_from, mut rename_to) = (
640            Vec::with_capacity(array_exprs.len()),
641            Vec::with_capacity(array_exprs.len()),
642            Vec::with_capacity(array_exprs.len()),
643        );
644        let is_single_col = array_exprs.len() == 1;
645
646        for (i, arr_expr) in array_exprs.iter().enumerate() {
647            let col_name = match arr_expr {
648                SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
649                SQLExpr::CompoundIdentifier(parts) => {
650                    PlSmallStr::from_str(&parts.last().unwrap().value)
651                },
652                SQLExpr::Array(_) => polars_bail!(
653                    SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
654                ),
655                other => polars_bail!(
656                    SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
657                ),
658            };
659            // alias: column name from "AS t(col)", or table alias
660            if let Some(name) = alias
661                .columns
662                .get(i)
663                .map(|c| c.name.value.as_str())
664                .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
665                .filter(|name| !name.is_empty() && *name != col_name.as_str())
666            {
667                rename_from.push(col_name.clone());
668                rename_to.push(PlSmallStr::from_str(name));
669            }
670            explode_cols.push(col_name);
671        }
672
673        let mut lf = lf.explode(
674            Selector::ByName {
675                names: Arc::from(explode_cols),
676                strict: true,
677            },
678            ExplodeOptions {
679                empty_as_null: true,
680                keep_nulls: true,
681            },
682        );
683        if !rename_from.is_empty() {
684            lf = lf.rename(rename_from, rename_to, true);
685        }
686        Ok(lf)
687    }
688
689    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
690        let frame_rows: Vec<Row> = values.iter().map(|row| {
691            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
692                let expr = parse_sql_expr(expr, self, None)?;
693                match expr {
694                    Expr::Literal(value) => {
695                        value.to_any_value()
696                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
697                            .map(|av| av.into_static())
698                    },
699                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
700                }
701            }).collect();
702            row_data.map(Row::new)
703        }).collect::<Result<_, _>>()?;
704
705        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
706    }
707
708    // EXPLAIN SELECT * FROM DF
709    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
710        match stmt {
711            Statement::Explain { statement, .. } => {
712                let lf = self.execute_statement(statement)?;
713                let plan = lf.describe_optimized_plan()?;
714                let plan = plan
715                    .split('\n')
716                    .collect::<Series>()
717                    .with_name(PlSmallStr::from_static("Logical Plan"))
718                    .into_column();
719                let df = DataFrame::new_infer_height(vec![plan])?;
720                Ok(df.lazy())
721            },
722            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
723        }
724    }
725
726    // SHOW TABLES
727    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
728        let tables = Column::new("name".into(), self.get_tables());
729        let df = DataFrame::new_infer_height(vec![tables])?;
730        Ok(df.lazy())
731    }
732
733    // DROP TABLE <tbl>
734    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
735        match stmt {
736            Statement::Drop { names, .. } => {
737                names.iter().for_each(|name| {
738                    self.table_map.write().unwrap().remove(&name.to_string());
739                });
740                Ok(DataFrame::empty().lazy())
741            },
742            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
743        }
744    }
745
746    // DELETE FROM <tbl> [WHERE ...]
747    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
748        if let Statement::Delete(Delete {
749            tables,
750            from,
751            using,
752            selection,
753            returning,
754            order_by,
755            limit,
756            delete_token: _,
757        }) = stmt
758        {
759            let error_message: Option<&'static str> = if !tables.is_empty() {
760                Some("DELETE expects exactly one table name")
761            } else if using.is_some() {
762                Some("DELETE does not support the USING clause")
763            } else if returning.is_some() {
764                Some("DELETE does not support the RETURNING clause")
765            } else if limit.is_some() {
766                Some("DELETE does not support the LIMIT clause")
767            } else if !order_by.is_empty() {
768                Some("DELETE does not support the ORDER BY clause")
769            } else {
770                None
771            };
772
773            if let Some(msg) = error_message {
774                polars_bail!(SQLInterface: msg);
775            }
776
777            let from_tables = match &from {
778                FromTable::WithFromKeyword(from) => from,
779                FromTable::WithoutKeyword(from) => from,
780            };
781            if from_tables.len() > 1 {
782                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
783            }
784            let tbl_expr = from_tables.first().unwrap();
785            if !tbl_expr.joins.is_empty() {
786                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
787            }
788            let (_, lf) = self.get_table(&tbl_expr.relation)?;
789            if selection.is_none() {
790                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
791                Ok(lf.clear())
792            } else {
793                // apply constraint as inverted filter (drops rows matching the selection)
794                Ok(self.process_where(lf.clone(), selection, true, None)?)
795            }
796        } else {
797            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
798        }
799    }
800
801    // TRUNCATE <tbl>
802    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
803        if let Statement::Truncate(Truncate {
804            table_names,
805            partitions,
806            ..
807        }) = stmt
808        {
809            match partitions {
810                None => {
811                    if table_names.len() != 1 {
812                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
813                    }
814                    let tbl = table_names[0].name.to_string();
815                    if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
816                        *lf = lf.clone().clear();
817                        Ok(lf.clone())
818                    } else {
819                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
820                    }
821                },
822                _ => {
823                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
824                },
825            }
826        } else {
827            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
828        }
829    }
830
831    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
832        self.cte_map.insert(name.to_owned(), lf);
833    }
834
835    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
836        if let Some(with) = &query.with {
837            if with.recursive {
838                polars_bail!(SQLInterface: "recursive CTEs are not supported")
839            }
840            for cte in &with.cte_tables {
841                let cte_name = cte.alias.name.value.clone();
842                let mut lf = self.execute_query(&cte.query)?;
843                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
844                self.register_cte(&cte_name, lf);
845            }
846        }
847        Ok(())
848    }
849
850    fn register_named_windows(
851        &mut self,
852        named_windows: &[NamedWindowDefinition],
853    ) -> PolarsResult<()> {
854        for NamedWindowDefinition(name, expr) in named_windows {
855            let spec = match expr {
856                NamedWindowExpr::NamedWindow(ref_name) => self
857                    .named_windows
858                    .get(&ref_name.value)
859                    .ok_or_else(|| {
860                        polars_err!(
861                            SQLInterface:
862                            "named window '{}' references undefined window '{}'",
863                            name.value, ref_name.value
864                        )
865                    })?
866                    .clone(),
867                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
868            };
869            self.named_windows.insert(name.value.clone(), spec);
870        }
871        Ok(())
872    }
873
874    /// execute the 'FROM' part of the query
875    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
876        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
877        if !tbl_expr.joins.is_empty() {
878            for join in &tbl_expr.joins {
879                // Handle "CROSS JOIN UNNEST(col)" as a lateral join op
880                if let (
881                    JoinOperator::CrossJoin(JoinConstraint::None),
882                    TableFactor::UNNEST {
883                        alias,
884                        array_exprs,
885                        with_offset,
886                        ..
887                    },
888                ) = (&join.join_operator, &join.relation)
889                {
890                    if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
891                        lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
892                        continue;
893                    }
894                }
895
896                let (r_name, mut rf) = self.get_table(&join.relation)?;
897                if r_name.is_empty() {
898                    // Require non-empty to avoid duplicate column errors from nested self-joins.
899                    polars_bail!(
900                        SQLInterface:
901                        "cannot JOIN on unnamed relation; please provide an alias"
902                    )
903                }
904                let left_schema = self.get_frame_schema(&mut lf)?;
905                let right_schema = self.get_frame_schema(&mut rf)?;
906
907                lf = match &join.join_operator {
908                    op @ (JoinOperator::Join(constraint)  // note: bare "join" is inner
909                    | JoinOperator::FullOuter(constraint)
910                    | JoinOperator::Left(constraint)
911                    | JoinOperator::LeftOuter(constraint)
912                    | JoinOperator::Right(constraint)
913                    | JoinOperator::RightOuter(constraint)
914                    | JoinOperator::Inner(constraint)
915                    | JoinOperator::Anti(constraint)
916                    | JoinOperator::Semi(constraint)
917                    | JoinOperator::LeftAnti(constraint)
918                    | JoinOperator::LeftSemi(constraint)
919                    | JoinOperator::RightAnti(constraint)
920                    | JoinOperator::RightSemi(constraint)) => {
921                        let (lf, rf) = match op {
922                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
923                            _ => (lf, rf),
924                        };
925                        self.process_join(
926                            &TableInfo {
927                                frame: lf,
928                                name: (&l_name).into(),
929                                schema: left_schema.clone(),
930                            },
931                            &TableInfo {
932                                frame: rf,
933                                name: (&r_name).into(),
934                                schema: right_schema.clone(),
935                            },
936                            constraint,
937                            match op {
938                                JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
939                                JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
940                                    JoinType::Left
941                                },
942                                JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
943                                    JoinType::Right
944                                },
945                                JoinOperator::FullOuter(_) => JoinType::Full,
946                                #[cfg(feature = "semi_anti_join")]
947                                JoinOperator::Anti(_)
948                                | JoinOperator::LeftAnti(_)
949                                | JoinOperator::RightAnti(_) => JoinType::Anti,
950                                #[cfg(feature = "semi_anti_join")]
951                                JoinOperator::Semi(_)
952                                | JoinOperator::LeftSemi(_)
953                                | JoinOperator::RightSemi(_) => JoinType::Semi,
954                                join_type => polars_bail!(
955                                    SQLInterface:
956                                    "join type '{:?}' not currently supported",
957                                    join_type
958                                ),
959                            },
960                        )?
961                    },
962                    JoinOperator::CrossJoin(JoinConstraint::None) => {
963                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
964                    },
965                    JoinOperator::CrossJoin(constraint) => {
966                        polars_bail!(
967                            SQLInterface:
968                            "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
969                            constraint
970                        )
971                    },
972                    join_type => {
973                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
974                    },
975                };
976
977                // track join-aliased columns so we can resolve/check them later
978                let joined_schema = self.get_frame_schema(&mut lf)?;
979
980                self.joined_aliases.insert(
981                    r_name.clone(),
982                    right_schema
983                        .iter_names()
984                        .filter_map(|name| {
985                            // col exists in both tables and is aliased in the joined result
986                            let aliased_name = format!("{name}:{r_name}");
987                            if left_schema.contains(name)
988                                && joined_schema.contains(aliased_name.as_str())
989                            {
990                                Some((name.to_string(), aliased_name))
991                            } else {
992                                None
993                            }
994                        })
995                        .collect::<PlHashMap<String, String>>(),
996                );
997            }
998        };
999        Ok(lf)
1000    }
1001
1002    /// Check that the SELECT statement only contains supported clauses.
1003    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1004        // Destructure "Select" exhaustively; that way if/when new fields are added in
1005        // future sqlparser versions, we'll get a compilation error and can handle them
1006        let Select {
1007            // Supported clauses
1008            distinct: _,
1009            from: _,
1010            group_by: _,
1011            having: _,
1012            named_window: _,
1013            projection: _,
1014            qualify: _,
1015            selection: _,
1016
1017            // Metadata/token fields (can ignore)
1018            flavor: _,
1019            select_token: _,
1020            top_before_distinct: _,
1021            window_before_qualify: _,
1022
1023            // Unsupported clauses
1024            ref cluster_by,
1025            ref connect_by,
1026            ref distribute_by,
1027            ref exclude,
1028            ref into,
1029            ref lateral_views,
1030            ref prewhere,
1031            ref sort_by,
1032            ref top,
1033            ref value_table_mode,
1034        } = *select_stmt;
1035
1036        // Raise specific error messages for unsupported attributes
1037        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1038        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1039        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1040        polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1041        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1042        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1043        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1044        polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1045        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1046        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1047
1048        Ok(())
1049    }
1050
1051    /// Check that the QUERY only contains supported clauses.
1052    fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1053        // As with "Select" validation (above) destructure "Query" exhaustively
1054        let Query {
1055            // Supported clauses
1056            with: _,
1057            body: _,
1058            order_by: _,
1059            limit_clause: _,
1060            fetch,
1061
1062            // Unsupported clauses
1063            for_clause,
1064            format_clause,
1065            locks,
1066            pipe_operators,
1067            settings,
1068        } = query;
1069
1070        // Raise specific error messages for unsupported attributes
1071        polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1072        polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1073        polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1074        polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1075        polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1076
1077        // Validate FETCH clause options (if present)
1078        if let Some(Fetch {
1079            quantity: _, // supported
1080            percent,
1081            with_ties,
1082        }) = fetch
1083        {
1084            polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1085            polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1086        }
1087        Ok(())
1088    }
1089
1090    /// Execute the 'SELECT' part of the query.
1091    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1092        // Check that the statement doesn't contain unsupported SELECT clauses
1093        self.validate_select(select_stmt)?;
1094
1095        // Parse named windows first, as they may be referenced in the SELECT clause
1096        self.register_named_windows(&select_stmt.named_window)?;
1097
1098        // Get `FROM` table/data
1099        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1100            (DataFrame::empty().lazy(), None)
1101        } else {
1102            // Note: implicit joins need more work to support properly,
1103            // explicit joins are preferred for now (ref: #16662)
1104            let from = select_stmt.clone().from;
1105            if from.len() > 1 {
1106                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1107            }
1108            let tbl_expr = from.first().unwrap();
1109            let lf = self.execute_from_statement(tbl_expr)?;
1110            let base_name = get_table_name(&tbl_expr.relation);
1111            (lf, base_name)
1112        };
1113
1114        // Check for ambiguous column references in SELECT and WHERE (if there were joins)
1115        if let Some(ref base_name) = base_table_name {
1116            if !self.joined_aliases.is_empty() {
1117                // Extract USING columns from joins (these are coalesced and not ambiguous)
1118                let using_cols: PlHashSet<String> = select_stmt
1119                    .from
1120                    .first()
1121                    .into_iter()
1122                    .flat_map(|t| t.joins.iter())
1123                    .filter_map(|join| get_using_cols(&join.join_operator))
1124                    .flatten()
1125                    .collect();
1126
1127                // Check SELECT and WHERE expressions for ambiguous column references
1128                let check_expr = |e| {
1129                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1130                };
1131                for item in &select_stmt.projection {
1132                    match item {
1133                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1134                            check_expr(e)?
1135                        },
1136                        _ => {},
1137                    }
1138                }
1139                if let Some(ref where_expr) = select_stmt.selection {
1140                    check_expr(where_expr)?;
1141                }
1142            }
1143        }
1144
1145        // Apply `WHERE` constraint
1146        let mut schema = self.get_frame_schema(&mut lf)?;
1147        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1148
1149        // Determine projections
1150        let mut select_modifiers = SelectModifiers {
1151            ilike: None,
1152            exclude: PlHashSet::new(),
1153            rename: PlHashMap::new(),
1154            replace: vec![],
1155        };
1156
1157        // Collect window function cols if QUALIFY is present (we check at the
1158        // SQL level because empty OVER() clauses don't create Expr::Over)
1159        let window_fn_columns = if select_stmt.qualify.is_some() {
1160            select_stmt
1161                .projection
1162                .iter()
1163                .filter_map(|item| match item {
1164                    SelectItem::ExprWithAlias { expr, alias }
1165                        if expr_has_window_functions(expr) =>
1166                    {
1167                        Some(alias.value.clone())
1168                    },
1169                    _ => None,
1170                })
1171                .collect::<PlHashSet<_>>()
1172        } else {
1173            PlHashSet::new()
1174        };
1175
1176        let mut projections =
1177            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1178
1179        // Apply `UNNEST` expressions
1180        let mut explode_names = Vec::new();
1181        let mut explode_exprs = Vec::new();
1182        let mut explode_lookup = PlHashMap::new();
1183
1184        for expr in &projections {
1185            for e in expr {
1186                if let Expr::Explode { input, .. } = e {
1187                    match input.as_ref() {
1188                        Expr::Column(name) => explode_names.push(name.clone()),
1189                        other_expr => {
1190                            // Note: skip aggregate expressions; those are handled in the GROUP BY phase
1191                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1192                                let temp_name =
1193                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1194                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1195                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
1196                                explode_names.push(temp_name);
1197                            }
1198                        },
1199                    }
1200                }
1201            }
1202        }
1203        if !explode_names.is_empty() {
1204            if !explode_exprs.is_empty() {
1205                lf = lf.with_columns(explode_exprs);
1206            }
1207            lf = lf.explode(
1208                Selector::ByName {
1209                    names: Arc::from(explode_names),
1210                    strict: true,
1211                },
1212                ExplodeOptions {
1213                    empty_as_null: true,
1214                    keep_nulls: true,
1215                },
1216            );
1217            projections = projections
1218                .into_iter()
1219                .map(|p| {
1220                    // Update "projections" with column refs to the now-exploded expressions
1221                    p.map_expr(|e| match e {
1222                        Expr::Explode { input, .. } => explode_lookup
1223                            .get(input.as_ref())
1224                            .map(|name| Expr::Column(name.clone()))
1225                            .unwrap_or_else(|| input.as_ref().clone()),
1226                        _ => e,
1227                    })
1228                })
1229                .collect();
1230
1231            schema = self.get_frame_schema(&mut lf)?;
1232        }
1233
1234        // Check for "GROUP BY ..." (after determining projections)
1235        let mut group_by_keys: Vec<Expr> = Vec::new();
1236        match &select_stmt.group_by {
1237            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1238            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1239                if !modifiers.is_empty() {
1240                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1241                }
1242                // Translate the group expressions, resolving ordinal values and SELECT aliases
1243                group_by_keys = group_by_exprs
1244                    .iter()
1245                    .map(|e| match e {
1246                        SQLExpr::Identifier(ident) => {
1247                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1248                                || {
1249                                    self.expr_or_ordinal(
1250                                        e,
1251                                        &projections,
1252                                        None,
1253                                        Some(&schema),
1254                                        "GROUP BY",
1255                                    )
1256                                },
1257                                Ok,
1258                            )
1259                        },
1260                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1261                    })
1262                    .collect::<PolarsResult<_>>()?
1263            },
1264            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1265            // nested agg/window funcs to the group key (also ignores literals).
1266            GroupByExpr::All(modifiers) => {
1267                if !modifiers.is_empty() {
1268                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1269                }
1270                projections.iter().for_each(|expr| match expr {
1271                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
1272                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1273                    Expr::Column(_) => group_by_keys.push(expr.clone()),
1274                    Expr::Alias(e, _)
1275                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1276                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1277                        if let Expr::Column(name) = &**e {
1278                            group_by_keys.push(col(name.clone()));
1279                        }
1280                    },
1281                    _ => {
1282                        // If not quick-matched, add if no nested agg/window expressions
1283                        if !has_expr(expr, |e| {
1284                            matches!(e, Expr::Agg(_))
1285                                || matches!(e, Expr::Len)
1286                                || matches!(e, Expr::Over { .. })
1287                                || {
1288                                    #[cfg(feature = "dynamic_group_by")]
1289                                    {
1290                                        matches!(e, Expr::Rolling { .. })
1291                                    }
1292                                    #[cfg(not(feature = "dynamic_group_by"))]
1293                                    {
1294                                        false
1295                                    }
1296                                }
1297                        }) {
1298                            group_by_keys.push(expr.clone())
1299                        }
1300                    },
1301                });
1302            },
1303        };
1304
1305        lf = if group_by_keys.is_empty() {
1306            // The 'having' clause is only valid inside 'group by'
1307            if select_stmt.having.is_some() {
1308                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1309            };
1310
1311            // Final/selected cols, accounting for 'SELECT *' modifiers
1312            let mut retained_cols = Vec::with_capacity(projections.len());
1313            let mut retained_names = Vec::with_capacity(projections.len());
1314            let have_order_by = query.order_by.is_some();
1315
1316            // Initialize containing InheritsContext to handle empty projection case.
1317            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1318
1319            // Note: if there is an 'order by' then we project everything (original cols
1320            // and new projections) and *then* select the final cols; the retained cols
1321            // are used to ensure a correct final projection. If there's no 'order by',
1322            // clause then we can project the final column *expressions* directly.
1323            for p in projections.iter() {
1324                let name = p.to_field(schema.deref())?.name.to_string();
1325                if select_modifiers.matches_ilike(&name)
1326                    && !select_modifiers.exclude.contains(&name)
1327                {
1328                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1329
1330                    retained_cols.push(if have_order_by {
1331                        col(name.as_str())
1332                    } else {
1333                        p.clone()
1334                    });
1335                    retained_names.push(col(name));
1336                }
1337            }
1338
1339            // Apply the remaining modifiers and establish the final projection
1340            if have_order_by {
1341                // We can safely use `with_columns()` and avoid a join if:
1342                // * There is already a projection that projects to the table height.
1343                // * All projection heights inherit from context (e.g. all scalar literals that
1344                //   are to be broadcasted to table height).
1345                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1346                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1347                {
1348                    lf = lf.with_columns(projections);
1349                } else {
1350                    // We hit this branch if the output height is not guaranteed to match the table
1351                    // height. E.g.:
1352                    //
1353                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
1354                    //
1355                    // For these cases we truncate / extend the sorting columns with NULLs to match
1356                    // the output height. We do this by projecting independently and then joining
1357                    // back the original frame on the row index.
1358                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1359                    lf = lf
1360                        .clone()
1361                        .select(projections)
1362                        .with_row_index(NAME, None)
1363                        .join(
1364                            lf.with_row_index(NAME, None),
1365                            [col(NAME)],
1366                            [col(NAME)],
1367                            JoinArgs {
1368                                how: JoinType::Left,
1369                                validation: Default::default(),
1370                                suffix: None,
1371                                slice: None,
1372                                nulls_equal: false,
1373                                coalesce: Default::default(),
1374                                maintain_order: MaintainOrderJoin::Left,
1375                                build_side: None,
1376                            },
1377                        );
1378                }
1379            }
1380            if !select_modifiers.replace.is_empty() {
1381                lf = lf.with_columns(&select_modifiers.replace);
1382            }
1383            if !select_modifiers.rename.is_empty() {
1384                lf = lf.with_columns(select_modifiers.renamed_cols());
1385            }
1386            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1387
1388            // Note: If `have_order_by`, with_columns is already done above.
1389            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1390                && !have_order_by
1391            {
1392                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1393                lf = lf.with_columns(retained_cols).select(retained_names);
1394            } else {
1395                lf = lf.select(retained_cols);
1396            }
1397            if !select_modifiers.rename.is_empty() {
1398                lf = lf.rename(
1399                    select_modifiers.rename.keys(),
1400                    select_modifiers.rename.values(),
1401                    true,
1402                );
1403            };
1404            lf
1405        } else {
1406            let having = select_stmt
1407                .having
1408                .as_ref()
1409                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1410                .transpose()?;
1411            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1412            lf = self.process_order_by(lf, &query.order_by, None)?;
1413
1414            // Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1415            let output_cols: Vec<_> = projections
1416                .iter()
1417                .map(|p| p.to_field(&schema))
1418                .collect::<PolarsResult<Vec<_>>>()?
1419                .into_iter()
1420                .map(|f| col(f.name))
1421                .collect();
1422
1423            lf.select(&output_cols)
1424        };
1425
1426        // Apply optional QUALIFY clause (filters on window functions).
1427        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1428
1429        // Apply optional DISTINCT clause.
1430        lf = match &select_stmt.distinct {
1431            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1432            Some(Distinct::On(exprs)) => {
1433                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1434                let schema = Some(self.get_frame_schema(&mut lf)?);
1435                let cols = exprs
1436                    .iter()
1437                    .map(|e| {
1438                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
1439                        if let Expr::Column(name) = expr {
1440                            Ok(name)
1441                        } else {
1442                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1443                        }
1444                    })
1445                    .collect::<PolarsResult<Vec<_>>>()?;
1446
1447                // DISTINCT ON has to apply the ORDER BY before the operation.
1448                lf = self.process_order_by(lf, &query.order_by, None)?;
1449                return Ok(lf.unique_stable(
1450                    Some(Selector::ByName {
1451                        names: cols.into(),
1452                        strict: true,
1453                    }),
1454                    UniqueKeepStrategy::First,
1455                ));
1456            },
1457            None => lf,
1458        };
1459        Ok(lf)
1460    }
1461
1462    fn column_projections(
1463        &mut self,
1464        select_stmt: &Select,
1465        schema: &SchemaRef,
1466        select_modifiers: &mut SelectModifiers,
1467    ) -> PolarsResult<Vec<Expr>> {
1468        if select_stmt.projection.is_empty()
1469            && select_stmt.flavor == SelectFlavor::FromFirstNoSelect
1470        {
1471            // eg: bare "FROM tbl" is equivalent to "SELECT * FROM tbl".
1472            return Ok(schema.iter_names().map(|name| col(name.clone())).collect());
1473        }
1474        let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1475        let mut has_qualified_wildcard = false;
1476
1477        for select_item in &select_stmt.projection {
1478            match select_item {
1479                SelectItem::UnnamedExpr(expr) => {
1480                    items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1481                        expr,
1482                        self,
1483                        Some(schema),
1484                    )?]));
1485                },
1486                SelectItem::ExprWithAlias { expr, alias } => {
1487                    let expr = parse_sql_expr(expr, self, Some(schema))?;
1488                    items.push(ProjectionItem::Exprs(vec![
1489                        expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1490                    ]));
1491                },
1492                SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1493                    SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1494                        let tbl_name = obj_name
1495                            .0
1496                            .last()
1497                            .and_then(|p| p.as_ident())
1498                            .map(|i| PlSmallStr::from_str(&i.value))
1499                            .unwrap_or_default();
1500                        let exprs = self.process_qualified_wildcard(
1501                            obj_name,
1502                            wildcard_options,
1503                            select_modifiers,
1504                            Some(schema),
1505                        )?;
1506                        items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1507                        has_qualified_wildcard = true;
1508                    },
1509                    SelectItemQualifiedWildcardKind::Expr(_) => {
1510                        polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1511                    },
1512                },
1513                SelectItem::Wildcard(wildcard_options) => {
1514                    let cols = schema.iter_names().map(|name| col(name.clone())).collect();
1515                    items.push(ProjectionItem::Exprs(
1516                        self.process_wildcard_additional_options(
1517                            cols,
1518                            wildcard_options,
1519                            select_modifiers,
1520                            Some(schema),
1521                        )?,
1522                    ));
1523                },
1524            }
1525        }
1526
1527        // Disambiguate qualified wildcards (if any) and flatten expressions
1528        let exprs = if has_qualified_wildcard {
1529            disambiguate_projection_cols(items, schema)?
1530        } else {
1531            items
1532                .into_iter()
1533                .flat_map(|item| match item {
1534                    ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1535                        exprs
1536                    },
1537                })
1538                .collect()
1539        };
1540        let flattened_exprs = exprs
1541            .into_iter()
1542            .flat_map(|expr| expand_exprs(expr, schema))
1543            .collect();
1544
1545        Ok(flattened_exprs)
1546    }
1547
1548    fn process_where(
1549        &mut self,
1550        mut lf: LazyFrame,
1551        expr: &Option<SQLExpr>,
1552        invert_filter: bool,
1553        schema: Option<SchemaRef>,
1554    ) -> PolarsResult<LazyFrame> {
1555        if let Some(expr) = expr {
1556            let schema = match schema {
1557                None => self.get_frame_schema(&mut lf)?,
1558                Some(s) => s,
1559            };
1560
1561            // shortcut filter evaluation if given expression is just TRUE or FALSE
1562            let (all_true, all_false) = match expr {
1563                SQLExpr::Value(ValueWithSpan {
1564                    value: SQLValue::Boolean(b),
1565                    ..
1566                }) => (*b, !*b),
1567                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1568                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::Eq) => {
1569                        (a.value == b.value, a.value != b.value)
1570                    },
1571                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::NotEq) => {
1572                        (a.value != b.value, a.value == b.value)
1573                    },
1574                    _ => (false, false),
1575                },
1576                _ => (false, false),
1577            };
1578            if (all_true && !invert_filter) || (all_false && invert_filter) {
1579                return Ok(lf);
1580            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1581                return Ok(lf.clear());
1582            }
1583
1584            // ...otherwise parse and apply the filter as normal
1585            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1586            if filter_expression.clone().meta().has_multiple_outputs() {
1587                filter_expression = all_horizontal([filter_expression])?;
1588            }
1589            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1590            lf = if invert_filter {
1591                lf.remove(filter_expression)
1592            } else {
1593                lf.filter(filter_expression)
1594            };
1595        }
1596        Ok(lf)
1597    }
1598
1599    pub(super) fn process_join(
1600        &mut self,
1601        tbl_left: &TableInfo,
1602        tbl_right: &TableInfo,
1603        constraint: &JoinConstraint,
1604        join_type: JoinType,
1605    ) -> PolarsResult<LazyFrame> {
1606        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1607        let coalesce_type = match constraint {
1608            // "NATURAL" joins should coalesce; otherwise we disambiguate
1609            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1610            _ => JoinCoalesce::KeepColumns,
1611        };
1612        let joined = tbl_left
1613            .frame
1614            .clone()
1615            .join_builder()
1616            .with(tbl_right.frame.clone())
1617            .left_on(left_on)
1618            .right_on(right_on)
1619            .how(join_type)
1620            .suffix(format!(":{}", tbl_right.name))
1621            .coalesce(coalesce_type)
1622            .finish();
1623
1624        Ok(joined)
1625    }
1626
1627    fn process_qualify(
1628        &mut self,
1629        mut lf: LazyFrame,
1630        qualify_expr: &Option<SQLExpr>,
1631        window_fn_columns: &PlHashSet<String>,
1632    ) -> PolarsResult<LazyFrame> {
1633        if let Some(expr) = qualify_expr {
1634            // Check the QUALIFY expression to identify window functions
1635            // and collect column refs (for looking up aliases from SELECT)
1636            let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1637            let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1638            if !has_window_fns && !references_window_alias {
1639                polars_bail!(
1640                    SQLSyntax:
1641                    "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1642                );
1643            }
1644            let schema = self.get_frame_schema(&mut lf)?;
1645            let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1646            if filter_expression.clone().meta().has_multiple_outputs() {
1647                filter_expression = all_horizontal([filter_expression])?;
1648            }
1649            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1650            lf = lf.filter(filter_expression);
1651        }
1652        Ok(lf)
1653    }
1654
1655    fn process_subqueries(
1656        &mut self,
1657        lf: LazyFrame,
1658        exprs: Vec<&mut Expr>,
1659    ) -> PolarsResult<LazyFrame> {
1660        let mut subplans = vec![];
1661
1662        for e in exprs {
1663            *e = e.clone().try_map_expr(|e| {
1664                if let Expr::SubPlan(lp, names) = e {
1665                    assert_eq!(
1666                        names.len(),
1667                        1,
1668                        "multiple columns in subqueries not yet supported"
1669                    );
1670
1671                    let select_expr = names[0].1.clone();
1672                    let mut lf = LazyFrame::from((**lp).clone());
1673                    let schema = self.get_frame_schema(&mut lf)?;
1674                    polars_ensure!(schema.len() == 1,  SQLSyntax: "SQL subquery returns more than one column");
1675                    let lf = lf.select([select_expr.clone()]);
1676
1677                    subplans.push(lf);
1678                    Ok(Expr::Column(names[0].0.clone()).first())
1679                } else {
1680                    Ok(e)
1681                }
1682            })?;
1683        }
1684
1685        if subplans.is_empty() {
1686            Ok(lf)
1687        } else {
1688            subplans.insert(0, lf);
1689            concat_lf_horizontal(
1690                subplans,
1691                HConcatOptions {
1692                    broadcast_unit_length: true,
1693                    ..Default::default()
1694                },
1695            )
1696        }
1697    }
1698
1699    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1700        if let Statement::CreateTable(CreateTable {
1701            if_not_exists,
1702            name,
1703            query,
1704            columns,
1705            like,
1706            ..
1707        }) = stmt
1708        {
1709            let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1710            if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1711                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1712            }
1713            let lf = match (query, columns.is_empty(), like) {
1714                (Some(query), true, None) => {
1715                    // ----------------------------------------------------
1716                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1717                    // ----------------------------------------------------
1718                    self.execute_query(query)?
1719                },
1720                (None, false, None) => {
1721                    // ----------------------------------------------------
1722                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1723                    // ----------------------------------------------------
1724                    let mut schema = Schema::with_capacity(columns.len());
1725                    for col in columns {
1726                        let col_name = col.name.value.as_str();
1727                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1728                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1729                    }
1730                    DataFrame::empty_with_schema(&schema).lazy()
1731                },
1732                (None, true, Some(like_kind)) => {
1733                    // ----------------------------------------------------
1734                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1735                    // ----------------------------------------------------
1736                    let like_name = match like_kind {
1737                        CreateTableLikeKind::Plain(like)
1738                        | CreateTableLikeKind::Parenthesized(like) => &like.name,
1739                    };
1740                    let like_table = like_name
1741                        .0
1742                        .first()
1743                        .unwrap()
1744                        .as_ident()
1745                        .unwrap()
1746                        .value
1747                        .as_str();
1748                    if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1749                        table.clear()
1750                    } else {
1751                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1752                    }
1753                },
1754                // No valid options provided
1755                (None, true, None) => {
1756                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1757                },
1758                // Mutually exclusive options
1759                _ => {
1760                    polars_bail!(
1761                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1762                        query,
1763                        columns,
1764                        like,
1765                    )
1766                },
1767            };
1768            self.register(tbl_name, lf);
1769
1770            let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1771            Ok(df_created.unwrap().lazy())
1772        } else {
1773            unreachable!()
1774        }
1775    }
1776
    /// Resolve a FROM-clause relation to a `(name, LazyFrame)` pair.
    ///
    /// * `Table` with arguments — delegated to `execute_table_function`.
    /// * `Table` without arguments — looked up with `get_table_from_current_scope`
    ///   using the first part of its object name. If an alias is given, the
    ///   alias -> table-name mapping is recorded in `table_aliases` and the alias is
    ///   returned as the name.
    /// * `Derived` (subquery) — LATERAL is rejected. When aliased, the alias is applied
    ///   via `rename_columns_from_table_alias` and the frame is inserted into
    ///   `table_map` under the alias. An unaliased subquery gets an empty name.
    /// * `UNNEST` — requires an alias with a column list whose length matches the
    ///   number of array expressions. Arrays are parsed with `parse_sql_array` into
    ///   columns and the frame is inserted into `table_map` under the alias.
    ///   `with_offset` is rejected.
    /// * `NestedJoin` — evaluated with `execute_from_statement` and named by its alias
    ///   (or empty).
    ///
    /// Any other relation kind yields an `SQLInterface` "not yet implemented" error.
    ///
    /// NOTE(review): only the *first* part of a qualified table name is used for the
    /// lookup (eg: `a` in `a.b`), and `as_ident().unwrap()` will panic if that part is
    /// not a plain identifier — confirm both are intended.
    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
        match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                // A table with call arguments is a table function, eg: `tbl_fn(...)`.
                if let Some(args) = args {
                    return self.execute_table_function(name, alias, &args.args);
                }
                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                    match alias {
                        Some(alias) => {
                            // Remember which table the alias stands for.
                            self.table_aliases
                                .insert(alias.name.value.clone(), tbl_name.to_string());
                            Ok((alias.name.value.clone(), lf))
                        },
                        None => Ok((tbl_name.to_string(), lf)),
                    }
                } else {
                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
                }
            },
            TableFactor::Derived {
                lateral,
                subquery,
                alias,
            } => {
                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
                if let Some(alias) = alias {
                    let mut lf = self.execute_query_no_ctes(subquery)?;
                    lf = self.rename_columns_from_table_alias(lf, alias)?;
                    // Make the aliased subquery addressable by name later in the query.
                    self.table_map
                        .write()
                        .unwrap()
                        .insert(alias.name.value.clone(), lf.clone());
                    Ok((alias.name.value.clone(), lf))
                } else {
                    let lf = self.execute_query_no_ctes(subquery)?;
                    Ok(("".to_string(), lf))
                }
            },
            TableFactor::UNNEST {
                alias,
                array_exprs,
                with_offset,
                with_offset_alias: _,
                ..
            } => {
                if let Some(alias) = alias {
                    // An empty alias column name means "keep the array's own name".
                    let column_names: Vec<Option<PlSmallStr>> = alias
                        .columns
                        .iter()
                        .map(|c| {
                            if c.name.value.is_empty() {
                                None
                            } else {
                                Some(PlSmallStr::from_str(c.name.value.as_str()))
                            }
                        })
                        .collect();

                    let column_values: Vec<Series> = array_exprs
                        .iter()
                        .map(|arr| parse_sql_array(arr, self))
                        .collect::<Result<_, _>>()?;

                    polars_ensure!(!column_names.is_empty(),
                        SQLSyntax:
                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
                    );
                    if column_names.len() != column_values.len() {
                        let plural = if column_values.len() > 1 { "s" } else { "" };
                        polars_bail!(
                            SQLSyntax:
                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
                        );
                    }
                    let column_series: Vec<Column> = column_values
                        .into_iter()
                        .zip(column_names)
                        .map(|(s, name)| {
                            if let Some(name) = name {
                                s.with_name(name)
                            } else {
                                s
                            }
                        })
                        .map(Column::from)
                        .collect();

                    let lf = DataFrame::new_infer_height(column_series)?.lazy();

                    // Note: checked only after the frame above has been built.
                    if *with_offset {
                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
                    }
                    let table_name = alias.name.value.clone();
                    self.table_map
                        .write()
                        .unwrap()
                        .insert(table_name.clone(), lf.clone());
                    Ok((table_name, lf))
                } else {
                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
                }
            },
            TableFactor::NestedJoin {
                table_with_joins,
                alias,
            } => {
                // Parenthesized join: evaluate it as its own FROM clause.
                let lf = self.execute_from_statement(table_with_joins)?;
                match alias {
                    Some(a) => Ok((a.name.value.clone(), lf)),
                    None => Ok(("".to_string(), lf)),
                }
            },
            // Any other relation kind is not supported yet.
            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
        }
    }
1897
1898    fn execute_table_function(
1899        &mut self,
1900        name: &ObjectName,
1901        alias: &Option<TableAlias>,
1902        args: &[FunctionArg],
1903    ) -> PolarsResult<(String, LazyFrame)> {
1904        let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1905        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1906        let (tbl_name, lf) = read_fn.execute(args)?;
1907        #[allow(clippy::useless_asref)]
1908        let tbl_name = alias
1909            .as_ref()
1910            .map(|a| a.name.value.clone())
1911            .unwrap_or_else(|| tbl_name.to_string());
1912
1913        self.table_map
1914            .write()
1915            .unwrap()
1916            .insert(tbl_name.clone(), lf.clone());
1917        Ok((tbl_name, lf))
1918    }
1919
    /// Sort `lf` according to an optional SQL `ORDER BY` clause.
    ///
    /// * No clause (or an empty expression list) returns `lf` unchanged.
    /// * `ORDER BY ALL` sorts by every expression in `selected`, or by every column of
    ///   `lf` when `selected` is `None`, with one shared direction/nulls setting. This
    ///   covers both the parsed `OrderByKind::All` form and a single bare identifier
    ///   `ALL` when no column is named "all" (case-insensitive).
    /// * Otherwise each item is translated with `expr_or_ordinal`, which also accepts
    ///   ordinal positions. The current columns and `selected` are passed as context.
    ///
    /// Unless `NULLS FIRST`/`NULLS LAST` is given, nulls sort first for DESC and last
    /// otherwise.
    fn process_order_by(
        &mut self,
        mut lf: LazyFrame,
        order_by: &Option<OrderBy>,
        selected: Option<&[Expr]>,
    ) -> PolarsResult<LazyFrame> {
        // Nothing to sort by: no ORDER BY, or an empty expression list.
        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
            OrderByKind::Expressions(exprs) => exprs.is_empty(),
            OrderByKind::All(_) => false,
        }) {
            return Ok(lf);
        }
        let schema = self.get_frame_schema(&mut lf)?;
        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
        // (explicit ORDER BY items, shared options if ORDER BY ALL, number of sort keys)
        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
            OrderByKind::Expressions(exprs) => {
                // TODO: will look at making an upstream PR that allows us to more easily
                //  create a GenericDialect variant supporting "OrderByKind::All" instead
                if exprs.len() == 1
                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
                        if ident.value.to_uppercase() == "ALL"
                        && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
                {
                    // Treat as ORDER BY ALL
                    let n_cols = if let Some(selected) = selected {
                        selected.len()
                    } else {
                        schema.len()
                    };
                    (vec![], Some(&exprs[0].options), n_cols)
                } else {
                    (exprs.clone(), None, exprs.len())
                }
            },
            OrderByKind::All(opts) => {
                let n_cols = if let Some(selected) = selected {
                    selected.len()
                } else {
                    schema.len()
                };
                (vec![], Some(opts), n_cols)
            },
        };
        let mut descending = Vec::with_capacity(n_order_cols);
        let mut nulls_last = Vec::with_capacity(n_order_cols);
        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);

        if let Some(opts) = order_by_all {
            // ORDER BY ALL: one direction/nulls setting applied to every sort key.
            if let Some(selected) = selected {
                by.extend(selected.iter().cloned());
            } else {
                by.extend(columns_iter);
            };
            let desc_order = !opts.asc.unwrap_or(true);
            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
            descending.resize(by.len(), desc_order);
        } else {
            let columns = &columns_iter.collect::<Vec<_>>();
            for ob in order_by {
                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
                // https://www.postgresql.org/docs/current/queries-order.html
                let desc_order = !ob.options.asc.unwrap_or(true);
                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
                descending.push(desc_order);

                // translate order expression, allowing ordinal values
                by.push(self.expr_or_ordinal(
                    &ob.expr,
                    columns,
                    selected,
                    Some(&schema),
                    "ORDER BY",
                )?)
            }
        }
        Ok(lf.sort_by_exprs(
            &by,
            SortMultipleOptions::default()
                .with_order_descending_multi(descending)
                .with_nulls_last_multi(nulls_last),
        ))
    }
2002
2003    fn process_group_by(
2004        &mut self,
2005        mut lf: LazyFrame,
2006        group_by_keys: &[Expr],
2007        projections: &[Expr],
2008        having: Option<Expr>,
2009    ) -> PolarsResult<LazyFrame> {
2010        let schema_before = self.get_frame_schema(&mut lf)?;
2011        let group_by_keys_schema =
2012            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
2013                format!("group_by keys contained duplicate output name '{duplicate_name}'")
2014            })?;
2015
2016        // Note: remove the `group_by` keys as Polars adds those implicitly.
2017        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
2018        let mut aggregation_projection = Vec::with_capacity(projections.len());
2019        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
2020        let mut projection_aliases = PlHashSet::new();
2021        let mut group_key_aliases = PlHashSet::new();
2022
2023        // Pre-compute group key data (stripped expression + output name) to avoid repeated work.
2024        // We check both expression AND output name match to avoid cross-aliasing issues.
2025        let group_key_data: Vec<_> = group_by_keys
2026            .iter()
2027            .map(|gk| {
2028                (
2029                    strip_outer_alias(gk),
2030                    gk.to_field(&schema_before).ok().map(|f| f.name),
2031                )
2032            })
2033            .collect();
2034
2035        let projection_matches_group_key: Vec<bool> = projections
2036            .iter()
2037            .map(|p| {
2038                let p_stripped = strip_outer_alias(p);
2039                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
2040                group_key_data
2041                    .iter()
2042                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
2043            })
2044            .collect();
2045
2046        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
2047            // `Len` represents COUNT(*) so we treat as an aggregation here.
2048            let is_non_group_key_expr = !matches_group_key
2049                && has_expr(e, |e| {
2050                    match e {
2051                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
2052                        #[cfg(feature = "dynamic_group_by")]
2053                        Expr::Rolling { .. } => true,
2054                        Expr::Function { function: func, .. }
2055                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
2056                        {
2057                            // If it's a function call containing a column NOT in the group by keys,
2058                            // we treat it as an aggregation.
2059                            has_expr(e, |e| match e {
2060                                Expr::Column(name) => !group_by_keys_schema.contains(name),
2061                                _ => false,
2062                            })
2063                        },
2064                        _ => false,
2065                    }
2066                });
2067
2068            // Note: if simple aliased expression we defer aliasing until after the group_by.
2069            // Use `e_inner` to track the potentially unwrapped expression for field lookup.
2070            let mut e_inner = e;
2071            if let Expr::Alias(expr, alias) = e {
2072                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
2073                    group_key_aliases.insert(alias.as_ref());
2074                    e_inner = expr
2075                } else if let Expr::Function {
2076                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
2077                    ..
2078                } = expr.deref()
2079                {
2080                    projection_overrides
2081                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
2082                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
2083                    projection_aliases.insert(alias.as_ref());
2084                }
2085            }
2086            let field = e_inner.to_field(&schema_before)?;
2087            if is_non_group_key_expr {
2088                let mut e = e.clone();
2089                if let Expr::Agg(AggExpr::Implode {
2090                    input: expr,
2091                    maintain_order: _,
2092                }) = &e
2093                {
2094                    e = (**expr).clone();
2095                } else if let Expr::Alias(expr, name) = &e {
2096                    if let Expr::Agg(AggExpr::Implode {
2097                        input: expr,
2098                        maintain_order: _,
2099                    }) = expr.as_ref()
2100                    {
2101                        e = (**expr).clone().alias(name.clone());
2102                    }
2103                }
2104                // If aggregation colname conflicts with a group key,
2105                // alias it to avoid duplicate/mis-tracked columns
2106                if group_by_keys_schema.get(&field.name).is_some() {
2107                    let alias_name = format!("__POLARS_AGG_{}", field.name);
2108                    e = e.alias(alias_name.as_str());
2109                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
2110                }
2111                aggregation_projection.push(e);
2112            } else if !matches_group_key {
2113                // Non-aggregated columns must be part of the GROUP BY clause
2114                if let Expr::Column(_)
2115                | Expr::Function {
2116                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
2117                    ..
2118                } = e_inner
2119                {
2120                    if !group_by_keys_schema.contains(&field.name) {
2121                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
2122                    }
2123                }
2124            }
2125        }
2126
2127        // Process HAVING clause: identify aggregate expressions, reusing those already
2128        // in projections, or compute as temporary columns and then post-filter/discard
2129        let having_filter = if let Some(having_expr) = having {
2130            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
2131                .iter()
2132                .filter_map(|p| match p {
2133                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
2134                        Some((inner.as_ref().clone(), name.clone()))
2135                    },
2136                    e @ (Expr::Agg(_) | Expr::Len) => Some((
2137                        e.clone(),
2138                        e.to_field(&schema_before)
2139                            .map(|f| f.name)
2140                            .unwrap_or_default(),
2141                    )),
2142                    _ => None,
2143                })
2144                .collect();
2145
2146            let mut n_having_aggs = 0;
2147            let updated_having = having_expr.map_expr(|e| {
2148                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
2149                    return e;
2150                }
2151                let name = agg_to_name
2152                    .iter()
2153                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
2154                    .unwrap_or_else(|| {
2155                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
2156                        aggregation_projection.push(e.clone().alias(n.clone()));
2157                        agg_to_name.push((e.clone(), n.clone()));
2158                        n_having_aggs += 1;
2159                        n
2160                    });
2161                col(name)
2162            });
2163            Some(updated_having)
2164        } else {
2165            None
2166        };
2167
2168        // Apply HAVING filter after aggregation
2169        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
2170        if let Some(filter_expr) = having_filter {
2171            aggregated = aggregated.filter(filter_expr);
2172        }
2173
2174        let projection_schema =
2175            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
2176                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
2177            })?;
2178
2179        // A final projection to get the proper order and any deferred transforms/aliases
2180        // (will also drop any temporary columns created for the HAVING post-filter).
2181        let final_projection = projection_schema
2182            .iter_names()
2183            .zip(projections.iter().zip(&projection_matches_group_key))
2184            .map(|(name, (projection_expr, &matches_group_key))| {
2185                if let Some(expr) = projection_overrides.get(name.as_str()) {
2186                    expr.clone()
2187                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
2188                    col(aliased_name.clone()).alias(name.clone())
2189                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
2190                    col(name.clone())
2191                } else if group_by_keys_schema.get(name).is_some()
2192                    || projection_aliases.contains(name.as_str())
2193                    || group_key_aliases.contains(name.as_str())
2194                {
2195                    if has_expr(projection_expr, |e| {
2196                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
2197                    }) {
2198                        col(name.clone())
2199                    } else {
2200                        projection_expr.clone()
2201                    }
2202                } else {
2203                    col(name.clone())
2204                }
2205            })
2206            .collect::<Vec<_>>();
2207
2208        // Include original GROUP BY columns for ORDER BY access (if aliased).
2209        let mut output_projection = final_projection;
2210        for key_name in group_by_keys_schema.iter_names() {
2211            if !projection_schema.contains(key_name) {
2212                // Original col name not in output - add for ORDER BY access
2213                output_projection.push(col(key_name.clone()));
2214            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
2215                // Original col name in output - check if cross-aliased
2216                let is_cross_aliased = projections.iter().any(|p| {
2217                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
2218                        && !is_simple_col_ref(p, key_name)
2219                });
2220                if is_cross_aliased {
2221                    // Add original name under a prefixed alias for subsequent ORDER BY resolution
2222                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
2223                    output_projection.push(col(key_name.clone()).alias(internal_name));
2224                }
2225            }
2226        }
2227        Ok(aggregated.select(&output_projection))
2228    }
2229
2230    fn process_limit_offset(
2231        &self,
2232        lf: LazyFrame,
2233        limit_clause: &Option<LimitClause>,
2234        fetch: &Option<Fetch>,
2235    ) -> PolarsResult<LazyFrame> {
2236        // Extract limit and offset from LimitClause
2237        let (limit, offset) = match limit_clause {
2238            Some(LimitClause::LimitOffset {
2239                limit,
2240                offset,
2241                limit_by,
2242            }) => {
2243                if !limit_by.is_empty() {
2244                    // TODO: might be able to support as an aggregate `top_k_by` operation?
2245                    //  (https://clickhouse.com/docs/sql-reference/statements/select/limit-by)
2246                    polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2247                }
2248                (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2249            },
2250            Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2251            None => (None, None),
2252        };
2253
2254        // Handle FETCH clause (alternative to LIMIT, mutually exclusive)
2255        let limit = match (fetch, limit) {
2256            (Some(fetch), None) => fetch.quantity.as_ref(),
2257            (Some(_), Some(_)) => {
2258                polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2259            },
2260            (None, limit) => limit,
2261        };
2262
2263        // Apply limit and/or offset
2264        match (offset, limit) {
2265            (
2266                Some(SQLExpr::Value(ValueWithSpan {
2267                    value: SQLValue::Number(offset, _),
2268                    ..
2269                })),
2270                Some(SQLExpr::Value(ValueWithSpan {
2271                    value: SQLValue::Number(limit, _),
2272                    ..
2273                })),
2274            ) => Ok(lf.slice(
2275                offset
2276                    .parse()
2277                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2278                limit.parse().map_err(
2279                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2280                )?,
2281            )),
2282            (
2283                Some(SQLExpr::Value(ValueWithSpan {
2284                    value: SQLValue::Number(offset, _),
2285                    ..
2286                })),
2287                None,
2288            ) => Ok(lf.slice(
2289                offset
2290                    .parse()
2291                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2292                IdxSize::MAX,
2293            )),
2294            (
2295                None,
2296                Some(SQLExpr::Value(ValueWithSpan {
2297                    value: SQLValue::Number(limit, _),
2298                    ..
2299                })),
2300            ) => {
2301                Ok(lf.limit(limit.parse().map_err(
2302                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2303                )?))
2304            },
2305            (None, None) => Ok(lf),
2306            _ => polars_bail!(
2307                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2308            ),
2309        }
2310    }
2311
2312    fn process_qualified_wildcard(
2313        &mut self,
2314        ObjectName(idents): &ObjectName,
2315        options: &WildcardAdditionalOptions,
2316        modifiers: &mut SelectModifiers,
2317        schema: Option<&Schema>,
2318    ) -> PolarsResult<Vec<Expr>> {
2319        let mut idents_with_wildcard: Vec<Ident> = idents
2320            .iter()
2321            .filter_map(|p| p.as_ident().cloned())
2322            .collect();
2323        idents_with_wildcard.push(Ident::new("*"));
2324
2325        let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2326        self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2327    }
2328
2329    fn process_wildcard_additional_options(
2330        &mut self,
2331        exprs: Vec<Expr>,
2332        options: &WildcardAdditionalOptions,
2333        modifiers: &mut SelectModifiers,
2334        schema: Option<&Schema>,
2335    ) -> PolarsResult<Vec<Expr>> {
2336        if options.opt_except.is_some() && options.opt_exclude.is_some() {
2337            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2338        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2339            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2340        }
2341
2342        // SELECT * EXCLUDE
2343        if let Some(items) = &options.opt_exclude {
2344            match items {
2345                ExcludeSelectItem::Single(ident) => {
2346                    modifiers.exclude.insert(ident.value.clone());
2347                },
2348                ExcludeSelectItem::Multiple(idents) => {
2349                    modifiers
2350                        .exclude
2351                        .extend(idents.iter().map(|i| i.value.clone()));
2352                },
2353            };
2354        }
2355
2356        // SELECT * EXCEPT
2357        if let Some(items) = &options.opt_except {
2358            modifiers.exclude.insert(items.first_element.value.clone());
2359            modifiers
2360                .exclude
2361                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2362        }
2363
2364        // SELECT * ILIKE
2365        if let Some(item) = &options.opt_ilike {
2366            let rx = regex::escape(item.pattern.as_str())
2367                .replace('%', ".*")
2368                .replace('_', ".");
2369
2370            modifiers.ilike = Some(
2371                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2372            );
2373        }
2374
2375        // SELECT * RENAME
2376        if let Some(items) = &options.opt_rename {
2377            let renames = match items {
2378                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2379                RenameSelectItem::Multiple(renames) => renames.as_slice(),
2380            };
2381            for rn in renames {
2382                let before = PlSmallStr::from_str(rn.ident.value.as_str());
2383                let after = PlSmallStr::from_str(rn.alias.value.as_str());
2384                if before != after {
2385                    modifiers.rename.insert(before, after);
2386                }
2387            }
2388        }
2389
2390        // SELECT * REPLACE
2391        if let Some(replacements) = &options.opt_replace {
2392            for rp in &replacements.items {
2393                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2394                modifiers
2395                    .replace
2396                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2397            }
2398        }
2399        Ok(exprs)
2400    }
2401
2402    fn rename_columns_from_table_alias(
2403        &mut self,
2404        mut lf: LazyFrame,
2405        alias: &TableAlias,
2406    ) -> PolarsResult<LazyFrame> {
2407        if alias.columns.is_empty() {
2408            Ok(lf)
2409        } else {
2410            let schema = self.get_frame_schema(&mut lf)?;
2411            if alias.columns.len() != schema.len() {
2412                polars_bail!(
2413                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2414                    alias.columns.len(), alias.name.value, schema.len()
2415                )
2416            } else {
2417                let existing_columns: Vec<_> = schema.iter_names().collect();
2418                let new_columns: Vec<_> =
2419                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
2420                Ok(lf.rename(existing_columns, new_columns, true))
2421            }
2422        }
2423    }
2424}
2425
2426impl SQLContext {
2427    /// Create a new SQLContext from a table map. For internal use only
2428    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2429        Self {
2430            table_map: Arc::new(RwLock::new(table_map)),
2431            ..Default::default()
2432        }
2433    }
2434}
2435
2436fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2437    match expr {
2438        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2439            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2440            schema
2441                .iter_names()
2442                .filter(|name| re.is_match(name))
2443                .map(|name| col(name.clone()))
2444                .collect::<Vec<_>>()
2445        },
2446        Expr::Selector(s) => s
2447            .into_columns(schema, &Default::default())
2448            .unwrap()
2449            .into_iter()
2450            .map(col)
2451            .collect::<Vec<_>>(),
2452        _ => vec![expr],
2453    }
2454}
2455
/// Return true if the column name is a regex pattern, i.e. it starts with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
2459
2460/// Extract column names from a USING clause in a JoinOperator (if present).
2461fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2462    use JoinOperator::*;
2463    match op {
2464        Join(JoinConstraint::Using(cols))
2465        | Inner(JoinConstraint::Using(cols))
2466        | Left(JoinConstraint::Using(cols))
2467        | LeftOuter(JoinConstraint::Using(cols))
2468        | Right(JoinConstraint::Using(cols))
2469        | RightOuter(JoinConstraint::Using(cols))
2470        | FullOuter(JoinConstraint::Using(cols))
2471        | Semi(JoinConstraint::Using(cols))
2472        | Anti(JoinConstraint::Using(cols))
2473        | LeftSemi(JoinConstraint::Using(cols))
2474        | LeftAnti(JoinConstraint::Using(cols))
2475        | RightSemi(JoinConstraint::Using(cols))
2476        | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2477            c.0.first()
2478                .and_then(|p| p.as_ident())
2479                .map(|i| i.value.clone())
2480        })),
2481        _ => None,
2482    }
2483}
2484
2485/// Extract the table name (or alias) from a TableFactor.
2486fn get_table_name(factor: &TableFactor) -> Option<String> {
2487    match factor {
2488        TableFactor::Table { name, alias, .. } => {
2489            alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2490                name.0
2491                    .last()
2492                    .and_then(|p| p.as_ident())
2493                    .map(|i| i.value.clone())
2494            })
2495        },
2496        TableFactor::Derived { alias, .. }
2497        | TableFactor::NestedJoin { alias, .. }
2498        | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2499        _ => None,
2500    }
2501}
2502
2503/// Check if an expression is a simple column reference (with optional alias) to the given name.
2504fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2505    match expr {
2506        Expr::Column(n) => n == col_name,
2507        Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2508        _ => false,
2509    }
2510}
2511
2512/// Strip the outer alias from an expression (if present) for expression equality comparison.
2513fn strip_outer_alias(expr: &Expr) -> Expr {
2514    if let Expr::Alias(inner, _) = expr {
2515        inner.as_ref().clone()
2516    } else {
2517        expr.clone()
2518    }
2519}
2520
2521/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2522///
2523/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2524/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2525fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2526    // Original columns take precedence over SELECT aliases
2527    if schema.contains(name) {
2528        return None;
2529    }
2530    // Find a projection with this alias and return its expression (preserving the alias)
2531    projections.iter().find_map(|p| match p {
2532        Expr::Alias(inner, alias) if alias.as_str() == name => {
2533            Some(inner.as_ref().clone().alias(alias.clone()))
2534        },
2535        _ => None,
2536    })
2537}
2538
2539/// Check if all columns referred to in a Polars expression exist in the given Schema.
2540fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2541    let mut found_cols = false;
2542    let mut all_in_schema = true;
2543    for e in expr.into_iter() {
2544        if let Expr::Column(name) = e {
2545            found_cols = true;
2546            if !schema.contains(name.as_str()) {
2547                all_in_schema = false;
2548                break;
2549            }
2550        }
2551    }
2552    found_cols && all_in_schema
2553}
2554
/// Determine which parsed join expressions actually belong in `left_on` and which in `right_on`.
///
/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
/// either way round, and in joins with more than two tables you can also join against an earlier
/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
/// we join `df2` to `df3` could refer to `df1.a = df3.b`); this takes a little more work to
/// resolve as our native `join` function operates on only two tables at a time.
///
/// Resolution happens in two stages:
/// 1. SQL-level table references (e.g. `tbl.col`) are checked first. If they unambiguously
///    assign each side of the condition to one table, that assignment is returned directly.
/// 2. Otherwise the parsed expressions' columns are checked against each table's schema,
///    preferring the side whose columns exist in only one of the two schemas. If that is
///    still ambiguous, the expressions are returned in the order they were written.
///
/// Returns `(left_on, right_on)`, each holding a single expression with any outer alias
/// (added during parsing) removed.
///
/// # Errors
/// Propagates errors from `parse_sql_expr`. Also errors if either side of the condition refers
/// to both tables (when the two tables have different names).
fn determine_left_right_join_on(
    ctx: &mut SQLContext,
    expr_left: &SQLExpr,
    expr_right: &SQLExpr,
    tbl_left: &TableInfo,
    tbl_right: &TableInfo,
    join_schema: &Schema,
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
    // parse, removing any aliases that may have been added by `resolve_column`
    // (called inside `parse_sql_expr`) as we need the actual/underlying col
    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
        e => e,
    };
    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
        e => e,
    };

    // ------------------------------------------------------------------
    // simple/typical case: can fully resolve SQL-level table references
    // ------------------------------------------------------------------
    // each tuple is (refers to left table, refers to right table)
    let left_refs = (
        expr_refers_to_table(expr_left, &tbl_left.name),
        expr_refers_to_table(expr_left, &tbl_right.name),
    );
    let right_refs = (
        expr_refers_to_table(expr_right, &tbl_left.name),
        expr_refers_to_table(expr_right, &tbl_right.name),
    );
    // if the SQL-level references unambiguously indicate table ownership, we're done
    match (left_refs, right_refs) {
        // standard: left expr → left table, right expr → right table
        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
        // reversed: left expr → right table, right expr → left table
        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
        // unsupported: one side references *both* tables
        // (when both tables share a name, any reference hits both, so this guard
        // lets that case fall through to the schema-based resolution below instead)
        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
            polars_bail!(
               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
               if left_refs.0 && left_refs.1 {
                    "left"
                } else {
                    "right"
                }, tbl_left.name, tbl_right.name
            )
        },
        // fall through to the more involved col/ref resolution
        _ => {},
    }

    // ------------------------------------------------------------------
    // more involved: additionally employ schema-based column resolution
    // (applies to unqualified columns and/or chained joins)
    // ------------------------------------------------------------------
    // each tuple is (all cols in left schema, all cols in right schema)
    let left_on_cols_in = (
        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
    );
    let right_on_cols_in = (
        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
    );
    match (left_on_cols_in, right_on_cols_in) {
        // each expression's columns exist in exactly one schema
        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
        // one expression in both, other only in one; prefer the unique one
        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
        // pass through as-is
        _ => Ok((vec![left_on], vec![right_on])),
    }
}
2638
2639fn process_join_on(
2640    ctx: &mut SQLContext,
2641    sql_expr: &SQLExpr,
2642    tbl_left: &TableInfo,
2643    tbl_right: &TableInfo,
2644) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2645    match sql_expr {
2646        SQLExpr::BinaryOp { left, op, right } => match op {
2647            SQLBinaryOperator::And => {
2648                let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2649                let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2650                left_i.append(&mut left_j);
2651                right_i.append(&mut right_j);
2652                Ok((left_i, right_i))
2653            },
2654            SQLBinaryOperator::Eq => {
2655                // establish unified schema with cols from both tables; needed for multi/chained
2656                // joins where suffixed intermediary/joined cols aren't in an existing schema.
2657                let mut join_schema =
2658                    Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2659                for (name, dtype) in tbl_left.schema.iter() {
2660                    join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2661                }
2662                for (name, dtype) in tbl_right.schema.iter() {
2663                    if !join_schema.contains(name) {
2664                        join_schema.insert_at_index(
2665                            join_schema.len(),
2666                            name.clone(),
2667                            dtype.clone(),
2668                        )?;
2669                    }
2670                }
2671                determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2672            },
2673            _ => polars_bail!(
2674                // TODO: should be able to support more operators later (via `join_where`?)
2675                SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2676            ),
2677        },
2678        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2679        _ => polars_bail!(
2680            SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2681        ),
2682    }
2683}
2684
2685fn process_join_constraint(
2686    constraint: &JoinConstraint,
2687    tbl_left: &TableInfo,
2688    tbl_right: &TableInfo,
2689    ctx: &mut SQLContext,
2690) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2691    match constraint {
2692        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2693            process_join_on(ctx, expr, tbl_left, tbl_right)
2694        },
2695        JoinConstraint::Using(idents) if !idents.is_empty() => {
2696            let using: Vec<Expr> = idents
2697                .iter()
2698                .map(|ObjectName(parts)| {
2699                    if parts.len() != 1 {
2700                        polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2701                    }
2702                    match parts[0].as_ident() {
2703                        Some(ident) => Ok(col(ident.value.as_str())),
2704                        None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2705                    }
2706                })
2707                .collect::<PolarsResult<Vec<_>>>()?;
2708            Ok((using.clone(), using))
2709        },
2710        JoinConstraint::Natural => {
2711            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2712            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2713            let on: Vec<Expr> = left_names
2714                .intersection(&right_names)
2715                .map(|&name| col(name.clone()))
2716                .collect();
2717            if on.is_empty() {
2718                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2719            }
2720            Ok((on.clone(), on))
2721        },
2722        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2723    }
2724}
2725
2726/// Extract table identifiers referenced in a SQL query; uses a visitor to
2727/// collect all table names that appear in FROM clauses, JOINs, TABLE refs
2728/// in set operations, and subqueries.
2729pub fn extract_table_identifiers(
2730    query: &str,
2731    include_schema: bool,
2732    unique: bool,
2733) -> PolarsResult<Vec<String>> {
2734    let mut parser = Parser::new(&GenericDialect);
2735    parser = parser.with_options(ParserOptions {
2736        trailing_commas: true,
2737        ..Default::default()
2738    });
2739    let ast = parser
2740        .try_with_sql(query)
2741        .map_err(to_sql_interface_err)?
2742        .parse_statements()
2743        .map_err(to_sql_interface_err)?;
2744
2745    let mut collector = TableIdentifierCollector {
2746        include_schema,
2747        ..Default::default()
2748    };
2749    for stmt in &ast {
2750        let _ = stmt.visit(&mut collector);
2751    }
2752    Ok(if unique {
2753        collector
2754            .tables
2755            .into_iter()
2756            .collect::<PlIndexSet<_>>()
2757            .into_iter()
2758            .collect()
2759    } else {
2760        collector.tables
2761    })
2762}
2763
2764bitflags::bitflags! {
2765    /// Bitfield indicating whether there exists a projection with the specified height behavior.
2766    ///
2767    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
2768    /// context.
2769    #[derive(PartialEq)]
2770    struct ExprSqlProjectionHeightBehavior: u8 {
2771        /// Maintains the height of input column(s)
2772        const MaintainsColumn = 1 << 0;
2773        /// Height is independent of input, e.g.:
2774        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
2775        /// * aggregations: count(*), first(), sum() etc.
2776        const Independent = 1 << 1;
2777        /// "Inherits" the height of the context, e.g.:
2778        /// * Scalar literals
2779        const InheritsContext = 1 << 2;
2780    }
2781}
2782
2783impl ExprSqlProjectionHeightBehavior {
2784    fn identify_from_expr(expr: &Expr) -> Self {
2785        let mut has_column = false;
2786        let mut has_independent = false;
2787
2788        for e in expr.into_iter() {
2789            use Expr::*;
2790            has_column |= matches!(e, Column(_) | Selector(_));
2791            has_independent |= match e {
2792                // @TODO: This is broken now with functions.
2793                AnonymousFunction { options, .. } => {
2794                    options.returns_scalar() || !options.is_length_preserving()
2795                },
2796                Literal(v) => !v.is_scalar(),
2797                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2798                Agg { .. } | Len => true,
2799                _ => false,
2800            }
2801        }
2802        if has_independent {
2803            Self::Independent
2804        } else if has_column {
2805            Self::MaintainsColumn
2806        } else {
2807            Self::InheritsContext
2808        }
2809    }
2810}