Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
import static com.linkedin.venice.ConfigKeys.SERVER_LOAD_CONTROLLER_ACCEPT_MULTIPLIER;
Expand Down Expand Up @@ -666,6 +668,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final boolean validateSpecificSchemaEnabled;
private final boolean useMetricsBasedPositionInLagComputation;
private final boolean leaderHandoverUseDoLMechanismForSystemStores;
private final boolean leaderHandoverUseDoLMechanismForUserStores;
private final LogContext logContext;
private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;
private final boolean keyUrnCompressionEnabled;
Expand Down Expand Up @@ -1149,6 +1153,10 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS, 5);
this.useMetricsBasedPositionInLagComputation =
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
this.leaderHandoverUseDoLMechanismForSystemStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, false);
this.leaderHandoverUseDoLMechanismForUserStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, false);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
this.parallelResourceShutdownEnabled =
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
Expand Down Expand Up @@ -2075,6 +2083,14 @@ public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
return this.useMetricsBasedPositionInLagComputation;
}

/**
 * Exposes the flag loaded from {@code SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES}
 * when this config was constructed (the constructor falls back to {@code false} if the key is absent).
 *
 * @return the configured value of the DoL leader-handover flag for system stores
 */
public boolean isLeaderHandoverUseDoLMechanismEnabledForSystemStores() {
  return leaderHandoverUseDoLMechanismForSystemStores;
}

/**
 * Exposes the flag loaded from {@code SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES}
 * when this config was constructed (the constructor falls back to {@code false} if the key is absent).
 *
 * @return the configured value of the DoL leader-handover flag for user stores
 */
public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() {
  return leaderHandoverUseDoLMechanismForUserStores;
}

/**
 * Exposes the value loaded from {@code SERVER_INGESTION_INFO_LOG_LINE_LIMIT} when this config was
 * constructed (the constructor falls back to {@code 20} if the key is absent).
 *
 * @return the configured ingestion info log line limit
 */
