77import static com .linkedin .venice .ConfigKeys .DATA_BASE_PATH ;
88import static com .linkedin .venice .ConfigKeys .KAFKA_BOOTSTRAP_SERVERS ;
99import static com .linkedin .venice .ConfigKeys .ZOOKEEPER_ADDRESS ;
10- import static com .linkedin .venice .VeniceConstants .ENVIRONMENT_CONFIG_KEY_FOR_REGION_NAME ;
11- import static com .linkedin .venice .VeniceConstants .SYSTEM_PROPERTY_FOR_APP_RUNNING_REGION ;
1210import static com .linkedin .venice .kafka .protocol .enums .ControlMessageType .START_OF_SEGMENT ;
1311import static com .linkedin .venice .schema .rmd .RmdConstants .REPLICATION_CHECKPOINT_VECTOR_FIELD_POS ;
1412import static com .linkedin .venice .stats .dimensions .VeniceResponseStatusCategory .FAIL ;
9896import java .util .Set ;
9997import java .util .concurrent .CompletableFuture ;
10098import java .util .concurrent .ConcurrentHashMap ;
99+ import java .util .concurrent .CountDownLatch ;
101100import java .util .concurrent .ExecutionException ;
102101import java .util .concurrent .ExecutorService ;
103102import java .util .concurrent .Executors ;
104103import java .util .concurrent .TimeUnit ;
105104import java .util .concurrent .TimeoutException ;
106105import java .util .concurrent .atomic .AtomicBoolean ;
107106import java .util .concurrent .atomic .AtomicLong ;
107+ import java .util .concurrent .atomic .AtomicReference ;
108108import java .util .concurrent .locks .ReadWriteLock ;
109109import java .util .concurrent .locks .ReentrantReadWriteLock ;
110110import java .util .stream .Collectors ;
@@ -185,6 +185,7 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume
185185 protected final String clientRegionName ;
186186 protected final int totalRegionCount ;
187187 protected final long versionSwapTimeoutInMs ;
188+ protected final AtomicReference <CountDownLatch > onGoingVersionSwapSignal = new AtomicReference <>();
188189 /**
189190 * Interaction of this field should acquire the subscriptionLock.readLock()
190191 */
@@ -219,29 +220,24 @@ public VeniceChangelogConsumerImpl(
219220 this .pubSubMessageDeserializer = pubSubMessageDeserializer ;
220221 this .versionSwapByControlMessage = changelogClientConfig .isVersionSwapByControlMessageEnabled ();
221222 this .totalRegionCount = changelogClientConfig .getTotalRegionCount ();
223+ this .clientRegionName = changelogClientConfig .getClientRegionName ();
222224 this .versionSwapTimeoutInMs = changelogClientConfig .getVersionSwapTimeoutInMs ();
223225 this .time = new SystemTime ();
226+ this .onGoingVersionSwapSignal .set (new CountDownLatch (0 ));
224227 if (versionSwapByControlMessage ) {
225- String clientRegionNameFromConfig = changelogClientConfig .getClientRegionName ();
226- if (clientRegionNameFromConfig .isEmpty ()) {
227- String regionFromEnv = System .getenv (ENVIRONMENT_CONFIG_KEY_FOR_REGION_NAME );
228- if (regionFromEnv == null ) {
229- regionFromEnv = System .getProperty (SYSTEM_PROPERTY_FOR_APP_RUNNING_REGION );
230- }
231- if (regionFromEnv == null ) {
232- throw new VeniceException (
233- "Failed to enable version swap by control message because cannot resolve client region name from config, environment or system property" );
234- }
235- clientRegionName = regionFromEnv ;
236- } else {
237- clientRegionName = clientRegionNameFromConfig ;
228+ // Version swap related configs should all be resolved or explicitly set at this point.
229+ if (this .clientRegionName .isEmpty ()) {
230+ throw new VeniceException (
231+ "Failed to enable version swap by control message because client region name is missing" );
232+ }
233+ if (this .totalRegionCount <= 0 ) {
234+ throw new VeniceException (
235+ "Failed to enable version swap by control message because total region count is not set" );
238236 }
239237 LOGGER .info (
240238 "VeniceChangelogConsumer version swap by control message is enabled. Client region name: {}, total region count: {}" ,
241239 clientRegionName ,
242240 totalRegionCount );
243- } else {
244- clientRegionName = "" ;
245241 }
246242
247243 seekExecutorService = Executors .newFixedThreadPool (10 , new DaemonThreadFactory (getClass ().getSimpleName ()));
@@ -382,21 +378,39 @@ protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, Pub
382378 throw new RuntimeException (e );
383379 }
384380
385- PubSubTopic topicToSubscribe ;
386- if (topic == null ) {
387- topicToSubscribe = getCurrentServingVersionTopic ();
381+ if (versionSwapByControlMessage ) {
382+ boolean lockAcquiredAndNoOngoingVersionSwap = false ;
383+ for (int i = 0 ; i <= MAX_SUBSCRIBE_RETRIES ; i ++) {
384+ // If version swap is in progress, wait for it to finish
385+ try {
386+ onGoingVersionSwapSignal .get ().await ();
387+ } catch (InterruptedException e ) {
388+ throw new RuntimeException (e );
389+ }
390+ subscriptionLock .writeLock ().lock ();
391+ if (versionSwapMessageState != null ) {
392+ // A new version swap started in the meantime; release the lock and wait for it to finish again
393+ subscriptionLock .writeLock ().unlock ();
394+ } else {
395+ // No version swap is in progress, proceed with subscription
396+ lockAcquiredAndNoOngoingVersionSwap = true ;
397+ break ;
398+ }
399+ }
400+ if (!lockAcquiredAndNoOngoingVersionSwap ) {
401+ // This should be extremely rare: it only happens when the subscribe request keeps conflicting with new version swaps
402+ throw new VeniceException ("Unable to subscribe to new partitions due to conflicting version swaps" );
403+ }
388404 } else {
389- topicToSubscribe = topic ;
405+ subscriptionLock . writeLock (). lock () ;
390406 }
391407
392- subscriptionLock .writeLock ().lock ();
393408 try {
394- if (versionSwapByControlMessage && versionSwapMessageState != null ) {
395- throw new VeniceException (
396- String .format (
397- "Unable to subscribe to new partitions while the changelog consumer is undergoing version swap from topic %s to topic %s" ,
398- versionSwapMessageState .getOldVersionTopic (),
399- versionSwapMessageState .getNewVersionTopic ()));
409+ PubSubTopic topicToSubscribe ;
410+ if (topic == null ) {
411+ topicToSubscribe = getCurrentServingVersionTopic ();
412+ } else {
413+ topicToSubscribe = topic ;
400414 }
401415 Set <PubSubTopicPartition > topicPartitionSet = getTopicAssignment ();
402416 for (PubSubTopicPartition topicPartition : topicPartitionSet ) {
@@ -841,6 +855,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
841855 changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
842856 changeCaptureStats .setUndergoingVersionSwap (0 );
843857 versionSwapMessageState = null ;
858+ onGoingVersionSwapSignal .get ().countDown ();
844859 } else {
845860 return Collections .emptyList ();
846861 }
@@ -860,7 +875,13 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
860875 changeCaptureStats .emitVersionSwapCountMetrics (SUCCESS );
861876 changeCaptureStats .setUndergoingVersionSwap (0 );
862877 versionSwapMessageState = null ;
878+ onGoingVersionSwapSignal .get ().countDown ();
863879 } else {
880+ LOGGER .warn (
881+ "Version swap from topic: {} to topic: {}, generation id: {} already timed out but still unable to find new topic checkpoints to go to." ,
882+ versionSwapMessageState .getOldVersionTopic (),
883+ versionSwapMessageState .getNewVersionTopic (),
884+ versionSwapMessageState .getVersionSwapGenerationId ());
864885 return Collections .emptyList ();
865886 }
866887 }
@@ -974,6 +995,7 @@ private boolean isNewVersionCheckpointsReady(long timeoutInMs) throws Interrupte
974995 versionSwapMessageState .getNewVersionTopic (),
975996 versionSwapMessageState .getVersionSwapGenerationId (),
976997 versionSwapMessageState .getAssignedPartitions ()));
998+ return false ;
977999 }
9781000 return true ;
9791001 }
@@ -1441,6 +1463,7 @@ protected boolean handleVersionSwapMessageInVT(
14411463 Set <PubSubTopicPartition > currentAssignment = getTopicAssignment ();
14421464 versionSwapMessageState =
14431465 new VersionSwapMessageState (versionSwap , totalRegionCount , currentAssignment , time .getMilliseconds ());
1466+ onGoingVersionSwapSignal .set (new CountDownLatch (1 ));
14441467 changeCaptureStats .setUndergoingVersionSwap (1 );
14451468 LOGGER .info (
14461469 "New version detected for store: {} through version swap messages. Performing version swap from topic: {} to topic: {}, generation id: {}" ,
0 commit comments