Skip to content

Commit db528e4

Browse files
brkyvz authored and gatorsmile committed
[SPARK-30535][SQL] Revert "[SPARK-30284][SQL] Migrate ALTER TABLE commands to the new framework"
### What changes were proposed in this pull request?

This reverts commit b5cb9ab.

### Why are the changes needed?

The merged commit (#27243) was too risky for several reasons:

1. It doesn't fix a bug
2. It makes the resolution of the table that's going to be altered a child. We had avoided this on purpose as having an arbitrary rule change the child of AlterTable seemed risky. This change alone is a big -1 for me for this change.
3. While the code may look cleaner, I think this approach makes certain things harder, e.g. differentiating between the Hive based Alter table CHANGE COLUMN and ALTER COLUMN syntax. Resolving and normalizing columns for ALTER COLUMN also becomes a bit harder, as we now have to check every single AlterTable command instead of just a single ALTER TABLE ALTER COLUMN statement

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing unit tests

This closes #27315

Closes #27327 from brkyvz/revAlter.

Authored-by: Burak Yavuz <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
1 parent d2bca8f commit db528e4

File tree

19 files changed

+462
-324
lines changed

19 files changed

+462
-324
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -755,14 +755,12 @@ class Analyzer(
755755
.map(view => i.copy(table = view))
756756
.getOrElse(i)
757757
case u @ UnresolvedTable(ident) =>
758-
lookupTempView(ident)
759-
.map(_ => UnresolvedTableWithViewExists(
760-
ResolvedView(ident.asIdentifier, isTempView = true)))
761-
.getOrElse(u)
758+
lookupTempView(ident).foreach { _ =>
759+
u.failAnalysis(s"${ident.quoted} is a temp view not table.")
760+
}
761+
u
762762
case u @ UnresolvedTableOrView(ident) =>
763-
lookupTempView(ident)
764-
.map(_ => ResolvedView(ident.asIdentifier, isTempView = true))
765-
.getOrElse(u)
763+
lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
766764
}
767765

768766
def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
@@ -816,6 +814,14 @@ class Analyzer(
816814
lookupV2Relation(u.multipartIdentifier)
817815
.map(v2Relation => i.copy(table = v2Relation))
818816
.getOrElse(i)
817+
818+
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
819+
CatalogV2Util.loadRelation(u.catalog, u.tableName)
820+
.map(rel => alter.copy(table = rel))
821+
.getOrElse(alter)
822+
823+
case u: UnresolvedV2Relation =>
824+
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
819825
}
820826

821827
/**
@@ -882,7 +888,8 @@ class Analyzer(
882888

883889
case u @ UnresolvedTable(identifier) =>
884890
lookupTableOrView(identifier).map {
885-
case v: ResolvedView => UnresolvedTableWithViewExists(v)
891+
case v: ResolvedView =>
892+
u.failAnalysis(s"${v.identifier.quoted} is a view not table.")
886893
case table => table
887894
}.getOrElse(u)
888895

@@ -895,7 +902,7 @@ class Analyzer(
895902
case SessionCatalogAndIdentifier(catalog, ident) =>
896903
CatalogV2Util.loadTable(catalog, ident).map {
897904
case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW =>
898-
ResolvedView(ident, isTempView = false)
905+
ResolvedView(ident)
899906
case table =>
900907
ResolvedTable(catalog.asTableCatalog, ident, table)
901908
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
2626
import org.apache.spark.sql.catalyst.plans._
2727
import org.apache.spark.sql.catalyst.plans.logical._
2828
import org.apache.spark.sql.catalyst.util.TypeUtils
29-
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
29+
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType}
3030
import org.apache.spark.sql.internal.SQLConf
3131
import org.apache.spark.sql.types._
3232

@@ -87,20 +87,6 @@ trait CheckAnalysis extends PredicateHelper {
8787
}
8888

8989
def checkAnalysis(plan: LogicalPlan): Unit = {
90-
// Analysis that needs to be performed top down can be added here.
91-
plan.foreach {
92-
case p if p.analyzed => // Skip already analyzed sub-plans
93-
94-
case alter: AlterTable =>
95-
alter.table match {
96-
case u @ UnresolvedTableWithViewExists(view) if !view.isTempView =>
97-
u.failAnalysis("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
98-
case _ =>
99-
}
100-
101-
case _ => // Analysis successful!
102-
}
103-
10490
// We transform up and order the rules so as to catch the first possible failure instead
10591
// of the result of cascading resolution failures.
10692
plan.foreachUp {
@@ -119,13 +105,23 @@ trait CheckAnalysis extends PredicateHelper {
119105
case u: UnresolvedRelation =>
120106
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
121107

122-
case u: UnresolvedTableWithViewExists =>
123-
val viewKind = if (u.view.isTempView) { "temp view" } else { "view" }
124-
u.failAnalysis(s"${u.view.identifier.quoted} is a $viewKind not a table.")
125-
126108
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
127109
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
128110

111+
case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
112+
u.failAnalysis(
113+
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
114+
115+
case u: UnresolvedV2Relation =>
116+
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
117+
118+
case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
119+
u.failAnalysis(
120+
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
121+
122+
case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
123+
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
124+
129125
case operator: LogicalPlan =>
130126
// Check argument data types of higher-order functions downwards first.
131127
// If the arguments of the higher-order functions are resolved but the type check fails,
@@ -429,9 +425,8 @@ trait CheckAnalysis extends PredicateHelper {
429425
case _ =>
430426
}
431427

432-
case alter: AlterTable
433-
if alter.childrenResolved && alter.table.isInstanceOf[ResolvedTable] =>
434-
val table = alter.table.asInstanceOf[ResolvedTable].table
428+
case alter: AlterTable if alter.childrenResolved =>
429+
val table = alter.table
435430
def findField(operation: String, fieldName: Array[String]): StructField = {
436431
// include collections because structs nested in maps and arrays may be altered
437432
val field = table.schema.findNestedField(fieldName, includeCollections = true)
@@ -484,8 +479,6 @@ trait CheckAnalysis extends PredicateHelper {
484479
throw new AnalysisException(
485480
s"Cannot change nullable column to non-nullable: $fieldName")
486481
}
487-
case update: UpdateColumnPosition =>
488-
findField("update", update.fieldNames)
489482
case rename: RenameColumn =>
490483
findField("rename", rename.fieldNames)
491484
case update: UpdateColumnComment =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
2020
import org.apache.spark.sql.AnalysisException
2121
import org.apache.spark.sql.catalyst.plans.logical._
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
23+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
2424

2525
/**
2626
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@@ -32,6 +32,71 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
3232
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
3333

3434
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
35+
case AlterTableAddColumnsStatement(
36+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
37+
val changes = cols.map { col =>
38+
TableChange.addColumn(
39+
col.name.toArray,
40+
col.dataType,
41+
col.nullable,
42+
col.comment.orNull,
43+
col.position.orNull)
44+
}
45+
createAlterTable(nameParts, catalog, tbl, changes)
46+
47+
case a @ AlterTableAlterColumnStatement(
48+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
49+
val colName = a.column.toArray
50+
val typeChange = a.dataType.map { newDataType =>
51+
TableChange.updateColumnType(colName, newDataType)
52+
}
53+
val nullabilityChange = a.nullable.map { nullable =>
54+
TableChange.updateColumnNullability(colName, nullable)
55+
}
56+
val commentChange = a.comment.map { newComment =>
57+
TableChange.updateColumnComment(colName, newComment)
58+
}
59+
val positionChange = a.position.map { newPosition =>
60+
TableChange.updateColumnPosition(colName, newPosition)
61+
}
62+
createAlterTable(
63+
nameParts,
64+
catalog,
65+
tbl,
66+
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
67+
68+
case AlterTableRenameColumnStatement(
69+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
70+
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
71+
createAlterTable(nameParts, catalog, tbl, changes)
72+
73+
case AlterTableDropColumnsStatement(
74+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
75+
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
76+
createAlterTable(nameParts, catalog, tbl, changes)
77+
78+
case AlterTableSetPropertiesStatement(
79+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) =>
80+
val changes = props.map { case (key, value) =>
81+
TableChange.setProperty(key, value)
82+
}.toSeq
83+
createAlterTable(nameParts, catalog, tbl, changes)
84+
85+
// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
86+
case AlterTableUnsetPropertiesStatement(
87+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), keys, _) =>
88+
val changes = keys.map(key => TableChange.removeProperty(key))
89+
createAlterTable(nameParts, catalog, tbl, changes)
90+
91+
case AlterTableSetLocationStatement(
92+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) =>
93+
if (partitionSpec.nonEmpty) {
94+
throw new AnalysisException(
95+
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
96+
}
97+
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
98+
createAlterTable(nameParts, catalog, tbl, changes)
99+
35100
case AlterViewSetPropertiesStatement(
36101
NonSessionCatalogAndTable(catalog, tbl), props) =>
37102
throw new AnalysisException(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils
2626
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
2727
import org.apache.spark.sql.catalyst.trees.TreeNode
2828
import org.apache.spark.sql.catalyst.util.quoteIdentifier
29+
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
2930
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
3031

3132
/**
@@ -59,6 +60,28 @@ object UnresolvedRelation {
5960
UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
6061
}
6162

63+
/**
64+
* A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation
65+
* (`DataSourceV2Relation`), not v1 relation or temp view.
66+
*
67+
* @param originalNameParts the original table identifier name parts before catalog is resolved.
68+
* @param catalog The catalog which the table should be looked up from.
69+
* @param tableName The name of the table to look up.
70+
*/
71+
case class UnresolvedV2Relation(
72+
originalNameParts: Seq[String],
73+
catalog: TableCatalog,
74+
tableName: Identifier)
75+
extends LeafNode with NamedRelation {
76+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
77+
78+
override def name: String = originalNameParts.quoted
79+
80+
override def output: Seq[Attribute] = Nil
81+
82+
override lazy val resolved = false
83+
}
84+
6285
/**
6386
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
6487
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.catalyst.expressions.Attribute
21-
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
21+
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
2222
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, Table, TableCatalog}
2323

2424
/**
@@ -41,16 +41,6 @@ case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode {
4141
override def output: Seq[Attribute] = Nil
4242
}
4343

44-
/**
45-
* Holds the resolved view. It is used in a scenario where table is expected but the identifier
46-
* is resolved to a (temp) view.
47-
*/
48-
case class UnresolvedTableWithViewExists(view: ResolvedView) extends LeafNode {
49-
override lazy val resolved: Boolean = false
50-
51-
override def output: Seq[Attribute] = Nil
52-
}
53-
5444
/**
5545
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
5646
* be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
@@ -81,6 +71,6 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
8171
*/
8272
// TODO: create a generic representation for temp view, v1 view and v2 view, after we add view
8373
// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
84-
case class ResolvedView(identifier: Identifier, isTempView: Boolean) extends LeafNode {
74+
case class ResolvedView(identifier: Identifier) extends LeafNode {
8575
override def output: Seq[Attribute] = Nil
8676
}

0 commit comments

Comments (0)