polars_sql/
context.rs

1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14    OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15    Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16    WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
27#[derive(Clone)]
28pub struct TableInfo {
29    pub(crate) frame: LazyFrame,
30    pub(crate) name: PlSmallStr,
31    pub(crate) schema: Arc<Schema>,
32}
33
34struct SelectModifiers {
35    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
36    ilike: Option<regex::Regex>,               // SELECT * ILIKE
37    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
38    replace: Vec<Expr>,                        // SELECT * REPLACE
39}
40impl SelectModifiers {
41    fn matches_ilike(&self, s: &str) -> bool {
42        match &self.ilike {
43            Some(rx) => rx.is_match(s),
44            None => true,
45        }
46    }
47    fn renamed_cols(&self) -> Vec<Expr> {
48        self.rename
49            .iter()
50            .map(|(before, after)| col(before.clone()).alias(after.clone()))
51            .collect()
52    }
53}
54
55/// The SQLContext is the main entry point for executing SQL queries.
56#[derive(Clone)]
57pub struct SQLContext {
58    pub(crate) table_map: PlHashMap<String, LazyFrame>,
59    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
60    pub(crate) lp_arena: Arena<IR>,
61    pub(crate) expr_arena: Arena<AExpr>,
62
63    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
64    table_aliases: RefCell<PlHashMap<String, String>>,
65    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
66}
67
68impl Default for SQLContext {
69    fn default() -> Self {
70        Self {
71            function_registry: Arc::new(DefaultFunctionRegistry {}),
72            table_map: Default::default(),
73            cte_map: Default::default(),
74            table_aliases: Default::default(),
75            joined_aliases: Default::default(),
76            lp_arena: Default::default(),
77            expr_arena: Default::default(),
78        }
79    }
80}
81
82impl SQLContext {
83    /// Create a new SQLContext.
84    /// ```rust
85    /// # use polars_sql::SQLContext;
86    /// # fn main() {
87    /// let ctx = SQLContext::new();
88    /// # }
89    /// ```
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Get the names of all registered tables, in sorted order.
95    pub fn get_tables(&self) -> Vec<String> {
96        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97        tables.sort_unstable();
98        tables
99    }
100
101    /// Register a [`LazyFrame`] as a table in the SQLContext.
102    /// ```rust
103    /// # use polars_sql::SQLContext;
104    /// # use polars_core::prelude::*;
105    /// # use polars_lazy::prelude::*;
106    /// # fn main() {
107    ///
108    /// let mut ctx = SQLContext::new();
109    /// let df = df! {
110    ///    "a" =>  [1, 2, 3],
111    /// }.unwrap().lazy();
112    ///
113    /// ctx.register("df", df);
114    /// # }
115    ///```
116    pub fn register(&mut self, name: &str, lf: LazyFrame) {
117        self.table_map.insert(name.to_owned(), lf);
118    }
119
120    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
121    pub fn unregister(&mut self, name: &str) {
122        self.table_map.remove(&name.to_owned());
123    }
124
125    /// Execute a SQL query, returning a [`LazyFrame`].
126    /// ```rust
127    /// # use polars_sql::SQLContext;
128    /// # use polars_core::prelude::*;
129    /// # use polars_lazy::prelude::*;
130    /// # fn main() {
131    ///
132    /// let mut ctx = SQLContext::new();
133    /// let df = df! {
134    ///    "a" =>  [1, 2, 3],
135    /// }
136    /// .unwrap();
137    ///
138    /// ctx.register("df", df.clone().lazy());
139    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
140    /// assert!(sql_df.equals(&df));
141    /// # }
142    ///```
143    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144        let mut parser = Parser::new(&GenericDialect);
145        parser = parser.with_options(ParserOptions {
146            trailing_commas: true,
147            ..Default::default()
148        });
149
150        let ast = parser
151            .try_with_sql(query)
152            .map_err(to_sql_interface_err)?
153            .parse_statements()
154            .map_err(to_sql_interface_err)?;
155
156        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157        let res = self.execute_statement(ast.first().unwrap())?;
158
159        // Ensure the result uses the proper arenas.
160        // This will instantiate new arenas with a new version.
161        let lp_arena = std::mem::take(&mut self.lp_arena);
162        let expr_arena = std::mem::take(&mut self.expr_arena);
163        res.set_cached_arena(lp_arena, expr_arena);
164
165        // Every execution should clear the statement-level maps.
166        self.cte_map.borrow_mut().clear();
167        self.table_aliases.borrow_mut().clear();
168        self.joined_aliases.borrow_mut().clear();
169
170        Ok(res)
171    }
172
173    /// add a function registry to the SQLContext
174    /// the registry provides the ability to add custom functions to the SQLContext
175    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176        self.function_registry = function_registry;
177        self
178    }
179
180    /// Get the function registry of the SQLContext
181    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182        &self.function_registry
183    }
184
185    /// Get a mutable reference to the function registry of the SQLContext
186    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187        Arc::get_mut(&mut self.function_registry).unwrap()
188    }
189}
190
191impl SQLContext {
192    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193        let ast = stmt;
194        Ok(match ast {
195            Statement::Query(query) => self.execute_query(query)?,
196            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198            stmt @ Statement::Drop {
199                object_type: ObjectType::Table,
200                ..
201            } => self.execute_drop_table(stmt)?,
202            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205            _ => polars_bail!(
206                SQLInterface: "statement type is not supported:\n{:?}", ast,
207            ),
208        })
209    }
210
211    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212        self.register_ctes(query)?;
213        self.execute_query_no_ctes(query)
214    }
215
216    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217        let lf = self.process_query(&query.body, query)?;
218        self.process_limit_offset(lf, &query.limit, &query.offset)
219    }
220
221    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223    }
224
225    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226        let table = self.table_map.get(name).cloned();
227        table
228            .or_else(|| self.cte_map.borrow().get(name).cloned())
229            .or_else(|| {
230                self.table_aliases
231                    .borrow()
232                    .get(name)
233                    .and_then(|alias| self.table_map.get(alias).cloned())
234            })
235    }
236
237    fn expr_or_ordinal(
238        &mut self,
239        e: &SQLExpr,
240        exprs: &[Expr],
241        selected: Option<&[Expr]>,
242        schema: Option<&Schema>,
243        clause: &str,
244    ) -> PolarsResult<Expr> {
245        match e {
246            SQLExpr::UnaryOp {
247                op: UnaryOperator::Minus,
248                expr,
249            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251                    Err(polars_err!(
252                    SQLSyntax:
253                    "negative ordinal values are invalid for {}; found -{}",
254                    clause,
255                    idx
256                    ))
257                } else {
258                    unreachable!()
259                }
260            },
261            SQLExpr::Value(SQLValue::Number(idx, _)) => {
262                // note: sql queries are 1-indexed
263                let idx = idx.parse::<usize>().map_err(|_| {
264                    polars_err!(
265                        SQLSyntax:
266                        "negative ordinal values are invalid for {}; found {}",
267                        clause,
268                        idx
269                    )
270                })?;
271                // note: "selected" cols represent final projection order, so we use those for
272                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
273                let cols = if let Some(cols) = selected {
274                    cols
275                } else {
276                    exprs
277                };
278                Ok(cols
279                    .get(idx - 1)
280                    .ok_or_else(|| {
281                        polars_err!(
282                            SQLInterface:
283                            "{} ordinal value must refer to a valid column; found {}",
284                            clause,
285                            idx
286                        )
287                    })?
288                    .clone())
289            },
290            SQLExpr::Value(v) => Err(polars_err!(
291                SQLSyntax:
292                "{} requires a valid expression or positive ordinal; found {}", clause, v,
293            )),
294            _ => parse_sql_expr(e, self, schema),
295        }
296    }
297
298    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299        if self.joined_aliases.borrow().contains_key(tbl_name) {
300            self.joined_aliases
301                .borrow()
302                .get(tbl_name)
303                .and_then(|aliases| aliases.get(column_name))
304                .cloned()
305                .unwrap_or_else(|| column_name.to_string())
306        } else {
307            column_name.to_string()
308        }
309    }
310
311    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312        match expr {
313            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314            SetExpr::Query(query) => self.execute_query_no_ctes(query),
315            SetExpr::SetOperation {
316                op: SetOperator::Union,
317                set_quantifier,
318                left,
319                right,
320            } => self.process_union(left, right, set_quantifier, query),
321
322            #[cfg(feature = "semi_anti_join")]
323            SetExpr::SetOperation {
324                op: SetOperator::Intersect | SetOperator::Except,
325                set_quantifier,
326                left,
327                right,
328            } => self.process_except_intersect(left, right, set_quantifier, query),
329
330            SetExpr::Values(Values {
331                explicit_row: _,
332                rows,
333            }) => self.process_values(rows),
334
335            SetExpr::Table(tbl) => {
336                if tbl.table_name.is_some() {
337                    let table_name = tbl.table_name.as_ref().unwrap();
338                    self.get_table_from_current_scope(table_name)
339                        .ok_or_else(|| {
340                            polars_err!(
341                                SQLInterface: "no table or alias named '{}' found",
342                                tbl
343                            )
344                        })
345                } else {
346                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347                }
348            },
349            op => {
350                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351            },
352        }
353    }
354
355    #[cfg(feature = "semi_anti_join")]
356    fn process_except_intersect(
357        &mut self,
358        left: &SetExpr,
359        right: &SetExpr,
360        quantifier: &SetQuantifier,
361        query: &Query,
362    ) -> PolarsResult<LazyFrame> {
363        let (join_type, op_name) = match *query.body {
364            SetExpr::SetOperation {
365                op: SetOperator::Except,
366                ..
367            } => (JoinType::Anti, "EXCEPT"),
368            _ => (JoinType::Semi, "INTERSECT"),
369        };
370        let mut lf = self.process_query(left, query)?;
371        let mut rf = self.process_query(right, query)?;
372        let join = lf
373            .clone()
374            .join_builder()
375            .with(rf.clone())
376            .how(join_type)
377            .join_nulls(true);
378
379        let lf_schema = self.get_frame_schema(&mut lf)?;
380        let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
381        let joined_tbl = match quantifier {
382            SetQuantifier::ByName => join.on(lf_cols).finish(),
383            SetQuantifier::Distinct | SetQuantifier::None => {
384                let rf_schema = self.get_frame_schema(&mut rf)?;
385                let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
386                if lf_cols.len() != rf_cols.len() {
387                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
388                }
389                join.left_on(lf_cols).right_on(rf_cols).finish()
390            },
391            _ => {
392                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
393            },
394        };
395        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
396    }
397
398    fn process_union(
399        &mut self,
400        left: &SetExpr,
401        right: &SetExpr,
402        quantifier: &SetQuantifier,
403        query: &Query,
404    ) -> PolarsResult<LazyFrame> {
405        let mut lf = self.process_query(left, query)?;
406        let mut rf = self.process_query(right, query)?;
407        let opts = UnionArgs {
408            parallel: true,
409            to_supertypes: true,
410            ..Default::default()
411        };
412        match quantifier {
413            // UNION [ALL | DISTINCT]
414            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415                let lf_schema = self.get_frame_schema(&mut lf)?;
416                let rf_schema = self.get_frame_schema(&mut rf)?;
417                if lf_schema.len() != rf_schema.len() {
418                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419                }
420                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421                match quantifier {
422                    SetQuantifier::Distinct | SetQuantifier::None => {
423                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424                    },
425                    _ => concatenated,
426                }
427            },
428            // UNION ALL BY NAME
429            #[cfg(feature = "diagonal_concat")]
430            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431            // UNION [DISTINCT] BY NAME
432            #[cfg(feature = "diagonal_concat")]
433            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436            },
437            #[allow(unreachable_patterns)]
438            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439        }
440    }
441
442    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443        let frame_rows: Vec<Row> = values.iter().map(|row| {
444            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445                let expr = parse_sql_expr(expr, self, None)?;
446                match expr {
447                    Expr::Literal(value) => {
448                        value.to_any_value()
449                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450                            .map(|av| av.into_static())
451                    },
452                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453                }
454            }).collect();
455            row_data.map(Row::new)
456        }).collect::<Result<_, _>>()?;
457
458        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459    }
460
461    // EXPLAIN SELECT * FROM DF
462    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463        match stmt {
464            Statement::Explain { statement, .. } => {
465                let lf = self.execute_statement(statement)?;
466                let plan = lf.describe_optimized_plan()?;
467                let plan = plan
468                    .split('\n')
469                    .collect::<Series>()
470                    .with_name(PlSmallStr::from_static("Logical Plan"))
471                    .into_column();
472                let df = DataFrame::new(vec![plan])?;
473                Ok(df.lazy())
474            },
475            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476        }
477    }
478
479    // SHOW TABLES
480    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481        let tables = Column::new("name".into(), self.get_tables());
482        let df = DataFrame::new(vec![tables])?;
483        Ok(df.lazy())
484    }
485
486    // DROP TABLE <tbl>
487    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488        match stmt {
489            Statement::Drop { names, .. } => {
490                names.iter().for_each(|name| {
491                    self.table_map.remove(&name.to_string());
492                });
493                Ok(DataFrame::empty().lazy())
494            },
495            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496        }
497    }
498
499    // DELETE FROM <tbl> [WHERE ...]
500    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501        if let Statement::Delete(Delete {
502            tables,
503            from,
504            using,
505            selection,
506            returning,
507            order_by,
508            limit,
509        }) = stmt
510        {
511            if !tables.is_empty()
512                || using.is_some()
513                || returning.is_some()
514                || limit.is_some()
515                || !order_by.is_empty()
516            {
517                let error_message = match () {
518                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
519                    _ if using.is_some() => "DELETE does not support the USING clause",
520                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523                    _ => unreachable!(),
524                };
525                polars_bail!(SQLInterface: error_message);
526            }
527            let from_tables = match &from {
528                FromTable::WithFromKeyword(from) => from,
529                FromTable::WithoutKeyword(from) => from,
530            };
531            if from_tables.len() > 1 {
532                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533            }
534            let tbl_expr = from_tables.first().unwrap();
535            if !tbl_expr.joins.is_empty() {
536                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537            }
538            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539            if selection.is_none() {
540                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
541                Ok(DataFrame::empty_with_schema(
542                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543                        .unwrap()
544                        .as_ref(),
545                )
546                .lazy())
547            } else {
548                // apply constraint as inverted filter (drops rows matching the selection)
549                Ok(self.process_where(lf.clone(), selection, true)?)
550            }
551        } else {
552            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553        }
554    }
555
556    // TRUNCATE <tbl>
557    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558        if let Statement::Truncate {
559            table_names,
560            partitions,
561            ..
562        } = stmt
563        {
564            match partitions {
565                None => {
566                    if table_names.len() != 1 {
567                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568                    }
569                    let tbl = table_names[0].to_string();
570                    if let Some(lf) = self.table_map.get_mut(&tbl) {
571                        *lf = DataFrame::empty_with_schema(
572                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573                                .unwrap()
574                                .as_ref(),
575                        )
576                        .lazy();
577                        Ok(lf.clone())
578                    } else {
579                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580                    }
581                },
582                _ => {
583                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584                },
585            }
586        } else {
587            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588        }
589    }
590
591    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592        self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593    }
594
595    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596        if let Some(with) = &query.with {
597            if with.recursive {
598                polars_bail!(SQLInterface: "recursive CTEs are not supported")
599            }
600            for cte in &with.cte_tables {
601                let cte_name = cte.alias.name.value.clone();
602                let mut lf = self.execute_query(&cte.query)?;
603                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604                self.register_cte(&cte_name, lf);
605            }
606        }
607        Ok(())
608    }
609
610    /// execute the 'FROM' part of the query
611    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
612        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
613        if !tbl_expr.joins.is_empty() {
614            for join in &tbl_expr.joins {
615                let (r_name, mut rf) = self.get_table(&join.relation)?;
616                if r_name.is_empty() {
617                    // Require non-empty to avoid duplicate column errors from nested self-joins.
618                    polars_bail!(
619                        SQLInterface:
620                        "cannot join on unnamed relation; please provide an alias"
621                    )
622                }
623                let left_schema = self.get_frame_schema(&mut lf)?;
624                let right_schema = self.get_frame_schema(&mut rf)?;
625
626                lf = match &join.join_operator {
627                    op @ (JoinOperator::FullOuter(constraint)
628                    | JoinOperator::LeftOuter(constraint)
629                    | JoinOperator::RightOuter(constraint)
630                    | JoinOperator::Inner(constraint)
631                    | JoinOperator::Anti(constraint)
632                    | JoinOperator::Semi(constraint)
633                    | JoinOperator::LeftAnti(constraint)
634                    | JoinOperator::LeftSemi(constraint)
635                    | JoinOperator::RightAnti(constraint)
636                    | JoinOperator::RightSemi(constraint)) => {
637                        let (lf, rf) = match op {
638                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
639                            _ => (lf, rf),
640                        };
641                        self.process_join(
642                            &TableInfo {
643                                frame: lf,
644                                name: (&l_name).into(),
645                                schema: left_schema.clone(),
646                            },
647                            &TableInfo {
648                                frame: rf,
649                                name: (&r_name).into(),
650                                schema: right_schema.clone(),
651                            },
652                            constraint,
653                            match op {
654                                JoinOperator::FullOuter(_) => JoinType::Full,
655                                JoinOperator::LeftOuter(_) => JoinType::Left,
656                                JoinOperator::RightOuter(_) => JoinType::Right,
657                                JoinOperator::Inner(_) => JoinType::Inner,
658                                #[cfg(feature = "semi_anti_join")]
659                                JoinOperator::Anti(_)
660                                | JoinOperator::LeftAnti(_)
661                                | JoinOperator::RightAnti(_) => JoinType::Anti,
662                                #[cfg(feature = "semi_anti_join")]
663                                JoinOperator::Semi(_)
664                                | JoinOperator::LeftSemi(_)
665                                | JoinOperator::RightSemi(_) => JoinType::Semi,
666                                join_type => polars_bail!(
667                                    SQLInterface:
668                                    "join type '{:?}' not currently supported",
669                                    join_type
670                                ),
671                            },
672                        )?
673                    },
674                    JoinOperator::CrossJoin => {
675                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
676                    },
677                    join_type => {
678                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
679                    },
680                };
681
682                // track join-aliased columns so we can resolve them later
683                let joined_schema = self.get_frame_schema(&mut lf)?;
684
685                self.joined_aliases.borrow_mut().insert(
686                    r_name.clone(),
687                    right_schema
688                        .iter_names()
689                        .filter_map(|name| {
690                            // col exists in both tables and is aliased in the joined result
691                            let aliased_name = format!("{}:{}", name, r_name);
692                            if left_schema.contains(name)
693                                && joined_schema.contains(aliased_name.as_str())
694                            {
695                                Some((name.to_string(), aliased_name))
696                            } else {
697                                None
698                            }
699                        })
700                        .collect::<PlHashMap<String, String>>(),
701                );
702            }
703        };
704        Ok(lf)
705    }
706
707    /// Execute the 'SELECT' part of the query.
708    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
709        let mut lf = if select_stmt.from.is_empty() {
710            DataFrame::empty().lazy()
711        } else {
712            // Note: implicit joins need more work to support properly,
713            // explicit joins are preferred for now (ref: #16662)
714            let from = select_stmt.clone().from;
715            if from.len() > 1 {
716                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
717            }
718            self.execute_from_statement(from.first().unwrap())?
719        };
720
721        // Filter expression (WHERE clause)
722        let schema = self.get_frame_schema(&mut lf)?;
723        lf = self.process_where(lf, &select_stmt.selection, false)?;
724
725        // 'SELECT *' modifiers
726        let mut select_modifiers = SelectModifiers {
727            ilike: None,
728            exclude: PlHashSet::new(),
729            rename: PlHashMap::new(),
730            replace: vec![],
731        };
732
733        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
734
735        // Check for "GROUP BY ..." (after determining projections)
736        let mut group_by_keys: Vec<Expr> = Vec::new();
737        match &select_stmt.group_by {
738            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
739            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
740                if !modifiers.is_empty() {
741                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
742                }
743                // translate the group expressions, allowing ordinal values
744                group_by_keys = group_by_exprs
745                    .iter()
746                    .map(|e| {
747                        self.expr_or_ordinal(
748                            e,
749                            &projections,
750                            None,
751                            Some(schema.deref()),
752                            "GROUP BY",
753                        )
754                    })
755                    .collect::<PolarsResult<_>>()?
756            },
757            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
758            // nested agg/window funcs to the group key (also ignores literals).
759            GroupByExpr::All(modifiers) => {
760                if !modifiers.is_empty() {
761                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
762                }
763                projections.iter().for_each(|expr| match expr {
764                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
765                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
766                    Expr::Column(_) => group_by_keys.push(expr.clone()),
767                    Expr::Alias(e, _)
768                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
769                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
770                        if let Expr::Column(name) = &**e {
771                            group_by_keys.push(col(name.clone()));
772                        }
773                    },
774                    _ => {
775                        // If not quick-matched, add if no nested agg/window expressions
776                        if !has_expr(expr, |e| {
777                            matches!(e, Expr::Agg(_))
778                                || matches!(e, Expr::Len)
779                                || matches!(e, Expr::Window { .. })
780                        }) {
781                            group_by_keys.push(expr.clone())
782                        }
783                    },
784                });
785            },
786        };
787
788        lf = if group_by_keys.is_empty() {
789            // The 'having' clause is only valid inside 'group by'
790            if select_stmt.having.is_some() {
791                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
792            };
793
794            // Final/selected cols, accounting for 'SELECT *' modifiers
795            let mut retained_cols = Vec::with_capacity(projections.len());
796            let mut retained_names = Vec::with_capacity(projections.len());
797            let have_order_by = query.order_by.is_some();
798            // Initialize containing InheritsContext to handle empty projection case.
799            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
800
801            // Note: if there is an 'order by' then we project everything (original cols
802            // and new projections) and *then* select the final cols; the retained cols
803            // are used to ensure a correct final projection. If there's no 'order by',
804            // clause then we can project the final column *expressions* directly.
805            for p in projections.iter() {
806                let name = p
807                    .to_field(schema.deref(), Context::Default)?
808                    .name
809                    .to_string();
810                if select_modifiers.matches_ilike(&name)
811                    && !select_modifiers.exclude.contains(&name)
812                {
813                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
814
815                    retained_cols.push(if have_order_by {
816                        col(name.as_str())
817                    } else {
818                        p.clone()
819                    });
820                    retained_names.push(col(name));
821                }
822            }
823
824            // Apply the remaining modifiers and establish the final projection
825            if have_order_by {
826                // We can safely use `with_columns()` and avoid a join if:
827                // * There is already a projection that projects to the table height.
828                // * All projection heights inherit from context (e.g. all scalar literals that
829                //   are to be broadcasted to table height).
830                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
831                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
832                {
833                    lf = lf.with_columns(projections);
834                } else {
835                    // We hit this branch if the output height is not guaranteed to match the table
836                    // height. E.g.:
837                    //
838                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
839                    // * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
840                    //
841                    // For these cases we truncate / extend the sorting columns with NULLs to match
842                    // the output height. We do this by projecting independently and then joining
843                    // back the original frame on the row index.
844                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
845                    lf = lf
846                        .clone()
847                        .select(projections)
848                        .with_row_index(NAME, None)
849                        .join(
850                            lf.with_row_index(NAME, None),
851                            [col(NAME)],
852                            [col(NAME)],
853                            JoinArgs {
854                                how: JoinType::Left,
855                                validation: Default::default(),
856                                suffix: None,
857                                slice: None,
858                                nulls_equal: false,
859                                coalesce: Default::default(),
860                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
861                            },
862                        );
863                }
864            }
865
866            if !select_modifiers.replace.is_empty() {
867                lf = lf.with_columns(&select_modifiers.replace);
868            }
869            if !select_modifiers.rename.is_empty() {
870                lf = lf.with_columns(select_modifiers.renamed_cols());
871            }
872
873            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
874
875            // Note: If `have_order_by`, with_columns is already done above.
876            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
877                && !have_order_by
878            {
879                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
880                lf = lf.with_columns(retained_cols).select(retained_names);
881            } else {
882                lf = lf.select(retained_cols);
883            }
884
885            if !select_modifiers.rename.is_empty() {
886                lf = lf.rename(
887                    select_modifiers.rename.keys(),
888                    select_modifiers.rename.values(),
889                    true,
890                );
891            };
892            lf
893        } else {
894            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
895            lf = self.process_order_by(lf, &query.order_by, None)?;
896
897            // Apply optional 'having' clause, post-aggregation.
898            let schema = Some(self.get_frame_schema(&mut lf)?);
899            match select_stmt.having.as_ref() {
900                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
901                None => lf,
902            }
903        };
904
905        // Apply optional DISTINCT clause.
906        lf = match &select_stmt.distinct {
907            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
908            Some(Distinct::On(exprs)) => {
909                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
910                let schema = Some(self.get_frame_schema(&mut lf)?);
911                let cols = exprs
912                    .iter()
913                    .map(|e| {
914                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
915                        if let Expr::Column(name) = expr {
916                            Ok(name.clone())
917                        } else {
918                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
919                        }
920                    })
921                    .collect::<PolarsResult<Vec<_>>>()?;
922
923                // DISTINCT ON has to apply the ORDER BY before the operation.
924                lf = self.process_order_by(lf, &query.order_by, None)?;
925                return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
926            },
927            None => lf,
928        };
929        Ok(lf)
930    }
931
932    fn column_projections(
933        &mut self,
934        select_stmt: &Select,
935        schema: &SchemaRef,
936        select_modifiers: &mut SelectModifiers,
937    ) -> PolarsResult<Vec<Expr>> {
938        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
939            .projection
940            .iter()
941            .map(|select_item| match select_item {
942                SelectItem::UnnamedExpr(expr) => {
943                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
944                },
945                SelectItem::ExprWithAlias { expr, alias } => {
946                    let expr = parse_sql_expr(expr, self, Some(schema))?;
947                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
948                },
949                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
950                    .process_qualified_wildcard(
951                        obj_name,
952                        wildcard_options,
953                        select_modifiers,
954                        Some(schema),
955                    ),
956                SelectItem::Wildcard(wildcard_options) => {
957                    let cols = schema
958                        .iter_names()
959                        .map(|name| col(name.clone()))
960                        .collect::<Vec<_>>();
961
962                    self.process_wildcard_additional_options(
963                        cols,
964                        wildcard_options,
965                        select_modifiers,
966                        Some(schema),
967                    )
968                },
969            })
970            .collect();
971
972        let flattened_exprs: Vec<Expr> = parsed_items?
973            .into_iter()
974            .flatten()
975            .flat_map(|expr| expand_exprs(expr, schema))
976            .collect();
977
978        Ok(flattened_exprs)
979    }
980
981    fn process_where(
982        &mut self,
983        mut lf: LazyFrame,
984        expr: &Option<SQLExpr>,
985        invert_filter: bool,
986    ) -> PolarsResult<LazyFrame> {
987        if let Some(expr) = expr {
988            let schema = self.get_frame_schema(&mut lf)?;
989
990            // shortcut filter evaluation if given expression is just TRUE or FALSE
991            let (all_true, all_false) = match expr {
992                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
993                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
994                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
995                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
996                        (a != b, a == b)
997                    },
998                    _ => (false, false),
999                },
1000                _ => (false, false),
1001            };
1002            if (all_true && !invert_filter) || (all_false && invert_filter) {
1003                return Ok(lf);
1004            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1005                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1006            }
1007
1008            // ...otherwise parse and apply the filter as normal
1009            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1010            if filter_expression.clone().meta().has_multiple_outputs() {
1011                filter_expression = all_horizontal([filter_expression])?;
1012            }
1013            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1014            lf = if invert_filter {
1015                lf.remove(filter_expression)
1016            } else {
1017                lf.filter(filter_expression)
1018            };
1019        }
1020        Ok(lf)
1021    }
1022
1023    pub(super) fn process_join(
1024        &mut self,
1025        tbl_left: &TableInfo,
1026        tbl_right: &TableInfo,
1027        constraint: &JoinConstraint,
1028        join_type: JoinType,
1029    ) -> PolarsResult<LazyFrame> {
1030        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1031
1032        let joined = tbl_left
1033            .frame
1034            .clone()
1035            .join_builder()
1036            .with(tbl_right.frame.clone())
1037            .left_on(left_on)
1038            .right_on(right_on)
1039            .how(join_type)
1040            .suffix(format!(":{}", tbl_right.name))
1041            .coalesce(JoinCoalesce::KeepColumns)
1042            .finish();
1043
1044        Ok(joined)
1045    }
1046
1047    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1048        let mut contexts = vec![];
1049        for expr in exprs {
1050            *expr = expr.clone().map_expr(|e| match e {
1051                Expr::SubPlan(lp, names) => {
1052                    contexts.push(<LazyFrame>::from((**lp).clone()));
1053                    if names.len() == 1 {
1054                        Expr::Column(names[0].as_str().into())
1055                    } else {
1056                        Expr::SubPlan(lp, names)
1057                    }
1058                },
1059                e => e,
1060            })
1061        }
1062
1063        if contexts.is_empty() {
1064            lf
1065        } else {
1066            lf.with_context(contexts)
1067        }
1068    }
1069
1070    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1071        if let Statement::CreateTable(CreateTable {
1072            if_not_exists,
1073            name,
1074            query,
1075            ..
1076        }) = stmt
1077        {
1078            let tbl_name = name.0.first().unwrap().value.as_str();
1079            // CREATE TABLE IF NOT EXISTS
1080            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1081                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1082                // CREATE OR REPLACE TABLE
1083            }
1084            if let Some(query) = query {
1085                let lf = self.execute_query(query)?;
1086                self.register(tbl_name, lf);
1087                let out = df! {
1088                    "Response" => ["CREATE TABLE"]
1089                }
1090                .unwrap()
1091                .lazy();
1092                Ok(out)
1093            } else {
1094                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1095            }
1096        } else {
1097            unreachable!()
1098        }
1099    }
1100
1101    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1102        match relation {
1103            TableFactor::Table {
1104                name, alias, args, ..
1105            } => {
1106                if let Some(args) = args {
1107                    return self.execute_table_function(name, alias, &args.args);
1108                }
1109                let tbl_name = name.0.first().unwrap().value.as_str();
1110                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1111                    match alias {
1112                        Some(alias) => {
1113                            self.table_aliases
1114                                .borrow_mut()
1115                                .insert(alias.name.value.clone(), tbl_name.to_string());
1116                            Ok((alias.to_string(), lf))
1117                        },
1118                        None => Ok((tbl_name.to_string(), lf)),
1119                    }
1120                } else {
1121                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1122                }
1123            },
1124            TableFactor::Derived {
1125                lateral,
1126                subquery,
1127                alias,
1128            } => {
1129                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1130                if let Some(alias) = alias {
1131                    let mut lf = self.execute_query_no_ctes(subquery)?;
1132                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1133                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1134                    Ok((alias.name.value.clone(), lf))
1135                } else {
1136                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1137                }
1138            },
1139            TableFactor::UNNEST {
1140                alias,
1141                array_exprs,
1142                with_offset,
1143                with_offset_alias: _,
1144                ..
1145            } => {
1146                if let Some(alias) = alias {
1147                    let table_name = alias.name.value.clone();
1148                    let column_names: Vec<Option<PlSmallStr>> = alias
1149                        .columns
1150                        .iter()
1151                        .map(|c| {
1152                            if c.name.value.is_empty() {
1153                                None
1154                            } else {
1155                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1156                            }
1157                        })
1158                        .collect();
1159
1160                    let column_values: Vec<Series> = array_exprs
1161                        .iter()
1162                        .map(|arr| parse_sql_array(arr, self))
1163                        .collect::<Result<_, _>>()?;
1164
1165                    polars_ensure!(!column_names.is_empty(),
1166                        SQLSyntax:
1167                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1168                    );
1169                    if column_names.len() != column_values.len() {
1170                        let plural = if column_values.len() > 1 { "s" } else { "" };
1171                        polars_bail!(
1172                            SQLSyntax:
1173                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1174                        );
1175                    }
1176                    let column_series: Vec<Column> = column_values
1177                        .into_iter()
1178                        .zip(column_names)
1179                        .map(|(s, name)| {
1180                            if let Some(name) = name {
1181                                s.clone().with_name(name)
1182                            } else {
1183                                s.clone()
1184                            }
1185                        })
1186                        .map(Column::from)
1187                        .collect();
1188
1189                    let lf = DataFrame::new(column_series)?.lazy();
1190                    if *with_offset {
1191                        // TODO: support 'WITH ORDINALITY' modifier.
1192                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1193                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH OFFSET/ORDINALITY");
1194                    }
1195                    self.table_map.insert(table_name.clone(), lf.clone());
1196                    Ok((table_name.clone(), lf))
1197                } else {
1198                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1199                }
1200            },
1201            TableFactor::NestedJoin {
1202                table_with_joins,
1203                alias,
1204            } => {
1205                let lf = self.execute_from_statement(table_with_joins)?;
1206                match alias {
1207                    Some(a) => Ok((a.name.value.clone(), lf)),
1208                    None => Ok(("".to_string(), lf)),
1209                }
1210            },
1211            // Support bare table, optionally with an alias, for now
1212            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1213        }
1214    }
1215
1216    fn execute_table_function(
1217        &mut self,
1218        name: &ObjectName,
1219        alias: &Option<TableAlias>,
1220        args: &[FunctionArg],
1221    ) -> PolarsResult<(String, LazyFrame)> {
1222        let tbl_fn = name.0.first().unwrap().value.as_str();
1223        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1224        let (tbl_name, lf) = read_fn.execute(args)?;
1225        #[allow(clippy::useless_asref)]
1226        let tbl_name = alias
1227            .as_ref()
1228            .map(|a| a.name.value.clone())
1229            .unwrap_or_else(|| tbl_name);
1230
1231        self.table_map.insert(tbl_name.clone(), lf.clone());
1232        Ok((tbl_name, lf))
1233    }
1234
1235    fn process_order_by(
1236        &mut self,
1237        mut lf: LazyFrame,
1238        order_by: &Option<OrderBy>,
1239        selected: Option<&[Expr]>,
1240    ) -> PolarsResult<LazyFrame> {
1241        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1242            return Ok(lf);
1243        }
1244        let schema = self.get_frame_schema(&mut lf)?;
1245        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1246
1247        let order_by = order_by.as_ref().unwrap().exprs.clone();
1248        let mut descending = Vec::with_capacity(order_by.len());
1249        let mut nulls_last = Vec::with_capacity(order_by.len());
1250        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1251
1252        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1253            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1254        {
1255            if let Some(selected) = selected {
1256                by.extend(selected.iter().cloned());
1257            } else {
1258                by.extend(columns_iter);
1259            };
1260            let desc_order = !order_by[0].asc.unwrap_or(true);
1261            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1262            descending.resize(by.len(), desc_order);
1263        } else {
1264            let columns = &columns_iter.collect::<Vec<_>>();
1265            for ob in order_by {
1266                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1267                // https://www.postgresql.org/docs/current/queries-order.html
1268                let desc_order = !ob.asc.unwrap_or(true);
1269                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1270                descending.push(desc_order);
1271
1272                // translate order expression, allowing ordinal values
1273                by.push(self.expr_or_ordinal(
1274                    &ob.expr,
1275                    columns,
1276                    selected,
1277                    Some(&schema),
1278                    "ORDER BY",
1279                )?)
1280            }
1281        }
1282        Ok(lf.sort_by_exprs(
1283            &by,
1284            SortMultipleOptions::default()
1285                .with_order_descending_multi(descending)
1286                .with_nulls_last_multi(nulls_last)
1287                .with_maintain_order(true),
1288        ))
1289    }
1290
1291    fn process_group_by(
1292        &mut self,
1293        mut lf: LazyFrame,
1294        group_by_keys: &[Expr],
1295        projections: &[Expr],
1296    ) -> PolarsResult<LazyFrame> {
1297        let schema_before = self.get_frame_schema(&mut lf)?;
1298        let group_by_keys_schema =
1299            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;
1300
1301        // Remove the group_by keys as polars adds those implicitly.
1302        let mut aggregation_projection = Vec::with_capacity(projections.len());
1303        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1304        let mut projection_aliases = PlHashSet::new();
1305        let mut group_key_aliases = PlHashSet::new();
1306
1307        for mut e in projections {
1308            // `Len` represents COUNT(*) so we treat as an aggregation here.
1309            let is_agg_or_window = has_expr(e, |e| {
1310                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
1311            });
1312
1313            // Note: if simple aliased expression we defer aliasing until after the group_by.
1314            if let Expr::Alias(expr, alias) = e {
1315                if e.clone().meta().is_simple_projection() {
1316                    group_key_aliases.insert(alias.as_ref());
1317                    e = expr
1318                } else if let Expr::Function {
1319                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1320                    ..
1321                } = expr.deref()
1322                {
1323                    projection_overrides
1324                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1325                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
1326                    projection_aliases.insert(alias.as_ref());
1327                }
1328            }
1329            let field = e.to_field(&schema_before, Context::Default)?;
1330            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
1331                let mut e = e.clone();
1332                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1333                    e = (**expr).clone();
1334                } else if let Expr::Alias(expr, name) = &e {
1335                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1336                        e = (**expr).clone().alias(name.clone());
1337                    }
1338                }
1339                aggregation_projection.push(e);
1340            } else if let Expr::Column(_)
1341            | Expr::Function {
1342                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1343                ..
1344            } = e
1345            {
1346                // Non-aggregated columns must be part of the GROUP BY clause
1347                if !group_by_keys_schema.contains(&field.name) {
1348                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1349                }
1350            }
1351        }
1352        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1353        let projection_schema =
1354            expressions_to_schema(projections, &schema_before, Context::Default)?;
1355
1356        // A final projection to get the proper order and any deferred transforms/aliases.
1357        let final_projection = projection_schema
1358            .iter_names()
1359            .zip(projections)
1360            .map(|(name, projection_expr)| {
1361                if let Some(expr) = projection_overrides.get(name.as_str()) {
1362                    expr.clone()
1363                } else if group_by_keys_schema.get(name).is_some()
1364                    || projection_aliases.contains(name.as_str())
1365                    || group_key_aliases.contains(name.as_str())
1366                {
1367                    projection_expr.clone()
1368                } else {
1369                    col(name.clone())
1370                }
1371            })
1372            .collect::<Vec<_>>();
1373
1374        Ok(aggregated.select(&final_projection))
1375    }
1376
1377    fn process_limit_offset(
1378        &self,
1379        lf: LazyFrame,
1380        limit: &Option<SQLExpr>,
1381        offset: &Option<Offset>,
1382    ) -> PolarsResult<LazyFrame> {
1383        match (offset, limit) {
1384            (
1385                Some(Offset {
1386                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1387                    ..
1388                }),
1389                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1390            ) => Ok(lf.slice(
1391                offset
1392                    .parse()
1393                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1394                limit
1395                    .parse()
1396                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1397            )),
1398            (
1399                Some(Offset {
1400                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1401                    ..
1402                }),
1403                None,
1404            ) => Ok(lf.slice(
1405                offset
1406                    .parse()
1407                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1408                IdxSize::MAX,
1409            )),
1410            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1411                limit
1412                    .parse()
1413                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1414            )),
1415            (None, None) => Ok(lf),
1416            _ => polars_bail!(
1417                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1418            ),
1419        }
1420    }
1421
1422    fn process_qualified_wildcard(
1423        &mut self,
1424        ObjectName(idents): &ObjectName,
1425        options: &WildcardAdditionalOptions,
1426        modifiers: &mut SelectModifiers,
1427        schema: Option<&Schema>,
1428    ) -> PolarsResult<Vec<Expr>> {
1429        let mut new_idents = idents.clone();
1430        new_idents.push(Ident::new("*"));
1431
1432        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1433        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1434    }
1435
1436    fn process_wildcard_additional_options(
1437        &mut self,
1438        exprs: Vec<Expr>,
1439        options: &WildcardAdditionalOptions,
1440        modifiers: &mut SelectModifiers,
1441        schema: Option<&Schema>,
1442    ) -> PolarsResult<Vec<Expr>> {
1443        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1444            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1445        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1446            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1447        }
1448
1449        // SELECT * EXCLUDE
1450        if let Some(items) = &options.opt_exclude {
1451            match items {
1452                ExcludeSelectItem::Single(ident) => {
1453                    modifiers.exclude.insert(ident.value.clone());
1454                },
1455                ExcludeSelectItem::Multiple(idents) => {
1456                    modifiers
1457                        .exclude
1458                        .extend(idents.iter().map(|i| i.value.clone()));
1459                },
1460            };
1461        }
1462
1463        // SELECT * EXCEPT
1464        if let Some(items) = &options.opt_except {
1465            modifiers.exclude.insert(items.first_element.value.clone());
1466            modifiers
1467                .exclude
1468                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1469        }
1470
1471        // SELECT * ILIKE
1472        if let Some(item) = &options.opt_ilike {
1473            let rx = regex::escape(item.pattern.as_str())
1474                .replace('%', ".*")
1475                .replace('_', ".");
1476
1477            modifiers.ilike = Some(
1478                polars_utils::regex_cache::compile_regex(format!("^(?is){}$", rx).as_str())
1479                    .unwrap(),
1480            );
1481        }
1482
1483        // SELECT * RENAME
1484        if let Some(items) = &options.opt_rename {
1485            let renames = match items {
1486                RenameSelectItem::Single(rename) => vec![rename],
1487                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1488            };
1489            for rn in renames {
1490                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1491                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1492                if before != after {
1493                    modifiers.rename.insert(before, after);
1494                }
1495            }
1496        }
1497
1498        // SELECT * REPLACE
1499        if let Some(replacements) = &options.opt_replace {
1500            for rp in &replacements.items {
1501                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1502                modifiers
1503                    .replace
1504                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1505            }
1506        }
1507        Ok(exprs)
1508    }
1509
1510    fn rename_columns_from_table_alias(
1511        &mut self,
1512        mut lf: LazyFrame,
1513        alias: &TableAlias,
1514    ) -> PolarsResult<LazyFrame> {
1515        if alias.columns.is_empty() {
1516            Ok(lf)
1517        } else {
1518            let schema = self.get_frame_schema(&mut lf)?;
1519            if alias.columns.len() != schema.len() {
1520                polars_bail!(
1521                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1522                    alias.columns.len(), alias.name.value, schema.len()
1523                )
1524            } else {
1525                let existing_columns: Vec<_> = schema.iter_names().collect();
1526                let new_columns: Vec<_> =
1527                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1528                Ok(lf.rename(existing_columns, new_columns, true))
1529            }
1530        }
1531    }
1532}
1533
1534impl SQLContext {
1535    /// Get internal table map. For internal use only.
1536    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1537        self.table_map.clone()
1538    }
1539
1540    /// Create a new SQLContext from a table map. For internal use only
1541    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1542        Self {
1543            table_map,
1544            ..Default::default()
1545        }
1546    }
1547}
1548
1549fn collect_compound_identifiers(
1550    left: &[Ident],
1551    right: &[Ident],
1552    left_name: &str,
1553    right_name: &str,
1554) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1555    if left.len() == 2 && right.len() == 2 {
1556        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1557        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1558
1559        // switch left/right operands if the caller has them in reverse
1560        if left_name == tbl_b || right_name == tbl_a {
1561            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1562        } else {
1563            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1564        }
1565    } else {
1566        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1567    }
1568}
1569
1570fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1571    match expr {
1572        Expr::Wildcard => schema
1573            .iter_names()
1574            .map(|name| col(name.clone()))
1575            .collect::<Vec<_>>(),
1576        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1577            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1578            schema
1579                .iter_names()
1580                .filter(|name| re.is_match(name))
1581                .map(|name| col(name.clone()))
1582                .collect::<Vec<_>>()
1583        },
1584        Expr::Columns(names) => names
1585            .iter()
1586            .map(|name| col(name.clone()))
1587            .collect::<Vec<_>>(),
1588        _ => vec![expr],
1589    }
1590}
1591
/// Returns `true` when a column name begins with `^` and ends with `$`, i.e. when it is
/// written as an anchored regex.
fn is_regex_colname(nm: &str) -> bool {
    // both delimiters are ASCII, so checking the first/last byte is equivalent to
    // checking the first/last char
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1595
1596fn process_join_on(
1597    expression: &sqlparser::ast::Expr,
1598    tbl_left: &TableInfo,
1599    tbl_right: &TableInfo,
1600) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1601    match expression {
1602        SQLExpr::BinaryOp { left, op, right } => match op {
1603            BinaryOperator::And => {
1604                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1605                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1606                left_i.append(&mut left_j);
1607                right_i.append(&mut right_j);
1608                Ok((left_i, right_i))
1609            },
1610            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1611                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1612                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1613                },
1614                _ => {
1615                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1616                },
1617            },
1618            _ => {
1619                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1620            },
1621        },
1622        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1623        _ => {
1624            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1625        },
1626    }
1627}
1628
1629fn process_join_constraint(
1630    constraint: &JoinConstraint,
1631    tbl_left: &TableInfo,
1632    tbl_right: &TableInfo,
1633) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1634    match constraint {
1635        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1636            process_join_on(expr, tbl_left, tbl_right)
1637        },
1638        JoinConstraint::Using(idents) if !idents.is_empty() => {
1639            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1640            Ok((using.clone(), using))
1641        },
1642        JoinConstraint::Natural => {
1643            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1644            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1645            let on: Vec<Expr> = left_names
1646                .intersection(&right_names)
1647                .map(|&name| col(name.clone()))
1648                .collect();
1649            if on.is_empty() {
1650                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1651            }
1652            Ok((on.clone(), on))
1653        },
1654        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1655    }
1656}
1657
1658bitflags::bitflags! {
1659    /// Bitfield indicating whether there exists a projection with the specified height behavior.
1660    ///
1661    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
1662    /// context.
1663    #[derive(PartialEq)]
1664    struct ExprSqlProjectionHeightBehavior: u8 {
1665        /// Maintains the height of input column(s)
1666        const MaintainsColumn = 1 << 0;
1667        /// Height is independent of input, e.g.:
1668        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
1669        /// * aggregations: count(*), first(), sum() etc.
1670        const Independent = 1 << 1;
1671        /// "Inherits" the height of the context, e.g.:
1672        /// * Scalar literals
1673        const InheritsContext = 1 << 2;
1674    }
1675}
1676
1677impl ExprSqlProjectionHeightBehavior {
1678    fn identify_from_expr(expr: &Expr) -> Self {
1679        let mut has_column = false;
1680        let mut has_independent = false;
1681
1682        for e in expr.into_iter() {
1683            use Expr::*;
1684
1685            has_column |= matches!(e, Column(_) | Columns(_) | DtypeColumn(_) | IndexColumn(_));
1686
1687            has_independent |= match e {
1688                Function { options, .. } | AnonymousFunction { options, .. } => {
1689                    options.returns_scalar() || !options.is_length_preserving()
1690                },
1691
1692                Literal(v) => !v.is_scalar(),
1693
1694                Explode(_) | Filter { .. } | Gather { .. } | Slice { .. } => true,
1695
1696                Agg { .. } | Len => true,
1697
1698                _ => false,
1699            }
1700        }
1701
1702        if has_independent {
1703            Self::Independent
1704        } else if has_column {
1705            Self::MaintainsColumn
1706        } else {
1707            Self::InheritsContext
1708        }
1709    }
1710}