1use std::ops::{ControlFlow, Deref};
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11 BinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct, ExcludeSelectItem,
12 Expr as SQLExpr, FromTable, FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator,
13 LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName, ObjectNamePart, ObjectType,
14 OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
15 SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
16 TableFactor, TableWithJoins, Truncate, UnaryOperator, Value as SQLValue, ValueWithSpan, Values,
17 Visit, Visitor as SQLVisitor, WildcardAdditionalOptions, WindowSpec,
18};
19use sqlparser::dialect::GenericDialect;
20use sqlparser::parser::{Parser, ParserOptions};
21
22use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
23use crate::sql_expr::{
24 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
25};
26use crate::table_functions::PolarsTableFunctions;
27use crate::types::map_sql_dtype_to_polars;
28
29#[derive(Clone)]
30pub struct TableInfo {
31 pub(crate) frame: LazyFrame,
32 pub(crate) name: PlSmallStr,
33 pub(crate) schema: Arc<Schema>,
34}
35
36struct SelectModifiers {
37 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
42impl SelectModifiers {
43 fn matches_ilike(&self, s: &str) -> bool {
44 match &self.ilike {
45 Some(rx) => rx.is_match(s),
46 None => true,
47 }
48 }
49 fn renamed_cols(&self) -> Vec<Expr> {
50 self.rename
51 .iter()
52 .map(|(before, after)| col(before.clone()).alias(after.clone()))
53 .collect()
54 }
55}
56
57#[derive(Clone)]
59pub struct SQLContext {
60 pub(crate) table_map: PlHashMap<String, LazyFrame>,
61 pub(crate) function_registry: Arc<dyn FunctionRegistry>,
62 pub(crate) lp_arena: Arena<IR>,
63 pub(crate) expr_arena: Arena<AExpr>,
64
65 cte_map: PlHashMap<String, LazyFrame>,
66 table_aliases: PlHashMap<String, String>,
67 joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
68 pub(crate) named_windows: PlHashMap<String, WindowSpec>,
69}
70
71impl Default for SQLContext {
72 fn default() -> Self {
73 Self {
74 function_registry: Arc::new(DefaultFunctionRegistry {}),
75 table_map: Default::default(),
76 cte_map: Default::default(),
77 table_aliases: Default::default(),
78 joined_aliases: Default::default(),
79 named_windows: Default::default(),
80 lp_arena: Default::default(),
81 expr_arena: Default::default(),
82 }
83 }
84}
85
86impl SQLContext {
87 pub fn new() -> Self {
95 Self::default()
96 }
97
98 pub fn get_tables(&self) -> Vec<String> {
100 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
101 tables.sort_unstable();
102 tables
103 }
104
105 pub fn register(&mut self, name: &str, lf: LazyFrame) {
121 self.table_map.insert(name.to_owned(), lf);
122 }
123
124 pub fn unregister(&mut self, name: &str) {
126 self.table_map.remove(&name.to_owned());
127 }
128
129 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
148 let mut parser = Parser::new(&GenericDialect);
149 parser = parser.with_options(ParserOptions {
150 trailing_commas: true,
151 ..Default::default()
152 });
153
154 let ast = parser
155 .try_with_sql(query)
156 .map_err(to_sql_interface_err)?
157 .parse_statements()
158 .map_err(to_sql_interface_err)?;
159
160 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
161 let res = self.execute_statement(ast.first().unwrap())?;
162
163 let lp_arena = std::mem::take(&mut self.lp_arena);
166 let expr_arena = std::mem::take(&mut self.expr_arena);
167 res.set_cached_arena(lp_arena, expr_arena);
168
169 self.cte_map.clear();
171 self.table_aliases.clear();
172 self.joined_aliases.clear();
173 self.named_windows.clear();
174
175 Ok(res)
176 }
177
178 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
181 self.function_registry = function_registry;
182 self
183 }
184
185 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
187 &self.function_registry
188 }
189
190 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
192 Arc::get_mut(&mut self.function_registry).unwrap()
193 }
194}
195
196impl SQLContext {
197 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
198 let ast = stmt;
199 Ok(match ast {
200 Statement::Query(query) => self.execute_query(query)?,
201 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
202 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
203 stmt @ Statement::Drop {
204 object_type: ObjectType::Table,
205 ..
206 } => self.execute_drop_table(stmt)?,
207 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
208 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
209 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
210 _ => polars_bail!(
211 SQLInterface: "statement type is not supported:\n{:?}", ast,
212 ),
213 })
214 }
215
216 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217 self.register_ctes(query)?;
218 self.execute_query_no_ctes(query)
219 }
220
221 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
222 let lf = self.process_query(&query.body, query)?;
223 let (limit, offset) = match &query.limit_clause {
224 Some(LimitClause::LimitOffset {
225 limit,
226 offset,
227 limit_by,
228 }) => {
229 if !limit_by.is_empty() {
230 polars_bail!(SQLSyntax: "LIMIT BY clause is not supported");
232 }
233 (limit.as_ref(), offset.as_ref().map(|o| &o.value))
234 },
235 Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
236 None => (None, None),
237 };
238 self.process_limit_offset(lf, limit, offset)
239 }
240
241 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
242 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
243 }
244
245 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
246 self.table_map
250 .get(name)
251 .or_else(|| self.cte_map.get(name))
252 .or_else(|| {
253 self.table_aliases.get(name).and_then(|alias| {
254 self.table_map
255 .get(alias.as_str())
256 .or_else(|| self.cte_map.get(alias.as_str()))
257 })
258 })
259 .cloned()
260 }
261
262 pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<(LazyFrame, SchemaRef)>
266 where
267 F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
268 {
269 let (joined_aliases, table_aliases, lp_arena, expr_arena, table_map) = (
271 std::mem::take(&mut self.joined_aliases),
273 std::mem::take(&mut self.table_aliases),
274 std::mem::take(&mut self.lp_arena),
275 std::mem::take(&mut self.expr_arena),
276 self.table_map.clone(),
278 );
279
280 let mut lf = query(self)?;
282 let schema = self.get_frame_schema(&mut lf)?;
283
284 lf.set_cached_arena(
286 std::mem::replace(&mut self.lp_arena, lp_arena),
287 std::mem::replace(&mut self.expr_arena, expr_arena),
288 );
289 self.joined_aliases = joined_aliases;
290 self.table_aliases = table_aliases;
291 self.table_map = table_map;
292
293 Ok((lf, schema))
294 }
295
296 fn expr_or_ordinal(
297 &mut self,
298 e: &SQLExpr,
299 exprs: &[Expr],
300 selected: Option<&[Expr]>,
301 schema: Option<&Schema>,
302 clause: &str,
303 ) -> PolarsResult<Expr> {
304 match e {
305 SQLExpr::UnaryOp {
306 op: UnaryOperator::Minus,
307 expr,
308 } if matches!(
309 **expr,
310 SQLExpr::Value(ValueWithSpan {
311 value: SQLValue::Number(_, _),
312 ..
313 })
314 ) =>
315 {
316 if let SQLExpr::Value(ValueWithSpan {
317 value: SQLValue::Number(ref idx, _),
318 ..
319 }) = **expr
320 {
321 Err(polars_err!(
322 SQLSyntax:
323 "negative ordinal values are invalid for {}; found -{}",
324 clause,
325 idx
326 ))
327 } else {
328 unreachable!()
329 }
330 },
331 SQLExpr::Value(ValueWithSpan {
332 value: SQLValue::Number(idx, _),
333 ..
334 }) => {
335 let idx = idx.parse::<usize>().map_err(|_| {
337 polars_err!(
338 SQLSyntax:
339 "negative ordinal values are invalid for {}; found {}",
340 clause,
341 idx
342 )
343 })?;
344 let cols = if let Some(cols) = selected {
347 cols
348 } else {
349 exprs
350 };
351 Ok(cols
352 .get(idx - 1)
353 .ok_or_else(|| {
354 polars_err!(
355 SQLInterface:
356 "{} ordinal value must refer to a valid column; found {}",
357 clause,
358 idx
359 )
360 })?
361 .clone())
362 },
363 SQLExpr::Value(v) => Err(polars_err!(
364 SQLSyntax:
365 "{} requires a valid expression or positive ordinal; found {}", clause, v,
366 )),
367 _ => {
368 let mut expr = parse_sql_expr(e, self, schema)?;
371 if matches!(e, SQLExpr::CompoundIdentifier(_)) {
372 if let Some(schema) = schema {
373 expr = expr.map_expr(|ex| match &ex {
374 Expr::Column(name) => {
375 let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
376 if schema.contains(prefixed.as_str()) {
377 col(prefixed)
378 } else {
379 ex
380 }
381 },
382 _ => ex,
383 });
384 }
385 }
386 Ok(expr)
387 },
388 }
389 }
390
391 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
392 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
393 if let Some(name) = aliases.get(column_name) {
394 return name.to_string();
395 }
396 }
397 column_name.to_string()
398 }
399
400 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
401 match expr {
402 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
403 SetExpr::Query(query) => self.execute_query_no_ctes(query),
404 SetExpr::SetOperation {
405 op: SetOperator::Union,
406 set_quantifier,
407 left,
408 right,
409 } => self.process_union(left, right, set_quantifier, query),
410
411 #[cfg(feature = "semi_anti_join")]
412 SetExpr::SetOperation {
413 op: SetOperator::Intersect | SetOperator::Except,
414 set_quantifier,
415 left,
416 right,
417 } => self.process_except_intersect(left, right, set_quantifier, query),
418
419 SetExpr::Values(Values {
420 explicit_row: _,
421 rows,
422 value_keyword: _,
423 }) => self.process_values(rows),
424
425 SetExpr::Table(tbl) => {
426 if tbl.table_name.is_some() {
427 let table_name = tbl.table_name.as_ref().unwrap();
428 self.get_table_from_current_scope(table_name)
429 .ok_or_else(|| {
430 polars_err!(
431 SQLInterface: "no table or alias named '{}' found",
432 tbl
433 )
434 })
435 } else {
436 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
437 }
438 },
439 op => {
440 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
441 },
442 }
443 }
444
445 #[cfg(feature = "semi_anti_join")]
446 fn process_except_intersect(
447 &mut self,
448 left: &SetExpr,
449 right: &SetExpr,
450 quantifier: &SetQuantifier,
451 query: &Query,
452 ) -> PolarsResult<LazyFrame> {
453 let (join_type, op_name) = match *query.body {
454 SetExpr::SetOperation {
455 op: SetOperator::Except,
456 ..
457 } => (JoinType::Anti, "EXCEPT"),
458 _ => (JoinType::Semi, "INTERSECT"),
459 };
460 let mut lf = self.process_query(left, query)?;
461 let mut rf = self.process_query(right, query)?;
462 let lf_schema = self.get_frame_schema(&mut lf)?;
463
464 let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
465 let rf_cols = match quantifier {
466 SetQuantifier::ByName => None,
467 SetQuantifier::Distinct | SetQuantifier::None => {
468 let rf_schema = self.get_frame_schema(&mut rf)?;
469 let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
470 if lf_cols.len() != rf_cols.len() {
471 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
472 }
473 Some(rf_cols)
474 },
475 _ => {
476 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
477 },
478 };
479 let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
480 let joined_tbl = match rf_cols {
481 Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
482 None => join.on(lf_cols).finish(),
483 };
484 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
485 }
486
/// Translate `left UNION [ALL | DISTINCT | BY NAME | ALL BY NAME | DISTINCT BY NAME] right`.
///
/// Both sides are planned up front. The concatenation itself is deferred to a
/// `PlanCallback` that receives both plans together with their resolved schemas.
///
/// # Errors
/// A positional union whose sides have different column counts is an error. So is a
/// quantifier that is unsupported in the current build (the `BY NAME` forms require the
/// `diagonal_concat` feature).
fn process_union(
    &mut self,
    left: &SetExpr,
    right: &SetExpr,
    quantifier: &SetQuantifier,
    query: &Query,
) -> PolarsResult<LazyFrame> {
    // Copied out so the `move` closure below can own it.
    let quantifier = *quantifier;
    let lf = self.process_query(left, query)?;
    let rf = self.process_query(right, query)?;

    let cb = PlanCallback::new(
        move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
            // The callback assumes `plans`/`schemas` are ordered [left, right]
            // (schemas[0] is used as the left schema below), so the right plan is popped first.
            let mut rf = LazyFrame::from(plans.pop().unwrap());
            let lf = LazyFrame::from(plans.pop().unwrap());

            let opts = UnionArgs {
                parallel: true,
                to_supertypes: true,
                ..Default::default()
            };
            let out = match quantifier {
                SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
                    // Positional union: column counts must match, and the right-hand
                    // columns are renamed to the left-hand names.
                    let lf_schema = &schemas[0];
                    let rf_schema = &schemas[1];
                    if lf_schema.len() != rf_schema.len() {
                        polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
                    }
                    if lf_schema.iter_names().ne(rf_schema.iter_names()) {
                        rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
                    }
                    let concatenated = concat(vec![lf, rf], opts);
                    match quantifier {
                        // Plain `UNION` is treated like `UNION DISTINCT`.
                        SetQuantifier::Distinct | SetQuantifier::None => {
                            concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                        },
                        _ => concatenated,
                    }
                },
                // BY NAME variants align columns by name (diagonal concat).
                #[cfg(feature = "diagonal_concat")]
                SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
                #[cfg(feature = "diagonal_concat")]
                SetQuantifier::ByName | SetQuantifier::DistinctByName => {
                    let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
                    concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                },
                // With `diagonal_concat` enabled every quantifier is covered above,
                // making this arm unreachable; hence the allow.
                #[allow(unreachable_patterns)]
                _ => {
                    polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
                },
            };

            out.map(|lf| lf.logical_plan)
        },
    );

    Ok(lf.pipe_with_schemas(vec![rf], cb))
}
550
/// Apply `CROSS JOIN UNNEST(col, ...) AS alias(...)` as a lateral explode of `lf`.
///
/// Each array expression must be a (possibly qualified) column reference. All referenced
/// columns are exploded together in a single `explode` call, then renamed from the alias:
/// explicit alias column names map positionally, and with exactly one column the table
/// alias name itself is used when no column name is given. Empty names and renames to the
/// same name are skipped.
///
/// # Errors
/// Fails in any of these cases:
/// * the alias is missing;
/// * `WITH ORDINALITY|OFFSET` is requested;
/// * an expression is an array literal (mixing literals with column references is
///   unsupported);
/// * an expression is any other non-column expression.
fn process_unnest_lateral(
    &self,
    lf: LazyFrame,
    alias: &Option<TableAlias>,
    array_exprs: &[SQLExpr],
    with_offset: bool,
) -> PolarsResult<LazyFrame> {
    let alias = alias
        .as_ref()
        .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
    polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");

    let (mut explode_cols, mut rename_from, mut rename_to) = (
        Vec::with_capacity(array_exprs.len()),
        Vec::with_capacity(array_exprs.len()),
        Vec::with_capacity(array_exprs.len()),
    );
    let is_single_col = array_exprs.len() == 1;

    for (i, arr_expr) in array_exprs.iter().enumerate() {
        let col_name = match arr_expr {
            SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
            // Qualified `tbl.col`: only the final part names the column.
            SQLExpr::CompoundIdentifier(parts) => {
                PlSmallStr::from_str(&parts.last().unwrap().value)
            },
            SQLExpr::Array(_) => polars_bail!(
                SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
            ),
            other => polars_bail!(
                SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
            ),
        };
        // Output name: the positional alias column if present, otherwise (single column
        // only) the table alias name. Empty names and no-op renames are skipped.
        if let Some(name) = alias
            .columns
            .get(i)
            .map(|c| c.name.value.as_str())
            .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
            .filter(|name| !name.is_empty() && *name != col_name.as_str())
        {
            rename_from.push(col_name.clone());
            rename_to.push(PlSmallStr::from_str(name));
        }
        explode_cols.push(col_name);
    }

    let mut lf = lf.explode(
        Selector::ByName {
            names: Arc::from(explode_cols),
            strict: true,
        },
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        },
    );
    if !rename_from.is_empty() {
        lf = lf.rename(rename_from, rename_to, true);
    }
    Ok(lf)
}
614
615 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
616 let frame_rows: Vec<Row> = values.iter().map(|row| {
617 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
618 let expr = parse_sql_expr(expr, self, None)?;
619 match expr {
620 Expr::Literal(value) => {
621 value.to_any_value()
622 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
623 .map(|av| av.into_static())
624 },
625 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
626 }
627 }).collect();
628 row_data.map(Row::new)
629 }).collect::<Result<_, _>>()?;
630
631 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
632 }
633
634 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
636 match stmt {
637 Statement::Explain { statement, .. } => {
638 let lf = self.execute_statement(statement)?;
639 let plan = lf.describe_optimized_plan()?;
640 let plan = plan
641 .split('\n')
642 .collect::<Series>()
643 .with_name(PlSmallStr::from_static("Logical Plan"))
644 .into_column();
645 let df = DataFrame::new(vec![plan])?;
646 Ok(df.lazy())
647 },
648 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
649 }
650 }
651
652 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
654 let tables = Column::new("name".into(), self.get_tables());
655 let df = DataFrame::new(vec![tables])?;
656 Ok(df.lazy())
657 }
658
659 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
661 match stmt {
662 Statement::Drop { names, .. } => {
663 names.iter().for_each(|name| {
664 self.table_map.remove(&name.to_string());
665 });
666 Ok(DataFrame::empty().lazy())
667 },
668 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
669 }
670 }
671
672 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
674 if let Statement::Delete(Delete {
675 tables,
676 from,
677 using,
678 selection,
679 returning,
680 order_by,
681 limit,
682 delete_token: _,
683 }) = stmt
684 {
685 if !tables.is_empty()
686 || using.is_some()
687 || returning.is_some()
688 || limit.is_some()
689 || !order_by.is_empty()
690 {
691 let error_message = match () {
692 _ if !tables.is_empty() => "DELETE expects exactly one table name",
693 _ if using.is_some() => "DELETE does not support the USING clause",
694 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
695 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
696 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
697 _ => unreachable!(),
698 };
699 polars_bail!(SQLInterface: error_message);
700 }
701 let from_tables = match &from {
702 FromTable::WithFromKeyword(from) => from,
703 FromTable::WithoutKeyword(from) => from,
704 };
705 if from_tables.len() > 1 {
706 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
707 }
708 let tbl_expr = from_tables.first().unwrap();
709 if !tbl_expr.joins.is_empty() {
710 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
711 }
712 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
713 if selection.is_none() {
714 Ok(DataFrame::empty_with_schema(
716 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
717 .unwrap()
718 .as_ref(),
719 )
720 .lazy())
721 } else {
722 Ok(self.process_where(lf.clone(), selection, true, None)?)
724 }
725 } else {
726 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
727 }
728 }
729
730 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
732 if let Statement::Truncate(Truncate {
733 table_names,
734 partitions,
735 ..
736 }) = stmt
737 {
738 match partitions {
739 None => {
740 if table_names.len() != 1 {
741 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
742 }
743 let tbl = table_names[0].name.to_string();
744 if let Some(lf) = self.table_map.get_mut(&tbl) {
745 *lf = DataFrame::empty_with_schema(
746 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
747 .unwrap()
748 .as_ref(),
749 )
750 .lazy();
751 Ok(lf.clone())
752 } else {
753 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
754 }
755 },
756 _ => {
757 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
758 },
759 }
760 } else {
761 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
762 }
763 }
764
765 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
766 self.cte_map.insert(name.to_owned(), lf);
767 }
768
769 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
770 if let Some(with) = &query.with {
771 if with.recursive {
772 polars_bail!(SQLInterface: "recursive CTEs are not supported")
773 }
774 for cte in &with.cte_tables {
775 let cte_name = cte.alias.name.value.clone();
776 let mut lf = self.execute_query(&cte.query)?;
777 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
778 self.register_cte(&cte_name, lf);
779 }
780 }
781 Ok(())
782 }
783
784 fn register_named_windows(
785 &mut self,
786 named_windows: &[NamedWindowDefinition],
787 ) -> PolarsResult<()> {
788 for NamedWindowDefinition(name, expr) in named_windows {
789 let spec = match expr {
790 NamedWindowExpr::NamedWindow(ref_name) => self
791 .named_windows
792 .get(&ref_name.value)
793 .ok_or_else(|| {
794 polars_err!(
795 SQLInterface:
796 "named window '{}' references undefined window '{}'",
797 name.value, ref_name.value
798 )
799 })?
800 .clone(),
801 NamedWindowExpr::WindowSpec(spec) => spec.clone(),
802 };
803 self.named_windows.insert(name.value.clone(), spec);
804 }
805 Ok(())
806 }
807
/// Resolve a `FROM` item (a relation plus any chained joins) into a single `LazyFrame`.
///
/// Joins are folded into the accumulated frame from left to right:
/// * `CROSS JOIN UNNEST(...)` over column references becomes a lateral explode of the
///   current frame.
/// * Other supported joins are delegated to `process_join`, or `cross_join` for plain
///   CROSS JOIN.
///
/// After each join, right-hand column names that collided with left-hand names are recorded
/// in `joined_aliases`. Those columns appear in the join output with a `:<right table>`
/// suffix, and recording them lets later qualified references be resolved.
fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
    let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
    if !tbl_expr.joins.is_empty() {
        for join in &tbl_expr.joins {
            // `CROSS JOIN UNNEST(...)` that references columns is a lateral explode of the
            // current frame rather than a join against a separate relation. Pure array
            // literals fall through to the regular path below.
            if let (
                JoinOperator::CrossJoin(JoinConstraint::None),
                TableFactor::UNNEST {
                    alias,
                    array_exprs,
                    with_offset,
                    ..
                },
            ) = (&join.join_operator, &join.relation)
            {
                if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
                    lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
                    continue;
                }
            }

            let (r_name, mut rf) = self.get_table(&join.relation)?;
            if r_name.is_empty() {
                // A name is required to suffix/alias the right-hand columns.
                polars_bail!(
                    SQLInterface:
                    "cannot join on unnamed relation; please provide an alias"
                )
            }
            let left_schema = self.get_frame_schema(&mut lf)?;
            let right_schema = self.get_frame_schema(&mut rf)?;

            lf = match &join.join_operator {
                op @ (JoinOperator::Join(constraint)
                | JoinOperator::FullOuter(constraint)
                | JoinOperator::Left(constraint)
                | JoinOperator::LeftOuter(constraint)
                | JoinOperator::Right(constraint)
                | JoinOperator::RightOuter(constraint)
                | JoinOperator::Inner(constraint)
                | JoinOperator::Anti(constraint)
                | JoinOperator::Semi(constraint)
                | JoinOperator::LeftAnti(constraint)
                | JoinOperator::LeftSemi(constraint)
                | JoinOperator::RightAnti(constraint)
                | JoinOperator::RightSemi(constraint)) => {
                    // RIGHT ANTI/SEMI are evaluated as ANTI/SEMI joins with the frames swapped.
                    // NOTE(review): only the frames are swapped; the `name`/`schema` passed
                    // below still describe the original left/right tables. Confirm that
                    // `process_join` handles this correctly for `ON l.x = r.y` constraints.
                    let (lf, rf) = match op {
                        JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                        _ => (lf, rf),
                    };
                    self.process_join(
                        &TableInfo {
                            frame: lf,
                            name: (&l_name).into(),
                            schema: left_schema.clone(),
                        },
                        &TableInfo {
                            frame: rf,
                            name: (&r_name).into(),
                            schema: right_schema.clone(),
                        },
                        constraint,
                        match op {
                            // A bare `JOIN` is an inner join.
                            JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
                            JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
                                JoinType::Left
                            },
                            JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
                                JoinType::Right
                            },
                            JoinOperator::FullOuter(_) => JoinType::Full,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Anti(_)
                            | JoinOperator::LeftAnti(_)
                            | JoinOperator::RightAnti(_) => JoinType::Anti,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Semi(_)
                            | JoinOperator::LeftSemi(_)
                            | JoinOperator::RightSemi(_) => JoinType::Semi,
                            // Reached for anti/semi joins when `semi_anti_join` is disabled.
                            join_type => polars_bail!(
                                SQLInterface:
                                "join type '{:?}' not currently supported",
                                join_type
                            ),
                        },
                    )?
                },
                JoinOperator::CrossJoin(JoinConstraint::None) => {
                    lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                },
                JoinOperator::CrossJoin(constraint) => {
                    polars_bail!(
                        SQLInterface:
                        "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
                        constraint
                    )
                },
                join_type => {
                    polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                },
            };

            let joined_schema = self.get_frame_schema(&mut lf)?;

            // Record each right-hand column that also exists on the left and that shows up
            // in the join output under the suffixed name `<name>:<r_name>`.
            self.joined_aliases.insert(
                r_name.clone(),
                right_schema
                    .iter_names()
                    .filter_map(|name| {
                        let aliased_name = format!("{name}:{r_name}");
                        if left_schema.contains(name)
                            && joined_schema.contains(aliased_name.as_str())
                        {
                            Some((name.to_string(), aliased_name))
                        } else {
                            None
                        }
                    })
                    .collect::<PlHashMap<String, String>>(),
            );
        }
    };
    Ok(lf)
}
935
936 fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
938 let Select {
941 distinct: _,
943 from: _,
944 group_by: _,
945 having: _,
946 named_window: _,
947 projection: _,
948 qualify: _,
949 selection: _,
950
951 select_token: _,
953 top_before_distinct: _,
954 window_before_qualify: _,
955
956 ref cluster_by,
958 ref connect_by,
959 ref distribute_by,
960 ref exclude,
961 ref flavor,
962 ref into,
963 ref lateral_views,
964 ref prewhere,
965 ref sort_by,
966 ref top,
967 ref value_table_mode,
968 } = *select_stmt;
969
970 polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
972 polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
973 polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
974 polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
975 polars_ensure!(matches!(flavor, SelectFlavor::Standard), SQLInterface: "this `SELECT` flavor is not supported");
976 polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO`clause is not supported");
977 polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
978 polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
979 polars_ensure!(sort_by.is_empty(), SQLInterface: "SORT BY` clause is not supported; use `ORDER BY` instead");
980 polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
981 polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
982
983 Ok(())
984 }
985
986 fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
988 self.validate_select(select_stmt)?;
990
991 self.register_named_windows(&select_stmt.named_window)?;
993
994 let mut lf = if select_stmt.from.is_empty() {
996 DataFrame::empty().lazy()
997 } else {
998 let from = select_stmt.clone().from;
1001 if from.len() > 1 {
1002 polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1003 }
1004 self.execute_from_statement(from.first().unwrap())?
1005 };
1006
1007 let mut schema = self.get_frame_schema(&mut lf)?;
1009 lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1010
1011 let mut select_modifiers = SelectModifiers {
1013 ilike: None,
1014 exclude: PlHashSet::new(),
1015 rename: PlHashMap::new(),
1016 replace: vec![],
1017 };
1018
1019 let window_fn_columns = if select_stmt.qualify.is_some() {
1022 select_stmt
1023 .projection
1024 .iter()
1025 .filter_map(|item| match item {
1026 SelectItem::ExprWithAlias { expr, alias }
1027 if expr_has_window_functions(expr) =>
1028 {
1029 Some(alias.value.clone())
1030 },
1031 _ => None,
1032 })
1033 .collect::<PlHashSet<_>>()
1034 } else {
1035 PlHashSet::new()
1036 };
1037
1038 let mut projections =
1039 self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1040
1041 let mut explode_names = Vec::new();
1043 let mut explode_exprs = Vec::new();
1044 let mut explode_lookup = PlHashMap::new();
1045
1046 for expr in &projections {
1047 for e in expr {
1048 if let Expr::Explode { input, .. } = e {
1049 match input.as_ref() {
1050 Expr::Column(name) => explode_names.push(name.clone()),
1051 other_expr => {
1052 if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1054 let temp_name =
1055 format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1056 explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1057 explode_lookup.insert(other_expr.clone(), temp_name.clone());
1058 explode_names.push(temp_name);
1059 }
1060 },
1061 }
1062 }
1063 }
1064 }
1065 if !explode_names.is_empty() {
1066 if !explode_exprs.is_empty() {
1067 lf = lf.with_columns(explode_exprs);
1068 }
1069 lf = lf.explode(
1070 Selector::ByName {
1071 names: Arc::from(explode_names),
1072 strict: true,
1073 },
1074 ExplodeOptions {
1075 empty_as_null: true,
1076 keep_nulls: true,
1077 },
1078 );
1079 projections = projections
1080 .into_iter()
1081 .map(|p| {
1082 p.map_expr(|e| match e {
1084 Expr::Explode { input, .. } => explode_lookup
1085 .get(input.as_ref())
1086 .map(|name| Expr::Column(name.clone()))
1087 .unwrap_or_else(|| input.as_ref().clone()),
1088 _ => e,
1089 })
1090 })
1091 .collect();
1092
1093 schema = self.get_frame_schema(&mut lf)?;
1094 }
1095
1096 let mut group_by_keys: Vec<Expr> = Vec::new();
1098 match &select_stmt.group_by {
1099 GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1101 if !modifiers.is_empty() {
1102 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1103 }
1104 group_by_keys = group_by_exprs
1106 .iter()
1107 .map(|e| match e {
1108 SQLExpr::Identifier(ident) => {
1109 resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1110 || {
1111 self.expr_or_ordinal(
1112 e,
1113 &projections,
1114 None,
1115 Some(&schema),
1116 "GROUP BY",
1117 )
1118 },
1119 Ok,
1120 )
1121 },
1122 _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1123 })
1124 .collect::<PolarsResult<_>>()?
1125 },
1126 GroupByExpr::All(modifiers) => {
1129 if !modifiers.is_empty() {
1130 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1131 }
1132 projections.iter().for_each(|expr| match expr {
1133 Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1135 Expr::Column(_) => group_by_keys.push(expr.clone()),
1136 Expr::Alias(e, _)
1137 if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1138 Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1139 if let Expr::Column(name) = &**e {
1140 group_by_keys.push(col(name.clone()));
1141 }
1142 },
1143 _ => {
1144 if !has_expr(expr, |e| {
1146 matches!(e, Expr::Agg(_))
1147 || matches!(e, Expr::Len)
1148 || matches!(e, Expr::Over { .. })
1149 || {
1150 #[cfg(feature = "dynamic_group_by")]
1151 {
1152 matches!(e, Expr::Rolling { .. })
1153 }
1154 #[cfg(not(feature = "dynamic_group_by"))]
1155 {
1156 false
1157 }
1158 }
1159 }) {
1160 group_by_keys.push(expr.clone())
1161 }
1162 },
1163 });
1164 },
1165 };
1166
1167 lf = if group_by_keys.is_empty() {
1168 if select_stmt.having.is_some() {
1170 polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1171 };
1172
1173 let mut retained_cols = Vec::with_capacity(projections.len());
1175 let mut retained_names = Vec::with_capacity(projections.len());
1176 let have_order_by = query.order_by.is_some();
1177
1178 let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1180
1181 for p in projections.iter() {
1186 let name = p.to_field(schema.deref())?.name.to_string();
1187 if select_modifiers.matches_ilike(&name)
1188 && !select_modifiers.exclude.contains(&name)
1189 {
1190 projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1191
1192 retained_cols.push(if have_order_by {
1193 col(name.as_str())
1194 } else {
1195 p.clone()
1196 });
1197 retained_names.push(col(name));
1198 }
1199 }
1200
1201 if have_order_by {
1203 if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1208 || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1209 {
1210 lf = lf.with_columns(projections);
1211 } else {
1212 const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1221 lf = lf
1222 .clone()
1223 .select(projections)
1224 .with_row_index(NAME, None)
1225 .join(
1226 lf.with_row_index(NAME, None),
1227 [col(NAME)],
1228 [col(NAME)],
1229 JoinArgs {
1230 how: JoinType::Left,
1231 validation: Default::default(),
1232 suffix: None,
1233 slice: None,
1234 nulls_equal: false,
1235 coalesce: Default::default(),
1236 maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
1237 },
1238 );
1239 }
1240 }
1241 if !select_modifiers.replace.is_empty() {
1242 lf = lf.with_columns(&select_modifiers.replace);
1243 }
1244 if !select_modifiers.rename.is_empty() {
1245 lf = lf.with_columns(select_modifiers.renamed_cols());
1246 }
1247 lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1248
1249 if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1251 && !have_order_by
1252 {
1253 lf = lf.with_columns(retained_cols).select(retained_names);
1255 } else {
1256 lf = lf.select(retained_cols);
1257 }
1258 if !select_modifiers.rename.is_empty() {
1259 lf = lf.rename(
1260 select_modifiers.rename.keys(),
1261 select_modifiers.rename.values(),
1262 true,
1263 );
1264 };
1265 lf
1266 } else {
1267 let having = select_stmt
1268 .having
1269 .as_ref()
1270 .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1271 .transpose()?;
1272 lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1273 lf = self.process_order_by(lf, &query.order_by, None)?;
1274
1275 let output_cols: Vec<_> = projections
1277 .iter()
1278 .map(|p| p.to_field(&schema))
1279 .collect::<PolarsResult<Vec<_>>>()?
1280 .into_iter()
1281 .map(|f| col(f.name))
1282 .collect();
1283
1284 lf.select(&output_cols)
1285 };
1286
1287 lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1289
1290 lf = match &select_stmt.distinct {
1292 Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1293 Some(Distinct::On(exprs)) => {
1294 let schema = Some(self.get_frame_schema(&mut lf)?);
1296 let cols = exprs
1297 .iter()
1298 .map(|e| {
1299 let expr = parse_sql_expr(e, self, schema.as_deref())?;
1300 if let Expr::Column(name) = expr {
1301 Ok(name)
1302 } else {
1303 Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1304 }
1305 })
1306 .collect::<PolarsResult<Vec<_>>>()?;
1307
1308 lf = self.process_order_by(lf, &query.order_by, None)?;
1310 return Ok(lf.unique_stable(
1311 Some(Selector::ByName {
1312 names: cols.into(),
1313 strict: true,
1314 }),
1315 UniqueKeepStrategy::First,
1316 ));
1317 },
1318 None => lf,
1319 };
1320 Ok(lf)
1321 }
1322
1323 fn column_projections(
1324 &mut self,
1325 select_stmt: &Select,
1326 schema: &SchemaRef,
1327 select_modifiers: &mut SelectModifiers,
1328 ) -> PolarsResult<Vec<Expr>> {
1329 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
1330 .projection
1331 .iter()
1332 .map(|select_item| match select_item {
1333 SelectItem::UnnamedExpr(expr) => {
1334 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
1335 },
1336 SelectItem::ExprWithAlias { expr, alias } => {
1337 let expr = parse_sql_expr(expr, self, Some(schema))?;
1338 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
1339 },
1340 SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1341 SelectItemQualifiedWildcardKind::ObjectName(obj_name) => self
1343 .process_qualified_wildcard(
1344 obj_name,
1345 wildcard_options,
1346 select_modifiers,
1347 Some(schema),
1348 ),
1349 SelectItemQualifiedWildcardKind::Expr(_) => {
1351 polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1352 },
1353 },
1354 SelectItem::Wildcard(wildcard_options) => {
1355 let cols = schema
1356 .iter_names()
1357 .map(|name| col(name.clone()))
1358 .collect::<Vec<_>>();
1359
1360 self.process_wildcard_additional_options(
1361 cols,
1362 wildcard_options,
1363 select_modifiers,
1364 Some(schema),
1365 )
1366 },
1367 })
1368 .collect();
1369
1370 let flattened_exprs: Vec<Expr> = parsed_items?
1371 .into_iter()
1372 .flatten()
1373 .flat_map(|expr| expand_exprs(expr, schema))
1374 .collect();
1375
1376 Ok(flattened_exprs)
1377 }
1378
1379 fn process_where(
1380 &mut self,
1381 mut lf: LazyFrame,
1382 expr: &Option<SQLExpr>,
1383 invert_filter: bool,
1384 schema: Option<SchemaRef>,
1385 ) -> PolarsResult<LazyFrame> {
1386 if let Some(expr) = expr {
1387 let schema = match schema {
1388 None => self.get_frame_schema(&mut lf)?,
1389 Some(s) => s,
1390 };
1391
1392 let (all_true, all_false) = match expr {
1394 SQLExpr::Value(ValueWithSpan {
1395 value: SQLValue::Boolean(b),
1396 ..
1397 }) => (*b, !*b),
1398 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1399 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => {
1400 (a.value == b.value, a.value != b.value)
1401 },
1402 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1403 (a.value != b.value, a.value == b.value)
1404 },
1405 _ => (false, false),
1406 },
1407 _ => (false, false),
1408 };
1409 if (all_true && !invert_filter) || (all_false && invert_filter) {
1410 return Ok(lf);
1411 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1412 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1413 }
1414
1415 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1417 if filter_expression.clone().meta().has_multiple_outputs() {
1418 filter_expression = all_horizontal([filter_expression])?;
1419 }
1420 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1421 lf = if invert_filter {
1422 lf.remove(filter_expression)
1423 } else {
1424 lf.filter(filter_expression)
1425 };
1426 }
1427 Ok(lf)
1428 }
1429
1430 pub(super) fn process_join(
1431 &mut self,
1432 tbl_left: &TableInfo,
1433 tbl_right: &TableInfo,
1434 constraint: &JoinConstraint,
1435 join_type: JoinType,
1436 ) -> PolarsResult<LazyFrame> {
1437 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1438 let coalesce_type = match constraint {
1439 JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1441 _ => JoinCoalesce::KeepColumns,
1442 };
1443 let joined = tbl_left
1444 .frame
1445 .clone()
1446 .join_builder()
1447 .with(tbl_right.frame.clone())
1448 .left_on(left_on)
1449 .right_on(right_on)
1450 .how(join_type)
1451 .suffix(format!(":{}", tbl_right.name))
1452 .coalesce(coalesce_type)
1453 .finish();
1454
1455 Ok(joined)
1456 }
1457
1458 fn process_qualify(
1459 &mut self,
1460 mut lf: LazyFrame,
1461 qualify_expr: &Option<SQLExpr>,
1462 window_fn_columns: &PlHashSet<String>,
1463 ) -> PolarsResult<LazyFrame> {
1464 if let Some(expr) = qualify_expr {
1465 let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1468 let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1469 if !has_window_fns && !references_window_alias {
1470 polars_bail!(
1471 SQLSyntax:
1472 "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1473 );
1474 }
1475 let schema = self.get_frame_schema(&mut lf)?;
1476 let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1477 if filter_expression.clone().meta().has_multiple_outputs() {
1478 filter_expression = all_horizontal([filter_expression])?;
1479 }
1480 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1481 lf = lf.filter(filter_expression);
1482 }
1483 Ok(lf)
1484 }
1485
1486 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1487 let mut contexts = vec![];
1488 for expr in exprs {
1489 *expr = expr.clone().map_expr(|e| match e {
1490 Expr::SubPlan(lp, names) => {
1491 contexts.push(<LazyFrame>::from((**lp).clone()));
1492 if names.len() == 1 {
1493 Expr::Column(names[0].as_str().into())
1494 } else {
1495 Expr::SubPlan(lp, names)
1496 }
1497 },
1498 e => e,
1499 })
1500 }
1501 if contexts.is_empty() {
1502 lf
1503 } else {
1504 lf.with_context(contexts)
1505 }
1506 }
1507
1508 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1509 if let Statement::CreateTable(CreateTable {
1510 if_not_exists,
1511 name,
1512 query,
1513 columns,
1514 like,
1515 ..
1516 }) = stmt
1517 {
1518 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1519 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1520 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1521 }
1522 let lf = match (query, columns.is_empty(), like) {
1523 (Some(query), true, None) => {
1524 self.execute_query(query)?
1528 },
1529 (None, false, None) => {
1530 let mut schema = Schema::with_capacity(columns.len());
1534 for col in columns {
1535 let col_name = col.name.value.as_str();
1536 let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1537 schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1538 }
1539 DataFrame::empty_with_schema(&schema).lazy()
1540 },
1541 (None, true, Some(like_kind)) => {
1542 let like_name = match like_kind {
1546 CreateTableLikeKind::Plain(like)
1547 | CreateTableLikeKind::Parenthesized(like) => &like.name,
1548 };
1549 let like_table = like_name
1550 .0
1551 .first()
1552 .unwrap()
1553 .as_ident()
1554 .unwrap()
1555 .value
1556 .as_str();
1557 if let Some(mut table) = self.table_map.get(like_table).cloned() {
1558 let schema = self.get_frame_schema(&mut table)?;
1559 DataFrame::empty_with_schema(&schema).lazy()
1560 } else {
1561 polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1562 }
1563 },
1564 (None, true, None) => {
1566 polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1567 },
1568 _ => {
1570 polars_bail!(
1571 SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1572 query,
1573 columns,
1574 like,
1575 )
1576 },
1577 };
1578 self.register(tbl_name, lf);
1579
1580 let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1581 Ok(df_created.unwrap().lazy())
1582 } else {
1583 unreachable!()
1584 }
1585 }
1586
    /// Resolve one FROM-clause table factor to `(name, frame)`.
    ///
    /// - `tbl` / `tbl AS a`: looked up with `get_table_from_current_scope`. An
    ///   alias is recorded in `table_aliases` (alias -> table) and returned as
    ///   the name. A table with arguments is dispatched to
    ///   `execute_table_function`.
    /// - `(subquery) AS a[(cols)]`: executed with `execute_query_no_ctes`.
    ///   The alias column list (if any) is applied, and the result is
    ///   registered in `table_map` under the alias. LATERAL is rejected and
    ///   the alias is mandatory.
    /// - `UNNEST(arrays) AS a(cols)`: each array literal becomes a column.
    ///   Alias and column names are mandatory and must match the number of
    ///   arrays. The result is registered in `table_map` under the alias.
    /// - `(nested join) [AS a]`: executed with `execute_from_statement`. The
    ///   name is the alias, or `""` when there is none.
    ///
    /// # Errors
    /// Unknown relations, the unsupported constructs listed above, and any
    /// error from executing the underlying query/function.
    ///
    /// # Panics
    /// If a table name's first part is not a plain identifier (`unwrap`).
    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
        match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                // `name(args...)` is a table function, not a registered table.
                if let Some(args) = args {
                    return self.execute_table_function(name, alias, &args.args);
                }
                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                    match alias {
                        Some(alias) => {
                            // Remember alias -> real table name for later lookups.
                            self.table_aliases
                                .insert(alias.name.value.clone(), tbl_name.to_string());
                            Ok((alias.name.value.clone(), lf))
                        },
                        None => Ok((tbl_name.to_string(), lf)),
                    }
                } else {
                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
                }
            },
            TableFactor::Derived {
                lateral,
                subquery,
                alias,
            } => {
                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
                if let Some(alias) = alias {
                    let mut lf = self.execute_query_no_ctes(subquery)?;
                    // Apply `AS alias(c1, c2, ...)` column names, if given.
                    lf = self.rename_columns_from_table_alias(lf, alias)?;
                    self.table_map.insert(alias.name.value.clone(), lf.clone());
                    Ok((alias.name.value.clone(), lf))
                } else {
                    polars_bail!(SQLSyntax: "derived tables must have aliases");
                }
            },
            TableFactor::UNNEST {
                alias,
                array_exprs,
                with_offset,
                with_offset_alias: _,
                ..
            } => {
                if let Some(alias) = alias {
                    // An empty alias column name means "keep the array's own name".
                    let column_names: Vec<Option<PlSmallStr>> = alias
                        .columns
                        .iter()
                        .map(|c| {
                            if c.name.value.is_empty() {
                                None
                            } else {
                                Some(PlSmallStr::from_str(c.name.value.as_str()))
                            }
                        })
                        .collect();

                    let column_values: Vec<Series> = array_exprs
                        .iter()
                        .map(|arr| parse_sql_array(arr, self))
                        .collect::<Result<_, _>>()?;

                    polars_ensure!(!column_names.is_empty(),
                        SQLSyntax:
                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
                    );
                    if column_names.len() != column_values.len() {
                        let plural = if column_values.len() > 1 { "s" } else { "" };
                        polars_bail!(
                            SQLSyntax:
                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
                        );
                    }
                    let column_series: Vec<Column> = column_values
                        .into_iter()
                        .zip(column_names)
                        .map(|(s, name)| {
                            if let Some(name) = name {
                                s.with_name(name)
                            } else {
                                s
                            }
                        })
                        .map(Column::from)
                        .collect();

                    let lf = DataFrame::new(column_series)?.lazy();

                    // NOTE(review): this is only checked after the frame has been built;
                    // checking it earlier would avoid the wasted work.
                    if *with_offset {
                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
                    }
                    let table_name = alias.name.value.clone();
                    self.table_map.insert(table_name.clone(), lf.clone());
                    Ok((table_name, lf))
                } else {
                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
                }
            },
            TableFactor::NestedJoin {
                table_with_joins,
                alias,
            } => {
                // NOTE(review): unlike derived tables, the alias is neither registered in
                // `table_map` nor is its column list applied here. Confirm this is intended.
                let lf = self.execute_from_statement(table_with_joins)?;
                match alias {
                    Some(a) => Ok((a.name.value.clone(), lf)),
                    None => Ok(("".to_string(), lf)),
                }
            },
            // Any other table factor kind is unsupported.
            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
        }
    }
