7373import com .linkedin .venice .store .rocksdb .RocksDBUtils ;
7474import com .linkedin .venice .utils .DaemonThreadFactory ;
7575import com .linkedin .venice .utils .DictionaryUtils ;
76+ import com .linkedin .venice .utils .SystemTime ;
77+ import com .linkedin .venice .utils .Time ;
7678import com .linkedin .venice .utils .Utils ;
7779import com .linkedin .venice .utils .VeniceProperties ;
7880import com .linkedin .venice .utils .concurrent .VeniceConcurrentHashMap ;
@@ -182,10 +184,12 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume
182184 protected final boolean versionSwapByControlMessage ;
183185 protected final String clientRegionName ;
184186 protected final int totalRegionCount ;
187+ protected final long versionSwapTimeoutInMs ;
185188 /**
186189 * Interaction of this field should acquire the subscriptionLock.readLock()
187190 */
188191 protected VersionSwapMessageState versionSwapMessageState = null ;
192+ protected Time time ;
189193
190194 public VeniceChangelogConsumerImpl (
191195 ChangelogClientConfig changelogClientConfig ,
@@ -215,6 +219,8 @@ public VeniceChangelogConsumerImpl(
215219 this .pubSubMessageDeserializer = pubSubMessageDeserializer ;
216220 this .versionSwapByControlMessage = changelogClientConfig .isVersionSwapByControlMessageEnabled ();
217221 this .totalRegionCount = changelogClientConfig .getTotalRegionCount ();
222+ this .versionSwapTimeoutInMs = changelogClientConfig .getVersionSwapTimeoutInMs ();
223+ this .time = new SystemTime ();
218224 if (versionSwapByControlMessage ) {
219225 String clientRegionNameFromConfig = changelogClientConfig .getClientRegionName ();
220226 if (clientRegionNameFromConfig .isEmpty ()) {
@@ -352,7 +358,7 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
352358 for (int partition : partitions ) {
353359 getPartitionToBootstrapState ().put (partition , false );
354360 }
355- subscribeTime = System . currentTimeMillis ();
361+ subscribeTime = time . getMilliseconds ();
356362 return internalSubscribe (partitions , null );
357363 }
358364
@@ -815,12 +821,18 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
815821 return Collections .emptyList ();
816822 }
817823
818- if (versionSwapByControlMessage ) {
819- if (versionSwapMessageState != null
820- && versionSwapMessageState .isVersionSwapMessagesReceivedForAllPartitions ()) {
821- try {
822- versionSwapMessageState .getFindNewTopicCheckpointFuture ().get (timeoutInMs , TimeUnit .MILLISECONDS );
823- synchronousSeekToCheckpoint (versionSwapMessageState .getNewTopicCheckpoints ());
824+ if (versionSwapByControlMessage && versionSwapMessageState != null ) {
825+ /*
826+ * If version swap by control message is enabled and the consumer is undergoing version swap we need to check
827+ * and act on two scenarios:
828+ * 1. If all version swap messages have been received for all partitions, we need to seek to the new topic.
829+ * 2. If we have reached the timeout for the version swap, we need to forcefully seek to the new topic using
830+ * the EOP positions for any remaining partitions as our backup plan. See javadoc of
831+ * VersionSwapMessageState.getVersionSwapStartTimestamp() for more details.
832+ */
833+ if (versionSwapMessageState .isVersionSwapMessagesReceivedForAllPartitions ()) {
834+ if (isNewVersionCheckpointsReady (timeoutInMs )) {
835+ synchronousSeekToCheckpoint (versionSwapMessageState .getNewTopicVersionSwapCheckpoints ());
824836 LOGGER .info (
825837 "Version swap completed from topic: {} to topic: {}, generation id: {}" ,
826838 versionSwapMessageState .getOldVersionTopic (),
@@ -829,23 +841,27 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
829841 changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
830842 changeCaptureStats .setUndergoingVersionSwap (0 );
831843 versionSwapMessageState = null ;
832- } catch (TimeoutException timeoutException ) {
833- // Still waiting for internalFindNewVersionCheckpoints to complete
844+ } else {
834845 return Collections .emptyList ();
835- } catch (ExecutionException e ) {
836- // Re-attempt the seek but should report the error
837- LOGGER .warn (
838- "Completed consuming old topic: {} for version swap but caught an exception when looking for corresponding checkpoint in new topic: {}. Retrying." ,
846+ }
847+ } else if (time .getMilliseconds ()
848+ - versionSwapMessageState .getVersionSwapStartTimestamp () > versionSwapTimeoutInMs ) {
849+ if (!getTopicAssignment ().isEmpty ()) {
850+ internalUnsubscribe (versionSwapMessageState .getIncompletePartitions (), true );
851+ }
852+ if (isNewVersionCheckpointsReady (timeoutInMs )) {
853+ synchronousSeekToCheckpoint (versionSwapMessageState .getNewTopicCheckpointsWithEOPAsBackup ());
854+ LOGGER .info (
855+ "Version swap completed after timeout from topic: {} to topic: {}, generation id: {}. Partitions: {} are seeked to EOP positions." ,
839856 versionSwapMessageState .getOldVersionTopic (),
840857 versionSwapMessageState .getNewVersionTopic (),
841- e );
842- changeCaptureStats .emitVersionSwapCountMetrics (FAIL );
843- versionSwapMessageState .setFindNewTopicCheckpointFuture (
844- internalFindNewVersionCheckpoints (
845- versionSwapMessageState .getOldVersionTopic (),
846- versionSwapMessageState .getNewVersionTopic (),
847- versionSwapMessageState .getVersionSwapGenerationId (),
848- versionSwapMessageState .getAssignedPartitions ()));
858+ versionSwapMessageState .getVersionSwapGenerationId (),
859+ versionSwapMessageState .getIncompletePartitions ());
860+ changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
861+ changeCaptureStats .setUndergoingVersionSwap (0 );
862+ versionSwapMessageState = null ;
863+ } else {
864+ return Collections .emptyList ();
849865 }
850866 }
851867 }
@@ -935,6 +951,33 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
935951 }
936952 }
937953
954+ private boolean isNewVersionCheckpointsReady (long timeoutInMs ) throws InterruptedException {
955+ if (versionSwapMessageState == null ) {
956+ return false ;
957+ }
958+ try {
959+ versionSwapMessageState .getFindNewTopicCheckpointFuture ().get (timeoutInMs , TimeUnit .MILLISECONDS );
960+ } catch (TimeoutException timeoutException ) {
961+ // Still waiting for internalFindNewVersionCheckpoints to complete.
962+ return false ;
963+ } catch (ExecutionException e ) {
964+ // Re-attempt the seek but should report the error.
965+ LOGGER .warn (
966+ "Caught an exception when looking for corresponding checkpoints with generation id: {} in new topic: {}. Retrying." ,
967+ versionSwapMessageState .getVersionSwapGenerationId (),
968+ versionSwapMessageState .getNewVersionTopic (),
969+ e );
970+ changeCaptureStats .emitVersionSwapCountMetrics (FAIL );
971+ versionSwapMessageState .setFindNewTopicCheckpointFuture (
972+ internalFindNewVersionCheckpoints (
973+ versionSwapMessageState .getOldVersionTopic (),
974+ versionSwapMessageState .getNewVersionTopic (),
975+ versionSwapMessageState .getVersionSwapGenerationId (),
976+ versionSwapMessageState .getAssignedPartitions ()));
977+ }
978+ return true ;
979+ }
980+
938981 void maybeUpdatePartitionToBootstrapMap (DefaultPubSubMessage message , PubSubTopicPartition pubSubTopicPartition ) {
939982 if (getSubscribeTime () - message .getValue ().producerMetadata .messageTimestamp <= TimeUnit .MINUTES .toMillis (1 )) {
940983 getPartitionToBootstrapState ().put (pubSubTopicPartition .getPartitionNumber (), true );
@@ -1026,7 +1069,7 @@ protected <T> T processRecordBytes(
10261069 * from seeking in between a sequence of related version swap messages with a partition. During a version swap we will
10271070 * also use a low watermark approach for the {@link VeniceChangeCoordinate} returned. However, the changelog consumer
10281071 * is still vulnerable to this edge case when seekToTimestamp and seekToTail is used. These edge cases will be rare
1029- * so for now we will have a metric to detect it and restarting the changelog consumer and re- seek should fix it .
1072+ * and will be handled by the version swap timeout and the backup strategy of seeking to the new version's EOP.
10301073 */
10311074 protected Optional <PubSubMessage <K , ChangeEvent <V >, VeniceChangeCoordinate >> convertPubSubMessageToPubSubChangeEventWithVersionSwapState (
10321075 DefaultPubSubMessage message ,
@@ -1396,7 +1439,8 @@ protected boolean handleVersionSwapMessageInVT(
13961439 .isVersionSwapRelevant (pubSubTopicPartition .getTopicName (), clientRegionName , versionSwap )) {
13971440 if (versionSwapMessageState == null ) {
13981441 Set <PubSubTopicPartition > currentAssignment = getTopicAssignment ();
1399- versionSwapMessageState = new VersionSwapMessageState (versionSwap , totalRegionCount , currentAssignment );
1442+ versionSwapMessageState =
1443+ new VersionSwapMessageState (versionSwap , totalRegionCount , currentAssignment , time .getMilliseconds ());
14001444 changeCaptureStats .setUndergoingVersionSwap (1 );
14011445 LOGGER .info (
14021446 "New version detected for store: {} through version swap messages. Performing version swap from topic: {} to topic: {}, generation id: {}" ,
0 commit comments