From 6c291ab87e433a39d293a65db04a0dce5826d7b9 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Tue, 14 Oct 2025 13:34:34 -0700 Subject: [PATCH 1/5] Add test Change the protocolMap to ConcurrentHashMap Fix spotbug Fix spotbug --- .../java/com/linkedin/venice/ConfigKeys.java | 7 + .../avro/AvroProtocolDefinition.java | 2 +- .../avro/InternalAvroSpecificSerializer.java | 17 +- .../avro/KafkaValueSerializer.java | 2 +- .../java/com/linkedin/venice/utils/Utils.java | 12 +- ...onWithProtocolRollbackIntegrationTest.java | 325 ++++++++++++++++++ ...TestAdminOperationWithPreviousVersion.java | 3 +- .../VeniceControllerClusterConfig.java | 10 + .../VeniceControllerMultiClusterConfig.java | 4 + .../venice/controller/VeniceHelixAdmin.java | 16 + .../controller/VeniceParentHelixAdmin.java | 6 +- .../SystemSchemaInitializationRoutine.java | 28 +- .../init/SystemStoreInitializationHelper.java | 26 +- .../kafka/consumer/AdminConsumptionTask.java | 8 + .../serializer/AdminOperationSerializer.java | 45 ++- .../admin/AdminOperationSerializerTest.java | 46 ++- 16 files changed, 529 insertions(+), 28 deletions(-) create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionWithProtocolRollbackIntegrationTest.java diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index fa0f931e63e..63853d2f4b3 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -526,6 +526,13 @@ private ConfigKeys() { public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED = "child.controller.admin.topic.consumption.enabled"; + /** + * Whether to enable admin operation system store or not. + * If yes, controller will register admin operation system store and process admin operations in rollback cases. + */ + public static final String CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED = + "controller.admin.operation.system.store.enabled"; + /** * This config defines the source region of aggregate hybrid store real-time data when native replication is enabled */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index 0cc335bdf3e..f9028db0a6f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -303,7 +303,7 @@ public InternalAvroSpecificSerializer getSerialize if (magicByte.isPresent() || protocolVersionStoredInHeader) { return new InternalAvroSpecificSerializer<>(this); } - return new InternalAvroSpecificSerializer<>(this, 0); + return new InternalAvroSpecificSerializer<>(this, 0, this.getCurrentProtocolVersionSchema()); } public int getCurrentProtocolVersion() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java index 2b0fff12b01..35be6cc096a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java @@ -104,18 +104,26 @@ private static class ReusableObjects { private final BiConsumer newSchemaEncountered; + protected InternalAvroSpecificSerializer( + AvroProtocolDefinition protocolDef, + Integer payloadOffsetOverride, + Schema compiledProtocolSchema) { + this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, compiledProtocolSchema); + } + protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef) { this(protocolDef, null); } protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef, Integer payloadOffsetOverride) { - this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}); + this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, protocolDef.getCurrentProtocolVersionSchema()); } protected InternalAvroSpecificSerializer( AvroProtocolDefinition protocolDef, Integer payloadOffsetOverride, - BiConsumer newSchemaEncountered) { + BiConsumer newSchemaEncountered, + Schema compiledProtocolSchema) { // Magic byte handling if (protocolDef.getMagicByte().isPresent()) { this.magicByte = protocolDef.getMagicByte().get(); @@ -125,6 +133,8 @@ protected InternalAvroSpecificSerializer( this.MAGIC_BYTE_LENGTH = 0; } + this.compiledProtocol = compiledProtocolSchema; + // Protocol version handling this.PROTOCOL_VERSION_OFFSET = MAGIC_BYTE_OFFSET + MAGIC_BYTE_LENGTH; if (protocolDef.protocolVersionStoredInHeader) { @@ -156,9 +166,8 @@ protected InternalAvroSpecificSerializer( } this.PAYLOAD_OFFSET = payloadOffsetOverride; } - this.compiledProtocol = protocolDef.getCurrentProtocolVersionSchema(); - Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef); + Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef, compiledProtocolSchema); int minimumSchemaId = protocolSchemaMap.keySet() .stream() .min(Integer::compareTo) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java index 6813fb8bc24..b07b0f84692 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java @@ -20,6 +20,6 @@ public KafkaValueSerializer() { } public KafkaValueSerializer(BiConsumer newSchemaEncountered) { - super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered); + super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered, null); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index 603421d9117..5dc5af42732 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -442,6 +442,12 @@ public static Schema getSchemaFromResource(String resourcePath) throws IOExcepti } public static Map getAllSchemasFromResources(AvroProtocolDefinition protocolDef) { + return getAllSchemasFromResources(protocolDef, protocolDef.getCurrentProtocolVersionSchema()); + } + + public static Map getAllSchemasFromResources( + AvroProtocolDefinition protocolDef, + Schema compiledSchema) { final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA = InternalAvroSpecificSerializer.SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA; final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL = @@ -461,7 +467,6 @@ public static Map getAllSchemasFromResources(AvroProtocolDefini } byte compiledProtocolVersion = SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA; - String className = protocolDef.getClassName(); Map protocolSchemaMap = new TreeMap<>(); int initialVersion; if (currentProtocolVersion > 0) { @@ -471,6 +476,7 @@ public static Map getAllSchemasFromResources(AvroProtocolDefini } final String sep = "/"; // TODO: Make sure that jar resources are always forward-slash delimited, even on Windows int version = initialVersion; + String className = protocolDef.getClassName(); while (true) { String versionPath = "avro" + sep; if (currentProtocolVersion != SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL) { @@ -480,7 +486,7 @@ public static Map getAllSchemasFromResources(AvroProtocolDefini try { Schema schema = Utils.getSchemaFromResource(versionPath); protocolSchemaMap.put(version, schema); - if (schema.equals(protocolDef.getCurrentProtocolVersionSchema())) { + if (schema.equals(compiledSchema)) { compiledProtocolVersion = (byte) version; break; } @@ -523,7 +529,7 @@ public static Map getAllSchemasFromResources(AvroProtocolDefini if (intendedCurrentProtocol == null) { throw new VeniceException( "Failed to get schema for current version: " + currentProtocolVersion + " class: " + className); - } else if (!intendedCurrentProtocol.equals(protocolDef.getCurrentProtocolVersionSchema())) { + } else if (!intendedCurrentProtocol.equals(compiledSchema)) { throw new VeniceException( "The intended protocol version (" + currentProtocolVersion + ") does not match the compiled protocol version (" + compiledProtocolVersion + ")."); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionWithProtocolRollbackIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionWithProtocolRollbackIntegrationTest.java new file mode 100644 index 00000000000..6057d7cf92c --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionWithProtocolRollbackIntegrationTest.java @@ -0,0 +1,325 @@ +package com.linkedin.venice.controller.kafka.consumer; + +import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.utils.AvroSchemaUtils.*; + +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.avroutil1.compatibility.FieldBuilder; +import com.linkedin.venice.controller.VeniceParentHelixAdmin; +import com.linkedin.venice.controller.kafka.AdminTopicUtils; +import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation; +import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta; +import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation; +import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore; +import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType; +import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType; +import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.MultiSchemaResponse; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.manager.TopicManager; +import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.utils.ConfigCommonUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Integration test for AdminConsumptionTask to verify that it can handle protocol rollback scenario. + * Scenario: + * 1. Parent has a new schema and register it to schema system store. We roll-out this new schema to all controllers. + * ProtocolAutoDetectionService will mark good protocol version to use the new schema. + * 2. An AdminOperation message (UpdateStore) is serialized with the new schema and sent to admin topic. + * 3. Oops, we need to rollback to previous version with old schema :(. + * 4. Perform rollback in parent and child controllers. + * 5. However, the message in admin topic is having the new schema id, which will cause deserialization failure if + * the AdminConsumptionTask only relies on local schema cache, which then blocks further admin consumption. + * 6. To fix this, AdminConsumptionTask should be able to fetch the new schema from schema system store and proceed to consume the message. + */ +@Test +public class AdminConsumptionWithProtocolRollbackIntegrationTest { + private static final int TIMEOUT = 1 * Time.MS_PER_MINUTE; + + private AdminOperationSerializer adminOperationSerializer; + + private static final String owner = "test_owner"; + private static final String keySchema = "\"string\""; + private static final String valueSchema = "\"string\""; + private static final int adminConsumptionMaxWorkerPoolSize = 3; + + private VeniceTwoLayerMultiRegionMultiClusterWrapper venice; + private ControllerClient parentControllerClient; + private VeniceParentHelixAdmin parentHelixAdmin; + private VeniceWriter writer; + private String clusterName; + private int executionId = 0; + + @BeforeClass + public void setUp() { + Properties serverProperties = new Properties(); + Properties parentControllerProps = new Properties(); + parentControllerProps.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize); + parentControllerProps.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000); + parentControllerProps.put(CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, true); + + Properties childControllerProps = new Properties(); + childControllerProps.put(CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, true); + venice = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) + .numberOfClusters(1) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .parentControllerProperties(parentControllerProps) + .childControllerProperties(childControllerProps) + .serverProperties(serverProperties) + .build()); + + VeniceControllerWrapper parentController = venice.getParentControllers().get(0); + parentControllerClient = new ControllerClient(venice.getClusterNames()[0], parentController.getControllerUrl()); + clusterName = venice.getClusterNames()[0]; + parentHelixAdmin = (VeniceParentHelixAdmin) parentController.getVeniceAdmin(); + PubSubTopicRepository pubSubTopicRepository = parentHelixAdmin.getPubSubTopicRepository(); + TopicManager topicManager = parentHelixAdmin.getTopicManager(); + PubSubTopic adminTopic = pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName)); + topicManager.createTopic(adminTopic, 1, 1, true); + PubSubBrokerWrapper pubSubBrokerWrapper = venice.getParentKafkaBrokerWrapper(); + + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); + writer = IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory) + .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build()); + adminOperationSerializer = parentHelixAdmin.getAdminOperationSerializer(); + } + + @AfterClass + public void cleanUp() { + venice.close(); + } + + @Test + public void testAdminConsumptionTaskWithProtocolRollback() { + // Create a new schema and register it + int newSchemaId = createNewSchemaAndRegister(AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + + // Create unique store name + String storeName = Utils.getUniqueString("test-store"); + + // Create store + byte[] goodMessage = getStoreCreationMessage( + clusterName, + storeName, + owner, + keySchema, + valueSchema, + nextExecutionId(), + AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + writer.put(new byte[0], goodMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + + TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { + Assert.assertFalse(parentControllerClient.getStore(storeName).isError()); + }); + + // Update store command, which is serialized with new schema + byte[] updateStoreMessage = getStoreUpdateMessage(clusterName, storeName, owner, nextExecutionId(), newSchemaId); + + // Rollback to old schema + adminOperationSerializer.removeSchema(newSchemaId); + + // Send in the serialized message with new schema id + // This message should be consumed successfully since the AdminConsumptionTask should be able to + // fetch the new schema from schema system store + writer.put(new byte[0], updateStoreMessage, newSchemaId); + + TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { + Assert.assertFalse(parentControllerClient.getStore(storeName).isError()); + StoreInfo storeInfo = parentControllerClient.getStore(storeName).getStore(); + Assert.assertTrue(storeInfo.isEnableStoreWrites()); + Assert.assertTrue(storeInfo.isEnableStoreReads()); + }); + + } + + private void addSchemaToSchemaSystemStore(int schemaId, Schema schema) { + String adminOperationSchemaStoreName = AvroProtocolDefinition.ADMIN_OPERATION.getSystemStoreName(); + TestUtils.waitForNonDeterministicAssertion(TIMEOUT * 5, TimeUnit.MILLISECONDS, () -> { + MultiSchemaResponse multiSchemaResponse = parentControllerClient.getAllValueSchema(adminOperationSchemaStoreName); + assert multiSchemaResponse.getSchemas().length == schemaId - 1; + }); + + String clusterName = parentControllerClient.getStore(adminOperationSchemaStoreName).getCluster(); + + parentHelixAdmin.getVeniceHelixAdmin() + .addValueSchema( + clusterName, + adminOperationSchemaStoreName, + schema.toString(), + schemaId, + DirectionalSchemaCompatibilityType.NONE); + + TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { + String fetchedSchema = + parentControllerClient.getValueSchema(adminOperationSchemaStoreName, schemaId).getSchemaStr(); + assert fetchedSchema.equals(schema.toString()); + }); + } + + /** + * Create a new schema by adding a new field to the original schema + * and register it to schema system store with new schema id + */ + private int createNewSchemaAndRegister(int originalSchemaId) { + Schema schema = adminOperationSerializer.getSchema(originalSchemaId); + + int newSchemaId = originalSchemaId + 1; + Schema newSchema = createNewSchema(schema); + // We use the same schema and by-pass compatibility check for testing purpose + addSchemaToSchemaSystemStore(newSchemaId, newSchema); + + adminOperationSerializer.addSchema(newSchemaId, newSchema); + return newSchemaId; + } + + /** + * Create a new schema by adding a new field to the original schema + */ + private Schema createNewSchema(final Schema originalSchema) { + if (originalSchema == null) { + throw new IllegalArgumentException("Original schema cannot be null"); + } + + // Create a new record schema with defensive copies + final Schema newSchema = Schema.createRecord( + originalSchema.getName(), + originalSchema.getDoc(), + originalSchema.getNamespace(), + originalSchema.isError()); + + // Deep copy fields + final List newFields = new ArrayList<>(); + for (Schema.Field field: originalSchema.getFields()) { + // Defensive copy of schema (Avro schemas are immutable, so this is safe) + FieldBuilder newField = deepCopySchemaFieldWithoutFieldProps(field); + copyFieldProperties(newField, field); + newFields.add(newField.build()); + } + + // Add extra field + final Schema.Field testField = AvroCompatibilityHelper + .createSchemaField("testField", Schema.create(Schema.Type.INT), "Documentation for testField", 0); + newFields.add(testField); + + // Set fields using defensive copy + newSchema.setFields(Collections.unmodifiableList(new ArrayList<>(newFields))); + + return newSchema; + } + + /** + * Create a StoreCreation admin message + */ + private byte[] getStoreCreationMessage( + String clusterName, + String storeName, + String owner, + String keySchema, + String valueSchema, + long executionId, + int writerSchemaId) { + StoreCreation storeCreation = (StoreCreation) AdminMessageType.STORE_CREATION.getNewInstance(); + storeCreation.clusterName = clusterName; + storeCreation.storeName = storeName; + storeCreation.owner = owner; + storeCreation.keySchema = new SchemaMeta(); + storeCreation.keySchema.definition = keySchema; + storeCreation.keySchema.schemaType = SchemaType.AVRO_1_4.getValue(); + storeCreation.valueSchema = new SchemaMeta(); + storeCreation.valueSchema.definition = valueSchema; + storeCreation.valueSchema.schemaType = SchemaType.AVRO_1_4.getValue(); + AdminOperation adminMessage = new AdminOperation(); + adminMessage.operationType = AdminMessageType.STORE_CREATION.getValue(); + adminMessage.payloadUnion = storeCreation; + adminMessage.executionId = executionId; + return adminOperationSerializer.serialize(adminMessage, writerSchemaId); + } + + /** + * Create an UpdateStore admin message + */ + private byte[] getStoreUpdateMessage( + String clusterName, + String storeName, + String owner, + long executionId, + int writerSchemaId) { + UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance(); + updateStore.clusterName = clusterName; + updateStore.storeName = storeName; + updateStore.owner = owner; + updateStore.partitionNum = 3; + updateStore.currentVersion = AdminConsumptionTask.IGNORED_CURRENT_VERSION; + updateStore.enableReads = true; + updateStore.enableWrites = true; + updateStore.replicateAllConfigs = true; + updateStore.updatedConfigsList = Collections.emptyList(); + AdminOperation adminMessage = new AdminOperation(); + adminMessage.operationType = AdminMessageType.UPDATE_STORE.getValue(); + adminMessage.payloadUnion = updateStore; + adminMessage.executionId = executionId; + updateStore.storeLifecycleHooks = Collections.emptyList(); + updateStore.blobTransferInServerEnabled = ConfigCommonUtils.ActivationState.NOT_SPECIFIED.name(); + updateStore.keyUrnFields = Collections.emptyList(); + return adminOperationSerializer.serialize(adminMessage, writerSchemaId); + } + + private int nextExecutionId() { + return executionId++; + } + + private static FieldBuilder deepCopySchemaFieldWithoutFieldProps(Schema.Field field) { + FieldBuilder fieldBuilder = AvroCompatibilityHelper.newField(null) + .setName(field.name()) + .setSchema(field.schema()) + .setDoc(field.doc()) + .setOrder(field.order()); + // set default as AvroCompatibilityHelper builder might drop defaults if there is type mismatch + if (field.hasDefaultValue()) { + fieldBuilder.setDefault(getFieldDefault(field)); + } + return fieldBuilder; + } + + private static void copyFieldProperties(FieldBuilder fieldBuilder, Schema.Field field) { + AvroCompatibilityHelper.getAllPropNames(field).forEach(k -> { + String propValue = AvroCompatibilityHelper.getFieldPropAsJsonString(field, k); + if (propValue != null) { + fieldBuilder.addProp(k, propValue); + } + }); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestAdminOperationWithPreviousVersion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestAdminOperationWithPreviousVersion.java index 2318f4fe825..ce210cbea51 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestAdminOperationWithPreviousVersion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestAdminOperationWithPreviousVersion.java @@ -116,7 +116,8 @@ public class TestAdminOperationWithPreviousVersion { static final int LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION = AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION; static final Schema LATEST_SCHEMA = AdminOperation.getClassSchema(); static final int PREVIOUS_SCHEMA_ID_FOR_ADMIN_OPERATION = LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION - 1; - static final Schema PREVIOUS_SCHEMA = AdminOperationSerializer.getSchema(PREVIOUS_SCHEMA_ID_FOR_ADMIN_OPERATION); + static final Schema PREVIOUS_SCHEMA = + new AdminOperationSerializer().getSchema(PREVIOUS_SCHEMA_ID_FOR_ADMIN_OPERATION); private static final Set NEW_UNION_ENTRIES = getNewUnionEntries(); private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 6149e1e2e75..6f3515d0150 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -670,6 +670,10 @@ public class VeniceControllerClusterConfig { private final Set activeActiveRealTimeSourceFabrics; private final boolean isSkipHybridStoreRTTopicCompactionPolicyUpdateEnabled; + /** + * Admin operation system store + */ + private final boolean isAdminOperationSystemStoreEnabled; public VeniceControllerClusterConfig(VeniceProperties props) { this.props = props; @@ -1259,6 +1263,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) { props.getBoolean(CONTROLLER_BACKUP_VERSION_REPLICA_REDUCTION_ENABLED, false); this.useMultiRegionRealTimeTopicSwitcher = props.getBoolean(ConfigKeys.CONTROLLER_USE_MULTI_REGION_REAL_TIME_TOPIC_SWITCHER_ENABLED, false); + this.isAdminOperationSystemStoreEnabled = + props.getBoolean(ConfigKeys.CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, false); this.logClusterConfig(); } @@ -2215,6 +2221,10 @@ public boolean isSkipHybridStoreRTTopicCompactionPolicyUpdateEnabled() { return isSkipHybridStoreRTTopicCompactionPolicyUpdateEnabled; } + public boolean isAdminOperationSystemStoreEnabled() { + return isAdminOperationSystemStoreEnabled; + } + /** * A function that would put a k/v pair into a map with some processing works. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java index ef79b86bd57..8a9108b203c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java @@ -275,6 +275,10 @@ public boolean isZkSharedMetaSystemSchemaStoreAutoCreationEnabled() { return getCommonConfig().isZkSharedMetaSystemSchemaStoreAutoCreationEnabled(); } + public boolean isAdminOperationSystemStoreEnabled() { + return getCommonConfig().isAdminOperationSystemStoreEnabled(); + } + public boolean isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled() { return getCommonConfig().isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 22fd176e97c..c85bb3ba047 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -64,6 +64,7 @@ import com.linkedin.venice.controller.kafka.StoreStatusDecider; import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.kafka.consumer.AdminMetadata; +import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation; import com.linkedin.venice.controller.kafka.protocol.admin.HybridStoreConfigRecord; import com.linkedin.venice.controller.kafka.protocol.admin.StoreViewConfigRecord; import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer; @@ -699,6 +700,17 @@ public VeniceHelixAdmin( AvroProtocolDefinition.SERVER_STORE_PROPERTIES_PAYLOAD, multiClusterConfigs, this)); + if (multiClusterConfigs.isAdminOperationSystemStoreEnabled()) { + initRoutines.add( + new SystemSchemaInitializationRoutine( + AvroProtocolDefinition.ADMIN_OPERATION, + multiClusterConfigs, + this, + Optional.empty(), + Optional.empty(), + false, + AdminOperation.class)); + } if (multiClusterConfigs.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled()) { // Add routine to create zk shared metadata system store @@ -1109,6 +1121,10 @@ protected HelixAdmin getHelixAdmin() { return this.admin; } + public boolean isAdminOperationSystemStoreEnabled() { + return multiClusterConfigs.isAdminOperationSystemStoreEnabled(); + } + /** * Create a new ZK store and its configuration in the store repository and create schemas in the schema repository. * @param clusterName Venice cluster where the store locates. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 1117d072a24..25b7e225971 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -703,7 +703,7 @@ void sendAdminMessageAndWaitForConsumed(String clusterName, String storeName, Ad // Validate message before acquiring execution id int writerSchemaId = getWriterSchemaIdFromZK(clusterName); - AdminOperationSerializer.validate(message, writerSchemaId); + adminOperationSerializer.validate(message, writerSchemaId); // Acquire execution id, any exception thrown after this point will result to a missing execution id. AdminCommandExecutionTracker adminCommandExecutionTracker = adminCommandExecutionTrackers.get(clusterName); @@ -6373,4 +6373,8 @@ public StoreDeletedValidation validateStoreDeleted(String clusterName, String st return result; } + @VisibleForTesting + public AdminOperationSerializer getAdminOperationSerializer() { + return adminOperationSerializer; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java index 16b63d89bb8..031540e5a11 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.Optional; import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,6 +31,7 @@ public class SystemSchemaInitializationRoutine implements ClusterLeaderInitializ private static final String DEFAULT_KEY_SCHEMA_STR = "\"int\""; private final AvroProtocolDefinition protocolDefinition; + private final Schema protocolSchema; private final VeniceControllerMultiClusterConfig multiClusterConfigs; private final VeniceHelixAdmin admin; private final Optional keySchema; @@ -39,7 +42,7 @@ public SystemSchemaInitializationRoutine( AvroProtocolDefinition protocolDefinition, VeniceControllerMultiClusterConfig multiClusterConfigs, VeniceHelixAdmin admin) { - this(protocolDefinition, multiClusterConfigs, admin, Optional.empty(), Optional.empty(), false); + this(protocolDefinition, multiClusterConfigs, admin, Optional.empty(), Optional.empty(), false, null); } public SystemSchemaInitializationRoutine( @@ -49,7 +52,28 @@ public SystemSchemaInitializationRoutine( Optional keySchema, Optional storeMetadataUpdate, boolean autoRegisterDerivedComputeSchema) { + this( + protocolDefinition, + multiClusterConfigs, + admin, + keySchema, + storeMetadataUpdate, + autoRegisterDerivedComputeSchema, + null); + } + + public SystemSchemaInitializationRoutine( + AvroProtocolDefinition protocolDefinition, + VeniceControllerMultiClusterConfig multiClusterConfigs, + VeniceHelixAdmin admin, + Optional keySchema, + Optional storeMetadataUpdate, + boolean autoRegisterDerivedComputeSchema, + Class specificRecordClass) { this.protocolDefinition = protocolDefinition; + this.protocolSchema = specificRecordClass != null + ? SpecificData.get().getSchema(specificRecordClass) + : protocolDefinition.getCurrentProtocolVersionSchema(); this.multiClusterConfigs = multiClusterConfigs; this.admin = admin; this.keySchema = keySchema; @@ -65,7 +89,7 @@ public void execute(String clusterToInit) { String intendedCluster = multiClusterConfigs.getSystemSchemaClusterName(); if (intendedCluster.equals(clusterToInit)) { String systemStoreName = protocolDefinition.getSystemStoreName(); - Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition); + Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition, protocolSchema); // Sanity check to make sure the store is not already created in another cluster. try { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelper.java index 9dca7d2af3a..ca6ee62b4fd 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelper.java @@ -43,6 +43,27 @@ public final class SystemStoreInitializationHelper { private SystemStoreInitializationHelper() { } + public static void setupSystemStore( + String clusterName, + String systemStoreName, + AvroProtocolDefinition protocolDefinition, + Schema keySchema, + Function updateStoreCheckSupplier, + UpdateStoreQueryParams updateStoreQueryParams, + Admin admin, + VeniceControllerMultiClusterConfig multiClusterConfigs) { + setupSystemStore( + clusterName, + systemStoreName, + protocolDefinition, + keySchema, + updateStoreCheckSupplier, + updateStoreQueryParams, + admin, + multiClusterConfigs, + protocolDefinition.getCurrentProtocolVersionSchema()); + } + /** * The main function that initializes and configures shared system stores * @param clusterName The cluster where the system store exists @@ -63,9 +84,10 @@ public static void setupSystemStore( Function updateStoreCheckSupplier, UpdateStoreQueryParams updateStoreQueryParams, Admin admin, - VeniceControllerMultiClusterConfig multiClusterConfigs) { + VeniceControllerMultiClusterConfig multiClusterConfigs, + Schema compiledSchema) { LOGGER.info("Setting up system store: {} in cluster: {}", systemStoreName, clusterName); - Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition); + Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition, compiledSchema); Store store = admin.getStore(clusterName, systemStoreName); String keySchemaString = keySchema != null ? keySchema.toString() : DEFAULT_KEY_SCHEMA_STR; if (store == null) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index b8bee35c931..31b4156cd84 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -902,6 +902,9 @@ private Pair extractExecutionIdAndAndAdminOperation(Defaul if (MessageType.PUT == messageType) { Put put = (Put) kafkaValue.payloadUnion; try { + if (admin.isAdminOperationSystemStoreEnabled()) { + deserializer.fetchAndStoreSchemaIfAbsent(admin, put.schemaId); + } adminOperation = deserializer.deserialize(put.putValue, put.schemaId); executionIdFromPayload = adminOperation.executionId; } catch (Exception e) { @@ -1266,4 +1269,9 @@ TopicManager getTopicManager() { long getConsumptionLagUpdateIntervalInMs() { return CONSUMPTION_LAG_UPDATE_INTERVAL_IN_MS; } + + // Visible for testing + AdminOperationSerializer getDeserializer() { + return deserializer; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java index 81a6f7f95da..c2792f44d3c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java @@ -1,16 +1,18 @@ package com.linkedin.venice.controller.kafka.protocol.serializer; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.venice.annotation.VisibleForTesting; +import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation; import com.linkedin.venice.exceptions.VeniceProtocolException; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -18,7 +20,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificDatumReader; @@ -30,10 +31,7 @@ public class AdminOperationSerializer { private static final Schema LATEST_SCHEMA = AdminOperation.getClassSchema(); - /** Used to generate decoders. */ - private static final DecoderFactory DECODER_FACTORY = new DecoderFactory(); - - private static final Map PROTOCOL_MAP = initProtocolMap(); + private final Map PROTOCOL_MAP = initProtocolMap(); /** * Serialize AdminOperation object to bytes[] with the writer schema @@ -106,7 +104,7 @@ public AdminOperation deserialize(ByteBuffer byteBuffer, int writerSchemaId) { * Validate the AdminOperation message against the target schema. * @throws VeniceProtocolException if the message does not conform to the target schema. */ - public static void validate(AdminOperation message, int targetSchemaId) { + public void validate(AdminOperation message, int targetSchemaId) { Schema targetSchema = getSchema(targetSchemaId); try { SemanticDetector.traverseAndValidate(message, LATEST_SCHEMA, targetSchema, "AdminOperation", null); @@ -119,7 +117,7 @@ public static void validate(AdminOperation message, int targetSchemaId) { public static Map initProtocolMap() { try { - Map protocolSchemaMap = new HashMap<>(); + Map protocolSchemaMap = new VeniceConcurrentHashMap<>(); for (int i = 1; i <= LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION; i++) { protocolSchemaMap.put(i, Utils.getSchemaFromResource("avro/AdminOperation/v" + i + "/AdminOperation.avsc")); } @@ -129,13 +127,32 @@ public static Map initProtocolMap() { } } - public static Schema getSchema(int schemaId) { + public Schema getSchema(int schemaId) { if (!PROTOCOL_MAP.containsKey(schemaId)) { throw new VeniceProtocolException("Admin operation schema version: " + schemaId + " doesn't exist"); } return PROTOCOL_MAP.get(schemaId); } + /** + * Download schema from system store schema repository and add it to the protocol map if not already present. + * @throws VeniceProtocolException if the schema could not be found in the system store schema repository. + */ + public void fetchAndStoreSchemaIfAbsent(VeniceHelixAdmin admin, int schemaId) { + // No need to download if the schema is already available. + if (PROTOCOL_MAP.containsKey(schemaId)) { + return; + } + String adminOperationSchemaStoreName = AvroProtocolDefinition.ADMIN_OPERATION.getSystemStoreName(); + Schema schema = + admin.getReadOnlyZKSharedSchemaRepository().getValueSchema(adminOperationSchemaStoreName, schemaId).getSchema(); + if (schema == null) { + throw new VeniceProtocolException( + "Could not find AdminOperation schema for schema id: " + schemaId + " in system store schema repository"); + } + PROTOCOL_MAP.put(schemaId, schema); + } + /** * Serialize the object by writer schema */ @@ -154,4 +171,14 @@ private byte[] serialize(T object, Schema writerSchema, int writerSchemaId) e); } } + + @VisibleForTesting + public void addSchema(int schemaId, Schema schema) { + PROTOCOL_MAP.put(schemaId, schema); + } + + @VisibleForTesting + public void removeSchema(int schemaId) { + PROTOCOL_MAP.remove(schemaId); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java index 4fea23f5193..80e20c225ff 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java @@ -2,31 +2,37 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; +import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType; import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer; import com.linkedin.venice.exceptions.VeniceProtocolException; +import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; +import com.linkedin.venice.schema.SchemaEntry; import java.nio.ByteBuffer; import java.util.Collections; +import org.apache.avro.Schema; import org.mockito.Mockito; import org.testng.annotations.Test; public class AdminOperationSerializerTest { - private AdminOperationSerializer adminOperationSerializer = Mockito.mock(AdminOperationSerializer.class); + private final AdminOperationSerializer adminOperationSerializer = Mockito.mock(AdminOperationSerializer.class); @Test public void testGetSchema() { - expectThrows(VeniceProtocolException.class, () -> AdminOperationSerializer.getSchema(0)); + expectThrows(VeniceProtocolException.class, () -> adminOperationSerializer.getSchema(0)); expectThrows( VeniceProtocolException.class, - () -> AdminOperationSerializer.getSchema(AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + 1)); + () -> adminOperationSerializer.getSchema(AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + 1)); } @Test @@ -107,7 +113,7 @@ public void testValidateAdminOperation() { // Serialize the AdminOperation object with writer schema id v74 try { - AdminOperationSerializer.validate(adminMessage, 74); + adminOperationSerializer.validate(adminMessage, 74); } catch (VeniceProtocolException e) { String expectedMessage = "Current schema version: 74. New semantic is being used. Field AdminOperation.payloadUnion.UpdateStore.separateRealTimeTopicEnabled: Boolean value true is not the default value false or false"; @@ -145,4 +151,36 @@ public void testSerializeDeserializeWithDocChange() { assertEquals(deserializedOperationPayloadUnion.versionNum, 1); assertEquals(deserializedOperationPayloadUnion.numberOfPartitions, 20); } + + @Test + public void testDownloadAndSchemaIfNecessary() { + VeniceHelixAdmin mockAdmin = Mockito.mock(VeniceHelixAdmin.class); + Schema mockSchema = Mockito.mock(Schema.class); + SchemaEntry mockSchemaEntry = Mockito.mock(SchemaEntry.class); + // First call returns null, second call returns the mock schema + when(mockSchemaEntry.getSchema()).thenReturn(null).thenReturn(mockSchema); + HelixReadOnlyZKSharedSchemaRepository mockSchemaRepo = Mockito.mock(HelixReadOnlyZKSharedSchemaRepository.class); + when(mockAdmin.getReadOnlyZKSharedSchemaRepository()).thenReturn(mockSchemaRepo); + when(mockSchemaRepo.getValueSchema(anyString(), anyInt())).thenReturn(mockSchemaEntry); + + int nonExistSchemaId = AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + 1; + doCallRealMethod().when(adminOperationSerializer).fetchAndStoreSchemaIfAbsent(any(), anyInt()); + + try { + // First attempt: schema not found in system store schema repository + adminOperationSerializer.fetchAndStoreSchemaIfAbsent(mockAdmin, nonExistSchemaId); + } catch (VeniceProtocolException e) { + String expectedMessage = "Could not find AdminOperation schema for schema id: " + nonExistSchemaId + + " in system store schema repository"; + assertEquals(e.getMessage(), expectedMessage); + } + + // Second attempt: schema found in system store schema repository + adminOperationSerializer.fetchAndStoreSchemaIfAbsent(mockAdmin, nonExistSchemaId); + // Verify schema is downloaded and added to the protocol map + assertEquals(adminOperationSerializer.getSchema(nonExistSchemaId), mockSchema); + + // Clean up the protocol map by removing the added mock schema + adminOperationSerializer.removeSchema(nonExistSchemaId); + } } From 47b4b4e70099efa2fc9db453da851f602548aec1 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Sat, 8 Nov 2025 09:09:01 -0800 Subject: [PATCH 2/5] Create cacheSchemaMapFromSystemStore to make PROTOCOL_MAP static --- .../serializer/AdminOperationSerializer.java | 28 +++++++++++++------ .../admin/AdminOperationSerializerTest.java | 20 ++++++------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java index c2792f44d3c..8164b2e8bcc 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java @@ -31,7 +31,15 @@ public class AdminOperationSerializer { private static final Schema LATEST_SCHEMA = AdminOperation.getClassSchema(); - private final Map PROTOCOL_MAP = initProtocolMap(); + private static final Map PROTOCOL_MAP = initProtocolMap(); + + /** + * Cache for schemas downloaded from system store schema repository. + * This map is separate from PROTOCOL_MAP to distinguish between built-in schemas and downloaded schemas. + * Built-in schemas are initialized at startup and are immutable. + * Downloaded schemas are downloaded from system store schema repository, and are mutable. + */ + private static final Map cacheSchemaMapFromSystemStore = new VeniceConcurrentHashMap<>(); /** * Serialize AdminOperation object to bytes[] with the writer schema @@ -128,10 +136,14 @@ public static Map initProtocolMap() { } public Schema getSchema(int schemaId) { - if (!PROTOCOL_MAP.containsKey(schemaId)) { - throw new VeniceProtocolException("Admin operation schema version: " + schemaId + " doesn't exist"); + if (PROTOCOL_MAP.containsKey(schemaId)) { + return PROTOCOL_MAP.get(schemaId); } - return PROTOCOL_MAP.get(schemaId); + if (cacheSchemaMapFromSystemStore.containsKey(schemaId)) { + return cacheSchemaMapFromSystemStore.get(schemaId); + } + + throw new VeniceProtocolException("Admin operation schema version: " + schemaId + " doesn't exist"); } /** @@ -140,7 +152,7 @@ public Schema getSchema(int schemaId) { */ public void fetchAndStoreSchemaIfAbsent(VeniceHelixAdmin admin, int schemaId) { // No need to download if the schema is already available. - if (PROTOCOL_MAP.containsKey(schemaId)) { + if (PROTOCOL_MAP.containsKey(schemaId) || cacheSchemaMapFromSystemStore.containsKey(schemaId)) { return; } String adminOperationSchemaStoreName = AvroProtocolDefinition.ADMIN_OPERATION.getSystemStoreName(); @@ -150,7 +162,7 @@ public void fetchAndStoreSchemaIfAbsent(VeniceHelixAdmin admin, int schemaId) { throw new VeniceProtocolException( "Could not find AdminOperation schema for schema id: " + schemaId + " in system store schema repository"); } - PROTOCOL_MAP.put(schemaId, schema); + cacheSchemaMapFromSystemStore.put(schemaId, schema); } /** @@ -174,11 +186,11 @@ private byte[] serialize(T object, Schema writerSchema, int writerSchemaId) @VisibleForTesting public void addSchema(int schemaId, Schema schema) { - PROTOCOL_MAP.put(schemaId, schema); + cacheSchemaMapFromSystemStore.put(schemaId, schema); } @VisibleForTesting public void removeSchema(int schemaId) { - PROTOCOL_MAP.remove(schemaId); + cacheSchemaMapFromSystemStore.remove(schemaId); } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java index 80e20c225ff..9f6d7d4c290 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java @@ -25,10 +25,9 @@ public class AdminOperationSerializerTest { - private final AdminOperationSerializer adminOperationSerializer = Mockito.mock(AdminOperationSerializer.class); - @Test public void testGetSchema() { + AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer(); expectThrows(VeniceProtocolException.class, () -> adminOperationSerializer.getSchema(0)); expectThrows( VeniceProtocolException.class, @@ -37,6 +36,8 @@ public void testGetSchema() { @Test public void testAdminOperationSerializer() { + AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer(); + // Create an AdminOperation object with latest version UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance(); updateStore.clusterName = "clusterName"; @@ -57,9 +58,6 @@ public void testAdminOperationSerializer() { adminMessage.payloadUnion = updateStore; adminMessage.executionId = 1; - doCallRealMethod().when(adminOperationSerializer).serialize(any(), anyInt()); - doCallRealMethod().when(adminOperationSerializer).deserialize(any(), anyInt()); - // Serialize the AdminOperation object with writer schema id v74, should not fail byte[] serializedBytes = adminOperationSerializer.serialize(adminMessage, 74); AdminOperation deserializedOperation = adminOperationSerializer.deserialize(ByteBuffer.wrap(serializedBytes), 74); @@ -89,6 +87,8 @@ public void testAdminOperationSerializer() { @Test public void testValidateAdminOperation() { + AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer(); + // Create an AdminOperation object with latest version UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance(); updateStore.clusterName = "clusterName"; @@ -108,9 +108,6 @@ public void testValidateAdminOperation() { adminMessage.payloadUnion = updateStore; adminMessage.executionId = 1; - doCallRealMethod().when(adminOperationSerializer).serialize(any(), anyInt()); - doCallRealMethod().when(adminOperationSerializer).deserialize(any(), anyInt()); - // Serialize the AdminOperation object with writer schema id v74 try { adminOperationSerializer.validate(adminMessage, 74); @@ -123,6 +120,8 @@ public void testValidateAdminOperation() { @Test public void testSerializeDeserializeWithDocChange() { + AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer(); + // Create an AdminOperation object with latest version AddVersion addVersion = (AddVersion) AdminMessageType.ADD_VERSION.getNewInstance(); addVersion.clusterName = "clusterName"; @@ -136,9 +135,6 @@ public void testSerializeDeserializeWithDocChange() { adminMessage.payloadUnion = addVersion; adminMessage.executionId = 1; - doCallRealMethod().when(adminOperationSerializer).serialize(any(), anyInt()); - doCallRealMethod().when(adminOperationSerializer).deserialize(any(), anyInt()); - // Serialize the AdminOperation object with writer schema id v24 byte[] serializedBytes = adminOperationSerializer.serialize(adminMessage, 24); @@ -154,6 +150,8 @@ public void testSerializeDeserializeWithDocChange() { @Test public void testDownloadAndSchemaIfNecessary() { + AdminOperationSerializer adminOperationSerializer = Mockito.spy(new AdminOperationSerializer()); + VeniceHelixAdmin mockAdmin = Mockito.mock(VeniceHelixAdmin.class); Schema mockSchema = Mockito.mock(Schema.class); SchemaEntry mockSchemaEntry = Mockito.mock(SchemaEntry.class); From 4d305c2cb0c25deb9492957b310d6ca412d6ac59 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Sat, 8 Nov 2025 09:17:21 -0800 Subject: [PATCH 3/5] Avoid passing null --- .../avro/KafkaValueSerializer.java | 6 +++++- .../venice/controller/VeniceHelixAdmin.java | 4 +++- .../SystemSchemaInitializationRoutine.java | 19 +++++++++++-------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java index b07b0f84692..710259b7645 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java @@ -20,6 +20,10 @@ public KafkaValueSerializer() { } public KafkaValueSerializer(BiConsumer newSchemaEncountered) { - super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered, null); + super( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, + null, + newSchemaEncountered, + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersionSchema()); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c85bb3ba047..5d89bca80c8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -288,6 +288,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; @@ -701,6 +702,7 @@ public VeniceHelixAdmin( multiClusterConfigs, this)); if (multiClusterConfigs.isAdminOperationSystemStoreEnabled()) { + Schema adminOperationCompiledSchema = SpecificData.get().getSchema(AdminOperation.class); initRoutines.add( new SystemSchemaInitializationRoutine( AvroProtocolDefinition.ADMIN_OPERATION, @@ -709,7 +711,7 @@ public VeniceHelixAdmin( Optional.empty(), Optional.empty(), false, - AdminOperation.class)); + adminOperationCompiledSchema)); } if (multiClusterConfigs.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled()) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java index 031540e5a11..ceffdd80e50 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java @@ -20,8 +20,6 @@ import java.util.Map; import java.util.Optional; import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,7 +40,14 @@ public SystemSchemaInitializationRoutine( AvroProtocolDefinition protocolDefinition, VeniceControllerMultiClusterConfig multiClusterConfigs, VeniceHelixAdmin admin) { - this(protocolDefinition, multiClusterConfigs, admin, Optional.empty(), Optional.empty(), false, null); + this( + protocolDefinition, + multiClusterConfigs, + admin, + Optional.empty(), + Optional.empty(), + false, + protocolDefinition.getCurrentProtocolVersionSchema()); } public SystemSchemaInitializationRoutine( @@ -59,7 +64,7 @@ public SystemSchemaInitializationRoutine( keySchema, storeMetadataUpdate, autoRegisterDerivedComputeSchema, - null); + protocolDefinition.getCurrentProtocolVersionSchema()); } public SystemSchemaInitializationRoutine( @@ -69,11 +74,9 @@ public SystemSchemaInitializationRoutine( Optional keySchema, Optional storeMetadataUpdate, boolean autoRegisterDerivedComputeSchema, - Class specificRecordClass) { + Schema compiledProtocolSchema) { this.protocolDefinition = protocolDefinition; - this.protocolSchema = specificRecordClass != null - ? SpecificData.get().getSchema(specificRecordClass) - : protocolDefinition.getCurrentProtocolVersionSchema(); + this.protocolSchema = compiledProtocolSchema; this.multiClusterConfigs = multiClusterConfigs; this.admin = admin; this.keySchema = keySchema; From 6efb6ddbc081a5de1a9a55338ab9f8a0ee82b985 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Sat, 8 Nov 2025 09:32:21 -0800 Subject: [PATCH 4/5] Refactor code to return correct err message --- .../serializer/AdminOperationSerializer.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java index 8164b2e8bcc..08819a8d5f3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java @@ -5,6 +5,7 @@ import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation; import com.linkedin.venice.exceptions.VeniceProtocolException; +import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; @@ -33,6 +34,9 @@ public class AdminOperationSerializer { private static final Map PROTOCOL_MAP = initProtocolMap(); + private static final String ADMIN_OPERATION_SYSTEM_STORE_NAME = + AvroProtocolDefinition.ADMIN_OPERATION.getSystemStoreName(); + /** * Cache for schemas downloaded from system store schema repository. * This map is separate from PROTOCOL_MAP to distinguish between built-in schemas and downloaded schemas. @@ -135,6 +139,9 @@ public static Map initProtocolMap() { } } + /** + * Get schema by schema id from either built-in protocol map or system store schema repository cache. + */ public Schema getSchema(int schemaId) { if (PROTOCOL_MAP.containsKey(schemaId)) { return PROTOCOL_MAP.get(schemaId); @@ -155,13 +162,22 @@ public void fetchAndStoreSchemaIfAbsent(VeniceHelixAdmin admin, int schemaId) { if (PROTOCOL_MAP.containsKey(schemaId) || cacheSchemaMapFromSystemStore.containsKey(schemaId)) { return; } - String adminOperationSchemaStoreName = AvroProtocolDefinition.ADMIN_OPERATION.getSystemStoreName(); - Schema schema = - admin.getReadOnlyZKSharedSchemaRepository().getValueSchema(adminOperationSchemaStoreName, schemaId).getSchema(); + HelixReadOnlyZKSharedSchemaRepository zkSharedSchemaRepository = admin.getReadOnlyZKSharedSchemaRepository(); + + boolean schemaExists = zkSharedSchemaRepository.hasValueSchema(ADMIN_OPERATION_SYSTEM_STORE_NAME, schemaId); + + Schema schema = null; + if (schemaExists) { + schema = zkSharedSchemaRepository.getValueSchema(ADMIN_OPERATION_SYSTEM_STORE_NAME, schemaId).getSchema(); + } + if (schema == null) { - throw new VeniceProtocolException( - "Could not find AdminOperation schema for schema id: " + schemaId + " in system store schema repository"); + String msg = "Failed to fetch schema id: " + schemaId + " from system store schema repository" + + (schemaExists ? " even though it exists." : " as it does not exist."); + throw new VeniceProtocolException(msg); } + + // Add the downloaded schema to the cache map cacheSchemaMapFromSystemStore.put(schemaId, schema); } From 81cf88d4006c324bd9f8064a7083ef881aa77166 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Sat, 8 Nov 2025 09:53:21 -0800 Subject: [PATCH 5/5] Update tests --- .../kafka/consumer/AdminConsumptionTask.java | 3 +- .../serializer/AdminOperationSerializer.java | 13 ++++++- .../admin/AdminOperationSerializerTest.java | 39 ++++++++++++++++--- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index 31b4156cd84..d82e8818d1e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -902,7 +902,8 @@ private Pair extractExecutionIdAndAndAdminOperation(Defaul if (MessageType.PUT == messageType) { Put put = (Put) kafkaValue.payloadUnion; try { - if (admin.isAdminOperationSystemStoreEnabled()) { + if (admin.isAdminOperationSystemStoreEnabled() + && put.schemaId > AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION) { deserializer.fetchAndStoreSchemaIfAbsent(admin, put.schemaId); } adminOperation = deserializer.deserialize(put.putValue, put.schemaId); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java index 08819a8d5f3..84204aed316 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/protocol/serializer/AdminOperationSerializer.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -113,10 +114,18 @@ public AdminOperation deserialize(ByteBuffer byteBuffer, int writerSchemaId) { } /** - * Validate the AdminOperation message against the target schema. + * Validate the AdminOperation message against the target schema for serialization compatibility. * @throws VeniceProtocolException if the message does not conform to the target schema. */ public void validate(AdminOperation message, int targetSchemaId) { + // We don't support serialization to future schema versions. + // Fail fast in this case. + if (targetSchemaId > LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION) { + throw new VeniceProtocolException( + "Target schema id: " + targetSchemaId + " is greater than the latest schema id: " + + LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + ". We don't support serialization to future schema versions."); + } + Schema targetSchema = getSchema(targetSchemaId); try { SemanticDetector.traverseAndValidate(message, LATEST_SCHEMA, targetSchema, "AdminOperation", null); @@ -129,7 +138,7 @@ public void validate(AdminOperation message, int targetSchemaId) { public static Map initProtocolMap() { try { - Map protocolSchemaMap = new VeniceConcurrentHashMap<>(); + Map protocolSchemaMap = new HashMap<>(); for (int i = 1; i <= LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION; i++) { protocolSchemaMap.put(i, Utils.getSchemaFromResource("avro/AdminOperation/v" + i + "/AdminOperation.avsc")); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java index 9f6d7d4c290..8c20ac378d3 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/protocol/admin/AdminOperationSerializerTest.java @@ -116,6 +116,14 @@ public void testValidateAdminOperation() { "Current schema version: 74. New semantic is being used. Field AdminOperation.payloadUnion.UpdateStore.separateRealTimeTopicEnabled: Boolean value true is not the default value false or false"; assertEquals(e.getMessage(), expectedMessage); } + + try { + // Validate should fail when target schema id is greater than the latest schema id + adminOperationSerializer + .validate(adminMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + 1); + } catch (VeniceProtocolException e) { + assertTrue(e.getMessage().contains("We don't support serialization to future schema versions.")); + } } @Test @@ -148,6 +156,12 @@ public void testSerializeDeserializeWithDocChange() { assertEquals(deserializedOperationPayloadUnion.numberOfPartitions, 20); } + /** + * Test downloading and storing schema if it's absent in the protocol map. + * 1st attempt: schema not found in system store schema repository, expect VeniceProtocolException. + * 2nd attempt: schema found in system store schema repository, but error downloading schema, expect VeniceProtocolException. + * 3rd attempt: schema found in system store schema repository, schema downloaded successfully and added to protocol map. + */ @Test public void testDownloadAndSchemaIfNecessary() { AdminOperationSerializer adminOperationSerializer = Mockito.spy(new AdminOperationSerializer()); @@ -155,8 +169,6 @@ public void testDownloadAndSchemaIfNecessary() { VeniceHelixAdmin mockAdmin = Mockito.mock(VeniceHelixAdmin.class); Schema mockSchema = Mockito.mock(Schema.class); SchemaEntry mockSchemaEntry = Mockito.mock(SchemaEntry.class); - // First call returns null, second call returns the mock schema - when(mockSchemaEntry.getSchema()).thenReturn(null).thenReturn(mockSchema); HelixReadOnlyZKSharedSchemaRepository mockSchemaRepo = Mockito.mock(HelixReadOnlyZKSharedSchemaRepository.class); when(mockAdmin.getReadOnlyZKSharedSchemaRepository()).thenReturn(mockSchemaRepo); when(mockSchemaRepo.getValueSchema(anyString(), anyInt())).thenReturn(mockSchemaEntry); @@ -164,16 +176,31 @@ public void testDownloadAndSchemaIfNecessary() { int nonExistSchemaId = AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION + 1; doCallRealMethod().when(adminOperationSerializer).fetchAndStoreSchemaIfAbsent(any(), anyInt()); + // hasValueSchema: 1st call: schema not exists; 2nd call & 3rd call: schema exists + when(mockSchemaRepo.hasValueSchema(anyString(), anyInt())).thenReturn(false).thenReturn(true).thenReturn(true); + + // getSchema First call returns null (for 2nd attempt), second call returns the mock schema (for 3rd attempt) + when(mockSchemaEntry.getSchema()).thenReturn(null).thenReturn(mockSchema); + + // 1st attempt: schema not exist in system store schema repository + try { + adminOperationSerializer.fetchAndStoreSchemaIfAbsent(mockAdmin, nonExistSchemaId); + } catch (VeniceProtocolException e) { + String expectedMessage = "Failed to fetch schema id: " + nonExistSchemaId + + " from system store schema repository as it does not exist."; + assertEquals(e.getMessage(), expectedMessage); + } + + // 2nd attempt: error downloading schema from system store schema repository try { - // First attempt: schema not found in system store schema repository adminOperationSerializer.fetchAndStoreSchemaIfAbsent(mockAdmin, nonExistSchemaId); } catch (VeniceProtocolException e) { - String expectedMessage = "Could not find AdminOperation schema for schema id: " + nonExistSchemaId - + " in system store schema repository"; + String expectedMessage = "Failed to fetch schema id: " + nonExistSchemaId + + " from system store schema repository even though it exists."; assertEquals(e.getMessage(), expectedMessage); } - // Second attempt: schema found in system store schema repository + // 3rd attempt: schema found in system store schema repository adminOperationSerializer.fetchAndStoreSchemaIfAbsent(mockAdmin, nonExistSchemaId); // Verify schema is downloaded and added to the protocol map assertEquals(adminOperationSerializer.getSchema(nonExistSchemaId), mockSchema);