1use std::ops::Deref;
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
12 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
13 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
14 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
15 WildcardAdditionalOptions,
16};
17use sqlparser::dialect::GenericDialect;
18use sqlparser::parser::{Parser, ParserOptions};
19
20use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
21use crate::sql_expr::{
22 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
23};
24use crate::table_functions::PolarsTableFunctions;
25
/// Bundles a frame with the name and schema it is referenced by while a join is resolved.
#[derive(Clone)]
pub struct TableInfo {
    /// The lazy frame holding the table's data.
    pub(crate) frame: LazyFrame,
    /// Name (or alias) of the table; `process_join` uses it to build the `:{name}` column suffix.
    pub(crate) name: PlSmallStr,
    /// Schema supplied by the caller alongside `frame`.
    pub(crate) schema: Arc<Schema>,
}
32
33struct SelectModifiers {
34 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
39impl SelectModifiers {
40 fn matches_ilike(&self, s: &str) -> bool {
41 match &self.ilike {
42 Some(rx) => rx.is_match(s),
43 None => true,
44 }
45 }
46 fn renamed_cols(&self) -> Vec<Expr> {
47 self.rename
48 .iter()
49 .map(|(before, after)| col(before.clone()).alias(after.clone()))
50 .collect()
51 }
52}
53
/// Entry point for executing SQL statements against registered `LazyFrame`s.
#[derive(Clone)]
pub struct SQLContext {
    /// Frames addressable by name: those added through `register`, plus derived tables,
    /// UNNEST tables and table-function results inserted while resolving a FROM clause.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry of functions made available to SQL expressions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Arena handed to `schema_with_arenas` during planning; moved into the result frame
    /// at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena used together with `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// Frames produced by `WITH` clauses, keyed by CTE name; cleared at the end of `execute`.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Maps a table alias to the name of the table it refers to; cleared at the end of `execute`.
    table_aliases: PlHashMap<String, String>,
    /// For each joined (right-hand) table name, maps an original column name to the suffixed
    /// name it received in the join output; cleared at the end of `execute`.
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
}
66
67impl Default for SQLContext {
68 fn default() -> Self {
69 Self {
70 function_registry: Arc::new(DefaultFunctionRegistry {}),
71 table_map: Default::default(),
72 cte_map: Default::default(),
73 table_aliases: Default::default(),
74 joined_aliases: Default::default(),
75 lp_arena: Default::default(),
76 expr_arena: Default::default(),
77 }
78 }
79}
80
81impl SQLContext {
82 pub fn new() -> Self {
90 Self::default()
91 }
92
93 pub fn get_tables(&self) -> Vec<String> {
95 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
96 tables.sort_unstable();
97 tables
98 }
99
100 pub fn register(&mut self, name: &str, lf: LazyFrame) {
116 self.table_map.insert(name.to_owned(), lf);
117 }
118
119 pub fn unregister(&mut self, name: &str) {
121 self.table_map.remove(&name.to_owned());
122 }
123
124 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
143 let mut parser = Parser::new(&GenericDialect);
144 parser = parser.with_options(ParserOptions {
145 trailing_commas: true,
146 ..Default::default()
147 });
148
149 let ast = parser
150 .try_with_sql(query)
151 .map_err(to_sql_interface_err)?
152 .parse_statements()
153 .map_err(to_sql_interface_err)?;
154
155 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
156 let res = self.execute_statement(ast.first().unwrap())?;
157
158 let lp_arena = std::mem::take(&mut self.lp_arena);
161 let expr_arena = std::mem::take(&mut self.expr_arena);
162 res.set_cached_arena(lp_arena, expr_arena);
163
164 self.cte_map.clear();
166 self.table_aliases.clear();
167 self.joined_aliases.clear();
168
169 Ok(res)
170 }
171
172 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
175 self.function_registry = function_registry;
176 self
177 }
178
179 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
181 &self.function_registry
182 }
183
184 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
186 Arc::get_mut(&mut self.function_registry).unwrap()
187 }
188}
189
190impl SQLContext {
191 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
192 let ast = stmt;
193 Ok(match ast {
194 Statement::Query(query) => self.execute_query(query)?,
195 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
196 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
197 stmt @ Statement::Drop {
198 object_type: ObjectType::Table,
199 ..
200 } => self.execute_drop_table(stmt)?,
201 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
202 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
203 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
204 _ => polars_bail!(
205 SQLInterface: "statement type is not supported:\n{:?}", ast,
206 ),
207 })
208 }
209
210 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211 self.register_ctes(query)?;
212 self.execute_query_no_ctes(query)
213 }
214
215 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216 let lf = self.process_query(&query.body, query)?;
217 self.process_limit_offset(lf, &query.limit, &query.offset)
218 }
219
220 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222 }
223
224 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225 let table = self.table_map.get(name).cloned();
226 table
227 .or_else(|| self.cte_map.get(name).cloned())
228 .or_else(|| {
229 self.table_aliases
230 .get(name)
231 .and_then(|alias| self.table_map.get(alias).cloned())
232 })
233 }
234
235 fn expr_or_ordinal(
236 &mut self,
237 e: &SQLExpr,
238 exprs: &[Expr],
239 selected: Option<&[Expr]>,
240 schema: Option<&Schema>,
241 clause: &str,
242 ) -> PolarsResult<Expr> {
243 match e {
244 SQLExpr::UnaryOp {
245 op: UnaryOperator::Minus,
246 expr,
247 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
248 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
249 Err(polars_err!(
250 SQLSyntax:
251 "negative ordinal values are invalid for {}; found -{}",
252 clause,
253 idx
254 ))
255 } else {
256 unreachable!()
257 }
258 },
259 SQLExpr::Value(SQLValue::Number(idx, _)) => {
260 let idx = idx.parse::<usize>().map_err(|_| {
262 polars_err!(
263 SQLSyntax:
264 "negative ordinal values are invalid for {}; found {}",
265 clause,
266 idx
267 )
268 })?;
269 let cols = if let Some(cols) = selected {
272 cols
273 } else {
274 exprs
275 };
276 Ok(cols
277 .get(idx - 1)
278 .ok_or_else(|| {
279 polars_err!(
280 SQLInterface:
281 "{} ordinal value must refer to a valid column; found {}",
282 clause,
283 idx
284 )
285 })?
286 .clone())
287 },
288 SQLExpr::Value(v) => Err(polars_err!(
289 SQLSyntax:
290 "{} requires a valid expression or positive ordinal; found {}", clause, v,
291 )),
292 _ => parse_sql_expr(e, self, schema),
293 }
294 }
295
296 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
297 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
298 if let Some(name) = aliases.get(column_name) {
299 return name.to_string();
300 }
301 }
302 column_name.to_string()
303 }
304
305 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
306 match expr {
307 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
308 SetExpr::Query(query) => self.execute_query_no_ctes(query),
309 SetExpr::SetOperation {
310 op: SetOperator::Union,
311 set_quantifier,
312 left,
313 right,
314 } => self.process_union(left, right, set_quantifier, query),
315
316 #[cfg(feature = "semi_anti_join")]
317 SetExpr::SetOperation {
318 op: SetOperator::Intersect | SetOperator::Except,
319 set_quantifier,
320 left,
321 right,
322 } => self.process_except_intersect(left, right, set_quantifier, query),
323
324 SetExpr::Values(Values {
325 explicit_row: _,
326 rows,
327 }) => self.process_values(rows),
328
329 SetExpr::Table(tbl) => {
330 if tbl.table_name.is_some() {
331 let table_name = tbl.table_name.as_ref().unwrap();
332 self.get_table_from_current_scope(table_name)
333 .ok_or_else(|| {
334 polars_err!(
335 SQLInterface: "no table or alias named '{}' found",
336 tbl
337 )
338 })
339 } else {
340 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
341 }
342 },
343 op => {
344 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
345 },
346 }
347 }
348
349 #[cfg(feature = "semi_anti_join")]
350 fn process_except_intersect(
351 &mut self,
352 left: &SetExpr,
353 right: &SetExpr,
354 quantifier: &SetQuantifier,
355 query: &Query,
356 ) -> PolarsResult<LazyFrame> {
357 let (join_type, op_name) = match *query.body {
358 SetExpr::SetOperation {
359 op: SetOperator::Except,
360 ..
361 } => (JoinType::Anti, "EXCEPT"),
362 _ => (JoinType::Semi, "INTERSECT"),
363 };
364 let mut lf = self.process_query(left, query)?;
365 let mut rf = self.process_query(right, query)?;
366 let join = lf
367 .clone()
368 .join_builder()
369 .with(rf.clone())
370 .how(join_type)
371 .join_nulls(true);
372
373 let lf_schema = self.get_frame_schema(&mut lf)?;
374 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
375 let joined_tbl = match quantifier {
376 SetQuantifier::ByName => join.on(lf_cols).finish(),
377 SetQuantifier::Distinct | SetQuantifier::None => {
378 let rf_schema = self.get_frame_schema(&mut rf)?;
379 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
380 if lf_cols.len() != rf_cols.len() {
381 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
382 }
383 join.left_on(lf_cols).right_on(rf_cols).finish()
384 },
385 _ => {
386 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
387 },
388 };
389 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
390 }
391
392 fn process_union(
393 &mut self,
394 left: &SetExpr,
395 right: &SetExpr,
396 quantifier: &SetQuantifier,
397 query: &Query,
398 ) -> PolarsResult<LazyFrame> {
399 let mut lf = self.process_query(left, query)?;
400 let mut rf = self.process_query(right, query)?;
401 let opts = UnionArgs {
402 parallel: true,
403 to_supertypes: true,
404 ..Default::default()
405 };
406 match quantifier {
407 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
409 let lf_schema = self.get_frame_schema(&mut lf)?;
410 let rf_schema = self.get_frame_schema(&mut rf)?;
411 if lf_schema.len() != rf_schema.len() {
412 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
413 }
414 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
415 match quantifier {
416 SetQuantifier::Distinct | SetQuantifier::None => {
417 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
418 },
419 _ => concatenated,
420 }
421 },
422 #[cfg(feature = "diagonal_concat")]
424 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
425 #[cfg(feature = "diagonal_concat")]
427 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
428 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
429 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
430 },
431 #[allow(unreachable_patterns)]
432 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
433 }
434 }
435
436 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
437 let frame_rows: Vec<Row> = values.iter().map(|row| {
438 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
439 let expr = parse_sql_expr(expr, self, None)?;
440 match expr {
441 Expr::Literal(value) => {
442 value.to_any_value()
443 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
444 .map(|av| av.into_static())
445 },
446 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
447 }
448 }).collect();
449 row_data.map(Row::new)
450 }).collect::<Result<_, _>>()?;
451
452 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
453 }
454
455 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
457 match stmt {
458 Statement::Explain { statement, .. } => {
459 let lf = self.execute_statement(statement)?;
460 let plan = lf.describe_optimized_plan()?;
461 let plan = plan
462 .split('\n')
463 .collect::<Series>()
464 .with_name(PlSmallStr::from_static("Logical Plan"))
465 .into_column();
466 let df = DataFrame::new(vec![plan])?;
467 Ok(df.lazy())
468 },
469 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
470 }
471 }
472
473 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
475 let tables = Column::new("name".into(), self.get_tables());
476 let df = DataFrame::new(vec![tables])?;
477 Ok(df.lazy())
478 }
479
480 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
482 match stmt {
483 Statement::Drop { names, .. } => {
484 names.iter().for_each(|name| {
485 self.table_map.remove(&name.to_string());
486 });
487 Ok(DataFrame::empty().lazy())
488 },
489 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
490 }
491 }
492
493 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
495 if let Statement::Delete(Delete {
496 tables,
497 from,
498 using,
499 selection,
500 returning,
501 order_by,
502 limit,
503 }) = stmt
504 {
505 if !tables.is_empty()
506 || using.is_some()
507 || returning.is_some()
508 || limit.is_some()
509 || !order_by.is_empty()
510 {
511 let error_message = match () {
512 _ if !tables.is_empty() => "DELETE expects exactly one table name",
513 _ if using.is_some() => "DELETE does not support the USING clause",
514 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
515 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
516 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
517 _ => unreachable!(),
518 };
519 polars_bail!(SQLInterface: error_message);
520 }
521 let from_tables = match &from {
522 FromTable::WithFromKeyword(from) => from,
523 FromTable::WithoutKeyword(from) => from,
524 };
525 if from_tables.len() > 1 {
526 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
527 }
528 let tbl_expr = from_tables.first().unwrap();
529 if !tbl_expr.joins.is_empty() {
530 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
531 }
532 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
533 if selection.is_none() {
534 Ok(DataFrame::empty_with_schema(
536 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
537 .unwrap()
538 .as_ref(),
539 )
540 .lazy())
541 } else {
542 Ok(self.process_where(lf.clone(), selection, true)?)
544 }
545 } else {
546 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
547 }
548 }
549
550 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
552 if let Statement::Truncate {
553 table_names,
554 partitions,
555 ..
556 } = stmt
557 {
558 match partitions {
559 None => {
560 if table_names.len() != 1 {
561 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
562 }
563 let tbl = table_names[0].to_string();
564 if let Some(lf) = self.table_map.get_mut(&tbl) {
565 *lf = DataFrame::empty_with_schema(
566 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
567 .unwrap()
568 .as_ref(),
569 )
570 .lazy();
571 Ok(lf.clone())
572 } else {
573 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
574 }
575 },
576 _ => {
577 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
578 },
579 }
580 } else {
581 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
582 }
583 }
584
585 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
586 self.cte_map.insert(name.to_owned(), lf);
587 }
588
589 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
590 if let Some(with) = &query.with {
591 if with.recursive {
592 polars_bail!(SQLInterface: "recursive CTEs are not supported")
593 }
594 for cte in &with.cte_tables {
595 let cte_name = cte.alias.name.value.clone();
596 let mut lf = self.execute_query(&cte.query)?;
597 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
598 self.register_cte(&cte_name, lf);
599 }
600 }
601 Ok(())
602 }
603
    /// Resolves a FROM item: loads the base relation and applies its joins in order.
    ///
    /// After each join, the columns of the right-hand table that also exist on the left
    /// and received a `:{right name}` suffix in the joined frame are recorded in
    /// `joined_aliases`, keyed by the right-hand table name.
    ///
    /// Returns an error if a joined relation has no name, if a relation cannot be
    /// resolved, or if the join type is not supported.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                if r_name.is_empty() {
                    // The name is needed below to build the column suffix and alias map.
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // Right anti/semi joins are expressed by swapping the two frames.
                        // NOTE(review): only the frames are swapped; the names and schemas
                        // passed below stay in their original positions — confirm intended.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                // Reached for anti/semi joins when the feature is disabled.
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // Schema of the frame produced by the join just applied.
                let joined_schema = self.get_frame_schema(&mut lf)?;

                self.joined_aliases.insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // Candidate name of the right-hand column after suffixing.
                            let aliased_name = format!("{name}:{r_name}");
                            // Record it only for columns that collide with a left-hand
                            // column and are present under the suffixed name.
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
700
    /// Executes a SELECT statement.
    ///
    /// Steps, in order: resolve FROM (an empty frame when there is none), apply WHERE,
    /// build the projections, resolve GROUP BY keys, then either
    /// * (no group keys) apply the select modifiers, ORDER BY and the final projection, or
    /// * (group keys) aggregate, apply ORDER BY and then the HAVING filter,
    /// and finally apply DISTINCT / DISTINCT ON.
    ///
    /// `query` supplies the ORDER BY clause that belongs to this SELECT.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        let mut lf = if select_stmt.from.is_empty() {
            DataFrame::empty().lazy()
        } else {
            let from = select_stmt.clone().from;
            // Only a single FROM item is supported; additional tables must be joined.
            if from.len() > 1 {
                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
            }
            self.execute_from_statement(from.first().unwrap())?
        };

        // The schema is taken before the WHERE filter is applied.
        let schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(lf, &select_stmt.selection, false)?;

        // Filled in by `column_projections` while the select items are processed.
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        // Resolve the GROUP BY keys (explicit expressions/ordinals, or GROUP BY ALL).
        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| {
                        self.expr_or_ordinal(
                            e,
                            &projections,
                            None,
                            Some(schema.deref()),
                            "GROUP BY",
                        )
                    })
                    .collect::<PolarsResult<_>>()?
            },
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                // GROUP BY ALL: derive the keys from the projections.
                projections.iter().for_each(|expr| match expr {
                    // Aggregations, `len` and literals (optionally aliased) are not keys.
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    // An aliased column is keyed on the underlying column.
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    _ => {
                        // Any other expression becomes a key only if it contains no
                        // aggregation, `len` or window expression.
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Window { .. })
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // HAVING is rejected when there are no group keys.
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            let mut retained_cols = Vec::with_capacity(projections.len());
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();
            // Start from `InheritsContext` so an empty projection list is handled too.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            // With an ORDER BY, everything is projected first and the final columns are
            // selected by name afterwards; without one, the projection expressions are
            // used directly.
            for p in projections.iter() {
                let name = p.to_field(schema.deref())?.name.to_string();
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            if have_order_by {
                // `with_columns` is used when some projection maintains a column, or when
                // all projections inherit their height from the context.
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    lf = lf.with_columns(projections);
                } else {
                    // Otherwise project independently, then left-join the original frame
                    // back on a row index so the columns used for sorting are available.
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                            },
                        );
                }
            }

            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                // Adds the renamed columns as aliases; the rename itself happens below.
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }

            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            // When an ORDER BY is present, `with_columns` has already been applied above.
            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                // Evaluate in `with_columns` first, then select the retained names.
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }

            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            // HAVING is applied after aggregation, against the aggregated schema.
            let schema = Some(self.get_frame_schema(&mut lf)?);
            match select_stmt.having.as_ref() {
                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
                None => lf,
            }
        };

        // Apply DISTINCT / DISTINCT ON.
        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                let schema = Some(self.get_frame_schema(&mut lf)?);
                // DISTINCT ON accepts plain column names only.
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name)
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // The ORDER BY is applied before keeping the first row per key.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(
                    Some(Selector::ByName {
                        names: cols.into(),
                        strict: true,
                    }),
                    UniqueKeepStrategy::First,
                ));
            },
            None => lf,
        };
        Ok(lf)
    }
