
Commit d80e857

Peter Pashkin authored and Hyukjin Kwon committed
[SPARK-51818][CONNECT] Move QueryExecution creation to AnalyzeHandler and don't Execute for AnalyzePlanRequests
### What changes were proposed in this pull request?

AnalyzePlan requests (e.g. for Schema) should not trigger an execute of the logical plan. Currently, when an AnalyzePlanRequest carries a command that is executed eagerly, the `Dataset.ofRows(logicalPlan)` call executes the underlying command. We do not want this to happen during AnalyzePlan, so instead we construct the `QueryExecution` with `CommandExecutionMode.SKIP` and derive the resulting schema from that.

https://issues.apache.org/jira/browse/SPARK-51818

### Why are the changes needed?

SQL commands sent via an AnalyzePlanRequest are currently executed eagerly; this PR fixes that.

### Does this PR introduce _any_ user-facing change?

Yes. When calling `.schema` on a DataFrame via Spark Connect, the plan stored in the DataFrame is no longer executed, which was the case beforehand. Example: `spark.newDataFrame(plan: proto.Plan).schema`, where `plan` encodes an eagerly executed SQL command such as `DROP TABLE`, previously executed that command; after this change it no longer does.

### How was this patch tested?

Added a test that sends an AnalyzePlanRequest for `DROP TABLE` and verifies the table was not dropped.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50605 from peterpashkin/peter-pashkin/MoveAnalyzeAndSkipExecution.

Authored-by: Peter Pashkin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
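As a rough sketch of the approach (assuming, hypothetically, a classic `SparkSession` in scope as `session` and an already-transformed `LogicalPlan` as `rel`; the actual wiring is in the `SparkConnectAnalyzeHandler` diff below):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.classic.{DataFrame, Dataset}
import org.apache.spark.sql.execution.CommandExecutionMode

// Before: Dataset.ofRows builds the QueryExecution in the default command-execution
// mode, so an eagerly executed command (e.g. DROP TABLE) runs during AnalyzePlan.
// val schema = Dataset.ofRows(session, rel).schema

// After: build the QueryExecution with CommandExecutionMode.SKIP, so the plan is
// only analyzed and the schema is derived without running the command.
def getDataFrameWithoutExecuting(rel: LogicalPlan): DataFrame = {
  val qe = session.sessionState.executePlan(rel, CommandExecutionMode.SKIP)
  new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
}

val schema = getDataFrameWithoutExecuting(rel).schema // no eager execution
```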
1 parent bb0b2d2 commit d80e857

File tree: 2 files changed (+70, -35 lines)

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala

Lines changed: 34 additions & 35 deletions
```diff
@@ -23,10 +23,13 @@ import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, StorageLevelProtoConverter}
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode}
+import org.apache.spark.sql.execution.{CodegenMode, CommandExecutionMode, CostMode, ExtendedMode, FormattedMode, SimpleMode}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -61,21 +64,23 @@ private[connect] class SparkConnectAnalyzeHandler(
 
     def transformRelation(rel: proto.Relation) = planner.transformRelation(rel, cachePlan = true)
 
+    def getDataFrameWithoutExecuting(rel: LogicalPlan): DataFrame = {
+      val qe = session.sessionState.executePlan(rel, CommandExecutionMode.SKIP)
+      new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
+    }
+
     request.getAnalyzeCase match {
       case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
-        val schema = Dataset
-          .ofRows(session, transformRelation(request.getSchema.getPlan.getRoot))
-          .schema
+        val rel = transformRelation(request.getSchema.getPlan.getRoot)
+        val schema = getDataFrameWithoutExecuting(rel).schema
         builder.setSchema(
           proto.AnalyzePlanResponse.Schema
             .newBuilder()
             .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
             .build())
-
       case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
-        val queryExecution = Dataset
-          .ofRows(session, transformRelation(request.getExplain.getPlan.getRoot))
-          .queryExecution
+        val rel = transformRelation(request.getExplain.getPlan.getRoot)
+        val queryExecution = getDataFrameWithoutExecuting(rel).queryExecution
         val explainString = request.getExplain.getExplainMode match {
           case proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_SIMPLE =>
             queryExecution.explainString(SimpleMode)
@@ -96,9 +101,8 @@ private[connect] class SparkConnectAnalyzeHandler(
             .build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.TREE_STRING =>
-        val schema = Dataset
-          .ofRows(session, transformRelation(request.getTreeString.getPlan.getRoot))
-          .schema
+        val rel = transformRelation(request.getTreeString.getPlan.getRoot)
+        val schema = getDataFrameWithoutExecuting(rel).schema
         val treeString = if (request.getTreeString.hasLevel) {
           schema.treeString(request.getTreeString.getLevel)
         } else {
@@ -111,29 +115,26 @@ private[connect] class SparkConnectAnalyzeHandler(
             .build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
-        val isLocal = Dataset
-          .ofRows(session, transformRelation(request.getIsLocal.getPlan.getRoot))
-          .isLocal
+        val rel = transformRelation(request.getIsLocal.getPlan.getRoot)
+        val isLocal = getDataFrameWithoutExecuting(rel).isLocal
         builder.setIsLocal(
           proto.AnalyzePlanResponse.IsLocal
             .newBuilder()
             .setIsLocal(isLocal)
             .build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
-        val isStreaming = Dataset
-          .ofRows(session, transformRelation(request.getIsStreaming.getPlan.getRoot))
-          .isStreaming
+        val rel = transformRelation(request.getIsStreaming.getPlan.getRoot)
+        val isStreaming = getDataFrameWithoutExecuting(rel).isStreaming
         builder.setIsStreaming(
           proto.AnalyzePlanResponse.IsStreaming
             .newBuilder()
             .setIsStreaming(isStreaming)
             .build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
-        val inputFiles = Dataset
-          .ofRows(session, transformRelation(request.getInputFiles.getPlan.getRoot))
-          .inputFiles
+        val rel = transformRelation(request.getInputFiles.getPlan.getRoot)
+        val inputFiles = getDataFrameWithoutExecuting(rel).inputFiles
         builder.setInputFiles(
           proto.AnalyzePlanResponse.InputFiles
             .newBuilder()
@@ -156,29 +157,27 @@ private[connect] class SparkConnectAnalyzeHandler(
             .build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.SAME_SEMANTICS =>
-        val target = Dataset.ofRows(
-          session,
-          transformRelation(request.getSameSemantics.getTargetPlan.getRoot))
-        val other = Dataset.ofRows(
-          session,
-          transformRelation(request.getSameSemantics.getOtherPlan.getRoot))
+        val targetRel = transformRelation(request.getSameSemantics.getTargetPlan.getRoot)
+        val otherRel = transformRelation(request.getSameSemantics.getOtherPlan.getRoot)
+        val target = getDataFrameWithoutExecuting(targetRel)
+        val other = getDataFrameWithoutExecuting(otherRel)
         builder.setSameSemantics(
           proto.AnalyzePlanResponse.SameSemantics
            .newBuilder()
            .setResult(target.sameSemantics(other)))
 
      case proto.AnalyzePlanRequest.AnalyzeCase.SEMANTIC_HASH =>
-        val semanticHash = Dataset
-          .ofRows(session, transformRelation(request.getSemanticHash.getPlan.getRoot))
+        val rel = transformRelation(request.getSemanticHash.getPlan.getRoot)
+        val semanticHash = getDataFrameWithoutExecuting(rel)
          .semanticHash()
        builder.setSemanticHash(
          proto.AnalyzePlanResponse.SemanticHash
            .newBuilder()
            .setResult(semanticHash))
 
      case proto.AnalyzePlanRequest.AnalyzeCase.PERSIST =>
-        val target = Dataset
-          .ofRows(session, transformRelation(request.getPersist.getRelation))
+        val rel = transformRelation(request.getPersist.getRelation)
+        val target = getDataFrameWithoutExecuting(rel)
        if (request.getPersist.hasStorageLevel) {
          target.persist(
            StorageLevelProtoConverter.toStorageLevel(request.getPersist.getStorageLevel))
@@ -188,8 +187,8 @@ private[connect] class SparkConnectAnalyzeHandler(
         builder.setPersist(proto.AnalyzePlanResponse.Persist.newBuilder().build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.UNPERSIST =>
-        val target = Dataset
-          .ofRows(session, transformRelation(request.getUnpersist.getRelation))
+        val rel = transformRelation(request.getUnpersist.getRelation)
+        val target = getDataFrameWithoutExecuting(rel)
         if (request.getUnpersist.hasBlocking) {
           target.unpersist(request.getUnpersist.getBlocking)
         } else {
@@ -198,8 +197,8 @@ private[connect] class SparkConnectAnalyzeHandler(
         builder.setUnpersist(proto.AnalyzePlanResponse.Unpersist.newBuilder().build())
 
       case proto.AnalyzePlanRequest.AnalyzeCase.GET_STORAGE_LEVEL =>
-        val target = Dataset
-          .ofRows(session, transformRelation(request.getGetStorageLevel.getRelation))
+        val rel = transformRelation(request.getGetStorageLevel.getRelation)
+        val target = getDataFrameWithoutExecuting(rel)
         val storageLevel = target.storageLevel
         builder.setGetStorageLevel(
           proto.AnalyzePlanResponse.GetStorageLevel
```
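To see the difference `CommandExecutionMode.SKIP` makes outside the handler, a minimal sketch (assumptions: a classic `SparkSession` named `spark`, code compiled inside Spark's `org.apache.spark.sql` package tree since `sessionState` is package-private, and a throwaway table name `t`):

```scala
import org.apache.spark.sql.execution.CommandExecutionMode

spark.sql("CREATE TABLE t (id INT) USING parquet")

// Parse, but do not run, an eagerly executed command.
val dropPlan = spark.sessionState.sqlParser.parsePlan("DROP TABLE t")

// SKIP only analyzes the plan: the output schema is available and the table survives.
val qe = spark.sessionState.executePlan(dropPlan, CommandExecutionMode.SKIP)
println(qe.analyzed.schema)              // schema of the command's output
assert(spark.catalog.tableExists("t"))   // still true: DROP TABLE was not executed

// The default mode (which Dataset.ofRows goes through) would have eagerly
// executed the command here and dropped the table.
```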

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala

Lines changed: 36 additions & 0 deletions
```diff
@@ -688,6 +688,42 @@ class SparkConnectServiceSuite
     }
   }
 
+  test("SPARK-51818: AnalyzePlanRequest does not execute the command") {
+    withTable("test") {
+      spark.sql("""
+          | CREATE TABLE test (col1 INT, col2 STRING)
+          |""".stripMargin)
+      val sqlString = "DROP TABLE test"
+      val plan = proto.Plan
+        .newBuilder()
+        .setRoot(
+          proto.Relation
+            .newBuilder()
+            .setCommon(proto.RelationCommon.newBuilder().setPlanId(1))
+            .setSql(proto.SQL.newBuilder().setQuery(sqlString).build())
+            .build())
+        .build()
+
+      val handler = new SparkConnectAnalyzeHandler(null)
+
+      val request = proto.AnalyzePlanRequest
+        .newBuilder()
+        .setExplain(
+          proto.AnalyzePlanRequest.Explain
+            .newBuilder()
+            .setPlan(plan)
+            .setExplainMode(proto.AnalyzePlanRequest.Explain.ExplainMode.EXPLAIN_MODE_EXTENDED)
+            .build())
+        .build()
+
+      handler.process(request, sparkSessionHolder)
+
+      // assert that table was not dropped
+      val tableExists = spark.catalog.tableExists("test")
+      assert(tableExists, "Table test should still exist after analyze request of DROP TABLE")
+    }
+  }
+
   test("Test explain mode in analyze response") {
     withTable("test") {
       spark.sql("""
```
