Skip to content

Commit ab9b98b

Browse files
committed
Cleanup
1 parent d55a8bc commit ab9b98b

File tree

5 files changed

+44
-38
lines changed

5 files changed

+44
-38
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolState.java renamed to clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
* Tracks Declaration of Leadership (DoL) state during STANDBY to LEADER transition.
99
* DoL mechanism ensures the new leader is fully caught up with VT before switching to remote VT or RT.
1010
*/
11-
public class DolState {
11+
public class DolStamp {
1212
private final long leadershipTerm;
1313
private final String hostId;
1414
private volatile boolean dolProduced; // DoL message was acked by broker
1515
private volatile boolean dolConsumed; // DoL message was consumed back by this replica
1616
private volatile CompletableFuture<PubSubProduceResult> dolProduceFuture; // Future tracking DoL produce result
1717

18-
public DolState(long leadershipTerm, String hostId) {
18+
public DolStamp(long leadershipTerm, String hostId) {
1919
this.leadershipTerm = leadershipTerm;
2020
this.hostId = hostId;
2121
this.dolProduced = false;
@@ -61,7 +61,7 @@ public boolean isReady() {
6161

6262
@Override
6363
public String toString() {
64-
return "DolState{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed="
64+
return "DolStamp{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed="
6565
+ dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}";
6666
}
6767
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -866,11 +866,11 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
866866
return;
867867
}
868868

869-
DolState dolState = new DolState(leadershipTerm, veniceWriter.get().getWriterId());
870-
partitionConsumptionState.setDolState(dolState);
869+
DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId());
870+
partitionConsumptionState.setDolState(dolStamp);
871871
LOGGER.info(
872872
"Initialized DoL state: {} for replica: {} with term: {} and hostId: {}",
873-
dolState,
873+
dolStamp,
874874
partitionConsumptionState.getReplicaId(),
875875
leadershipTerm,
876876
veniceWriter.get().getWriterId());
@@ -880,8 +880,8 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
880880
CompletableFuture<PubSubProduceResult> dolProduceFuture = veniceWriter.get()
881881
.sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm);
882882

883-
// Store the produce future in DolState
884-
dolState.setDolProduceFuture(dolProduceFuture);
883+
// Store the produce future in DolStamp
884+
dolStamp.setDolProduceFuture(dolProduceFuture);
885885

886886
// Chain logging for produce completion
887887
dolProduceFuture.thenAccept((result) -> {
@@ -926,21 +926,21 @@ public void onCompletion(PubSubProduceResult produceResult, Exception exception)
926926
}
927927

928928
// Mark DoL as produced
929-
DolState dolState = pcs.getDolState();
930-
if (dolState != null && dolState.getLeadershipTerm() == leadershipTerm) {
931-
dolState.setDolProduced(true);
929+
DolStamp dolStamp = pcs.getDolState();
930+
if (dolStamp != null && dolStamp.getLeadershipTerm() == leadershipTerm) {
931+
dolStamp.setDolProduced(true);
932932
LOGGER.info(
933-
"DoL message produce confirmed for partition {} at position {} (term: {}) - dolState: {}",
933+
"DoL message produce confirmed for partition {} at position {} (term: {}) - dolStamp: {}",
934934
pcs.getPartition(),
935935
produceResult.getPubSubPosition(),
936936
leadershipTerm,
937-
dolState);
937+
dolStamp);
938938
} else {
939939
LOGGER.warn(
940-
"DoL state mismatch or null for partition {} - expected term: {}, dolState: {}",
940+
"DoL state mismatch or null for partition {} - expected term: {}, dolStamp: {}",
941941
pcs.getPartition(),
942942
leadershipTerm,
943-
dolState);
943+
dolStamp);
944944
}
945945
}
946946
}
@@ -954,21 +954,21 @@ public void onCompletion(PubSubProduceResult produceResult, Exception exception)
954954
*/
955955
private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
956956
// Check if DoL mechanism is enabled via config (system stores vs user stores)
957-
DolState dolState = pcs.getDolState();
958-
if (shouldUseDolMechanism() && dolState != null) {
957+
DolStamp dolStamp = pcs.getDolState();
958+
if (shouldUseDolMechanism() && dolStamp != null) {
959959
// Check if DoL state is ready (both produced and consumed)
960-
if (dolState.isReady()) {
960+
if (dolStamp.isReady()) {
961961
LOGGER.info(
962-
"DoL mechanism complete for replica: {} - switching to leader topic. DolState: {}",
962+
"DoL mechanism complete for replica: {} - switching to leader topic. DolStamp: {}",
963963
pcs.getReplicaId(),
964-
dolState);
964+
dolStamp);
965965
// Clear DoL state as we're done with this transition
966966
pcs.clearDolState();
967967
return true;
968968
}
969969

970970
// DoL not ready yet, stay on local VT
971-
LOGGER.info("DoL mechanism not ready for partition {} - DolState: {}", pcs.getReplicaId(), dolState);
971+
LOGGER.info("DoL mechanism not ready for partition {} - DolStamp: {}", pcs.getReplicaId(), dolStamp);
972972
return false;
973973
} else {
974974
// Use legacy time-based mechanism

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ enum LatchStatus {
129129
/**
130130
* Tracks DoL state during STANDBY to LEADER transition. Null when not in transition or DoL not enabled.
131131
*/
132-
private volatile DolState dolState = null;
132+
private volatile DolStamp dolStamp = null;
133133

134134
/**
135135
* The highest leadership term observed by this replica. Currently used only
@@ -533,16 +533,16 @@ public final LeaderFollowerStateType getLeaderFollowerState() {
533533
return this.leaderFollowerState;
534534
}
535535

536-
public DolState getDolState() {
537-
return this.dolState;
536+
public DolStamp getDolState() {
537+
return this.dolStamp;
538538
}
539539

540-
public void setDolState(DolState dolState) {
541-
this.dolState = dolState;
540+
public void setDolState(DolStamp dolStamp) {
541+
this.dolStamp = dolStamp;
542542
}
543543

544544
public void clearDolState() {
545-
this.dolState = null;
545+
this.dolStamp = null;
546546
}
547547

548548
public long getHighestLeadershipTerm() {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
272272
/**
273273
* Keeps track of producer states inside version topic that drainer threads have processed so far.
274274
* Producers states in this validator will be flushed to the metadata partition of the storage engine regularly in
275-
* {@link #syncOffset(String, PartitionConsumptionState)}
275+
* {@link #syncOffset(PartitionConsumptionState)}
276276
* NOTE: consumerDiv will be used in place of this when {@link #isGlobalRtDivEnabled()} is true.
277277
*/
278278
private final DataIntegrityValidator drainerDiv;
@@ -1335,7 +1335,7 @@ private int handleSingleMessage(
13351335

13361336
/**
13371337
* Checks if the consumed message is a DoL (Declaration of Leadership) stamp and marks it as consumed
1338-
* if it matches the current DolState. This method is called during message consumption to detect when
1338+
* if it matches the current DolStamp. This method is called during message consumption to detect when
13391339
* the leader replica has successfully consumed back its own DoL stamp, indicating it's fully caught up
13401340
* with the version topic and ready to switch to consuming from remote VT or RT.
13411341
*
@@ -1380,8 +1380,8 @@ private void checkAndHandleDoLMessage(
13801380
}
13811381

13821382
// Get current DoL state - may be null if not in STANDBY->LEADER transition
1383-
DolState currentDolState = partitionConsumptionState.getDolState();
1384-
if (currentDolState == null) {
1383+
DolStamp currentDolStamp = partitionConsumptionState.getDolState();
1384+
if (currentDolStamp == null) {
13851385
// Not currently waiting for a DoL, just log for observability
13861386
LOGGER.debug(
13871387
"Replica: {} consumed DoL stamp for term: {} from host: {} (timestamp: {}), but not currently waiting for DoL",
@@ -1393,32 +1393,32 @@ private void checkAndHandleDoLMessage(
13931393
}
13941394

13951395
// Check if this DoL matches the expected term and host
1396-
long expectedTermId = currentDolState.getLeadershipTerm();
1397-
String expectedHostId = currentDolState.getHostId();
1396+
long expectedTermId = currentDolStamp.getLeadershipTerm();
1397+
String expectedHostId = currentDolStamp.getHostId();
13981398

13991399
if (consumedTermId == expectedTermId && consumedHostId.equals(expectedHostId)) {
14001400
// Successfully consumed our own DoL stamp - mark as consumed
1401-
currentDolState.setDolConsumed(true);
1401+
currentDolStamp.setDolConsumed(true);
14021402
LOGGER.info(
14031403
"Replica {}: finished DoL loopback. The leader wrote its DoL stamp to the "
14041404
+ "local VT and successfully consumed it again, confirming the replica is "
1405-
+ "fully caught up. [term={}, host={}, timestamp={}]. DolState={}",
1405+
+ "fully caught up. [term={}, host={}, timestamp={}]. DolStamp={}",
14061406
replicaId,
14071407
consumedTermId,
14081408
consumedHostId,
14091409
messageTimestamp,
1410-
currentDolState);
1410+
currentDolStamp);
14111411
} else {
14121412
// Received a DoL stamp that doesn't match our expected state
14131413
LOGGER.warn(
14141414
"Replica: {} consumed DoL stamp with mismatched metadata. Expected: [term={}, host={}], Received: [term={}, host={}]. "
1415-
+ "This may indicate a stale message or concurrent leadership changes. Current DolState: {}",
1415+
+ "This may indicate a stale message or concurrent leadership changes. Current DolStamp: {}",
14161416
replicaId,
14171417
expectedTermId,
14181418
expectedHostId,
14191419
consumedTermId,
14201420
consumedHostId,
1421-
currentDolState);
1421+
currentDolStamp);
14221422
}
14231423
}
14241424

internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2243,6 +2243,12 @@ public CompletableFuture<PubSubProduceResult> sendDoLStamp(
22432243
PubSubProducerCallback callback,
22442244
long leadershipTerm) {
22452245
KafkaMessageEnvelope kafkaMessageEnvelope = getDoLStampKME(leadershipTerm, writerId, heartBeatMessage);
2246+
2247+
logger.info(
2248+
"Sending DoL stamp message to topic-partition {} for leadership term {} kme: {}",
2249+
topicPartition,
2250+
leadershipTerm,
2251+
kafkaMessageEnvelope);
22462252
return producerAdapter.sendMessage(
22472253
topicPartition.getPubSubTopic().getName(),
22482254
topicPartition.getPartitionNumber(),

0 commit comments

Comments
 (0)