Skip to content

Commit e3f7077

Browse files
committed
extend rt versioning to system stores
1 parent ac09987 commit e3f7077

File tree

29 files changed

+708
-140
lines changed

29 files changed

+708
-140
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.linkedin.venice.meta.Store;
5858
import com.linkedin.venice.meta.StoreDataChangedListener;
5959
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
60+
import com.linkedin.venice.meta.SystemStoreAttributes;
6061
import com.linkedin.venice.meta.Version;
6162
import com.linkedin.venice.pubsub.api.PubSubPosition;
6263
import com.linkedin.venice.pushmonitor.ExecutionStatus;
@@ -308,7 +309,8 @@ public DaVinciBackend(
308309
ingestionService.getVeniceWriterFactory(),
309310
instanceName,
310311
valueSchemaEntry,
311-
updateSchemaEntry);
312+
updateSchemaEntry,
313+
(this::getStore));
312314
}
313315

314316
ingestionService.start();
@@ -558,6 +560,23 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
558560
return storeRepository;
559561
}
560562

563+
public final Object getStore(String storeName) {
564+
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
565+
if (systemStoreType != null) {
566+
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
567+
Store userStore = storeRepository.getStore(userStoreName);
568+
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
569+
for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
570+
if (storeName.startsWith(systemStoreEntries.getKey())) {
571+
return systemStoreEntries.getValue();
572+
}
573+
}
574+
return null;
575+
} else {
576+
return storeRepository.getStore(storeName);
577+
}
578+
}
579+
561580
public ObjectCacheBackend getObjectCache() {
562581
return cacheBackend.get();
563582
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ private void asyncStart() {
360360
ingestionService.getVeniceWriterFactory(),
361361
instance.getNodeId(),
362362
valueSchemaEntry,
363-
updateSchemaEntry);
363+
updateSchemaEntry,
364+
helixReadOnlyStoreRepository::getStore);
364365

365366
// Record replica status in Zookeeper.
366367
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,8 @@ public KafkaStoreIngestionService(
338338
zkSharedSchemaRepository.get(),
339339
pubSubTopicRepository,
340340
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
341-
serverConfig.getMetaStoreWriterCloseConcurrency());
341+
serverConfig.getMetaStoreWriterCloseConcurrency(),
342+
storeName -> metadataRepo.getStore(storeName));
342343
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
343344
@Override
344345
public void handleStoreDeleted(Store store) {
@@ -1472,7 +1473,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig
14721473
return storeNameToInternalRecordTransformerConfig.get(storeName);
14731474
}
14741475

1475-
public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) {
1476+
public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) {
1477+
String storeName = store.getName();
14761478
try {
14771479
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
14781480
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
@@ -1494,7 +1496,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
14941496
String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n";
14951497
if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived()
14961498
&& partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) {
1497-
ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName));
1499+
ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
14981500
}
14991501
PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition);
15001502

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4262,9 +4262,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
42624262
// cluster these metastore writes could be spiky
42634263
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
42644264
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
4265-
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
4266-
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
4267-
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
4265+
Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName);
4266+
if (metaStore != null) {
4267+
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore));
4268+
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
4269+
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
4270+
}
42684271
}
42694272
}
42704273
return;

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
645645
heartbeatTs,
646646
currentTimestamp);
647647
kafkaStoreIngestionService.attemptToPrintIngestionInfoFor(
648-
storeName.getKey(),
648+
metadataRepository.getStore(storeName.getKey()),
649649
version.getKey(),
650650
partition.getKey(),
651651
region.getKey());

clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
1212
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
1313
import static org.mockito.ArgumentMatchers.any;
14+
import static org.mockito.ArgumentMatchers.anyString;
1415
import static org.mockito.Mockito.clearInvocations;
1516
import static org.mockito.Mockito.doNothing;
1617
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@
3435
import com.linkedin.venice.client.store.ClientFactory;
3536
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
3637
import com.linkedin.venice.exceptions.VeniceException;
38+
import com.linkedin.venice.helix.ZkClientFactory;
3739
import com.linkedin.venice.meta.ClusterInfoProvider;
3840
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
3941
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
@@ -43,16 +45,19 @@
4345
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
4446
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
4547
import com.linkedin.venice.service.ICProvider;
48+
import com.linkedin.venice.stats.ZkClientStatusStats;
4649
import com.linkedin.venice.utils.VeniceProperties;
4750
import io.tehuti.metrics.MetricsRepository;
4851
import java.util.Optional;
4952
import java.util.Properties;
5053
import java.util.concurrent.ExecutionException;
5154
import java.util.concurrent.TimeUnit;
5255
import java.util.concurrent.TimeoutException;
56+
import org.apache.helix.zookeeper.impl.client.ZkClient;
5357
import org.mockito.MockedConstruction;
5458
import org.mockito.MockedStatic;
5559
import org.testng.annotations.AfterMethod;
60+
import org.testng.annotations.BeforeClass;
5661
import org.testng.annotations.BeforeMethod;
5762
import org.testng.annotations.DataProvider;
5863
import org.testng.annotations.Test;
@@ -67,6 +72,14 @@ public class DaVinciBackendTest {
6772
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
6873
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
6974

75+
@BeforeClass
76+
public void init() {
77+
MockedStatic<ZkClientFactory> mockZkFactory = mockStatic(ZkClientFactory.class);
78+
ZkClient mockZkClient = mock(ZkClient.class);
79+
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
80+
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));
81+
}
82+
7083
@BeforeMethod
7184
public void setUp() throws Exception {
7285
ClientConfig clientConfig = new ClientConfig(STORE_NAME).setVeniceURL("http://localhost:7777")

0 commit comments

Comments
 (0)