1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16 WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A relation taking part in a join: its frame, the name it is referenced by, and its schema.
///
/// Built by `execute_from_statement` for each side of a join and handed to `process_join`.
#[derive(Clone)]
pub struct TableInfo {
    /// The relation's (lazy) data.
    pub(crate) frame: LazyFrame,
    /// The relation's name as returned by `get_table` (table name or alias); `process_join`
    /// uses the right-hand side's name to build the `:name` join suffix.
    pub(crate) name: PlSmallStr,
    /// The relation's resolved schema.
    pub(crate) schema: Arc<Schema>,
}
33
34struct SelectModifiers {
35 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
40impl SelectModifiers {
41 fn matches_ilike(&self, s: &str) -> bool {
42 match &self.ilike {
43 Some(rx) => rx.is_match(s),
44 None => true,
45 }
46 }
47 fn renamed_cols(&self) -> Vec<Expr> {
48 self.rename
49 .iter()
50 .map(|(before, after)| col(before.clone()).alias(after.clone()))
51 .collect()
52 }
53}
54
/// Context for translating SQL statements into `LazyFrame` queries over registered tables.
///
/// Deriving `Clone` copies the table registry and shares the function registry (`Arc`).
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables by name. Derived tables, UNNEST tables and table-function results
    /// resolved from FROM clauses are inserted here as well.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Function registry (a `DefaultFunctionRegistry` unless replaced).
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used when resolving frame schemas; `execute` moves it into the
    /// resulting frame after a successful statement.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena paired with `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    // Statement-scoped state; `execute` clears these after a statement runs successfully.
    /// CTEs declared by the current statement, by name.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Table aliases of the current statement: alias -> registered table name.
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// Per joined relation name: original column name -> suffixed (`name:relation`) column
    /// name in the join result.
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69 fn default() -> Self {
70 Self {
71 function_registry: Arc::new(DefaultFunctionRegistry {}),
72 table_map: Default::default(),
73 cte_map: Default::default(),
74 table_aliases: Default::default(),
75 joined_aliases: Default::default(),
76 lp_arena: Default::default(),
77 expr_arena: Default::default(),
78 }
79 }
80}
81
82impl SQLContext {
83 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get_tables(&self) -> Vec<String> {
96 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97 tables.sort_unstable();
98 tables
99 }
100
101 pub fn register(&mut self, name: &str, lf: LazyFrame) {
117 self.table_map.insert(name.to_owned(), lf);
118 }
119
120 pub fn unregister(&mut self, name: &str) {
122 self.table_map.remove(&name.to_owned());
123 }
124
125 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144 let mut parser = Parser::new(&GenericDialect);
145 parser = parser.with_options(ParserOptions {
146 trailing_commas: true,
147 ..Default::default()
148 });
149
150 let ast = parser
151 .try_with_sql(query)
152 .map_err(to_sql_interface_err)?
153 .parse_statements()
154 .map_err(to_sql_interface_err)?;
155
156 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157 let res = self.execute_statement(ast.first().unwrap())?;
158
159 let lp_arena = std::mem::take(&mut self.lp_arena);
162 let expr_arena = std::mem::take(&mut self.expr_arena);
163 res.set_cached_arena(lp_arena, expr_arena);
164
165 self.cte_map.borrow_mut().clear();
167 self.table_aliases.borrow_mut().clear();
168 self.joined_aliases.borrow_mut().clear();
169
170 Ok(res)
171 }
172
173 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176 self.function_registry = function_registry;
177 self
178 }
179
180 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182 &self.function_registry
183 }
184
185 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187 Arc::get_mut(&mut self.function_registry).unwrap()
188 }
189}
190
191impl SQLContext {
192 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193 let ast = stmt;
194 Ok(match ast {
195 Statement::Query(query) => self.execute_query(query)?,
196 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198 stmt @ Statement::Drop {
199 object_type: ObjectType::Table,
200 ..
201 } => self.execute_drop_table(stmt)?,
202 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205 _ => polars_bail!(
206 SQLInterface: "statement type is not supported:\n{:?}", ast,
207 ),
208 })
209 }
210
211 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212 self.register_ctes(query)?;
213 self.execute_query_no_ctes(query)
214 }
215
216 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217 let lf = self.process_query(&query.body, query)?;
218 self.process_limit_offset(lf, &query.limit, &query.offset)
219 }
220
221 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223 }
224
225 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226 let table = self.table_map.get(name).cloned();
227 table
228 .or_else(|| self.cte_map.borrow().get(name).cloned())
229 .or_else(|| {
230 self.table_aliases
231 .borrow()
232 .get(name)
233 .and_then(|alias| self.table_map.get(alias).cloned())
234 })
235 }
236
237 fn expr_or_ordinal(
238 &mut self,
239 e: &SQLExpr,
240 exprs: &[Expr],
241 selected: Option<&[Expr]>,
242 schema: Option<&Schema>,
243 clause: &str,
244 ) -> PolarsResult<Expr> {
245 match e {
246 SQLExpr::UnaryOp {
247 op: UnaryOperator::Minus,
248 expr,
249 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251 Err(polars_err!(
252 SQLSyntax:
253 "negative ordinal values are invalid for {}; found -{}",
254 clause,
255 idx
256 ))
257 } else {
258 unreachable!()
259 }
260 },
261 SQLExpr::Value(SQLValue::Number(idx, _)) => {
262 let idx = idx.parse::<usize>().map_err(|_| {
264 polars_err!(
265 SQLSyntax:
266 "negative ordinal values are invalid for {}; found {}",
267 clause,
268 idx
269 )
270 })?;
271 let cols = if let Some(cols) = selected {
274 cols
275 } else {
276 exprs
277 };
278 Ok(cols
279 .get(idx - 1)
280 .ok_or_else(|| {
281 polars_err!(
282 SQLInterface:
283 "{} ordinal value must refer to a valid column; found {}",
284 clause,
285 idx
286 )
287 })?
288 .clone())
289 },
290 SQLExpr::Value(v) => Err(polars_err!(
291 SQLSyntax:
292 "{} requires a valid expression or positive ordinal; found {}", clause, v,
293 )),
294 _ => parse_sql_expr(e, self, schema),
295 }
296 }
297
298 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299 if self.joined_aliases.borrow().contains_key(tbl_name) {
300 self.joined_aliases
301 .borrow()
302 .get(tbl_name)
303 .and_then(|aliases| aliases.get(column_name))
304 .cloned()
305 .unwrap_or_else(|| column_name.to_string())
306 } else {
307 column_name.to_string()
308 }
309 }
310
311 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312 match expr {
313 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314 SetExpr::Query(query) => self.execute_query_no_ctes(query),
315 SetExpr::SetOperation {
316 op: SetOperator::Union,
317 set_quantifier,
318 left,
319 right,
320 } => self.process_union(left, right, set_quantifier, query),
321
322 #[cfg(feature = "semi_anti_join")]
323 SetExpr::SetOperation {
324 op: SetOperator::Intersect | SetOperator::Except,
325 set_quantifier,
326 left,
327 right,
328 } => self.process_except_intersect(left, right, set_quantifier, query),
329
330 SetExpr::Values(Values {
331 explicit_row: _,
332 rows,
333 }) => self.process_values(rows),
334
335 SetExpr::Table(tbl) => {
336 if tbl.table_name.is_some() {
337 let table_name = tbl.table_name.as_ref().unwrap();
338 self.get_table_from_current_scope(table_name)
339 .ok_or_else(|| {
340 polars_err!(
341 SQLInterface: "no table or alias named '{}' found",
342 tbl
343 )
344 })
345 } else {
346 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347 }
348 },
349 op => {
350 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351 },
352 }
353 }
354
355 #[cfg(feature = "semi_anti_join")]
356 fn process_except_intersect(
357 &mut self,
358 left: &SetExpr,
359 right: &SetExpr,
360 quantifier: &SetQuantifier,
361 query: &Query,
362 ) -> PolarsResult<LazyFrame> {
363 let (join_type, op_name) = match *query.body {
364 SetExpr::SetOperation {
365 op: SetOperator::Except,
366 ..
367 } => (JoinType::Anti, "EXCEPT"),
368 _ => (JoinType::Semi, "INTERSECT"),
369 };
370 let mut lf = self.process_query(left, query)?;
371 let mut rf = self.process_query(right, query)?;
372 let join = lf
373 .clone()
374 .join_builder()
375 .with(rf.clone())
376 .how(join_type)
377 .join_nulls(true);
378
379 let lf_schema = self.get_frame_schema(&mut lf)?;
380 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
381 let joined_tbl = match quantifier {
382 SetQuantifier::ByName => join.on(lf_cols).finish(),
383 SetQuantifier::Distinct | SetQuantifier::None => {
384 let rf_schema = self.get_frame_schema(&mut rf)?;
385 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
386 if lf_cols.len() != rf_cols.len() {
387 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
388 }
389 join.left_on(lf_cols).right_on(rf_cols).finish()
390 },
391 _ => {
392 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
393 },
394 };
395 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
396 }
397
398 fn process_union(
399 &mut self,
400 left: &SetExpr,
401 right: &SetExpr,
402 quantifier: &SetQuantifier,
403 query: &Query,
404 ) -> PolarsResult<LazyFrame> {
405 let mut lf = self.process_query(left, query)?;
406 let mut rf = self.process_query(right, query)?;
407 let opts = UnionArgs {
408 parallel: true,
409 to_supertypes: true,
410 ..Default::default()
411 };
412 match quantifier {
413 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415 let lf_schema = self.get_frame_schema(&mut lf)?;
416 let rf_schema = self.get_frame_schema(&mut rf)?;
417 if lf_schema.len() != rf_schema.len() {
418 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419 }
420 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421 match quantifier {
422 SetQuantifier::Distinct | SetQuantifier::None => {
423 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424 },
425 _ => concatenated,
426 }
427 },
428 #[cfg(feature = "diagonal_concat")]
430 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431 #[cfg(feature = "diagonal_concat")]
433 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436 },
437 #[allow(unreachable_patterns)]
438 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439 }
440 }
441
442 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443 let frame_rows: Vec<Row> = values.iter().map(|row| {
444 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445 let expr = parse_sql_expr(expr, self, None)?;
446 match expr {
447 Expr::Literal(value) => {
448 value.to_any_value()
449 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450 .map(|av| av.into_static())
451 },
452 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453 }
454 }).collect();
455 row_data.map(Row::new)
456 }).collect::<Result<_, _>>()?;
457
458 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459 }
460
461 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463 match stmt {
464 Statement::Explain { statement, .. } => {
465 let lf = self.execute_statement(statement)?;
466 let plan = lf.describe_optimized_plan()?;
467 let plan = plan
468 .split('\n')
469 .collect::<Series>()
470 .with_name(PlSmallStr::from_static("Logical Plan"))
471 .into_column();
472 let df = DataFrame::new(vec![plan])?;
473 Ok(df.lazy())
474 },
475 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476 }
477 }
478
479 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481 let tables = Column::new("name".into(), self.get_tables());
482 let df = DataFrame::new(vec![tables])?;
483 Ok(df.lazy())
484 }
485
486 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488 match stmt {
489 Statement::Drop { names, .. } => {
490 names.iter().for_each(|name| {
491 self.table_map.remove(&name.to_string());
492 });
493 Ok(DataFrame::empty().lazy())
494 },
495 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496 }
497 }
498
499 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501 if let Statement::Delete(Delete {
502 tables,
503 from,
504 using,
505 selection,
506 returning,
507 order_by,
508 limit,
509 }) = stmt
510 {
511 if !tables.is_empty()
512 || using.is_some()
513 || returning.is_some()
514 || limit.is_some()
515 || !order_by.is_empty()
516 {
517 let error_message = match () {
518 _ if !tables.is_empty() => "DELETE expects exactly one table name",
519 _ if using.is_some() => "DELETE does not support the USING clause",
520 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523 _ => unreachable!(),
524 };
525 polars_bail!(SQLInterface: error_message);
526 }
527 let from_tables = match &from {
528 FromTable::WithFromKeyword(from) => from,
529 FromTable::WithoutKeyword(from) => from,
530 };
531 if from_tables.len() > 1 {
532 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533 }
534 let tbl_expr = from_tables.first().unwrap();
535 if !tbl_expr.joins.is_empty() {
536 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537 }
538 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539 if selection.is_none() {
540 Ok(DataFrame::empty_with_schema(
542 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543 .unwrap()
544 .as_ref(),
545 )
546 .lazy())
547 } else {
548 Ok(self.process_where(lf.clone(), selection, true)?)
550 }
551 } else {
552 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553 }
554 }
555
556 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558 if let Statement::Truncate {
559 table_names,
560 partitions,
561 ..
562 } = stmt
563 {
564 match partitions {
565 None => {
566 if table_names.len() != 1 {
567 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568 }
569 let tbl = table_names[0].to_string();
570 if let Some(lf) = self.table_map.get_mut(&tbl) {
571 *lf = DataFrame::empty_with_schema(
572 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573 .unwrap()
574 .as_ref(),
575 )
576 .lazy();
577 Ok(lf.clone())
578 } else {
579 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580 }
581 },
582 _ => {
583 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584 },
585 }
586 } else {
587 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588 }
589 }
590
591 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592 self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593 }
594
595 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596 if let Some(with) = &query.with {
597 if with.recursive {
598 polars_bail!(SQLInterface: "recursive CTEs are not supported")
599 }
600 for cte in &with.cte_tables {
601 let cte_name = cte.alias.name.value.clone();
602 let mut lf = self.execute_query(&cte.query)?;
603 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604 self.register_cte(&cte_name, lf);
605 }
606 }
607 Ok(())
608 }
609
610 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
612 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
613 if !tbl_expr.joins.is_empty() {
614 for join in &tbl_expr.joins {
615 let (r_name, mut rf) = self.get_table(&join.relation)?;
616 if r_name.is_empty() {
617 polars_bail!(
619 SQLInterface:
620 "cannot join on unnamed relation; please provide an alias"
621 )
622 }
623 let left_schema = self.get_frame_schema(&mut lf)?;
624 let right_schema = self.get_frame_schema(&mut rf)?;
625
626 lf = match &join.join_operator {
627 op @ (JoinOperator::FullOuter(constraint)
628 | JoinOperator::LeftOuter(constraint)
629 | JoinOperator::RightOuter(constraint)
630 | JoinOperator::Inner(constraint)
631 | JoinOperator::Anti(constraint)
632 | JoinOperator::Semi(constraint)
633 | JoinOperator::LeftAnti(constraint)
634 | JoinOperator::LeftSemi(constraint)
635 | JoinOperator::RightAnti(constraint)
636 | JoinOperator::RightSemi(constraint)) => {
637 let (lf, rf) = match op {
638 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
639 _ => (lf, rf),
640 };
641 self.process_join(
642 &TableInfo {
643 frame: lf,
644 name: (&l_name).into(),
645 schema: left_schema.clone(),
646 },
647 &TableInfo {
648 frame: rf,
649 name: (&r_name).into(),
650 schema: right_schema.clone(),
651 },
652 constraint,
653 match op {
654 JoinOperator::FullOuter(_) => JoinType::Full,
655 JoinOperator::LeftOuter(_) => JoinType::Left,
656 JoinOperator::RightOuter(_) => JoinType::Right,
657 JoinOperator::Inner(_) => JoinType::Inner,
658 #[cfg(feature = "semi_anti_join")]
659 JoinOperator::Anti(_)
660 | JoinOperator::LeftAnti(_)
661 | JoinOperator::RightAnti(_) => JoinType::Anti,
662 #[cfg(feature = "semi_anti_join")]
663 JoinOperator::Semi(_)
664 | JoinOperator::LeftSemi(_)
665 | JoinOperator::RightSemi(_) => JoinType::Semi,
666 join_type => polars_bail!(
667 SQLInterface:
668 "join type '{:?}' not currently supported",
669 join_type
670 ),
671 },
672 )?
673 },
674 JoinOperator::CrossJoin => {
675 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
676 },
677 join_type => {
678 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
679 },
680 };
681
682 let joined_schema = self.get_frame_schema(&mut lf)?;
684
685 self.joined_aliases.borrow_mut().insert(
686 r_name.clone(),
687 right_schema
688 .iter_names()
689 .filter_map(|name| {
690 let aliased_name = format!("{}:{}", name, r_name);
692 if left_schema.contains(name)
693 && joined_schema.contains(aliased_name.as_str())
694 {
695 Some((name.to_string(), aliased_name))
696 } else {
697 None
698 }
699 })
700 .collect::<PlHashMap<String, String>>(),
701 );
702 }
703 };
704 Ok(lf)
705 }
706
707 fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
709 let mut lf = if select_stmt.from.is_empty() {
710 DataFrame::empty().lazy()
711 } else {
712 let from = select_stmt.clone().from;
715 if from.len() > 1 {
716 polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
717 }
718 self.execute_from_statement(from.first().unwrap())?
719 };
720
721 let schema = self.get_frame_schema(&mut lf)?;
723 lf = self.process_where(lf, &select_stmt.selection, false)?;
724
725 let mut select_modifiers = SelectModifiers {
727 ilike: None,
728 exclude: PlHashSet::new(),
729 rename: PlHashMap::new(),
730 replace: vec![],
731 };
732
733 let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
734
735 let mut group_by_keys: Vec<Expr> = Vec::new();
737 match &select_stmt.group_by {
738 GroupByExpr::Expressions(group_by_exprs, modifiers) => {
740 if !modifiers.is_empty() {
741 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
742 }
743 group_by_keys = group_by_exprs
745 .iter()
746 .map(|e| {
747 self.expr_or_ordinal(
748 e,
749 &projections,
750 None,
751 Some(schema.deref()),
752 "GROUP BY",
753 )
754 })
755 .collect::<PolarsResult<_>>()?
756 },
757 GroupByExpr::All(modifiers) => {
760 if !modifiers.is_empty() {
761 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
762 }
763 projections.iter().for_each(|expr| match expr {
764 Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
766 Expr::Column(_) => group_by_keys.push(expr.clone()),
767 Expr::Alias(e, _)
768 if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
769 Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
770 if let Expr::Column(name) = &**e {
771 group_by_keys.push(col(name.clone()));
772 }
773 },
774 _ => {
775 if !has_expr(expr, |e| {
777 matches!(e, Expr::Agg(_))
778 || matches!(e, Expr::Len)
779 || matches!(e, Expr::Window { .. })
780 }) {
781 group_by_keys.push(expr.clone())
782 }
783 },
784 });
785 },
786 };
787
788 lf = if group_by_keys.is_empty() {
789 if select_stmt.having.is_some() {
791 polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
792 };
793
794 let mut retained_cols = Vec::with_capacity(projections.len());
796 let mut retained_names = Vec::with_capacity(projections.len());
797 let have_order_by = query.order_by.is_some();
798 let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
800
801 for p in projections.iter() {
806 let name = p
807 .to_field(schema.deref(), Context::Default)?
808 .name
809 .to_string();
810 if select_modifiers.matches_ilike(&name)
811 && !select_modifiers.exclude.contains(&name)
812 {
813 projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
814
815 retained_cols.push(if have_order_by {
816 col(name.as_str())
817 } else {
818 p.clone()
819 });
820 retained_names.push(col(name));
821 }
822 }
823
824 if have_order_by {
826 if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
831 || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
832 {
833 lf = lf.with_columns(projections);
834 } else {
835 const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
845 lf = lf
846 .clone()
847 .select(projections)
848 .with_row_index(NAME, None)
849 .join(
850 lf.with_row_index(NAME, None),
851 [col(NAME)],
852 [col(NAME)],
853 JoinArgs {
854 how: JoinType::Left,
855 validation: Default::default(),
856 suffix: None,
857 slice: None,
858 nulls_equal: false,
859 coalesce: Default::default(),
860 maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
861 },
862 );
863 }
864 }
865
866 if !select_modifiers.replace.is_empty() {
867 lf = lf.with_columns(&select_modifiers.replace);
868 }
869 if !select_modifiers.rename.is_empty() {
870 lf = lf.with_columns(select_modifiers.renamed_cols());
871 }
872
873 lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
874
875 if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
877 && !have_order_by
878 {
879 lf = lf.with_columns(retained_cols).select(retained_names);
881 } else {
882 lf = lf.select(retained_cols);
883 }
884
885 if !select_modifiers.rename.is_empty() {
886 lf = lf.rename(
887 select_modifiers.rename.keys(),
888 select_modifiers.rename.values(),
889 true,
890 );
891 };
892 lf
893 } else {
894 lf = self.process_group_by(lf, &group_by_keys, &projections)?;
895 lf = self.process_order_by(lf, &query.order_by, None)?;
896
897 let schema = Some(self.get_frame_schema(&mut lf)?);
899 match select_stmt.having.as_ref() {
900 Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
901 None => lf,
902 }
903 };
904
905 lf = match &select_stmt.distinct {
907 Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
908 Some(Distinct::On(exprs)) => {
909 let schema = Some(self.get_frame_schema(&mut lf)?);
911 let cols = exprs
912 .iter()
913 .map(|e| {
914 let expr = parse_sql_expr(e, self, schema.as_deref())?;
915 if let Expr::Column(name) = expr {
916 Ok(name.clone())
917 } else {
918 Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
919 }
920 })
921 .collect::<PolarsResult<Vec<_>>>()?;
922
923 lf = self.process_order_by(lf, &query.order_by, None)?;
925 return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
926 },
927 None => lf,
928 };
929 Ok(lf)
930 }
931
932 fn column_projections(
933 &mut self,
934 select_stmt: &Select,
935 schema: &SchemaRef,
936 select_modifiers: &mut SelectModifiers,
937 ) -> PolarsResult<Vec<Expr>> {
938 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
939 .projection
940 .iter()
941 .map(|select_item| match select_item {
942 SelectItem::UnnamedExpr(expr) => {
943 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
944 },
945 SelectItem::ExprWithAlias { expr, alias } => {
946 let expr = parse_sql_expr(expr, self, Some(schema))?;
947 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
948 },
949 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
950 .process_qualified_wildcard(
951 obj_name,
952 wildcard_options,
953 select_modifiers,
954 Some(schema),
955 ),
956 SelectItem::Wildcard(wildcard_options) => {
957 let cols = schema
958 .iter_names()
959 .map(|name| col(name.clone()))
960 .collect::<Vec<_>>();
961
962 self.process_wildcard_additional_options(
963 cols,
964 wildcard_options,
965 select_modifiers,
966 Some(schema),
967 )
968 },
969 })
970 .collect();
971
972 let flattened_exprs: Vec<Expr> = parsed_items?
973 .into_iter()
974 .flatten()
975 .flat_map(|expr| expand_exprs(expr, schema))
976 .collect();
977
978 Ok(flattened_exprs)
979 }
980
981 fn process_where(
982 &mut self,
983 mut lf: LazyFrame,
984 expr: &Option<SQLExpr>,
985 invert_filter: bool,
986 ) -> PolarsResult<LazyFrame> {
987 if let Some(expr) = expr {
988 let schema = self.get_frame_schema(&mut lf)?;
989
990 let (all_true, all_false) = match expr {
992 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
993 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
994 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
995 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
996 (a != b, a == b)
997 },
998 _ => (false, false),
999 },
1000 _ => (false, false),
1001 };
1002 if (all_true && !invert_filter) || (all_false && invert_filter) {
1003 return Ok(lf);
1004 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1005 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1006 }
1007
1008 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1010 if filter_expression.clone().meta().has_multiple_outputs() {
1011 filter_expression = all_horizontal([filter_expression])?;
1012 }
1013 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1014 lf = if invert_filter {
1015 lf.remove(filter_expression)
1016 } else {
1017 lf.filter(filter_expression)
1018 };
1019 }
1020 Ok(lf)
1021 }
1022
1023 pub(super) fn process_join(
1024 &mut self,
1025 tbl_left: &TableInfo,
1026 tbl_right: &TableInfo,
1027 constraint: &JoinConstraint,
1028 join_type: JoinType,
1029 ) -> PolarsResult<LazyFrame> {
1030 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1031
1032 let joined = tbl_left
1033 .frame
1034 .clone()
1035 .join_builder()
1036 .with(tbl_right.frame.clone())
1037 .left_on(left_on)
1038 .right_on(right_on)
1039 .how(join_type)
1040 .suffix(format!(":{}", tbl_right.name))
1041 .coalesce(JoinCoalesce::KeepColumns)
1042 .finish();
1043
1044 Ok(joined)
1045 }
1046
1047 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1048 let mut contexts = vec![];
1049 for expr in exprs {
1050 *expr = expr.clone().map_expr(|e| match e {
1051 Expr::SubPlan(lp, names) => {
1052 contexts.push(<LazyFrame>::from((**lp).clone()));
1053 if names.len() == 1 {
1054 Expr::Column(names[0].as_str().into())
1055 } else {
1056 Expr::SubPlan(lp, names)
1057 }
1058 },
1059 e => e,
1060 })
1061 }
1062
1063 if contexts.is_empty() {
1064 lf
1065 } else {
1066 lf.with_context(contexts)
1067 }
1068 }
1069
1070 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1071 if let Statement::CreateTable(CreateTable {
1072 if_not_exists,
1073 name,
1074 query,
1075 ..
1076 }) = stmt
1077 {
1078 let tbl_name = name.0.first().unwrap().value.as_str();
1079 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1081 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1082 }
1084 if let Some(query) = query {
1085 let lf = self.execute_query(query)?;
1086 self.register(tbl_name, lf);
1087 let out = df! {
1088 "Response" => ["CREATE TABLE"]
1089 }
1090 .unwrap()
1091 .lazy();
1092 Ok(out)
1093 } else {
1094 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1095 }
1096 } else {
1097 unreachable!()
1098 }
1099 }
1100
1101 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1102 match relation {
1103 TableFactor::Table {
1104 name, alias, args, ..
1105 } => {
1106 if let Some(args) = args {
1107 return self.execute_table_function(name, alias, &args.args);
1108 }
1109 let tbl_name = name.0.first().unwrap().value.as_str();
1110 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1111 match alias {
1112 Some(alias) => {
1113 self.table_aliases
1114 .borrow_mut()
1115 .insert(alias.name.value.clone(), tbl_name.to_string());
1116 Ok((alias.to_string(), lf))
1117 },
1118 None => Ok((tbl_name.to_string(), lf)),
1119 }
1120 } else {
1121 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1122 }
1123 },
1124 TableFactor::Derived {
1125 lateral,
1126 subquery,
1127 alias,
1128 } => {
1129 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1130 if let Some(alias) = alias {
1131 let mut lf = self.execute_query_no_ctes(subquery)?;
1132 lf = self.rename_columns_from_table_alias(lf, alias)?;
1133 self.table_map.insert(alias.name.value.clone(), lf.clone());
1134 Ok((alias.name.value.clone(), lf))
1135 } else {
1136 polars_bail!(SQLSyntax: "derived tables must have aliases");
1137 }
1138 },
1139 TableFactor::UNNEST {
1140 alias,
1141 array_exprs,
1142 with_offset,
1143 with_offset_alias: _,
1144 ..
1145 } => {
1146 if let Some(alias) = alias {
1147 let table_name = alias.name.value.clone();
1148 let column_names: Vec<Option<PlSmallStr>> = alias
1149 .columns
1150 .iter()
1151 .map(|c| {
1152 if c.name.value.is_empty() {
1153 None
1154 } else {
1155 Some(PlSmallStr::from_str(c.name.value.as_str()))
1156 }
1157 })
1158 .collect();
1159
1160 let column_values: Vec<Series> = array_exprs
1161 .iter()
1162 .map(|arr| parse_sql_array(arr, self))
1163 .collect::<Result<_, _>>()?;
1164
1165 polars_ensure!(!column_names.is_empty(),
1166 SQLSyntax:
1167 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1168 );
1169 if column_names.len() != column_values.len() {
1170 let plural = if column_values.len() > 1 { "s" } else { "" };
1171 polars_bail!(
1172 SQLSyntax:
1173 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1174 );
1175 }
1176 let column_series: Vec<Column> = column_values
1177 .into_iter()
1178 .zip(column_names)
1179 .map(|(s, name)| {
1180 if let Some(name) = name {
1181 s.clone().with_name(name)
1182 } else {
1183 s.clone()
1184 }
1185 })
1186 .map(Column::from)
1187 .collect();
1188
1189 let lf = DataFrame::new(column_series)?.lazy();
1190 if *with_offset {
1191 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH OFFSET/ORDINALITY");
1194 }
1195 self.table_map.insert(table_name.clone(), lf.clone());
1196 Ok((table_name.clone(), lf))
1197 } else {
1198 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1199 }
1200 },
1201 TableFactor::NestedJoin {
1202 table_with_joins,
1203 alias,
1204 } => {
1205 let lf = self.execute_from_statement(table_with_joins)?;
1206 match alias {
1207 Some(a) => Ok((a.name.value.clone(), lf)),
1208 None => Ok(("".to_string(), lf)),
1209 }
1210 },
1211 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1213 }
1214 }
1215
1216 fn execute_table_function(
1217 &mut self,
1218 name: &ObjectName,
1219 alias: &Option<TableAlias>,
1220 args: &[FunctionArg],
1221 ) -> PolarsResult<(String, LazyFrame)> {
1222 let tbl_fn = name.0.first().unwrap().value.as_str();
1223 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1224 let (tbl_name, lf) = read_fn.execute(args)?;
1225 #[allow(clippy::useless_asref)]
1226 let tbl_name = alias
1227 .as_ref()
1228 .map(|a| a.name.value.clone())
1229 .unwrap_or_else(|| tbl_name);
1230
1231 self.table_map.insert(tbl_name.clone(), lf.clone());
1232 Ok((tbl_name, lf))
1233 }
1234
1235 fn process_order_by(
1236 &mut self,
1237 mut lf: LazyFrame,
1238 order_by: &Option<OrderBy>,
1239 selected: Option<&[Expr]>,
1240 ) -> PolarsResult<LazyFrame> {
1241 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1242 return Ok(lf);
1243 }
1244 let schema = self.get_frame_schema(&mut lf)?;
1245 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1246
1247 let order_by = order_by.as_ref().unwrap().exprs.clone();
1248 let mut descending = Vec::with_capacity(order_by.len());
1249 let mut nulls_last = Vec::with_capacity(order_by.len());
1250 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1251
1252 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1254 {
1255 if let Some(selected) = selected {
1256 by.extend(selected.iter().cloned());
1257 } else {
1258 by.extend(columns_iter);
1259 };
1260 let desc_order = !order_by[0].asc.unwrap_or(true);
1261 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1262 descending.resize(by.len(), desc_order);
1263 } else {
1264 let columns = &columns_iter.collect::<Vec<_>>();
1265 for ob in order_by {
1266 let desc_order = !ob.asc.unwrap_or(true);
1269 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1270 descending.push(desc_order);
1271
1272 by.push(self.expr_or_ordinal(
1274 &ob.expr,
1275 columns,
1276 selected,
1277 Some(&schema),
1278 "ORDER BY",
1279 )?)
1280 }
1281 }
1282 Ok(lf.sort_by_exprs(
1283 &by,
1284 SortMultipleOptions::default()
1285 .with_order_descending_multi(descending)
1286 .with_nulls_last_multi(nulls_last)
1287 .with_maintain_order(true),
1288 ))
1289 }
1290
    /// Applies a GROUP BY: groups `lf` by `group_by_keys`, aggregates the
    /// aggregate/window projections, then selects the final projection list.
    ///
    /// - Aliased simple projections (`meta().is_simple_projection()`) are handled
    ///   un-aliased during aggregation; the original aliased expression is used in
    ///   the final `select`.
    /// - Aliased struct field accesses (`StructFunction::FieldByName`) are replaced
    ///   in the final projection by `col(<field name>).alias(<alias>)`.
    /// - A plain column or struct-field projection that is not a group key (and is
    ///   not aggregated) is rejected.
    ///
    /// # Errors
    /// Schema resolution failures, and a SQLSyntax error for a non-grouped,
    /// non-aggregated column.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

        // Expressions passed to `.agg(...)` (group keys themselves are not added here).
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // alias -> replacement expression used in the final projection.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases of non-aggregated expressions whose alias is not a group key name.
        let mut projection_aliases = PlHashSet::new();
        // Aliases of simple projections; their aliased expression is kept in the final select.
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // `Expr::Len` is counted as an aggregation here, alongside `Agg`/`Window`.
            let is_agg_or_window = has_expr(e, |e| {
                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
            });

            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection() {
                    // Continue with the un-aliased inner expression below.
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before, Context::Default)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
                let mut e = e.clone();
                // Strip a top-level `Implode` (optionally under an alias) before passing
                // to `agg`. NOTE(review): presumably because `agg` already collects each
                // group's values into a list — confirm.
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // A bare (non-aggregated) column must be one of the GROUP BY keys.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, Context::Default)?;

        // Pair each output name with its projection (by position) and decide whether to
        // re-evaluate the projection, use an override, or select the aggregated column.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    // Refer to the aggregated output column by name.
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1376
1377 fn process_limit_offset(
1378 &self,
1379 lf: LazyFrame,
1380 limit: &Option<SQLExpr>,
1381 offset: &Option<Offset>,
1382 ) -> PolarsResult<LazyFrame> {
1383 match (offset, limit) {
1384 (
1385 Some(Offset {
1386 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1387 ..
1388 }),
1389 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1390 ) => Ok(lf.slice(
1391 offset
1392 .parse()
1393 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1394 limit
1395 .parse()
1396 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1397 )),
1398 (
1399 Some(Offset {
1400 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1401 ..
1402 }),
1403 None,
1404 ) => Ok(lf.slice(
1405 offset
1406 .parse()
1407 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1408 IdxSize::MAX,
1409 )),
1410 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1411 limit
1412 .parse()
1413 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1414 )),
1415 (None, None) => Ok(lf),
1416 _ => polars_bail!(
1417 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1418 ),
1419 }
1420 }
1421
1422 fn process_qualified_wildcard(
1423 &mut self,
1424 ObjectName(idents): &ObjectName,
1425 options: &WildcardAdditionalOptions,
1426 modifiers: &mut SelectModifiers,
1427 schema: Option<&Schema>,
1428 ) -> PolarsResult<Vec<Expr>> {
1429 let mut new_idents = idents.clone();
1430 new_idents.push(Ident::new("*"));
1431
1432 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1433 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1434 }
1435
1436 fn process_wildcard_additional_options(
1437 &mut self,
1438 exprs: Vec<Expr>,
1439 options: &WildcardAdditionalOptions,
1440 modifiers: &mut SelectModifiers,
1441 schema: Option<&Schema>,
1442 ) -> PolarsResult<Vec<Expr>> {
1443 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1444 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1445 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1446 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1447 }
1448
1449 if let Some(items) = &options.opt_exclude {
1451 match items {
1452 ExcludeSelectItem::Single(ident) => {
1453 modifiers.exclude.insert(ident.value.clone());
1454 },
1455 ExcludeSelectItem::Multiple(idents) => {
1456 modifiers
1457 .exclude
1458 .extend(idents.iter().map(|i| i.value.clone()));
1459 },
1460 };
1461 }
1462
1463 if let Some(items) = &options.opt_except {
1465 modifiers.exclude.insert(items.first_element.value.clone());
1466 modifiers
1467 .exclude
1468 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1469 }
1470
1471 if let Some(item) = &options.opt_ilike {
1473 let rx = regex::escape(item.pattern.as_str())
1474 .replace('%', ".*")
1475 .replace('_', ".");
1476
1477 modifiers.ilike = Some(
1478 polars_utils::regex_cache::compile_regex(format!("^(?is){}$", rx).as_str())
1479 .unwrap(),
1480 );
1481 }
1482
1483 if let Some(items) = &options.opt_rename {
1485 let renames = match items {
1486 RenameSelectItem::Single(rename) => vec![rename],
1487 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1488 };
1489 for rn in renames {
1490 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1491 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1492 if before != after {
1493 modifiers.rename.insert(before, after);
1494 }
1495 }
1496 }
1497
1498 if let Some(replacements) = &options.opt_replace {
1500 for rp in &replacements.items {
1501 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1502 modifiers
1503 .replace
1504 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1505 }
1506 }
1507 Ok(exprs)
1508 }
1509
1510 fn rename_columns_from_table_alias(
1511 &mut self,
1512 mut lf: LazyFrame,
1513 alias: &TableAlias,
1514 ) -> PolarsResult<LazyFrame> {
1515 if alias.columns.is_empty() {
1516 Ok(lf)
1517 } else {
1518 let schema = self.get_frame_schema(&mut lf)?;
1519 if alias.columns.len() != schema.len() {
1520 polars_bail!(
1521 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1522 alias.columns.len(), alias.name.value, schema.len()
1523 )
1524 } else {
1525 let existing_columns: Vec<_> = schema.iter_names().collect();
1526 let new_columns: Vec<_> =
1527 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1528 Ok(lf.rename(existing_columns, new_columns, true))
1529 }
1530 }
1531 }
1532}
1533
1534impl SQLContext {
1535 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1537 self.table_map.clone()
1538 }
1539
1540 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1542 Self {
1543 table_map,
1544 ..Default::default()
1545 }
1546 }
1547}
1548
1549fn collect_compound_identifiers(
1550 left: &[Ident],
1551 right: &[Ident],
1552 left_name: &str,
1553 right_name: &str,
1554) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1555 if left.len() == 2 && right.len() == 2 {
1556 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1557 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1558
1559 if left_name == tbl_b || right_name == tbl_a {
1561 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1562 } else {
1563 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1564 }
1565 } else {
1566 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1567 }
1568}
1569
1570fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1571 match expr {
1572 Expr::Wildcard => schema
1573 .iter_names()
1574 .map(|name| col(name.clone()))
1575 .collect::<Vec<_>>(),
1576 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1577 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1578 schema
1579 .iter_names()
1580 .filter(|name| re.is_match(name))
1581 .map(|name| col(name.clone()))
1582 .collect::<Vec<_>>()
1583 },
1584 Expr::Columns(names) => names
1585 .iter()
1586 .map(|name| col(name.clone()))
1587 .collect::<Vec<_>>(),
1588 _ => vec![expr],
1589 }
1590}
1591
/// Returns `true` when a column name is a regex selector.
///
/// A regex selector starts with `^` and ends with `$`, so it has at least two
/// characters.
fn is_regex_colname(nm: &str) -> bool {
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1595
1596fn process_join_on(
1597 expression: &sqlparser::ast::Expr,
1598 tbl_left: &TableInfo,
1599 tbl_right: &TableInfo,
1600) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1601 match expression {
1602 SQLExpr::BinaryOp { left, op, right } => match op {
1603 BinaryOperator::And => {
1604 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1605 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1606 left_i.append(&mut left_j);
1607 right_i.append(&mut right_j);
1608 Ok((left_i, right_i))
1609 },
1610 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1611 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1612 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1613 },
1614 _ => {
1615 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1616 },
1617 },
1618 _ => {
1619 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1620 },
1621 },
1622 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1623 _ => {
1624 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1625 },
1626 }
1627}
1628
1629fn process_join_constraint(
1630 constraint: &JoinConstraint,
1631 tbl_left: &TableInfo,
1632 tbl_right: &TableInfo,
1633) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1634 match constraint {
1635 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1636 process_join_on(expr, tbl_left, tbl_right)
1637 },
1638 JoinConstraint::Using(idents) if !idents.is_empty() => {
1639 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1640 Ok((using.clone(), using))
1641 },
1642 JoinConstraint::Natural => {
1643 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1644 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1645 let on: Vec<Expr> = left_names
1646 .intersection(&right_names)
1647 .map(|&name| col(name.clone()))
1648 .collect();
1649 if on.is_empty() {
1650 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1651 }
1652 Ok((on.clone(), on))
1653 },
1654 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1655 }
1656}
1657
bitflags::bitflags! {
    /// Classification of a SQL projection expression by how its output height
    /// relates to the input frame. Computed by `identify_from_expr`, which returns
    /// exactly one of these flags.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// The expression references input column(s) and contains no node that
        /// may change the height.
        const MaintainsColumn = 1 << 0;
        /// The expression contains a node that may change the height. Such nodes
        /// are aggregations, `Len`, explode/filter/gather/slice, non-scalar literals,
        /// and functions that return a scalar or are not length preserving.
        const Independent = 1 << 1;
        /// Neither of the above (e.g. only scalar literals). NOTE(review):
        /// presumably such expressions take their height from the surrounding
        /// context — confirm against the callers.
        const InheritsContext = 1 << 2;
    }
}
1676
1677impl ExprSqlProjectionHeightBehavior {
1678 fn identify_from_expr(expr: &Expr) -> Self {
1679 let mut has_column = false;
1680 let mut has_independent = false;
1681
1682 for e in expr.into_iter() {
1683 use Expr::*;
1684
1685 has_column |= matches!(e, Column(_) | Columns(_) | DtypeColumn(_) | IndexColumn(_));
1686
1687 has_independent |= match e {
1688 Function { options, .. } | AnonymousFunction { options, .. } => {
1689 options.returns_scalar() || !options.is_length_preserving()
1690 },
1691
1692 Literal(v) => !v.is_scalar(),
1693
1694 Explode(_) | Filter { .. } | Gather { .. } | Slice { .. } => true,
1695
1696 Agg { .. } | Len => true,
1697
1698 _ => false,
1699 }
1700 }
1701
1702 if has_independent {
1703 Self::Independent
1704 } else if has_column {
1705 Self::MaintainsColumn
1706 } else {
1707 Self::InheritsContext
1708 }
1709 }
1710}