Skip to content

Commit 1bbff55

Browse files
committed
Bug fixes
Works well Full working copy Close parition in VW before sending DOL add latency Delete bogus test Fix flaky test Improve logs Fix flaky test
1 parent ab9b98b commit 1bbff55

File tree

15 files changed

+279
-181
lines changed

15 files changed

+279
-181
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ protected Map<String, PubSubPosition> calculateRtConsumptionStartPositions(
988988
pubSubAddress,
989989
sourceTopicPartition);
990990
PubSubTopicPartition newSourceTopicPartition =
991-
resolveRtTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress);
991+
resolveTopicPartitionWithPubSubBrokerAddress(newSourceTopic, pcs, pubSubAddress);
992992
try {
993993
rtStartPosition =
994994
getRewindStartPositionForRealTimeTopic(pubSubAddress, newSourceTopicPartition, rewindStartTimestamp);
@@ -1372,7 +1372,7 @@ Runnable buildRepairTask(
13721372
return () -> {
13731373
PubSubTopic pubSubTopic = sourceTopicPartition.getPubSubTopic();
13741374
PubSubTopicPartition resolvedTopicPartition =
1375-
resolveRtTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl);
1375+
resolveTopicPartitionWithPubSubBrokerAddress(pubSubTopic, pcs, sourceKafkaUrl);
13761376
// Calculate upstream offset
13771377
PubSubPosition upstreamOffset =
13781378
getRewindStartPositionForRealTimeTopic(sourceKafkaUrl, resolvedTopicPartition, rewindStartTimestamp);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/DolStamp.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
public class DolStamp {
1212
private final long leadershipTerm;
1313
private final String hostId;
14+
private final long produceStartTimeMs; // Timestamp when DoL production started
1415
private volatile boolean dolProduced; // DoL message was acked by broker
1516
private volatile boolean dolConsumed; // DoL message was consumed back by this replica
1617
private volatile CompletableFuture<PubSubProduceResult> dolProduceFuture; // Future tracking DoL produce result
1718

1819
public DolStamp(long leadershipTerm, String hostId) {
1920
this.leadershipTerm = leadershipTerm;
2021
this.hostId = hostId;
22+
this.produceStartTimeMs = System.currentTimeMillis();
2123
this.dolProduced = false;
2224
this.dolConsumed = false;
2325
this.dolProduceFuture = null;
@@ -59,9 +61,29 @@ public boolean isReady() {
5961
return dolProduced && dolConsumed;
6062
}
6163

64+
public long getProduceStartTimeMs() {
65+
return produceStartTimeMs;
66+
}
67+
68+
/**
69+
* Calculate latency from DoL production start to now.
70+
* @return latency in milliseconds
71+
*/
72+
public long getLatencyMs() {
73+
return System.currentTimeMillis() - produceStartTimeMs;
74+
}
75+
6276
@Override
6377
public String toString() {
78+
String produceResult = "";
79+
if (dolProduceFuture != null && dolProduceFuture.isDone()) {
80+
try {
81+
produceResult = ", offset=" + dolProduceFuture.get().getPubSubPosition();
82+
} catch (Exception e) {
83+
// Ignore, keep empty
84+
}
85+
}
6486
return "DolStamp{term=" + leadershipTerm + ", host=" + hostId + ", produced=" + dolProduced + ", consumed="
65-
+ dolConsumed + ", futureSet=" + (dolProduceFuture != null) + "}";
87+
+ dolConsumed + produceResult + ", latencyMs=" + getLatencyMs() + "}";
6688
}
6789
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
* c) {@link ConsumerSubscriptionCleaner}
6464
* 2. Receive various calls to interrogate or mutate consumer state, and delegate them to the correct unit, by
6565
* maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably,
66-
* the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver)} function allows the
66+
* the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, PubSubPosition, ConsumedDataReceiver)} function allows the
6767
* caller to start funneling consumed data into a receiver (i.e. into another task).
6868
* 3. Provide a single abstract function that must be overridden by subclasses in order to implement a consumption
6969
* load balancing strategy: {@link #pickConsumerForPartition(PubSubTopic, PubSubTopicPartition)}
@@ -118,7 +118,7 @@ protected KafkaConsumerService(
118118
final boolean isKafkaConsumerOffsetCollectionEnabled,
119119
final ReadOnlyStoreRepository metadataRepository,
120120
final boolean isUnregisterMetricForDeletedStoreEnabled,
121-
VeniceServerConfig serverConfig) {
121+
final VeniceServerConfig serverConfig) {
122122
this.kafkaUrl = consumerProperties.getProperty(KAFKA_BOOTSTRAP_SERVERS);
123123
this.kafkaUrlForLogger = Utils.getSanitizedStringForLogger(kafkaUrl);
124124
this.LOGGER = LogManager.getLogger(
@@ -155,7 +155,9 @@ protected KafkaConsumerService(
155155
pubSubConsumerAdapterFactory.create(contextBuilder.build()),
156156
aggStats,
157157
this::recordPartitionsPerConsumerSensor,
158-
this::handleUnsubscription);
158+
this::handleUnsubscription,
159+
serverConfig.getRegionName(),
160+
i);
159161

160162
Supplier<Map<PubSubTopicPartition, List<DefaultPubSubMessage>>> pollFunction =
161163
liveConfigBasedKafkaThrottlingEnabled

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,8 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
865865
partitionConsumptionState.getReplicaId());
866866
return;
867867
}
868+
// Close any existing VeniceWriter partition session
869+
veniceWriter.get().closePartition(partitionConsumptionState.getPartition());
868870

