@@ -187,9 +187,9 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume
187187 protected final long versionSwapTimeoutInMs ;
188188 protected final AtomicReference <CountDownLatch > onGoingVersionSwapSignal = new AtomicReference <>();
189189 /**
190- * Interaction of this field should acquire the subscriptionLock.readLock ()
190+ * Interaction of this field should acquire the subscriptionLock.writeLock ()
191191 */
192- protected VersionSwapMessageState versionSwapMessageState = null ;
192+ protected volatile VersionSwapMessageState versionSwapMessageState = null ;
193193 protected Time time ;
194194
195195 public VeniceChangelogConsumerImpl (
@@ -343,6 +343,11 @@ public VeniceChangelogConsumerImpl(
343343 LOGGER .info ("Start a change log consumer client for store: {}" , storeName );
344344 }
345345
346+ // Unit test only and read only
347+ VersionSwapMessageState getVersionSwapMessageState () {
348+ return this .versionSwapMessageState ;
349+ }
350+
346351 @ Override
347352 public int getPartitionCount () {
348353 Store store = getStore ();
@@ -818,7 +823,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
818823 String topicSuffix ,
819824 boolean includeControlMessage ) {
820825 Collection <PubSubMessage <K , ChangeEvent <V >, VeniceChangeCoordinate >> pubSubMessages = new ArrayList <>();
821- Map <PubSubTopicPartition , List <DefaultPubSubMessage >> messagesMap = Collections . EMPTY_MAP ;
826+ Map <PubSubTopicPartition , List <DefaultPubSubMessage >> messagesMap ;
822827 boolean lockAcquired = false ;
823828
824829 try {
@@ -853,7 +858,6 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
853858 versionSwapMessageState .getNewVersionTopic (),
854859 versionSwapMessageState .getVersionSwapGenerationId ());
855860 changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
856- changeCaptureStats .setUndergoingVersionSwap (0 );
857861 versionSwapMessageState = null ;
858862 onGoingVersionSwapSignal .get ().countDown ();
859863 } else {
@@ -873,7 +877,6 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
873877 versionSwapMessageState .getVersionSwapGenerationId (),
874878 versionSwapMessageState .getIncompletePartitions ());
875879 changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
876- changeCaptureStats .setUndergoingVersionSwap (0 );
877880 versionSwapMessageState = null ;
878881 onGoingVersionSwapSignal .get ().countDown ();
879882 } else {
@@ -1109,8 +1112,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
11091112 }
11101113 }
11111114 if (messageType .equals (MessageType .DELETE )) {
1112- Delete delete = (Delete ) message .getValue ().payloadUnion ;
1113-
11141115 // Deletes have a previous and current value of null. So just fill it in!
11151116 ChangeEvent <V > changeEvent = new ChangeEvent <>(null , null );
11161117 pubSubChangeEventMessage = Optional .of (
@@ -1464,7 +1465,6 @@ protected boolean handleVersionSwapMessageInVT(
14641465 versionSwapMessageState =
14651466 new VersionSwapMessageState (versionSwap , totalRegionCount , currentAssignment , time .getMilliseconds ());
14661467 onGoingVersionSwapSignal .set (new CountDownLatch (1 ));
1467- changeCaptureStats .setUndergoingVersionSwap (1 );
14681468 LOGGER .info (
14691469 "New version detected for store: {} through version swap messages. Performing version swap from topic: {} to topic: {}, generation id: {}" ,
14701470 storeName ,
@@ -1650,7 +1650,12 @@ protected boolean switchToNewTopic(PubSubTopic newTopic, String topicSuffix, Int
16501650 }
16511651 unsubscribe (partitions );
16521652 try {
1653- internalSubscribe (partitions , mergedTopicName ).get ();
1653+ Set <VeniceChangeCoordinate > beginningOfNewTopic = new HashSet <>(partitions .size ());
1654+ for (Integer p : partitions ) {
1655+ beginningOfNewTopic
1656+ .add (new VeniceChangeCoordinate (mergedTopicName .getName (), PubSubSymbolicPosition .EARLIEST , p ));
1657+ }
1658+ synchronousSeekToCheckpoint (beginningOfNewTopic );
16541659 } catch (Exception e ) {
16551660 throw new VeniceException ("Subscribe to new topic:" + mergedTopicName + " is not successful, error: " + e );
16561661 }
0 commit comments