Skip to content

Commit 28f76b0

Browse files
committed
[changelog][cdc] Add client side handling of new A/A version swap messages
1 parent 23f7509 commit 28f76b0

File tree

9 files changed

+877
-42
lines changed

9 files changed

+877
-42
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
8181
*/
8282
private PubSubConsumerAdapterFactory<? extends PubSubConsumerAdapter> pubSubConsumerAdapterFactory;
8383
private PubSubContext pubSubContext;
84+
private boolean versionSwapByControlMessageEnabled = false;
85+
/**
86+
* Client region name used for filtering version swap messages from other regions in A/A setup. The client will only
87+
* react to version swap messages with the same source region as the client region name.
88+
*/
89+
private String clientRegionName = "";
90+
/**
91+
* Total region count used for version swap in A/A setup. Each subscribed partition need to receive this many
92+
* corresponding version swap messages before it can safely go to the new version to ensure data completeness.
93+
*/
94+
private int totalRegionCount = 1;
8495

8596
public ChangelogClientConfig(String storeName) {
8697
this.innerClientConfig = new ClientConfig<>(storeName);
@@ -327,6 +338,33 @@ public boolean isStateful() {
327338
return this.isStateful;
328339
}
329340

341+
public boolean isVersionSwapByControlMessageEnabled() {
342+
return this.versionSwapByControlMessageEnabled;
343+
}
344+
345+
public ChangelogClientConfig setVersionSwapByControlMessageEnabled(boolean isVersionSwapByControlMessageEnabled) {
346+
this.versionSwapByControlMessageEnabled = isVersionSwapByControlMessageEnabled;
347+
return this;
348+
}
349+
350+
public String getClientRegionName() {
351+
return this.clientRegionName;
352+
}
353+
354+
public ChangelogClientConfig setClientRegionName(String clientRegionName) {
355+
this.clientRegionName = clientRegionName;
356+
return this;
357+
}
358+
359+
public int getTotalRegionCount() {
360+
return this.totalRegionCount;
361+
}
362+
363+
public ChangelogClientConfig setTotalRegionCount(int totalRegionCount) {
364+
this.totalRegionCount = totalRegionCount;
365+
return this;
366+
}
367+
330368
public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(ChangelogClientConfig<V> config) {
331369
ChangelogClientConfig<V> newConfig = new ChangelogClientConfig<V>().setStoreName(config.getStoreName())
332370
.setLocalD2ZkHosts(config.getLocalD2ZkHosts())
@@ -354,7 +392,10 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
354392
// Store version should not be cloned
355393
.setStoreVersion(null)
356394
// Is stateful config should not be cloned
357-
.setIsStateful(false);
395+
.setIsStateful(false)
396+
.setVersionSwapByControlMessageEnabled(config.isVersionSwapByControlMessageEnabled())
397+
.setClientRegionName(config.getClientRegionName())
398+
.setTotalRegionCount(config.getTotalRegionCount());
358399
return newConfig;
359400
}
360401

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
44
import com.linkedin.venice.exceptions.VeniceException;
55
import com.linkedin.venice.kafka.protocol.ControlMessage;
6+
import com.linkedin.venice.kafka.protocol.VersionSwap;
67
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
78
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
89
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
@@ -73,7 +74,8 @@ protected VeniceAfterImageConsumerImpl(
7374
storeRepository,
7475
storeName,
7576
changelogClientConfig.getConsumerName(),
76-
this.changeCaptureStats);
77+
this.changeCaptureStats,
78+
changelogClientConfig.isVersionSwapByControlMessageEnabled());
7779
}
7880

7981
@Override
@@ -178,6 +180,100 @@ protected static void adjustSeekCheckPointsBasedOnHeartbeats(
178180
}
179181
}
180182

