diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java
index 76b90dcdc5a..cf52de6933f 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java
@@ -81,6 +81,22 @@ public class ChangelogClientConfig {
    */
   private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
   private PubSubContext pubSubContext;
+  private boolean versionSwapByControlMessageEnabled = false;
+  /**
+   * Client region name used for filtering version swap messages from other regions in an A/A setup. The client will
+   * only react to version swap messages with the same source region as the client region name.
+   */
+  private String clientRegionName = "";
+  /**
+   * Total region count used for version swap in an A/A setup. Each subscribed partition needs to receive this many
+   * corresponding version swap messages before it can safely go to the new version to ensure data completeness.
+   */
+  private int totalRegionCount = 0;
+  /**
+   * Version swap timeout in milliseconds. If the version swap is not completed within this time, the consumer will
+   * swap to the new version and resume normal consumption from EOP for any incomplete partitions. Default is 30
+   * minutes.
+   */
+  private long versionSwapTimeoutInMs = 30 * 60 * 1000;
 
   public ChangelogClientConfig(String storeName) {
     this.innerClientConfig = new ClientConfig<>(storeName);
@@ -327,6 +343,42 @@ public boolean isStateful() {
     return this.isStateful;
   }
 
+  public boolean isVersionSwapByControlMessageEnabled() {
+    return this.versionSwapByControlMessageEnabled;
+  }
+
+  public ChangelogClientConfig setVersionSwapByControlMessageEnabled(boolean isVersionSwapByControlMessageEnabled) {
+    this.versionSwapByControlMessageEnabled = isVersionSwapByControlMessageEnabled;
+    return this;
+  }
+
+  public String getClientRegionName() {
+    return this.clientRegionName;
+  }
+
+  public ChangelogClientConfig setClientRegionName(String clientRegionName) {
+    this.clientRegionName = clientRegionName;
+    return this;
+  }
+
+  public int getTotalRegionCount() {
+    return this.totalRegionCount;
+  }
+
+  public ChangelogClientConfig setTotalRegionCount(int totalRegionCount) {
+    this.totalRegionCount = totalRegionCount;
+    return this;
+  }
+
+  public long getVersionSwapTimeoutInMs() {
+    return this.versionSwapTimeoutInMs;
+  }
+
+  public ChangelogClientConfig setVersionSwapTimeoutInMs(long versionSwapTimeoutInMs) {
+    this.versionSwapTimeoutInMs = versionSwapTimeoutInMs;
+    return this;
+  }
+
   public static ChangelogClientConfig cloneConfig(ChangelogClientConfig config) {
     ChangelogClientConfig newConfig = new ChangelogClientConfig().setStoreName(config.getStoreName())
         .setLocalD2ZkHosts(config.getLocalD2ZkHosts())
@@ -354,7 +406,11 @@ public static ChangelogClientConfig cloneConfig(Ch
         // Store version should not be cloned
         .setStoreVersion(null)
         // Is stateful config should not be cloned
-        .setIsStateful(false);
+        .setIsStateful(false)
+        .setVersionSwapByControlMessageEnabled(config.isVersionSwapByControlMessageEnabled())
+        .setClientRegionName(config.getClientRegionName())
+        .setTotalRegionCount(config.getTotalRegionCount())
+        .setVersionSwapTimeoutInMs(config.getVersionSwapTimeoutInMs());
     return newConfig;
   }
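For reference, enabling the new behavior wires these options together roughly as in the sketch below. This is illustrative only: the store name, region name, and counts are placeholder values, and the builder calls are the setters added in this file.

    ChangelogClientConfig config = new ChangelogClientConfig("my-store")
        .setVersionSwapByControlMessageEnabled(true)
        // Must match the source region name stamped into VersionSwap control messages.
        .setClientRegionName("dc-0")
        // One VersionSwap message is expected per region for every subscribed partition.
        .setTotalRegionCount(3)
        // Fallback: incomplete partitions resume from the new version's EOP after this long.
        .setVersionSwapTimeoutInMs(TimeUnit.MINUTES.toMillis(30));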
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java
index 85c635dea34..44b49612b3a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java
@@ -3,6 +3,7 @@
 import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.kafka.protocol.ControlMessage;
+import com.linkedin.venice.kafka.protocol.VersionSwap;
 import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
 import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
 import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
@@ -12,6 +13,7 @@
 import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.lazy.Lazy;
 import java.util.Collection;
 import java.util.Collections;
@@ -73,7 +75,13 @@ protected VeniceAfterImageConsumerImpl(
         storeRepository,
         storeName,
         changelogClientConfig.getConsumerName(),
-        this.changeCaptureStats);
+        this.changeCaptureStats,
+        changelogClientConfig.isVersionSwapByControlMessageEnabled());
+  }
+
+  // Intended for unit test only.
+  void setTime(Time time) {
+    this.time = time;
   }
 
   @Override
@@ -178,6 +186,115 @@ protected static void adjustSeekCheckPointsBasedOnHeartbeats(
     }
   }
 
+  /**
+   * Similar to {@link #internalSeekToEndOfPush}, except that in addition to finding the EOP of each partition we will
+   * also be looking for the first relevant version swap. This can also be optimized later for a faster find.
+   */
+  @Override
+  protected CompletableFuture<Void> internalFindNewVersionCheckpoints(
+      String oldVersionTopic,
+      String newVersionTopic,
+      long generationId,
+      Set<Integer> partitions) {
+    if (partitions.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+    return CompletableFuture.supplyAsync(() -> {
+      boolean lockAcquired = false;
+      Map<Integer, VeniceChangeCoordinate> checkpoints = new HashMap<>();
+      Map<Integer, VeniceChangeCoordinate> eopCheckpoints = new HashMap<>();
+      try {
+        synchronized (internalSeekConsumer) {
+          PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get();
+          consumerAdapter.batchUnsubscribe(consumerAdapter.getAssignment());
+          Map<PubSubTopicPartition, List<DefaultPubSubMessage>> polledResults;
+          Map<Integer, Boolean> versionSwapConsumedPerPartitionMap = new HashMap<>();
+          for (Integer partition: partitions) {
+            versionSwapConsumedPerPartitionMap.put(partition, false);
+          }
+          List<PubSubTopicPartition> topicPartitionList = getPartitionListToSubscribe(
+              partitions,
+              Collections.EMPTY_SET,
+              pubSubTopicRepository.getTopic(newVersionTopic));
+
+          for (PubSubTopicPartition topicPartition: topicPartitionList) {
+            consumerAdapter.subscribe(topicPartition, PubSubSymbolicPosition.EARLIEST);
+          }
+
+          // Poll until we receive the desired version swap message in the new version topic for each partition
+          LOGGER.info(
+              "Polling for version swap messages in: {} with generation id: {} for partitions: {}",
+              newVersionTopic,
+              generationId,
+              partitions);
+          while (!areAllTrue(versionSwapConsumedPerPartitionMap.values())) {
+            polledResults = consumerAdapter.poll(5000L);
+            for (Map.Entry<PubSubTopicPartition, List<DefaultPubSubMessage>> entry: polledResults.entrySet()) {
+              PubSubTopicPartition pubSubTopicPartition = entry.getKey();
+              List<DefaultPubSubMessage> messageList = entry.getValue();
+              for (DefaultPubSubMessage message: messageList) {
+                if (message.getKey().isControlMessage()) {
+                  ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
+                  ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
+                  if (controlMessageType.equals(ControlMessageType.END_OF_PUSH)) {
+                    VeniceChangeCoordinate eopCoordinate = new VeniceChangeCoordinate(
+                        pubSubTopicPartition.getPubSubTopic().getName(),
+                        message.getPosition(),
+                        pubSubTopicPartition.getPartitionNumber());
+                    eopCheckpoints.put(pubSubTopicPartition.getPartitionNumber(), eopCoordinate);
+                    LOGGER.info(
+                        "Found EOP for version swap message with generation id: {} for partition: {}",
+                        generationId,
+                        pubSubTopicPartition.getPartitionNumber());
+                    // We continue to poll until we find the corresponding version swap which should be after EOP
+                  } else if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
+                    VersionSwap versionSwap = (VersionSwap) controlMessage.getControlMessageUnion();
+                    // In theory just matching the generation id and source region should be sufficient but just to be
+                    // safe we will match all fields
+                    if (versionSwap.getGenerationId() == generationId
+                        && versionSwap.getSourceRegion().toString().equals(clientRegionName)
+                        && oldVersionTopic.equals(versionSwap.getOldServingVersionTopic().toString())
+                        && newVersionTopic.equals(versionSwap.getNewServingVersionTopic().toString())) {
+                      versionSwapConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true);
+                      VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate(
+                          pubSubTopicPartition.getPubSubTopic().getName(),
+                          message.getPosition(),
+                          pubSubTopicPartition.getPartitionNumber());
+                      checkpoints.put(pubSubTopicPartition.getPartitionNumber(), coordinate);
+                      // We are done with this partition
+                      consumerAdapter.unSubscribe(pubSubTopicPartition);
+                      LOGGER.info(
+                          "Found corresponding version swap message with generation id: {} for partition: {}",
+                          generationId,
+                          pubSubTopicPartition.getPartitionNumber());
+                      break;
+                    }
+                  }
+                }
+              }
+            }
+          }
+          LOGGER.info(
+              "Found all version swap messages in: {} with generation id: {} for partitions: {}",
+              newVersionTopic,
+              generationId,
+              partitions);
+        }
+        // We cannot change the subscription here because the consumer might not finish polling all the messages in the
+        // old version topic yet. We can acquire the lock and update the VersionSwapMessageState.
+        subscriptionLock.writeLock().lock();
+        lockAcquired = true;
+        versionSwapMessageState.setNewTopicVersionSwapCheckpoints(checkpoints);
+        versionSwapMessageState.setNewTopicEOPCheckpoints(eopCheckpoints);
+      } finally {
+        if (lockAcquired) {
+          subscriptionLock.writeLock().unlock();
+        }
+      }
+      return null;
+    }, seekExecutorService);
+  }
+
   protected CompletableFuture<Void> internalSeekToEndOfPush(
       Set<Integer> partitions,
       PubSubTopic targetTopic,
@@ -279,6 +396,15 @@ protected CompletableFuture<Void> internalSeekToEndOfPush(
     }, seekExecutorService);
   }
 
+  private boolean areAllTrue(Collection<Boolean> booleanCollections) {
+    for (Boolean b: booleanCollections) {
+      if (!b) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
     if (partitions.isEmpty()) {
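The relevant-message check in internalFindNewVersionCheckpoints above can be read as a single predicate. Restated as a small helper for clarity (a sketch, not part of the patch; it assumes only the VersionSwap getters already used in this diff):

    // True only for the version swap message this client should checkpoint against:
    // same push generation, produced by this client's region, and describing the exact
    // old -> new serving topic transition currently being executed.
    private static boolean matchesOngoingSwap(
        VersionSwap versionSwap,
        long generationId,
        String clientRegionName,
        String oldVersionTopic,
        String newVersionTopic) {
      return versionSwap.getGenerationId() == generationId
          && clientRegionName.equals(versionSwap.getSourceRegion().toString())
          && oldVersionTopic.equals(versionSwap.getOldServingVersionTopic().toString())
          && newVersionTopic.equals(versionSwap.getNewServingVersionTopic().toString());
    }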
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java
index 195694d5c0f..6350fb0e46f 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java
@@ -71,6 +71,8 @@
 import com.linkedin.venice.store.rocksdb.RocksDBUtils;
 import com.linkedin.venice.utils.DaemonThreadFactory;
 import com.linkedin.venice.utils.DictionaryUtils;
+import com.linkedin.venice.utils.SystemTime;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -94,11 +96,15 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
@@ -175,6 +181,16 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume
   protected final long consumerSequenceIdStartingValue;
   private final RocksDBStorageEngineFactory rocksDBStorageEngineFactory;
   private final VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory;
+  protected final boolean versionSwapByControlMessage;
+  protected final String clientRegionName;
+  protected final int totalRegionCount;
+  protected final long versionSwapTimeoutInMs;
+  protected final AtomicReference<CountDownLatch> onGoingVersionSwapSignal = new AtomicReference<>();
+  /**
+   * Interactions with this field should acquire the subscriptionLock.writeLock()
+   */
+  protected volatile VersionSwapMessageState versionSwapMessageState = null;
+  protected Time time;
 
   public VeniceChangelogConsumerImpl(
       ChangelogClientConfig changelogClientConfig,
@@ -202,6 +218,27 @@ public VeniceChangelogConsumerImpl(
     this.pubSubTopicRepository = pubSubContext.getPubSubTopicRepository();
     this.pubSubPositionDeserializer = pubSubContext.getPubSubPositionDeserializer();
     this.pubSubMessageDeserializer = pubSubMessageDeserializer;
+    this.versionSwapByControlMessage = changelogClientConfig.isVersionSwapByControlMessageEnabled();
+    this.totalRegionCount = changelogClientConfig.getTotalRegionCount();
+    this.clientRegionName = changelogClientConfig.getClientRegionName();
+    this.versionSwapTimeoutInMs = changelogClientConfig.getVersionSwapTimeoutInMs();
+    this.time = new SystemTime();
+    this.onGoingVersionSwapSignal.set(new CountDownLatch(0));
+    if (versionSwapByControlMessage) {
+      // Version swap related configs should all be resolved or explicitly set at this point.
+      if (this.clientRegionName.isEmpty()) {
+        throw new VeniceException(
+            "Failed to enable version swap by control message because client region name is missing");
+      }
+      if (this.totalRegionCount <= 0) {
+        throw new VeniceException(
+            "Failed to enable version swap by control message because total region count is not set");
+      }
+      LOGGER.info(
+          "VeniceChangelogConsumer version swap by control message is enabled. Client region name: {}, total region count: {}",
+          clientRegionName,
+          totalRegionCount);
+    }
 
     seekExecutorService = Executors.newFixedThreadPool(10, new DaemonThreadFactory(getClass().getSimpleName()));
@@ -306,6 +343,11 @@ public VeniceChangelogConsumerImpl(
     LOGGER.info("Start a change log consumer client for store: {}", storeName);
   }
 
+  // Unit test only and read only
+  VersionSwapMessageState getVersionSwapMessageState() {
+    return this.versionSwapMessageState;
+  }
+
   @Override
   public int getPartitionCount() {
     Store store = getStore();
@@ -317,7 +359,7 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
     for (int partition: partitions) {
       getPartitionToBootstrapState().put(partition, false);
     }
-    subscribeTime = System.currentTimeMillis();
+    subscribeTime = time.getMilliseconds();
     return internalSubscribe(partitions, null);
   }
 
@@ -341,15 +383,40 @@ protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, Pub
       throw new RuntimeException(e);
     }
 
-    PubSubTopic topicToSubscribe;
-    if (topic == null) {
-      topicToSubscribe = getCurrentServingVersionTopic();
+    if (versionSwapByControlMessage) {
+      boolean lockAcquiredAndNoOngoingVersionSwap = false;
+      for (int i = 0; i <= MAX_SUBSCRIBE_RETRIES; i++) {
+        // If version swap is in progress, wait for it to finish
+        try {
+          onGoingVersionSwapSignal.get().await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        subscriptionLock.writeLock().lock();
+        if (versionSwapMessageState != null) {
+          // A new version swap is in progress, wait for it to finish again
+          subscriptionLock.writeLock().unlock();
+        } else {
+          // No version swap is in progress, proceed with subscription
+          lockAcquiredAndNoOngoingVersionSwap = true;
+          break;
+        }
+      }
+      if (!lockAcquiredAndNoOngoingVersionSwap) {
+        // This should be extremely rare, since it requires the subscribe request to repeatedly conflict with new
+        // version swaps
+        throw new VeniceException("Unable to subscribe to new partitions due to conflicting version swaps");
+      }
     } else {
-      topicToSubscribe = topic;
+      subscriptionLock.writeLock().lock();
     }
-    subscriptionLock.writeLock().lock();
     try {
+      PubSubTopic topicToSubscribe;
+      if (topic == null) {
+        topicToSubscribe = getCurrentServingVersionTopic();
+      } else {
+        topicToSubscribe = topic;
+      }
       Set<PubSubTopicPartition> topicPartitionSet = getTopicAssignment();
       for (PubSubTopicPartition topicPartition: topicPartitionSet) {
         if (partitions.contains(topicPartition.getPartitionNumber())) {
@@ -701,6 +768,10 @@ protected List<PubSubTopicPartition> getPartitionListToSubscribe(
 
   @Override
   public void unsubscribe(Set<Integer> partitions) {
+    internalUnsubscribe(partitions, false);
+  }
+
+  protected void internalUnsubscribe(Set<Integer> partitions, boolean isForVersionSwap) {
     if (partitions.isEmpty()) {
       return;
     }
@@ -716,6 +787,9 @@ public void unsubscribe(Set<Integer> partitions) {
         }
       }
       pubSubConsumer.batchUnsubscribe(topicPartitionsToUnsub);
+      if (versionSwapMessageState != null && !isForVersionSwap) {
+        versionSwapMessageState.handleUnsubscribe(partitions);
+      }
     } finally {
       subscriptionLock.writeLock().unlock();
     }
@@ -749,7 +823,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
       String topicSuffix,
       boolean includeControlMessage) {
     Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages = new ArrayList<>();
-    Map<PubSubTopicPartition, List<DefaultPubSubMessage>> messagesMap = Collections.EMPTY_MAP;
+    Map<PubSubTopicPartition, List<DefaultPubSubMessage>> messagesMap;
     boolean lockAcquired = false;
 
     try {
@@ -765,7 +839,99 @@
       if (!lockAcquired) {
         return Collections.emptyList();
       }
+
+      if (versionSwapByControlMessage && versionSwapMessageState != null) {
+        /*
+         * If version swap by control message is enabled and the consumer is undergoing version swap we need to check
+         * and act on two scenarios:
+         * 1. If all version swap messages have been received for all partitions, we need to seek to the new topic.
+         * 2. If we have reached the timeout for the version swap, we need to forcefully seek to the new topic using
+         * the EOP positions for any remaining partitions as our backup plan. See javadoc of
+         * VersionSwapMessageState.getVersionSwapStartTimestamp() for more details.
+         */
+        if (versionSwapMessageState.isVersionSwapMessagesReceivedForAllPartitions()) {
+          if (isNewVersionCheckpointsReady(timeoutInMs)) {
+            synchronousSeekToCheckpoint(versionSwapMessageState.getNewTopicVersionSwapCheckpoints());
+            LOGGER.info(
+                "Version swap completed from topic: {} to topic: {}, generation id: {}",
+                versionSwapMessageState.getOldVersionTopic(),
+                versionSwapMessageState.getNewVersionTopic(),
+                versionSwapMessageState.getVersionSwapGenerationId());
+            changeCaptureStats.emitVersionSwapCountMetrics(SUCCESS);
+            versionSwapMessageState = null;
+            onGoingVersionSwapSignal.get().countDown();
+          } else {
+            return Collections.emptyList();
+          }
+        } else if (time.getMilliseconds()
+            - versionSwapMessageState.getVersionSwapStartTimestamp() > versionSwapTimeoutInMs) {
+          if (!getTopicAssignment().isEmpty()) {
+            internalUnsubscribe(versionSwapMessageState.getIncompletePartitions(), true);
+          }
+          if (isNewVersionCheckpointsReady(timeoutInMs)) {
+            synchronousSeekToCheckpoint(versionSwapMessageState.getNewTopicCheckpointsWithEOPAsBackup());
+            LOGGER.info(
+                "Version swap completed after timeout from topic: {} to topic: {}, generation id: {}. Partitions: {} are resumed from EOP positions.",
+                versionSwapMessageState.getOldVersionTopic(),
+                versionSwapMessageState.getNewVersionTopic(),
+                versionSwapMessageState.getVersionSwapGenerationId(),
+                versionSwapMessageState.getIncompletePartitions());
+            changeCaptureStats.emitVersionSwapCountMetrics(SUCCESS);
+            versionSwapMessageState = null;
+            onGoingVersionSwapSignal.get().countDown();
+          } else {
+            LOGGER.warn(
+                "Version swap from topic: {} to topic: {}, generation id: {} already timed out but still unable to find new topic checkpoints to seek to.",
+                versionSwapMessageState.getOldVersionTopic(),
+                versionSwapMessageState.getNewVersionTopic(),
+                versionSwapMessageState.getVersionSwapGenerationId());
+            return Collections.emptyList();
+          }
+        }
+      }
+
+      messagesMap = pubSubConsumer.poll(timeoutInMs);
+      for (Map.Entry<PubSubTopicPartition, List<DefaultPubSubMessage>> entry: messagesMap.entrySet()) {
+        PubSubTopicPartition pubSubTopicPartition = entry.getKey();
+        List<DefaultPubSubMessage> messageList = entry.getValue();
+        for (DefaultPubSubMessage message: messageList) {
+          maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
+          if (message.getKey().isControlMessage()) {
+            ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
+            if (handleControlMessage(
+                controlMessage,
+                pubSubTopicPartition,
+                topicSuffix,
+                message.getKey().getKey(),
+                message.getValue().getProducerMetadata().getMessageTimestamp(),
+                message.getPosition())) {
+              break;
+            }
+            if (includeControlMessage) {
+              pubSubMessages.add(
+                  new ImmutableChangeCapturePubSubMessage<>(
+                      null,
+                      null,
+                      message.getTopicPartition(),
+                      message.getPosition(),
+                      0,
+                      0,
+                      false,
+                      getNextConsumerSequenceId(message.getPartition())));
+            }
+
+          } else {
+            Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessage;
+            if (versionSwapByControlMessage) {
+              pubSubMessage =
+                  convertPubSubMessageToPubSubChangeEventWithVersionSwapState(message, pubSubTopicPartition);
+            } else {
+              pubSubMessage = convertPubSubMessageToPubSubChangeEventMessage(message, pubSubTopicPartition);
+            }
+            pubSubMessage.ifPresent(pubSubMessages::add);
+          }
+        }
+      }
     } catch (InterruptedException exception) {
       LOGGER.info("Thread was interrupted", exception);
       // Restore the interrupt status
@@ -775,41 +941,7 @@
         subscriptionLock.writeLock().unlock();
       }
     }
-    for (Map.Entry<PubSubTopicPartition, List<DefaultPubSubMessage>> entry: messagesMap.entrySet()) {
-      PubSubTopicPartition pubSubTopicPartition = entry.getKey();
-      List<DefaultPubSubMessage> messageList = entry.getValue();
-      for (DefaultPubSubMessage message: messageList) {
-        maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
-        if (message.getKey().isControlMessage()) {
-          ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
-          if (handleControlMessage(
-              controlMessage,
-              pubSubTopicPartition,
-              topicSuffix,
-              message.getKey().getKey(),
-              message.getValue().getProducerMetadata().getMessageTimestamp())) {
-            break;
-          }
-          if (includeControlMessage) {
-            pubSubMessages.add(
-                new ImmutableChangeCapturePubSubMessage<>(
-                    null,
-                    null,
-                    message.getTopicPartition(),
-                    message.getPosition(),
-                    0,
-                    0,
-                    false,
-                    getNextConsumerSequenceId(message.getPartition())));
-          }
-        } else {
-          Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessage =
-              convertPubSubMessageToPubSubChangeEventMessage(message, pubSubTopicPartition);
-          pubSubMessage.ifPresent(pubSubMessages::add);
-        }
-      }
-    }
 
     int messagesPolled = pubSubMessages.size();
 
     if (changelogClientConfig.shouldCompactMessages()) {
@@ -843,6 +975,34 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
     }
   }
 
+  private boolean isNewVersionCheckpointsReady(long timeoutInMs) throws InterruptedException {
+    if (versionSwapMessageState == null) {
+      return false;
+    }
+    try {
+      versionSwapMessageState.getFindNewTopicCheckpointFuture().get(timeoutInMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException timeoutException) {
+      // Still waiting for internalFindNewVersionCheckpoints to complete.
+      return false;
+    } catch (ExecutionException e) {
+      // Re-attempt the seek but should report the error.
+      LOGGER.warn(
+          "Caught an exception when looking for corresponding checkpoints with generation id: {} in new topic: {}. Retrying.",
+          versionSwapMessageState.getVersionSwapGenerationId(),
+          versionSwapMessageState.getNewVersionTopic(),
+          e);
+      changeCaptureStats.emitVersionSwapCountMetrics(FAIL);
+      versionSwapMessageState.setFindNewTopicCheckpointFuture(
+          internalFindNewVersionCheckpoints(
+              versionSwapMessageState.getOldVersionTopic(),
+              versionSwapMessageState.getNewVersionTopic(),
+              versionSwapMessageState.getVersionSwapGenerationId(),
+              versionSwapMessageState.getAssignedPartitions()));
+      return false;
+    }
+    return true;
+  }
+
   void maybeUpdatePartitionToBootstrapMap(DefaultPubSubMessage message, PubSubTopicPartition pubSubTopicPartition) {
     if (getSubscribeTime() - message.getValue().producerMetadata.messageTimestamp <= TimeUnit.MINUTES.toMillis(1)) {
       getPartitionToBootstrapState().put(pubSubTopicPartition.getPartitionNumber(), true);
@@ -867,7 +1027,8 @@ protected boolean handleControlMessage(
       PubSubTopicPartition pubSubTopicPartition,
       String topicSuffix,
       byte[] key,
-      long timestamp) {
+      long timestamp,
+      PubSubPosition position) {
     ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
     // TODO: Find a better way to avoid data gap between version topic and change capture topic due to log compaction.
     if (controlMessageType.equals(ControlMessageType.END_OF_PUSH)) {
@@ -895,6 +1056,13 @@ protected boolean handleControlMessage(
           topicSuffix,
           pubSubTopicPartition.getPartitionNumber());
     }
+
+    // New version swap behavior where we only process VERSION_SWAP messages from VT
+    if (versionSwapByControlMessage && controlMessageType.equals(ControlMessageType.VERSION_SWAP)
+        && Version.isVersionTopic(pubSubTopicPartition.getTopicName())) {
+      return handleVersionSwapMessageInVT(controlMessage, pubSubTopicPartition, position);
+    }
+
     if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()
         && Arrays.equals(key, KafkaKey.HEART_BEAT.getKey())) {
       currentVersionLastHeartbeat.put(pubSubTopicPartition.getPartitionNumber(), timestamp);
@@ -916,6 +1084,154 @@ protected T processRecordBytes(
     return deserializedValue;
   }
 
+  /**
+   * Similar to convertPubSubMessageToPubSubChangeEventMessage but without all the RMD extraction. We are moving away
+   * from storing offset and using offset comparison to support a wider range of pub sub systems. Code duplication here
+   * should be temporary and only for ease of rollout and rollback reasons. This method should only be called when
+   * versionSwapByControlMessage is set to true.
+   *
+   * Since we are relying on version swap messages to coordinate lossless version swap, it's crucial to prevent users
+   * from seeking in between a sequence of related version swap messages within a partition. During a version swap we
+   * will also use a low watermark approach for the {@link VeniceChangeCoordinate} returned. However, the changelog
+   * consumer is still vulnerable to this edge case when seekToTimestamp and seekToTail are used. These edge cases will
+   * be rare and will be handled by the version swap timeout and backup strategy of seek to new version's EOP.
+   */
+  protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventWithVersionSwapState(
+      DefaultPubSubMessage message,
+      PubSubTopicPartition pubSubTopicPartition) {
+    Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubChangeEventMessage = Optional.empty();
+    byte[] keyBytes = message.getKey().getKey();
+    MessageType messageType = MessageType.valueOf(message.getValue());
+    Object assembledObject = null;
+    PubSubPosition returnedMessagePosition = message.getPosition();
+    if (versionSwapMessageState != null) {
+      PubSubPosition versionSwapLowWatermark = versionSwapMessageState
+          .getVersionSwapLowWatermarkPosition(pubSubTopicPartition.getTopicName(), message.getPartition());
+      if (versionSwapLowWatermark != null) {
+        returnedMessagePosition = versionSwapLowWatermark;
+      }
+    }
+    if (messageType.equals(MessageType.DELETE)) {
+      // Deletes have a previous and current value of null. So just fill it in!
+      ChangeEvent<V> changeEvent = new ChangeEvent<>(null, null);
+      pubSubChangeEventMessage = Optional.of(
+          new ImmutableChangeCapturePubSubMessage<>(
+              keyDeserializer.deserialize(keyBytes),
+              changeEvent,
+              pubSubTopicPartition,
+              returnedMessagePosition,
+              message.getPubSubMessageTime(),
+              message.getPayloadSize(),
+              false,
+              getNextConsumerSequenceId(message.getPartition())));
+
+      partitionToDeleteMessageCount.computeIfAbsent(message.getPartition(), x -> new AtomicLong(0)).incrementAndGet();
+    } else if (messageType.equals(MessageType.PUT)) {
+      Put put = (Put) message.getValue().payloadUnion;
+      // Select appropriate reader schema and compressors
+      RecordDeserializer deserializer = null;
+      int readerSchemaId;
+      VeniceCompressor compressor = getVersionCompressor(pubSubTopicPartition.getPubSubTopic());
+      if (pubSubTopicPartition.getPubSubTopic().isViewTopic() && changelogClientConfig.isBeforeImageView()) {
+        deserializer = recordChangeDeserializer;
+        readerSchemaId = this.schemaReader.getLatestValueSchemaId();
+      } else {
+        // Use writer schema as the reader schema
+        readerSchemaId = put.schemaId;
+      }
+
+      ByteBufferValueRecord<ByteBuffer> assembledRecord;
+      try {
+        assembledRecord = chunkAssembler.bufferAndAssembleRecord(
+            pubSubTopicPartition,
+            put.getSchemaId(),
+            keyBytes,
+            put.getPutValue(),
+            message.getPosition(),
+            compressor);
+
+        if (changeCaptureStats != null && ChunkAssembler.isChunkedRecord(put.getSchemaId())) {
+          changeCaptureStats.emitChunkedRecordCountMetrics(SUCCESS);
+        }
+
+        if (assembledRecord == null) {
+          // bufferAndAssembleRecord may have only buffered records and not returned anything yet because
+          // it's waiting for more input. In this case, just return an empty optional for now.
+          return Optional.empty();
+        }
+      } catch (Exception exception) {
+        if (changeCaptureStats != null && ChunkAssembler.isChunkedRecord(put.getSchemaId())) {
+          changeCaptureStats.emitChunkedRecordCountMetrics(FAIL);
+        }
+
+        LOGGER.error(
+            "Encountered an exception when processing a record in ChunkAssembler for replica: {}",
+            Utils.getReplicaId(pubSubTopicPartition),
+            exception);
+        throw exception;
+      }
+
+      if (readerSchemaId < 0) {
+        // This was a chunk manifest and the actual writer schema needs to be retrieved
+        readerSchemaId = assembledRecord.writerSchemaId();
+      }
+      if (deserializer == null) {
+        // This is not a before-image view consumer, and we need to set the proper deserializer
+        try {
+          deserializer = storeDeserializerCache.getDeserializer(readerSchemaId, readerSchemaId);
+        } catch (InvalidVeniceSchemaException invalidSchemaException) {
+          // It's possible that a new schema was just added and our async metadata is outdated
+          LOGGER.info("{}. Refreshing the local metadata cache to try again", invalidSchemaException.getMessage());
+          storeRepository.refreshOneStore(storeName);
+          deserializer = storeDeserializerCache.getDeserializer(readerSchemaId, readerSchemaId);
+        }
+      }
+      try {
+        assembledObject = deserializer.deserialize(compressor.decompress(assembledRecord.value()));
+      } catch (IOException e) {
+        throw new VeniceException(
+            "Failed to deserialize or decompress record consumed from topic: "
+                + pubSubTopicPartition.getPubSubTopic().getName(),
+            e);
+      }
+      try {
+        assembledObject = processRecordBytes(
+            compressor.decompress(put.getPutValue()),
+            assembledObject,
+            keyBytes,
+            put.getPutValue(),
+            pubSubTopicPartition,
+            readerSchemaId,
+            message.getPosition());
+      } catch (Exception ex) {
+        throw new VeniceException(ex);
+      }
+
+      // Now that we've assembled the object, we need to extract the replication vector depending on whether it's from
+      // VT or from the record change event. Records from VT 'typically' don't have an offset vector, but they will in
+      // repush scenarios (which we want to be opaque to the user and filter accordingly).
+      int payloadSize = message.getPayloadSize();
+      if (assembledObject instanceof RecordChangeEvent) {
+        throw new UnsupportedOperationException("Venice no longer supports before image view");
+      } else {
+        ChangeEvent<V> changeEvent = new ChangeEvent<>(null, (V) assembledObject);
+        pubSubChangeEventMessage = Optional.of(
+            new ImmutableChangeCapturePubSubMessage<>(
+                keyDeserializer.deserialize(keyBytes),
+                changeEvent,
+                pubSubTopicPartition,
+                returnedMessagePosition,
+                message.getPubSubMessageTime(),
+                payloadSize,
+                false,
+                getNextConsumerSequenceId(message.getPartition())));
+      }
+      partitionToPutMessageCount.computeIfAbsent(message.getPartition(), x -> new AtomicLong(0)).incrementAndGet();
+    }
+
+    return pubSubChangeEventMessage;
+  }
+
   protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(
       DefaultPubSubMessage message,
       PubSubTopicPartition pubSubTopicPartition) {
@@ -1137,6 +1453,48 @@ protected List<Long> extractOffsetVectorFromMessage(
     return new ArrayList<>();
   }
 
+  protected boolean handleVersionSwapMessageInVT(
+      ControlMessage controlMessage,
+      PubSubTopicPartition pubSubTopicPartition,
+      PubSubPosition position) {
+    VersionSwap versionSwap = (VersionSwap) controlMessage.getControlMessageUnion();
+    if (VersionSwapMessageState
+        .isVersionSwapRelevant(pubSubTopicPartition.getTopicName(), clientRegionName, versionSwap)) {
+      if (versionSwapMessageState == null) {
+        Set<PubSubTopicPartition> currentAssignment = getTopicAssignment();
+        versionSwapMessageState =
+            new VersionSwapMessageState(versionSwap, totalRegionCount, currentAssignment, time.getMilliseconds());
+        onGoingVersionSwapSignal.set(new CountDownLatch(1));
+        LOGGER.info(
+            "New version detected for store: {} through version swap messages. Performing version swap from topic: {} to topic: {}, generation id: {}",
+            storeName,
+            versionSwapMessageState.getOldVersionTopic(),
+            versionSwapMessageState.getNewVersionTopic(),
+            versionSwapMessageState.getVersionSwapGenerationId());
+        versionSwapMessageState.setFindNewTopicCheckpointFuture(
+            internalFindNewVersionCheckpoints(
+                versionSwapMessageState.getOldVersionTopic(),
+                versionSwapMessageState.getNewVersionTopic(),
+                versionSwapMessageState.getVersionSwapGenerationId(),
+                versionSwapMessageState.getAssignedPartitions()));
+      }
+      if (versionSwapMessageState.handleVersionSwap(versionSwap, pubSubTopicPartition, position)) {
+        // Stop consuming from the old topic for this partition since we have consumed all the version swap messages.
+        internalUnsubscribe(Collections.singleton(pubSubTopicPartition.getPartitionNumber()), true);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  protected CompletableFuture<Void> internalFindNewVersionCheckpoints(
+      String oldVersionTopic,
+      String newVersionTopic,
+      long generationId,
+      Set<Integer> partitions) {
+    throw new UnsupportedOperationException(
+        "internalFindNewVersionCheckpoints is not supported by VeniceChangelogConsumerImpl");
+  }
+
   protected boolean handleVersionSwapControlMessage(
       ControlMessage controlMessage,
       PubSubTopicPartition pubSubTopicPartition,
@@ -1292,7 +1650,12 @@ protected boolean switchToNewTopic(PubSubTopic newTopic, String topicSuffix, Int
     }
     unsubscribe(partitions);
     try {
-      internalSubscribe(partitions, mergedTopicName).get();
+      Set<VeniceChangeCoordinate> beginningOfNewTopic = new HashSet<>(partitions.size());
+      for (Integer p: partitions) {
+        beginningOfNewTopic
+            .add(new VeniceChangeCoordinate(mergedTopicName.getName(), PubSubSymbolicPosition.EARLIEST, p));
+      }
+      synchronousSeekToCheckpoint(beginningOfNewTopic);
     } catch (Exception e) {
       throw new VeniceException("Subscribe to new topic:" + mergedTopicName + " is not successful, error: " + e);
     }
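Taken together, the poll-path changes keep the swap invisible to callers: a subscriber keeps calling poll() and simply starts receiving records from the new version topic once the swap completes or times out. A rough usage sketch (hypothetical key/value types and a process() placeholder; the consumer instance is assumed to come from the client factory, configured as shown earlier):

    consumer.subscribe(Collections.singleton(0)).get();
    while (running) {
      for (PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> message: consumer.poll(1000L)) {
        // While a swap is in flight, returned coordinates are clamped to the partition's version
        // swap low watermark, so a checkpoint taken here can never land inside the swap handshake.
        process(message.getKey(), message.getValue().getCurrentValue(), message.getOffset());
      }
    }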
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java
index a838e268fb0..45ff4590664 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java
@@ -29,22 +29,29 @@ class VersionSwapDataChangeListener<K, V> implements StoreDataChangedListener {
   private final String consumerName;
   private final BasicConsumerStats changeCaptureStats;
   protected final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+  private final boolean versionSwapByControlMessage;
 
   VersionSwapDataChangeListener(
       VeniceAfterImageConsumerImpl<K, V> consumer,
       NativeMetadataRepositoryViewAdapter storeRepository,
       String storeName,
       String consumerName,
-      BasicConsumerStats changeCaptureStats) {
+      BasicConsumerStats changeCaptureStats,
+      boolean versionSwapByControlMessage) {
     this.consumer = consumer;
     this.storeRepository = storeRepository;
     this.storeName = storeName;
     this.consumerName = consumerName;
     this.changeCaptureStats = changeCaptureStats;
+    this.versionSwapByControlMessage = versionSwapByControlMessage;
   }
 
   @Override
   public void handleStoreChanged(Store store) {
+    if (versionSwapByControlMessage) {
+      // No-op: the changelog consumer is configured to perform version swap via version swap control messages.
+      return;
+    }
     synchronized (this) {
       for (int attempt = 1; attempt <= MAX_VERSION_SWAP_RETRIES; attempt++) {
         // store may be null as this is called by other repair tasks
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapMessageState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapMessageState.java
new file mode 100644
index 00000000000..184af51b638
--- /dev/null
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapMessageState.java
@@ -0,0 +1,245 @@
+package com.linkedin.davinci.consumer;
+
+import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.kafka.protocol.VersionSwap;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
+import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A class initialized to indicate that the changelog consumer is undergoing version swap. The class also keeps
+ * various states about this version swap. This class is thread safe: every method that reads or changes mutable
+ * state is synchronized. However, it's important to keep in mind that races can still occur if the caller performs a
+ * sequence of operations that depend on each other. In those scenarios an external lock is required.
+ * E.g. (1) call getFindNewTopicCheckpointFuture() and, if the future is complete, (2) call
+ * getNewTopicVersionSwapCheckpoints(). By the time (2) is called it's possible that the result from (1) is no longer
+ * valid, because a different thread slipped in between and changed the state.
+ */
+public class VersionSwapMessageState {
+  private final String oldVersionTopic;
+  private final String newVersionTopic;
+  private final long versionSwapGenerationId;
+  private final int totalRegionCount;
+  private final Map<Integer, Set<String>> receivedVersionSwapPartitionToRegionsMap;
+  private final Map<Integer, PubSubPosition> partitionToVersionSwapLowWatermarkPositionMap;
+  private final Set<Integer> assignedPartitions;
+  private final Set<Integer> completedPartitions;
+  private final long versionSwapStartTimestamp;
+  private CompletableFuture<Void> findNewTopicCheckpointFuture;
+  private Map<Integer, VeniceChangeCoordinate> newTopicVersionSwapCheckpoints = new HashMap<>();
+  private Map<Integer, VeniceChangeCoordinate> newTopicEOPCheckpoints = new HashMap<>();
+
+  public VersionSwapMessageState(
+      VersionSwap versionSwap,
+      int totalRegionCount,
+      Set<PubSubTopicPartition> currentAssignment,
+      long versionSwapStartTimestamp) {
+    this.oldVersionTopic = versionSwap.getOldServingVersionTopic().toString();
+    this.newVersionTopic = versionSwap.getNewServingVersionTopic().toString();
+    this.versionSwapGenerationId = versionSwap.getGenerationId();
+    this.totalRegionCount = totalRegionCount;
+    this.receivedVersionSwapPartitionToRegionsMap = new HashMap<>();
+    this.assignedPartitions = new HashSet<>();
+    for (PubSubTopicPartition pubSubTopicPartition: currentAssignment) {
+      if (oldVersionTopic.equals(pubSubTopicPartition.getTopicName())) {
+        receivedVersionSwapPartitionToRegionsMap.put(pubSubTopicPartition.getPartitionNumber(), new HashSet<>());
+        assignedPartitions.add(pubSubTopicPartition.getPartitionNumber());
+      }
+    }
+    this.partitionToVersionSwapLowWatermarkPositionMap = new HashMap<>();
+    this.completedPartitions = new HashSet<>();
+    this.versionSwapStartTimestamp = versionSwapStartTimestamp;
+  }
+
+  /**
+   * Get the pub sub position of the first relevant version swap message for the given partition. Null will be
+   * returned if the partition has not consumed its version swap yet. This is acceptable because different partitions
+   * could be making progress towards version swap at a different pace. e.g. partition 0 consumed its version swap
+   * message already but partition 1 could still be consuming regular messages from the old version topic before
+   * encountering any version swap messages.
+   * @param topic where the version swap message originated from
+   * @param partitionId of the version topic
+   * @return the pub sub position or null
+   */
+  public synchronized PubSubPosition getVersionSwapLowWatermarkPosition(String topic, int partitionId) {
+    if (oldVersionTopic.equals(topic)) {
+      return partitionToVersionSwapLowWatermarkPositionMap.get(partitionId);
+    } else {
+      return null;
+    }
+  }
+
+  public String getOldVersionTopic() {
+    return oldVersionTopic;
+  }
+
+  public String getNewVersionTopic() {
+    return newVersionTopic;
+  }
+
+  public long getVersionSwapGenerationId() {
+    return versionSwapGenerationId;
+  }
+
+  public synchronized void setFindNewTopicCheckpointFuture(CompletableFuture<Void> findNewTopicCheckpointFuture) {
+    this.findNewTopicCheckpointFuture = findNewTopicCheckpointFuture;
+    this.newTopicEOPCheckpoints.clear();
+    this.newTopicVersionSwapCheckpoints.clear();
+  }
+
+  public synchronized CompletableFuture<Void> getFindNewTopicCheckpointFuture() {
+    return findNewTopicCheckpointFuture;
+  }
+
+  public synchronized void setNewTopicVersionSwapCheckpoints(
+      Map<Integer, VeniceChangeCoordinate> newTopicVersionSwapCheckpoints) {
+    this.newTopicVersionSwapCheckpoints = newTopicVersionSwapCheckpoints;
+  }
+
+  public synchronized void setNewTopicEOPCheckpoints(Map<Integer, VeniceChangeCoordinate> newTopicEOPCheckpoints) {
+    this.newTopicEOPCheckpoints = newTopicEOPCheckpoints;
+  }
+
+  public synchronized Set<VeniceChangeCoordinate> getNewTopicVersionSwapCheckpoints() {
+    // Defensive coding
+    if (findNewTopicCheckpointFuture == null || !findNewTopicCheckpointFuture.isDone()) {
+      throw new VeniceException("New topic checkpoints are not available yet");
+    }
+    Set<VeniceChangeCoordinate> checkpoints = new HashSet<>();
+    for (Map.Entry<Integer, VeniceChangeCoordinate> entry: newTopicVersionSwapCheckpoints.entrySet()) {
+      if (completedPartitions.contains(entry.getKey())) {
+        checkpoints.add(entry.getValue());
+      }
+    }
+    return checkpoints;
+  }
+
+  /**
+   * Intended to be used as a backup strategy if any partition still has not completed the version swap within the
+   * timeout. Remaining partitions will be resumed from EOP instead of the first relevant version swap message in the
+   * new topic.
+   */
+  public synchronized Set<VeniceChangeCoordinate> getNewTopicCheckpointsWithEOPAsBackup() {
+    Set<VeniceChangeCoordinate> checkpoints = getNewTopicVersionSwapCheckpoints();
+    for (Integer partition: getIncompletePartitions()) {
+      if (newTopicEOPCheckpoints.containsKey(partition)) {
+        checkpoints.add(newTopicEOPCheckpoints.get(partition));
+      } else {
+        throw new VeniceException(
+            String.format("EOP VeniceChangeCoordinate is missing unexpectedly for partition: %s", partition));
+      }
+    }
+    return checkpoints;
+  }
+
+  public synchronized Set<Integer> getAssignedPartitions() {
+    return Collections.unmodifiableSet(assignedPartitions);
+  }
+
+  public synchronized Set<Integer> getIncompletePartitions() {
+    Set<Integer> incompletePartitions = new HashSet<>(assignedPartitions);
+    incompletePartitions.removeAll(completedPartitions);
+    return incompletePartitions;
+  }
+
+  /**
+   * If we have reached the timeout for the version swap, we need to forcefully seek to the new topic using the EOP
+   * positions for any remaining partitions as our backup plan, which should cover a variety of edge cases (e.g. the
+   * consumer is not polling fast enough, the consumer's starting position was in between version swaps, a region is
+   * down, etc.). In all these edge cases it's better to go to the new topic and consume a lot of duplicate messages
+   * than to stay on the old topic, which will eventually be deleted.
+   * @return the timestamp when the version swap was started.
+   */
+  public long getVersionSwapStartTimestamp() {
+    return versionSwapStartTimestamp;
+  }
+
+  /**
+   * @return true if all partitions have received all the version swap messages required for this version swap event.
+   * This means we can subscribe to the new version topic and resume normal consumption from the first relevant
+   * version swap message.
+   */
+  public synchronized boolean isVersionSwapMessagesReceivedForAllPartitions() {
+    return completedPartitions.size() == receivedVersionSwapPartitionToRegionsMap.size();
+  }
+
+  /**
+   * Handle the version swap message and check if all version swap messages related to this version swap event have
+   * been received for the given partition.
+   * @param versionSwap message that we just received.
+   * @param pubSubTopicPartition where we received the version swap message.
+   * @param position of the version swap message.
+   * @return true if all version swap messages related to this version swap event have been received.
+   */
+  public synchronized boolean handleVersionSwap(
+      VersionSwap versionSwap,
+      PubSubTopicPartition pubSubTopicPartition,
+      PubSubPosition position) {
+    int partitionId = pubSubTopicPartition.getPartitionNumber();
+    if (!oldVersionTopic.equals(pubSubTopicPartition.getTopicName())) {
+      // In theory this can only happen if the user unsubscribed and resubscribed some partitions during a version
+      // swap, or there is a bug in the code. Either way this is not expected/supported.
+      throw new VeniceException(
+          String.format(
+              "Invalid state, received version swap message from %s partition: %s to %s during an ongoing version swap from %s to %s",
+              pubSubTopicPartition.getTopicName(),
+              partitionId,
+              versionSwap.getNewServingVersionTopic(),
+              oldVersionTopic,
+              newVersionTopic));
+    }
+    if (versionSwapGenerationId == versionSwap.getGenerationId()
+        && versionSwap.getNewServingVersionTopic().toString().equals(newVersionTopic)) {
+      receivedVersionSwapPartitionToRegionsMap.computeIfPresent(partitionId, (p, receivedRegions) -> {
+        receivedRegions.add(versionSwap.getDestinationRegion().toString());
+        if (receivedRegions.size() >= totalRegionCount) {
+          completedPartitions.add(p);
+        }
+        return receivedRegions;
+      });
+      partitionToVersionSwapLowWatermarkPositionMap.computeIfAbsent(partitionId, (p) -> position);
+      return completedPartitions.contains(partitionId);
+    }
+    return false;
+  }
+
+  /**
+   * Remove unsubscribed partitions from the ongoing version swap states.
+   * @param partitions to unsubscribe
+   */
+  public synchronized void handleUnsubscribe(Set<Integer> partitions) {
+    for (Integer partition: partitions) {
+      receivedVersionSwapPartitionToRegionsMap.remove(partition);
+      partitionToVersionSwapLowWatermarkPositionMap.remove(partition);
+      assignedPartitions.remove(partition);
+      completedPartitions.remove(partition);
+    }
+  }
+
+  /**
+   * Determines if the version swap message is relevant or not based on the version topic that it was consumed from
+   * and the version swap message content.
+   * @param currentVersionTopic where the version swap message was consumed from.
+   * @param clientRegion of the consumer.
+   * @param versionSwap message that was consumed.
+   * @return whether the version swap message is relevant and should be processed.
+   */
+  public static boolean isVersionSwapRelevant(
+      String currentVersionTopic,
+      String clientRegion,
+      VersionSwap versionSwap) {
+    if (versionSwap.generationId == -1) {
+      // Old version swap message that should be ignored
+      return false;
+    }
+    return currentVersionTopic.equals(versionSwap.getOldServingVersionTopic().toString())
+        && clientRegion.equals(versionSwap.getSourceRegion().toString())
+        && Version.isVersionTopic(versionSwap.getNewServingVersionTopic().toString());
+  }
+}
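To make the completion rule concrete, here is a worked trace of handleVersionSwap for one partition, assuming totalRegionCount = 2 and illustrative region names:

    // Ongoing swap with generationId 7; partition 3 is subscribed to the old version topic.
    // 1. VersionSwap{generationId=7, destinationRegion="dc-0"} arrives:
    //    receivedRegions(3) = {"dc-0"}, size < totalRegionCount, so handleVersionSwap returns false;
    //    this first matching message's position is recorded once as partition 3's low watermark
    //    (via partitionToVersionSwapLowWatermarkPositionMap.computeIfAbsent).
    // 2. VersionSwap{generationId=7, destinationRegion="dc-1"} arrives:
    //    receivedRegions(3) = {"dc-0", "dc-1"}, size >= totalRegionCount, so partition 3 joins
    //    completedPartitions and handleVersionSwap returns true; the caller then unsubscribes the
    //    partition from the old topic. The swap finishes once every assigned partition gets here.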
+ Assert.assertThrows( + VeniceException.class, + () -> new VeniceAfterImageConsumerImpl<>( + versionSwapConfig, + mockPubSubConsumer, + PubSubMessageDeserializer.createDefaultDeserializer(), + veniceChangelogConsumerClientFactory)); + + PubSubTopic newVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 2)); + PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); + String clientRegion = "region1"; + String remoteRegion = "region2"; + versionSwapConfig.setClientRegionName(clientRegion).setTotalRegionCount(2); + PubSubConsumerAdapter mockInternalSeekConsumer = Mockito.mock(PubSubConsumerAdapter.class); + Map> newVersionPubSubMessagesMap = new HashMap<>(); + PubSubTopicPartition newTopicPartition1 = new PubSubTopicPartitionImpl(newVersionTopic, 1); + newVersionPubSubMessagesMap.put( + newTopicPartition1, + Collections.singletonList( + constructVersionSwapMessage( + newVersionTopic, + oldVersionTopic.getName(), + newVersionTopic.getName(), + clientRegion, + clientRegion, + 2L, + 1, + Collections.emptyList()))); + when(mockInternalSeekConsumer.poll(anyLong())).thenThrow(new VeniceException("Test exception")) + .thenReturn(newVersionPubSubMessagesMap); + VeniceAfterImageConsumerImpl veniceChangeLogConsumer = new VeniceAfterImageConsumerImpl<>( + versionSwapConfig, + mockPubSubConsumer, + Lazy.of(() -> mockInternalSeekConsumer), + PubSubMessageDeserializer.createDefaultDeserializer(), + veniceChangelogConsumerClientFactory); + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); + Store store = mock(Store.class); + Version mockOldVersion = new VersionImpl(storeName, 1, "foo"); + Version mockNewVersion = new VersionImpl(storeName, 2, "foo"); + mockOldVersion.setPartitionCount(2); + mockNewVersion.setPartitionCount(2); + mockOldVersion.setCompressionStrategy(CompressionStrategy.NO_OP); + mockNewVersion.setCompressionStrategy(CompressionStrategy.NO_OP); + when(store.getCurrentVersion()).thenReturn(2); + when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + when(store.getPartitionCount()).thenReturn(2); + when(mockRepository.getStore(storeName)).thenReturn(store); + when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); + when(store.getVersionOrThrow(1)).thenReturn(mockOldVersion); + when(store.getVersion(1)).thenReturn(mockOldVersion); + when(store.getVersionOrThrow(2)).thenReturn(mockNewVersion); + when(store.getVersion(2)).thenReturn(mockNewVersion); + veniceChangeLogConsumer.setStoreRepository(mockRepository); + + // partition 0 has an irrelevant version swap and partition 1 has a relevant version swap + Map> pubSubMessagesMap = new HashMap<>(); + PubSubTopicPartition topicPartition0 = new PubSubTopicPartitionImpl(oldVersionTopic, 0); + pubSubMessagesMap.put( + topicPartition0, + Collections.singletonList( + constructVersionSwapMessage( + oldVersionTopic, + oldVersionTopic.getName(), + newVersionTopic.getName(), + remoteRegion, + clientRegion, + 1L, + 0, + Collections.emptyList()))); + PubSubTopicPartition topicPartition1 = new PubSubTopicPartitionImpl(oldVersionTopic, 1); + pubSubMessagesMap.put( + topicPartition1, + Collections.singletonList( + constructVersionSwapMessage( + oldVersionTopic, + oldVersionTopic.getName(), + newVersionTopic.getName(), + clientRegion, + clientRegion, + 2L, + 1, + Collections.emptyList()))); + doReturn(pubSubMessagesMap).when(mockPubSubConsumer).poll(pollTimeoutMs); + Set 
currentAssignment = new HashSet<>(); + currentAssignment.add(topicPartition0); + currentAssignment.add(topicPartition1); + doReturn(currentAssignment).when(mockPubSubConsumer).getAssignment(); + + veniceChangeLogConsumer.poll(pollTimeoutMs); + VersionSwapMessageState versionSwapMessageState = veniceChangeLogConsumer.getVersionSwapMessageState(); + Assert.assertNotNull(versionSwapMessageState); + Assert.assertEquals(versionSwapMessageState.getVersionSwapGenerationId(), 2L); + Assert.assertEquals(versionSwapMessageState.getOldVersionTopic(), oldVersionTopic.getName()); + Assert.assertEquals(versionSwapMessageState.getNewVersionTopic(), newVersionTopic.getName()); + Assert.assertFalse(versionSwapMessageState.isVersionSwapMessagesReceivedForAllPartitions()); + Assert.assertEquals(versionSwapMessageState.getAssignedPartitions().size(), 2); + Assert.assertEquals(versionSwapMessageState.getIncompletePartitions().size(), 2); + Assert.assertNull(versionSwapMessageState.getVersionSwapLowWatermarkPosition(oldVersionTopic.getName(), 0)); + Assert.assertNotNull(versionSwapMessageState.getVersionSwapLowWatermarkPosition(oldVersionTopic.getName(), 1)); + PubSubPosition partition1LowWatermark = + versionSwapMessageState.getVersionSwapLowWatermarkPosition(oldVersionTopic.getName(), 1); + prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 1, false, true); + Collection, VeniceChangeCoordinate>> pubSubMessages = + veniceChangeLogConsumer.poll(pollTimeoutMs); + for (PubSubMessage, VeniceChangeCoordinate> pubSubMessage: pubSubMessages) { + // VeniceChangeCoordinate should be set to version swap low watermark + Assert.assertEquals(pubSubMessage.getOffset().getPosition(), partition1LowWatermark); + } + // New subscribe should be blocked + CompletableFuture subscribeDuringVersionSwapFuture = + veniceChangeLogConsumer.subscribe(Collections.singleton(0)); + Assert.assertThrows( + TimeoutException.class, + () -> subscribeDuringVersionSwapFuture.get(pollTimeoutMs, TimeUnit.MILLISECONDS)); + // We should still be able to unsubscribe + veniceChangeLogConsumer.unsubscribe(Collections.singleton(0)); + Assert.assertEquals(versionSwapMessageState.getAssignedPartitions().size(), 1); + Assert.assertEquals(versionSwapMessageState.getIncompletePartitions().size(), 1); + + // Allow version swap to complete + pubSubMessagesMap = new HashMap<>(); + pubSubMessagesMap.put( + topicPartition1, + Collections.singletonList( + constructVersionSwapMessage( + oldVersionTopic, + oldVersionTopic.getName(), + newVersionTopic.getName(), + clientRegion, + remoteRegion, + 2L, + 1, + Collections.emptyList()))); + doReturn(pubSubMessagesMap).when(mockPubSubConsumer).poll(pollTimeoutMs); + veniceChangeLogConsumer.poll(pollTimeoutMs); + Assert.assertEquals(versionSwapMessageState.getIncompletePartitions().size(), 0); + Assert.assertTrue(versionSwapMessageState.isVersionSwapMessagesReceivedForAllPartitions()); + + prepareVersionTopicRecordsToBePolled(5L, 10L, mockPubSubConsumer, newVersionTopic, 1, false, true); + // First poll should not swap since we configured the internal consumer to fail on first attempt + Assert.assertTrue(veniceChangeLogConsumer.poll(pollTimeoutMs).isEmpty()); + verify(mockPubSubConsumer, never()).subscribe(any(PubSubTopicPartition.class), any(PubSubPosition.class), eq(true)); + // Next few polls should swap + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertFalse(veniceChangeLogConsumer.poll(pollTimeoutMs).isEmpty())); + ArgumentCaptor pubSubTopicCaptor = 
+ ArgumentCaptor<PubSubTopicPartition> pubSubTopicCaptor = + ArgumentCaptor.forClass(PubSubTopicPartition.class); + verify(mockPubSubConsumer, times(1)).subscribe(pubSubTopicCaptor.capture(), any(PubSubPosition.class), eq(true)); + Assert.assertEquals(pubSubTopicCaptor.getValue().getTopicName(), newVersionTopic.getName()); + // The subscribe future during version swap should also complete now + subscribeDuringVersionSwapFuture.get(pollTimeoutMs, TimeUnit.MILLISECONDS); + } + private void prepareChangeCaptureRecordsToBePolled( long startIdx, long endIdx, @@ -1249,11 +1409,40 @@ private void prepareVersionTopicRecordsToBePolled( PubSubTopic versionTopic, int partition, boolean prepareEndOfPush) { + prepareVersionTopicRecordsToBePolled( + startIdx, + endIdx, + pubSubConsumerAdapter, + versionTopic, + partition, + prepareEndOfPush, + false); + } + + private void prepareVersionTopicRecordsToBePolled( + long startIdx, + long endIdx, + PubSubConsumerAdapter pubSubConsumerAdapter, + PubSubTopic versionTopic, + int partition, + boolean prepareEndOfPush, + boolean indexPosition) { List<DefaultPubSubMessage> consumerRecordList = new ArrayList<>(); Map<PubSubTopicPartition, List<DefaultPubSubMessage>> consumerRecordsMap = new HashMap<>(); for (long i = startIdx; i < endIdx; i++) { - DefaultPubSubMessage pubSubMessage = - constructConsumerRecord(versionTopic, partition, "newValue" + i, "key" + i, Arrays.asList(i, i)); + PubSubPosition pubSubPosition; + if (indexPosition) { + pubSubPosition = ApacheKafkaOffsetPosition.of(i); + } else { + pubSubPosition = mockPubSubPosition; + } + DefaultPubSubMessage pubSubMessage = constructConsumerRecord( + versionTopic, + partition, + "newValue" + i, + "key" + i, + Arrays.asList(i, i), + pubSubPosition); consumerRecordList.add(pubSubMessage); } @@ -1271,11 +1460,34 @@ private DefaultPubSubMessage constructVersionSwapMessage( PubSubTopic newTopic, int partition, List<ByteBuffer> localHighWatermarkPubSubPositions) { + return constructVersionSwapMessage( + versionTopic, + oldTopic.getName(), + newTopic.getName(), + "", + "", + -1, + partition, + localHighWatermarkPubSubPositions); + } + + private DefaultPubSubMessage constructVersionSwapMessage( + PubSubTopic versionTopic, + String oldTopic, + String newTopic, + String sourceRegion, + String destinationRegion, + long generationId, + int partition, + List<ByteBuffer> localHighWatermarkPubSubPositions) { KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); VersionSwap versionSwapMessage = new VersionSwap(); - versionSwapMessage.oldServingVersionTopic = oldTopic.getName(); - versionSwapMessage.newServingVersionTopic = newTopic.getName(); + versionSwapMessage.oldServingVersionTopic = oldTopic; + versionSwapMessage.newServingVersionTopic = newTopic; versionSwapMessage.localHighWatermarkPubSubPositions = localHighWatermarkPubSubPositions; + versionSwapMessage.sourceRegion = sourceRegion; + versionSwapMessage.destinationRegion = destinationRegion; + versionSwapMessage.generationId = generationId; KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); ProducerMetadata producerMetadata = new ProducerMetadata(); @@ -1328,7 +1540,8 @@ private DefaultPubSubMessage constructConsumerRecord( int partition, String newValue, String key, - List<Long> replicationCheckpointVector) { + List<Long> replicationCheckpointVector, + PubSubPosition pubSubPosition) { final GenericRecord rmdRecord = new GenericData.Record(rmdSchema); rmdRecord.put(RmdConstants.TIMESTAMP_FIELD_NAME, 0L); rmdRecord.put(RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME, replicationCheckpointVector); @@ -1343,7 +1556,7 @@ private DefaultPubSubMessage constructConsumerRecord( null); KafkaKey 
kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key)); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition); - return new ImmutablePubSubMessage(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); + return new ImmutablePubSubMessage(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, pubSubPosition, 0, 0); } private DefaultPubSubMessage constructEndOfPushMessage(PubSubTopic versionTopic, int partition, Long offset) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VersionSwapMessageStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VersionSwapMessageStateTest.java new file mode 100644 index 00000000000..e13ed8deafb --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VersionSwapMessageStateTest.java @@ -0,0 +1,196 @@ +package com.linkedin.davinci.consumer; + +import static org.testng.Assert.*; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition; +import com.linkedin.venice.pubsub.api.PubSubPosition; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.avro.util.Utf8; +import org.testng.annotations.Test; + + +public class VersionSwapMessageStateTest { + private static final String STORE = "test-store"; + private static final String OLD_TOPIC = Version.composeKafkaTopic(STORE, 1); + private static final String NEW_TOPIC = Version.composeKafkaTopic(STORE, 2); + private static final String CLIENT_REGION = "us-foo-1"; + private static final String SRC_REGION = CLIENT_REGION; + private static final String DST_REGION_A = "us-bar-1"; + private static final String DST_REGION_B = "eu-baz-1"; + + private VersionSwap newVersionSwap(long generationId, String destRegion) { + VersionSwap vs = new VersionSwap(); + vs.oldServingVersionTopic = new Utf8(OLD_TOPIC); + vs.newServingVersionTopic = new Utf8(NEW_TOPIC); + vs.sourceRegion = new Utf8(SRC_REGION); + vs.destinationRegion = new Utf8(destRegion); + vs.generationId = generationId; + // Optional fields not used in these unit tests remain null + return vs; + } + + private PubSubTopicPartition tpi(String topic, int partition) { + PubSubTopicRepository repo = new PubSubTopicRepository(); + PubSubTopic t = repo.getTopic(topic); + return new PubSubTopicPartitionImpl(t, partition); + } + + @Test + public void testIsVersionSwapRelevant() { + VersionSwap irrelevantOldGen = newVersionSwap(-1, DST_REGION_A); + assertFalse(VersionSwapMessageState.isVersionSwapRelevant(OLD_TOPIC, CLIENT_REGION, irrelevantOldGen)); + + VersionSwap wrongOldTopic = newVersionSwap(1, DST_REGION_A); + assertFalse(VersionSwapMessageState.isVersionSwapRelevant(NEW_TOPIC, CLIENT_REGION, wrongOldTopic)); + + VersionSwap wrongClientRegion = newVersionSwap(1, DST_REGION_A); + assertFalse(VersionSwapMessageState.isVersionSwapRelevant(OLD_TOPIC, "some-other-region", wrongClientRegion)); + + VersionSwap relevant = newVersionSwap(2, 
DST_REGION_A); + assertTrue(VersionSwapMessageState.isVersionSwapRelevant(OLD_TOPIC, CLIENT_REGION, relevant)); + } + + @Test + public void testInitializationAndLowWatermark() { + // Current assignment includes only OLD_TOPIC partitions 0 and 1 + Set<PubSubTopicPartition> currentAssignment = new HashSet<>(); + currentAssignment.add(tpi(OLD_TOPIC, 0)); + currentAssignment.add(tpi(OLD_TOPIC, 1)); + + VersionSwap vs = newVersionSwap(10, DST_REGION_A); + VersionSwapMessageState state = + new VersionSwapMessageState(vs, /* totalRegionCount */ 2, currentAssignment, /* startTs */ 123L); + + assertEquals(state.getOldVersionTopic(), OLD_TOPIC); + assertEquals(state.getNewVersionTopic(), NEW_TOPIC); + assertEquals(state.getVersionSwapGenerationId(), 10L); + assertEquals(state.getAssignedPartitions(), new HashSet<>(java.util.Arrays.asList(0, 1))); + + // The low watermark is unknown until a swap message is handled for that partition + assertNull(state.getVersionSwapLowWatermarkPosition(OLD_TOPIC, 0)); + PubSubPosition pos = ApacheKafkaOffsetPosition.of(5L); + boolean completed = state.handleVersionSwap(vs, tpi(OLD_TOPIC, 0), pos); + assertFalse(completed); // need totalRegionCount messages per partition + assertEquals(state.getVersionSwapLowWatermarkPosition(OLD_TOPIC, 0), pos); + + // Low watermark requests for a topic other than the old topic should return null + assertNull(state.getVersionSwapLowWatermarkPosition(NEW_TOPIC, 0)); + } + + @Test(expectedExceptions = VeniceException.class) + public void testHandleVersionSwapThrowsOnUnexpectedTopic() { + Set<PubSubTopicPartition> currentAssignment = Collections.singleton(tpi(OLD_TOPIC, 0)); + VersionSwap vs = newVersionSwap(1, DST_REGION_A); + VersionSwapMessageState state = new VersionSwapMessageState(vs, 1, currentAssignment, 1L); + // Passing a message from a wrong topic should throw + state.handleVersionSwap(vs, tpi(NEW_TOPIC, 0), ApacheKafkaOffsetPosition.of(1L)); + } + + @Test + public void testCompletionAfterAllRegions() { + // 2 regions required -> A then B completes partition 0 + Set<PubSubTopicPartition> assignment = Collections.singleton(tpi(OLD_TOPIC, 0)); + VersionSwap vsA = newVersionSwap(2, DST_REGION_A); + VersionSwap vsB = newVersionSwap(2, DST_REGION_B); + VersionSwapMessageState state = new VersionSwapMessageState(vsA, 2, assignment, 1L); + + assertFalse(state.handleVersionSwap(vsA, tpi(OLD_TOPIC, 0), ApacheKafkaOffsetPosition.of(10L))); + assertTrue(state.handleVersionSwap(vsB, tpi(OLD_TOPIC, 0), ApacheKafkaOffsetPosition.of(11L))); + assertTrue(state.isVersionSwapMessagesReceivedForAllPartitions()); + } + + @Test + public void testCheckpointsAndEopBackup() { + Set<PubSubTopicPartition> assignment = new HashSet<>(); + assignment.add(tpi(OLD_TOPIC, 0)); + assignment.add(tpi(OLD_TOPIC, 1)); + VersionSwap vs = newVersionSwap(3, DST_REGION_A); + VersionSwapMessageState state = new VersionSwapMessageState(vs, 1, assignment, 1L); + + // Mark only partition 0 as completed + state.handleVersionSwap(vs, tpi(OLD_TOPIC, 0), ApacheKafkaOffsetPosition.of(1L)); + + // Simulate the async checkpoint finder having finished + CompletableFuture<Void> done = CompletableFuture.completedFuture(null); + state.setFindNewTopicCheckpointFuture(done); + + Map<Integer, VeniceChangeCoordinate> vsCheckpoints = new HashMap<>(); + vsCheckpoints.put(0, new VeniceChangeCoordinate(NEW_TOPIC, ApacheKafkaOffsetPosition.of(100L), 0)); + state.setNewTopicVersionSwapCheckpoints(vsCheckpoints); + + Map<Integer, VeniceChangeCoordinate> eopCheckpoints = new HashMap<>(); + eopCheckpoints.put(1, new VeniceChangeCoordinate(NEW_TOPIC, ApacheKafkaOffsetPosition.of(200L), 1)); + state.setNewTopicEOPCheckpoints(eopCheckpoints);
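+ + // Partition 0 now has a version swap checkpoint while partition 1 only has an EOP checkpoint, so the strict + // getter below must return partition 0 alone while the backup getter must cover both partitions.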
+ // getNewTopicVersionSwapCheckpoints should return only the completed partitions (partition 0) + Set<VeniceChangeCoordinate> onlyCompleted = state.getNewTopicVersionSwapCheckpoints(); + assertEquals(onlyCompleted.size(), 1); + assertTrue(onlyCompleted.stream().anyMatch(c -> c.getPartition() == 0)); + + // The backup getter should include the EOP checkpoint for the incomplete partition (partition 1) + Set<VeniceChangeCoordinate> withBackup = state.getNewTopicCheckpointsWithEOPAsBackup(); + assertEquals(withBackup.size(), 2); + assertTrue(withBackup.stream().anyMatch(c -> c.getPartition() == 0)); + assertTrue(withBackup.stream().anyMatch(c -> c.getPartition() == 1)); + } + + @Test(expectedExceptions = VeniceException.class) + public void testGetNewTopicVersionSwapCheckpointsThrowsIfNotReady() { + Set<PubSubTopicPartition> assignment = Collections.singleton(tpi(OLD_TOPIC, 0)); + VersionSwap vs = newVersionSwap(4, DST_REGION_A); + VersionSwapMessageState state = new VersionSwapMessageState(vs, 1, assignment, 1L); + // The future is not set -> should throw + state.getNewTopicVersionSwapCheckpoints(); + } + + @Test(expectedExceptions = VeniceException.class) + public void testGetNewTopicCheckpointsWithEopAsBackupThrowsIfMissingEop() { + Set<PubSubTopicPartition> assignment = new HashSet<>(); + assignment.add(tpi(OLD_TOPIC, 0)); + assignment.add(tpi(OLD_TOPIC, 1)); + VersionSwap vs = newVersionSwap(5, DST_REGION_A); + VersionSwapMessageState state = new VersionSwapMessageState(vs, 1, assignment, 1L); + + // Complete only partition 0 + state.handleVersionSwap(vs, tpi(OLD_TOPIC, 0), ApacheKafkaOffsetPosition.of(1L)); + + state.setFindNewTopicCheckpointFuture(CompletableFuture.completedFuture(null)); + Map<Integer, VeniceChangeCoordinate> vsCheckpoints = new HashMap<>(); + vsCheckpoints.put(0, new VeniceChangeCoordinate(NEW_TOPIC, ApacheKafkaOffsetPosition.of(100L), 0)); + state.setNewTopicVersionSwapCheckpoints(vsCheckpoints); + // Missing the EOP checkpoint for partition 1 -> should throw + state.getNewTopicCheckpointsWithEOPAsBackup(); + } + + @Test + public void testHandleUnsubscribe() { + Set<PubSubTopicPartition> assignment = new HashSet<>(); + assignment.add(tpi(OLD_TOPIC, 0)); + assignment.add(tpi(OLD_TOPIC, 1)); + VersionSwap vs = newVersionSwap(6, DST_REGION_A); + VersionSwapMessageState state = new VersionSwapMessageState(vs, 2, assignment, 1L); + + // After handling a swap for partition 0, we should be tracking the watermark for 0 + state.handleVersionSwap(vs, tpi(OLD_TOPIC, 0), ApacheKafkaOffsetPosition.of(7L)); + assertNotNull(state.getVersionSwapLowWatermarkPosition(OLD_TOPIC, 0)); + + // Unsubscribe partition 0; it should be removed from the tracking structures + state.handleUnsubscribe(Collections.singleton(0)); + assertFalse(state.getAssignedPartitions().contains(0)); + assertNull(state.getVersionSwapLowWatermarkPosition(OLD_TOPIC, 0)); + // Partition 1 remains assigned + assertTrue(state.getAssignedPartitions().contains(1)); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java index b5d6131fead..bf809c6514c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java @@ -620,7 +620,7 @@ public void testAAIngestionWithStoreView() throws Exception { IntegrationTestUtils.pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); IntegrationTestUtils.pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); 
IntegrationTestUtils.pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 1); + Assert.assertFalse(polledChangeEvents.isEmpty()); }); // Write 20 records diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveVersionSwapMessage.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveVersionSwapMessage.java index 7c83777c31d..5a8240bb280 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveVersionSwapMessage.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveVersionSwapMessage.java @@ -2,54 +2,77 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CONTROLLER_USE_MULTI_REGION_REAL_TIME_TOPIC_SWITCHER_ENABLED; +import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC; import static com.linkedin.venice.ConfigKeys.PARENT_KAFKA_CLUSTER_FABRIC_LIST; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; +import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.D2_SERVICE_NAME; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; import static com.linkedin.venice.utils.TestUtils.assertCommand; import static com.linkedin.venice.utils.TestUtils.createAndVerifyStoreInAllRegions; import static com.linkedin.venice.utils.TestUtils.updateStoreToHybrid; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.consumer.ChangeEvent; +import com.linkedin.davinci.consumer.ChangelogClientConfig; +import com.linkedin.davinci.consumer.VeniceChangeCoordinate; +import com.linkedin.davinci.consumer.VeniceChangelogConsumer; +import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; +import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.meta.Version; import 
com.linkedin.venice.pubsub.PubSubConsumerAdapterContext; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.samza.VeniceSystemProducer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import io.tehuti.metrics.MetricsRepository; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.system.SystemProducer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -194,6 +217,209 @@ public void testMultiDataCenterVersioSwapMessageWrite() { } } + @Test(timeOut = TEST_TIMEOUT) + public void testCDCMultiDataCenterVersionSwapMessageHandling() throws Exception { + String storeName = Utils.getUniqueString("test-store"); + try { + String keySchemaStr = TestChangelogKey.SCHEMA$.toString(); + String valueSchemaStr = TestChangelogValue.SCHEMA$.toString(); + createAndVerifyStoreInAllRegions( + storeName, + parentControllerClient, + dcControllerClientList, + keySchemaStr, + valueSchemaStr); + updateStoreToHybrid(storeName, parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(true)); + // Force the rewind to cover the entire RT + parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setHybridRewindSeconds(600L)); + // Empty push to create a version + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(storeName, Utils.getUniqueString("empty-hybrid-push"), 1L); + assertCommand(versionCreationResponse); + TestUtils.waitForNonDeterministicAssertion( + PUSH_TIMEOUT, + TimeUnit.MILLISECONDS, + true, + () -> Assert.assertEquals( + parentControllerClient.queryJobStatus(versionCreationResponse.getKafkaTopic()).getStatus(), + ExecutionStatus.COMPLETED.toString())); + // Write 100 records in one child region + try (VeniceSystemProducer veniceProducer = IntegrationTestPushUtils.getSamzaProducer( + childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]), + storeName, + Version.PushType.STREAM)) { + veniceProducer.start(); + performStreamWrites(veniceProducer, storeName, 100, 0); + } + // Write another 50 records in the remote region + try (VeniceSystemProducer veniceProducer = IntegrationTestPushUtils.getSamzaProducer( + childDatacenters.get(1).getClusters().get(CLUSTER_NAMES[0]), + storeName, + Version.PushType.STREAM)) { + veniceProducer.start(); + performStreamWrites(veniceProducer, storeName, 50, 100); + }
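+ // What follows exercises the happy path of version swap by control message: each subscribed partition must + // observe a relevant VersionSwap control message from every region (totalRegionCount) before the consumer moves + // that partition to the new version topic. As a minimal, illustrative client-side sketch (the values here are + // placeholders; the real wiring lives in getDefaultVersionSwapEnabledChangelogClientConfig below): + // new ChangelogClientConfig().setVersionSwapByControlMessageEnabled(true) + // .setClientRegionName("dc-0") + // .setTotalRegionCount(2);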
+ // Start a local consumer with version swap by control message enabled; it should subscribe to v1 + MetricsRepository metricsRepository = new MetricsRepository(); + VeniceChangelogConsumerClientFactory changelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory( + getDefaultVersionSwapEnabledChangelogClientConfig(childDatacenters.get(0)), + metricsRepository); + VeniceChangelogConsumer<GenericRecord, GenericRecord> consumer = + changelogConsumerClientFactory.getChangelogConsumer(storeName); + consumer.subscribeAll().get(); + Map<String, VeniceChangeCoordinate> polledChangeEventsMap = new HashMap<>(); + List<String> polledChangeEventsKeyList = new ArrayList<>(); + pollUntilSpecificIndexes( + consumer, + polledChangeEventsMap, + polledChangeEventsKeyList, + Collections.singleton("149")); + // Start a second consumer configured with a short version swap timeout and an inflated region count (one region + // that does not exist) to simulate a region being down. + MetricsRepository metricsRepository2 = new MetricsRepository(); + ChangelogClientConfig timeoutConsumerConfig = + getDefaultVersionSwapEnabledChangelogClientConfig(childDatacenters.get(0)); + timeoutConsumerConfig.setTotalRegionCount(childDatacenters.size() + 1); + timeoutConsumerConfig.setVersionSwapTimeoutInMs(1000L); + timeoutConsumerConfig.setConsumerName("timeout-consumer"); + VeniceChangelogConsumerClientFactory timeoutConsumerFactory = + new VeniceChangelogConsumerClientFactory(timeoutConsumerConfig, metricsRepository2); + VeniceChangelogConsumer<GenericRecord, GenericRecord> timeoutConsumer = + timeoutConsumerFactory.getChangelogConsumer(storeName); + timeoutConsumer.subscribeAll().get(); + Map<String, VeniceChangeCoordinate> timeoutConsumerPolledChangeEventsMap = new HashMap<>(); + List<String> timeoutConsumerPolledChangeEventsKeyList = new ArrayList<>(); + // For the timeout consumer we need to make sure it consumes to the tail for all partitions for ease of + // verification. Since it has a short version swap timeout, without this condition a partition could forcefully + // swap to the new version before reaching the first version swap message. + pollUntilSpecificEventKeyListSize( + timeoutConsumer, + timeoutConsumerPolledChangeEventsMap, + timeoutConsumerPolledChangeEventsKeyList, + 150); + + // Another empty push should trigger the version swap message write and switch the current version to v2 + VersionCreationResponse newVersionCreationResponse = + parentControllerClient.emptyPush(storeName, Utils.getUniqueString("empty-hybrid-push"), 1L); + assertCommand(newVersionCreationResponse); + TestUtils.waitForNonDeterministicAssertion( + PUSH_TIMEOUT, + TimeUnit.MILLISECONDS, + true, + () -> Assert.assertEquals( + parentControllerClient.queryJobStatus(newVersionCreationResponse.getKafkaTopic()).getStatus(), + ExecutionStatus.COMPLETED.toString())); + // Write 5 more records in the local region; we need enough to cover all the partitions. + try (VeniceSystemProducer veniceProducer = IntegrationTestPushUtils.getSamzaProducer( + childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]), + storeName, + Version.PushType.STREAM)) { + veniceProducer.start(); + performStreamWrites(veniceProducer, storeName, 5, 150); + } + Set<String> afterNewVersionIndexes = new HashSet<>(); + for (int i = 150; i < 155; i++) { + afterNewVersionIndexes.add(String.valueOf(i)); + } + pollUntilSpecificIndexes(consumer, polledChangeEventsMap, polledChangeEventsKeyList, afterNewVersionIndexes);
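+ // At this point the first consumer should have crossed the swap: keys 0-149 were consumed from the v1 topic and + // keys 150-154 from the v2 topic.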
+ // In this controlled test we shouldn't see any duplicates since version swap messages are not interleaved with + // any data writes: 100 local writes + 50 remote writes + 5 after-version-swap writes = 155 + Assert.assertEquals(polledChangeEventsKeyList.size(), 155); + for (int i = 0; i < 155; i++) { + Assert.assertTrue(polledChangeEventsMap.containsKey(String.valueOf(i))); + } + // Hacky verification using toString, since the internal topic is not exposed by VeniceChangeCoordinate + Assert + .assertTrue(polledChangeEventsMap.get("154").toString().contains(newVersionCreationResponse.getKafkaTopic())); + + // The timeout consumer should be able to consume at least 305 events if the version swap via timeout is + // successful: 150 (base) + 150 (base replayed in the new version) + 5 (new writes after the version swap) = 305. + pollUntilSpecificEventKeyListSize( + timeoutConsumer, + timeoutConsumerPolledChangeEventsMap, + timeoutConsumerPolledChangeEventsKeyList, + 305); + Assert.assertTrue( + timeoutConsumerPolledChangeEventsMap.get("149") + .toString() + .contains(newVersionCreationResponse.getKafkaTopic())); + } finally { + deleteStores(storeName); + } + }
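+ + // Shared polling helpers for the assertions above: both drain poll() into the caller's map and key list; the + // first completes once every requested key index has been observed, the second once the key list reaches the + // given size.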
+ private void pollUntilSpecificIndexes( + VeniceChangelogConsumer<GenericRecord, GenericRecord> consumer, + Map<String, VeniceChangeCoordinate> polledChangeEventsMap, + List<String> polledChangeEventsKeyList, + Set<String> specificIndexes) { + final Set<String> consumedSpecificIndexes = new HashSet<>(); + TestUtils.waitForNonDeterministicCompletion(30, TimeUnit.SECONDS, () -> { + Collection<PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate>> pubSubMessages = + consumer.poll(100); + for (PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate> pubSubMessage: pubSubMessages) { + String key = pubSubMessage.getKey() == null ? null : String.valueOf(pubSubMessage.getKey().get("id")); + polledChangeEventsMap.put(key, pubSubMessage.getPosition()); + polledChangeEventsKeyList.add(key); + if (key != null && specificIndexes.contains(key)) { + consumedSpecificIndexes.add(key); + } + } + return consumedSpecificIndexes.size() == specificIndexes.size(); + }); + } + + private void pollUntilSpecificEventKeyListSize( + VeniceChangelogConsumer<GenericRecord, GenericRecord> consumer, + Map<String, VeniceChangeCoordinate> polledChangeEventsMap, + List<String> polledChangeEventsKeyList, + int specifiedSize) { + TestUtils.waitForNonDeterministicCompletion(30, TimeUnit.SECONDS, () -> { + Collection<PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate>> pubSubMessages = + consumer.poll(100); + for (PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate> pubSubMessage: pubSubMessages) { + String key = pubSubMessage.getKey() == null ? null : String.valueOf(pubSubMessage.getKey().get("id")); + polledChangeEventsMap.put(key, pubSubMessage.getPosition()); + polledChangeEventsKeyList.add(key); + } + return polledChangeEventsKeyList.size() >= specifiedSize; + }); + } + + private ChangelogClientConfig getDefaultVersionSwapEnabledChangelogClientConfig( + VeniceMultiClusterWrapper localRegion) { + Properties consumerProperties = new Properties(); + consumerProperties.putAll(multiRegionMultiClusterWrapper.getPubSubClientProperties()); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localRegion.getPubSubBrokerWrapper().getAddress()); + ChangelogClientConfig changelogClientConfig = new ChangelogClientConfig().setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localRegion.getZkServerWrapper().getAddress()) + .setD2Client(IntegrationTestPushUtils.getD2Client(localRegion.getZkServerWrapper().getAddress())) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) + .setControllerRequestRetryCount(3) + .setBootstrapFileSystemPath(getTempDataDirectory().getAbsolutePath()) + .setVersionSwapByControlMessageEnabled(true) + .setClientRegionName(localRegion.getRegionName()) + .setTotalRegionCount(childDatacenters.size()); + return changelogClientConfig; + } + + private void performStreamWrites(SystemProducer veniceProducer, String storeName, int numPuts, int startIndex) { + for (int i = startIndex; i < startIndex + numPuts; i++) { + TestChangelogKey key = new TestChangelogKey(); + key.id = i; + + TestChangelogValue value = new TestChangelogValue(); + value.firstName = "first_name_stream_" + i; + value.lastName = "last_name_stream_" + i; + + sendStreamingRecord(veniceProducer, storeName, key, value, null); + } + } + private void verifyVersionSwapMessagesInAllDataCenters( String topicName, String oldVersionTopic, diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index cccaf7285ef..9e606bcacc3 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -812,7 +812,21 @@ public static void createAndVerifyStoreInAllRegions( String storeName, ControllerClient parentControllerClient, List<ControllerClient> controllerClientList) { - Assert.assertFalse(parentControllerClient.createNewStore(storeName, "owner", "\"string\"", "\"string\"").isError()); + createAndVerifyStoreInAllRegions( + storeName, + parentControllerClient, + controllerClientList, + "\"string\"", + "\"string\""); + } + + public static void createAndVerifyStoreInAllRegions( + String storeName, + ControllerClient parentControllerClient, + List<ControllerClient> controllerClientList, + String keySchema, + String valueSchema) { + Assert.assertFalse(parentControllerClient.createNewStore(storeName, "owner", keySchema, valueSchema).isError()); TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { for (ControllerClient client: controllerClientList) { Assert.assertFalse(client.getStore(storeName).isError());