
Conversation

@sarutak commented Oct 11, 2025

What changes were proposed in this pull request?

In a Spark Connect environment, QueryExecution#observedMetrics can be called by two threads concurrently:

  • Thread 1 (ObservationManager)
private def tryComplete(qe: QueryExecution): Unit = {
  val allMetrics = qe.observedMetrics
  qe.logical.foreach {
    case c: CollectMetrics =>
      allMetrics.get(c.name).foreach { metrics =>
        val observation = observations.remove((c.name, c.dataframeId))
        if (observation != null) {
          observation.setMetricsAndNotify(metrics)
        }
      }
    case _ =>
  }
}
  • Thread 2 (SparkConnectPlanExecution)
private def createObservedMetricsResponse(
    sessionId: String,
    observationAndPlanIds: Map[String, Long],
    dataframe: DataFrame): Option[ExecutePlanResponse] = {
  val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
    case (name, row) if !executeHolder.observations.contains(name) =>
      val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
      name -> values
  } 

This can cause race conditions. A CI failure caused by this issue can be seen here:
https://github.com/apache/spark/actions/runs/18422173471/job/52497913985

======================================================================
ERROR [0.181s]: test_observe_with_map_type (pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in wrapper
    lastValue = condition(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", line 226, in test_observe_with_map_type
    assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in assertDataFrameEqual
    actual_list = actual.collect()
                  ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1817, in collect
    table, schema = self._to_table()
                    ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1830, in _to_table
    table, schema, self._execution_info = self._session.client.to_table(
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 946, in to_table
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
                                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1642, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1619, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1893, in _handle_error
    self._handle_rpc_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1966, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.IllegalArgumentException: requirement failed

JVM stacktrace:
java.lang.IllegalArgumentException
	at scala.Predef$.require(Predef.scala:324)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
	at org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
	at org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
	at org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
	at org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
	at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
	at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
	at org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
	at scala.collection.immutable.List.flatMap(List.scala:294)
	at scala.collection.immutable.List.flatMap(List.scala:79)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
	at org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
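
The require that fails at the top of the stack is the key/value length check in the ArrayBasedMapData constructor. ArrayBasedMapBuilder is stateful and clears its key and value buffers separately, so two threads evaluating the same accumulator concurrently can observe it mid-reset. The following self-contained sketch, illustrative only and not Spark code (all names are hypothetical), reproduces the same failure shape with a builder whose reset is not atomic:

// Illustrative only, not Spark code: a builder that, like
// ArrayBasedMapBuilder.reset(), clears two internal buffers non-atomically.
import scala.collection.mutable.ArrayBuffer

class NaiveMapBuilder {
  private val keys = ArrayBuffer[Int]()
  private val values = ArrayBuffer[Int]()

  def build(n: Int): (Int, Int) = {
    keys.clear()
    values.clear() // a second thread interleaving here corrupts the buffers
    (0 until n).foreach { i => keys += i; values += i }
    (keys.size, values.size)
  }
}

object RaceDemo extends App {
  val builder = new NaiveMapBuilder
  val threads = (1 to 2).map { _ =>
    new Thread(() => (1 to 10000).foreach { n =>
      val (k, v) = builder.build(n % 10 + 1)
      // Mirrors the length check in the ArrayBasedMapData constructor.
      require(k == v, s"requirement failed: $k keys vs $v values")
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
}

Run repeatedly, the require fails intermittently, which is why the sleep inserted below makes the failure reliably reproducible.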

This test failure can be reproduced by inserting a sleep into ArrayBasedMapBuilder.scala as follows.

   private def reset(): Unit = {
     keyToIndex.clear()
     keys.clear()
+    Thread.sleep(10)
     values.clear()
   }

Then, run the test as follows.

$ python/run-tests --modules=pyspark-connect --parallelism=1 --testnames pyspark.sql.tests.connect.test_parity_observation  --python-executables=python3.11

To fix this issue, this PR proposes to protect QueryExecution#observedMetrics with a synchronized block.
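
In sketch form (abridged to the relevant method; the lock object name observedMetricsLock is taken from the review diff further down in this thread):

// Sketch of the proposed synchronization in QueryExecution (abridged).
private val observedMetricsLock = new Object

/** Get the metrics observed during the execution of the query plan. */
def observedMetrics: Map[String, Row] = observedMetricsLock.synchronized {
  // Serializes the two concurrent callers shown above
  // (ObservationManager and SparkConnectPlanExecution).
  CollectMetricsExec.collect(executedPlan)
}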

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Ran the problematic test with the sleep inserted as described above, and confirmed that the test passed.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions bot removed the CONNECT label Oct 11, 2025

@dongjoon-hyun left a comment


Hi, @sarutak. The JIRA ID looks wrong to me.

@dongjoon-hyun changed the title [SPARK-51166][SQL][CONNECT] Fix race condition issue related to ObservedMetrics [SPARK-XXX][SQL][CONNECT] Fix race condition issue related to ObservedMetrics Oct 11, 2025
@dongjoon-hyun

I changed the PR title to prevent accidental merging. Please let me know when you correct the PR title.

@sarutak changed the title [SPARK-XXX][SQL][CONNECT] Fix race condition issue related to ObservedMetrics [SPARK-53878][SQL][CONNECT] Fix race condition issue related to ObservedMetrics Oct 11, 2025
@sarutak commented Oct 11, 2025

@dongjoon-hyun Thank you for letting me know. I've modified it.

@dongjoon-hyun

Thank you, @sarutak.


 /** Get the metrics observed during the execution of the query plan. */
-def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)
+def observedMetrics: Map[String, Row] = {

Since this was designed as def observedMetrics instead of a lazy val, this method can return different values at every invocation. Are we sure that this PR is not going to lose the as-is capability?

If the return value is static, why don't we use lazy val?
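
For context, the semantic difference at stake, in a minimal illustration (not Spark code): a lazy val is computed once and cached, while a def is re-evaluated on every call, so metrics arriving after the first access would be invisible through a lazy val.

// Minimal illustration (not Spark code) of def vs lazy val.
class MetricsHolder {
  private var count = 0
  def record(): Unit = count += 1

  def currentDef: Int = count        // re-evaluated on every access
  lazy val currentLazy: Int = count  // computed once, then cached
}

object DefVsLazyVal extends App {
  val h = new MetricsHolder
  println(s"def=${h.currentDef} lazy=${h.currentLazy}") // def=0 lazy=0
  h.record()
  println(s"def=${h.currentDef} lazy=${h.currentLazy}") // def=1 lazy=0 (stale)
}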

@sarutak commented Oct 11, 2025

I walked through the related code and confirmed that observedMetrics can return a different value at every invocation, so I'll keep that in mind.


 /** Get the metrics observed during the execution of the query plan. */
-def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)
+def observedMetrics: Map[String, Row] = observedMetricsLock.synchronized {

Thank you, @sarutak. Please add the following annotation, javax.annotation.concurrent.GuardedBy:

@GuardedBy("observedMetricsLock")
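
That is, imported from javax.annotation.concurrent and placed on the method that takes the lock, roughly (a sketch only; exact placement is left to the author):

import javax.annotation.concurrent.GuardedBy

@GuardedBy("observedMetricsLock")
def observedMetrics: Map[String, Row] = observedMetricsLock.synchronized {
  CollectMetricsExec.collect(executedPlan)
}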


@dongjoon-hyun left a comment


+1, LGTM (except for the annotation comment above).

@dongjoon-hyun

cc @peter-toth


@ueshin left a comment


Can we revert #52566 now?
Otherwise, LGTM.

@sarutak commented Oct 13, 2025

@ueshin Let's revert that PR separately to keep the git history clean.