183+
/**
184+
* Similar to {@link #internalSeekToEndOfPush} this can also be optimized later for a faster find.
185+
*/
186+
@Override
187+
protected CompletableFuture<Void> internalFindNewVersionCheckpoints(
188+
String oldVersionTopic,
189+
String newVersionTopic,
190+
long generationId,
191+
Set<Integer> partitions) {
192+
if (partitions.isEmpty()) {
193+
return CompletableFuture.completedFuture(null);
194+
}
195+
return CompletableFuture.supplyAsync(() -> {
196+
boolean lockAcquired = false;
197+
Map<Integer, VeniceChangeCoordinate> checkpoints = new HashMap<>();
198+
try {
199+
synchronized (internalSeekConsumer) {
200+
PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get();
201+
consumerAdapter.batchUnsubscribe(consumerAdapter.getAssignment());
202+
Map<PubSubTopicPartition, List<DefaultPubSubMessage>> polledResults;
203+
Map<Integer, Boolean> versionSwapConsumedPerPartitionMap = new HashMap<>();
204+
for (Integer partition: partitions) {
205+
versionSwapConsumedPerPartitionMap.put(partition, false);
206+
}
207+
List<PubSubTopicPartition> topicPartitionList = getPartitionListToSubscribe(
208+
partitions,
209+
Collections.EMPTY_SET,
210+
pubSubTopicRepository.getTopic(newVersionTopic));
211+
212+
for (PubSubTopicPartition topicPartition: topicPartitionList) {
213+
consumerAdapter.subscribe(topicPartition, PubSubSymbolicPosition.EARLIEST);
214+
}
215+
216+
// Poll until we receive the desired version swap message in the new version topic for each partition
217+
LOGGER.info(
218+
"Polling for version swap messages in: {} with generation id: {} for partitions: {}",
219+
newVersionTopic,
220+
generationId,
221+
partitions);
222+
while (!versionSwapConsumedPerPartitionMap.values().stream().allMatch(x -> x)) {
223+
polledResults = consumerAdapter.poll(5000L);
224+
for (Map.Entry<PubSubTopicPartition, List<DefaultPubSubMessage>> entry: polledResults.entrySet()) {
225+
PubSubTopicPartition pubSubTopicPartition = entry.getKey();
226+
List<DefaultPubSubMessage> messageList = entry.getValue();
227+
for (DefaultPubSubMessage message: messageList) {
228+
if (message.getKey().isControlMessage()) {
229+
ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
230+
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
231+
if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
232+
VersionSwap versionSwap = (VersionSwap) controlMessage.getControlMessageUnion();
233+
// In theory just matching the generation id and source region should be sufficient but just to be
234+
// safe we will match all fields
235+
if (versionSwap.getGenerationId() == generationId
236+
&& versionSwap.getSourceRegion().toString().equals(clientRegionName)
237+
&& oldVersionTopic.equals(versionSwap.getOldServingVersionTopic().toString())
238+
&& newVersionTopic.equals(versionSwap.getNewServingVersionTopic().toString())) {
239+
LOGGER.info(
240+
"Found corresponding version swap message for partition: {}",
241+
pubSubTopicPartition.getPartitionNumber());
242+
versionSwapConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true);
243+
VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate(
244+
pubSubTopicPartition.getPubSubTopic().getName(),
245+
message.getPosition(),
246+
pubSubTopicPartition.getPartitionNumber());
247+
checkpoints.put(pubSubTopicPartition.getPartitionNumber(), coordinate);
248+
// We are done with this partition
249+
consumerAdapter.unSubscribe(pubSubTopicPartition);
250+
break;
251+
}
252+
}
253+
}
254+
}
255+
}
256+
}
257+
LOGGER.info(
258+
"Found all version swap messages in: {} with generation id: {} for partitions: {}",
259+
newVersionTopic,
260+
generationId,
261+
partitions);
262+
}
263+
// We cannot change the subscription here because the consumer might not finish polling all the messages in the
264+
// old version topic yet. We can acquire the lock and update the VersionSwapMessageState.
265+
subscriptionLock.writeLock().lock();
266+
lockAcquired = true;
267+
versionSwapMessageState.setNewTopicCheckpoints(checkpoints);
268+
} finally {
269+
if (lockAcquired) {
270+
subscriptionLock.writeLock().unlock();
271+
}
272+
}
273+
return null;
274+
}, seekExecutorService);
275+
}
276+
181277
protected CompletableFuture<Void> internalSeekToEndOfPush(
182278
Set<Integer> partitions,
183279
PubSubTopic targetTopic,

0 commit comments

Comments
 (0)