869871
DolStamp dolStamp = new DolStamp(leadershipTerm, veniceWriter.get().getWriterId());
870872
partitionConsumptionState.setDolState(dolStamp);
@@ -878,26 +880,13 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
878880
// Send DolStamp to local VT
879881
PubSubProducerCallback dolCallback = new DolStampProduceCallback(partitionConsumptionState, leadershipTerm);
880882
CompletableFuture<PubSubProduceResult> dolProduceFuture = veniceWriter.get()
881-
.sendDoLStamp(partitionConsumptionState.getReplicaTopicPartition(), dolCallback, leadershipTerm);
882-
883+
.sendDoLStamp(
884+
partitionConsumptionState.getReplicaTopicPartition(),
885+
dolCallback,
886+
leadershipTerm,
887+
localKafkaClusterId);
883888
// Store the produce future in DolStamp
884889
dolStamp.setDolProduceFuture(dolProduceFuture);
885-
886-
// Chain logging for produce completion
887-
dolProduceFuture.thenAccept((result) -> {
888-
LOGGER.info(
889-
"DoL stamp produced for replica: {} at offset: {} for term: {}",
890-
partitionConsumptionState.getReplicaId(),
891-
result.getPubSubPosition(),
892-
leadershipTerm);
893-
}).exceptionally((ex) -> {
894-
LOGGER.error(
895-
"Failed to produce DoL stamp for replica: {} for term: {}",
896-
partitionConsumptionState.getReplicaId(),
897-
leadershipTerm,
898-
ex);
899-
return null;
900-
});
901890
}
902891