928
929 fn column_projections(
930 &mut self,
931 select_stmt: &Select,
932 schema: &SchemaRef,
933 select_modifiers: &mut SelectModifiers,
934 ) -> PolarsResult<Vec<Expr>> {
935 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
936 .projection
937 .iter()
938 .map(|select_item| match select_item {
939 SelectItem::UnnamedExpr(expr) => {
940 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
941 },
942 SelectItem::ExprWithAlias { expr, alias } => {
943 let expr = parse_sql_expr(expr, self, Some(schema))?;
944 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
945 },
946 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
947 .process_qualified_wildcard(
948 obj_name,
949 wildcard_options,
950 select_modifiers,
951 Some(schema),
952 ),
953 SelectItem::Wildcard(wildcard_options) => {
954 let cols = schema
955 .iter_names()
956 .map(|name| col(name.clone()))
957 .collect::<Vec<_>>();
958
959 self.process_wildcard_additional_options(
960 cols,
961 wildcard_options,
962 select_modifiers,
963 Some(schema),
964 )
965 },
966 })
967 .collect();
968
969 let flattened_exprs: Vec<Expr> = parsed_items?
970 .into_iter()
971 .flatten()
972 .flat_map(|expr| expand_exprs(expr, schema))
973 .collect();
974
975 Ok(flattened_exprs)
976 }
977
978 fn process_where(
979 &mut self,
980 mut lf: LazyFrame,
981 expr: &Option<SQLExpr>,
982 invert_filter: bool,
983 ) -> PolarsResult<LazyFrame> {
984 if let Some(expr) = expr {
985 let schema = self.get_frame_schema(&mut lf)?;
986
987 let (all_true, all_false) = match expr {
989 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
990 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
991 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
992 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
993 (a != b, a == b)
994 },
995 _ => (false, false),
996 },
997 _ => (false, false),
998 };
999 if (all_true && !invert_filter) || (all_false && invert_filter) {
1000 return Ok(lf);
1001 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1002 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1003 }
1004
1005 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1007 if filter_expression.clone().meta().has_multiple_outputs() {
1008 filter_expression = all_horizontal([filter_expression])?;
1009 }
1010 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1011 lf = if invert_filter {
1012 lf.remove(filter_expression)
1013 } else {
1014 lf.filter(filter_expression)
1015 };
1016 }
1017 Ok(lf)
1018 }
1019
1020 pub(super) fn process_join(
1021 &mut self,
1022 tbl_left: &TableInfo,
1023 tbl_right: &TableInfo,
1024 constraint: &JoinConstraint,
1025 join_type: JoinType,
1026 ) -> PolarsResult<LazyFrame> {
1027 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1028
1029 let joined = tbl_left
1030 .frame
1031 .clone()
1032 .join_builder()
1033 .with(tbl_right.frame.clone())
1034 .left_on(left_on)
1035 .right_on(right_on)
1036 .how(join_type)
1037 .suffix(format!(":{}", tbl_right.name))
1038 .coalesce(JoinCoalesce::KeepColumns)
1039 .finish();
1040
1041 Ok(joined)
1042 }
1043
1044 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1045 let mut contexts = vec![];
1046 for expr in exprs {
1047 *expr = expr.clone().map_expr(|e| match e {
1048 Expr::SubPlan(lp, names) => {
1049 contexts.push(<LazyFrame>::from((**lp).clone()));
1050 if names.len() == 1 {
1051 Expr::Column(names[0].as_str().into())
1052 } else {
1053 Expr::SubPlan(lp, names)
1054 }
1055 },
1056 e => e,
1057 })
1058 }
1059
1060 if contexts.is_empty() {
1061 lf
1062 } else {
1063 lf.with_context(contexts)
1064 }
1065 }
1066
1067 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1068 if let Statement::CreateTable(CreateTable {
1069 if_not_exists,
1070 name,
1071 query,
1072 ..
1073 }) = stmt
1074 {
1075 let tbl_name = name.0.first().unwrap().value.as_str();
1076 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1078 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1079 }
1081 if let Some(query) = query {
1082 let lf = self.execute_query(query)?;
1083 self.register(tbl_name, lf);
1084 let out = df! {
1085 "Response" => ["CREATE TABLE"]
1086 }
1087 .unwrap()
1088 .lazy();
1089 Ok(out)
1090 } else {
1091 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1092 }
1093 } else {
1094 unreachable!()
1095 }
1096 }
1097
    /// Resolves a table factor of a FROM/JOIN clause into a `(name, frame)` pair.
    ///
    /// Supported factors: named tables (optionally aliased, or table functions when
    /// arguments are present), derived tables (subqueries; alias required), UNNEST
    /// (alias with column names required) and nested joins. Derived tables, UNNEST
    /// tables and table-function results are also inserted into `table_map`.
    ///
    /// Returns an error when the relation is unknown, a required alias is missing,
    /// or the factor kind is not implemented.
    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
        match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                // Arguments mean the "table" is actually a table-function call.
                if let Some(args) = args {
                    return self.execute_table_function(name, alias, &args.args);
                }
                // Only the first part of the (possibly compound) name is used.
                let tbl_name = name.0.first().unwrap().value.as_str();
                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                    match alias {
                        Some(alias) => {
                            // Remember which table the alias stands for.
                            self.table_aliases
                                .insert(alias.name.value.clone(), tbl_name.to_string());
                            // NOTE(review): the returned name is the alias' `Display`
                            // output, while the map key above is `alias.name.value` —
                            // confirm the two agree for quoted aliases / column lists.
                            Ok((alias.to_string(), lf))
                        },
                        None => Ok((tbl_name.to_string(), lf)),
                    }
                } else {
                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
                }
            },
            TableFactor::Derived {
                lateral,
                subquery,
                alias,
            } => {
                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
                if let Some(alias) = alias {
                    let mut lf = self.execute_query_no_ctes(subquery)?;
                    lf = self.rename_columns_from_table_alias(lf, alias)?;
                    // Make the subquery result addressable by its alias.
                    self.table_map.insert(alias.name.value.clone(), lf.clone());
                    Ok((alias.name.value.clone(), lf))
                } else {
                    polars_bail!(SQLSyntax: "derived tables must have aliases");
                }
            },
            TableFactor::UNNEST {
                alias,
                array_exprs,
                with_offset,
                with_offset_alias: _,
                ..
            } => {
                if let Some(alias) = alias {
                    // Column names declared on the alias; an empty name means "keep as is".
                    let column_names: Vec<Option<PlSmallStr>> = alias
                        .columns
                        .iter()
                        .map(|c| {
                            if c.name.value.is_empty() {
                                None
                            } else {
                                Some(PlSmallStr::from_str(c.name.value.as_str()))
                            }
                        })
                        .collect();

                    // One series per array expression.
                    let column_values: Vec<Series> = array_exprs
                        .iter()
                        .map(|arr| parse_sql_array(arr, self))
                        .collect::<Result<_, _>>()?;

                    polars_ensure!(!column_names.is_empty(),
                        SQLSyntax:
                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
                    );
                    if column_names.len() != column_values.len() {
                        let plural = if column_values.len() > 1 { "s" } else { "" };
                        polars_bail!(
                            SQLSyntax:
                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
                        );
                    }
                    // Pair every series with its declared name (if any).
                    let column_series: Vec<Column> = column_values
                        .into_iter()
                        .zip(column_names)
                        .map(|(s, name)| {
                            if let Some(name) = name {
                                s.with_name(name)
                            } else {
                                s
                            }
                        })
                        .map(Column::from)
                        .collect();

                    let lf = DataFrame::new(column_series)?.lazy();

                    if *with_offset {
                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
                    }
                    let table_name = alias.name.value.clone();
                    self.table_map.insert(table_name.clone(), lf.clone());
                    Ok((table_name, lf))
                } else {
                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
                }
            },
            TableFactor::NestedJoin {
                table_with_joins,
                alias,
            } => {
                let lf = self.execute_from_statement(table_with_joins)?;
                match alias {
                    Some(a) => Ok((a.name.value.clone(), lf)),
                    // An unaliased nested join is returned with an empty name.
                    None => Ok(("".to_string(), lf)),
                }
            },
            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
        }
    }
