[server] Add DoL loopback to ensure new leader is fully caught up on VT #2314
base: main
Pull request overview
This PR introduces a Declaration-of-Leadership (DoL) loopback mechanism to ensure new leader replicas are fully caught up on the Version Topic (VT) before switching to consume from remote VT or Real-Time (RT) topics. This replaces the previous time-based heuristic with a deterministic approach that eliminates the risk of duplicate consumption and data inconsistencies during leader transitions.
Key Changes:
- New DoL control message type with unique GUID that leaders produce to local VT during STANDBY→LEADER transition
- Leader waits to consume its own DoL message back (loopback confirmation) before switching to remote sources
- Configurable rollout via separate flags for system stores and user stores (SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES and SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES)
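The loopback idea in the key changes above can be illustrated with a minimal in-memory sketch. It is not the Venice implementation: `DolLoopbackSketch`, `Message`, and the queue standing in for the local VT are hypothetical names chosen for illustration. The sketch only shows why consuming your own marker back implies that everything produced before it has already been consumed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical in-memory sketch of the DoL loopback; not the Venice implementation. */
final class DolLoopbackSketch {
  /** A record on the simulated local VT: ordinary data or a DoL marker. */
  static final class Message {
    final boolean isDol;
    final String writerId; // host that produced the record
    final long term;       // leadership term carried by the record

    Message(boolean isDol, String writerId, long term) {
      this.isDol = isDol;
      this.writerId = writerId;
      this.term = term;
    }
  }

  private final String hostId;
  private final long term;
  private boolean dolProduced;
  private boolean dolConsumed;

  DolLoopbackSketch(String hostId, long term) {
    this.hostId = hostId;
    this.term = term;
  }

  /** Appends this host's DoL marker to the tail of the simulated local VT. */
  void produceDol(Queue<Message> localVt) {
    localVt.add(new Message(true, hostId, term));
    dolProduced = true;
  }

  /** Processes one consumed record; only this host's own marker for this term counts. */
  void consume(Message m) {
    if (m.isDol && m.writerId.equals(hostId) && m.term == term) {
      dolConsumed = true;
    }
  }

  /** Ready only after the marker has been both produced and consumed back. */
  boolean canSwitchToLeaderTopic() {
    return dolProduced && dolConsumed;
  }

  /** Returns true if readiness flips only after the backlog ahead of the marker is drained. */
  static boolean demo() {
    Queue<Message> localVt = new ArrayDeque<>();
    localVt.add(new Message(false, "old-leader", 1L)); // backlog written by the previous leader
    localVt.add(new Message(false, "old-leader", 1L));
    DolLoopbackSketch replica = new DolLoopbackSketch("host-a", 2L);
    replica.produceDol(localVt);
    boolean readyBeforeDrain = replica.canSwitchToLeaderTopic();
    while (!localVt.isEmpty()) {
      replica.consume(localVt.poll());
    }
    return !readyBeforeDrain && replica.canSwitchToLeaderTopic();
  }
}
```

Because the marker is appended after the backlog, the replica cannot observe it without first consuming every earlier record, which is the deterministic guarantee the time-based heuristic lacked.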
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| DolStamp.java | New class tracking DoL state (produced/consumed flags, leadership term, host ID) during leader transition |
| DolGuidGenerator.java | GUID generator for DoL control messages using UUID type 3 |
| DoLStampGuidGenerator.java | Duplicate GUID generator implementation (identical to DolGuidGenerator) |
| KafkaKey.java | Adds DOL_STAMP constant for DoL control message key |
| VeniceWriter.java | Implements sendDoLStamp() and getDoLStampKME() for producing DoL messages |
| StoreIngestionTask.java | Adds checkAndHandleDoLMessage() to detect and handle consumed DoL messages, validates DoL messages like heartbeats |
| LeaderFollowerStoreIngestionTask.java | Orchestrates DoL mechanism: initializes DoL state, sends DoL stamp, checks readiness in canSwitchToLeaderTopic(), falls back to legacy behavior when DoL disabled |
| PartitionConsumptionState.java | Tracks DoL state and highest observed leadership term per partition |
| LeaderFollowerPartitionStateModel.java | Uses Helix message creation timestamp as leadership term |
| SharedKafkaConsumer.java | Adds region name and index to toString() for better debugging |
| ConfigKeys.java | Defines two new config flags for DoL mechanism enablement |
| VeniceServerConfig.java | Exposes DoL config flags via getters |
| VeniceServerWrapper.java | Enables DoL mechanism for both system and user stores in integration tests |
| VeniceClusterWrapper.java | Increases timeout for version wait from 60s to 120s to accommodate DoL latency |
| TestHybrid.java | Adds timeout and unique store name for log compaction test |
| TestHybridMultiRegion.java | Updates test to use sendEmptyPushAndWait() and improves error message assertion |
| TestTopicRequestOnHybridDelete.java | Removes unused imports and deletes deleteStoreAfterStartedPushAllowsNewPush test |
| log4j2.properties | Updates logging configuration (contains hardcoded user path) |
| StoreIngestionTaskTest.java | Renames test method from resolveRtTopicPartitionWithPubSubBrokerAddress to resolveTopicPartitionWithPubSubBrokerAddress |
| SharedKafkaConsumerTest.java | Updates test to pass region name and index to SharedKafkaConsumer constructor |
| ActiveActiveStoreIngestionTask.java | Updates method calls from resolveRtTopicPartitionWithPubSubBrokerAddress to resolveTopicPartitionWithPubSubBrokerAddress |
| KafkaConsumerService.java | Passes region name and index when creating SharedKafkaConsumer instances |
| PartitionWiseKafkaConsumerService.java | Adds consumer instance to log output for better debugging |
| HelixReadWriteSchemaRepository.java | Adds store name to exception log message |
| LOGGER.info( | ||
| "### Getting leader position for replica: {}. Remote consumption enabled: {} remote: {} local: {}", | ||
| getReplicaTopicPartition(), | ||
| consumeRemotely(), | ||
| getLatestProcessedRemoteVtPosition(), | ||
| getLatestProcessedVtPosition()); |
Copilot
AI
Nov 24, 2025
The log statement at lines 993-998 uses a "###" prefix, which appears to be debug/temporary logging. It should either be removed or changed to a standard DEBUG-level log without the "###" prefix; as written, it will appear in production logs at INFO level.
| this.dolProduceFuture = dolProduceFuture; | ||
| } | ||
|
|
||
| public boolean isReady() { |
Copilot
AI
Nov 24, 2025
The isReady() method reads two volatile boolean fields (dolProduced and dolConsumed) without synchronization. While each field read is atomic, the combined read is not atomic, which could lead to a race condition where the state appears inconsistent. For example, between reading dolProduced and dolConsumed, another thread could update one of these values.
Consider either:
- Making the method synchronized, or
- Using an AtomicInteger to track a combined state, or
- Adding a memory barrier or using AtomicBoolean fields with appropriate ordering guarantees
This is especially important since this method is used to determine when to switch the leader topic, which is a critical operation.
| public boolean isReady() { | |
| public synchronized boolean isReady() { |
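The AtomicInteger option listed in the comment above could look roughly like the following. This is a sketch under assumptions, not the PR's DolStamp: the class name `DolStateSketch`, the bit constants, and the `markProduced`/`markConsumed` methods are hypothetical. It packs both flags into one word so that `isReady()` observes them in a single read.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: both DoL flags packed into one atomically updated word. */
final class DolStateSketch {
  private static final int PRODUCED = 1; // bit 0: marker was produced to the local VT
  private static final int CONSUMED = 2; // bit 1: marker was consumed back

  private final AtomicInteger state = new AtomicInteger(0);

  void markProduced() {
    state.getAndUpdate(s -> s | PRODUCED);
  }

  void markConsumed() {
    state.getAndUpdate(s -> s | CONSUMED);
  }

  /** One atomic read covers both flags, so no torn view of the pair is possible. */
  boolean isReady() {
    return state.get() == (PRODUCED | CONSUMED);
  }
}
```

Whether this is needed depends on how the flags are written. If each flag only ever moves from false to true, two separate volatile reads can at worst report "not ready" slightly late, so the synchronized variant suggested above may be sufficient in practice.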
| // Close any existing VeniceWriter partition session | ||
| veniceWriter.get().closePartition(partitionConsumptionState.getPartition()); | ||
|
|
||
| DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId()); | ||
| partitionConsumptionState.setDolState(dolStamp); | ||
| LOGGER.info( | ||
| "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", | ||
| dolStamp, | ||
| partitionConsumptionState.getReplicaId(), | ||
| leadershipTerm, | ||
| veniceWriter.get().getWriterId()); | ||
|
|
||
| // Send DolStamp to local VT | ||
| PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); | ||
| CompletableFuture<PubSubProduceResult> dolProduceFuture = veniceWriter.get() | ||
| .sendDoLStamp( | ||
| partitionConsumptionState.getReplicaTopicPartition(), | ||
| dolCallback, | ||
| leadershipTerm, | ||
| localKafkaClusterId); | ||
| // Store the produce future in DolStamp | ||
| dolStamp.setDolProduceFuture(dolProduceFuture); |
Copilot
AI
Nov 24, 2025
If veniceWriter.get() throws an exception at line 869, the DoL state won't be initialized and the method will fail. However, subsequent calls to veniceWriter.get() at lines 871 and 882-887 assume the writer is available. Consider wrapping the entire method in a try-catch block to handle writer initialization failures gracefully and ensure the DoL state is properly cleaned up on failure.
| // Close any existing VeniceWriter partition session | |
| veniceWriter.get().closePartition(partitionConsumptionState.getPartition()); | |
| DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId()); | |
| partitionConsumptionState.setDolState(dolStamp); | |
| LOGGER.info( | |
| "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", | |
| dolStamp, | |
| partitionConsumptionState.getReplicaId(), | |
| leadershipTerm, | |
| veniceWriter.get().getWriterId()); | |
| // Send DolStamp to local VT | |
| PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); | |
| CompletableFuture<PubSubProduceResult> dolProduceFuture = veniceWriter.get() | |
| .sendDoLStamp( | |
| partitionConsumptionState.getReplicaTopicPartition(), | |
| dolCallback, | |
| leadershipTerm, | |
| localKafkaClusterId); | |
| // Store the produce future in DolStamp | |
| dolStamp.setDolProduceFuture(dolProduceFuture); | |
| try { | |
| // Close any existing VeniceWriter partition session | |
| veniceWriter.get().closePartition(partitionConsumptionState.getPartition()); | |
| DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId()); | |
| partitionConsumptionState.setDolState(dolStamp); | |
| LOGGER.info( | |
| "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", | |
| dolStamp, | |
| partitionConsumptionState.getReplicaId(), | |
| leadershipTerm, | |
| veniceWriter.get().getWriterId()); | |
| // Send DolStamp to local VT | |
| PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); | |
| CompletableFuture<PubSubProduceResult> dolProduceFuture = veniceWriter.get() | |
| .sendDoLStamp( | |
| partitionConsumptionState.getReplicaTopicPartition(), | |
| dolCallback, | |
| leadershipTerm, | |
| localKafkaClusterId); | |
| // Store the produce future in DolStamp | |
| dolStamp.setDolProduceFuture(dolProduceFuture); | |
| } catch (Exception e) { | |
| LOGGER.error( | |
| "Failed to initialize and send DoL stamp for replica: {} with term: {}", | |
| partitionConsumptionState.getReplicaId(), | |
| leadershipTerm, | |
| e); | |
| // Clear DoL state on failure | |
| partitionConsumptionState.clearDolState(); | |
| } |
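One way to read the suggestion above is sketched below with simplified stand-in types. `Writer`, `PartitionState`, and `initAndSend` are hypothetical and much smaller than the real VeniceWriter and PartitionConsumptionState. The sketch resolves the lazily created writer once, then clears the DoL state if any later step throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of "resolve the writer once, clean up DoL state on failure". */
final class DolInitSketch {
  /** Minimal stand-in for the writer; not the VeniceWriter API. */
  interface Writer {
    String writerId();
    CompletableFuture<Long> sendDolStamp(long term);
  }

  /** Minimal stand-in for per-partition state. */
  static final class PartitionState {
    volatile String dolState; // null means "no DoL in flight"

    void clearDolState() {
      dolState = null;
    }
  }

  /** Returns true if the DoL stamp was initialized and handed to the writer. */
  static boolean initAndSend(Supplier<Writer> lazyWriter, PartitionState pcs, long term) {
    try {
      Writer writer = lazyWriter.get(); // resolved once; may throw if initialization fails
      pcs.dolState = "term=" + term + ", hostId=" + writer.writerId();
      writer.sendDolStamp(term);
      return true;
    } catch (RuntimeException e) {
      // Leave no half-initialized state behind so callers can detect the failure.
      pcs.clearDolState();
      return false;
    }
  }
}
```

Holding the writer in a local variable also avoids the repeated `get()` calls that the comment points out.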
| CompletableFuture<PubSubProduceResult> future = new CompletableFuture<>(); | ||
| future.completedFuture(null); | ||
| logger.warn("VeniceWriter already closed for topic partition " + topicPartition); | ||
| logger.warn("VeniceWriter already closed for topic-partition ", topicPartition); |
Copilot
AI
Nov 24, 2025
The log message format is inconsistent with the pattern used elsewhere in the file. Line 2304 passes topicPartition as an argument, but the message string has no corresponding placeholder: "VeniceWriter already closed for topic-partition ", topicPartition. This should be "VeniceWriter already closed for topic-partition {}" with topicPartition as an argument.
| logger.warn("VeniceWriter already closed for topic-partition ", topicPartition); | |
| logger.warn("VeniceWriter already closed for topic-partition {}", topicPartition); |
|
|
||
| appender.rolling.type = RollingFile | ||
| appender.rolling.name = fileLogger | ||
| appender.rolling.fileName=/Users/sumane/logs/venice-test.log |
Copilot
AI
Nov 24, 2025
The file path /Users/sumane/logs/venice-test.log contains a hardcoded username-specific path. This will not work for other developers or CI environments. Consider using a relative path or a system property like ${sys:user.home}/logs/venice-test.log or a temporary directory.
| appender.rolling.fileName=/Users/sumane/logs/venice-test.log | |
| appender.rolling.fileName=${sys:user.home}/logs/venice-test.log |
| } | ||
|
|
||
| private boolean isConsumingFromRemoteVersionTopic(PartitionConsumptionState partitionConsumptionState) { | ||
| // orint all three conditions for easier debugging |
Copilot
AI
Nov 24, 2025
There's a typo in the comment: "orint" should be "print". Additionally, the comment says "all three conditions" but only mentions two conditions in the log statement (Remote consumption enabled and position checks). The comment should be corrected for clarity.
| // orint all three conditions for easier debugging | |
| // Print relevant conditions for easier debugging: EOP received, isCurrentVersion, remote and local Kafka URLs |
| try { | ||
| produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition(); | ||
| } catch (Exception e) { | ||
| // Ignore, keep empty |
Copilot
AI
Nov 24, 2025
The toString() method calls dolProduceFuture.get() which can block indefinitely if the future hasn't completed. Although there's a check for isDone(), this could still cause issues if toString() is called from logging in a performance-sensitive path. Consider using getNow(null) or adding a timeout to avoid blocking, or simply not including the offset in the toString output.
| try { | |
| produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition(); | |
| } catch (Exception e) { | |
| // Ignore, keep empty | |
| PubSubProduceResult result = dolProduceFuture.getNow(null); | |
| if (result != null) { | |
| produceResult = ", offset=" + result.getPubSubPosition(); |
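A self-contained sketch of the non-blocking variant follows. `NonBlockingToStringSketch` is a hypothetical class, and `Long` stands in for the produce result type. One detail worth keeping in mind: `CompletableFuture.getNow` does not block, but it still throws `CompletionException` when the future completed exceptionally and `CancellationException` when it was cancelled, so the sketch keeps a catch block.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a toString() that never blocks on the produce future. */
final class NonBlockingToStringSketch {
  private final CompletableFuture<Long> produceFuture; // Long stands in for the produce result

  NonBlockingToStringSketch(CompletableFuture<Long> produceFuture) {
    this.produceFuture = produceFuture;
  }

  @Override
  public String toString() {
    String offset = "";
    try {
      // Returns null immediately when the future is still pending.
      Long result = produceFuture.getNow(null);
      if (result != null) {
        offset = ", offset=" + result;
      }
    } catch (RuntimeException e) {
      // Completed exceptionally or cancelled: keep the offset empty.
    }
    return "DolStamp{produced=" + produceFuture.isDone() + offset + "}";
  }
}
```

For example, a pending future renders as `DolStamp{produced=false}` and a future completed with 42 renders as `DolStamp{produced=true, offset=42}`.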
| LOGGER.debug("DoL mechanism not ready for replica: {} - DolStamp: {}", pcs.getReplicaId(), dolStamp); | ||
| return false; | ||
| } else { | ||
| // Use legacy time-based mechanism |
Copilot
AI
Nov 24, 2025
When DoL produce fails (line 913), the DoL state is cleared via pcs.clearDolState(). However, the replica is still in the IN_TRANSITION_FROM_STANDBY_TO_LEADER state and will call canSwitchToLeaderTopic() which will find dolStamp == null.
In this scenario, the code will fall through to line 964 and use the legacy time-based mechanism (canSwitchToLeaderTopicLegacy). This fallback behavior should be documented, or alternatively, the code should explicitly handle the DoL failure case - perhaps by retrying the DoL send or transitioning back to STANDBY state.
| // Use legacy time-based mechanism | |
| // If DoL mechanism is enabled but dolStamp is null, this means DoL produce failed and DoL state was cleared. | |
| // In this case, we fall back to the legacy time-based mechanism. This fallback should be monitored. | |
| if (shouldUseDolMechanism() && dolStamp == null) { | |
| LOGGER.warn("Falling back to legacy leader topic switch mechanism for replica: {} due to missing DoL state (likely DoL produce failure).", pcs.getReplicaId()); | |
| } |
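The branches discussed in this comment can be made explicit as a small decision function. This is an illustrative sketch only: the `Decision` enum, the `decide` method, and its boolean parameters are hypothetical and collapse the real checks in canSwitchToLeaderTopic() into three inputs.

```java
/** Hypothetical sketch of the leader-topic switch decision, including the fallback path. */
final class LeaderSwitchDecisionSketch {
  enum Decision {
    WAIT_FOR_DOL,    // DoL in flight, marker not yet looped back
    SWITCH_VIA_DOL,  // marker produced and consumed back
    LEGACY_FALLBACK, // DoL enabled but state missing (for example after a produce failure)
    LEGACY           // DoL disabled by config
  }

  static Decision decide(boolean dolEnabled, boolean hasDolState, boolean dolReady) {
    if (!dolEnabled) {
      return Decision.LEGACY;
    }
    if (!hasDolState) {
      // The case the comment asks to document or handle explicitly.
      return Decision.LEGACY_FALLBACK;
    }
    return dolReady ? Decision.SWITCH_VIA_DOL : Decision.WAIT_FOR_DOL;
  }
}
```

Naming the fallback as its own outcome makes it straightforward to log or count separately, which is what the suggested warning aims at.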
| package com.linkedin.venice.guid; | ||
|
|
||
| import com.linkedin.venice.kafka.protocol.GUID; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.UUID; | ||
|
|
||
|
|
||
| /** | ||
| * A GUID generator for Declaration of Leadership (DoL) control messages. | ||
| * | ||
| * <p>Uses {@link java.util.UUID#nameUUIDFromBytes(byte[])} to generate a type 3 UUID that will | ||
| * not collide with: | ||
| * <ul> | ||
| * <li>User data GUIDs (type 4, generated by {@link JavaUtilGuidV4Generator})</li> | ||
| * <li>Heartbeat control message GUIDs (type 3, generated by {@link HeartbeatGuidV3Generator})</li> | ||
| * </ul> | ||
| * | ||
| * <p>DoL messages use a distinct GUID to clearly identify leadership declaration messages | ||
| * during the leader handover process. This allows replicas to distinguish DoL messages from | ||
| * regular heartbeat messages and user data. | ||
| */ | ||
| public class DolGuidGenerator implements GuidGenerator { | ||
| private static DolGuidGenerator instance; | ||
|
|
||
| private DolGuidGenerator() { | ||
| } | ||
|
|
||
| public static synchronized DolGuidGenerator getInstance() { | ||
| if (instance == null) { | ||
| instance = new DolGuidGenerator(); | ||
| } | ||
| return instance; | ||
| } | ||
|
|
||
| @Override | ||
| public GUID getGuid() { | ||
| UUID javaUtilUuid = UUID.nameUUIDFromBytes("declarationOfLeadershipControlMessage".getBytes()); | ||
| GUID guid = new GUID(); | ||
| byte[] guidBytes = ByteBuffer.allocate(16) | ||
| .putLong(javaUtilUuid.getMostSignificantBits()) | ||
| .putLong(javaUtilUuid.getLeastSignificantBits()) | ||
| .array(); | ||
| guid.bytes(guidBytes); | ||
| return guid; | ||
| } | ||
| } |
Copilot
AI
Nov 24, 2025
This class DolGuidGenerator appears to be a duplicate of DoLStampGuidGenerator (note the slight naming difference). Both classes:
- Use identical implementation (UUID.nameUUIDFromBytes("declarationOfLeadershipControlMessage".getBytes()))
- Have similar singleton patterns
- Serve the same purpose
This duplication can lead to confusion and maintenance issues. Consider consolidating into a single class with a consistent naming convention. The naming inconsistency (DolGuidGenerator vs DoLStampGuidGenerator) also makes it unclear which one should be used where.
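To make the duplication concrete, the sketch below shows that a name-based UUID is fully determined by its input bytes, so two classes hashing the same string necessarily return the same GUID. `NameBasedGuidSketch` and its helper methods are hypothetical; the sketch also passes an explicit charset, which the quoted `getBytes()` call does not, as an assumption about what a consolidated version might prefer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical sketch: name-based (type 3) UUIDs are deterministic in their input. */
final class NameBasedGuidSketch {
  /** Returns the 16 GUID bytes derived from the given name. */
  static byte[] guidBytes(String name) {
    // Explicit charset keeps the result independent of the platform default.
    UUID uuid = UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
    return ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits())
        .array();
  }

  /** Returns the UUID version for the given name; nameUUIDFromBytes produces version 3. */
  static int version(String name) {
    return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).version();
  }
}
```

Since the output depends only on the name, a single shared generator (or a shared constant for the name) would serve both call sites.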
Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config to enable the new Declaration of Leadership (DoL) mechanism for fast leader handover.
Changes:
- Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config key in ConfigKeys.java
- Add leaderHandoverUseDoLMechanism field and getter in VeniceServerConfig
- Refactor canSwitchToLeaderTopic() to check config and route logic
- Extract canSwitchToLeaderTopicLegacy() with original time-based logic
- Add comprehensive design document for DoL mechanism
Default: false (maintains backward compatibility with legacy time-based mechanism)
This is step 1 of the DoL implementation. The actual DoL loopback logic will be implemented in subsequent commits when the config is enabled.
Add a separate config
Create DoL message
Add leadership term
Add leadership term in LeaderSessionIdChecker
0998fab to 1bbff55
[server] Add DoL loopback to ensure new leader is fully caught up on VT
Newly elected leaders were re-consuming the NR source topic because the
promotion logic relied only on elapsed time since the last consumed
message before switching to the remote version topic (VT). This
time-based heuristic is insufficient and can cause duplicate consumption
and data inconsistencies.
Fix the issue by requiring the new leader to produce a Declaration-of-
Leadership (DoL) marker to the local VT and wait until it consumes that
same marker back. This provides a deterministic guarantee that the
leader has fully caught up on VT before switching to RT or NR sources.
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
synchronized, RWLock) are used where needed.
ConcurrentHashMap, CopyOnWriteArrayList).
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?