polars_sql/
context.rs

1use std::ops::Deref;
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
12    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
13    OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
14    Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
15    WildcardAdditionalOptions,
16};
17use sqlparser::dialect::GenericDialect;
18use sqlparser::parser::{Parser, ParserOptions};
19
20use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
21use crate::sql_expr::{
22    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
23};
24use crate::table_functions::PolarsTableFunctions;
25
26#[derive(Clone)]
27pub struct TableInfo {
28    pub(crate) frame: LazyFrame,
29    pub(crate) name: PlSmallStr,
30    pub(crate) schema: Arc<Schema>,
31}
32
33struct SelectModifiers {
34    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
35    ilike: Option<regex::Regex>,               // SELECT * ILIKE
36    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
37    replace: Vec<Expr>,                        // SELECT * REPLACE
38}
39impl SelectModifiers {
40    fn matches_ilike(&self, s: &str) -> bool {
41        match &self.ilike {
42            Some(rx) => rx.is_match(s),
43            None => true,
44        }
45    }
46    fn renamed_cols(&self) -> Vec<Expr> {
47        self.rename
48            .iter()
49            .map(|(before, after)| col(before.clone()).alias(after.clone()))
50            .collect()
51    }
52}
53
54/// The SQLContext is the main entry point for executing SQL queries.
55#[derive(Clone)]
56pub struct SQLContext {
57    pub(crate) table_map: PlHashMap<String, LazyFrame>,
58    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
59    pub(crate) lp_arena: Arena<IR>,
60    pub(crate) expr_arena: Arena<AExpr>,
61
62    cte_map: PlHashMap<String, LazyFrame>,
63    table_aliases: PlHashMap<String, String>,
64    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
65}
66
67impl Default for SQLContext {
68    fn default() -> Self {
69        Self {
70            function_registry: Arc::new(DefaultFunctionRegistry {}),
71            table_map: Default::default(),
72            cte_map: Default::default(),
73            table_aliases: Default::default(),
74            joined_aliases: Default::default(),
75            lp_arena: Default::default(),
76            expr_arena: Default::default(),
77        }
78    }
79}
80
81impl SQLContext {
82    /// Create a new SQLContext.
83    /// ```rust
84    /// # use polars_sql::SQLContext;
85    /// # fn main() {
86    /// let ctx = SQLContext::new();
87    /// # }
88    /// ```
89    pub fn new() -> Self {
90        Self::default()
91    }
92
93    /// Get the names of all registered tables, in sorted order.
94    pub fn get_tables(&self) -> Vec<String> {
95        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
96        tables.sort_unstable();
97        tables
98    }
99
100    /// Register a [`LazyFrame`] as a table in the SQLContext.
101    /// ```rust
102    /// # use polars_sql::SQLContext;
103    /// # use polars_core::prelude::*;
104    /// # use polars_lazy::prelude::*;
105    /// # fn main() {
106    ///
107    /// let mut ctx = SQLContext::new();
108    /// let df = df! {
109    ///    "a" =>  [1, 2, 3],
110    /// }.unwrap().lazy();
111    ///
112    /// ctx.register("df", df);
113    /// # }
114    ///```
115    pub fn register(&mut self, name: &str, lf: LazyFrame) {
116        self.table_map.insert(name.to_owned(), lf);
117    }
118
119    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
120    pub fn unregister(&mut self, name: &str) {
121        self.table_map.remove(&name.to_owned());
122    }
123
124    /// Execute a SQL query, returning a [`LazyFrame`].
125    /// ```rust
126    /// # use polars_sql::SQLContext;
127    /// # use polars_core::prelude::*;
128    /// # use polars_lazy::prelude::*;
129    /// # fn main() {
130    ///
131    /// let mut ctx = SQLContext::new();
132    /// let df = df! {
133    ///    "a" =>  [1, 2, 3],
134    /// }
135    /// .unwrap();
136    ///
137    /// ctx.register("df", df.clone().lazy());
138    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
139    /// assert!(sql_df.equals(&df));
140    /// # }
141    ///```
142    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
143        let mut parser = Parser::new(&GenericDialect);
144        parser = parser.with_options(ParserOptions {
145            trailing_commas: true,
146            ..Default::default()
147        });
148
149        let ast = parser
150            .try_with_sql(query)
151            .map_err(to_sql_interface_err)?
152            .parse_statements()
153            .map_err(to_sql_interface_err)?;
154
155        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
156        let res = self.execute_statement(ast.first().unwrap())?;
157
158        // Ensure the result uses the proper arenas.
159        // This will instantiate new arenas with a new version.
160        let lp_arena = std::mem::take(&mut self.lp_arena);
161        let expr_arena = std::mem::take(&mut self.expr_arena);
162        res.set_cached_arena(lp_arena, expr_arena);
163
164        // Every execution should clear the statement-level maps.
165        self.cte_map.clear();
166        self.table_aliases.clear();
167        self.joined_aliases.clear();
168
169        Ok(res)
170    }
171
172    /// add a function registry to the SQLContext
173    /// the registry provides the ability to add custom functions to the SQLContext
174    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
175        self.function_registry = function_registry;
176        self
177    }
178
179    /// Get the function registry of the SQLContext
180    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
181        &self.function_registry
182    }
183
184    /// Get a mutable reference to the function registry of the SQLContext
185    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
186        Arc::get_mut(&mut self.function_registry).unwrap()
187    }
188}
189
190impl SQLContext {
191    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
192        let ast = stmt;
193        Ok(match ast {
194            Statement::Query(query) => self.execute_query(query)?,
195            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
196            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
197            stmt @ Statement::Drop {
198                object_type: ObjectType::Table,
199                ..
200            } => self.execute_drop_table(stmt)?,
201            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
202            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
203            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
204            _ => polars_bail!(
205                SQLInterface: "statement type is not supported:\n{:?}", ast,
206            ),
207        })
208    }
209
210    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211        self.register_ctes(query)?;
212        self.execute_query_no_ctes(query)
213    }
214
215    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216        let lf = self.process_query(&query.body, query)?;
217        self.process_limit_offset(lf, &query.limit, &query.offset)
218    }
219
220    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222    }
223
224    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225        let table = self.table_map.get(name).cloned();
226        table
227            .or_else(|| self.cte_map.get(name).cloned())
228            .or_else(|| {
229                self.table_aliases
230                    .get(name)
231                    .and_then(|alias| self.table_map.get(alias).cloned())
232            })
233    }
234
235    fn expr_or_ordinal(
236        &mut self,
237        e: &SQLExpr,
238        exprs: &[Expr],
239        selected: Option<&[Expr]>,
240        schema: Option<&Schema>,
241        clause: &str,
242    ) -> PolarsResult<Expr> {
243        match e {
244            SQLExpr::UnaryOp {
245                op: UnaryOperator::Minus,
246                expr,
247            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
248                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
249                    Err(polars_err!(
250                    SQLSyntax:
251                    "negative ordinal values are invalid for {}; found -{}",
252                    clause,
253                    idx
254                    ))
255                } else {
256                    unreachable!()
257                }
258            },
259            SQLExpr::Value(SQLValue::Number(idx, _)) => {
260                // note: sql queries are 1-indexed
261                let idx = idx.parse::<usize>().map_err(|_| {
262                    polars_err!(
263                        SQLSyntax:
264                        "negative ordinal values are invalid for {}; found {}",
265                        clause,
266                        idx
267                    )
268                })?;
269                // note: "selected" cols represent final projection order, so we use those for
270                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
271                let cols = if let Some(cols) = selected {
272                    cols
273                } else {
274                    exprs
275                };
276                Ok(cols
277                    .get(idx - 1)
278                    .ok_or_else(|| {
279                        polars_err!(
280                            SQLInterface:
281                            "{} ordinal value must refer to a valid column; found {}",
282                            clause,
283                            idx
284                        )
285                    })?
286                    .clone())
287            },
288            SQLExpr::Value(v) => Err(polars_err!(
289                SQLSyntax:
290                "{} requires a valid expression or positive ordinal; found {}", clause, v,
291            )),
292            _ => parse_sql_expr(e, self, schema),
293        }
294    }
295
296    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
297        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
298            if let Some(name) = aliases.get(column_name) {
299                return name.to_string();
300            }
301        }
302        column_name.to_string()
303    }
304
305    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
306        match expr {
307            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
308            SetExpr::Query(query) => self.execute_query_no_ctes(query),
309            SetExpr::SetOperation {
310                op: SetOperator::Union,
311                set_quantifier,
312                left,
313                right,
314            } => self.process_union(left, right, set_quantifier, query),
315
316            #[cfg(feature = "semi_anti_join")]
317            SetExpr::SetOperation {
318                op: SetOperator::Intersect | SetOperator::Except,
319                set_quantifier,
320                left,
321                right,
322            } => self.process_except_intersect(left, right, set_quantifier, query),
323
324            SetExpr::Values(Values {
325                explicit_row: _,
326                rows,
327            }) => self.process_values(rows),
328
329            SetExpr::Table(tbl) => {
330                if tbl.table_name.is_some() {
331                    let table_name = tbl.table_name.as_ref().unwrap();
332                    self.get_table_from_current_scope(table_name)
333                        .ok_or_else(|| {
334                            polars_err!(
335                                SQLInterface: "no table or alias named '{}' found",
336                                tbl
337                            )
338                        })
339                } else {
340                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
341                }
342            },
343            op => {
344                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
345            },
346        }
347    }
348
349    #[cfg(feature = "semi_anti_join")]
350    fn process_except_intersect(
351        &mut self,
352        left: &SetExpr,
353        right: &SetExpr,
354        quantifier: &SetQuantifier,
355        query: &Query,
356    ) -> PolarsResult<LazyFrame> {
357        let (join_type, op_name) = match *query.body {
358            SetExpr::SetOperation {
359                op: SetOperator::Except,
360                ..
361            } => (JoinType::Anti, "EXCEPT"),
362            _ => (JoinType::Semi, "INTERSECT"),
363        };
364        let mut lf = self.process_query(left, query)?;
365        let mut rf = self.process_query(right, query)?;
366        let join = lf
367            .clone()
368            .join_builder()
369            .with(rf.clone())
370            .how(join_type)
371            .join_nulls(true);
372
373        let lf_schema = self.get_frame_schema(&mut lf)?;
374        let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
375        let joined_tbl = match quantifier {
376            SetQuantifier::ByName => join.on(lf_cols).finish(),
377            SetQuantifier::Distinct | SetQuantifier::None => {
378                let rf_schema = self.get_frame_schema(&mut rf)?;
379                let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
380                if lf_cols.len() != rf_cols.len() {
381                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
382                }
383                join.left_on(lf_cols).right_on(rf_cols).finish()
384            },
385            _ => {
386                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
387            },
388        };
389        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
390    }
391
392    fn process_union(
393        &mut self,
394        left: &SetExpr,
395        right: &SetExpr,
396        quantifier: &SetQuantifier,
397        query: &Query,
398    ) -> PolarsResult<LazyFrame> {
399        let mut lf = self.process_query(left, query)?;
400        let mut rf = self.process_query(right, query)?;
401        let opts = UnionArgs {
402            parallel: true,
403            to_supertypes: true,
404            ..Default::default()
405        };
406        match quantifier {
407            // UNION [ALL | DISTINCT]
408            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
409                let lf_schema = self.get_frame_schema(&mut lf)?;
410                let rf_schema = self.get_frame_schema(&mut rf)?;
411                if lf_schema.len() != rf_schema.len() {
412                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
413                }
414                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
415                match quantifier {
416                    SetQuantifier::Distinct | SetQuantifier::None => {
417                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
418                    },
419                    _ => concatenated,
420                }
421            },
422            // UNION ALL BY NAME
423            #[cfg(feature = "diagonal_concat")]
424            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
425            // UNION [DISTINCT] BY NAME
426            #[cfg(feature = "diagonal_concat")]
427            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
428                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
429                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
430            },
431            #[allow(unreachable_patterns)]
432            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
433        }
434    }
435
436    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
437        let frame_rows: Vec<Row> = values.iter().map(|row| {
438            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
439                let expr = parse_sql_expr(expr, self, None)?;
440                match expr {
441                    Expr::Literal(value) => {
442                        value.to_any_value()
443                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
444                            .map(|av| av.into_static())
445                    },
446                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
447                }
448            }).collect();
449            row_data.map(Row::new)
450        }).collect::<Result<_, _>>()?;
451
452        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
453    }
454
455    // EXPLAIN SELECT * FROM DF
456    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
457        match stmt {
458            Statement::Explain { statement, .. } => {
459                let lf = self.execute_statement(statement)?;
460                let plan = lf.describe_optimized_plan()?;
461                let plan = plan
462                    .split('\n')
463                    .collect::<Series>()
464                    .with_name(PlSmallStr::from_static("Logical Plan"))
465                    .into_column();
466                let df = DataFrame::new(vec![plan])?;
467                Ok(df.lazy())
468            },
469            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
470        }
471    }
472
473    // SHOW TABLES
474    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
475        let tables = Column::new("name".into(), self.get_tables());
476        let df = DataFrame::new(vec![tables])?;
477        Ok(df.lazy())
478    }
479
480    // DROP TABLE <tbl>
481    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
482        match stmt {
483            Statement::Drop { names, .. } => {
484                names.iter().for_each(|name| {
485                    self.table_map.remove(&name.to_string());
486                });
487                Ok(DataFrame::empty().lazy())
488            },
489            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
490        }
491    }
492
493    // DELETE FROM <tbl> [WHERE ...]
494    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
495        if let Statement::Delete(Delete {
496            tables,
497            from,
498            using,
499            selection,
500            returning,
501            order_by,
502            limit,
503        }) = stmt
504        {
505            if !tables.is_empty()
506                || using.is_some()
507                || returning.is_some()
508                || limit.is_some()
509                || !order_by.is_empty()
510            {
511                let error_message = match () {
512                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
513                    _ if using.is_some() => "DELETE does not support the USING clause",
514                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
515                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
516                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
517                    _ => unreachable!(),
518                };
519                polars_bail!(SQLInterface: error_message);
520            }
521            let from_tables = match &from {
522                FromTable::WithFromKeyword(from) => from,
523                FromTable::WithoutKeyword(from) => from,
524            };
525            if from_tables.len() > 1 {
526                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
527            }
528            let tbl_expr = from_tables.first().unwrap();
529            if !tbl_expr.joins.is_empty() {
530                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
531            }
532            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
533            if selection.is_none() {
534                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
535                Ok(DataFrame::empty_with_schema(
536                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
537                        .unwrap()
538                        .as_ref(),
539                )
540                .lazy())
541            } else {
542                // apply constraint as inverted filter (drops rows matching the selection)
543                Ok(self.process_where(lf.clone(), selection, true)?)
544            }
545        } else {
546            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
547        }
548    }
549
550    // TRUNCATE <tbl>
551    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
552        if let Statement::Truncate {
553            table_names,
554            partitions,
555            ..
556        } = stmt
557        {
558            match partitions {
559                None => {
560                    if table_names.len() != 1 {
561                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
562                    }
563                    let tbl = table_names[0].to_string();
564                    if let Some(lf) = self.table_map.get_mut(&tbl) {
565                        *lf = DataFrame::empty_with_schema(
566                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
567                                .unwrap()
568                                .as_ref(),
569                        )
570                        .lazy();
571                        Ok(lf.clone())
572                    } else {
573                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
574                    }
575                },
576                _ => {
577                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
578                },
579            }
580        } else {
581            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
582        }
583    }
584
585    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
586        self.cte_map.insert(name.to_owned(), lf);
587    }
588
589    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
590        if let Some(with) = &query.with {
591            if with.recursive {
592                polars_bail!(SQLInterface: "recursive CTEs are not supported")
593            }
594            for cte in &with.cte_tables {
595                let cte_name = cte.alias.name.value.clone();
596                let mut lf = self.execute_query(&cte.query)?;
597                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
598                self.register_cte(&cte_name, lf);
599            }
600        }
601        Ok(())
602    }
603
    /// execute the 'FROM' part of the query
    ///
    /// Resolves the base relation, then folds every explicit JOIN into it from left to
    /// right. After each join, `joined_aliases` records (keyed by the right relation's
    /// name) which right-hand columns appear as `"{name}:{r_name}"` in the joined result
    /// because the left side already had a column with that name.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                if r_name.is_empty() {
                    // Require non-empty to avoid duplicate column errors from nested self-joins.
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                // Schemas of the accumulated left side and the new right side, taken before
                // joining; both are reused below to detect suffixed (aliased) columns.
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // RIGHT ANTI/SEMI run as ANTI/SEMI joins with the two frames swapped.
                        // NOTE(review): only the frames are swapped; the `name`/`schema` fields
                        // below still describe the original left/right relations -- confirm
                        // that `process_join` expects this pairing.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                // without the `semi_anti_join` feature, anti/semi joins land here
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        // ":{r_name}" is passed as the suffix argument; presumably applied to
                        // colliding right-hand column names (matches the alias format below).
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // track join-aliased columns so we can resolve them later
                let joined_schema = self.get_frame_schema(&mut lf)?;

                // Replaces any earlier entry recorded under the same right-hand name.
                self.joined_aliases.insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // col exists in both tables and is aliased in the joined result
                            let aliased_name = format!("{name}:{r_name}");
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
700
701    /// Execute the 'SELECT' part of the query.
702    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
703        let mut lf = if select_stmt.from.is_empty() {
704            DataFrame::empty().lazy()
705        } else {
706            // Note: implicit joins need more work to support properly,
707            // explicit joins are preferred for now (ref: #16662)
708            let from = select_stmt.clone().from;
709            if from.len() > 1 {
710                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
711            }
712            self.execute_from_statement(from.first().unwrap())?
713        };
714
715        // Filter expression (WHERE clause)
716        let schema = self.get_frame_schema(&mut lf)?;
717        lf = self.process_where(lf, &select_stmt.selection, false)?;
718
719        // 'SELECT *' modifiers
720        let mut select_modifiers = SelectModifiers {
721            ilike: None,
722            exclude: PlHashSet::new(),
723            rename: PlHashMap::new(),
724            replace: vec![],
725        };
726
727        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
728
729        // Check for "GROUP BY ..." (after determining projections)
730        let mut group_by_keys: Vec<Expr> = Vec::new();
731        match &select_stmt.group_by {
732            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
733            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
734                if !modifiers.is_empty() {
735                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
736                }
737                // translate the group expressions, allowing ordinal values
738                group_by_keys = group_by_exprs
739                    .iter()
740                    .map(|e| {
741                        self.expr_or_ordinal(
742                            e,
743                            &projections,
744                            None,
745                            Some(schema.deref()),
746                            "GROUP BY",
747                        )
748                    })
749                    .collect::<PolarsResult<_>>()?
750            },
751            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
752            // nested agg/window funcs to the group key (also ignores literals).
753            GroupByExpr::All(modifiers) => {
754                if !modifiers.is_empty() {
755                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
756                }
757                projections.iter().for_each(|expr| match expr {
758                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
759                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
760                    Expr::Column(_) => group_by_keys.push(expr.clone()),
761                    Expr::Alias(e, _)
762                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
763                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
764                        if let Expr::Column(name) = &**e {
765                            group_by_keys.push(col(name.clone()));
766                        }
767                    },
768                    _ => {
769                        // If not quick-matched, add if no nested agg/window expressions
770                        if !has_expr(expr, |e| {
771                            matches!(e, Expr::Agg(_))
772                                || matches!(e, Expr::Len)
773                                || matches!(e, Expr::Window { .. })
774                        }) {
775                            group_by_keys.push(expr.clone())
776                        }
777                    },
778                });
779            },
780        };
781
782        lf = if group_by_keys.is_empty() {
783            // The 'having' clause is only valid inside 'group by'
784            if select_stmt.having.is_some() {
785                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
786            };
787
788            // Final/selected cols, accounting for 'SELECT *' modifiers
789            let mut retained_cols = Vec::with_capacity(projections.len());
790            let mut retained_names = Vec::with_capacity(projections.len());
791            let have_order_by = query.order_by.is_some();
792            // Initialize containing InheritsContext to handle empty projection case.
793            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
794
795            // Note: if there is an 'order by' then we project everything (original cols
796            // and new projections) and *then* select the final cols; the retained cols
797            // are used to ensure a correct final projection. If there's no 'order by',
798            // clause then we can project the final column *expressions* directly.
799            for p in projections.iter() {
800                let name = p.to_field(schema.deref())?.name.to_string();
801                if select_modifiers.matches_ilike(&name)
802                    && !select_modifiers.exclude.contains(&name)
803                {
804                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
805
806                    retained_cols.push(if have_order_by {
807                        col(name.as_str())
808                    } else {
809                        p.clone()
810                    });
811                    retained_names.push(col(name));
812                }
813            }
814
815            // Apply the remaining modifiers and establish the final projection
816            if have_order_by {
817                // We can safely use `with_columns()` and avoid a join if:
818                // * There is already a projection that projects to the table height.
819                // * All projection heights inherit from context (e.g. all scalar literals that
820                //   are to be broadcasted to table height).
821                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
822                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
823                {
824                    lf = lf.with_columns(projections);
825                } else {
826                    // We hit this branch if the output height is not guaranteed to match the table
827                    // height. E.g.:
828                    //
829                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
830                    // * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
831                    //
832                    // For these cases we truncate / extend the sorting columns with NULLs to match
833                    // the output height. We do this by projecting independently and then joining
834                    // back the original frame on the row index.
835                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
836                    lf = lf
837                        .clone()
838                        .select(projections)
839                        .with_row_index(NAME, None)
840                        .join(
841                            lf.with_row_index(NAME, None),
842                            [col(NAME)],
843                            [col(NAME)],
844                            JoinArgs {
845                                how: JoinType::Left,
846                                validation: Default::default(),
847                                suffix: None,
848                                slice: None,
849                                nulls_equal: false,
850                                coalesce: Default::default(),
851                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
852                            },
853                        );
854                }
855            }
856
857            if !select_modifiers.replace.is_empty() {
858                lf = lf.with_columns(&select_modifiers.replace);
859            }
860            if !select_modifiers.rename.is_empty() {
861                lf = lf.with_columns(select_modifiers.renamed_cols());
862            }
863
864            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
865
866            // Note: If `have_order_by`, with_columns is already done above.
867            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
868                && !have_order_by
869            {
870                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
871                lf = lf.with_columns(retained_cols).select(retained_names);
872            } else {
873                lf = lf.select(retained_cols);
874            }
875
876            if !select_modifiers.rename.is_empty() {
877                lf = lf.rename(
878                    select_modifiers.rename.keys(),
879                    select_modifiers.rename.values(),
880                    true,
881                );
882            };
883            lf
884        } else {
885            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
886            lf = self.process_order_by(lf, &query.order_by, None)?;
887
888            // Apply optional 'having' clause, post-aggregation.
889            let schema = Some(self.get_frame_schema(&mut lf)?);
890            match select_stmt.having.as_ref() {
891                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
892                None => lf,
893            }
894        };
895
896        // Apply optional DISTINCT clause.
897        lf = match &select_stmt.distinct {
898            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
899            Some(Distinct::On(exprs)) => {
900                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
901                let schema = Some(self.get_frame_schema(&mut lf)?);
902                let cols = exprs
903                    .iter()
904                    .map(|e| {
905                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
906                        if let Expr::Column(name) = expr {
907                            Ok(name)
908                        } else {
909                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
910                        }
911                    })
912                    .collect::<PolarsResult<Vec<_>>>()?;
913
914                // DISTINCT ON has to apply the ORDER BY before the operation.
915                lf = self.process_order_by(lf, &query.order_by, None)?;
916                return Ok(lf.unique_stable(
917                    Some(Selector::ByName {
918                        names: cols.into(),
919                        strict: true,
920                    }),
921                    UniqueKeepStrategy::First,
922                ));
923            },
924            None => lf,
925        };
926        Ok(lf)
927    }
928
929    fn column_projections(
930        &mut self,
931        select_stmt: &Select,
932        schema: &SchemaRef,
933        select_modifiers: &mut SelectModifiers,
934    ) -> PolarsResult<Vec<Expr>> {
935        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
936            .projection
937            .iter()
938            .map(|select_item| match select_item {
939                SelectItem::UnnamedExpr(expr) => {
940                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
941                },
942                SelectItem::ExprWithAlias { expr, alias } => {
943                    let expr = parse_sql_expr(expr, self, Some(schema))?;
944                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
945                },
946                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
947                    .process_qualified_wildcard(
948                        obj_name,
949                        wildcard_options,
950                        select_modifiers,
951                        Some(schema),
952                    ),
953                SelectItem::Wildcard(wildcard_options) => {
954                    let cols = schema
955                        .iter_names()
956                        .map(|name| col(name.clone()))
957                        .collect::<Vec<_>>();
958
959                    self.process_wildcard_additional_options(
960                        cols,
961                        wildcard_options,
962                        select_modifiers,
963                        Some(schema),
964                    )
965                },
966            })
967            .collect();
968
969        let flattened_exprs: Vec<Expr> = parsed_items?
970            .into_iter()
971            .flatten()
972            .flat_map(|expr| expand_exprs(expr, schema))
973            .collect();
974
975        Ok(flattened_exprs)
976    }
977
978    fn process_where(
979        &mut self,
980        mut lf: LazyFrame,
981        expr: &Option<SQLExpr>,
982        invert_filter: bool,
983    ) -> PolarsResult<LazyFrame> {
984        if let Some(expr) = expr {
985            let schema = self.get_frame_schema(&mut lf)?;
986
987            // shortcut filter evaluation if given expression is just TRUE or FALSE
988            let (all_true, all_false) = match expr {
989                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
990                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
991                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
992                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
993                        (a != b, a == b)
994                    },
995                    _ => (false, false),
996                },
997                _ => (false, false),
998            };
999            if (all_true && !invert_filter) || (all_false && invert_filter) {
1000                return Ok(lf);
1001            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1002                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1003            }
1004
1005            // ...otherwise parse and apply the filter as normal
1006            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1007            if filter_expression.clone().meta().has_multiple_outputs() {
1008                filter_expression = all_horizontal([filter_expression])?;
1009            }
1010            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1011            lf = if invert_filter {
1012                lf.remove(filter_expression)
1013            } else {
1014                lf.filter(filter_expression)
1015            };
1016        }
1017        Ok(lf)
1018    }
1019
1020    pub(super) fn process_join(
1021        &mut self,
1022        tbl_left: &TableInfo,
1023        tbl_right: &TableInfo,
1024        constraint: &JoinConstraint,
1025        join_type: JoinType,
1026    ) -> PolarsResult<LazyFrame> {
1027        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1028
1029        let joined = tbl_left
1030            .frame
1031            .clone()
1032            .join_builder()
1033            .with(tbl_right.frame.clone())
1034            .left_on(left_on)
1035            .right_on(right_on)
1036            .how(join_type)
1037            .suffix(format!(":{}", tbl_right.name))
1038            .coalesce(JoinCoalesce::KeepColumns)
1039            .finish();
1040
1041        Ok(joined)
1042    }
1043
1044    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1045        let mut contexts = vec![];
1046        for expr in exprs {
1047            *expr = expr.clone().map_expr(|e| match e {
1048                Expr::SubPlan(lp, names) => {
1049                    contexts.push(<LazyFrame>::from((**lp).clone()));
1050                    if names.len() == 1 {
1051                        Expr::Column(names[0].as_str().into())
1052                    } else {
1053                        Expr::SubPlan(lp, names)
1054                    }
1055                },
1056                e => e,
1057            })
1058        }
1059
1060        if contexts.is_empty() {
1061            lf
1062        } else {
1063            lf.with_context(contexts)
1064        }
1065    }
1066
1067    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1068        if let Statement::CreateTable(CreateTable {
1069            if_not_exists,
1070            name,
1071            query,
1072            ..
1073        }) = stmt
1074        {
1075            let tbl_name = name.0.first().unwrap().value.as_str();
1076            // CREATE TABLE IF NOT EXISTS
1077            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1078                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1079                // CREATE OR REPLACE TABLE
1080            }
1081            if let Some(query) = query {
1082                let lf = self.execute_query(query)?;
1083                self.register(tbl_name, lf);
1084                let out = df! {
1085                    "Response" => ["CREATE TABLE"]
1086                }
1087                .unwrap()
1088                .lazy();
1089                Ok(out)
1090            } else {
1091                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1092            }
1093        } else {
1094            unreachable!()
1095        }
1096    }
1097
1098    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1099        match relation {
1100            TableFactor::Table {
1101                name, alias, args, ..
1102            } => {
1103                if let Some(args) = args {
1104                    return self.execute_table_function(name, alias, &args.args);
1105                }
1106                let tbl_name = name.0.first().unwrap().value.as_str();
1107                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1108                    match alias {
1109                        Some(alias) => {
1110                            self.table_aliases
1111                                .insert(alias.name.value.clone(), tbl_name.to_string());
1112                            Ok((alias.to_string(), lf))
1113                        },
1114                        None => Ok((tbl_name.to_string(), lf)),
1115                    }
1116                } else {
1117                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1118                }
1119            },
1120            TableFactor::Derived {
1121                lateral,
1122                subquery,
1123                alias,
1124            } => {
1125                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1126                if let Some(alias) = alias {
1127                    let mut lf = self.execute_query_no_ctes(subquery)?;
1128                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1129                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1130                    Ok((alias.name.value.clone(), lf))
1131                } else {
1132                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1133                }
1134            },
1135            TableFactor::UNNEST {
1136                alias,
1137                array_exprs,
1138                with_offset,
1139                with_offset_alias: _,
1140                ..
1141            } => {
1142                if let Some(alias) = alias {
1143                    let column_names: Vec<Option<PlSmallStr>> = alias
1144                        .columns
1145                        .iter()
1146                        .map(|c| {
1147                            if c.name.value.is_empty() {
1148                                None
1149                            } else {
1150                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1151                            }
1152                        })
1153                        .collect();
1154
1155                    let column_values: Vec<Series> = array_exprs
1156                        .iter()
1157                        .map(|arr| parse_sql_array(arr, self))
1158                        .collect::<Result<_, _>>()?;
1159
1160                    polars_ensure!(!column_names.is_empty(),
1161                        SQLSyntax:
1162                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1163                    );
1164                    if column_names.len() != column_values.len() {
1165                        let plural = if column_values.len() > 1 { "s" } else { "" };
1166                        polars_bail!(
1167                            SQLSyntax:
1168                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1169                        );
1170                    }
1171                    let column_series: Vec<Column> = column_values
1172                        .into_iter()
1173                        .zip(column_names)
1174                        .map(|(s, name)| {
1175                            if let Some(name) = name {
1176                                s.with_name(name)
1177                            } else {
1178                                s
1179                            }
1180                        })
1181                        .map(Column::from)
1182                        .collect();
1183
1184                    let lf = DataFrame::new(column_series)?.lazy();
1185
1186                    if *with_offset {
1187                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1188                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1189                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1190                    }
1191                    let table_name = alias.name.value.clone();
1192                    self.table_map.insert(table_name.clone(), lf.clone());
1193                    Ok((table_name, lf))
1194                } else {
1195                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1196                }
1197            },
1198            TableFactor::NestedJoin {
1199                table_with_joins,
1200                alias,
1201            } => {
1202                let lf = self.execute_from_statement(table_with_joins)?;
1203                match alias {
1204                    Some(a) => Ok((a.name.value.clone(), lf)),
1205                    None => Ok(("".to_string(), lf)),
1206                }
1207            },
1208            // Support bare table, optionally with an alias, for now
1209            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1210        }
1211    }
1212
1213    fn execute_table_function(
1214        &mut self,
1215        name: &ObjectName,
1216        alias: &Option<TableAlias>,
1217        args: &[FunctionArg],
1218    ) -> PolarsResult<(String, LazyFrame)> {
1219        let tbl_fn = name.0.first().unwrap().value.as_str();
1220        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1221        let (tbl_name, lf) = read_fn.execute(args)?;
1222        #[allow(clippy::useless_asref)]
1223        let tbl_name = alias
1224            .as_ref()
1225            .map(|a| a.name.value.clone())
1226            .unwrap_or_else(|| tbl_name.to_str().to_string());
1227
1228        self.table_map.insert(tbl_name.clone(), lf.clone());
1229        Ok((tbl_name, lf))
1230    }
1231
1232    fn process_order_by(
1233        &mut self,
1234        mut lf: LazyFrame,
1235        order_by: &Option<OrderBy>,
1236        selected: Option<&[Expr]>,
1237    ) -> PolarsResult<LazyFrame> {
1238        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1239            return Ok(lf);
1240        }
1241        let schema = self.get_frame_schema(&mut lf)?;
1242        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1243
1244        let order_by = order_by.as_ref().unwrap().exprs.clone();
1245        let mut descending = Vec::with_capacity(order_by.len());
1246        let mut nulls_last = Vec::with_capacity(order_by.len());
1247        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1248
1249        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1250            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1251        {
1252            if let Some(selected) = selected {
1253                by.extend(selected.iter().cloned());
1254            } else {
1255                by.extend(columns_iter);
1256            };
1257            let desc_order = !order_by[0].asc.unwrap_or(true);
1258            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1259            descending.resize(by.len(), desc_order);
1260        } else {
1261            let columns = &columns_iter.collect::<Vec<_>>();
1262            for ob in order_by {
1263                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1264                // https://www.postgresql.org/docs/current/queries-order.html
1265                let desc_order = !ob.asc.unwrap_or(true);
1266                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1267                descending.push(desc_order);
1268
1269                // translate order expression, allowing ordinal values
1270                by.push(self.expr_or_ordinal(
1271                    &ob.expr,
1272                    columns,
1273                    selected,
1274                    Some(&schema),
1275                    "ORDER BY",
1276                )?)
1277            }
1278        }
1279        Ok(lf.sort_by_exprs(
1280            &by,
1281            SortMultipleOptions::default()
1282                .with_order_descending_multi(descending)
1283                .with_nulls_last_multi(nulls_last)
1284                .with_maintain_order(true),
1285        ))
1286    }
1287
    /// Translate a SQL `GROUP BY` into `group_by(keys).agg(...)` plus a final projection.
    ///
    /// * `group_by_keys` - the already-translated grouping key expressions.
    /// * `projections` - the SELECT-list expressions, in output order.
    ///
    /// SELECT items that aggregate (or reference non-key columns inside function calls)
    /// become aggregations. The final `select` restores the SELECT order and applies any
    /// aliases/transforms that were deferred until after the group_by.
    ///
    /// Errors (SQLSyntax) if a bare column or struct field in the SELECT list is neither a
    /// group key nor wrapped in an aggregate.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema = expressions_to_schema(group_by_keys, &schema_before)?;

        // Remove the group_by keys as polars adds those implicitly.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // alias -> replacement expr (aliased struct field access), used in the final projection
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // aliases whose original expression is re-evaluated in the final projection
        let mut projection_aliases = PlHashSet::new();
        // aliases of simple projections whose aliasing is deferred until after the group_by
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // `Len` represents COUNT(*) so we treat as an aggregation here.
            let is_non_group_key_expr = has_expr(e, |e| {
                match e {
                    Expr::Agg(_) | Expr::Len | Expr::Window { .. } => true,
                    Expr::Function { function: func, .. }
                        if !matches!(func, FunctionExpr::StructExpr(_)) =>
                    {
                        // If it's a function call containing a column NOT in the group by keys,
                        // we treat it as an aggregation.
                        has_expr(e, |e| match e {
                            Expr::Column(name) => !group_by_keys_schema.contains(name),
                            _ => false,
                        })
                    },
                    _ => false,
                }
            });

            // Note: if simple aliased expression we defer aliasing until after the group_by.
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    // continue with the un-aliased inner expression
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // aliased struct field: the final projection selects `name` under `alias`
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before)?;
            // Outputs named like a group key are produced by the key itself, not aggregated.
            if group_by_keys_schema.get(&field.name).is_none() && is_non_group_key_expr {
                let mut e = e.clone();
                // Strip a top-level `implode` (optionally aliased); presumably because the
                // group_by agg context already collects values per group — TODO confirm.
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // Non-aggregated columns must be part of the GROUP BY clause
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema = expressions_to_schema(projections, &schema_before)?;

        // A final projection to get the proper order and any deferred transforms/aliases.
        // Per output name: use an override if recorded; re-evaluate the original
        // projection for group keys and deferred aliases; otherwise select by name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1384
1385    fn process_limit_offset(
1386        &self,
1387        lf: LazyFrame,
1388        limit: &Option<SQLExpr>,
1389        offset: &Option<Offset>,
1390    ) -> PolarsResult<LazyFrame> {
1391        match (offset, limit) {
1392            (
1393                Some(Offset {
1394                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1395                    ..
1396                }),
1397                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1398            ) => Ok(lf.slice(
1399                offset
1400                    .parse()
1401                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1402                limit
1403                    .parse()
1404                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1405            )),
1406            (
1407                Some(Offset {
1408                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1409                    ..
1410                }),
1411                None,
1412            ) => Ok(lf.slice(
1413                offset
1414                    .parse()
1415                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1416                IdxSize::MAX,
1417            )),
1418            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1419                limit
1420                    .parse()
1421                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1422            )),
1423            (None, None) => Ok(lf),
1424            _ => polars_bail!(
1425                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1426            ),
1427        }
1428    }
1429
1430    fn process_qualified_wildcard(
1431        &mut self,
1432        ObjectName(idents): &ObjectName,
1433        options: &WildcardAdditionalOptions,
1434        modifiers: &mut SelectModifiers,
1435        schema: Option<&Schema>,
1436    ) -> PolarsResult<Vec<Expr>> {
1437        let mut new_idents = idents.clone();
1438        new_idents.push(Ident::new("*"));
1439
1440        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1441        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1442    }
1443
1444    fn process_wildcard_additional_options(
1445        &mut self,
1446        exprs: Vec<Expr>,
1447        options: &WildcardAdditionalOptions,
1448        modifiers: &mut SelectModifiers,
1449        schema: Option<&Schema>,
1450    ) -> PolarsResult<Vec<Expr>> {
1451        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1452            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1453        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1454            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1455        }
1456
1457        // SELECT * EXCLUDE
1458        if let Some(items) = &options.opt_exclude {
1459            match items {
1460                ExcludeSelectItem::Single(ident) => {
1461                    modifiers.exclude.insert(ident.value.clone());
1462                },
1463                ExcludeSelectItem::Multiple(idents) => {
1464                    modifiers
1465                        .exclude
1466                        .extend(idents.iter().map(|i| i.value.clone()));
1467                },
1468            };
1469        }
1470
1471        // SELECT * EXCEPT
1472        if let Some(items) = &options.opt_except {
1473            modifiers.exclude.insert(items.first_element.value.clone());
1474            modifiers
1475                .exclude
1476                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1477        }
1478
1479        // SELECT * ILIKE
1480        if let Some(item) = &options.opt_ilike {
1481            let rx = regex::escape(item.pattern.as_str())
1482                .replace('%', ".*")
1483                .replace('_', ".");
1484
1485            modifiers.ilike = Some(
1486                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1487            );
1488        }
1489
1490        // SELECT * RENAME
1491        if let Some(items) = &options.opt_rename {
1492            let renames = match items {
1493                RenameSelectItem::Single(rename) => vec![rename],
1494                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1495            };
1496            for rn in renames {
1497                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1498                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1499                if before != after {
1500                    modifiers.rename.insert(before, after);
1501                }
1502            }
1503        }
1504
1505        // SELECT * REPLACE
1506        if let Some(replacements) = &options.opt_replace {
1507            for rp in &replacements.items {
1508                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1509                modifiers
1510                    .replace
1511                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1512            }
1513        }
1514        Ok(exprs)
1515    }
1516
1517    fn rename_columns_from_table_alias(
1518        &mut self,
1519        mut lf: LazyFrame,
1520        alias: &TableAlias,
1521    ) -> PolarsResult<LazyFrame> {
1522        if alias.columns.is_empty() {
1523            Ok(lf)
1524        } else {
1525            let schema = self.get_frame_schema(&mut lf)?;
1526            if alias.columns.len() != schema.len() {
1527                polars_bail!(
1528                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1529                    alias.columns.len(), alias.name.value, schema.len()
1530                )
1531            } else {
1532                let existing_columns: Vec<_> = schema.iter_names().collect();
1533                let new_columns: Vec<_> =
1534                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1535                Ok(lf.rename(existing_columns, new_columns, true))
1536            }
1537        }
1538    }
1539}
1540
1541impl SQLContext {
1542    /// Get internal table map. For internal use only.
1543    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1544        self.table_map.clone()
1545    }
1546
1547    /// Create a new SQLContext from a table map. For internal use only
1548    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1549        Self {
1550            table_map,
1551            ..Default::default()
1552        }
1553    }
1554}
1555
1556fn collect_compound_identifiers(
1557    left: &[Ident],
1558    right: &[Ident],
1559    left_name: &str,
1560    right_name: &str,
1561) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1562    if left.len() == 2 && right.len() == 2 {
1563        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1564        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1565
1566        // switch left/right operands if the caller has them in reverse
1567        if left_name == tbl_b || right_name == tbl_a {
1568            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1569        } else {
1570            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1571        }
1572    } else {
1573        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1574    }
1575}
1576
1577fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1578    match expr {
1579        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1580            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1581            schema
1582                .iter_names()
1583                .filter(|name| re.is_match(name))
1584                .map(|name| col(name.clone()))
1585                .collect::<Vec<_>>()
1586        },
1587        Expr::Selector(s) => s
1588            .into_columns(schema, &Default::default())
1589            .unwrap()
1590            .into_iter()
1591            .map(col)
1592            .collect::<Vec<_>>(),
1593        _ => vec![expr],
1594    }
1595}
1596
/// Return `true` when a column name should be read as a regex: it starts with
/// `^` and ends with `$` (which implies it is at least two bytes long).
fn is_regex_colname(nm: &str) -> bool {
    // '^' and '$' are single-byte ASCII, so comparing raw UTF-8 bytes is exact.
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1600
1601fn process_join_on(
1602    expression: &sqlparser::ast::Expr,
1603    tbl_left: &TableInfo,
1604    tbl_right: &TableInfo,
1605) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1606    match expression {
1607        SQLExpr::BinaryOp { left, op, right } => match op {
1608            BinaryOperator::And => {
1609                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1610                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1611                left_i.append(&mut left_j);
1612                right_i.append(&mut right_j);
1613                Ok((left_i, right_i))
1614            },
1615            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1616                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1617                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1618                },
1619                _ => {
1620                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1621                },
1622            },
1623            _ => {
1624                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1625            },
1626        },
1627        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1628        _ => {
1629            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1630        },
1631    }
1632}
1633
1634fn process_join_constraint(
1635    constraint: &JoinConstraint,
1636    tbl_left: &TableInfo,
1637    tbl_right: &TableInfo,
1638) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1639    match constraint {
1640        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1641            process_join_on(expr, tbl_left, tbl_right)
1642        },
1643        JoinConstraint::Using(idents) if !idents.is_empty() => {
1644            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1645            Ok((using.clone(), using))
1646        },
1647        JoinConstraint::Natural => {
1648            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1649            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1650            let on: Vec<Expr> = left_names
1651                .intersection(&right_names)
1652                .map(|&name| col(name.clone()))
1653                .collect();
1654            if on.is_empty() {
1655                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1656            }
1657            Ok((on.clone(), on))
1658        },
1659        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1660    }
1661}
1662
1663bitflags::bitflags! {
1664    /// Bitfield indicating whether there exists a projection with the specified height behavior.
1665    ///
1666    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
1667    /// context.
1668    #[derive(PartialEq)]
1669    struct ExprSqlProjectionHeightBehavior: u8 {
1670        /// Maintains the height of input column(s)
1671        const MaintainsColumn = 1 << 0;
1672        /// Height is independent of input, e.g.:
1673        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
1674        /// * aggregations: count(*), first(), sum() etc.
1675        const Independent = 1 << 1;
1676        /// "Inherits" the height of the context, e.g.:
1677        /// * Scalar literals
1678        const InheritsContext = 1 << 2;
1679    }
1680}
1681
1682impl ExprSqlProjectionHeightBehavior {
1683    fn identify_from_expr(expr: &Expr) -> Self {
1684        let mut has_column = false;
1685        let mut has_independent = false;
1686
1687        for e in expr.into_iter() {
1688            use Expr::*;
1689
1690            has_column |= matches!(e, Column(_) | Selector(_));
1691
1692            has_independent |= match e {
1693                // @TODO: This is broken now with functions.
1694                AnonymousFunction { options, .. } => {
1695                    options.returns_scalar() || !options.is_length_preserving()
1696                },
1697
1698                Literal(v) => !v.is_scalar(),
1699
1700                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1701
1702                Agg { .. } | Len => true,
1703
1704                _ => false,
1705            }
1706        }
1707
1708        if has_independent {
1709            Self::Independent
1710        } else if has_column {
1711            Self::MaintainsColumn
1712        } else {
1713            Self::InheritsContext
1714        }
1715    }
1716}