903892
/**
@@ -916,8 +905,8 @@ public DolStampProduceCallback(PartitionConsumptionState pcs, long leadershipTer
916905
public void onCompletion(PubSubProduceResult produceResult, Exception exception) {
917906
if (exception != null) {
918907
LOGGER.error(
919-
"Failed to produce DoL message for partition {} with term {}",
920-
pcs.getPartition(),
908+
"Failed to produce DoL message for replica: {} with term: {}",
909+
pcs.getReplicaId(),
921910
leadershipTerm,
922911
exception);
923912
// Clear DoL state on failure
@@ -930,15 +919,15 @@ public void onCompletion(PubSubProduceResult produceResult, Exception exception)
930919
if (dolStamp != null && dolStamp.getLeadershipTerm() == leadershipTerm) {
931920
dolStamp.setDolProduced(true);
932921
LOGGER.info(
933-
"DoL message produce confirmed for partition {} at position {} (term: {}) - dolStamp: {}",
934-
pcs.getPartition(),
922+
"DoL message produce confirmed for replica: {} at position: {} (term: {}) - dolStamp: {}",
923+
pcs.getReplicaId(),
935924
produceResult.getPubSubPosition(),
936925
leadershipTerm,
937926
dolStamp);
938927
} else {
939928
LOGGER.warn(
940-
"DoL state mismatch or null for partition {} - expected term: {}, dolStamp: {}",
941-
pcs.getPartition(),
929+
"DoL state mismatch or null for replica: {} - expected term: {}, dolStamp: {}",
930+
pcs.getReplicaId(),
942931
leadershipTerm,
943932
dolStamp);
944933
}
@@ -958,17 +947,19 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
958947
if (shouldUseDolMechanism() && dolStamp != null) {
959948
// Check if DoL state is ready (both produced and consumed)
960949
if (dolStamp.isReady()) {
950+
long dolLatencyMs = dolStamp.getLatencyMs();
961951
LOGGER.info(
962-
"DoL mechanism complete for replica: {} - switching to leader topic. DolStamp: {}",
952+
"DoL mechanism complete for replica: {} - unblocking switch to the leader topic. Total DoL latency: {} ms. DolStamp: {}",
963953
pcs.getReplicaId(),
954+
dolLatencyMs,
964955
dolStamp);
965956
// Clear DoL state as we're done with this transition
966957
pcs.clearDolState();
967958
return true;
968959
}
969960

970961
// DoL not ready yet, stay on local VT
971-
LOGGER.info("DoL mechanism not ready for partition {} - DolStamp: {}", pcs.getReplicaId(), dolStamp);
962+
LOGGER.debug("DoL mechanism not ready for replica: {} - DolStamp: {}", pcs.getReplicaId(), dolStamp);
972963
return false;
973964
} else {
974965
// Use legacy time-based mechanism
@@ -1230,7 +1221,7 @@ void preparePositionCheckpointAndStartConsumptionAsLeader(
12301221
syncConsumedUpstreamRTOffsetMapIfNeeded(pcs, Collections.singletonMap(pubSubAddress, startPos));
12311222
LOGGER.info(
12321223
"Leader replica: {} started consuming: {} from: {}",
1233-
pcs.getReplicaId(),
1224+
pcs,
12341225
Utils.getReplicaId(leaderTopic, pcs.getPartition()),
12351226
startPos);
12361227
}
@@ -1336,6 +1327,14 @@ protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionS
13361327
}
13371328

13381329
private boolean isConsumingFromRemoteVersionTopic(PartitionConsumptionState partitionConsumptionState) {
1330+
// orint all three conditions for easier debugging
1331+
LOGGER.info(
1332+
"Checking remote VT consumption for replica: {}. EOP received: {}, isCurrentVersion: {}, nativeReplicationSourceVersionTopicKafkaURL: {}, localKafkaServer: {}",
1333+
partitionConsumptionState.getReplicaId(),
1334+
partitionConsumptionState.isEndOfPushReceived(),
1335+
isCurrentVersion.getAsBoolean(),
1336+
nativeReplicationSourceVersionTopicKafkaURL,
1337+
localKafkaServer);
13391338
return !partitionConsumptionState.isEndOfPushReceived() && !isCurrentVersion.getAsBoolean()
13401339
// Do not enable remote consumption for the source fabric leader. Otherwise, it will produce extra messages.
13411340
&& !Objects.equals(nativeReplicationSourceVersionTopicKafkaURL, localKafkaServer);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,8 @@ public void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition
904904
* @return the current upstream version topic position
905905
*/
906906
public PubSubPosition getLatestProcessedRemoteVtPosition() {
907+
// TODO: Ideally, we should get this from offset record to ensure durability
908+
// return this.offsetRecord.getCheckpointedRemoteVtPosition();
907909
return this.latestProcessedRemoteVtPosition;
908910
}
909911

