
Commit 9a9c714

nicholaschew11 authored and HeartSaVioR committed
[SPARK-55628][SS] Integrate stream-stream join state format V4
### What changes were proposed in this pull request?

Integrate stream-stream join state format V4, which uses timestamp-based indexing with a secondary index. Key changes:

- Enable V4 in the `STREAMING_JOIN_STATE_FORMAT_VERSION` config.
- Gate V4 behind `spark.sql.streaming.join.stateFormatV4.enabled` while V4 is under development.
- Route V4 to use VCF (`stateFormatVersion >= 3`) and hardcode schema version 3 for the VCF path.
- Fix checkpoint ID routing for V4's single-store design.
- Mark V4's secondary index (`TsWithKeyStore`) as `isInternal = true` to prevent double-counting in `numRowsTotal` metrics.
- Convert the watermark from milliseconds to microseconds at all four eviction call sites (V4 stores timestamps as `TimestampType`).
- Add `TimestampAsPostfixKeyStateEncoderSpec` and `TimestampAsPrefixKeyStateEncoderSpec` to `KeyStateEncoderSpec.fromJson` for checkpoint restart deserialization.
- Add a V4 branch in `getSchemaForStateStores` and `getSchemasForStateStoreWithColFamily` for correct column family schemas and encoder specs.

### Why are the changes needed?

SPARK-55628 tracks the integration of the V4 state format into the stream-stream join operator. V4 was implemented in SPARK-55144 but not yet wired into the operator.

### Does this PR introduce _any_ user-facing change?

No. V4 is gated behind an internal config (`spark.sql.streaming.join.stateFormatVersion=4`; the default remains 2). V4 is marked as experimental and subject to change.

### How was this patch tested?

- Added `StreamingJoinV4Suite.scala` with 4 new test suites: `StreamingInnerJoinV4Suite`, `StreamingOuterJoinV4Suite`, `StreamingFullOuterJoinV4Suite`, `StreamingLeftSemiJoinV4Suite`.
- All suites re-run existing join tests with the V4 config via the `TestWithV4StateFormat` trait.
- 2 V4-specific tests: a plan assertion (verifies `stateFormatVersion == 4` in the execution plan) and a schema validation (verifies correct column families and encoder specs).
- 94/94 tests pass across all 4 suites.

### Was this patch authored or co-authored using generative AI tooling?

Yes

### Behavioral Change Information

- [ ] This is a behavioral change
- [x] This is not a behavioral change

Closes #54777 from nicholaschew11/spark-55628-v4-join-integration.

Authored-by: Nicholas Chew <chew.nicky@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
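The version routing in this change can be sketched as standalone logic. This is an illustrative sketch only — the object and method names below are assumptions, not the actual Spark internals, which live in `StreamingSymmetricHashJoinExec`, `SymmetricHashJoinStateManager`, and `SQLConf`:

```scala
// Sketch of the routing described above (illustrative names, not Spark code):
// V3 and V4 share the single-store virtual-column-family path, V1/V2 use four
// stores per partition, and V4 is additionally gated behind an internal flag.
object JoinFormatRoutingSketch {
  val ValidVersions: Set[Int] = Set(1, 2, 3, 4)

  // V3 and V4 both use a single store with virtual column families.
  def useVirtualColumnFamilies(stateFormatVersion: Int): Boolean =
    stateFormatVersion >= 3

  def numStoresPerPartition(stateFormatVersion: Int): Int =
    if (useVirtualColumnFamilies(stateFormatVersion)) 1 else 4

  // V4 additionally requires the internal enablement flag while in development.
  def checkV4Allowed(stateFormatVersion: Int, v4Enabled: Boolean): Unit =
    if (stateFormatVersion == 4) {
      require(v4Enabled, "State format version 4 is under development.")
    }
}
```

The `>= 3` comparison is why most `== 3` checks in the diff below become `>= 3`: V4 reuses the V3 single-store layout rather than introducing a new store topology.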
1 parent 8efc4c6 commit 9a9c714

File tree

6 files changed: +324 −42 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 4 deletions
```diff
@@ -3133,14 +3133,23 @@ object SQLConf {
       "State between versions are tend to be incompatible, so state format version shouldn't " +
       "be modified after running. Version 3 uses a single state store with virtual column " +
       "families instead of four stores and is only supported with RocksDB. NOTE: version " +
-      "1 is DEPRECATED and should not be explicitly set by users.")
+      "1 is DEPRECATED and should not be explicitly set by users. " +
+      "Version 4 is under development and only available for testing.")
     .version("3.0.0")
     .intConf
-    // TODO: [SPARK-55628] Add version 4 once we integrate the state format version 4 into
-    // stream-stream join operator.
-    .checkValue(v => Set(1, 2, 3).contains(v), "Valid versions are 1, 2, and 3")
+    .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4")
     .createWithDefault(2)

+  val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
+    buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
+      .internal()
+      .doc("When true, enables state format version 4 for stream-stream joins. " +
+        "This config will be removed once V4 is complete.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefaultFunction(() => Utils.isTesting)
+
   val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
     buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
       .doc("When true, streaming session window sorts and merge sessions in local partition " +
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala

Lines changed: 20 additions & 9 deletions
```diff
@@ -198,7 +198,7 @@ case class StreamingSymmetricHashJoinExec(
   private val allowMultipleStatefulOperators =
     conf.getConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE)

-  private val useVirtualColumnFamilies = stateFormatVersion == 3
+  private val useVirtualColumnFamilies = stateFormatVersion >= 3

   // Determine the store names and metadata version based on format version
   private val (numStoresPerPartition, _stateStoreNames, _operatorStateMetadataVersion) =
@@ -292,8 +292,12 @@
       val info = getStateInfo
       val stateSchemaDir = stateSchemaDirPath()

+      // V4 uses VCF like V3, which requires schema version 3. The stateSchemaVersion
+      // parameter may carry the stateFormatVersion (e.g. 4) from IncrementalExecution,
+      // so we hardcode 3 here for the VCF path.
+      val effectiveSchemaVersion = 3
       validateAndWriteStateSchema(
-        hadoopConf, batchId, stateSchemaVersion, info, stateSchemaDir, session
+        hadoopConf, batchId, effectiveSchemaVersion, info, stateSchemaDir, session
       )
     } else {
       var result: Map[String, (StructType, StructType)] = Map.empty
@@ -437,7 +441,7 @@
         removedRowIter.filterNot { kv =>
           stateFormatVersion match {
             case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
-            case 2 | 3 => kv.matched
+            case 2 | 3 | 4 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
         }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
@@ -463,7 +467,7 @@
         removedRowIter.filterNot { kv =>
           stateFormatVersion match {
             case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, kv.value))
-            case 2 | 3 => kv.matched
+            case 2 | 3 | 4 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
@@ -479,7 +483,7 @@
       case FullOuter =>
         lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) =>
           stateFormatVersion match {
-            case 2 | 3 => kv.matched
+            case 2 | 3 | 4 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }

@@ -801,15 +805,15 @@
           s.evictByKeyCondition(stateKeyWatermarkPredicateFunc)

         case s: SupportsEvictByTimestamp =>
-          s.evictByTimestamp(stateWatermark)
+          s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
       }
     case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
       joinStateManager match {
         case s: SupportsEvictByCondition =>
           s.evictByValueCondition(stateValueWatermarkPredicateFunc)

         case s: SupportsEvictByTimestamp =>
-          s.evictByTimestamp(stateWatermark)
+          s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
       }
     case _ => 0L
   }
@@ -833,20 +837,27 @@
           s.evictAndReturnByKeyCondition(stateKeyWatermarkPredicateFunc)

         case s: SupportsEvictByTimestamp =>
-          s.evictAndReturnByTimestamp(stateWatermark)
+          s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
       }
     case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
       joinStateManager match {
         case s: SupportsEvictByCondition =>
           s.evictAndReturnByValueCondition(stateValueWatermarkPredicateFunc)

         case s: SupportsEvictByTimestamp =>
-          s.evictAndReturnByTimestamp(stateWatermark)
+          s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
       }
     case _ => Iterator.empty
     }
   }

+  /**
+   * V4 stores timestamps in microseconds (TimestampType) while the watermark
+   * is tracked in milliseconds. Convert ms to microseconds for eviction calls.
+   */
+  private def watermarkMsToStateTimestamp(watermarkMs: Long): Long =
+    watermarkMs * 1000
+
   /** Commit changes to the buffer state */
   def commitState(): Unit = {
     joinStateManager.commit()
```
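The unit conversion added at the eviction call sites is small but easy to get backwards. A standalone sketch of the same arithmetic (the object name is illustrative; the real helper is the private `watermarkMsToStateTimestamp` added in this file):

```scala
// TimestampType values are stored in microseconds, while watermarks are
// tracked in milliseconds, so eviction thresholds must be scaled by 1000
// before being compared against stored timestamps.
object WatermarkConversionSketch {
  private val MicrosPerMilli = 1000L

  def watermarkMsToStateTimestamp(watermarkMs: Long): Long =
    watermarkMs * MicrosPerMilli
}
```

Note that a `Long` holds microsecond values comfortably for realistic watermarks: even an epoch-millisecond watermark around 1.7e12 scales to 1.7e15 microseconds, far below `Long.MaxValue`.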

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 58 additions & 28 deletions
```diff
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
 import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
 import org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor, KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay, TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec, TimestampKeyStateEncoder}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, StructField, StructType}
 import org.apache.spark.util.NextIterator

@@ -252,9 +253,9 @@ class SymmetricHashJoinStateManagerV4(
     Seq(StructField("dummy", NullType, nullable = true))
   )

-  // TODO: [SPARK-55628] Below two fields need to be handled properly during integration with
-  // the operator.
-  private val stateStoreCkptId: Option[String] = None
+  // V4 uses a single store with VCFs (not separate keyToNumValues/keyWithIndexToValue stores).
+  // Use the keyToNumValues checkpoint ID for loading the correct committed version.
+  private val stateStoreCkptId: Option[String] = keyToNumValuesStateStoreCkptId
   private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None

   private var stateStoreProvider: StateStoreProvider = _
@@ -496,7 +497,7 @@
     private val attachTimestampProjection: UnsafeProjection =
       TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)

-    // Create the specific column family in the store for this join side's KeyWithIndexToValueStore
+    // Create the specific column family in the store for this join side's KeyWithTsToValuesStore.
     stateStore.createColFamilyIfAbsent(
       colFamilyName,
       keySchema,
@@ -648,13 +649,15 @@
     private val attachTimestampProjection: UnsafeProjection =
       TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)

-    // Create the specific column family in the store for this join side's KeyWithIndexToValueStore
+    // Create the specific column family in the store for this join side's TsWithKeyStore.
+    // Mark as internal so that numKeys counts only primary data, not the secondary index.
     stateStore.createColFamilyIfAbsent(
       colFamilyName,
       keySchema,
       valueStructType,
       TimestampAsPrefixKeyStateEncoderSpec(keySchemaWithTimestamp),
-      useMultipleValuesPerKey = true
+      useMultipleValuesPerKey = true,
+      isInternal = true
     )

     private def createKeyRow(key: UnsafeRow, timestamp: Long): UnsafeRow = {
@@ -1311,8 +1314,8 @@ abstract class SymmetricHashJoinStateManagerBase(
     val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None)
   extends StateStoreHandler(
     KeyToNumValuesType, keyToNumValuesStateStoreCkptId, handlerSnapshotOptions) {
-SnapshotOptions
-  private val useVirtualColumnFamilies = stateFormatVersion == 3
+
+  private val useVirtualColumnFamilies = stateFormatVersion >= 3
   private val longValueSchema = new StructType().add("value", "long")
   private val longToUnsafeRow = UnsafeProjection.create(longValueSchema)
   private val valueRow = longToUnsafeRow(new SpecificInternalRow(longValueSchema))
@@ -1411,7 +1414,7 @@
   extends StateStoreHandler(
     KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId, handlerSnapshotOptions) {

-  private val useVirtualColumnFamilies = stateFormatVersion == 3
+  private val useVirtualColumnFamilies = stateFormatVersion >= 3
   private val keyWithIndexExprs = keyAttributes :+ Literal(1L)
   private val keyWithIndexSchema = keySchema.add("index", LongType)
   private val indexOrdinalInKeyWithIndexRow = keyAttributes.size
@@ -1744,6 +1747,8 @@ object SymmetricHashJoinStateManager {
       snapshotOptions: Option[SnapshotOptions] = None,
       joinStoreGenerator: JoinStateManagerStoreGenerator): SymmetricHashJoinStateManager = {
     if (stateFormatVersion == 4) {
+      require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
+        "State format version 4 is under development.")
       new SymmetricHashJoinStateManagerV4(
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
@@ -1780,28 +1785,44 @@
       inputValueAttributes: Seq[Attribute],
       joinKeys: Seq[Expression],
       stateFormatVersion: Int): Map[String, (StructType, StructType)] = {
-    var result: Map[String, (StructType, StructType)] = Map.empty
-
-    // get the key and value schema for the KeyToNumValues state store
     val keySchema = StructType(
       joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) })
-    val longValueSchema = new StructType().add("value", "long")
-    result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema))
-
-    // get the key and value schema for the KeyWithIndexToValue state store
-    val keyWithIndexSchema = keySchema.add("index", LongType)
-    val valueSchema = if (stateFormatVersion == 1) {
-      inputValueAttributes
-    } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
-      inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+
+    if (stateFormatVersion == 4) {
+      // V4 uses two column families: KeyWithTsToValues and TsWithKey
+      val keySchemaWithTimestamp =
+        TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema)
+      val valueWithMatchedSchema =
+        (inputValueAttributes :+ AttributeReference("matched", BooleanType)()).toStructType
+      val dummyValueSchema = StructType(Array(StructField("__dummy__", NullType)))
+
+      Map(
+        getStateStoreName(joinSide, KeyWithTsToValuesType) ->
+          (keySchemaWithTimestamp, valueWithMatchedSchema),
+        getStateStoreName(joinSide, TsWithKeyType) ->
+          (keySchemaWithTimestamp, dummyValueSchema))
     } else {
-      throw new IllegalArgumentException("Incorrect state format version! " +
-        s"version=$stateFormatVersion")
-    }
-    result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
-      (keyWithIndexSchema, valueSchema.toStructType))
+      var result: Map[String, (StructType, StructType)] = Map.empty
+
+      // get the key and value schema for the KeyToNumValues state store
+      val longValueSchema = new StructType().add("value", "long")
+      result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema))
+
+      // get the key and value schema for the KeyWithIndexToValue state store
+      val keyWithIndexSchema = keySchema.add("index", LongType)
+      val valueSchema = if (stateFormatVersion == 1) {
+        inputValueAttributes
+      } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
+        inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+      } else {
+        throw new IllegalArgumentException("Incorrect state format version! " +
+          s"version=$stateFormatVersion")
+      }
+      result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
+        (keyWithIndexSchema, valueSchema.toStructType))

-    result
+      result
+    }
   }

   /** Retrieves the schemas used for join operator state stores that use column families */
@@ -1816,9 +1837,18 @@

     schemas.map {
       case (colFamilyName, (keySchema, valueSchema)) =>
+        val keyStateEncoderSpec = if (stateFormatVersion == 4) {
+          if (colFamilyName == getStateStoreName(joinSide, KeyWithTsToValuesType)) {
+            TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+          } else {
+            TimestampAsPrefixKeyStateEncoderSpec(keySchema)
+          }
+        } else {
+          NoPrefixKeyStateEncoderSpec(keySchema)
+        }
         colFamilyName -> StateStoreColFamilySchema(
           colFamilyName, 0, keySchema, 0, valueSchema,
-          Some(NoPrefixKeyStateEncoderSpec(keySchema))
+          Some(keyStateEncoderSpec)
         )
     }
   }
```
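The per-column-family encoder selection for V4 can be summarized in isolation. The types below are illustrative stand-ins, not the real Spark encoder-spec classes: the primary column family (`KeyWithTsToValues`) appends the timestamp to the key, the secondary index (`TsWithKey`) prepends it, and all pre-V4 families keep the no-prefix encoder.

```scala
// Stand-in model of the V4 encoder-spec selection (not Spark's actual types).
sealed trait EncoderSpecSketch
case object NoPrefixSpec extends EncoderSpecSketch        // V1-V3 column families
case object TimestampPostfixSpec extends EncoderSpecSketch // V4 primary: key ++ ts
case object TimestampPrefixSpec extends EncoderSpecSketch  // V4 secondary index: ts ++ key

object EncoderSelectionSketch {
  def specFor(stateFormatVersion: Int, isPrimaryFamily: Boolean): EncoderSpecSketch =
    if (stateFormatVersion == 4) {
      if (isPrimaryFamily) TimestampPostfixSpec else TimestampPrefixSpec
    } else {
      NoPrefixSpec
    }
}
```

The timestamp-as-prefix layout is what makes range-scan eviction cheap for the secondary index: rows below the (microsecond) watermark form a contiguous key range.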

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -611,6 +611,10 @@ object KeyStateEncoderSpec {
       case "PrefixKeyScanStateEncoderSpec" =>
         val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt
         PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+      case "TimestampAsPostfixKeyStateEncoderSpec" =>
+        TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+      case "TimestampAsPrefixKeyStateEncoderSpec" =>
+        TimestampAsPrefixKeyStateEncoderSpec(keySchema)
     }
   }
 }
```
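The `fromJson` extension matters for checkpoint restarts: the serialized spec carries its class name, and deserialization dispatches on it. A simplified, self-contained sketch of that dispatch shape (names of the match arms come from the diff; the return values and object name here are hypothetical, and the real code also reconstructs the key schema):

```scala
// Simplified model of class-name-based spec dispatch during checkpoint
// restart; an unrecognized name fails loudly rather than silently.
object SpecDispatchSketch {
  def fromName(name: String): String = name match {
    case "NoPrefixKeyStateEncoderSpec" => "no-prefix"
    case "PrefixKeyScanStateEncoderSpec" => "prefix-scan"
    case "TimestampAsPostfixKeyStateEncoderSpec" => "timestamp-postfix"
    case "TimestampAsPrefixKeyStateEncoderSpec" => "timestamp-prefix"
    case other => throw new IllegalArgumentException(s"Unknown spec: $other")
  }
}
```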

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -941,7 +941,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue")

     val useVirtualColumnFamilies =
-      spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) == 3
+      spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 3
     // Number of shuffle partitions being used is 3
     val numStateStoreInstances = if (useVirtualColumnFamilies) {
       // Only one state store is created per partition if we're using virtual column families
```