1700
1701 fn execute_table_function(
1702 &mut self,
1703 name: &ObjectName,
1704 alias: &Option<TableAlias>,
1705 args: &[FunctionArg],
1706 ) -> PolarsResult<(String, LazyFrame)> {
1707 let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1708 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1709 let (tbl_name, lf) = read_fn.execute(args)?;
1710 #[allow(clippy::useless_asref)]
1711 let tbl_name = alias
1712 .as_ref()
1713 .map(|a| a.name.value.clone())
1714 .unwrap_or_else(|| tbl_name.to_str().to_string());
1715
1716 self.table_map.insert(tbl_name.clone(), lf.clone());
1717 Ok((tbl_name, lf))
1718 }
1719
    /// Apply an `ORDER BY` clause to `lf`.
    ///
    /// Returns `lf` unchanged when there is no ORDER BY, or its expression
    /// list is empty.
    ///
    /// `ORDER BY ALL` sorts by every expression in `selected` when given,
    /// otherwise by every column of `lf`. One ASC/DESC and NULLS option then
    /// applies to all keys. It is recognised in two forms:
    /// - the dedicated syntax;
    /// - a single bare `ALL` identifier, provided no column is named "ALL"
    ///   (case-insensitively).
    ///
    /// Any other ORDER BY resolves each item through `expr_or_ordinal`.
    ///
    /// If NULLS FIRST/LAST is omitted, nulls go first for DESC and last for
    /// ASC. The sort maintains the relative order of equal rows
    /// (`with_maintain_order(true)`).
    ///
    /// # Errors
    /// Propagates schema-resolution and ORDER BY item-resolution errors.
    fn process_order_by(
        &mut self,
        mut lf: LazyFrame,
        order_by: &Option<OrderBy>,
        selected: Option<&[Expr]>,
    ) -> PolarsResult<LazyFrame> {
        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
            OrderByKind::Expressions(exprs) => exprs.is_empty(),
            OrderByKind::All(_) => false,
        }) {
            return Ok(lf);
        }
        let schema = self.get_frame_schema(&mut lf)?;
        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
        // (explicit sort items, shared options if ORDER BY ALL, number of sort keys)
        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
            OrderByKind::Expressions(exprs) => {
                // A lone bare `ALL` identifier means ORDER BY ALL, unless the
                // frame really has a column called "ALL" (in any case).
                if exprs.len() == 1
                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
                    if ident.value.to_uppercase() == "ALL"
                    && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
                {
                    // Sort keys are the selected expressions, or every column.
                    let n_cols = if let Some(selected) = selected {
                        selected.len()
                    } else {
                        schema.len()
                    };
                    (vec![], Some(&exprs[0].options), n_cols)
                } else {
                    (exprs.clone(), None, exprs.len())
                }
            },
            OrderByKind::All(opts) => {
                let n_cols = if let Some(selected) = selected {
                    selected.len()
                } else {
                    schema.len()
                };
                (vec![], Some(opts), n_cols)
            },
        };
        let mut descending = Vec::with_capacity(n_order_cols);
        let mut nulls_last = Vec::with_capacity(n_order_cols);
        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);

        if let Some(opts) = order_by_all {
            if let Some(selected) = selected {
                by.extend(selected.iter().cloned());
            } else {
                by.extend(columns_iter);
            };
            // One direction / nulls placement for every key.
            let desc_order = !opts.asc.unwrap_or(true);
            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
            descending.resize(by.len(), desc_order);
        } else {
            let columns = &columns_iter.collect::<Vec<_>>();
            for ob in order_by {
                // Default: ASC; nulls first only when sorting DESC.
                let desc_order = !ob.options.asc.unwrap_or(true);
                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
                descending.push(desc_order);

                // The item may be an expression, an output alias, or an ordinal.
                by.push(self.expr_or_ordinal(
                    &ob.expr,
                    columns,
                    selected,
                    Some(&schema),
                    "ORDER BY",
                )?)
            }
        }
        Ok(lf.sort_by_exprs(
            &by,
            SortMultipleOptions::default()
                .with_order_descending_multi(descending)
                .with_nulls_last_multi(nulls_last)
                .with_maintain_order(true),
        ))
    }