@@ -988,6 +990,13 @@ public PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useC
988990
? getDivRtCheckpointPosition(pubSubBrokerAddress)
989991
: getLatestProcessedRtPosition(pubSubBrokerAddress);
990992
} else {
993+
LOGGER.info(
994+
"### Getting leader position for replica: {}. Remote consumption enabled: {} remote: {} local: {}",
995+
getReplicaTopicPartition(),
996+
consumeRemotely(),
997+
getLatestProcessedRemoteVtPosition(),
998+
getLatestProcessedVtPosition());
999+
9911000
return consumeRemotely() ? getLatestProcessedRemoteVtPosition() : getLatestProcessedVtPosition();
9921001
}
9931002
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ protected synchronized SharedKafkaConsumer pickConsumerForPartition(
177177
"Did not find a suitable consumer after checking " + consumersChecked + " instances.");
178178
}
179179
LOGGER.info(
180-
"Get shared consumer for: {} from the ingestion task belonging to version topic: {} with index: {}",
180+
"Get shared consumer: {} for: {} from the ingestion task belonging to version topic: {} with index: {}",
181+
consumer,
181182
topicPartition,
182183
versionTopic,
183184
consumerIndex);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter {
7272

7373
private final Time time;
7474

75+
private final String toString;
76+
7577
/**
7678
* Used to keep track of which version-topic is intended to use a given subscription, in order to detect
7779
* regressions where we would end up using this consumer to subscribe to a given topic-partition on behalf
@@ -95,23 +97,28 @@ public SharedKafkaConsumer(
9597
PubSubConsumerAdapter delegate,
9698
AggKafkaConsumerServiceStats stats,
9799
Runnable assignmentChangeListener,
98-
UnsubscriptionListener unsubscriptionListener) {
99-
this(delegate, stats, assignmentChangeListener, unsubscriptionListener, new SystemTime());
100+
UnsubscriptionListener unsubscriptionListener,
101+
String regionName,
102+
int idx) {
103+
this(delegate, stats, assignmentChangeListener, unsubscriptionListener, new SystemTime(), regionName, idx);
100104
}
101105

102106
SharedKafkaConsumer(
103107
PubSubConsumerAdapter delegate,
104108
AggKafkaConsumerServiceStats stats,
105109
Runnable assignmentChangeListener,
106110
UnsubscriptionListener unsubscriptionListener,
107-
Time time) {
111+
Time time,
112+
String regionName,
113+
int idx) {
108114
this.delegate = delegate;
109115
this.stats = stats;
110116
this.assignmentChangeListener = assignmentChangeListener;
111117
this.unsubscriptionListener = unsubscriptionListener;
112118
this.time = time;
113119
this.currentAssignment = Collections.emptySet();
114120
this.currentAssignmentSize = new AtomicInteger(0);
121+
this.toString = String.format("SharedKafkaConsumer-%s:%s ", idx, regionName);
115122
}
116123

117124
/**
@@ -132,17 +139,15 @@ protected synchronized void updateCurrentAssignment(Set<PubSubTopicPartition> ne
132139
@UnderDevelopment(value = "This API may not be implemented in all PubSubConsumerAdapter implementations.")
133140
@Override
134141
public synchronized void subscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition lastReadPubSubPosition) {
135-
throw new VeniceException(
136-
this.getClass().getSimpleName() + " does not support subscribe without specifying a version-topic.");
142+
throw new VeniceException(this + " does not support subscribe without specifying a version-topic.");
137143
}
138144

139145
@Override
140146
public void subscribe(
141147
@Nonnull PubSubTopicPartition pubSubTopicPartition,
142148
@Nonnull PubSubPosition position,
143149
boolean isInclusive) {
144-
throw new VeniceException(
145-
this.getClass().getSimpleName() + " does not support subscribe without specifying a version-topic.");
150+
throw new VeniceException(this + " does not support subscribe without specifying a version-topic.");
146151
}
147152

148153
synchronized void subscribe(
@@ -209,7 +214,7 @@ protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>
209214
long elapsedTime = System.currentTimeMillis() - startTime;
210215
LOGGER.info(
211216
"Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms",
212-
this.getClass().getSimpleName(),
217+
this,
213218
topicPartitions.size(),
214219
topicPartitions,
215220
elapsedTime);
@@ -419,4 +424,9 @@ public synchronized PubSubPosition decodePosition(
419424
ByteBuffer buffer) {
420425
return delegate.decodePosition(partition, positionTypeId, buffer);
421426
}
427+
428+
@Override
429+
public String toString() {
430+
return toString;
431+
}
422432
}

0 commit comments

Comments
 (0)