public int getServerIngestionInfoLogLineLimit() {
  return serverIngestionInfoLogLineLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,15 @@ public void onBecomeStandbyFromOffline(Message message, NotificationContext cont

@Transition(to = HelixState.LEADER_STATE, from = HelixState.STANDBY_STATE)
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
LeaderSessionIdChecker checker = new LeaderSessionIdChecker(leaderSessionId.incrementAndGet(), leaderSessionId);
/**
* Use the Helix message creation timestamp as the leadership term for the
* leader session. Record filtering based on leadership term validity is not
* enforced yet; the goal is to observe system behavior before turning on strict
* checks. This value also helps with diagnosing ordering and handover issues.
*/
long leadershipTerm = message.getCreateTimeStamp();
LeaderSessionIdChecker checker =
new LeaderSessionIdChecker(leadershipTerm, leaderSessionId.incrementAndGet(), leaderSessionId);
executeStateTransition(
message,
context,
Expand Down Expand Up @@ -223,14 +231,24 @@ public void onBecomeOfflineFromError(Message message, NotificationContext contex
/**
 * Pairs a session id assigned at creation time with a handle on the shared counter that holds the
 * most recently issued session id, so holders can check whether their session is still the latest.
 * Also carries a leadership term value, which is only stored and returned by this class.
 */
public static class LeaderSessionIdChecker {
  /** Term recorded when the caller uses the two-argument constructor and supplies no term. */
  private static final long UNSET_LEADERSHIP_TERM = -1;

  private final long leadershipTerm;
  private final long assignedSessionId;
  private final AtomicLong latestSessionIdHandle;

  /**
   * Creates a checker without an explicit leadership term; the term is recorded as {@code -1}.
   *
   * @param assignedSessionId session id this checker represents
   * @param latestSessionIdHandle shared counter holding the latest issued session id
   */
  public LeaderSessionIdChecker(long assignedSessionId, AtomicLong latestSessionIdHandle) {
    this(UNSET_LEADERSHIP_TERM, assignedSessionId, latestSessionIdHandle);
  }

  /**
   * Creates a checker that also remembers the given leadership term.
   *
   * @param leadershipTerm term value to expose via {@link #getLeadershipTerm()}
   * @param assignedSessionId session id this checker represents
   * @param latestSessionIdHandle shared counter holding the latest issued session id
   */
  public LeaderSessionIdChecker(long leadershipTerm, long assignedSessionId, AtomicLong latestSessionIdHandle) {
    this.latestSessionIdHandle = latestSessionIdHandle;
    this.assignedSessionId = assignedSessionId;
    this.leadershipTerm = leadershipTerm;
  }

  /**
   * @return {@code true} while the shared counter still equals the session id given to this checker
   */
  public boolean isSessionIdValid() {
    final long latestSessionId = latestSessionIdHandle.get();
    return latestSessionId == assignedSessionId;
  }

  /**
   * @return the leadership term supplied at construction, or {@code -1} if none was supplied
   */
  public long getLeadershipTerm() {
    return this.leadershipTerm;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ protected Map<String, PubSubPosition> calculateRtConsumptionStartPositions(
pubSubAddress,
sourceTopicPartition);
PubSubTopicPartition newSourceTopicPartition =
resolveRtTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress);
resolveTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress);
try {
rtStartPosition =
getRewindStartPositionForRealTimeTopic(pubSubAddress, newSourceTopicPartition, rewindStartTimestamp);
Expand Down Expand Up @@ -1372,7 +1372,7 @@ Runnable buildRepairTask(
return () -> {
PubSubTopic pubSubTopic = sourceTopicPartition.getPubSubTopic();
PubSubTopicPartition resolvedTopicPartition =
resolveRtTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl);
resolveTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl);
// Calculate upstream offset
PubSubPosition upstreamOffset =
getRewindStartPositionForRealTimeTopic(sourceKafkaUrl, resolvedTopicPartition, rewindStartTimestamp);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import java.util.concurrent.CompletableFuture;


/**
* Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition.
* DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT.
*/
public class DolStamp {
private final long leadershipTerm;
private final String hostId;
private final long produceStartTimeMs; // Timestamp when DoL production started
private volatile boolean dolProduced; // DoL message was acked by broker
private volatile boolean dolConsumed; // DoL message was consumed back by this replica
private volatile CompletableFuture<PubSubProduceResult> dolProduceFuture; // Future tracking DoL produce result

/**
 * Creates a stamp for the given leadership term and host. The production start time is taken from
 * {@link System#currentTimeMillis()} at construction; both progress flags start as {@code false}
 * and no produce future is attached yet.
 *
 * @param leadershipTerm leadership term recorded on this stamp
 * @param hostId host identifier recorded on this stamp
 */
public DolStamp(long leadershipTerm, String hostId) {
  this.hostId = hostId;
  this.leadershipTerm = leadershipTerm;
  produceStartTimeMs = System.currentTimeMillis();
  // Initial state: nothing produced, nothing consumed, no in-flight produce future.
  dolProduceFuture = null;
  dolConsumed = false;
  dolProduced = false;
}

/**
 * @return the leadership term this stamp was created with
 */
public long getLeadershipTerm() {
  return this.leadershipTerm;
}

/**
 * @return the host identifier this stamp was created with
 */
public String getHostId() {
  return this.hostId;
}

/**
 * @return the current value of the "DoL produced" flag (DoL message was acked by the broker)
 */
public boolean isDolProduced() {
  return this.dolProduced;
}

/**
 * Updates the "DoL produced" flag (DoL message was acked by the broker).
 *
 * @param dolProduced new value for the flag
 */
public void setDolProduced(boolean dolProduced) {
this.dolProduced = dolProduced;
}

/**
 * @return the current value of the "DoL consumed" flag (DoL message was consumed back by this replica)
 */
public boolean isDolConsumed() {
  return this.dolConsumed;
}

/**
 * Updates the "DoL consumed" flag (DoL message was consumed back by this replica).
 *
 * @param dolConsumed new value for the flag
 */
public void setDolConsumed(boolean dolConsumed) {
this.dolConsumed = dolConsumed;
}

/**
 * @return the future tracking the DoL produce result, or {@code null} if none has been set
 *         (the constructor initializes it to {@code null})
 */
public CompletableFuture<PubSubProduceResult> getDolProduceFuture() {
  return this.dolProduceFuture;
}

/**
 * Attaches the future that tracks the DoL produce result, replacing any previously stored one.
 *
 * @param dolProduceFuture future to store on this stamp
 */
public void setDolProduceFuture(CompletableFuture<PubSubProduceResult> dolProduceFuture) {
this.dolProduceFuture = dolProduceFuture;
}

public boolean isReady() {
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isReady() method reads two volatile boolean fields (dolProduced and dolConsumed) without synchronization. While each field read is atomic, the combined read is not atomic, which could lead to a race condition where the state appears inconsistent. For example, between reading dolProduced and dolConsumed, another thread could update one of these values.

Consider either:

  1. Making the method synchronized, or
  2. Using an AtomicInteger to track a combined state, or
  3. Adding a memory barrier or using AtomicBoolean fields with appropriate ordering guarantees

This is especially important since this method is used to determine when to switch the leader topic, which is a critical operation.

Suggested change
public boolean isReady() {
public synchronized boolean isReady() {

Copilot uses AI. Check for mistakes.
return dolProduced && dolConsumed;
}

/**
 * @return the wall-clock timestamp, in milliseconds, captured when this stamp was constructed
 */
public long getProduceStartTimeMs() {
  return this.produceStartTimeMs;
}

/**
 * Computes how much wall-clock time has elapsed since the production start timestamp recorded on
 * this stamp. Each call reads the current time again, so the result changes between calls.
 *
 * @return elapsed time in milliseconds between the recorded start and now
 */
public long getLatencyMs() {
  final long nowMs = System.currentTimeMillis();
  return nowMs - produceStartTimeMs;
}

@Override
public String toString() {
String produceResult = "";
if (dolProduceFuture != null && dolProduceFuture.isDone()) {
try {
produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition();
} catch (Exception e) {
// Ignore, keep empty
Comment on lines +80 to +83
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toString() method calls dolProduceFuture.get() which can block indefinitely if the future hasn't completed. Although there's a check for isDone(), this could still cause issues if toString() is called from logging in a performance-sensitive path. Consider using getNow(null) or adding a timeout to avoid blocking, or simply not including the offset in the toString output.

Suggested change
try {
produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition();
} catch (Exception e) {
// Ignore, keep empty
PubSubProduceResult result = dolProduceFuture.getNow(null);
if (result != null) {
produceResult = ", offset=" + result.getPubSubPosition();

Copilot uses AI. Check for mistakes.
}
}
return "DolStamp{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed="
+ dolConsumed + produceResult + ", latencyMs=" + getLatencyMs() + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* c) {@link ConsumerSubscriptionCleaner}
* 2. Receive various calls to interrogate or mutate consumer state, and delegate them to the correct unit, by
* maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably,
* the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver)} function allows the
* the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, PubSubPosition, ConsumedDataReceiver)} function allows the
* caller to start funneling consumed data into a receiver (i.e. into another task).
* 3. Provide a single abstract function that must be overridden by subclasses in order to implement a consumption
* load balancing strategy: {@link #pickConsumerForPartition(PubSubTopic, PubSubTopicPartition)}
Expand Down Expand Up @@ -118,7 +118,7 @@ protected KafkaConsumerService(
final boolean isKafkaConsumerOffsetCollectionEnabled,
final ReadOnlyStoreRepository metadataRepository,
final boolean isUnregisterMetricForDeletedStoreEnabled,
VeniceServerConfig serverConfig) {
final VeniceServerConfig serverConfig) {
this.kafkaUrl = consumerProperties.getProperty(KAFKA_BOOTSTRAP_SERVERS);
this.kafkaUrlForLogger = Utils.getSanitizedStringForLogger(kafkaUrl);
this.LOGGER = LogManager.getLogger(
Expand Down Expand Up @@ -155,7 +155,9 @@ protected KafkaConsumerService(
pubSubConsumerAdapterFactory.create(contextBuilder.build()),
aggStats,
this::recordPartitionsPerConsumerSensor,
this::handleUnsubscription);
this::handleUnsubscription,
serverConfig.getRegionName(),
i);

Supplier<Map<PubSubTopicPartition, List<DefaultPubSubMessage>>> pollFunction =
liveConfigBasedKafkaThrottlingEnabled
Expand Down
Loading
Loading