1212
1213 fn execute_table_function(
1214 &mut self,
1215 name: &ObjectName,
1216 alias: &Option<TableAlias>,
1217 args: &[FunctionArg],
1218 ) -> PolarsResult<(String, LazyFrame)> {
1219 let tbl_fn = name.0.first().unwrap().value.as_str();
1220 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1221 let (tbl_name, lf) = read_fn.execute(args)?;
1222 #[allow(clippy::useless_asref)]
1223 let tbl_name = alias
1224 .as_ref()
1225 .map(|a| a.name.value.clone())
1226 .unwrap_or_else(|| tbl_name.to_str().to_string());
1227
1228 self.table_map.insert(tbl_name.clone(), lf.clone());
1229 Ok((tbl_name, lf))
1230 }
1231
1232 fn process_order_by(
1233 &mut self,
1234 mut lf: LazyFrame,
1235 order_by: &Option<OrderBy>,
1236 selected: Option<&[Expr]>,
1237 ) -> PolarsResult<LazyFrame> {
1238 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1239 return Ok(lf);
1240 }
1241 let schema = self.get_frame_schema(&mut lf)?;
1242 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1243
1244 let order_by = order_by.as_ref().unwrap().exprs.clone();
1245 let mut descending = Vec::with_capacity(order_by.len());
1246 let mut nulls_last = Vec::with_capacity(order_by.len());
1247 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1248
1249 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1251 {
1252 if let Some(selected) = selected {
1253 by.extend(selected.iter().cloned());
1254 } else {
1255 by.extend(columns_iter);
1256 };
1257 let desc_order = !order_by[0].asc.unwrap_or(true);
1258 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1259 descending.resize(by.len(), desc_order);
1260 } else {
1261 let columns = &columns_iter.collect::<Vec<_>>();
1262 for ob in order_by {
1263 let desc_order = !ob.asc.unwrap_or(true);
1266 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1267 descending.push(desc_order);
1268
1269 by.push(self.expr_or_ordinal(
1271 &ob.expr,
1272 columns,
1273 selected,
1274 Some(&schema),
1275 "ORDER BY",
1276 )?)
1277 }
1278 }
1279 Ok(lf.sort_by_exprs(
1280 &by,
1281 SortMultipleOptions::default()
1282 .with_order_descending_multi(descending)
1283 .with_nulls_last_multi(nulls_last)
1284 .with_maintain_order(true),
1285 ))
1286 }
1287
1288 fn process_group_by(
1289 &mut self,
1290 mut lf: LazyFrame,
1291 group_by_keys: &[Expr],
1292 projections: &[Expr],
1293 ) -> PolarsResult<LazyFrame> {
1294 let schema_before = self.get_frame_schema(&mut lf)?;
1295 let group_by_keys_schema = expressions_to_schema(group_by_keys, &schema_before)?;
1296
1297 let mut aggregation_projection = Vec::with_capacity(projections.len());
1299 let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1300 let mut projection_aliases = PlHashSet::new();
1301 let mut group_key_aliases = PlHashSet::new();
1302
1303 for mut e in projections {
1304 let is_non_group_key_expr = has_expr(e, |e| {
1306 match e {
1307 Expr::Agg(_) | Expr::Len | Expr::Window { .. } => true,
1308 Expr::Function { function: func, .. }
1309 if !matches!(func, FunctionExpr::StructExpr(_)) =>
1310 {
1311 has_expr(e, |e| match e {
1314 Expr::Column(name) => !group_by_keys_schema.contains(name),
1315 _ => false,
1316 })
1317 },
1318 _ => false,
1319 }
1320 });
1321
1322 if let Expr::Alias(expr, alias) = e {
1324 if e.clone().meta().is_simple_projection(Some(&schema_before)) {
1325 group_key_aliases.insert(alias.as_ref());
1326 e = expr
1327 } else if let Expr::Function {
1328 function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1329 ..
1330 } = expr.deref()
1331 {
1332 projection_overrides
1333 .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1334 } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
1335 projection_aliases.insert(alias.as_ref());
1336 }
1337 }
1338 let field = e.to_field(&schema_before)?;
1339 if group_by_keys_schema.get(&field.name).is_none() && is_non_group_key_expr {
1340 let mut e = e.clone();
1341 if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1342 e = (**expr).clone();
1343 } else if let Expr::Alias(expr, name) = &e {
1344 if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1345 e = (**expr).clone().alias(name.clone());
1346 }
1347 }
1348 aggregation_projection.push(e);
1349 } else if let Expr::Column(_)
1350 | Expr::Function {
1351 function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1352 ..
1353 } = e
1354 {
1355 if !group_by_keys_schema.contains(&field.name) {
1357 polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1358 }
1359 }
1360 }
1361 let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1362 let projection_schema = expressions_to_schema(projections, &schema_before)?;
1363
1364 let final_projection = projection_schema
1366 .iter_names()
1367 .zip(projections)
1368 .map(|(name, projection_expr)| {
1369 if let Some(expr) = projection_overrides.get(name.as_str()) {
1370 expr.clone()
1371 } else if group_by_keys_schema.get(name).is_some()
1372 || projection_aliases.contains(name.as_str())
1373 || group_key_aliases.contains(name.as_str())
1374 {
1375 projection_expr.clone()
1376 } else {
1377 col(name.clone())
1378 }
1379 })
1380 .collect::<Vec<_>>();
1381
1382 Ok(aggregated.select(&final_projection))
1383 }
1384
1385 fn process_limit_offset(
1386 &self,
1387 lf: LazyFrame,
1388 limit: &Option<SQLExpr>,
1389 offset: &Option<Offset>,
1390 ) -> PolarsResult<LazyFrame> {
1391 match (offset, limit) {
1392 (
1393 Some(Offset {
1394 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1395 ..
1396 }),
1397 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1398 ) => Ok(lf.slice(
1399 offset
1400 .parse()
1401 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1402 limit
1403 .parse()
1404 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1405 )),
1406 (
1407 Some(Offset {
1408 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1409 ..
1410 }),
1411 None,
1412 ) => Ok(lf.slice(
1413 offset
1414 .parse()
1415 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1416 IdxSize::MAX,
1417 )),
1418 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1419 limit
1420 .parse()
1421 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1422 )),
1423 (None, None) => Ok(lf),
1424 _ => polars_bail!(
1425 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1426 ),
1427 }
1428 }
1429
1430 fn process_qualified_wildcard(
1431 &mut self,
1432 ObjectName(idents): &ObjectName,
1433 options: &WildcardAdditionalOptions,
1434 modifiers: &mut SelectModifiers,
1435 schema: Option<&Schema>,
1436 ) -> PolarsResult<Vec<Expr>> {
1437 let mut new_idents = idents.clone();
1438 new_idents.push(Ident::new("*"));
1439
1440 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1441 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1442 }
1443
1444 fn process_wildcard_additional_options(
1445 &mut self,
1446 exprs: Vec<Expr>,
1447 options: &WildcardAdditionalOptions,
1448 modifiers: &mut SelectModifiers,
1449 schema: Option<&Schema>,
1450 ) -> PolarsResult<Vec<Expr>> {
1451 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1452 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1453 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1454 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1455 }
1456
1457 if let Some(items) = &options.opt_exclude {
1459 match items {
1460 ExcludeSelectItem::Single(ident) => {
1461 modifiers.exclude.insert(ident.value.clone());
1462 },
1463 ExcludeSelectItem::Multiple(idents) => {
1464 modifiers
1465 .exclude
1466 .extend(idents.iter().map(|i| i.value.clone()));
1467 },
1468 };
1469 }
1470
1471 if let Some(items) = &options.opt_except {
1473 modifiers.exclude.insert(items.first_element.value.clone());
1474 modifiers
1475 .exclude
1476 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1477 }
1478
1479 if let Some(item) = &options.opt_ilike {
1481 let rx = regex::escape(item.pattern.as_str())
1482 .replace('%', ".*")
1483 .replace('_', ".");
1484
1485 modifiers.ilike = Some(
1486 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1487 );
1488 }
1489
1490 if let Some(items) = &options.opt_rename {
1492 let renames = match items {
1493 RenameSelectItem::Single(rename) => vec![rename],
1494 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1495 };
1496 for rn in renames {
1497 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1498 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1499 if before != after {
1500 modifiers.rename.insert(before, after);
1501 }
1502 }
1503 }
1504
1505 if let Some(replacements) = &options.opt_replace {
1507 for rp in &replacements.items {
1508 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1509 modifiers
1510 .replace
1511 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1512 }
1513 }
1514 Ok(exprs)
1515 }
1516
1517 fn rename_columns_from_table_alias(
1518 &mut self,
1519 mut lf: LazyFrame,
1520 alias: &TableAlias,
1521 ) -> PolarsResult<LazyFrame> {
1522 if alias.columns.is_empty() {
1523 Ok(lf)
1524 } else {
1525 let schema = self.get_frame_schema(&mut lf)?;
1526 if alias.columns.len() != schema.len() {
1527 polars_bail!(
1528 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1529 alias.columns.len(), alias.name.value, schema.len()
1530 )
1531 } else {
1532 let existing_columns: Vec<_> = schema.iter_names().collect();
1533 let new_columns: Vec<_> =
1534 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1535 Ok(lf.rename(existing_columns, new_columns, true))
1536 }
1537 }
1538 }
1539}
1540
1541impl SQLContext {
1542 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1544 self.table_map.clone()
1545 }
1546
1547 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1549 Self {
1550 table_map,
1551 ..Default::default()
1552 }
1553 }
1554}
1555
1556fn collect_compound_identifiers(
1557 left: &[Ident],
1558 right: &[Ident],
1559 left_name: &str,
1560 right_name: &str,
1561) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1562 if left.len() == 2 && right.len() == 2 {
1563 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1564 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1565
1566 if left_name == tbl_b || right_name == tbl_a {
1568 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1569 } else {
1570 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1571 }
1572 } else {
1573 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1574 }
1575}
1576
1577fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1578 match expr {
1579 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1580 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1581 schema
1582 .iter_names()
1583 .filter(|name| re.is_match(name))
1584 .map(|name| col(name.clone()))
1585 .collect::<Vec<_>>()
1586 },
1587 Expr::Selector(s) => s
1588 .into_columns(schema, &Default::default())
1589 .unwrap()
1590 .into_iter()
1591 .map(col)
1592 .collect::<Vec<_>>(),
1593 _ => vec![expr],
1594 }
1595}
1596
/// Report whether a column name should be treated as a regex, i.e. whether
/// it begins with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are single-byte ASCII, so a byte-level check is exact.
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1600
1601fn process_join_on(
1602 expression: &sqlparser::ast::Expr,
1603 tbl_left: &TableInfo,
1604 tbl_right: &TableInfo,
1605) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1606 match expression {
1607 SQLExpr::BinaryOp { left, op, right } => match op {
1608 BinaryOperator::And => {
1609 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1610 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1611 left_i.append(&mut left_j);
1612 right_i.append(&mut right_j);
1613 Ok((left_i, right_i))
1614 },
1615 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1616 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1617 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1618 },
1619 _ => {
1620 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1621 },
1622 },
1623 _ => {
1624 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1625 },
1626 },
1627 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1628 _ => {
1629 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1630 },
1631 }
1632}
1633
1634fn process_join_constraint(
1635 constraint: &JoinConstraint,
1636 tbl_left: &TableInfo,
1637 tbl_right: &TableInfo,
1638) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1639 match constraint {
1640 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1641 process_join_on(expr, tbl_left, tbl_right)
1642 },
1643 JoinConstraint::Using(idents) if !idents.is_empty() => {
1644 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1645 Ok((using.clone(), using))
1646 },
1647 JoinConstraint::Natural => {
1648 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1649 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1650 let on: Vec<Expr> = left_names
1651 .intersection(&right_names)
1652 .map(|&name| col(name.clone()))
1653 .collect();
1654 if on.is_empty() {
1655 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1656 }
1657 Ok((on.clone(), on))
1658 },
1659 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1660 }
1661}
1662
1663bitflags::bitflags! {
1664 #[derive(PartialEq)]
1669 struct ExprSqlProjectionHeightBehavior: u8 {
1670 const MaintainsColumn = 1 << 0;
1672 const Independent = 1 << 1;
1676 const InheritsContext = 1 << 2;
1679 }
1680}
1681
1682impl ExprSqlProjectionHeightBehavior {
1683 fn identify_from_expr(expr: &Expr) -> Self {
1684 let mut has_column = false;
1685 let mut has_independent = false;
1686
1687 for e in expr.into_iter() {
1688 use Expr::*;
1689
1690 has_column |= matches!(e, Column(_) | Selector(_));
1691
1692 has_independent |= match e {
1693 AnonymousFunction { options, .. } => {
1695 options.returns_scalar() || !options.is_length_preserving()
1696 },
1697
1698 Literal(v) => !v.is_scalar(),
1699
1700 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1701
1702 Agg { .. } | Len => true,
1703
1704 _ => false,
1705 }
1706 }
1707
1708 if has_independent {
1709 Self::Independent
1710 } else if has_column {
1711 Self::MaintainsColumn
1712 } else {
1713 Self::InheritsContext
1714 }
1715 }
1716}