1803
    /// Build the aggregation for `SELECT ... GROUP BY ... [HAVING ...]`.
    ///
    /// Parameters:
    /// - `group_by_keys`: the grouping expressions;
    /// - `projections`: the SELECT-list expressions;
    /// - `having`: the already-parsed HAVING predicate, if any.
    ///
    /// The result is `lf.group_by(keys).agg(...)`, filtered by HAVING, then
    /// selected down to the SELECT-list output names. Two kinds of extra
    /// columns are appended:
    /// - every group key whose name is not among the projection outputs;
    /// - an `__POLARS_ORIG_<key>` copy of a plain-column key whose output
    ///   name is reused by a different projection.
    ///
    /// NOTE(review): presumably these extras serve later clauses (e.g.
    /// ORDER BY) and are dropped by the caller's final select. Confirm.
    ///
    /// # Errors
    /// - duplicate output names among the keys or among the projections;
    /// - a bare column (or struct field) in the SELECT list that is neither a
    ///   group key nor inside an aggregate;
    /// - any schema-resolution error.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
        having: Option<Expr>,
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        // Output schema of the grouping keys (rejects duplicate key names).
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Aggregation output name -> temporary `__POLARS_AGG_*` name, used
        // where the aggregation's name clashes with a group key.
        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
        // Expressions passed to `.agg(...)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Alias -> replacement expression for aliased struct-field projections.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases of non-aggregated projections that are re-evaluated after aggregation.
        let mut projection_aliases = PlHashSet::new();
        // Aliases of projections that are simple (column) projections.
        let mut group_key_aliases = PlHashSet::new();

        // For each key: (expression without its outer alias, output name if resolvable).
        let group_key_data: Vec<_> = group_by_keys
            .iter()
            .map(|gk| {
                (
                    strip_outer_alias(gk),
                    gk.to_field(&schema_before).ok().map(|f| f.name),
                )
            })
            .collect();

        // A projection "is" a group key when both its un-aliased expression
        // and its output name match those of some key.
        let projection_matches_group_key: Vec<bool> = projections
            .iter()
            .map(|p| {
                let p_stripped = strip_outer_alias(p);
                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
                group_key_data
                    .iter()
                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
            })
            .collect();

        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
            // Goes into `.agg(...)` if it contains an aggregation, `len`, a
            // window/rolling expression, or a non-struct function that reads a
            // column which is not a group key.
            let is_non_group_key_expr = !matches_group_key
                && has_expr(e, |e| {
                    match e {
                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                        #[cfg(feature = "dynamic_group_by")]
                        Expr::Rolling { .. } => true,
                        Expr::Function { function: func, .. }
                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
                        {
                            // Does this function reference any non-key column?
                            has_expr(e, |e| match e {
                                Expr::Column(name) => !group_by_keys_schema.contains(name),
                                _ => false,
                            })
                        },
                        _ => false,
                    }
                });

            // Classify aliased projections; `e_inner` is the expression whose
            // output field is checked below.
            let mut e_inner = e;
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e_inner = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e_inner.to_field(&schema_before)?;
            if is_non_group_key_expr {
                // Strip a top-level `implode` (bare or under an alias). Presumably
                // this is because `.agg` already gathers each group into a list.
                // TODO confirm.
                let mut e = e.clone();
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                // Rename the aggregation temporarily if its name would clash
                // with a group-key column in the aggregated frame.
                if group_by_keys_schema.get(&field.name).is_some() {
                    let alias_name = format!("__POLARS_AGG_{}", field.name);
                    e = e.alias(alias_name.as_str());
                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
                }
                aggregation_projection.push(e);
            } else if !matches_group_key {
                // A bare column (or struct field) must be a group key.
                if let Expr::Column(_)
                | Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                    ..
                } = e_inner
                {
                    if !group_by_keys_schema.contains(&field.name) {
                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                    }
                }
            }
        }

        // Rewrite each aggregate in HAVING into a column reference:
        // - reuse the output name of an identical aggregation already computed;
        // - otherwise add it to the aggregation as `__POLARS_HAVING_<n>`.
        let having_filter = if let Some(having_expr) = having {
            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
                .iter()
                .filter_map(|p| match p {
                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
                        Some((inner.as_ref().clone(), name.clone()))
                    },
                    e @ (Expr::Agg(_) | Expr::Len) => Some((
                        e.clone(),
                        e.to_field(&schema_before)
                            .map(|f| f.name)
                            .unwrap_or_default(),
                    )),
                    _ => None,
                })
                .collect();

            let mut n_having_aggs = 0;
            let updated_having = having_expr.map_expr(|e| {
                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
                    return e;
                }
                let name = agg_to_name
                    .iter()
                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
                    .unwrap_or_else(|| {
                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
                        aggregation_projection.push(e.clone().alias(n.clone()));
                        agg_to_name.push((e.clone(), n.clone()));
                        n_having_aggs += 1;
                        n
                    });
                col(name)
            });
            Some(updated_having)
        } else {
            None
        };

        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        // HAVING filters the aggregated groups.
        if let Some(filter_expr) = having_filter {
            aggregated = aggregated.filter(filter_expr);
        }

        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // Map each SELECT output name to an expression over the aggregated frame:
        // 1. struct-field override;
        // 2. renamed-back `__POLARS_AGG_*` aggregation;
        // 3. the group key column itself;
        // 4. a key name / alias, re-evaluated unless it contains an
        //    aggregation or window (then its computed column is used);
        // 5. otherwise, the aggregated column of that name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections.iter().zip(&projection_matches_group_key))
            .map(|(name, (projection_expr, &matches_group_key))| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                    col(aliased_name.clone()).alias(name.clone())
                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
                    col(name.clone())
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    if has_expr(projection_expr, |e| {
                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                    }) {
                        col(name.clone())
                    } else {
                        projection_expr.clone()
                    }
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        let mut output_projection = final_projection;
        for key_name in group_by_keys_schema.iter_names() {
            if !projection_schema.contains(key_name) {
                // Keep group keys that the SELECT list does not output.
                output_projection.push(col(key_name.clone()));
            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
                // The key's name is also produced by a *different* projection:
                // keep the original key values under an internal name too.
                let is_cross_aliased = projections.iter().any(|p| {
                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
                        && !is_simple_col_ref(p, key_name)
                });
                if is_cross_aliased {
                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
                    output_projection.push(col(key_name.clone()).alias(internal_name));
                }
            }
        }
        Ok(aggregated.select(&output_projection))
    }
