From acf9f86205f9e2fa20bba119d311082780bfdf55 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Mon, 17 Nov 2025 23:34:05 -0800 Subject: [PATCH 1/4] [server] Add config for Declaration of Leadership mechanism Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config to enable the new Declaration of Leadership (DoL) mechanism for fast leader handover. Changes: - Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config key in ConfigKeys.java - Add leaderHandoverUseDoLMechanism field and getter in VeniceServerConfig - Refactor canSwitchToLeaderTopic() to check config and route logic - Extract canSwitchToLeaderTopicLegacy() with original time-based logic - Add comprehensive design document for DoL mechanism Default: false (maintains backward compatibility with legacy time-based mechanism) This is step 1 of the DoL implementation. The actual DoL loopback logic will be implemented in subsequent commits when the config is enabled. Add a separate config Create DoL message Add leadership term Add leadership term in LeaderSessionIdChecker --- .../davinci/config/VeniceServerConfig.java | 16 + .../LeaderFollowerPartitionStateModel.java | 20 +- .../LeaderFollowerStoreIngestionTask.java | 136 ++++ .../consumer/PartitionConsumptionState.java | 80 ++ .../declaration_of_leadership_design.md | 765 ++++++++++++++++++ .../java/com/linkedin/venice/ConfigKeys.java | 45 ++ .../venice/guid/DoLStampGuidGenerator.java | 41 + .../com/linkedin/venice/message/KafkaKey.java | 26 + .../linkedin/venice/writer/VeniceWriter.java | 63 +- .../utils/VeniceServerWrapper.java | 4 + 10 files changed, 1194 insertions(+), 2 deletions(-) create mode 100644 docs/dev_guide/declaration_of_leadership_design.md create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/guid/DoLStampGuidGenerator.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index b69c9e55653..58ab34634ca 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -133,6 +133,8 @@ import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; +import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES; +import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES; import static com.linkedin.venice.ConfigKeys.SERVER_LOAD_CONTROLLER_ACCEPT_MULTIPLIER; @@ -666,6 +668,8 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean validateSpecificSchemaEnabled; private final boolean useMetricsBasedPositionInLagComputation; + private final boolean leaderHandoverUseDoLMechanismForSystemStores; + private final boolean leaderHandoverUseDoLMechanismForUserStores; private final LogContext logContext; private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy; private final boolean keyUrnCompressionEnabled; @@ -1149,6 +1153,10 @@ public 
VeniceServerConfig(VeniceProperties serverProperties, Map veniceWriter = partitionConsumptionState.getVeniceWriterLazyRef().get(); + DolState dolState = new DolState(leadershipTerm, veniceWriter.getWriterId()); + partitionConsumptionState.setDolState(dolState); + LOGGER.info( + "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", + dolState, + partitionConsumptionState.getReplicaId(), + leadershipTerm, + veniceWriter.getWriterId()); + + // Send DolStamp to local VT + PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); + CompletableFuture dolProduceFuture = + veniceWriter.sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm); + + // Store the produce future in DolState + dolState.setDolProduceFuture(dolProduceFuture); + + // Chain logging for produce completion + dolProduceFuture.thenAccept((result) -> { + LOGGER.info( + "DoL stamp produced for replica: {} at offset: {} for term: {}", + partitionConsumptionState.getReplicaId(), + result.getPubSubPosition(), + leadershipTerm); + }).exceptionally((ex) -> { + LOGGER.error( + "Failed to produce DoL stamp for replica: {} for term: {}", + partitionConsumptionState.getReplicaId(), + leadershipTerm, + ex); + return null; + }); + } + + /** + * Callback to handle DoL message produce completion. + */ + private static class DolStampProduceCallback implements PubSubProducerCallback { + private final PartitionConsumptionState pcs; + private final long leadershipTerm; + + public DolStampProduceCallback(PartitionConsumptionState pcs, long leadershipTerm) { + this.pcs = pcs; + this.leadershipTerm = leadershipTerm; + } + + @Override + public void onCompletion(PubSubProduceResult produceResult, Exception exception) { + if (exception != null) { + LOGGER.error( + "Failed to produce DoL message for partition {} with term {}", + pcs.getPartition(), + leadershipTerm, + exception); + // Clear DoL state on failure + pcs.clearDolState(); + return; + } + + // Mark DoL as produced + DolState dolState = pcs.getDolState(); + if (dolState != null && dolState.getLeadershipTerm() == leadershipTerm) { + dolState.setDolProduced(true); + LOGGER.info( + "DoL message produce confirmed for partition {} at position {} (term: {}) - dolState: {}", + pcs.getPartition(), + produceResult.getPubSubPosition(), + leadershipTerm, + dolState); + } else { + LOGGER.warn( + "DoL state mismatch or null for partition {} - expected term: {}, dolState: {}", + pcs.getPartition(), + leadershipTerm, + dolState); + } + } + } + /** * TODO: Replace this mechanism with loopback logic. * The leader replica should append a special message (e.g., Declaration of Leadership with term info) @@ -858,6 +963,37 @@ protected static boolean checkWhetherToCloseUnusedVeniceWriter( * This is part of the fast leadership handover project. */ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { + // Check if DoL mechanism is enabled via config (system stores vs user stores) + DolState dolState = pcs.getDolState(); + if (shouldUseDolMechanism() && dolState != null) { + // Check if DoL state is ready (both produced and consumed) + if (dolState.isReady()) { + LOGGER.info( + "DoL mechanism complete for replica: {} - switching to leader topic. 
DolState: {}",
+            pcs.getReplicaId(),
+            dolState);
+        // Clear DoL state as we're done with this transition
+        pcs.clearDolState();
+        return true;
+      }
+
+      // DoL not ready yet, stay on local VT
+      LOGGER.info("DoL mechanism not ready for replica: {} - DolState: {}", pcs.getReplicaId(), dolState);
+      return false;
+    } else {
+      // Use legacy time-based mechanism
+      return canSwitchToLeaderTopicLegacy(pcs);
+    }
+  }
+
+  /**
+   * Legacy mechanism for determining when a replica can switch to consuming from the leader (RT) topic.
+   * Uses time-based waiting and special handling for user system stores.
+   *
+   * <p>
This will be replaced by the DoL (Declaration of Leadership) mechanism when + * {@code server.leader.handover.use.dol.mechanism} is enabled. + */ + private boolean canSwitchToLeaderTopicLegacy(PartitionConsumptionState pcs) { /** * Potential risk: it's possible that Kafka consumer would starve one of the partitions for a long * time even though there are new messages in it, so it's possible that the old leader is still producing diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index a6e26a69da0..9f316f49ceb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -18,6 +18,7 @@ import com.linkedin.venice.pubsub.PubSubContext; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.api.PubSubPosition; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -115,6 +116,68 @@ enum LatchStatus { NONE, LATCH_CREATED, LATCH_RELEASED } + /** + * Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition. + * DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT. + */ + static class DolState { + private final long leadershipTerm; + private final String hostId; + private volatile boolean dolProduced; // DoL message was acked by broker + private volatile boolean dolConsumed; // DoL message was consumed back by this replica + private volatile CompletableFuture dolProduceFuture; // Future tracking DoL produce result + + public DolState(long leadershipTerm, String hostId) { + this.leadershipTerm = leadershipTerm; + this.hostId = hostId; + this.dolProduced = false; + this.dolConsumed = false; + this.dolProduceFuture = null; + } + + public long getLeadershipTerm() { + return leadershipTerm; + } + + public String getHostId() { + return hostId; + } + + public boolean isDolProduced() { + return dolProduced; + } + + public void setDolProduced(boolean dolProduced) { + this.dolProduced = dolProduced; + } + + public boolean isDolConsumed() { + return dolConsumed; + } + + public void setDolConsumed(boolean dolConsumed) { + this.dolConsumed = dolConsumed; + } + + public CompletableFuture getDolProduceFuture() { + return dolProduceFuture; + } + + public void setDolProduceFuture(CompletableFuture dolProduceFuture) { + this.dolProduceFuture = dolProduceFuture; + } + + public boolean isReady() { + return dolProduced && dolConsumed; + } + + @Override + public String toString() { + return "DolState{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" + + dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}"; + } + } + /** * Only used in L/F model. Check if the partition has released the latch. * In L/F ingestion task, Optionally, the state model holds a latch that @@ -126,6 +189,11 @@ enum LatchStatus { */ private final AtomicReference latchStatus = new AtomicReference<>(LatchStatus.NONE); + /** + * Tracks DoL state during STANDBY to LEADER transition. Null when not in transition or DoL not enabled. 
+ */ + private volatile DolState dolState = null; + /** * This future is completed in drainer thread after persisting the associated record and offset to DB. */ @@ -522,6 +590,18 @@ public final LeaderFollowerStateType getLeaderFollowerState() { return this.leaderFollowerState; } + public DolState getDolState() { + return this.dolState; + } + + public void setDolState(DolState dolState) { + this.dolState = dolState; + } + + public void clearDolState() { + this.dolState = null; + } + public void setLastLeaderPersistFuture(Future future) { this.lastLeaderPersistFuture = future; } diff --git a/docs/dev_guide/declaration_of_leadership_design.md b/docs/dev_guide/declaration_of_leadership_design.md new file mode 100644 index 00000000000..3e526518dee --- /dev/null +++ b/docs/dev_guide/declaration_of_leadership_design.md @@ -0,0 +1,765 @@ +# Declaration of Leadership (DoL) Design for Fast Leader Handover + +## Problem Statement + +The current `canSwitchToLeaderTopic()` implementation relies on a time-based mechanism (waiting for `newLeaderInactiveTime`) to determine when it's safe to switch from consuming the local version topic (VT) to the leader source topic (remote VT during batch push, or RT topic for hybrid stores). This approach has reliability issues: + +1. **Non-deterministic timing**: Potential Kafka consumer starvation could cause the old leader to continue producing even after the wait period +2. **Downstream detection burden**: Followers must detect upstream offset rewind via producer hostname changes +3. **Lack of deterministic confirmation**: No explicit confirmation that the new leader has established its position in the VT + +## Proposed Solution: Declaration of Leadership (DoL) + +Use a **loopback mechanism** where the new leader explicitly declares its leadership by: +1. Appending a special DoL message to the local VT +2. Waiting until it consumes this message back from the VT (loopback confirmation) +3. Only after confirmation, switching to consume from the leader source topic (remote VT or RT) + +This provides a **deterministic guarantee** that the leader has successfully written to and consumed from the VT before transitioning to the leader source topic. + +--- + +## Design Details + +### 1. DoL Message Format + +**Use `START_OF_SEGMENT` control message with special metadata and dedicated key:** + +- **Message Type**: `ControlMessageType.START_OF_SEGMENT` +- **Key**: `KafkaKey.DECLARATION_OF_LEADERSHIP` (new static constant to add to `KafkaKey.java`) + - Similar structure to `KafkaKey.HEART_BEAT` but with distinct GUID for semantic clarity + - Distinguishes DoL messages from regular heartbeat messages + - Enables clean filtering and monitoring of DoL messages +- **Special Metadata** (in `StartOfSegment` payload): + - **`isDeclarationOfLeadership`**: boolean flag (new field to add to `StartOfSegment.avsc`) + - **`leadershipTerm`**: long value representing the leadership term/epoch + - **`producerGUID`**: GUID identifying the new leader replica + - **`declarationTimestamp`**: timestamp when DoL was issued + +**Why START_OF_SEGMENT with dedicated DoL key?** +- Semantic alignment: declaring leadership is starting a new segment of leader activity +- Minimal schema changes: add optional fields to existing `StartOfSegment` message +- Existing infrastructure already handles START_OF_SEGMENT control messages +- **Dedicated key prevents confusion with heartbeat messages** and enables clear distinction +- Clean separation of concerns: DoL != heartbeat + +### 2. 
State Tracking in PartitionConsumptionState + +Add new fields to track DoL lifecycle: + +```java +/** + * Tracks the Declaration of Leadership (DoL) state for fast leader handover. + * Only relevant when transitioning from STANDBY to LEADER. + */ +public static class DeclarationOfLeadershipState { + // Whether DoL message has been produced to local VT + private volatile boolean dolMessageProduced = false; + + // The future tracking the DoL message produce operation + private volatile CompletableFuture dolProduceFuture = null; + + // The position where DoL message was produced (null until confirmed) + private volatile PubSubPosition dolPosition = null; + + // The leadership term included in the DoL message + private volatile long leadershipTerm = 0; + + // Timestamp when DoL was produced + private volatile long dolProducedTimestamp = 0; + + // Whether we've consumed the DoL message back (loopback confirmation) + private volatile boolean dolConsumed = false; + + // Timestamp when DoL was consumed back + private volatile long dolConsumedTimestamp = 0; + + // Reset state when transitioning out of leader or back to standby + public void reset() { + this.dolMessageProduced = false; + this.dolProduceFuture = null; + this.dolPosition = null; + this.leadershipTerm = 0; + this.dolProducedTimestamp = 0; + this.dolConsumed = false; + this.dolConsumedTimestamp = 0; + } + + // Getters and setters... +} + +// In PartitionConsumptionState class: +private final DeclarationOfLeadershipState dolState = new DeclarationOfLeadershipState(); + +public DeclarationOfLeadershipState getDolState() { + return dolState; +} +``` + +### 3. Updated `canSwitchToLeaderTopic()` Logic + +Replace the time-based check with DoL-based confirmation: + +```java +/** + * Checks whether the replica can safely switch to consuming from the leader source topic. + * + *
+ * <p>Uses Declaration of Leadership (DoL) loopback mechanism:
+ * 1. DoL message must have been produced to local VT
+ * 2. DoL message must have been consumed back from local VT (loopback confirmation)
+ *
+ * <p>The DoL loopback confirmation provides a strong guarantee that:
+ * - The new leader can successfully write to the local VT
+ * - The new leader can successfully consume from the local VT
+ * - The new leader has established its position in the VT before switching to leader source topic
+ * - **The new leader has fully caught up on the VT** (consuming the DoL back means
+ *   the leader has processed all messages up to and including its own DoL message)
+ *
+ * <p>This eliminates the need for time-based waits or special handling for different
+ * store types (e.g., user system stores). The loopback mechanism is sufficient for all stores
+ * because consuming the DoL back is inherently a confirmation of being caught up on VT.
+ *
+ * <p>
The leader source topic can be either: + * - Remote VT during batch push (before EOP) + * - RT topic for hybrid stores (after EOP) + * + * @param pcs partition consumption state + * @return true if safe to switch to leader topic; false otherwise + */ +private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { + DeclarationOfLeadershipState dolState = pcs.getDolState(); + + // Step 1: Check if DoL message has been produced + if (!dolState.isDolMessageProduced()) { + LOGGER.debug( + "Cannot switch to leader topic for partition {}: DoL message not yet produced", + pcs.getPartition()); + return false; + } + + // Step 2: Check if DoL message has been consumed back (loopback confirmation) + if (!dolState.isDolConsumed()) { + // Check if DoL produce operation is still pending + CompletableFuture produceFuture = dolState.getDolProduceFuture(); + if (produceFuture != null && !produceFuture.isDone()) { + LOGGER.debug( + "Cannot switch to leader topic for partition {}: DoL produce operation still pending", + pcs.getPartition()); + return false; + } + + // Produce completed but not consumed yet - this is expected during normal operation + LOGGER.debug( + "Cannot switch to leader topic for partition {}: DoL message produced at {} but not yet consumed back", + pcs.getPartition(), + dolState.getDolPosition()); + return false; + } + + // All conditions met - safe to switch to leader topic + // The DoL loopback confirmation is sufficient for all store types + long loopbackLatency = dolState.getDolConsumedTimestamp() - dolState.getDolProducedTimestamp(); + LOGGER.info( + "Ready to switch to leader topic for partition {}: DoL loopback confirmed (latency: {}ms, term: {})", + pcs.getPartition(), + loopbackLatency, + dolState.getLeadershipTerm()); + + return true; +} +``` + +### 4. DoL Message Production + +Produce DoL message when entering `IN_TRANSITION_FROM_STANDBY_TO_LEADER` state: + +```java +/** + * Produces a Declaration of Leadership (DoL) message to the local VT. + * This is called when transitioning from STANDBY to LEADER state. 
+ * + * @param pcs partition consumption state + */ +private void produceDeclarationOfLeadership(PartitionConsumptionState pcs) { + DeclarationOfLeadershipState dolState = pcs.getDolState(); + + // Avoid duplicate production + if (dolState.isDolMessageProduced()) { + LOGGER.warn( + "DoL message already produced for partition {}, skipping duplicate production", + pcs.getPartition()); + return; + } + + // Generate leadership term (can use timestamp or counter) + long leadershipTerm = System.currentTimeMillis(); + dolState.setLeadershipTerm(leadershipTerm); + + // Create DoL message using START_OF_SEGMENT with special metadata + StartOfSegment sosPayload = new StartOfSegment(); + sosPayload.isDeclarationOfLeadership = true; // New field + sosPayload.leadershipTerm = leadershipTerm; // New field + sosPayload.checksumType = CheckSumType.NONE.getValue(); + + // Wrap in control message + ControlMessage controlMessage = new ControlMessage(); + controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue(); + controlMessage.controlMessageUnion = sosPayload; + + // Produce to local VT with DECLARATION_OF_LEADERSHIP key + int partition = pcs.getPartition(); + VeniceWriter writer = getVeniceWriter(pcs); + + try { + long producedTimestamp = System.currentTimeMillis(); + dolState.setDolProducedTimestamp(producedTimestamp); + + CompletableFuture produceFuture = + writer.sendControlMessage( + KafkaKey.DECLARATION_OF_LEADERSHIP, + controlMessage, + partition, + new DolProduceCallback(pcs, leadershipTerm, producedTimestamp)); + + dolState.setDolProduceFuture(produceFuture); + dolState.setDolMessageProduced(true); + + LOGGER.info( + "Produced DoL message to local VT for partition {} with term {}", + partition, + leadershipTerm); + + } catch (Exception e) { + dolState.reset(); + throw new VeniceException( + "Failed to produce DoL message for partition " + partition, + e); + } +} + +/** + * Callback to handle DoL message produce completion. + */ +private class DolProduceCallback implements PubSubProducerCallback { + private final PartitionConsumptionState pcs; + private final long leadershipTerm; + private final long producedTimestamp; + + public DolProduceCallback( + PartitionConsumptionState pcs, + long leadershipTerm, + long producedTimestamp) { + this.pcs = pcs; + this.leadershipTerm = leadershipTerm; + this.producedTimestamp = producedTimestamp; + } + + @Override + public void onCompletion(PubSubProduceResult produceResult, Exception exception) { + if (exception != null) { + LOGGER.error( + "Failed to produce DoL message for partition {} with term {}", + pcs.getPartition(), + leadershipTerm, + exception); + pcs.getDolState().reset(); + return; + } + + // Record the position where DoL was produced + PubSubPosition dolPosition = produceResult.getPosition(); + pcs.getDolState().setDolPosition(dolPosition); + + LOGGER.info( + "DoL message produce confirmed for partition {} at position {} (term: {})", + pcs.getPartition(), + dolPosition, + leadershipTerm); + } +} +``` + +### 5. DoL Message Consumption Detection + +Detect when the DoL message is consumed back: + +```java +/** + * Checks if the consumed control message is the DoL message we're waiting for. + * Called during control message processing in the consumption path. 
+ * + * @param consumerRecord the consumed record + * @param controlMessage the control message payload + * @param pcs partition consumption state + * @return true if this is the DoL message we're waiting for + */ +private boolean checkAndHandleDeclarationOfLeadership( + DefaultPubSubMessage consumerRecord, + ControlMessage controlMessage, + PartitionConsumptionState pcs) { + + // Only check during transition to leader + if (pcs.getLeaderFollowerState() != IN_TRANSITION_FROM_STANDBY_TO_LEADER) { + return false; + } + + // Must be START_OF_SEGMENT with DECLARATION_OF_LEADERSHIP key + if (ControlMessageType.valueOf(controlMessage) != START_OF_SEGMENT) { + return false; + } + + if (!Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.DECLARATION_OF_LEADERSHIP.getKey())) { + return false; + } + + // Check if this is a DoL message (verify the flag is set) + StartOfSegment sos = (StartOfSegment) controlMessage.controlMessageUnion; + if (!sos.isDeclarationOfLeadership) { + return false; // Not a DoL message, possibly malformed + } + + // Verify this is OUR DoL message (match leadership term) + DeclarationOfLeadershipState dolState = pcs.getDolState(); + if (sos.leadershipTerm != dolState.getLeadershipTerm()) { + LOGGER.warn( + "Consumed DoL message with mismatched term for partition {}: expected {}, got {}", + pcs.getPartition(), + dolState.getLeadershipTerm(), + sos.leadershipTerm); + return false; + } + + // Loopback confirmed! + long consumedTimestamp = System.currentTimeMillis(); + dolState.setDolConsumed(true); + dolState.setDolConsumedTimestamp(consumedTimestamp); + + long loopbackLatency = consumedTimestamp - dolState.getDolProducedTimestamp(); + LOGGER.info( + "DoL loopback confirmed for partition {}: term={}, loopback_latency={}ms, position={}", + pcs.getPartition(), + sos.leadershipTerm, + loopbackLatency, + consumerRecord.getPosition()); + + // Emit metrics for monitoring + emitDolLoopbackMetric(pcs.getPartition(), loopbackLatency); + + return true; +} +``` + +### 6. Integration into State Transition Logic + +Update the state transition flow to use DoL: + +```java +// In checkConsumptionStateForStoreMetadataChanges() method + +case IN_TRANSITION_FROM_STANDBY_TO_LEADER: + // NEW: Produce DoL message if not already produced + DeclarationOfLeadershipState dolState = partitionConsumptionState.getDolState(); + if (!dolState.isDolMessageProduced()) { + produceDeclarationOfLeadership(partitionConsumptionState); + } + + // Check if we can switch to leader topic (now uses DoL confirmation) + if (canSwitchToLeaderTopic(partitionConsumptionState)) { + LOGGER.info( + "Initiating promotion of replica: {} to leader for the partition. Unsubscribing from the current topic: {}", + partitionConsumptionState.getReplicaId(), + kafkaVersionTopic); + + // Unsubscribe from VT and switch to RT... + // (existing logic continues) + + // IMPORTANT: Reset DoL state after successful transition + dolState.reset(); + } + break; + +// In processControlMessage() method +case START_OF_SEGMENT: + // Check if this is a DoL message we're waiting for + if (checkAndHandleDeclarationOfLeadership(consumerRecord, controlMessage, partitionConsumptionState)) { + // DoL loopback confirmed - canSwitchToLeaderTopic() will now return true + } + // ... existing START_OF_SEGMENT handling ... + break; +``` + +### 7. 
State Reset and Cleanup + +Reset DoL state in appropriate scenarios: + +```java +// When transitioning back to STANDBY or on partition unsubscribe +private void resetDeclarationOfLeadershipState(PartitionConsumptionState pcs) { + pcs.getDolState().reset(); + LOGGER.debug("Reset DoL state for partition {}", pcs.getPartition()); +} + +// Call in: +// 1. LEADER -> STANDBY transition +// 2. Partition unsubscribe +// 3. Ingestion task shutdown +// 4. Error handling paths +``` + +--- + +## Schema Changes + +### 1. Create v14 of KafkaMessageEnvelope and add fields to StartOfSegment + +**Path**: `/internal/venice-common/src/main/resources/avro/KafkaMessageEnvelope/v14/KafkaMessageEnvelope.avsc` + +Copy v13 and add these two fields to the `StartOfSegment` record (within the `controlMessageUnion`): + +```json +{ + "name": "StartOfSegment", + "type": "record", + "fields": [ + { + "name": "checksumType", + "type": "int" + }, { + "name": "upcomingAggregates", + "type": { + "type": "array", + "items": "string" + } + }, { + "name": "isDeclarationOfLeadership", + "type": "boolean", + "default": false, + "doc": "Flag indicating this START_OF_SEGMENT is a Declaration of Leadership (DoL) message for fast leader handover" + }, { + "name": "leadershipTerm", + "type": "long", + "default": 0, + "doc": "Leadership term/epoch for DoL messages. Used to match produced and consumed DoL messages during loopback" + } + ] +} +``` + +These are **optional fields with defaults**, ensuring **backward compatibility** with existing v13 producers/consumers. + +After adding the schema, update the protocol version mapping in the codebase to register v14. + +### 2. Add new constant to `KafkaKey.java`: + +```java +/** + * Special key for Declaration of Leadership (DoL) control messages. + * Used during leader handover to confirm the new leader can write to and consume from the local VT. + * This is distinct from HEART_BEAT to clearly separate leadership declaration from heartbeat semantics. + */ +public static final KafkaKey DECLARATION_OF_LEADERSHIP = new KafkaKey( + MessageType.CONTROL_MESSAGE, + ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH) + .put(DolGuidGenerator.getInstance().getGuid().bytes()) // Use distinct GUID generator + .putInt(0) // segment number + .putInt(0) // sequence number + .array()); +``` + +**Note**: Create a dedicated `DolGuidGenerator` similar to `HeartbeatGuidV3Generator` to ensure DoL messages have a distinct, recognizable GUID pattern. + +--- + +## Configuration + +The DoL mechanism is controlled by **two separate configuration flags** to enable independent rollout for system stores and user stores: + +### 1. System Stores Configuration + +**Config Key**: `server.leader.handover.use.dol.mechanism.for.system.stores` + +**Purpose**: Controls DoL mechanism for system stores (meta stores, push status stores, etc.) + +**Default**: `false` (uses legacy time-based mechanism) + +**Usage in VeniceServerConfig**: +```java +private final boolean leaderHandoverUseDoLMechanismForSystemStores; + +public VeniceServerConfig(VeniceProperties serverProperties) { + this.leaderHandoverUseDoLMechanismForSystemStores = + serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, false); +} + +public boolean isLeaderHandoverUseDoLMechanismEnabledForSystemStores() { + return this.leaderHandoverUseDoLMechanismForSystemStores; +} +``` + +### 2. 
User Stores Configuration + +**Config Key**: `server.leader.handover.use.dol.mechanism.for.user.stores` + +**Purpose**: Controls DoL mechanism for user stores (regular application data stores) + +**Default**: `false` (uses legacy time-based mechanism) + +**Usage in VeniceServerConfig**: +```java +private final boolean leaderHandoverUseDoLMechanismForUserStores; + +public VeniceServerConfig(VeniceProperties serverProperties) { + this.leaderHandoverUseDoLMechanismForUserStores = + serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, false); +} + +public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() { + return this.leaderHandoverUseDoLMechanismForUserStores; +} +``` + +### 3. Config Selection Logic + +The `canSwitchToLeaderTopic()` method selects the appropriate config based on store type: + +```java +private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { + // Check if DoL mechanism is enabled via config (system stores vs user stores) + boolean useDoLMechanism = isIngestingSystemStore() + ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() + : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); + + if (useDoLMechanism) { + // Use DoL-based logic + return checkDoLLoopbackConfirmation(pcs); + } else { + // Use legacy time-based mechanism + return canSwitchToLeaderTopicLegacy(pcs); + } +} +``` + +### 4. Rollout Strategy + +Having separate configs enables a **phased rollout approach**: + +**Phase 1: System Stores First** +- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` +- System stores typically have lower volume and are easier to monitor +- Validate DoL mechanism works correctly on critical infrastructure stores + +**Phase 2: User Stores Rollout** +- After system stores are stable, enable for user stores +- Set `server.leader.handover.use.dol.mechanism.for.user.stores=true` +- Can roll out gradually by store or by region + +**Phase 3: Full Adoption** +- Both configs enabled cluster-wide +- Monitor metrics to ensure improved handover latency +- Eventual deprecation of legacy time-based mechanism + +### 5. Benefits of Separate Configs + +- **Risk Mitigation**: Test on critical system stores before wider user store rollout +- **Independent Control**: Disable DoL for user stores without affecting system stores (or vice versa) +- **Gradual Migration**: Roll out to different store types at different paces +- **Operational Flexibility**: Quick rollback per store type if issues arise +- **A/B Testing**: Compare DoL vs. legacy behavior for different store types + +--- + +## Benefits of DoL Approach + +### 1. **Deterministic Confirmation** +- Explicit proof that new leader can write to and read from VT +- No reliance on timing assumptions or probabilistic waits + +### 2. **Fast Handover** +- Loopback latency typically milliseconds (vs. minutes with time-based approach) +- Eliminates unnecessary `newLeaderInactiveTime` wait period + +### 3. **Strong Consistency Guarantee** +- New leader has confirmed its position in VT before consuming RT +- Prevents split-brain scenarios where old/new leaders overlap +- **Consuming DoL back is inherent confirmation of being fully caught up on VT** +- Eliminates need for special-case logic (e.g., `isUserSystemStore()` checks) + +### 4. 
**Unified Logic Across Store Types** +- DoL loopback confirmation means the leader has processed all VT messages up to its DoL +- No need for separate `isLocalVersionTopicPartitionFullyConsumed()` checks for user system stores +- Single, consistent handover mechanism for all stores (user stores, system stores, hybrid stores) +- Simplifies code and reduces maintenance burden + +### 5. **Observability** +- DoL loopback latency metric tracks handover speed +- Clear state tracking for debugging and monitoring +- Explicit log messages for handover progress + +### 5. **Graceful Degradation** +- If DoL message never loops back (e.g., VT issues), state transition won't proceed +- Prevents unsafe promotion to leader + +### 6. **Backward Compatible** +- New DoL fields are optional in `StartOfSegment` +- Existing replicas ignore DoL messages (see them as regular heartbeats) +- Gradual rollout possible + +--- + +## Metrics and Monitoring + +Add metrics to track DoL mechanism: + +```java +// DoL loopback latency histogram +hostLevelIngestionStats.recordDolLoopbackLatency(storeName, partition, loopbackLatency); + +// DoL state gauges +hostLevelIngestionStats.recordDolProduced(storeName, partition, 1); +hostLevelIngestionStats.recordDolConsumed(storeName, partition, 1); + +// DoL failures counter +hostLevelIngestionStats.recordDolFailure(storeName, partition, reason); +``` + +--- + +## Testing Strategy + +### Unit Tests +1. Test DoL state transitions +2. Test DoL message production +3. Test DoL message consumption detection +4. Test term matching logic +5. Test state reset scenarios + +### Integration Tests +1. Leader handover with DoL mechanism end-to-end +2. Multiple concurrent leader transitions +3. DoL message loss scenarios (retry/timeout) +4. Backward compatibility with old replicas + +### Stress Tests +1. Rapid leader failover cycles +2. High partition count scenarios +3. 
Network partition during DoL loopback + +--- + +## Migration and Rollout + +### Phase 1: Schema Evolution +- Deploy `StartOfSegment` schema with new optional fields (`isDeclarationOfLeadership`, `leadershipTerm`) +- Add `KafkaKey.DECLARATION_OF_LEADERSHIP` constant and `DolGuidGenerator` +- No behavior changes yet - configs remain `false` by default + +### Phase 2: DoL Code Deployment +- Deploy DoL production and consumption logic +- Both configs (`for.system.stores` and `for.user.stores`) default to `false` +- Code is in place but not activated + +### Phase 3: Enable DoL for System Stores (Canary) +- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` on canary servers +- Monitor DoL loopback metrics for system stores (meta stores, push status stores) +- Validate DoL messages are produced and consumed correctly +- User stores continue using legacy time-based mechanism + +### Phase 4: System Stores Full Rollout +- After canary validation, enable DoL for all system stores cluster-wide +- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` globally +- Monitor for 1-2 weeks to ensure stability + +### Phase 5: Enable DoL for User Stores (Gradual) +- Start with low-traffic user stores or specific regions +- Set `server.leader.handover.use.dol.mechanism.for.user.stores=true` gradually +- Monitor handover latency improvements and error rates +- Expand to high-traffic stores after validation + +### Phase 6: Full Adoption +- Both configs enabled cluster-wide +- System stores and user stores using DoL mechanism +- Legacy time-based logic remains as fallback in code + +### Phase 7: Cleanup (Future) +- After 6-12 months of stable operation, consider removing legacy code +- Deprecate and remove `canSwitchToLeaderTopicLegacy()` method +- Remove time-based fallback logic + +--- + +## Edge Cases and Error Handling + +### 1. DoL Message Lost +- **Symptom**: DoL produced but never consumed back +- **Handling**: Add timeout (e.g., 2 ร— `newLeaderInactiveTime`) +- **Recovery**: Retry DoL production or fall back to time-based check + +### 2. Concurrent Leader Transitions +- **Symptom**: Multiple DoL messages with different terms +- **Handling**: Term matching ensures we only confirm OUR DoL +- **Recovery**: Reset on state transitions, only track latest term + +### 3. VT Write Failures +- **Symptom**: DoL produce operation fails +- **Handling**: Reset DoL state, retry or escalate error +- **Recovery**: Don't transition to LEADER without DoL confirmation + +### 4. Partition Reassignment During DoL +- **Symptom**: Partition unsubscribed while waiting for DoL +- **Handling**: Reset DoL state on unsubscribe +- **Recovery**: New state model invocation starts fresh DoL cycle + +### 5. Old Leader Still Producing +- **Symptom**: Old leader continues after new leader DoL +- **Handling**: Term/GUID in DoL helps identify producer +- **Recovery**: Followers detect via changed producer metadata + +--- + +## Alternative Considered: Explicit DoL Control Message Type + +Instead of reusing `START_OF_SEGMENT`, we could create a new `ControlMessageType.DECLARATION_OF_LEADERSHIP`. + +**Pros:** +- Clearer semantic meaning +- Dedicated schema without mixing concerns + +**Cons:** +- Requires new control message type (more schema evolution) +- Less reuse of existing infrastructure +- More code changes across consumers + +**Decision**: Use `START_OF_SEGMENT` with special flag for simpler implementation and better reuse. 
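+
+For reference, a consumer-side check for this dedicated key could look like the sketch below.
+This is illustrative only: it assumes the proposed `KafkaKey.DECLARATION_OF_LEADERSHIP` constant
+exists and reuses the existing `KafkaKey.isControlMessage()` and `getKey()` accessors.
+
+```java
+// Illustrative sketch: distinguish a DoL stamp from other control messages (e.g., heartbeats)
+// by comparing the raw key bytes against the dedicated DoL key.
+private static boolean isDolStamp(KafkaKey key) {
+  return key.isControlMessage()
+      && Arrays.equals(key.getKey(), KafkaKey.DECLARATION_OF_LEADERSHIP.getKey());
+}
+```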
+ +--- + +## Summary + +The Declaration of Leadership (DoL) mechanism provides a **deterministic, fast, and reliable** approach to leader handover by: + +1. **Loopback Confirmation**: New leader produces DoL message to VT and waits to consume it back +2. **Explicit State Tracking**: `DeclarationOfLeadershipState` in `PartitionConsumptionState` tracks DoL lifecycle +3. **Deterministic Verification**: Replaces unreliable time-based waits with position-based verification +4. **Unified Logic**: Same handover mechanism for all store types (no special `isUserSystemStore()` checks) +5. **Backward Compatibility**: Optional schema fields with defaults, dedicated `KafkaKey.DECLARATION_OF_LEADERSHIP` + +### Key Implementation Components + +**Schema/Protocol Changes:** +- Add `isDeclarationOfLeadership` and `leadershipTerm` fields to `StartOfSegment.avsc` +- Add `KafkaKey.DECLARATION_OF_LEADERSHIP` constant with dedicated GUID generator +- Add `DeclarationOfLeadershipState` class to track DoL lifecycle + +**Core Logic Changes:** +- `produceDeclarationOfLeadership()`: Produces DoL message when entering `IN_TRANSITION_FROM_STANDBY_TO_LEADER` +- `checkAndHandleDeclarationOfLeadership()`: Detects DoL message consumption with term matching +- `canSwitchToLeaderTopic()`: Simplified to check only DoL confirmation (no time-based or store-type logic) + +**Benefits:** +- โšก **Millisecond handover** vs. minutes with time-based approach +- ๐ŸŽฏ **Deterministic** confirmation of VT write/read capability and caught-up state +- ๐Ÿ”’ **Safe** prevention of split-brain scenarios +- ๐Ÿ“Š **Observable** with clear metrics and state tracking +- ๐Ÿงน **Simplified** codebase with unified logic for all store types + +This design eliminates the race conditions and timing dependencies of the current implementation while providing clear observability into the handover process. diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index ff88ab38dbd..acf8e20624a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -731,6 +731,51 @@ private ConfigKeys() { public static final String SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION = "server.use.metrics.based.position.in.lag.computation"; + /** + * Controls whether to use the Declaration of Leadership (DoL) mechanism for leader handover + * of system stores (meta stores, push status stores, etc.). + * + *
+   * <p>When enabled, the new leader replica will:
+   * 1. Produce a Declaration of Leadership message to the local version topic (VT)
+   * 2. Wait until it consumes this message back from VT (loopback confirmation)
+   * 3. Only then switch to consuming from the leader source topic (remote VT during batch push,
+   *    or RT topic for hybrid stores post-EOP)
+   *
+   * <p>This provides a deterministic guarantee that the leader has successfully written to
+   * and consumed from the local VT before transitioning to the leader source topic, eliminating
+   * the need for time-based waits.
+   *
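+   * <p>Illustrative usage (hypothetical server.properties snippet):
+   * <pre>
+   * server.leader.handover.use.dol.mechanism.for.system.stores=true
+   * </pre>
+   *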
+   * <p>System stores typically benefit more from fast handover due to their critical role in
+   * cluster operations. Having a separate config allows independent rollout strategy.
+   *
+   * Default: false (uses legacy time-based mechanism)
+   */
+  public static final String SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES =
+      "server.leader.handover.use.dol.mechanism.for.system.stores";
+
+  /**
+   * Controls whether to use the Declaration of Leadership (DoL) mechanism for leader handover
+   * of user stores (regular application data stores).
+   *
+   * <p>When enabled, the new leader replica will:
+   * 1. Produce a Declaration of Leadership message to the local version topic (VT)
+   * 2. Wait until it consumes this message back from VT (loopback confirmation)
+   * 3. Only then switch to consuming from the leader source topic (remote VT during batch push,
+   *    or RT topic for hybrid stores post-EOP)
+   *
+   * <p>This provides a deterministic guarantee that the leader has successfully written to
+   * and consumed from the local VT before transitioning to the leader source topic, eliminating
+   * the need for time-based waits.
+   *
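+   * <p>Illustrative usage (hypothetical server.properties snippet):
+   * <pre>
+   * server.leader.handover.use.dol.mechanism.for.user.stores=true
+   * </pre>
+   *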
+   * <p>Having a separate config from system stores allows independent rollout - typically you
+   * would enable DoL for system stores first, validate it works correctly, then roll out to
+   * user stores.
+   *
+   * Default: false (uses legacy time-based mechanism)
+   */
+  public static final String SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES =
+      "server.leader.handover.use.dol.mechanism.for.user.stores";
+
   public static final String SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS =
       "server.netty.graceful.shutdown.period.seconds";
   public static final String SERVER_NETTY_WORKER_THREADS = "server.netty.worker.threads";
 
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/guid/DoLStampGuidGenerator.java b/internal/venice-common/src/main/java/com/linkedin/venice/guid/DoLStampGuidGenerator.java
new file mode 100644
index 00000000000..ec8fa4ac4f3
--- /dev/null
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/guid/DoLStampGuidGenerator.java
@@ -0,0 +1,41 @@
+package com.linkedin.venice.guid;
+
+import com.linkedin.venice.kafka.protocol.GUID;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+
+/**
+ * A GUID generator for Declaration of Leadership (DoL) control messages.
+ *
+ * <p>Uses {@link java.util.UUID#nameUUIDFromBytes(byte[])} to generate a type 3 GUID that will
+ * not collide with type 4 GUIDs generated for user data using {@link JavaUtilGuidV4Generator}.
+ *
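+ * <p>Note: since the GUID is derived from a fixed name string, every call to
+ * {@code getGuid()} returns the same deterministic GUID, which lets consumers
+ * recognize DoL messages by their key bytes.
+ *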
+ * <p>This GUID is used to uniquely identify DoL messages in the version topic, allowing followers
+ * to detect and process leadership transition messages during fast leader handover.
+ */
+public class DoLStampGuidGenerator implements GuidGenerator {
+  private static DoLStampGuidGenerator instance;
+
+  private DoLStampGuidGenerator() {
+  }
+
+  public static synchronized DoLStampGuidGenerator getInstance() {
+    if (instance == null) {
+      instance = new DoLStampGuidGenerator();
+    }
+    return instance;
+  }
+
+  @Override
+  public GUID getGuid() {
+    UUID javaUtilUuid = UUID.nameUUIDFromBytes("declarationOfLeadershipControlMessage".getBytes());
+    GUID guid = new GUID();
+    byte[] guidBytes = ByteBuffer.allocate(16)
+        .putLong(javaUtilUuid.getMostSignificantBits())
+        .putLong(javaUtilUuid.getLeastSignificantBits())
+        .array();
+    guid.bytes(guidBytes);
+    return guid;
+  }
+}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java
index c6f9afafefd..c2ac1107018 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java
@@ -1,5 +1,6 @@
 package com.linkedin.venice.message;
 
+import com.linkedin.venice.guid.DolGuidGenerator;
 import com.linkedin.venice.guid.HeartbeatGuidV3Generator;
 import com.linkedin.venice.kafka.protocol.GUID;
 import com.linkedin.venice.kafka.protocol.enums.MessageType;
@@ -33,6 +34,31 @@ public class KafkaKey implements Measurable {
         .putInt(0)
         .putInt(0)
         .array());
+
+  /**
+   * Special key for Declaration of Leadership (DoL) control messages.
+   *
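+   * <p>Key layout (for reference): the 16-byte DoL GUID followed by a 4-byte segment number (0)
+   * and a 4-byte sequence number (0), mirroring the structure of the HEART_BEAT key.
+   *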
+   * <p>Used during leader handover to confirm the new leader can write to and consume from the local VT.
+   * This is distinct from HEART_BEAT to clearly separate leadership declaration from heartbeat semantics.
+   *
+   * <p>The DoL mechanism provides deterministic confirmation that a new leader has successfully:
+   * <ul>
+   *   <li>Produced a message to the local version topic (VT)</li>
+   *   <li>Consumed that message back from VT (loopback confirmation)</li>
+   * </ul>
+   *
+   * <p>
This eliminates the need for time-based waits during leader handover and provides + * strong guarantees that the leader is ready to switch to consuming from the leader source topic + * (remote VT or RT topic). + */ + public static final KafkaKey DOL_STAMP = new KafkaKey( + MessageType.CONTROL_MESSAGE, + ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH) + .put(DolGuidGenerator.getInstance().getGuid().bytes()) + .putInt(0) // segment number + .putInt(0) // sequence number + .array()); + private final byte keyHeaderByte; private final byte[] key; // TODO: Consider whether we may want to use a ByteBuffer here diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 27633e669a9..661c2fbf70a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -12,6 +12,7 @@ import com.linkedin.venice.exceptions.RecordTooLargeException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceResourceAccessException; +import com.linkedin.venice.guid.DoLStampGuidGenerator; import com.linkedin.venice.guid.GuidUtils; import com.linkedin.venice.guid.HeartbeatGuidV3Generator; import com.linkedin.venice.kafka.protocol.ControlMessage; @@ -561,6 +562,10 @@ public GUID getProducerGUID() { return producerGUID; } + public String getWriterId() { + return writerId; + } + /** * @return the Kafka topic name that this {@link VeniceWriter} instance writes into. */ @@ -2190,6 +2195,62 @@ public Future asyncSendControlMessage( } } + /** + * Constructs a KafkaMessageEnvelope for Declaration of Leadership (DoL) stamp messages. + * + * @param leadershipTerm unique timestamp identifying the leader session + * @param writerId identifier of the writer/host producing this message + * @param dolStampMessage the control message payload (typically a START_OF_SEGMENT) + * @return a KafkaMessageEnvelope configured with DoL-specific metadata + */ + public static KafkaMessageEnvelope getDoLStampKME( + long leadershipTerm, + String writerId, + ControlMessage dolStampMessage) { + ProducerMetadata producerMetadata = new ProducerMetadata(); + producerMetadata.producerGUID = DoLStampGuidGenerator.getInstance().getGuid(); + producerMetadata.segmentNumber = 0; + producerMetadata.messageSequenceNumber = 0; + producerMetadata.messageTimestamp = System.currentTimeMillis(); + producerMetadata.logicalTimestamp = VENICE_DEFAULT_LOGICAL_TS; + + LeaderMetadata leaderMetadataFooter = new LeaderMetadata(); + leaderMetadataFooter.hostName = writerId; + leaderMetadataFooter.termId = leadershipTerm; + + KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); + kafkaMessageEnvelope.messageType = MessageType.CONTROL_MESSAGE.getValue(); + kafkaMessageEnvelope.producerMetadata = producerMetadata; + kafkaMessageEnvelope.leaderMetadataFooter = leaderMetadataFooter; + kafkaMessageEnvelope.payloadUnion = dolStampMessage; + + return kafkaMessageEnvelope; + } + + /** + * Sends a Declaration of Leadership (DoL) stamp message during STANDBY to LEADER transition. + * The new leader produces this message to the local VT and waits to consume it back, ensuring + * the replica is fully caught up with the VT before switching over to consume from remote VT or RT. 
+ * + * @param topicPartition the topic-partition to send the DoL stamp to + * @param callback callback to invoke when the produce completes + * @param leadershipTerm unique timestamp identifying this leader session + * @return a future that completes when the message is acknowledged by the broker + */ + public CompletableFuture sendDoLStamp( + PubSubTopicPartition topicPartition, + PubSubProducerCallback callback, + long leadershipTerm) { + KafkaMessageEnvelope kafkaMessageEnvelope = getDoLStampKME(leadershipTerm, writerId, heartBeatMessage); + return producerAdapter.sendMessage( + topicPartition.getPubSubTopic().getName(), + topicPartition.getPartitionNumber(), + KafkaKey.DOL_STAMP, + kafkaMessageEnvelope, + EmptyPubSubMessageHeaders.SINGLETON, + callback); + } + public static KafkaMessageEnvelope getHeartbeatKME( long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, @@ -2228,7 +2289,7 @@ public CompletableFuture sendHeartbeat( if (isClosed) { CompletableFuture future = new CompletableFuture<>(); future.completedFuture(null); - logger.warn("VeniceWriter already closed for topic partition " + topicPartition); + logger.warn("VeniceWriter already closed for topic-partition ", topicPartition); return future; } KafkaMessageEnvelope kafkaMessageEnvelope = diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index ddc5291bff7..370a159baa6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -33,6 +33,8 @@ import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; +import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES; +import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES; import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_FOR_VERSION_INFO_MS_CONFIG; import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS; @@ -277,6 +279,8 @@ static StatefulServiceProvider generateService( .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000) .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000) .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true) + .put(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, true) + .put(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, true) .put(ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES, 512 * 1024 * 1024L) .put(ROCKSDB_RMD_BLOCK_CACHE_SIZE_IN_BYTES, 128 * 1024 * 1024L) From d55a8bc29d77f58c356cc9ec1129c379a450da06 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Fri, 21 Nov 2025 01:17:29 -0800 Subject: [PATCH 2/4] Fix NPE Remove docs/dev_guide/declaration_of_leadership_design.md --- .../davinci/kafka/consumer/DolState.java | 67 ++ .../LeaderFollowerStoreIngestionTask.java | 26 +- .../consumer/PartitionConsumptionState.java | 77 +- 
.../kafka/consumer/StoreIngestionTask.java | 105 ++- .../declaration_of_leadership_design.md | 765 ------------------ .../venice/guid/DolGuidGenerator.java | 46 ++ .../linkedin/venice/writer/VeniceWriter.java | 1 + 7 files changed, 240 insertions(+), 847 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java delete mode 100644 docs/dev_guide/declaration_of_leadership_design.md create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/guid/DolGuidGenerator.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java new file mode 100644 index 00000000000..28c53d0ef7b --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java @@ -0,0 +1,67 @@ +package com.linkedin.davinci.kafka.consumer; + +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import java.util.concurrent.CompletableFuture; + + +/** + * Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition. + * DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT. + */ +public class DolState { + private final long leadershipTerm; + private final String hostId; + private volatile boolean dolProduced; // DoL message was acked by broker + private volatile boolean dolConsumed; // DoL message was consumed back by this replica + private volatile CompletableFuture dolProduceFuture; // Future tracking DoL produce result + + public DolState(long leadershipTerm, String hostId) { + this.leadershipTerm = leadershipTerm; + this.hostId = hostId; + this.dolProduced = false; + this.dolConsumed = false; + this.dolProduceFuture = null; + } + + public long getLeadershipTerm() { + return leadershipTerm; + } + + public String getHostId() { + return hostId; + } + + public boolean isDolProduced() { + return dolProduced; + } + + public void setDolProduced(boolean dolProduced) { + this.dolProduced = dolProduced; + } + + public boolean isDolConsumed() { + return dolConsumed; + } + + public void setDolConsumed(boolean dolConsumed) { + this.dolConsumed = dolConsumed; + } + + public CompletableFuture getDolProduceFuture() { + return dolProduceFuture; + } + + public void setDolProduceFuture(CompletableFuture dolProduceFuture) { + this.dolProduceFuture = dolProduceFuture; + } + + public boolean isReady() { + return dolProduced && dolConsumed; + } + + @Override + public String toString() { + return "DolState{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" + + dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}"; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index c8b69eeaaed..c64fedf4e4c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -22,7 +22,6 @@ import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel; import com.linkedin.davinci.ingestion.LagType; -import 
com.linkedin.davinci.kafka.consumer.PartitionConsumptionState.DolState; import com.linkedin.davinci.listener.response.NoOpReadResponseStats; import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper; import com.linkedin.davinci.schema.merge.MergeRecordHelper; @@ -43,7 +42,6 @@ import com.linkedin.davinci.validation.PartitionTracker; import com.linkedin.davinci.validation.PartitionTracker.TopicType; import com.linkedin.venice.annotation.VisibleForTesting; -import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceMessageException; @@ -854,12 +852,6 @@ protected static boolean checkWhetherToCloseUnusedVeniceWriter( return true; } - private boolean shouldUseDolMechanism() { - return isIngestingSystemStore() - ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() - : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); - } - /** * Initializes DoL state and sends DoL stamp to local VT during STANDBY to LEADER transition. * @@ -867,28 +859,26 @@ private boolean shouldUseDolMechanism() { * @param leadershipTerm the leadership term for this transition */ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsumptionState, long leadershipTerm) { - if (!shouldUseDolMechanism() || !partitionConsumptionState.getVeniceWriterLazyRef().isPresent()) { + if (!shouldUseDolMechanism()) { LOGGER.debug( - "Skipping DoL stamp initialization for replica: {} as {}", - partitionConsumptionState.getReplicaId(), - !shouldUseDolMechanism() ? "DoL mechanism is disabled" : "VeniceWriter is not initialized"); + "Skipping DoL stamp initialization for replica: {} as DoL mechanism is disabled", + partitionConsumptionState.getReplicaId()); return; } - VeniceWriter veniceWriter = partitionConsumptionState.getVeniceWriterLazyRef().get(); - DolState dolState = new DolState(leadershipTerm, veniceWriter.getWriterId()); + DolState dolState = new DolState(leadershipTerm, veniceWriter.get().getWriterId()); partitionConsumptionState.setDolState(dolState); LOGGER.info( "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", dolState, partitionConsumptionState.getReplicaId(), leadershipTerm, - veniceWriter.getWriterId()); + veniceWriter.get().getWriterId()); // Send DolStamp to local VT PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); - CompletableFuture dolProduceFuture = - veniceWriter.sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm); + CompletableFuture dolProduceFuture = veniceWriter.get() + .sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm); // Store the produce future in DolState dolState.setDolProduceFuture(dolProduceFuture); @@ -3953,7 +3943,7 @@ interface GetLastKnownUpstreamTopicOffset { } private boolean isIngestingSystemStore() { - return VeniceSystemStoreUtils.isSystemStore(storeName); + return isSystemStore; } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 9f316f49ceb..a8e471b4252 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -18,7 +18,6 @@ import com.linkedin.venice.pubsub.PubSubContext; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.api.PubSubPosition; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -116,68 +115,6 @@ enum LatchStatus { NONE, LATCH_CREATED, LATCH_RELEASED } - /** - * Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition. - * DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT. - */ - static class DolState { - private final long leadershipTerm; - private final String hostId; - private volatile boolean dolProduced; // DoL message was acked by broker - private volatile boolean dolConsumed; // DoL message was consumed back by this replica - private volatile CompletableFuture dolProduceFuture; // Future tracking DoL produce result - - public DolState(long leadershipTerm, String hostId) { - this.leadershipTerm = leadershipTerm; - this.hostId = hostId; - this.dolProduced = false; - this.dolConsumed = false; - this.dolProduceFuture = null; - } - - public long getLeadershipTerm() { - return leadershipTerm; - } - - public String getHostId() { - return hostId; - } - - public boolean isDolProduced() { - return dolProduced; - } - - public void setDolProduced(boolean dolProduced) { - this.dolProduced = dolProduced; - } - - public boolean isDolConsumed() { - return dolConsumed; - } - - public void setDolConsumed(boolean dolConsumed) { - this.dolConsumed = dolConsumed; - } - - public CompletableFuture getDolProduceFuture() { - return dolProduceFuture; - } - - public void setDolProduceFuture(CompletableFuture dolProduceFuture) { - this.dolProduceFuture = dolProduceFuture; - } - - public boolean isReady() { - return dolProduced && dolConsumed; - } - - @Override - public String toString() { - return "DolState{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" - + dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}"; - } - } - /** * Only used in L/F model. Check if the partition has released the latch. * In L/F ingestion task, Optionally, the state model holds a latch that @@ -194,6 +131,12 @@ public String toString() { */ private volatile DolState dolState = null; + /** + * The highest leadership term observed by this replica. Currently used only + * for troubleshooting. This will eventually become part of the durable state. + */ + public volatile long highestLeadershipTerm = -1; + /** * This future is completed in drainer thread after persisting the associated record and offset to DB. 
*/ @@ -602,6 +545,14 @@ public void clearDolState() { this.dolState = null; } + public long getHighestLeadershipTerm() { + return highestLeadershipTerm; + } + + public void setHighestLeadershipTerm(long term) { + this.highestLeadershipTerm = term; + } + public void setLastLeaderPersistFuture(Future future) { this.lastLeaderPersistFuture = future; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index a70d0a2afe6..e6d5c4ccfdc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1333,6 +1333,101 @@ private int handleSingleMessage( return record.getPayloadSize(); } + /** + * Checks if the consumed message is a DoL (Declaration of Leadership) stamp and marks it as consumed + * if it matches the current DolState. This method is called during message consumption to detect when + * the leader replica has successfully consumed back its own DoL stamp, indicating it's fully caught up + * with the version topic and ready to switch to consuming from remote VT or RT. + * + * @param partitionConsumptionState the partition consumption state tracking DoL status + * @param dolPubSubMessage the consumed DoL PubSub message with leadership metadata + */ + private void checkAndHandleDoLMessage( + PartitionConsumptionState partitionConsumptionState, + DefaultPubSubMessage dolPubSubMessage) { + // Early exit if DoL mechanism is disabled via config + if (!shouldUseDolMechanism()) { + return; + } + + // Extract leadership metadata from the message + KafkaMessageEnvelope messageEnvelope = dolPubSubMessage.getValue(); + LeaderMetadata leaderMetadata = messageEnvelope.getLeaderMetadataFooter(); + if (leaderMetadata == null) { + // Message doesn't have leadership metadata, skip processing + return; + } + + // Extract term information from the consumed message + long consumedTermId = leaderMetadata.getTermId(); + String consumedHostId = leaderMetadata.getHostName().toString(); + long messageTimestamp = messageEnvelope.getProducerMetadata() != null + ? 
messageEnvelope.getProducerMetadata().getMessageTimestamp() + : -1; + String replicaId = partitionConsumptionState.getReplicaId(); + + // Update highest observed leadership term for monitoring + long previousHighestTerm = partitionConsumptionState.getHighestLeadershipTerm(); + if (consumedTermId > previousHighestTerm) { + partitionConsumptionState.setHighestLeadershipTerm(consumedTermId); + LOGGER.info( + "Replica: {} observed new highest leadership term: {} (previous: {}) from host: {} at timestamp: {}", + replicaId, + consumedTermId, + previousHighestTerm, + consumedHostId, + messageTimestamp); + } + + // Get current DoL state - may be null if not in STANDBY->LEADER transition + DolState currentDolState = partitionConsumptionState.getDolState(); + if (currentDolState == null) { + // Not currently waiting for a DoL, just log for observability + LOGGER.debug( + "Replica: {} consumed DoL stamp for term: {} from host: {} (timestamp: {}), but not currently waiting for DoL", + replicaId, + consumedTermId, + consumedHostId, + messageTimestamp); + return; + } + + // Check if this DoL matches the expected term and host + long expectedTermId = currentDolState.getLeadershipTerm(); + String expectedHostId = currentDolState.getHostId(); + + if (consumedTermId == expectedTermId && consumedHostId.equals(expectedHostId)) { + // Successfully consumed our own DoL stamp - mark as consumed + currentDolState.setDolConsumed(true); + LOGGER.info( + "Replica {}: finished DoL loopback. The leader wrote its DoL stamp to the " + + "local VT and successfully consumed it again, confirming the replica is " + + "fully caught up. [term={}, host={}, timestamp={}]. DolState={}", + replicaId, + consumedTermId, + consumedHostId, + messageTimestamp, + currentDolState); + } else { + // Received a DoL stamp that doesn't match our expected state + LOGGER.warn( + "Replica: {} consumed DoL stamp with mismatched metadata. Expected: [term={}, host={}], Received: [term={}, host={}]. " + + "This may indicate a stale message or concurrent leadership changes. Current DolState: {}", + replicaId, + expectedTermId, + expectedHostId, + consumedTermId, + consumedHostId, + currentDolState); + } + } + + protected boolean shouldUseDolMechanism() { + return isSystemStore + ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() + : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); + } + /** * This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}. 
* @@ -3467,6 +3562,13 @@ private int internalProcessConsumerRecord( consumerRecord.getPubSubMessageTime()); } } + // Check if this is a DoL (Declaration of Leadership) stamp message + if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue() + && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.DOL_STAMP.getKey())) { + // Process DoL stamp to check if it matches the expected leadership term + checkAndHandleDoLMessage(partitionConsumptionState, consumerRecord); + } + } catch (Exception e) { LOGGER.error("Failed to record Record heartbeat with message: ", e); } @@ -3641,7 +3743,8 @@ protected void validateMessage( PartitionConsumptionState partitionConsumptionState, boolean tolerateMissingMessagesForRealTimeTopic) { KafkaKey key = consumerRecord.getKey(); - if (key.isControlMessage() && Arrays.equals(KafkaKey.HEART_BEAT.getKey(), key.getKey())) { + if (key.isControlMessage() && (Arrays.equals(KafkaKey.HEART_BEAT.getKey(), key.getKey()) + || Arrays.equals(KafkaKey.DOL_STAMP.getKey(), key.getKey()))) { return; // Skip validation for ingestion heartbeat records. } else if (isGlobalRtDivEnabled() && isRecordSelfProduced(consumerRecord)) { // Skip validation for self-produced records. If there were any issues, the followers would've reported it already diff --git a/docs/dev_guide/declaration_of_leadership_design.md b/docs/dev_guide/declaration_of_leadership_design.md deleted file mode 100644 index 3e526518dee..00000000000 --- a/docs/dev_guide/declaration_of_leadership_design.md +++ /dev/null @@ -1,765 +0,0 @@ -# Declaration of Leadership (DoL) Design for Fast Leader Handover - -## Problem Statement - -The current `canSwitchToLeaderTopic()` implementation relies on a time-based mechanism (waiting for `newLeaderInactiveTime`) to determine when it's safe to switch from consuming the local version topic (VT) to the leader source topic (remote VT during batch push, or RT topic for hybrid stores). This approach has reliability issues: - -1. **Non-deterministic timing**: Potential Kafka consumer starvation could cause the old leader to continue producing even after the wait period -2. **Downstream detection burden**: Followers must detect upstream offset rewind via producer hostname changes -3. **Lack of deterministic confirmation**: No explicit confirmation that the new leader has established its position in the VT - -## Proposed Solution: Declaration of Leadership (DoL) - -Use a **loopback mechanism** where the new leader explicitly declares its leadership by: -1. Appending a special DoL message to the local VT -2. Waiting until it consumes this message back from the VT (loopback confirmation) -3. Only after confirmation, switching to consume from the leader source topic (remote VT or RT) - -This provides a **deterministic guarantee** that the leader has successfully written to and consumed from the VT before transitioning to the leader source topic. - ---- - -## Design Details - -### 1. 
DoL Message Format - -**Use `START_OF_SEGMENT` control message with special metadata and dedicated key:** - -- **Message Type**: `ControlMessageType.START_OF_SEGMENT` -- **Key**: `KafkaKey.DECLARATION_OF_LEADERSHIP` (new static constant to add to `KafkaKey.java`) - - Similar structure to `KafkaKey.HEART_BEAT` but with distinct GUID for semantic clarity - - Distinguishes DoL messages from regular heartbeat messages - - Enables clean filtering and monitoring of DoL messages -- **Special Metadata** (in `StartOfSegment` payload): - - **`isDeclarationOfLeadership`**: boolean flag (new field to add to `StartOfSegment.avsc`) - - **`leadershipTerm`**: long value representing the leadership term/epoch - - **`producerGUID`**: GUID identifying the new leader replica - - **`declarationTimestamp`**: timestamp when DoL was issued - -**Why START_OF_SEGMENT with dedicated DoL key?** -- Semantic alignment: declaring leadership is starting a new segment of leader activity -- Minimal schema changes: add optional fields to existing `StartOfSegment` message -- Existing infrastructure already handles START_OF_SEGMENT control messages -- **Dedicated key prevents confusion with heartbeat messages** and enables clear distinction -- Clean separation of concerns: DoL != heartbeat - -### 2. State Tracking in PartitionConsumptionState - -Add new fields to track DoL lifecycle: - -```java -/** - * Tracks the Declaration of Leadership (DoL) state for fast leader handover. - * Only relevant when transitioning from STANDBY to LEADER. - */ -public static class DeclarationOfLeadershipState { - // Whether DoL message has been produced to local VT - private volatile boolean dolMessageProduced = false; - - // The future tracking the DoL message produce operation - private volatile CompletableFuture dolProduceFuture = null; - - // The position where DoL message was produced (null until confirmed) - private volatile PubSubPosition dolPosition = null; - - // The leadership term included in the DoL message - private volatile long leadershipTerm = 0; - - // Timestamp when DoL was produced - private volatile long dolProducedTimestamp = 0; - - // Whether we've consumed the DoL message back (loopback confirmation) - private volatile boolean dolConsumed = false; - - // Timestamp when DoL was consumed back - private volatile long dolConsumedTimestamp = 0; - - // Reset state when transitioning out of leader or back to standby - public void reset() { - this.dolMessageProduced = false; - this.dolProduceFuture = null; - this.dolPosition = null; - this.leadershipTerm = 0; - this.dolProducedTimestamp = 0; - this.dolConsumed = false; - this.dolConsumedTimestamp = 0; - } - - // Getters and setters... -} - -// In PartitionConsumptionState class: -private final DeclarationOfLeadershipState dolState = new DeclarationOfLeadershipState(); - -public DeclarationOfLeadershipState getDolState() { - return dolState; -} -``` - -### 3. Updated `canSwitchToLeaderTopic()` Logic - -Replace the time-based check with DoL-based confirmation: - -```java -/** - * Checks whether the replica can safely switch to consuming from the leader source topic. - * - *
<p>
Uses Declaration of Leadership (DoL) loopback mechanism: - * 1. DoL message must have been produced to local VT - * 2. DoL message must have been consumed back from local VT (loopback confirmation) - * - *
<p>
The DoL loopback confirmation provides a strong guarantee that: - * - The new leader can successfully write to the local VT - * - The new leader can successfully consume from the local VT - * - The new leader has established its position in the VT before switching to leader source topic - * - **The new leader has fully caught up on the VT** (consuming the DoL back means - * the leader has processed all messages up to and including its own DoL message) - * - *
<p>
This eliminates the need for time-based waits or special handling for different - * store types (e.g., user system stores). The loopback mechanism is sufficient for all stores - * because consuming the DoL back is inherently a confirmation of being caught up on VT. - * - *
<p>
The leader source topic can be either: - * - Remote VT during batch push (before EOP) - * - RT topic for hybrid stores (after EOP) - * - * @param pcs partition consumption state - * @return true if safe to switch to leader topic; false otherwise - */ -private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { - DeclarationOfLeadershipState dolState = pcs.getDolState(); - - // Step 1: Check if DoL message has been produced - if (!dolState.isDolMessageProduced()) { - LOGGER.debug( - "Cannot switch to leader topic for partition {}: DoL message not yet produced", - pcs.getPartition()); - return false; - } - - // Step 2: Check if DoL message has been consumed back (loopback confirmation) - if (!dolState.isDolConsumed()) { - // Check if DoL produce operation is still pending - CompletableFuture produceFuture = dolState.getDolProduceFuture(); - if (produceFuture != null && !produceFuture.isDone()) { - LOGGER.debug( - "Cannot switch to leader topic for partition {}: DoL produce operation still pending", - pcs.getPartition()); - return false; - } - - // Produce completed but not consumed yet - this is expected during normal operation - LOGGER.debug( - "Cannot switch to leader topic for partition {}: DoL message produced at {} but not yet consumed back", - pcs.getPartition(), - dolState.getDolPosition()); - return false; - } - - // All conditions met - safe to switch to leader topic - // The DoL loopback confirmation is sufficient for all store types - long loopbackLatency = dolState.getDolConsumedTimestamp() - dolState.getDolProducedTimestamp(); - LOGGER.info( - "Ready to switch to leader topic for partition {}: DoL loopback confirmed (latency: {}ms, term: {})", - pcs.getPartition(), - loopbackLatency, - dolState.getLeadershipTerm()); - - return true; -} -``` - -### 4. DoL Message Production - -Produce DoL message when entering `IN_TRANSITION_FROM_STANDBY_TO_LEADER` state: - -```java -/** - * Produces a Declaration of Leadership (DoL) message to the local VT. - * This is called when transitioning from STANDBY to LEADER state. 
- * - * @param pcs partition consumption state - */ -private void produceDeclarationOfLeadership(PartitionConsumptionState pcs) { - DeclarationOfLeadershipState dolState = pcs.getDolState(); - - // Avoid duplicate production - if (dolState.isDolMessageProduced()) { - LOGGER.warn( - "DoL message already produced for partition {}, skipping duplicate production", - pcs.getPartition()); - return; - } - - // Generate leadership term (can use timestamp or counter) - long leadershipTerm = System.currentTimeMillis(); - dolState.setLeadershipTerm(leadershipTerm); - - // Create DoL message using START_OF_SEGMENT with special metadata - StartOfSegment sosPayload = new StartOfSegment(); - sosPayload.isDeclarationOfLeadership = true; // New field - sosPayload.leadershipTerm = leadershipTerm; // New field - sosPayload.checksumType = CheckSumType.NONE.getValue(); - - // Wrap in control message - ControlMessage controlMessage = new ControlMessage(); - controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue(); - controlMessage.controlMessageUnion = sosPayload; - - // Produce to local VT with DECLARATION_OF_LEADERSHIP key - int partition = pcs.getPartition(); - VeniceWriter writer = getVeniceWriter(pcs); - - try { - long producedTimestamp = System.currentTimeMillis(); - dolState.setDolProducedTimestamp(producedTimestamp); - - CompletableFuture produceFuture = - writer.sendControlMessage( - KafkaKey.DECLARATION_OF_LEADERSHIP, - controlMessage, - partition, - new DolProduceCallback(pcs, leadershipTerm, producedTimestamp)); - - dolState.setDolProduceFuture(produceFuture); - dolState.setDolMessageProduced(true); - - LOGGER.info( - "Produced DoL message to local VT for partition {} with term {}", - partition, - leadershipTerm); - - } catch (Exception e) { - dolState.reset(); - throw new VeniceException( - "Failed to produce DoL message for partition " + partition, - e); - } -} - -/** - * Callback to handle DoL message produce completion. - */ -private class DolProduceCallback implements PubSubProducerCallback { - private final PartitionConsumptionState pcs; - private final long leadershipTerm; - private final long producedTimestamp; - - public DolProduceCallback( - PartitionConsumptionState pcs, - long leadershipTerm, - long producedTimestamp) { - this.pcs = pcs; - this.leadershipTerm = leadershipTerm; - this.producedTimestamp = producedTimestamp; - } - - @Override - public void onCompletion(PubSubProduceResult produceResult, Exception exception) { - if (exception != null) { - LOGGER.error( - "Failed to produce DoL message for partition {} with term {}", - pcs.getPartition(), - leadershipTerm, - exception); - pcs.getDolState().reset(); - return; - } - - // Record the position where DoL was produced - PubSubPosition dolPosition = produceResult.getPosition(); - pcs.getDolState().setDolPosition(dolPosition); - - LOGGER.info( - "DoL message produce confirmed for partition {} at position {} (term: {})", - pcs.getPartition(), - dolPosition, - leadershipTerm); - } -} -``` - -### 5. DoL Message Consumption Detection - -Detect when the DoL message is consumed back: - -```java -/** - * Checks if the consumed control message is the DoL message we're waiting for. - * Called during control message processing in the consumption path. 
- * - * @param consumerRecord the consumed record - * @param controlMessage the control message payload - * @param pcs partition consumption state - * @return true if this is the DoL message we're waiting for - */ -private boolean checkAndHandleDeclarationOfLeadership( - DefaultPubSubMessage consumerRecord, - ControlMessage controlMessage, - PartitionConsumptionState pcs) { - - // Only check during transition to leader - if (pcs.getLeaderFollowerState() != IN_TRANSITION_FROM_STANDBY_TO_LEADER) { - return false; - } - - // Must be START_OF_SEGMENT with DECLARATION_OF_LEADERSHIP key - if (ControlMessageType.valueOf(controlMessage) != START_OF_SEGMENT) { - return false; - } - - if (!Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.DECLARATION_OF_LEADERSHIP.getKey())) { - return false; - } - - // Check if this is a DoL message (verify the flag is set) - StartOfSegment sos = (StartOfSegment) controlMessage.controlMessageUnion; - if (!sos.isDeclarationOfLeadership) { - return false; // Not a DoL message, possibly malformed - } - - // Verify this is OUR DoL message (match leadership term) - DeclarationOfLeadershipState dolState = pcs.getDolState(); - if (sos.leadershipTerm != dolState.getLeadershipTerm()) { - LOGGER.warn( - "Consumed DoL message with mismatched term for partition {}: expected {}, got {}", - pcs.getPartition(), - dolState.getLeadershipTerm(), - sos.leadershipTerm); - return false; - } - - // Loopback confirmed! - long consumedTimestamp = System.currentTimeMillis(); - dolState.setDolConsumed(true); - dolState.setDolConsumedTimestamp(consumedTimestamp); - - long loopbackLatency = consumedTimestamp - dolState.getDolProducedTimestamp(); - LOGGER.info( - "DoL loopback confirmed for partition {}: term={}, loopback_latency={}ms, position={}", - pcs.getPartition(), - sos.leadershipTerm, - loopbackLatency, - consumerRecord.getPosition()); - - // Emit metrics for monitoring - emitDolLoopbackMetric(pcs.getPartition(), loopbackLatency); - - return true; -} -``` - -### 6. Integration into State Transition Logic - -Update the state transition flow to use DoL: - -```java -// In checkConsumptionStateForStoreMetadataChanges() method - -case IN_TRANSITION_FROM_STANDBY_TO_LEADER: - // NEW: Produce DoL message if not already produced - DeclarationOfLeadershipState dolState = partitionConsumptionState.getDolState(); - if (!dolState.isDolMessageProduced()) { - produceDeclarationOfLeadership(partitionConsumptionState); - } - - // Check if we can switch to leader topic (now uses DoL confirmation) - if (canSwitchToLeaderTopic(partitionConsumptionState)) { - LOGGER.info( - "Initiating promotion of replica: {} to leader for the partition. Unsubscribing from the current topic: {}", - partitionConsumptionState.getReplicaId(), - kafkaVersionTopic); - - // Unsubscribe from VT and switch to RT... - // (existing logic continues) - - // IMPORTANT: Reset DoL state after successful transition - dolState.reset(); - } - break; - -// In processControlMessage() method -case START_OF_SEGMENT: - // Check if this is a DoL message we're waiting for - if (checkAndHandleDeclarationOfLeadership(consumerRecord, controlMessage, partitionConsumptionState)) { - // DoL loopback confirmed - canSwitchToLeaderTopic() will now return true - } - // ... existing START_OF_SEGMENT handling ... - break; -``` - -### 7. 
State Reset and Cleanup - -Reset DoL state in appropriate scenarios: - -```java -// When transitioning back to STANDBY or on partition unsubscribe -private void resetDeclarationOfLeadershipState(PartitionConsumptionState pcs) { - pcs.getDolState().reset(); - LOGGER.debug("Reset DoL state for partition {}", pcs.getPartition()); -} - -// Call in: -// 1. LEADER -> STANDBY transition -// 2. Partition unsubscribe -// 3. Ingestion task shutdown -// 4. Error handling paths -``` - ---- - -## Schema Changes - -### 1. Create v14 of KafkaMessageEnvelope and add fields to StartOfSegment - -**Path**: `/internal/venice-common/src/main/resources/avro/KafkaMessageEnvelope/v14/KafkaMessageEnvelope.avsc` - -Copy v13 and add these two fields to the `StartOfSegment` record (within the `controlMessageUnion`): - -```json -{ - "name": "StartOfSegment", - "type": "record", - "fields": [ - { - "name": "checksumType", - "type": "int" - }, { - "name": "upcomingAggregates", - "type": { - "type": "array", - "items": "string" - } - }, { - "name": "isDeclarationOfLeadership", - "type": "boolean", - "default": false, - "doc": "Flag indicating this START_OF_SEGMENT is a Declaration of Leadership (DoL) message for fast leader handover" - }, { - "name": "leadershipTerm", - "type": "long", - "default": 0, - "doc": "Leadership term/epoch for DoL messages. Used to match produced and consumed DoL messages during loopback" - } - ] -} -``` - -These are **optional fields with defaults**, ensuring **backward compatibility** with existing v13 producers/consumers. - -After adding the schema, update the protocol version mapping in the codebase to register v14. - -### 2. Add new constant to `KafkaKey.java`: - -```java -/** - * Special key for Declaration of Leadership (DoL) control messages. - * Used during leader handover to confirm the new leader can write to and consume from the local VT. - * This is distinct from HEART_BEAT to clearly separate leadership declaration from heartbeat semantics. - */ -public static final KafkaKey DECLARATION_OF_LEADERSHIP = new KafkaKey( - MessageType.CONTROL_MESSAGE, - ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH) - .put(DolGuidGenerator.getInstance().getGuid().bytes()) // Use distinct GUID generator - .putInt(0) // segment number - .putInt(0) // sequence number - .array()); -``` - -**Note**: Create a dedicated `DolGuidGenerator` similar to `HeartbeatGuidV3Generator` to ensure DoL messages have a distinct, recognizable GUID pattern. - ---- - -## Configuration - -The DoL mechanism is controlled by **two separate configuration flags** to enable independent rollout for system stores and user stores: - -### 1. System Stores Configuration - -**Config Key**: `server.leader.handover.use.dol.mechanism.for.system.stores` - -**Purpose**: Controls DoL mechanism for system stores (meta stores, push status stores, etc.) - -**Default**: `false` (uses legacy time-based mechanism) - -**Usage in VeniceServerConfig**: -```java -private final boolean leaderHandoverUseDoLMechanismForSystemStores; - -public VeniceServerConfig(VeniceProperties serverProperties) { - this.leaderHandoverUseDoLMechanismForSystemStores = - serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, false); -} - -public boolean isLeaderHandoverUseDoLMechanismEnabledForSystemStores() { - return this.leaderHandoverUseDoLMechanismForSystemStores; -} -``` - -### 2. 
User Stores Configuration - -**Config Key**: `server.leader.handover.use.dol.mechanism.for.user.stores` - -**Purpose**: Controls DoL mechanism for user stores (regular application data stores) - -**Default**: `false` (uses legacy time-based mechanism) - -**Usage in VeniceServerConfig**: -```java -private final boolean leaderHandoverUseDoLMechanismForUserStores; - -public VeniceServerConfig(VeniceProperties serverProperties) { - this.leaderHandoverUseDoLMechanismForUserStores = - serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, false); -} - -public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() { - return this.leaderHandoverUseDoLMechanismForUserStores; -} -``` - -### 3. Config Selection Logic - -The `canSwitchToLeaderTopic()` method selects the appropriate config based on store type: - -```java -private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { - // Check if DoL mechanism is enabled via config (system stores vs user stores) - boolean useDoLMechanism = isIngestingSystemStore() - ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() - : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); - - if (useDoLMechanism) { - // Use DoL-based logic - return checkDoLLoopbackConfirmation(pcs); - } else { - // Use legacy time-based mechanism - return canSwitchToLeaderTopicLegacy(pcs); - } -} -``` - -### 4. Rollout Strategy - -Having separate configs enables a **phased rollout approach**: - -**Phase 1: System Stores First** -- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` -- System stores typically have lower volume and are easier to monitor -- Validate DoL mechanism works correctly on critical infrastructure stores - -**Phase 2: User Stores Rollout** -- After system stores are stable, enable for user stores -- Set `server.leader.handover.use.dol.mechanism.for.user.stores=true` -- Can roll out gradually by store or by region - -**Phase 3: Full Adoption** -- Both configs enabled cluster-wide -- Monitor metrics to ensure improved handover latency -- Eventual deprecation of legacy time-based mechanism - -### 5. Benefits of Separate Configs - -- **Risk Mitigation**: Test on critical system stores before wider user store rollout -- **Independent Control**: Disable DoL for user stores without affecting system stores (or vice versa) -- **Gradual Migration**: Roll out to different store types at different paces -- **Operational Flexibility**: Quick rollback per store type if issues arise -- **A/B Testing**: Compare DoL vs. legacy behavior for different store types - ---- - -## Benefits of DoL Approach - -### 1. **Deterministic Confirmation** -- Explicit proof that new leader can write to and read from VT -- No reliance on timing assumptions or probabilistic waits - -### 2. **Fast Handover** -- Loopback latency typically milliseconds (vs. minutes with time-based approach) -- Eliminates unnecessary `newLeaderInactiveTime` wait period - -### 3. **Strong Consistency Guarantee** -- New leader has confirmed its position in VT before consuming RT -- Prevents split-brain scenarios where old/new leaders overlap -- **Consuming DoL back is inherent confirmation of being fully caught up on VT** -- Eliminates need for special-case logic (e.g., `isUserSystemStore()` checks) - -### 4. 
**Unified Logic Across Store Types** -- DoL loopback confirmation means the leader has processed all VT messages up to its DoL -- No need for separate `isLocalVersionTopicPartitionFullyConsumed()` checks for user system stores -- Single, consistent handover mechanism for all stores (user stores, system stores, hybrid stores) -- Simplifies code and reduces maintenance burden - -### 5. **Observability** -- DoL loopback latency metric tracks handover speed -- Clear state tracking for debugging and monitoring -- Explicit log messages for handover progress - -### 6. **Graceful Degradation** -- If DoL message never loops back (e.g., VT issues), state transition won't proceed -- Prevents unsafe promotion to leader - -### 7. **Backward Compatible** -- New DoL fields are optional in `StartOfSegment` -- Existing replicas ignore DoL messages (see them as regular heartbeats) -- Gradual rollout possible - ---- - -## Metrics and Monitoring - -Add metrics to track DoL mechanism: - -```java -// DoL loopback latency histogram -hostLevelIngestionStats.recordDolLoopbackLatency(storeName, partition, loopbackLatency); - -// DoL state gauges -hostLevelIngestionStats.recordDolProduced(storeName, partition, 1); -hostLevelIngestionStats.recordDolConsumed(storeName, partition, 1); - -// DoL failures counter -hostLevelIngestionStats.recordDolFailure(storeName, partition, reason); -``` - ---- - -## Testing Strategy - -### Unit Tests -1. Test DoL state transitions -2. Test DoL message production -3. Test DoL message consumption detection -4. Test term matching logic -5. Test state reset scenarios - -### Integration Tests -1. Leader handover with DoL mechanism end-to-end -2. Multiple concurrent leader transitions -3. DoL message loss scenarios (retry/timeout) -4. Backward compatibility with old replicas - -### Stress Tests -1. Rapid leader failover cycles -2. High partition count scenarios -3. 
Network partition during DoL loopback - ---- - -## Migration and Rollout - -### Phase 1: Schema Evolution -- Deploy `StartOfSegment` schema with new optional fields (`isDeclarationOfLeadership`, `leadershipTerm`) -- Add `KafkaKey.DECLARATION_OF_LEADERSHIP` constant and `DolGuidGenerator` -- No behavior changes yet - configs remain `false` by default - -### Phase 2: DoL Code Deployment -- Deploy DoL production and consumption logic -- Both configs (`for.system.stores` and `for.user.stores`) default to `false` -- Code is in place but not activated - -### Phase 3: Enable DoL for System Stores (Canary) -- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` on canary servers -- Monitor DoL loopback metrics for system stores (meta stores, push status stores) -- Validate DoL messages are produced and consumed correctly -- User stores continue using legacy time-based mechanism - -### Phase 4: System Stores Full Rollout -- After canary validation, enable DoL for all system stores cluster-wide -- Set `server.leader.handover.use.dol.mechanism.for.system.stores=true` globally -- Monitor for 1-2 weeks to ensure stability - -### Phase 5: Enable DoL for User Stores (Gradual) -- Start with low-traffic user stores or specific regions -- Set `server.leader.handover.use.dol.mechanism.for.user.stores=true` gradually -- Monitor handover latency improvements and error rates -- Expand to high-traffic stores after validation - -### Phase 6: Full Adoption -- Both configs enabled cluster-wide -- System stores and user stores using DoL mechanism -- Legacy time-based logic remains as fallback in code - -### Phase 7: Cleanup (Future) -- After 6-12 months of stable operation, consider removing legacy code -- Deprecate and remove `canSwitchToLeaderTopicLegacy()` method -- Remove time-based fallback logic - ---- - -## Edge Cases and Error Handling - -### 1. DoL Message Lost -- **Symptom**: DoL produced but never consumed back -- **Handling**: Add timeout (e.g., 2 × `newLeaderInactiveTime`) -- **Recovery**: Retry DoL production or fall back to time-based check - -### 2. Concurrent Leader Transitions -- **Symptom**: Multiple DoL messages with different terms -- **Handling**: Term matching ensures we only confirm OUR DoL -- **Recovery**: Reset on state transitions, only track latest term - -### 3. VT Write Failures -- **Symptom**: DoL produce operation fails -- **Handling**: Reset DoL state, retry or escalate error -- **Recovery**: Don't transition to LEADER without DoL confirmation - -### 4. Partition Reassignment During DoL -- **Symptom**: Partition unsubscribed while waiting for DoL -- **Handling**: Reset DoL state on unsubscribe -- **Recovery**: New state model invocation starts fresh DoL cycle - -### 5. Old Leader Still Producing -- **Symptom**: Old leader continues after new leader DoL -- **Handling**: Term/GUID in DoL helps identify producer -- **Recovery**: Followers detect via changed producer metadata - ---- - -## Alternative Considered: Explicit DoL Control Message Type - -Instead of reusing `START_OF_SEGMENT`, we could create a new `ControlMessageType.DECLARATION_OF_LEADERSHIP`. - -**Pros:** -- Clearer semantic meaning -- Dedicated schema without mixing concerns - -**Cons:** -- Requires new control message type (more schema evolution) -- Less reuse of existing infrastructure -- More code changes across consumers - -**Decision**: Use `START_OF_SEGMENT` with special flag for simpler implementation and better reuse. 
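To make the recovery path in Edge Case 1 above concrete, a timeout guard could look like the sketch below. This is illustrative only: `hasDolLoopbackTimedOut` is a hypothetical helper, and `dolLoopbackTimeoutMs` is an assumed value (e.g., 2 × `newLeaderInactiveTime`), not an existing config; it builds only on the `DeclarationOfLeadershipState` accessors defined earlier in this document.

```java
// Sketch: detect a lost DoL loopback and reset state so the caller can retry
// the DoL produce or fall back to the legacy time-based check.
private boolean hasDolLoopbackTimedOut(PartitionConsumptionState pcs, long dolLoopbackTimeoutMs) {
  DeclarationOfLeadershipState dolState = pcs.getDolState();
  if (!dolState.isDolMessageProduced() || dolState.isDolConsumed()) {
    // No DoL in flight, or loopback already confirmed: nothing to time out.
    return false;
  }
  long elapsedMs = System.currentTimeMillis() - dolState.getDolProducedTimestamp();
  if (elapsedMs <= dolLoopbackTimeoutMs) {
    return false;
  }
  // Loopback took too long (e.g., DoL message lost or VT issues): reset so a
  // fresh DoL can be produced, or so the caller can use canSwitchToLeaderTopicLegacy().
  dolState.reset();
  return true;
}
```

A caller such as `canSwitchToLeaderTopic()` would invoke this guard before the produced/consumed checks and route to the legacy mechanism when it returns true.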
- --- - -## Summary - -The Declaration of Leadership (DoL) mechanism provides a **deterministic, fast, and reliable** approach to leader handover by: - -1. **Loopback Confirmation**: New leader produces DoL message to VT and waits to consume it back -2. **Explicit State Tracking**: `DeclarationOfLeadershipState` in `PartitionConsumptionState` tracks DoL lifecycle -3. **Deterministic Verification**: Replaces unreliable time-based waits with position-based verification -4. **Unified Logic**: Same handover mechanism for all store types (no special `isUserSystemStore()` checks) -5. **Backward Compatibility**: Optional schema fields with defaults, dedicated `KafkaKey.DECLARATION_OF_LEADERSHIP` - -### Key Implementation Components - -**Schema/Protocol Changes:** -- Add `isDeclarationOfLeadership` and `leadershipTerm` fields to `StartOfSegment.avsc` -- Add `KafkaKey.DECLARATION_OF_LEADERSHIP` constant with dedicated GUID generator -- Add `DeclarationOfLeadershipState` class to track DoL lifecycle - -**Core Logic Changes:** -- `produceDeclarationOfLeadership()`: Produces DoL message when entering `IN_TRANSITION_FROM_STANDBY_TO_LEADER` -- `checkAndHandleDeclarationOfLeadership()`: Detects DoL message consumption with term matching -- `canSwitchToLeaderTopic()`: Simplified to check only DoL confirmation (no time-based or store-type logic) - -**Benefits:** -- ⚡ **Millisecond handover** vs. minutes with time-based approach -- 🎯 **Deterministic** confirmation of VT write/read capability and caught-up state -- 🔒 **Safe** prevention of split-brain scenarios -- 📊 **Observable** with clear metrics and state tracking -- 🧹 **Simplified** codebase with unified logic for all store types - -This design eliminates the race conditions and timing dependencies of the current implementation while providing clear observability into the handover process. diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/guid/DolGuidGenerator.java b/internal/venice-common/src/main/java/com/linkedin/venice/guid/DolGuidGenerator.java new file mode 100644 index 00000000000..f898a32d33c --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/guid/DolGuidGenerator.java @@ -0,0 +1,46 @@ +package com.linkedin.venice.guid; + +import com.linkedin.venice.kafka.protocol.GUID; +import java.nio.ByteBuffer; +import java.util.UUID; + + +/** + * A GUID generator for Declaration of Leadership (DoL) control messages. + * + *

<p>Uses {@link java.util.UUID#nameUUIDFromBytes(byte[])} to generate a type 3 UUID that will + * not collide with: + * <ul> + * <li>User data GUIDs (type 4, generated by {@link JavaUtilGuidV4Generator})</li> + * <li>Heartbeat control message GUIDs (type 3, generated by {@link HeartbeatGuidV3Generator})</li> + * </ul> + * + * <p>
DoL messages use a distinct GUID to clearly identify leadership declaration messages + * during the leader handover process. This allows replicas to distinguish DoL messages from + * regular heartbeat messages and user data. + */ +public class DolGuidGenerator implements GuidGenerator { + private static DolGuidGenerator instance; + + private DolGuidGenerator() { + } + + public static synchronized DolGuidGenerator getInstance() { + if (instance == null) { + instance = new DolGuidGenerator(); + } + return instance; + } + + @Override + public GUID getGuid() { + UUID javaUtilUuid = UUID.nameUUIDFromBytes("declarationOfLeadershipControlMessage".getBytes()); + GUID guid = new GUID(); + byte[] guidBytes = ByteBuffer.allocate(16) + .putLong(javaUtilUuid.getMostSignificantBits()) + .putLong(javaUtilUuid.getLeastSignificantBits()) + .array(); + guid.bytes(guidBytes); + return guid; + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 661c2fbf70a..766efb1e2dd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -2217,6 +2217,7 @@ public static KafkaMessageEnvelope getDoLStampKME( LeaderMetadata leaderMetadataFooter = new LeaderMetadata(); leaderMetadataFooter.hostName = writerId; leaderMetadataFooter.termId = leadershipTerm; + leaderMetadataFooter.upstreamPubSubPosition = ByteBuffer.allocate(0); // Initialize to empty ByteBuffer KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); kafkaMessageEnvelope.messageType = MessageType.CONTROL_MESSAGE.getValue(); From ab9b98b868eadc43f6e64d7884acdd7ffbabf488 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Fri, 21 Nov 2025 13:21:03 -0800 Subject: [PATCH 3/4] Cleanup --- .../consumer/{DolState.java => DolStamp.java} | 6 ++-- .../LeaderFollowerStoreIngestionTask.java | 36 +++++++++---------- .../consumer/PartitionConsumptionState.java | 12 +++---- .../kafka/consumer/StoreIngestionTask.java | 22 ++++++------ .../linkedin/venice/writer/VeniceWriter.java | 6 ++++ 5 files changed, 44 insertions(+), 38 deletions(-) rename clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/{DolState.java => DolStamp.java} (92%) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java similarity index 92% rename from clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java rename to clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java index 28c53d0ef7b..53b979fdef5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java @@ -8,14 +8,14 @@ * Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition. * DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT. 
*/ -public class DolState { +public class DolStamp { private final long leadershipTerm; private final String hostId; private volatile boolean dolProduced; // DoL message was acked by broker private volatile boolean dolConsumed; // DoL message was consumed back by this replica private volatile CompletableFuture dolProduceFuture; // Future tracking DoL produce result - public DolState(long leadershipTerm, String hostId) { + public DolStamp(long leadershipTerm, String hostId) { this.leadershipTerm = leadershipTerm; this.hostId = hostId; this.dolProduced = false; @@ -61,7 +61,7 @@ public boolean isReady() { @Override public String toString() { - return "DolState{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" + return "DolStamp{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" + dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}"; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index c64fedf4e4c..304535f4e93 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -866,11 +866,11 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum return; } - DolState dolState = new DolState(leadershipTerm, veniceWriter.get().getWriterId()); - partitionConsumptionState.setDolState(dolState); + DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId()); + partitionConsumptionState.setDolState(dolStamp); LOGGER.info( "Initialized DoL state: {} for replica: {} with term: {} and hostId: {}", - dolState, + dolStamp, partitionConsumptionState.getReplicaId(), leadershipTerm, veniceWriter.get().getWriterId()); @@ -880,8 +880,8 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum CompletableFuture dolProduceFuture = veniceWriter.get() .sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm); - // Store the produce future in DolState - dolState.setDolProduceFuture(dolProduceFuture); + // Store the produce future in DolStamp + dolStamp.setDolProduceFuture(dolProduceFuture); // Chain logging for produce completion dolProduceFuture.thenAccept((result) -> { @@ -926,21 +926,21 @@ public void onCompletion(PubSubProduceResult produceResult, Exception exception) } // Mark DoL as produced - DolState dolState = pcs.getDolState(); - if (dolState != null && dolState.getLeadershipTerm() == leadershipTerm) { - dolState.setDolProduced(true); + DolStamp dolStamp = pcs.getDolState(); + if (dolStamp != null && dolStamp.getLeadershipTerm() == leadershipTerm) { + dolStamp.setDolProduced(true); LOGGER.info( - "DoL message produce confirmed for partition {} at position {} (term: {}) - dolState: {}", + "DoL message produce confirmed for partition {} at position {} (term: {}) - dolStamp: {}", pcs.getPartition(), produceResult.getPubSubPosition(), leadershipTerm, - dolState); + dolStamp); } else { LOGGER.warn( - "DoL state mismatch or null for partition {} - expected term: {}, dolState: {}", + "DoL state mismatch or null for partition {} - expected term: {}, dolStamp: {}", pcs.getPartition(), leadershipTerm, - dolState); + dolStamp); } } } @@ 
-954,21 +954,21 @@ public void onCompletion(PubSubProduceResult produceResult, Exception exception) */ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { // Check if DoL mechanism is enabled via config (system stores vs user stores) - DolState dolState = pcs.getDolState(); - if (shouldUseDolMechanism() && dolState != null) { + DolStamp dolStamp = pcs.getDolState(); + if (shouldUseDolMechanism() && dolStamp != null) { // Check if DoL state is ready (both produced and consumed) - if (dolState.isReady()) { + if (dolStamp.isReady()) { LOGGER.info( - "DoL mechanism complete for replica: {} - switching to leader topic. DolState: {}", + "DoL mechanism complete for replica: {} - switching to leader topic. DolStamp: {}", pcs.getReplicaId(), - dolState); + dolStamp); // Clear DoL state as we're done with this transition pcs.clearDolState(); return true; } // DoL not ready yet, stay on local VT - LOGGER.info("DoL mechanism not ready for partition {} - DolState: {}", pcs.getReplicaId(), dolState); + LOGGER.info("DoL mechanism not ready for partition {} - DolStamp: {}", pcs.getReplicaId(), dolStamp); return false; } else { // Use legacy time-based mechanism diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index a8e471b4252..3a92a66aac6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -129,7 +129,7 @@ enum LatchStatus { /** * Tracks DoL state during STANDBY to LEADER transition. Null when not in transition or DoL not enabled. */ - private volatile DolState dolState = null; + private volatile DolStamp dolStamp = null; /** * The highest leadership term observed by this replica. Currently used only @@ -533,16 +533,16 @@ public final LeaderFollowerStateType getLeaderFollowerState() { return this.leaderFollowerState; } - public DolState getDolState() { - return this.dolState; + public DolStamp getDolState() { + return this.dolStamp; } - public void setDolState(DolState dolState) { - this.dolState = dolState; + public void setDolState(DolStamp dolStamp) { + this.dolStamp = dolStamp; } public void clearDolState() { - this.dolState = null; + this.dolStamp = null; } public long getHighestLeadershipTerm() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e6d5c4ccfdc..805c2a1f768 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -272,7 +272,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { /** * Keeps track of producer states inside version topic that drainer threads have processed so far. * Producers states in this validator will be flushed to the metadata partition of the storage engine regularly in - * {@link #syncOffset(String, PartitionConsumptionState)} + * {@link #syncOffset(PartitionConsumptionState)} * NOTE: consumerDiv will be used in place of this when {@link #isGlobalRtDivEnabled()} is true. 
*/ private final DataIntegrityValidator drainerDiv; @@ -1335,7 +1335,7 @@ private int handleSingleMessage( /** * Checks if the consumed message is a DoL (Declaration of Leadership) stamp and marks it as consumed - * if it matches the current DolState. This method is called during message consumption to detect when + * if it matches the current DolStamp. This method is called during message consumption to detect when * the leader replica has successfully consumed back its own DoL stamp, indicating it's fully caught up * with the version topic and ready to switch to consuming from remote VT or RT. * @@ -1380,8 +1380,8 @@ private void checkAndHandleDoLMessage( } // Get current DoL state - may be null if not in STANDBY->LEADER transition - DolState currentDolState = partitionConsumptionState.getDolState(); - if (currentDolState == null) { + DolStamp currentDolStamp = partitionConsumptionState.getDolState(); + if (currentDolStamp == null) { // Not currently waiting for a DoL, just log for observability LOGGER.debug( "Replica: {} consumed DoL stamp for term: {} from host: {} (timestamp: {}), but not currently waiting for DoL", @@ -1393,32 +1393,32 @@ private void checkAndHandleDoLMessage( } // Check if this DoL matches the expected term and host - long expectedTermId = currentDolState.getLeadershipTerm(); - String expectedHostId = currentDolState.getHostId(); + long expectedTermId = currentDolStamp.getLeadershipTerm(); + String expectedHostId = currentDolStamp.getHostId(); if (consumedTermId == expectedTermId && consumedHostId.equals(expectedHostId)) { // Successfully consumed our own DoL stamp - mark as consumed - currentDolState.setDolConsumed(true); + currentDolStamp.setDolConsumed(true); LOGGER.info( "Replica {}: finished DoL loopback. The leader wrote its DoL stamp to the " + "local VT and successfully consumed it again, confirming the replica is " - + "fully caught up. [term={}, host={}, timestamp={}]. DolState={}", + + "fully caught up. [term={}, host={}, timestamp={}]. DolStamp={}", replicaId, consumedTermId, consumedHostId, messageTimestamp, - currentDolState); + currentDolStamp); } else { // Received a DoL stamp that doesn't match our expected state LOGGER.warn( "Replica: {} consumed DoL stamp with mismatched metadata. Expected: [term={}, host={}], Received: [term={}, host={}]. " - + "This may indicate a stale message or concurrent leadership changes. Current DolState: {}", + + "This may indicate a stale message or concurrent leadership changes. 
Current DolStamp: {}", replicaId, expectedTermId, expectedHostId, consumedTermId, consumedHostId, - currentDolState); + currentDolStamp); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 766efb1e2dd..b1bd67c90cc 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -2243,6 +2243,12 @@ public CompletableFuture<PubSubProduceResult> sendDoLStamp( PubSubProducerCallback callback, long leadershipTerm) { KafkaMessageEnvelope kafkaMessageEnvelope = getDoLStampKME(leadershipTerm, writerId, heartBeatMessage); + + logger.info( + "Sending DoL stamp to topic-partition: {} for leadership term: {}, KME: {}", + topicPartition, + leadershipTerm, + kafkaMessageEnvelope); return producerAdapter.sendMessage( topicPartition.getPubSubTopic().getName(), topicPartition.getPartitionNumber(), From 1bbff558556233a9e1a977d0127d79b492d9801c Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Fri, 21 Nov 2025 15:20:24 -0800 Subject: [PATCH 4/4] Bug fixes Works well Full working copy Close partition in VW before sending DOL add latency Delete bogus test Fix flaky test Improve logs Fix flaky test --- .../ActiveActiveStoreIngestionTask.java | 4 +- .../davinci/kafka/consumer/DolStamp.java | 24 +- .../kafka/consumer/KafkaConsumerService.java | 8 +- .../LeaderFollowerStoreIngestionTask.java | 53 ++-- .../consumer/PartitionConsumptionState.java | 9 + .../PartitionWiseKafkaConsumerService.java | 3 +- .../kafka/consumer/SharedKafkaConsumer.java | 26 +- .../kafka/consumer/StoreIngestionTask.java | 239 ++++++++++-------- .../consumer/SharedKafkaConsumerTest.java | 7 +- .../consumer/StoreIngestionTaskTest.java | 13 +- .../helix/HelixReadWriteSchemaRepository.java | 5 +- .../linkedin/venice/writer/VeniceWriter.java | 11 +- .../linkedin/venice/endToEnd/TestHybrid.java | 4 +- .../utils/VeniceClusterWrapper.java | 2 +- .../resources/log4j2.properties | 52 +++- 15 files changed, 279 insertions(+), 181 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 558f934e4a7..66e1783156e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -988,7 +988,7 @@ protected Map calculateRtConsumptionStartPositions( pubSubAddress, sourceTopicPartition); PubSubTopicPartition newSourceTopicPartition = - resolveRtTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress); + resolveTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress); try { rtStartPosition = getRewindStartPositionForRealTimeTopic(pubSubAddress, newSourceTopicPartition, rewindStartTimestamp); @@ -1372,7 +1372,7 @@ Runnable buildRepairTask( return () -> { PubSubTopic pubSubTopic = sourceTopicPartition.getPubSubTopic(); PubSubTopicPartition resolvedTopicPartition = - resolveRtTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl); + resolveTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl); // Calculate upstream offset PubSubPosition upstreamOffset = 
getRewindStartPositionForRealTimeTopic(sourceKafkaUrl, resolvedTopicPartition, rewindStartTimestamp); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java index 53b979fdef5..79a65eb3fbb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java @@ -11,6 +11,7 @@ public class DolStamp { private final long leadershipTerm; private final String hostId; + private final long produceStartTimeMs; // Timestamp when DoL production started private volatile boolean dolProduced; // DoL message was acked by broker private volatile boolean dolConsumed; // DoL message was consumed back by this replica private volatile CompletableFuture dolProduceFuture; // Future tracking DoL produce result @@ -18,6 +19,7 @@ public class DolStamp { public DolStamp(long leadershipTerm, String hostId) { this.leadershipTerm = leadershipTerm; this.hostId = hostId; + this.produceStartTimeMs = System.currentTimeMillis(); this.dolProduced = false; this.dolConsumed = false; this.dolProduceFuture = null; @@ -59,9 +61,29 @@ public boolean isReady() { return dolProduced && dolConsumed; } + public long getProduceStartTimeMs() { + return produceStartTimeMs; + } + + /** + * Calculate latency from DoL production start to now. + * @return latency in milliseconds + */ + public long getLatencyMs() { + return System.currentTimeMillis() - produceStartTimeMs; + } + @Override public String toString() { + String produceResult = ""; + if (dolProduceFuture != null && dolProduceFuture.isDone()) { + try { + produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition(); + } catch (Exception e) { + // Ignore, keep empty + } + } return "DolStamp{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed=" - + dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}"; + + dolConsumed + produceResult + ", latencyMs=" + getLatencyMs() + "}"; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 695c7ba67a5..bc799940d55 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -63,7 +63,7 @@ * c) {@link ConsumerSubscriptionCleaner} * 2. Receive various calls to interrogate or mutate consumer state, and delegate them to the correct unit, by * maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably, - * the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver)} function allows the + * the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, PubSubPosition, ConsumedDataReceiver)} function allows the * caller to start funneling consumed data into a receiver (i.e. into another task). * 3. 
Provide a single abstract function that must be overridden by subclasses in order to implement a consumption * load balancing strategy: {@link #pickConsumerForPartition(PubSubTopic, PubSubTopicPartition)} @@ -118,7 +118,7 @@ protected KafkaConsumerService( final boolean isKafkaConsumerOffsetCollectionEnabled, final ReadOnlyStoreRepository metadataRepository, final boolean isUnregisterMetricForDeletedStoreEnabled, - VeniceServerConfig serverConfig) { + final VeniceServerConfig serverConfig) { this.kafkaUrl = consumerProperties.getProperty(KAFKA_BOOTSTRAP_SERVERS); this.kafkaUrlForLogger = Utils.getSanitizedStringForLogger(kafkaUrl); this.LOGGER = LogManager.getLogger( @@ -155,7 +155,9 @@ protected KafkaConsumerService( pubSubConsumerAdapterFactory.create(contextBuilder.build()), aggStats, this::recordPartitionsPerConsumerSensor, - this::handleUnsubscription); + this::handleUnsubscription, + serverConfig.getRegionName(), + i); Supplier>> pollFunction = liveConfigBasedKafkaThrottlingEnabled diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 304535f4e93..7497ebac7e0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -865,6 +865,8 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum partitionConsumptionState.getReplicaId()); return; } + // Close any existing VeniceWriter partition session + veniceWriter.get().closePartition(partitionConsumptionState.getPartition()); DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId()); partitionConsumptionState.setDolState(dolStamp); @@ -878,26 +880,13 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum // Send DolStamp to local VT PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm); CompletableFuture dolProduceFuture = veniceWriter.get() - .sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm); - + .sendDoLStamp( + partitionConsumptionState.getReplicaTopicPartition(), + dolCallback, + leadershipTerm, + localKafkaClusterId); // Store the produce future in DolStamp dolStamp.setDolProduceFuture(dolProduceFuture); - - // Chain logging for produce completion - dolProduceFuture.thenAccept((result) -> { - LOGGER.info( - "DoL stamp produced for replica: {} at offset: {} for term: {}", - partitionConsumptionState.getReplicaId(), - result.getPubSubPosition(), - leadershipTerm); - }).exceptionally((ex) -> { - LOGGER.error( - "Failed to produce DoL stamp for replica: {} for term: {}", - partitionConsumptionState.getReplicaId(), - leadershipTerm, - ex); - return null; - }); } /** @@ -916,8 +905,8 @@ public DolStampProduceCallback(PartitionConsumptionState pcs, long leadershipTer public void onCompletion(PubSubProduceResult produceResult, Exception exception) { if (exception != null) { LOGGER.error( - "Failed to produce DoL message for partition {} with term {}", - pcs.getPartition(), + "Failed to produce DoL message for replica: {} with term: {}", + pcs.getReplicaId(), leadershipTerm, exception); // Clear DoL state on failure @@ -930,15 +919,15 @@ public void onCompletion(PubSubProduceResult 
produceResult, Exception exception) if (dolStamp != null && dolStamp.getLeadershipTerm() == leadershipTerm) { dolStamp.setDolProduced(true); LOGGER.info( - "DoL message produce confirmed for partition {} at position {} (term: {}) - dolStamp: {}", - pcs.getPartition(), + "DoL message produce confirmed for replica: {} at position: {} (term: {}) - dolStamp: {}", + pcs.getReplicaId(), produceResult.getPubSubPosition(), leadershipTerm, dolStamp); } else { LOGGER.warn( - "DoL state mismatch or null for partition {} - expected term: {}, dolStamp: {}", - pcs.getPartition(), + "DoL state mismatch or null for replica: {} - expected term: {}, dolStamp: {}", + pcs.getReplicaId(), leadershipTerm, dolStamp); } @@ -958,9 +947,11 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { if (shouldUseDolMechanism() && dolStamp != null) { // Check if DoL state is ready (both produced and consumed) if (dolStamp.isReady()) { + long dolLatencyMs = dolStamp.getLatencyMs(); LOGGER.info( - "DoL mechanism complete for replica: {} - switching to leader topic. DolStamp: {}", + "DoL mechanism complete for replica: {} - unblocking switch to the leader topic. Total DoL latency: {} ms. DolStamp: {}", pcs.getReplicaId(), + dolLatencyMs, dolStamp); // Clear DoL state as we're done with this transition pcs.clearDolState(); @@ -968,7 +959,7 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) { } // DoL not ready yet, stay on local VT - LOGGER.info("DoL mechanism not ready for partition {} - DolStamp: {}", pcs.getReplicaId(), dolStamp); + LOGGER.debug("DoL mechanism not ready for replica: {} - DolStamp: {}", pcs.getReplicaId(), dolStamp); return false; } else { // Use legacy time-based mechanism @@ -1230,7 +1221,7 @@ void preparePositionCheckpointAndStartConsumptionAsLeader( syncConsumedUpstreamRTOffsetMapIfNeeded(pcs, Collections.singletonMap(pubSubAddress, startPos)); LOGGER.info( "Leader replica: {} started consuming: {} from: {}", - pcs.getReplicaId(), + pcs, Utils.getReplicaId(leaderTopic, pcs.getPartition()), startPos); } @@ -1336,6 +1327,14 @@ protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionS } private boolean isConsumingFromRemoteVersionTopic(PartitionConsumptionState partitionConsumptionState) { + // Log all three conditions for easier debugging + LOGGER.info( + "Checking remote VT consumption for replica: {}. EOP received: {}, isCurrentVersion: {}, nativeReplicationSourceVersionTopicKafkaURL: {}, localKafkaServer: {}", + partitionConsumptionState.getReplicaId(), + partitionConsumptionState.isEndOfPushReceived(), + isCurrentVersion.getAsBoolean(), + nativeReplicationSourceVersionTopicKafkaURL, + localKafkaServer); return !partitionConsumptionState.isEndOfPushReceived() && !isCurrentVersion.getAsBoolean() // Do not enable remote consumption for the source fabric leader. Otherwise, it will produce extra messages.
&& !Objects.equals(nativeReplicationSourceVersionTopicKafkaURL, localKafkaServer); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 3a92a66aac6..31076354d45 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -904,6 +904,8 @@ public void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition * @return the current upstream version topic position */ public PubSubPosition getLatestProcessedRemoteVtPosition() { + // TODO: Ideally, we should get this from offset record to ensure durability + // return this.offsetRecord.getCheckpointedRemoteVtPosition(); return this.latestProcessedRemoteVtPosition; } @@ -988,6 +990,13 @@ public PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useC ? getDivRtCheckpointPosition(pubSubBrokerAddress) : getLatestProcessedRtPosition(pubSubBrokerAddress); } else { + LOGGER.info( + "Getting leader position for replica: {}. Remote consumption enabled: {}, remote position: {}, local position: {}", + getReplicaTopicPartition(), + consumeRemotely(), + getLatestProcessedRemoteVtPosition(), + getLatestProcessedVtPosition()); + return consumeRemotely() ? getLatestProcessedRemoteVtPosition() : getLatestProcessedVtPosition(); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.java index b2bccdfa093..f7c1d38ac63 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.java @@ -177,7 +177,8 @@ protected synchronized SharedKafkaConsumer pickConsumerForPartition( "Did not find a suitable consumer after checking " + consumersChecked + " instances."); } LOGGER.info( - "Get shared consumer for: {} from the ingestion task belonging to version topic: {} with index: {}", + "Get shared consumer: {} for: {} from the ingestion task belonging to version topic: {} with index: {}", + consumer, topicPartition, versionTopic, consumerIndex); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java index 4f29514e52b..8a8c87dd4de 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java @@ -72,6 +72,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter { private final Time time; + private final String toString; + /** * Used to keep track of which version-topic is intended to use a given subscription, in order to detect * regressions where we would end up using this consumer to subscribe to a given topic-partition on behalf @@ -95,8 +97,10 @@ public SharedKafkaConsumer( PubSubConsumerAdapter delegate, AggKafkaConsumerServiceStats stats, Runnable assignmentChangeListener, - UnsubscriptionListener unsubscriptionListener) { - this(delegate, stats,
assignmentChangeListener, unsubscriptionListener, new SystemTime()); + UnsubscriptionListener unsubscriptionListener, + String regionName, + int idx) { + this(delegate, stats, assignmentChangeListener, unsubscriptionListener, new SystemTime(), regionName, idx); } SharedKafkaConsumer( @@ -104,7 +108,9 @@ public SharedKafkaConsumer( AggKafkaConsumerServiceStats stats, Runnable assignmentChangeListener, UnsubscriptionListener unsubscriptionListener, - Time time) { + Time time, + String regionName, + int idx) { this.delegate = delegate; this.stats = stats; this.assignmentChangeListener = assignmentChangeListener; @@ -112,6 +118,7 @@ public SharedKafkaConsumer( this.time = time; this.currentAssignment = Collections.emptySet(); this.currentAssignmentSize = new AtomicInteger(0); + this.toString = String.format("SharedKafkaConsumer-%d:%s", idx, regionName); } /** @@ -132,8 +139,7 @@ protected synchronized void updateCurrentAssignment(Set ne @UnderDevelopment(value = "This API may not be implemented in all PubSubConsumerAdapter implementations.") @Override public synchronized void subscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition lastReadPubSubPosition) { - throw new VeniceException( - this.getClass().getSimpleName() + " does not support subscribe without specifying a version-topic."); + throw new VeniceException(this + " does not support subscribe without specifying a version-topic."); } @Override @@ -141,8 +147,7 @@ public void subscribe( @Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition position, boolean isInclusive) { - throw new VeniceException( - this.getClass().getSimpleName() + " does not support subscribe without specifying a version-topic."); + throw new VeniceException(this + " does not support subscribe without specifying a version-topic."); } synchronized void subscribe( @@ -209,7 +214,7 @@ protected synchronized void unSubscribeAction(Supplier long elapsedTime = System.currentTimeMillis() - startTime; LOGGER.info( "Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms", - this.getClass().getSimpleName(), + this, topicPartitions.size(), topicPartitions, elapsedTime); @@ -419,4 +424,9 @@ public synchronized PubSubPosition decodePosition( ByteBuffer buffer) { return delegate.decodePosition(partition, positionTypeId, buffer); } + + @Override + public String toString() { + return toString; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 805c2a1f768..892f9777c86 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1333,101 +1333,6 @@ private int handleSingleMessage( return record.getPayloadSize(); } - /** - * Checks if the consumed message is a DoL (Declaration of Leadership) stamp and marks it as consumed - * if it matches the current DolStamp. This method is called during message consumption to detect when - * the leader replica has successfully consumed back its own DoL stamp, indicating it's fully caught up - * with the version topic and ready to switch to consuming from remote VT or RT.
- * - * @param partitionConsumptionState the partition consumption state tracking DoL status - * @param dolPubSubMessage the consumed DoL PubSub message with leadership metadata - */ - private void checkAndHandleDoLMessage( - PartitionConsumptionState partitionConsumptionState, - DefaultPubSubMessage dolPubSubMessage) { - // Early exit if DoL mechanism is disabled via config - if (!shouldUseDolMechanism()) { - return; - } - - // Extract leadership metadata from the message - KafkaMessageEnvelope messageEnvelope = dolPubSubMessage.getValue(); - LeaderMetadata leaderMetadata = messageEnvelope.getLeaderMetadataFooter(); - if (leaderMetadata == null) { - // Message doesn't have leadership metadata, skip processing - return; - } - - // Extract term information from the consumed message - long consumedTermId = leaderMetadata.getTermId(); - String consumedHostId = leaderMetadata.getHostName().toString(); - long messageTimestamp = messageEnvelope.getProducerMetadata() != null - ? messageEnvelope.getProducerMetadata().getMessageTimestamp() - : -1; - String replicaId = partitionConsumptionState.getReplicaId(); - - // Update highest observed leadership term for monitoring - long previousHighestTerm = partitionConsumptionState.getHighestLeadershipTerm(); - if (consumedTermId > previousHighestTerm) { - partitionConsumptionState.setHighestLeadershipTerm(consumedTermId); - LOGGER.info( - "Replica: {} observed new highest leadership term: {} (previous: {}) from host: {} at timestamp: {}", - replicaId, - consumedTermId, - previousHighestTerm, - consumedHostId, - messageTimestamp); - } - - // Get current DoL state - may be null if not in STANDBY->LEADER transition - DolStamp currentDolStamp = partitionConsumptionState.getDolState(); - if (currentDolStamp == null) { - // Not currently waiting for a DoL, just log for observability - LOGGER.debug( - "Replica: {} consumed DoL stamp for term: {} from host: {} (timestamp: {}), but not currently waiting for DoL", - replicaId, - consumedTermId, - consumedHostId, - messageTimestamp); - return; - } - - // Check if this DoL matches the expected term and host - long expectedTermId = currentDolStamp.getLeadershipTerm(); - String expectedHostId = currentDolStamp.getHostId(); - - if (consumedTermId == expectedTermId && consumedHostId.equals(expectedHostId)) { - // Successfully consumed our own DoL stamp - mark as consumed - currentDolStamp.setDolConsumed(true); - LOGGER.info( - "Replica {}: finished DoL loopback. The leader wrote its DoL stamp to the " - + "local VT and successfully consumed it again, confirming the replica is " - + "fully caught up. [term={}, host={}, timestamp={}]. DolStamp={}", - replicaId, - consumedTermId, - consumedHostId, - messageTimestamp, - currentDolStamp); - } else { - // Received a DoL stamp that doesn't match our expected state - LOGGER.warn( - "Replica: {} consumed DoL stamp with mismatched metadata. Expected: [term={}, host={}], Received: [term={}, host={}]. " - + "This may indicate a stale message or concurrent leadership changes. Current DolStamp: {}", - replicaId, - expectedTermId, - expectedHostId, - consumedTermId, - consumedHostId, - currentDolStamp); - } - } - - protected boolean shouldUseDolMechanism() { - return isSystemStore - ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() - : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); - } - /** * This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}. 
* @@ -3553,22 +3458,19 @@ private int internalProcessConsumerRecord( consumerRecord.getPubSubMessageTime(), partitionConsumptionState); try { - if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue() - && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) { - recordHeartbeatReceived(partitionConsumptionState, consumerRecord, kafkaUrl); - if (recordTransformer != null) { - recordTransformer.onHeartbeat( - consumerRecord.getTopicPartition().getPartitionNumber(), - consumerRecord.getPubSubMessageTime()); + if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()) { + if (Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) { + recordHeartbeatReceived(partitionConsumptionState, consumerRecord, kafkaUrl); + if (recordTransformer != null) { + recordTransformer.onHeartbeat( + consumerRecord.getTopicPartition().getPartitionNumber(), + consumerRecord.getPubSubMessageTime()); + } + } else { + // Check and handle DoL message for non-heartbeat messages + checkAndHandleDoLMessage(partitionConsumptionState, consumerRecord); } } - // Check if this is a DoL (Declaration of Leadership) stamp message - if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue() - && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.DOL_STAMP.getKey())) { - // Process DoL stamp to check if it matches the expected leadership term - checkAndHandleDoLMessage(partitionConsumptionState, consumerRecord); - } - } catch (Exception e) { LOGGER.error("Failed to record Record heartbeat with message: ", e); } @@ -3990,7 +3892,7 @@ public void consumerSubscribe( PubSubPosition startOffset, String kafkaURL) { PubSubTopicPartition resolvedTopicPartition = - resolveRtTopicPartitionWithPubSubBrokerAddress(pubSubTopic, partitionConsumptionState, kafkaURL); + resolveTopicPartitionWithPubSubBrokerAddress(pubSubTopic, partitionConsumptionState, kafkaURL); consumerSubscribe(resolvedTopicPartition, startOffset, kafkaURL); } @@ -5034,7 +4936,7 @@ public boolean isSeparatedRealtimeTopicEnabled() { * For RT input topic with separate-RT kafka URL, this method will return topic-partition with separated-RT topic. * For other case, it will return topic-partition with input topic. */ - PubSubTopicPartition resolveRtTopicPartitionWithPubSubBrokerAddress( + PubSubTopicPartition resolveTopicPartitionWithPubSubBrokerAddress( PubSubTopic topic, PartitionConsumptionState partitionConsumptionState, String pubSubBrokerAddress) { @@ -5320,6 +5222,121 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch( } } + /** + * Checks if the consumed message is a DoL (Declaration of Leadership) stamp and marks it as consumed + * if it matches the current DolStamp. This method is called during message consumption to detect when + * the leader replica has successfully consumed back its own DoL stamp, indicating it's fully caught up + * with the version topic and ready to switch to consuming from remote VT or RT. 
+ * + * @param pcs the partition consumption state tracking DoL status + * @param dolPubSubMessage the consumed DoL PubSub message with leadership metadata + */ + private void checkAndHandleDoLMessage(PartitionConsumptionState pcs, DefaultPubSubMessage dolPubSubMessage) { + // Early exit if DoL mechanism is disabled via config or message key is not DoL stamp + if (!shouldUseDolMechanism() || !Arrays.equals(dolPubSubMessage.getKey().getKey(), KafkaKey.DOL_STAMP.getKey())) { + return; + } + + // Extract leadership metadata from the message + KafkaMessageEnvelope value = dolPubSubMessage.getValue(); + LeaderMetadata leaderMetadata = value.getLeaderMetadataFooter(); + if (leaderMetadata == null) { + // Message doesn't have leadership metadata, skip processing + return; + } + if (leaderMetadata.getUpstreamKafkaClusterId() != localKafkaClusterId) { + // Message is not from our own local region, skip processing + return; + } + + String consumedHostId = leaderMetadata.getHostName().toString(); + // Extract term information from the consumed message + long consumedTermId = leaderMetadata.getTermId(); + long messageTimestamp = + value.getProducerMetadata() != null ? value.getProducerMetadata().getMessageTimestamp() : -1; + String replicaId = pcs.getReplicaId(); + + // Update highest observed leadership term for monitoring + long previousHighestTerm = pcs.getHighestLeadershipTerm(); + if (consumedTermId > previousHighestTerm) { + pcs.setHighestLeadershipTerm(consumedTermId); + LOGGER.info( + "Replica: {} observed new highest leadership term: {} (previous: {}) from host: {} at timestamp: {}", + replicaId, + consumedTermId, + previousHighestTerm, + consumedHostId, + messageTimestamp); + } + DolStamp currentDolStamp = pcs.getDolState(); + // Get current DoL state - may be null if not in STANDBY->LEADER transition + if (currentDolStamp == null) { + // Not currently waiting for a DoL, just log for observability + LOGGER.debug( + "Replica: {} consumed DoL stamp for term: {} from host: {} (timestamp: {}), but not currently waiting for DoL", + replicaId, + consumedTermId, + consumedHostId, + messageTimestamp); + return; + } + + // Validate DoL matches expected term and host + long expectedTermId = currentDolStamp.getLeadershipTerm(); + String expectedHostId = currentDolStamp.getHostId(); + + // Ignore DoL from different host + if (!expectedHostId.equals(consumedHostId)) { + LOGGER.debug( + "Replica: {} ignoring DoL from different host. Expected: {}, received: {}", + replicaId, + expectedHostId, + consumedHostId); + return; + } + + // Ignore stale DoL from older term + if (consumedTermId < expectedTermId) { + LOGGER.debug( + "Replica: {} ignoring stale DoL from older term. Expected: {}, received: {}", + replicaId, + expectedTermId, + consumedTermId); + return; + } + + // Handle DoL from future term - indicates race or concurrent leadership change + if (consumedTermId > expectedTermId) { + LOGGER.warn( + "Replica: {} consumed DoL from future term. Expected: {}, received: {}. DolStamp: {}", + replicaId, + expectedTermId, + consumedTermId, + currentDolStamp); + return; + } + + // DoL loopback complete - term and host match + currentDolStamp.setDolConsumed(true); + long loopbackLatencyMs = currentDolStamp.getLatencyMs(); + LOGGER.info( + "Replica {}: DoL loopback complete - successfully produced to and consumed back from local VT, " + + "confirming replica is fully caught up and ready to switch to leader source topic. " + + "Loopback latency: {} ms. [term={}, host={}, timestamp={}]. 
DolStamp={}", + replicaId, + loopbackLatencyMs, + consumedTermId, + consumedHostId, + messageTimestamp, + currentDolStamp); + } + + protected boolean shouldUseDolMechanism() { + return isSystemStore + ? serverConfig.isLeaderHandoverUseDoLMechanismEnabledForSystemStores() + : serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores(); + } + AbstractStoreBufferService getStoreBufferService() { return storeBufferService; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java index 4f387224acb..5347101fc7f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java @@ -48,7 +48,8 @@ public void setUp() { public void testSubscriptionEmptyPoll() { PubSubTopic nonExistingTopic1 = pubSubTopicRepository.getTopic("nonExistingTopic1_v3"); - SharedKafkaConsumer sharedConsumer = new SharedKafkaConsumer(consumer, stats, () -> {}, (c, vt, tp) -> {}); + SharedKafkaConsumer sharedConsumer = + new SharedKafkaConsumer(consumer, stats, () -> {}, (c, vt, tp) -> {}, "region1", 1); Set assignmentReturnedConsumer = new HashSet<>(); PubSubTopicPartition nonExistentPubSubTopicPartition = new PubSubTopicPartitionImpl(nonExistingTopic1, 1); @@ -81,7 +82,9 @@ private void setUpSharedConsumer() { stats, assignmentChangeListener, unsubscriptionListener, - new SystemTime()); + new SystemTime(), + "region1", + 1); topicPartitions = new HashSet<>(); topicPartitions.add(mock(PubSubTopicPartition.class)); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index adf2d0d38e9..f01fe3d9222 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -4367,7 +4367,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doCallRealMethod().when(leaderFollowerStoreIngestionTask).startConsumingAsLeader(any()); doCallRealMethod().when(leaderFollowerStoreIngestionTask) - .resolveRtTopicPartitionWithPubSubBrokerAddress(any(), any(), any()); + .resolveTopicPartitionWithPubSubBrokerAddress(any(), any(), any()); doReturn(false).when(leaderFollowerStoreIngestionTask).shouldNewLeaderSwitchToRemoteConsumption(any()); Set kafkaServerSet = new HashSet<>(); kafkaServerSet.add("localhost"); @@ -6071,11 +6071,10 @@ public void testLoadGlobalRtDiv() { } @Test - public void testResolveRtTopicPartitionWithPubSubBrokerAddress() throws NoSuchFieldException, IllegalAccessException { + public void testResolveTopicPartitionWithPubSubBrokerAddress() throws NoSuchFieldException, IllegalAccessException { StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); Function resolver = Utils::resolveKafkaUrlForSepTopic; - doCallRealMethod().when(storeIngestionTask) - .resolveRtTopicPartitionWithPubSubBrokerAddress(any(), any(), anyString()); + doCallRealMethod().when(storeIngestionTask).resolveTopicPartitionWithPubSubBrokerAddress(any(), any(), anyString()); doCallRealMethod().when(storeIngestionTask).resolveRtTopicWithPubSubBrokerAddress(any(), anyString()); 
doReturn(pubSubTopicRepository).when(storeIngestionTask).getPubSubTopicRepository(); doReturn(resolver).when(storeIngestionTask).getKafkaClusterUrlResolver(); @@ -6094,14 +6093,14 @@ public void testResolveRtTopicPartitionWithPubSubBrokerAddress() throws NoSuchFi field.set(storeIngestionTask, separateRealTimeTopic); PubSubTopicPartition resolvedRtTopicPartition = - storeIngestionTask.resolveRtTopicPartitionWithPubSubBrokerAddress(realTimeTopic, pcs, kafkaUrl); + storeIngestionTask.resolveTopicPartitionWithPubSubBrokerAddress(realTimeTopic, pcs, kafkaUrl); Assert.assertEquals(resolvedRtTopicPartition.getPubSubTopic(), realTimeTopic); Assert.assertEquals( - storeIngestionTask.resolveRtTopicPartitionWithPubSubBrokerAddress(versionTopic, pcs, kafkaUrl).getPubSubTopic(), + storeIngestionTask.resolveTopicPartitionWithPubSubBrokerAddress(versionTopic, pcs, kafkaUrl).getPubSubTopic(), versionTopic); Assert.assertEquals( storeIngestionTask - .resolveRtTopicPartitionWithPubSubBrokerAddress(realTimeTopic, pcs, kafkaUrl + Utils.SEPARATE_TOPIC_SUFFIX) + .resolveTopicPartitionWithPubSubBrokerAddress(realTimeTopic, pcs, kafkaUrl + Utils.SEPARATE_TOPIC_SUFFIX) .getPubSubTopic(), separateRealTimeTopic); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java index bfc2a1e20d0..e0c4cdfc73a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java @@ -433,7 +433,10 @@ private int getNextAvailableSchemaId( }).max(Integer::compare).get() + 1; } } catch (SchemaDuplicateException e) { - logger.warn("Exception occurred while fetching next available schemaId. Msg: {}", e.getMessage()); + logger.warn( + "Exception occurred while fetching next available schemaId for store: {}. 
Msg: {}", + storeName, + e.getMessage()); newValueSchemaId = SchemaData.DUPLICATE_VALUE_SCHEMA_CODE; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index b1bd67c90cc..07b21081fea 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -2205,6 +2205,7 @@ public Future asyncSendControlMessage( */ public static KafkaMessageEnvelope getDoLStampKME( long leadershipTerm, + int localPubSubClusterId, String writerId, ControlMessage dolStampMessage) { ProducerMetadata producerMetadata = new ProducerMetadata(); @@ -2217,7 +2218,9 @@ public static KafkaMessageEnvelope getDoLStampKME( LeaderMetadata leaderMetadataFooter = new LeaderMetadata(); leaderMetadataFooter.hostName = writerId; leaderMetadataFooter.termId = leadershipTerm; - leaderMetadataFooter.upstreamPubSubPosition = ByteBuffer.allocate(0); // Initialize to empty ByteBuffer + leaderMetadataFooter.upstreamOffset = -1; // Indicate no upstream offset + leaderMetadataFooter.upstreamPubSubPosition = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer(); + leaderMetadataFooter.upstreamKafkaClusterId = localPubSubClusterId; KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); kafkaMessageEnvelope.messageType = MessageType.CONTROL_MESSAGE.getValue(); @@ -2241,8 +2244,10 @@ public static KafkaMessageEnvelope getDoLStampKME( public CompletableFuture sendDoLStamp( PubSubTopicPartition topicPartition, PubSubProducerCallback callback, - long leadershipTerm) { - KafkaMessageEnvelope kafkaMessageEnvelope = getDoLStampKME(leadershipTerm, writerId, heartBeatMessage); + long leadershipTerm, + int localPubSubClusterId) { + KafkaMessageEnvelope kafkaMessageEnvelope = + getDoLStampKME(leadershipTerm, localPubSubClusterId, writerId, heartBeatMessage); logger.info( "Sending DoL stamp message to topic-partition {} for leadership term {} kme: {}", diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java index 7f55881a6bb..31c7fd43b48 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java @@ -1068,7 +1068,7 @@ public void testHybridWithZeroLagThreshold() throws Exception { IntStream.range(0, 10).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, i))); } - @Test + @Test(timeOut = 120 * Time.MS_PER_SECOND) public void testHybridStoreLogCompaction() throws Exception { UpdateStoreQueryParams params = new UpdateStoreQueryParams() // set hybridRewindSecond to a big number so following versions won't ignore old records in RT @@ -1076,7 +1076,7 @@ public void testHybridStoreLogCompaction() throws Exception { .setHybridOffsetLagThreshold(0) .setPartitionCount(2) .setActiveActiveReplicationEnabled(true); - String storeName = Utils.getUniqueString("store"); + String storeName = Utils.getUniqueString("store-for-lc"); sharedVenice.useControllerClient(client -> { client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); client.updateStore(storeName, params); diff --git 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java index e43fa94aa0c..ffeb2ffa523 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java @@ -1302,7 +1302,7 @@ public void waitVersion(String storeName, int versionId) { } public void waitVersion(String storeName, int versionId, ControllerClient client) { - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { + TestUtils.waitForNonDeterministicAssertion(120, TimeUnit.SECONDS, true, () -> { String kafkaTopic = Version.composeKafkaTopic(storeName, versionId); JobStatusQueryResponse response = TestUtils.assertCommand(client.queryJobStatus(kafkaTopic)); if (response.getStatus().equals(ExecutionStatus.ERROR.toString())) { diff --git a/internal/venice-test-common/src/integrationTest/resources/log4j2.properties b/internal/venice-test-common/src/integrationTest/resources/log4j2.properties index aa243388041..3becc9f9120 100644 --- a/internal/venice-test-common/src/integrationTest/resources/log4j2.properties +++ b/internal/venice-test-common/src/integrationTest/resources/log4j2.properties @@ -1,32 +1,60 @@ -status = error name = PropertiesConfig filters = threshold filter.threshold.type = ThresholdFilter -filter.threshold.level = debug +filter.threshold.level = info -appenders = console +#appenders = console appender.console.type = Console appender.console.name = STDOUT appender.console.layout.type = PatternLayout +#appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1}] [%t] %m%n appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} - [%X{logContext}] %p [%c{1}] [%t] %m%n +appender.rolling.type = RollingFile +appender.rolling.name = fileLogger +appender.rolling.fileName = ${sys:java.io.tmpdir}/venice-test.log +appender.rolling.filePattern = ${sys:java.io.tmpdir}/venice-test_%d{yyyyMMdd}.log.gz +appender.rolling.layout.type = PatternLayout +appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} [%X{logContext}] %p [%c{1.}] [%t] %m%n +appender.rolling.policies.type = Policies + rootLogger.level = info -rootLogger.appenderRefs = stdout -rootLogger.appenderRef.stdout.ref = STDOUT +#rootLogger.appenderRefs = stdout +#rootLogger.appenderRef.stdout.ref = STDOUT +#rootLogger.appenderRefs = RollingFile rootLogger.appenderRef.RollingFile.ref = fileLogger -# The following loggers produce excessive logs when set to INFO. Set them to ERROR to reduce log volume.
logger.zk.name = org.apache.zookeeper logger.zk.level = error -logger.helix.name = org.apache.helix -logger.helix.level = error +logger.apache.name = org.apache +logger.apache.level = error logger.kafka.name = kafka -logger.kafka.level = error -logger.aKafka.name = org.apache.kafka -logger.aKafka.level = error +logger.kafka.level = off +#logger.kafkaClients.name = org.apache.kafka.clients +#logger.kafkaClients.level = INFO +#logger.kafkaClients.additivity = false +#logger.kafkaClients.appenderRef.console.ref = Console +# +## Disable Kafka server (broker) logs +#logger.kafkaServer.name = kafka +#logger.kafkaServer.level = OFF +#logger.kafkaServer.additivity = false +logger.kafkaClients.name = org.apache.kafka.clients +logger.kafkaClients.level = error logger.e.name = org.eclipse logger.e.level = off logger.d2.name = com.linkedin.d2 -logger.d2.level = error \ No newline at end of file +logger.d2.level = off +logger.venice.name = com.linkedin.venice +logger.venice.level = info +logger.vs.name = com.linkedin.venice.serialization.avro +logger.vs.level = off +logger.vu.name = com.linkedin.venice.utils +logger.vu.level = info +#turn off com.linkedin.venice.controllerapi.D2ControllerClient +logger.d2controller.name = com.linkedin.venice.controllerapi.D2ControllerClient +logger.d2controller.level = off +#com.linkedin.venice.helix
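To make the handover flow in the hunks above easier to follow, here is a minimal, self-contained Java sketch of the DoL loopback handshake. It is an illustration only: Stamp, DolMessage, and the in-memory BlockingQueue standing in for the local version topic partition are hypothetical stand-ins, not the patched classes; the real code produces via VeniceWriter.sendDoLStamp(), flips the produced flag in DolStampProduceCallback, and flips the consumed flag in checkAndHandleDoLMessage().

// Minimal sketch of the DoL loopback handshake (illustrative only; names are
// hypothetical stand-ins for DolStamp and the local VT produce/consume path).
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DolLoopbackSketch {
  // Stand-in for the DolStamp state machine: two flags gate the leader switch.
  static final class Stamp {
    final long term;
    final String hostId;
    final long startMs = System.currentTimeMillis();
    volatile boolean produced; // set when the broker acks the DoL message
    volatile boolean consumed; // set when the replica consumes its own DoL back

    Stamp(long term, String hostId) {
      this.term = term;
      this.hostId = hostId;
    }

    boolean isReady() {
      return produced && consumed;
    }

    long latencyMs() {
      return System.currentTimeMillis() - startMs;
    }
  }

  record DolMessage(long term, String hostId) {
  }

  public static void main(String[] args) throws InterruptedException {
    // In-memory stand-in for the local version topic partition.
    BlockingQueue<DolMessage> localVt = new LinkedBlockingQueue<>();
    Stamp stamp = new Stamp(42L, "host-1");

    // "Produce" the stamp; the real code flips this flag in the producer callback.
    localVt.put(new DolMessage(stamp.term, stamp.hostId));
    stamp.produced = true;

    // "Consume" until our own stamp loops back, mirroring the host/term checks
    // in checkAndHandleDoLMessage(): skip other hosts and stale terms, stop on
    // a future term (a concurrent leadership change).
    while (!stamp.isReady()) {
      DolMessage msg = localVt.take();
      if (!msg.hostId().equals(stamp.hostId) || msg.term() < stamp.term) {
        continue; // different host or stale term: ignore
      }
      if (msg.term() > stamp.term) {
        break; // future term observed: do not switch
      }
      stamp.consumed = true; // loopback complete
    }

    if (stamp.isReady()) {
      // Only now is it safe to switch from the local VT to the leader source topic.
      System.out.printf("DoL loopback complete in %d ms; switching to leader topic%n", stamp.latencyMs());
    }
  }
}

The invariant the sketch encodes is the same one the patch relies on: because the stamp is appended to the local VT and read back through the normal consumption path, observing it again implies every earlier VT record has already been processed, which is what lets canSwitchToLeaderTopic() unblock immediately instead of falling back to the legacy time-based wait.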