Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
*/
private PubSubConsumerAdapterFactory<? extends PubSubConsumerAdapter> pubSubConsumerAdapterFactory;
private PubSubContext pubSubContext;
private boolean versionSwapByControlMessageEnabled = false;
/**
* Client region name used for filtering version swap messages from other regions in A/A setup. The client will only
* react to version swap messages with the same source region as the client region name.
*/
private String clientRegionName = "";
/**
* Total region count used for version swap in A/A setup. Each subscribed partition need to receive this many
* corresponding version swap messages before it can safely go to the new version to ensure data completeness.
*/
private int totalRegionCount = 0;
/**
* Version swap timeout in milliseconds. If the version swap is not completed within this time, the consumer will swap
* to the new version and resume normal consumption from EOP for any incomplete partitions. Default is 30 minutes.
*/
private long versionSwapTimeoutInMs = 30 * 60 * 1000;

public ChangelogClientConfig(String storeName) {
this.innerClientConfig = new ClientConfig<>(storeName);
Expand Down Expand Up @@ -327,6 +343,42 @@ public boolean isStateful() {
return this.isStateful;
}

public boolean isVersionSwapByControlMessageEnabled() {
return this.versionSwapByControlMessageEnabled;
}

public ChangelogClientConfig setVersionSwapByControlMessageEnabled(boolean isVersionSwapByControlMessageEnabled) {
this.versionSwapByControlMessageEnabled = isVersionSwapByControlMessageEnabled;
return this;
}

public String getClientRegionName() {
return this.clientRegionName;
}

public ChangelogClientConfig setClientRegionName(String clientRegionName) {
this.clientRegionName = clientRegionName;
return this;
}

public int getTotalRegionCount() {
return this.totalRegionCount;
}

public ChangelogClientConfig setTotalRegionCount(int totalRegionCount) {
this.totalRegionCount = totalRegionCount;
return this;
}

public long getVersionSwapTimeoutInMs() {
return this.versionSwapTimeoutInMs;
}

public ChangelogClientConfig setVersionSwapTimeoutInMs(long versionSwapTimeoutInMs) {
this.versionSwapTimeoutInMs = versionSwapTimeoutInMs;
return this;
}

public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(ChangelogClientConfig<V> config) {
ChangelogClientConfig<V> newConfig = new ChangelogClientConfig<V>().setStoreName(config.getStoreName())
.setLocalD2ZkHosts(config.getLocalD2ZkHosts())
Expand Down Expand Up @@ -354,7 +406,11 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
// Store version should not be cloned
.setStoreVersion(null)
// Is stateful config should not be cloned
.setIsStateful(false);
.setIsStateful(false)
.setVersionSwapByControlMessageEnabled(config.isVersionSwapByControlMessageEnabled())
.setClientRegionName(config.getClientRegionName())
.setTotalRegionCount(config.getTotalRegionCount())
.setVersionSwapTimeoutInMs(config.getVersionSwapTimeoutInMs());
return newConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
Expand All @@ -12,6 +13,7 @@
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -73,7 +75,13 @@ protected VeniceAfterImageConsumerImpl(
storeRepository,
storeName,
changelogClientConfig.getConsumerName(),
this.changeCaptureStats);
this.changeCaptureStats,
changelogClientConfig.isVersionSwapByControlMessageEnabled());
}

// Intended for unit test only.
void setTime(Time time) {
this.time = time;
}

@Override
Expand Down Expand Up @@ -178,6 +186,115 @@ protected static void adjustSeekCheckPointsBasedOnHeartbeats(
}
}