2022
2023 fn process_limit_offset(
2024 &self,
2025 lf: LazyFrame,
2026 limit: Option<&SQLExpr>,
2027 offset: Option<&SQLExpr>,
2028 ) -> PolarsResult<LazyFrame> {
2029 match (offset, limit) {
2030 (
2031 Some(SQLExpr::Value(ValueWithSpan {
2032 value: SQLValue::Number(offset, _),
2033 ..
2034 })),
2035 Some(SQLExpr::Value(ValueWithSpan {
2036 value: SQLValue::Number(limit, _),
2037 ..
2038 })),
2039 ) => Ok(lf.slice(
2040 offset
2041 .parse()
2042 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2043 limit
2044 .parse()
2045 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
2046 )),
2047 (
2048 Some(SQLExpr::Value(ValueWithSpan {
2049 value: SQLValue::Number(offset, _),
2050 ..
2051 })),
2052 None,
2053 ) => Ok(lf.slice(
2054 offset
2055 .parse()
2056 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2057 IdxSize::MAX,
2058 )),
2059 (
2060 None,
2061 Some(SQLExpr::Value(ValueWithSpan {
2062 value: SQLValue::Number(limit, _),
2063 ..
2064 })),
2065 ) => Ok(lf.limit(
2066 limit
2067 .parse()
2068 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
2069 )),
2070 (None, None) => Ok(lf),
2071 _ => polars_bail!(
2072 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
2073 ),
2074 }
2075 }
2076
2077 fn process_qualified_wildcard(
2078 &mut self,
2079 ObjectName(idents): &ObjectName,
2080 options: &WildcardAdditionalOptions,
2081 modifiers: &mut SelectModifiers,
2082 schema: Option<&Schema>,
2083 ) -> PolarsResult<Vec<Expr>> {
2084 let mut new_idents = idents.clone();
2085 new_idents.push(ObjectNamePart::Identifier(Ident::new("*")));
2086
2087 let ident_refs: Vec<&Ident> = new_idents.iter().filter_map(|p| p.as_ident()).collect();
2088 let expr = resolve_compound_identifier(
2089 self,
2090 &ident_refs.iter().map(|&i| i.clone()).collect::<Vec<_>>(),
2091 schema,
2092 );
2093 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
2094 }
2095
2096 fn process_wildcard_additional_options(
2097 &mut self,
2098 exprs: Vec<Expr>,
2099 options: &WildcardAdditionalOptions,
2100 modifiers: &mut SelectModifiers,
2101 schema: Option<&Schema>,
2102 ) -> PolarsResult<Vec<Expr>> {
2103 if options.opt_except.is_some() && options.opt_exclude.is_some() {
2104 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2105 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2106 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2107 }
2108
2109 if let Some(items) = &options.opt_exclude {
2111 match items {
2112 ExcludeSelectItem::Single(ident) => {
2113 modifiers.exclude.insert(ident.value.clone());
2114 },
2115 ExcludeSelectItem::Multiple(idents) => {
2116 modifiers
2117 .exclude
2118 .extend(idents.iter().map(|i| i.value.clone()));
2119 },
2120 };
2121 }
2122
2123 if let Some(items) = &options.opt_except {
2125 modifiers.exclude.insert(items.first_element.value.clone());
2126 modifiers
2127 .exclude
2128 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2129 }
2130
2131 if let Some(item) = &options.opt_ilike {
2133 let rx = regex::escape(item.pattern.as_str())
2134 .replace('%', ".*")
2135 .replace('_', ".");
2136
2137 modifiers.ilike = Some(
2138 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2139 );
2140 }
2141
2142 if let Some(items) = &options.opt_rename {
2144 let renames = match items {
2145 RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2146 RenameSelectItem::Multiple(renames) => renames.as_slice(),
2147 };
2148 for rn in renames {
2149 let before = PlSmallStr::from_str(rn.ident.value.as_str());
2150 let after = PlSmallStr::from_str(rn.alias.value.as_str());
2151 if before != after {
2152 modifiers.rename.insert(before, after);
2153 }
2154 }
2155 }
2156
2157 if let Some(replacements) = &options.opt_replace {
2159 for rp in &replacements.items {
2160 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2161 modifiers
2162 .replace
2163 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2164 }
2165 }
2166 Ok(exprs)
2167 }
2168
2169 fn rename_columns_from_table_alias(
2170 &mut self,
2171 mut lf: LazyFrame,
2172 alias: &TableAlias,
2173 ) -> PolarsResult<LazyFrame> {
2174 if alias.columns.is_empty() {
2175 Ok(lf)
2176 } else {
2177 let schema = self.get_frame_schema(&mut lf)?;
2178 if alias.columns.len() != schema.len() {
2179 polars_bail!(
2180 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2181 alias.columns.len(), alias.name.value, schema.len()
2182 )
2183 } else {
2184 let existing_columns: Vec<_> = schema.iter_names().collect();
2185 let new_columns: Vec<_> =
2186 alias.columns.iter().map(|c| c.name.value.clone()).collect();
2187 Ok(lf.rename(existing_columns, new_columns, true))
2188 }
2189 }
2190 }
2191}
2192
2193impl SQLContext {
2194 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
2196 self.table_map.clone()
2197 }
2198
2199 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2201 Self {
2202 table_map,
2203 ..Default::default()
2204 }
2205 }
2206}
2207
2208fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2209 match expr {
2210 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2211 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2212 schema
2213 .iter_names()
2214 .filter(|name| re.is_match(name))
2215 .map(|name| col(name.clone()))
2216 .collect::<Vec<_>>()
2217 },
2218 Expr::Selector(s) => s
2219 .into_columns(schema, &Default::default())
2220 .unwrap()
2221 .into_iter()
2222 .map(col)
2223 .collect::<Vec<_>>(),
2224 _ => vec![expr],
2225 }
2226}
2227
/// Whether a column name should be interpreted as a regular expression.
/// True when it begins with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
2231
2232struct FindTableIdentifier<'a> {
2234 table_name: &'a str,
2235 found: bool,
2236}
2237
2238impl<'a> SQLVisitor for FindTableIdentifier<'a> {
2239 type Break = ();
2240
2241 fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<Self::Break> {
2242 if let SQLExpr::CompoundIdentifier(idents) = expr {
2243 if idents.len() >= 2 && idents[0].value.as_str() == self.table_name {
2244 self.found = true; return ControlFlow::Break(());
2246 }
2247 }
2248 ControlFlow::Continue(())
2249 }
2250}
2251
2252struct QualifyExpression {
2255 has_window_functions: bool,
2256 column_refs: PlHashSet<String>,
2257}
2258
2259impl QualifyExpression {
2260 fn new() -> Self {
2261 Self {
2262 has_window_functions: false,
2263 column_refs: PlHashSet::new(),
2264 }
2265 }
2266
2267 fn analyze(expr: &SQLExpr) -> (bool, PlHashSet<String>) {
2268 let mut analyzer = Self::new();
2269 let _ = expr.visit(&mut analyzer);
2270 (analyzer.has_window_functions, analyzer.column_refs)
2271 }
2272}
2273
2274impl SQLVisitor for QualifyExpression {
2275 type Break = ();
2276
2277 fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<Self::Break> {
2278 match expr {
2279 SQLExpr::Function(func) if func.over.is_some() => {
2280 self.has_window_functions = true;
2281 },
2282 SQLExpr::Identifier(ident) => {
2283 self.column_refs.insert(ident.value.clone());
2284 },
2285 SQLExpr::CompoundIdentifier(idents) if !idents.is_empty() => {
2286 self.column_refs
2287 .insert(idents.last().unwrap().value.clone());
2288 },
2289 _ => {},
2290 }
2291 ControlFlow::Continue(())
2292 }
2293}
2294
2295fn expr_has_window_functions(expr: &SQLExpr) -> bool {
2298 struct WindowFunctionFinder;
2299 impl SQLVisitor for WindowFunctionFinder {
2300 type Break = ();
2301 fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<()> {
2302 if matches!(expr, SQLExpr::Function(f) if f.over.is_some()) {
2303 ControlFlow::Break(())
2304 } else {
2305 ControlFlow::Continue(())
2306 }
2307 }
2308 }
2309 expr.visit(&mut WindowFunctionFinder).is_break()
2310}
2311
2312fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2314 match expr {
2315 Expr::Column(n) => n == col_name,
2316 Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2317 _ => false,
2318 }
2319}
2320
2321fn strip_outer_alias(expr: &Expr) -> Expr {
2323 if let Expr::Alias(inner, _) = expr {
2324 inner.as_ref().clone()
2325 } else {
2326 expr.clone()
2327 }
2328}
2329
2330fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2335 if schema.contains(name) {
2337 return None;
2338 }
2339 projections.iter().find_map(|p| match p {
2341 Expr::Alias(inner, alias) if alias.as_str() == name => {
2342 Some(inner.as_ref().clone().alias(alias.clone()))
2343 },
2344 _ => None,
2345 })
2346}
2347
2348fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2350 let mut found_cols = false;
2351 let mut all_in_schema = true;
2352 for e in expr.into_iter() {
2353 if let Expr::Column(name) = e {
2354 found_cols = true;
2355 if !schema.contains(name.as_str()) {
2356 all_in_schema = false;
2357 break;
2358 }
2359 }
2360 }
2361 found_cols && all_in_schema
2362}
2363
2364fn expr_refers_to_table(expr: &SQLExpr, table_name: &str) -> bool {
2366 let mut table_finder = FindTableIdentifier {
2367 table_name,
2368 found: false,
2369 };
2370 let _ = expr.visit(&mut table_finder);
2371 table_finder.found
2372}
2373
2374fn determine_left_right_join_on(
2382 ctx: &mut SQLContext,
2383 expr_left: &SQLExpr,
2384 expr_right: &SQLExpr,
2385 tbl_left: &TableInfo,
2386 tbl_right: &TableInfo,
2387 join_schema: &Schema,
2388) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2389 let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2392 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2393 e => e,
2394 };
2395 let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2396 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2397 e => e,
2398 };
2399
2400 let left_refs = (
2404 expr_refers_to_table(expr_left, &tbl_left.name),
2405 expr_refers_to_table(expr_left, &tbl_right.name),
2406 );
2407 let right_refs = (
2408 expr_refers_to_table(expr_right, &tbl_left.name),
2409 expr_refers_to_table(expr_right, &tbl_right.name),
2410 );
2411 match (left_refs, right_refs) {
2413 ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2415 ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2417 ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2419 polars_bail!(
2420 SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2421 if left_refs.0 && left_refs.1 {
2422 "left"
2423 } else {
2424 "right"
2425 }, tbl_left.name, tbl_right.name
2426 )
2427 },
2428 _ => {},
2430 }
2431
2432 let left_on_cols_in = (
2437 expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2438 expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2439 );
2440 let right_on_cols_in = (
2441 expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2442 expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2443 );
2444 match (left_on_cols_in, right_on_cols_in) {
2445 ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2447 ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2448 ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2450 ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2451 ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2452 ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2453 _ => Ok((vec![left_on], vec![right_on])),
2455 }
2456}
2457
2458fn process_join_on(
2459 ctx: &mut SQLContext,
2460 sql_expr: &SQLExpr,
2461 tbl_left: &TableInfo,
2462 tbl_right: &TableInfo,
2463) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2464 match sql_expr {
2465 SQLExpr::BinaryOp { left, op, right } => match op {
2466 BinaryOperator::And => {
2467 let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2468 let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2469 left_i.append(&mut left_j);
2470 right_i.append(&mut right_j);
2471 Ok((left_i, right_i))
2472 },
2473 BinaryOperator::Eq => {
2474 let mut join_schema =
2477 Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2478 for (name, dtype) in tbl_left.schema.iter() {
2479 join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2480 }
2481 for (name, dtype) in tbl_right.schema.iter() {
2482 if !join_schema.contains(name) {
2483 join_schema.insert_at_index(
2484 join_schema.len(),
2485 name.clone(),
2486 dtype.clone(),
2487 )?;
2488 }
2489 }
2490 determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2491 },
2492 _ => polars_bail!(
2493 SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2495 ),
2496 },
2497 SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2498 _ => polars_bail!(
2499 SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2500 ),
2501 }
2502}
2503
2504fn process_join_constraint(
2505 constraint: &JoinConstraint,
2506 tbl_left: &TableInfo,
2507 tbl_right: &TableInfo,
2508 ctx: &mut SQLContext,
2509) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2510 match constraint {
2511 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2512 process_join_on(ctx, expr, tbl_left, tbl_right)
2513 },
2514 JoinConstraint::Using(idents) if !idents.is_empty() => {
2515 let using: Vec<Expr> = idents
2516 .iter()
2517 .map(|ObjectName(parts)| {
2518 if parts.len() != 1 {
2519 polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2520 }
2521 match parts[0].as_ident() {
2522 Some(ident) => Ok(col(ident.value.as_str())),
2523 None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2524 }
2525 })
2526 .collect::<PolarsResult<Vec<_>>>()?;
2527 Ok((using.clone(), using))
2528 },
2529 JoinConstraint::Natural => {
2530 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2531 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2532 let on: Vec<Expr> = left_names
2533 .intersection(&right_names)
2534 .map(|&name| col(name.clone()))
2535 .collect();
2536 if on.is_empty() {
2537 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2538 }
2539 Ok((on.clone(), on))
2540 },
2541 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2542 }
2543}
2544
2545bitflags::bitflags! {
2546 #[derive(PartialEq)]
2551 struct ExprSqlProjectionHeightBehavior: u8 {
2552 const MaintainsColumn = 1 << 0;
2554 const Independent = 1 << 1;
2558 const InheritsContext = 1 << 2;
2561 }
2562}
2563
2564impl ExprSqlProjectionHeightBehavior {
2565 fn identify_from_expr(expr: &Expr) -> Self {
2566 let mut has_column = false;
2567 let mut has_independent = false;
2568
2569 for e in expr.into_iter() {
2570 use Expr::*;
2571 has_column |= matches!(e, Column(_) | Selector(_));
2572 has_independent |= match e {
2573 AnonymousFunction { options, .. } => {
2575 options.returns_scalar() || !options.is_length_preserving()
2576 },
2577 Literal(v) => !v.is_scalar(),
2578 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2579 Agg { .. } | Len => true,
2580 _ => false,
2581 }
2582 }
2583 if has_independent {
2584 Self::Independent
2585 } else if has_column {
2586 Self::MaintainsColumn
2587 } else {
2588 Self::InheritsContext
2589 }
2590 }
2591}