Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.SystemStoreAttributes;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
Expand Down Expand Up @@ -308,7 +309,8 @@ public DaVinciBackend(
ingestionService.getVeniceWriterFactory(),
instanceName,
valueSchemaEntry,
updateSchemaEntry);
updateSchemaEntry,
(this::getStore));
}

ingestionService.start();
Expand Down Expand Up @@ -558,6 +560,23 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
return storeRepository;
}

Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method signature uses Object as the return type which is too generic and loses type safety. Based on the implementation, it can return either Store or SystemStoreAttributes (or null). Consider using a more specific return type or union type approach. For example:

  • Change return type to Store and cast SystemStoreAttributes appropriately where used
  • Or document clearly what types can be returned and why Object is necessary

This makes the API unclear for callers who need to know what type to expect and cast to.

Suggested change
/**
* Returns either a {@link Store} or a {@link SystemStoreAttributes} for the given store name.
* <p>
* If the store name refers to a system store, returns the corresponding {@link SystemStoreAttributes}.
* If the store name refers to a user store, returns the corresponding {@link Store}.
* Returns {@code null} if the store is not found.
*
* @param storeName the name of the store
* @return {@link Store}, {@link SystemStoreAttributes}, or {@code null}
*/
@Nullable

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method getStore lacks documentation explaining its purpose, parameters, return value, and the different return types it can produce. Add JavaDoc:

/**
 * Resolves and returns the store object for the given store name.
 * For system stores, this returns the SystemStoreAttributes from the parent user store.
 * For regular stores, this returns the Store object directly.
 * 
 * @param storeName the name of the store to retrieve
 * @return Store object for regular stores, SystemStoreAttributes for system stores, or null if not found
 */
public final Object getStore(String storeName) {
Suggested change
/**
* Resolves and returns the store object for the given store name.
* <p>
* For system stores, this returns the {@link SystemStoreAttributes} from the parent user store.
* For regular stores, this returns the {@link Store} object directly.
* If the store is not found, returns {@code null}.
*
* @param storeName the name of the store to retrieve
* @return {@link Store} for regular stores, {@link SystemStoreAttributes} for system stores, or {@code null} if not found
*/

Copilot uses AI. Check for mistakes.
public final Object getStore(String storeName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic scattered in here is not great IMO.
I think the main purpose to extract push status store's RT version right? Then can we just instead pass ReadOnlyStoreRepository interface into the the PushStatusStoreWriter constructor, and extract the user store object -> extract system store info -> get largest RT version when preparing VW? I think this will make the logic hidden inside the corresponding object.
Same comment goes to MetaStoreWriter.

VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (systemStoreType != null) {
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
Store userStore = storeRepository.getStore(userStoreName);
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing null check before calling getSystemStores(). If userStore is null, this will throw a NullPointerException. Add a null check:

if (userStore == null) {
  return null;
}
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
Suggested change
Store userStore = storeRepository.getStore(userStoreName);
Store userStore = storeRepository.getStore(userStoreName);
if (userStore == null) {
return null;
}

Copilot uses AI. Check for mistakes.
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
if (storeName.startsWith(systemStoreEntries.getKey())) {
return systemStoreEntries.getValue();
}
}
return null;
} else {
return storeRepository.getStore(storeName);
}
}

public ObjectCacheBackend getObjectCache() {
return cacheBackend.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ private void asyncStart() {
ingestionService.getVeniceWriterFactory(),
instance.getNodeId(),
valueSchemaEntry,
updateSchemaEntry);
updateSchemaEntry,
helixReadOnlyStoreRepository::getStore);

// Record replica status in Zookeeper.
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ public KafkaStoreIngestionService(
zkSharedSchemaRepository.get(),
pubSubTopicRepository,
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
serverConfig.getMetaStoreWriterCloseConcurrency());
serverConfig.getMetaStoreWriterCloseConcurrency(),
storeName -> metadataRepo.getStore(storeName));
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
@Override
public void handleStoreDeleted(Store store) {
Expand Down Expand Up @@ -1472,7 +1473,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig
return storeNameToInternalRecordTransformerConfig.get(storeName);
}

public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) {
public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) {
String storeName = store.getName();
try {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
Expand All @@ -1494,7 +1496,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n";
if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived()
&& partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) {
ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName));
ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
}
PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4262,9 +4262,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
// cluster these metastore writes could be spiky
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName);
if (metaStore != null) {
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
}
}
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
heartbeatTs,
currentTimestamp);
kafkaStoreIngestionService.attemptToPrintIngestionInfoFor(
storeName.getKey(),
metadataRepository.getStore(storeName.getKey()),
version.getKey(),
partition.getKey(),
region.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand All @@ -34,6 +35,7 @@
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
Expand All @@ -43,16 +45,19 @@
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.stats.ZkClientStatusStats;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -66,6 +71,15 @@ public class DaVinciBackendTest {
private MockedStatic<ClientFactory> mockClientFactory;
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
private MockedStatic<ZkClientFactory> mockZkFactory;

@BeforeClass
public void init() {
mockZkFactory = mockStatic(ZkClientFactory.class);
ZkClient mockZkClient = mock(ZkClient.class);
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));
}

@BeforeMethod
public void setUp() throws Exception {
Expand Down
Loading
Loading