/**
* Similar to {@link #internalSeekToEndOfPush} exception in addition to finding the EOP of each partition we will also
* be looking for the first relevant version swap. This can also be optimized later for a faster find.
*/
@Override
protected CompletableFuture<Void> internalFindNewVersionCheckpoints(
String oldVersionTopic,
String newVersionTopic,
long generationId,
Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.supplyAsync(() -> {
boolean lockAcquired = false;
Map<Integer, VeniceChangeCoordinate> checkpoints = new HashMap<>();
Map<Integer, VeniceChangeCoordinate> eopCheckpoints = new HashMap<>();
try {
synchronized (internalSeekConsumer) {
PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get();
consumerAdapter.batchUnsubscribe(consumerAdapter.getAssignment());
Map<PubSubTopicPartition, List<DefaultPubSubMessage>> polledResults;
Map<Integer, Boolean> versionSwapConsumedPerPartitionMap = new HashMap<>();
for (Integer partition: partitions) {
versionSwapConsumedPerPartitionMap.put(partition, false);
}
List<PubSubTopicPartition> topicPartitionList = getPartitionListToSubscribe(
partitions,
Collections.EMPTY_SET,
pubSubTopicRepository.getTopic(newVersionTopic));

for (PubSubTopicPartition topicPartition: topicPartitionList) {
consumerAdapter.subscribe(topicPartition, PubSubSymbolicPosition.EARLIEST);
}

// Poll until we receive the desired version swap message in the new version topic for each partition
LOGGER.info(
"Polling for version swap messages in: {} with generation id: {} for partitions: {}",
newVersionTopic,
generationId,
partitions);
while (!areAllTrue(versionSwapConsumedPerPartitionMap.values())) {
polledResults = consumerAdapter.poll(5000L);
for (Map.Entry<PubSubTopicPartition, List<DefaultPubSubMessage>> entry: polledResults.entrySet()) {
PubSubTopicPartition pubSubTopicPartition = entry.getKey();
List<DefaultPubSubMessage> messageList = entry.getValue();
for (DefaultPubSubMessage message: messageList) {
if (message.getKey().isControlMessage()) {
ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
if (controlMessageType.equals(ControlMessageType.END_OF_PUSH)) {
VeniceChangeCoordinate eopCoordinate = new VeniceChangeCoordinate(
pubSubTopicPartition.getPubSubTopic().getName(),
message.getPosition(),
pubSubTopicPartition.getPartitionNumber());
eopCheckpoints.put(pubSubTopicPartition.getPartitionNumber(), eopCoordinate);
LOGGER.info(
"Found EOP for version swap message with generation id: {} for partition: {}",
generationId,
pubSubTopicPartition.getPartitionNumber());
// We continue to poll until we find the corresponding version swap which should be after EOP
} else if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
VersionSwap versionSwap = (VersionSwap) controlMessage.getControlMessageUnion();
// In theory just matching the generation id and source region should be sufficient but just to be
// safe we will match all fields
if (versionSwap.getGenerationId() == generationId
&& versionSwap.getSourceRegion().toString().equals(clientRegionName)
&& oldVersionTopic.equals(versionSwap.getOldServingVersionTopic().toString())
&& newVersionTopic.equals(versionSwap.getNewServingVersionTopic().toString())) {
versionSwapConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true);
VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate(
pubSubTopicPartition.getPubSubTopic().getName(),
message.getPosition(),
pubSubTopicPartition.getPartitionNumber());
checkpoints.put(pubSubTopicPartition.getPartitionNumber(), coordinate);
// We are done with this partition
consumerAdapter.unSubscribe(pubSubTopicPartition);
LOGGER.info(
"Found corresponding version swap message with generation id: {} for partition: {}",
generationId,
pubSubTopicPartition.getPartitionNumber());
break;
}
}
}
}
}
}
LOGGER.info(
"Found all version swap messages in: {} with generation id: {} for partitions: {}",
newVersionTopic,
generationId,
partitions);
}
// We cannot change the subscription here because the consumer might not finish polling all the messages in the
// old version topic yet. We can acquire the lock and update the VersionSwapMessageState.
subscriptionLock.writeLock().lock();
lockAcquired = true;
versionSwapMessageState.setNewTopicVersionSwapCheckpoints(checkpoints);
versionSwapMessageState.setNewTopicEOPCheckpoints(eopCheckpoints);
} finally {
if (lockAcquired) {
subscriptionLock.writeLock().unlock();
}
}
return null;
}, seekExecutorService);
}

protected CompletableFuture<Void> internalSeekToEndOfPush(
Set<Integer> partitions,
PubSubTopic targetTopic,
Expand Down Expand Up @@ -279,6 +396,15 @@ protected CompletableFuture<Void> internalSeekToEndOfPush(
}, seekExecutorService);
}

private boolean areAllTrue(Collection<Boolean> booleanCollections) {
for (Boolean b: booleanCollections) {
if (!b) {
return false;
}
}
return true;
}

@Override
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
if (partitions.isEmpty()) {
Expand Down
Loading
Loading