diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/utils/IsolatedIngestionUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/utils/IsolatedIngestionUtils.java index e0581e1874b..75ced2e5f8f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/utils/IsolatedIngestionUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/utils/IsolatedIngestionUtils.java @@ -115,8 +115,6 @@ public class IsolatedIngestionUtils { AvroProtocolDefinition.INGESTION_STORAGE_METADATA.getSerializer(); private static final InternalAvroSpecificSerializer processShutdownCommandSerializer = AvroProtocolDefinition.PROCESS_SHUTDOWN_COMMAND.getSerializer(); - private static final InternalAvroSpecificSerializer ingestionDummyContentSerializer = - ingestionTaskCommandSerializer; private static final InternalAvroSpecificSerializer storeUserPartitionMappingSerializer = AvroProtocolDefinition.LOADED_STORE_USER_PARTITION_MAPPING.getSerializer(); @@ -126,17 +124,17 @@ public class IsolatedIngestionUtils { new AbstractMap.SimpleEntry<>(COMMAND, ingestionTaskCommandSerializer), new AbstractMap.SimpleEntry<>(REPORT, ingestionTaskReportSerializer), new AbstractMap.SimpleEntry<>(METRIC, ingestionMetricsReportSerializer), - new AbstractMap.SimpleEntry<>(HEARTBEAT, ingestionDummyContentSerializer), + new AbstractMap.SimpleEntry<>(HEARTBEAT, ingestionTaskCommandSerializer), new AbstractMap.SimpleEntry<>(UPDATE_METADATA, ingestionStorageMetadataSerializer), new AbstractMap.SimpleEntry<>(SHUTDOWN_COMPONENT, processShutdownCommandSerializer), - new AbstractMap.SimpleEntry<>(GET_LOADED_STORE_USER_PARTITION_MAPPING, ingestionDummyContentSerializer)) + new AbstractMap.SimpleEntry<>(GET_LOADED_STORE_USER_PARTITION_MAPPING, ingestionTaskCommandSerializer)) .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); private static final Map 
ingestionActionToResponseSerializerMap = Stream.of( new AbstractMap.SimpleEntry<>(COMMAND, ingestionTaskReportSerializer), - new AbstractMap.SimpleEntry<>(REPORT, ingestionDummyContentSerializer), - new AbstractMap.SimpleEntry<>(METRIC, ingestionDummyContentSerializer), + new AbstractMap.SimpleEntry<>(REPORT, ingestionTaskCommandSerializer), + new AbstractMap.SimpleEntry<>(METRIC, ingestionTaskCommandSerializer), new AbstractMap.SimpleEntry<>(HEARTBEAT, ingestionTaskCommandSerializer), new AbstractMap.SimpleEntry<>(UPDATE_METADATA, ingestionTaskReportSerializer), new AbstractMap.SimpleEntry<>(SHUTDOWN_COMPONENT, ingestionTaskReportSerializer), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 978412c877b..f3918a0018d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -421,7 +421,13 @@ public void handleStoreDeleted(Store store) { ? new OptimizedKafkaValueSerializer(newSchemaEncountered) : new OptimizedKafkaValueSerializer(); - kafkaMessageEnvelopeSchemaReader.ifPresent(kafkaValueSerializer::setSchemaReader); + kafkaMessageEnvelopeSchemaReader.ifPresent(reader -> { + LOGGER.info( + "Initialized KME schema reader. 
Type: {}, Latest value schema ID: {}", + reader.getClass().getSimpleName(), + reader.getLatestValueSchemaId()); + kafkaValueSerializer.setSchemaReader(reader); + }); PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer( kafkaValueSerializer, new LandFillObjectPool<>(KafkaMessageEnvelope::new), @@ -1518,4 +1524,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In } } + public boolean isKMESchemaReaderPresent() { + return kafkaMessageEnvelopeSchemaReader.isPresent(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 0504865dd6f..8e26aa6cf74 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -3943,7 +3943,7 @@ private void sendIngestionHeartbeatToVT( originTimeStampMs); } - private CompletableFuture sendIngestionHeartbeat( + CompletableFuture sendIngestionHeartbeat( PartitionConsumptionState partitionConsumptionState, PubSubTopicPartition topicPartition, PubSubProducerCallback callback, @@ -3953,6 +3953,8 @@ private CompletableFuture sendIngestionHeartbeat( LeaderCompleteState leaderCompleteState, long originTimeStampMs) { CompletableFuture heartBeatFuture; + boolean dependentFeatureEnabled = isSystemSchemaInitializationAtStartTimeEnabled() + && getHeartbeatMonitoringService().getKafkaStoreIngestionService().isKMESchemaReaderPresent(); try { heartBeatFuture = partitionConsumptionState.getVeniceWriterLazyRef() .get() @@ -3962,7 +3964,8 @@ private CompletableFuture sendIngestionHeartbeat( leaderMetadataWrapper, addLeaderCompleteState, leaderCompleteState, - originTimeStampMs); + originTimeStampMs, + dependentFeatureEnabled); 
if (shouldLog) { heartBeatFuture .whenComplete((ignore, throwable) -> logIngestionHeartbeat(topicPartition, (Exception) throwable)); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 627773f9d02..857da81210c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -416,6 +416,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private final Version version; private boolean skipValidationForSeekableClientEnabled = false; private long lastResubscriptionCheckTimestamp = System.currentTimeMillis(); + private final boolean isSystemSchemaInitializationAtStartTimeEnabled; public StoreIngestionTask( StorageService storageService, @@ -682,6 +683,7 @@ public StoreIngestionTask( this.parallelProcessingThreadPool = builder.getAAWCWorkLoadProcessingThreadPool(); this.hostName = Utils.getHostName() + "_" + storeVersionConfig.getListenerPort(); this.zkHelixAdmin = zkHelixAdmin; + this.isSystemSchemaInitializationAtStartTimeEnabled = serverConfig.isSystemSchemaInitializationAtStartTimeEnabled(); } /** Package-private on purpose, only intended for tests. Do not use for production use cases. */ @@ -5003,6 +5005,10 @@ boolean isGlobalRtDivEnabled() { return isGlobalRtDivEnabled; } + boolean isSystemSchemaInitializationAtStartTimeEnabled() { + return isSystemSchemaInitializationAtStartTimeEnabled; + } + /** When Global RT DIV is enabled the ConsumptionTask's DIV is exclusively used to validate data integrity. 
*/ @VisibleForTesting public DataIntegrityValidator getDataIntegrityValidator() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index 360c82613c9..2120c38a7f4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -889,4 +889,8 @@ String getLocalRegionName() { public void setKafkaStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) { this.kafkaStoreIngestionService = kafkaStoreIngestionService; } + + public KafkaStoreIngestionService getKafkaStoreIngestionService() { + return kafkaStoreIngestionService; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index 65054788267..742439b28f6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -70,6 +70,7 @@ import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -87,6 +88,8 @@ import com.linkedin.venice.utils.VeniceProperties; import 
com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.writer.LeaderCompleteState; +import com.linkedin.venice.writer.LeaderMetadataWrapper; import com.linkedin.venice.writer.VeniceWriter; import it.unimi.dsi.fastutil.objects.Object2IntMaps; import java.io.IOException; @@ -523,6 +526,111 @@ private PubSubMessageProcessedResultWrapper getMockMessage(int seqNumber) { return pubSubMessageProcessedResultWrapper; } + @Test + public void testSendIngestionHeartbeatWithDependentFeatures() throws InterruptedException { + setUp(); + + // Mock the HeartbeatMonitoringService and KafkaStoreIngestionService + HeartbeatMonitoringService mockHeartbeatMonitoringService = mock(HeartbeatMonitoringService.class); + KafkaStoreIngestionService mockKafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + + // Mock the getHeartbeatMonitoringService method to return our mock + doReturn(mockHeartbeatMonitoringService).when(leaderFollowerStoreIngestionTask).getHeartbeatMonitoringService(); + when(mockHeartbeatMonitoringService.getKafkaStoreIngestionService()).thenReturn(mockKafkaStoreIngestionService); + + // Mock VeniceWriter and its sendHeartbeat method + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); + Lazy> lazyMockWriter = Lazy.of(() -> mockVeniceWriter); + when(mockPartitionConsumptionState.getVeniceWriterLazyRef()).thenReturn(lazyMockWriter); + + CompletableFuture mockFuture = mock(CompletableFuture.class); + when(mockVeniceWriter.sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean())) + .thenReturn(mockFuture); + + // Mock other required objects + PubSubTopicPartition mockTopicPartition = mock(PubSubTopicPartition.class); + PubSubProducerCallback mockCallback = mock(PubSubProducerCallback.class); + LeaderMetadataWrapper mockLeaderMetadataWrapper = mock(LeaderMetadataWrapper.class); + + // Test case 1: Both dependent features enabled - should pass true to sendHeartbeat + 
doReturn(true).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled(); + when(mockKafkaStoreIngestionService.isKMESchemaReaderPresent()).thenReturn(true); + + leaderFollowerStoreIngestionTask.sendIngestionHeartbeat( + mockPartitionConsumptionState, + mockTopicPartition, + mockCallback, + mockLeaderMetadataWrapper, + false, // shouldLog + false, // addLeaderCompleteState + LeaderCompleteState.LEADER_NOT_COMPLETED, + System.currentTimeMillis()); + + // Verify that sendHeartbeat was called with dependentFeatureEnabled = true + verify(mockVeniceWriter).sendHeartbeat( + eq(mockTopicPartition), + eq(mockCallback), + eq(mockLeaderMetadataWrapper), + eq(false), // addLeaderCompleteState + eq(LeaderCompleteState.LEADER_NOT_COMPLETED), + anyLong(), // originTimeStampMs + eq(true)); // dependentFeatureEnabled should be true + + // Reset the mock for next test + clearInvocations(mockVeniceWriter); + + // Test case 2: System schema initialization disabled - should pass false to sendHeartbeat + doReturn(false).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled(); + when(mockKafkaStoreIngestionService.isKMESchemaReaderPresent()).thenReturn(true); + + leaderFollowerStoreIngestionTask.sendIngestionHeartbeat( + mockPartitionConsumptionState, + mockTopicPartition, + mockCallback, + mockLeaderMetadataWrapper, + false, // shouldLog + false, // addLeaderCompleteState + LeaderCompleteState.LEADER_NOT_COMPLETED, + System.currentTimeMillis()); + + // Verify that sendHeartbeat was called with dependentFeatureEnabled = false + verify(mockVeniceWriter).sendHeartbeat( + eq(mockTopicPartition), + eq(mockCallback), + eq(mockLeaderMetadataWrapper), + eq(false), // addLeaderCompleteState + eq(LeaderCompleteState.LEADER_NOT_COMPLETED), + anyLong(), // originTimeStampMs + eq(false)); // dependentFeatureEnabled should be false + + // Reset the mock for next test + clearInvocations(mockVeniceWriter); + + // Test case 3: KME reader 
disabled - should pass false to sendHeartbeat + doReturn(true).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled(); + when(mockKafkaStoreIngestionService.isKMESchemaReaderPresent()).thenReturn(false); + + leaderFollowerStoreIngestionTask.sendIngestionHeartbeat( + mockPartitionConsumptionState, + mockTopicPartition, + mockCallback, + mockLeaderMetadataWrapper, + false, // shouldLog + false, // addLeaderCompleteState + LeaderCompleteState.LEADER_NOT_COMPLETED, + System.currentTimeMillis()); + + // Verify that sendHeartbeat was called with dependentFeatureEnabled = false + verify(mockVeniceWriter).sendHeartbeat( + eq(mockTopicPartition), + eq(mockCallback), + eq(mockLeaderMetadataWrapper), + eq(false), // addLeaderCompleteState + eq(LeaderCompleteState.LEADER_NOT_COMPLETED), + anyLong(), // originTimeStampMs + eq(false)); // dependentFeatureEnabled should be false + } + @Test(timeOut = 60_000) public void testIsRecordSelfProduced() throws InterruptedException { setUp(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index adf2d0d38e9..679b068d131 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -2994,7 +2994,8 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws DEFAULT_LEADER_METADATA_WRAPPER, true, LeaderCompleteState.getLeaderCompleteState(true), - System.currentTimeMillis()); + System.currentTimeMillis(), + true); messageCountPerPartition[partition]++; } @@ -5134,7 +5135,8 @@ public void testMaybeSendIngestionHeartbeat( VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class); CompletableFuture heartBeatFuture = new CompletableFuture(); 
heartBeatFuture.complete(null); - doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong()); + doReturn(heartBeatFuture).when(veniceWriter) + .sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any()); doReturn(Lazy.of(() -> veniceWriter)).when(pcs).getVeniceWriterLazyRef(); @@ -5168,9 +5170,9 @@ public void testMaybeSendIngestionHeartbeat( // Second invocation should be skipped since it shouldn't be time for another heartbeat yet. ingestionTask.maybeSendIngestionHeartbeat(); if (hybridConfig == HYBRID && isRealTimeTopic && nodeType == NodeType.LEADER) { - verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong()); + verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); } else { - verify(veniceWriter, never()).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong()); + verify(veniceWriter, never()).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); } /** @@ -5272,17 +5274,19 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() { PubSubTopicPartition pubSubTopicPartition1sep = new PubSubTopicPartitionImpl(sepRTtopic, 1); // all succeeded - doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong()); + doReturn(heartBeatFuture).when(veniceWriter) + .sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); AtomicReference> failedPartitions = new AtomicReference<>(null); failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat()); assertEquals(failedPartitions.get().size(), 0); // 1 partition throws exception doReturn(heartBeatFuture).when(veniceWriter) - .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong()); + 
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); doAnswer(invocation -> { throw new Exception("mock exception"); - }).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1), any(), any(), anyBoolean(), any(), anyLong()); + }).when(veniceWriter) + .sendHeartbeat(eq(pubSubTopicPartition1), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); // wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat()); @@ -5297,10 +5301,11 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() { // 1 partition throws exception doReturn(heartBeatFuture).when(veniceWriter) - .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong()); + .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); doAnswer(invocation -> { throw new Exception("mock exception"); - }).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong()); + }).when(veniceWriter) + .sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); // wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat()); @@ -5316,7 +5321,8 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() { // both partition throws exception doAnswer(invocation -> { throw new Exception("mock exception"); - }).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong()); + }).when(veniceWriter) + .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); // wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS TestUtils.waitForNonDeterministicAssertion(5, 
TimeUnit.SECONDS, () -> { failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index b230f349832..2fb09149566 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -138,16 +138,17 @@ public void testProcessControlMessage() { KafkaKey kafkaKey = mock(KafkaKey.class); doReturn(KafkaKey.HEART_BEAT.getKey()).when(kafkaKey).getKey(); ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class); - when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong())) + when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter) - .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); materializedViewWriter.setVeniceWriter(veniceWriter); KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class); PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class); materializedViewWriter .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); - verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + verify(veniceWriter, never()) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()); } @Test diff --git 
a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 27633e669a9..f5b5e31f867 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -1632,11 +1632,10 @@ private CompletableFuture sendMessage( PubSubProducerCallback internalCallback = new SendMessageErrorLoggerCallback(kafkaValue, logger); PubSubProducerCallback outputCallback = setInternalCallback(callback, internalCallback); - PubSubMessageHeaders finalPubSubMessageHeaders = getHeaders( - kafkaValue.getProducerMetadata(), - false, - LeaderCompleteState.LEADER_NOT_COMPLETED, - pubSubMessageHeaders); + // For data messages, considerSkipVtpHeaderForHeartbeat is set to false because it's not a heartbeat message. + boolean addVtpHeader = shouldAddVtpHeader(kafkaValue.getProducerMetadata(), false); + PubSubMessageHeaders finalPubSubMessageHeaders = + getHeaders(false, LeaderCompleteState.LEADER_NOT_COMPLETED, pubSubMessageHeaders, addVtpHeader); try { return producerAdapter .sendMessage(topicName, partition, key, kafkaValue, finalPubSubMessageHeaders, outputCallback); @@ -1682,22 +1681,19 @@ PubSubProducerCallback setInternalCallback( */ private PubSubMessageHeaders getHeaders( - ProducerMetadata producerMetadata, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, - PubSubMessageHeaders headers) { + PubSubMessageHeaders headers, + boolean addVtpHeader) { PubSubMessageHeader viewPartitionHeader = headers.get(VENICE_VIEW_PARTITIONS_MAP_HEADER); - // If the message is the first message in a segment, we need to add the protocol schema headers. 
- boolean needVtpHeader = - producerMetadata.getSegmentNumber() == 0 && producerMetadata.getMessageSequenceNumber() == 0; // construct PubSubMessageHeaders only if it is needed PubSubMessageHeaders returnPubSubMessageHeaders = (headers instanceof EmptyPubSubMessageHeaders) - && (needVtpHeader || addLeaderCompleteState || viewPartitionHeader != null) + && (addVtpHeader || addLeaderCompleteState || viewPartitionHeader != null) ? new PubSubMessageHeaders() : headers; - if (needVtpHeader && protocolSchemaHeader != null) { + if (addVtpHeader && protocolSchemaHeader != null) { returnPubSubMessageHeaders.add(protocolSchemaHeader); } if (addLeaderCompleteState) { @@ -1710,6 +1706,29 @@ private PubSubMessageHeaders getHeaders( return returnPubSubMessageHeaders; } + private boolean shouldAddVtpHeader(ProducerMetadata producerMetadata, boolean considerSkipVtpHeaderForHeartbeat) { + /** + * If the message is the first message in a segment for data messages, we need to add the protocol schema headers. + * + * For heartbeat messages, we do not add protocol schema headers for the following reasons: + * 1. Heartbeat messages are sent frequently, adding protocol schema headers will increase the + * message size and bandwidth consumption unnecessarily. + * 2. Consumers don't need new protocol schema headers to process heartbeat messages. + * + * When KME schema evolution happens, what if a server with new KME schema deploys first and sends a heartbeat + * before sending any data message? + * + * Server 1 deploys -> Registers KME v2 in system store during startup. (requires isSystemSchemaInitializationAtStartTimeEnabled = true) + * Server 1 sends heartbeat -> Serialized with KME v2 (no VTP header) + * Server 2 receives heartbeat -> Detects unknown protocol version, schema reader fetches KME v2. (requires KME schema reader to be present) + * Schema cached locally -> Current and future heartbeats deserialize successfully. 
+ * No VTP headers needed -> System-level schema evolution handles compatibility. + */ + + return !considerSkipVtpHeaderForHeartbeat && producerMetadata.getSegmentNumber() == 0 + && producerMetadata.getMessageSequenceNumber() == 0; + } + /** * An interface which enables the key to contain parts of the {@param producerMetadata} within it, which is * useful for control messages and chunked values. @@ -2224,7 +2243,8 @@ public CompletableFuture sendHeartbeat( LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, - long originTimeStampMs) { + long originTimeStampMs, + boolean considerSkipVtpHeaderForHeartbeat) { if (isClosed) { CompletableFuture future = new CompletableFuture<>(); future.completedFuture(null); @@ -2233,16 +2253,15 @@ public CompletableFuture sendHeartbeat( } KafkaMessageEnvelope kafkaMessageEnvelope = getHeartbeatKME(originTimeStampMs, leaderMetadataWrapper, heartBeatMessage, writerId); + + boolean addVtpHeader = + shouldAddVtpHeader(kafkaMessageEnvelope.getProducerMetadata(), considerSkipVtpHeaderForHeartbeat); return producerAdapter.sendMessage( topicPartition.getPubSubTopic().getName(), topicPartition.getPartitionNumber(), KafkaKey.HEART_BEAT, kafkaMessageEnvelope, - getHeaders( - kafkaMessageEnvelope.getProducerMetadata(), - addLeaderCompleteState, - leaderCompleteState, - EmptyPubSubMessageHeaders.SINGLETON), + getHeaders(addLeaderCompleteState, leaderCompleteState, EmptyPubSubMessageHeaders.SINGLETON, addVtpHeader), callback); } @@ -2253,19 +2272,18 @@ public CompletableFuture sendHeartbeat( LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, - long originTimeStampMs) { + long originTimeStampMs, + boolean considerSkipVtpHeaderForHeartbeat) { KafkaMessageEnvelope kafkaMessageEnvelope = getHeartbeatKME(originTimeStampMs, leaderMetadataWrapper, heartBeatMessage, writerId); + boolean addVtpHeader = + 
shouldAddVtpHeader(kafkaMessageEnvelope.getProducerMetadata(), considerSkipVtpHeaderForHeartbeat); return producerAdapter.sendMessage( topicName, partitionNumber, KafkaKey.HEART_BEAT, kafkaMessageEnvelope, - getHeaders( - kafkaMessageEnvelope.getProducerMetadata(), - addLeaderCompleteState, - leaderCompleteState, - EmptyPubSubMessageHeaders.SINGLETON), + getHeaders(addLeaderCompleteState, leaderCompleteState, EmptyPubSubMessageHeaders.SINGLETON, addVtpHeader), callback); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterHeartbeatHeaderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterHeartbeatHeaderTest.java new file mode 100644 index 00000000000..19828b76a4f --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterHeartbeatHeaderTest.java @@ -0,0 +1,142 @@ +package com.linkedin.venice.writer; + +import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; +import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubMessageHeader; +import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; +import 
com.linkedin.venice.pubsub.api.PubSubProducerCallback; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.serialization.DefaultSerializer; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.utils.SystemTime; +import com.linkedin.venice.utils.VeniceProperties; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import org.apache.avro.Schema; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class VeniceWriterHeartbeatHeaderTest { + private VeniceWriter veniceWriter; + private PubSubProducerAdapter mockProducerAdapter; + private Schema testSchema; + private PubSubTopicPartition topicPartition; + private PubSubTopicRepository pubSubTopicRepository; + + @BeforeMethod + public void setUp() { + mockProducerAdapter = mock(PubSubProducerAdapter.class); + testSchema = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersionSchema(); + pubSubTopicRepository = new PubSubTopicRepository(); + topicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test_topic_v1"), 0); + + // Mock producer adapter to return completed future + when(mockProducerAdapter.sendMessage(anyString(), anyInt(), any(), any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + // Create VeniceWriter with protocol schema override to enable VTP headers + VeniceWriterOptions options = + new VeniceWriterOptions.Builder("test_topic").setKeyPayloadSerializer(new DefaultSerializer()) + .setValuePayloadSerializer(new DefaultSerializer()) + .setWriteComputePayloadSerializer(new DefaultSerializer()) + .setTime(SystemTime.INSTANCE) + .setPartitioner(null) + .setPartitionCount(1) // Set partition count to avoid "Invalid number of partitions: 0" error + .build(); + + // Create test properties for VeniceWriter + Properties 
testProperties = new Properties(); + testProperties.put(KAFKA_BOOTSTRAP_SERVERS, "localhost:9092"); + + // Override protocol schema to enable VTP headers. + veniceWriter = new VeniceWriter<>(options, new VeniceProperties(testProperties), mockProducerAdapter, testSchema); + } + + @Test + public void testHeartbeatMessageDoesNotIncludeVtpHeader() { + // Arrange + ArgumentCaptor headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class); + + // Act - Send heartbeat message + veniceWriter.sendHeartbeat( + topicPartition, + mock(PubSubProducerCallback.class), + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + false, + LeaderCompleteState.LEADER_NOT_COMPLETED, + System.currentTimeMillis(), + true); + + // Assert + verify(mockProducerAdapter).sendMessage( + eq("test_topic_v1"), + eq(0), + eq(KafkaKey.HEART_BEAT), + any(KafkaMessageEnvelope.class), + headersCaptor.capture(), + any(PubSubProducerCallback.class)); + + PubSubMessageHeaders capturedHeaders = headersCaptor.getValue(); + + // Verify VTP header is NOT present in heartbeat message + PubSubMessageHeader vtpHeader = capturedHeaders.get(VENICE_TRANSPORT_PROTOCOL_HEADER); + assertNull(vtpHeader, "Heartbeat messages should not include VTP headers"); + } + + @Test + public void testDataMessageIncludesVtpHeaderWhenFirstMessage() { + // Arrange + ArgumentCaptor headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class); + + // Act - Send first data message (segment=0, sequence=0) + veniceWriter.put( + "test_key".getBytes(StandardCharsets.UTF_8), + "test_value".getBytes(StandardCharsets.UTF_8), + 1, + (PubSubProducerCallback) null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER); + + // Assert - sendMessage may be called multiple times (for views, etc.) 
+ verify(mockProducerAdapter, atLeastOnce()).sendMessage( + anyString(), + anyInt(), + any(KafkaKey.class), + any(KafkaMessageEnvelope.class), + headersCaptor.capture(), + any(PubSubProducerCallback.class)); + + // Check if any of the captured headers contains VTP header + boolean foundVtpHeader = false; + for (PubSubMessageHeaders capturedHeaders: headersCaptor.getAllValues()) { + PubSubMessageHeader vtpHeader = capturedHeaders.get(VENICE_TRANSPORT_PROTOCOL_HEADER); + if (vtpHeader != null) { + foundVtpHeader = true; + // Verify header contains the protocol schema + String schemaString = new String(vtpHeader.value(), StandardCharsets.UTF_8); + assertTrue(schemaString.contains("KafkaMessageEnvelope"), "VTP header should contain KME schema"); + break; + } + } + + assertTrue(foundVtpHeader, "First data message should include VTP header"); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index ca09aa26830..82406872d77 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -492,14 +492,19 @@ public void testCloseSegmentBasedOnElapsedTime() { } } - @DataProvider(name = "Boolean-LeaderCompleteState") - public static Object[][] booleanBooleanCompression() { - return DataProviderUtils - .allPermutationGenerator(DataProviderUtils.BOOLEAN, new Object[] { LEADER_NOT_COMPLETED, LEADER_COMPLETED }); + @DataProvider(name = "Boolean-Boolean-LeaderCompleteState") + public static Object[][] booleanBooleanLeaderCompleteState() { + return DataProviderUtils.allPermutationGenerator( + DataProviderUtils.BOOLEAN, + DataProviderUtils.BOOLEAN, + new Object[] { LEADER_NOT_COMPLETED, LEADER_COMPLETED }); } - @Test(dataProvider = "Boolean-LeaderCompleteState") - public void 
testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteState leaderCompleteState) { + @Test(dataProvider = "Boolean-Boolean-LeaderCompleteState") + public void testSendHeartbeat( + boolean considerSkipVtpHeaderForHeartbeat, + boolean addLeaderCompleteHeader, + LeaderCompleteState leaderCompleteState) { PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); CompletableFuture mockedFuture = mock(CompletableFuture.class); when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); @@ -529,7 +534,8 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta DEFAULT_LEADER_METADATA_WRAPPER, addLeaderCompleteHeader, leaderCompleteState, - System.currentTimeMillis()); + System.currentTimeMillis(), + considerSkipVtpHeaderForHeartbeat); } ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); ArgumentCaptor kafkaKeyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); @@ -556,10 +562,22 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta } for (PubSubMessageHeaders pubSubMessageHeaders: pubSubMessageHeadersArgumentCaptor.getAllValues()) { - assertEquals(pubSubMessageHeaders.toList().size(), addLeaderCompleteHeader ? 
2 : 1); + // Calculate expected header count: + // - VTP header: included when considerSkipVtpHeaderForHeartbeat = false + // - Leader complete header: included when addLeaderCompleteHeader = true + int expectedHeaderCount = 0; + if (!considerSkipVtpHeaderForHeartbeat) { + expectedHeaderCount++; // VTP header + } + if (addLeaderCompleteHeader) { + expectedHeaderCount++; // Leader complete header + } + + assertEquals(pubSubMessageHeaders.toList().size(), expectedHeaderCount); + if (addLeaderCompleteHeader) { - // 0: VENICE_TRANSPORT_PROTOCOL_HEADER, 1: VENICE_LEADER_COMPLETION_STATE_HEADER - PubSubMessageHeader leaderCompleteHeader = pubSubMessageHeaders.toList().get(1); + // Find the leader complete header + PubSubMessageHeader leaderCompleteHeader = pubSubMessageHeaders.get(VENICE_LEADER_COMPLETION_STATE_HEADER); assertEquals(leaderCompleteHeader.key(), VENICE_LEADER_COMPLETION_STATE_HEADER); assertEquals(leaderCompleteHeader.value()[0], leaderCompleteState.getValue()); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java index e983ec79d2b..4b6389401d5 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java @@ -1,7 +1,9 @@ package com.linkedin.venice.helix; import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED; +import static com.linkedin.venice.integration.utils.VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER; +import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controllerapi.ControllerClient; import 
com.linkedin.venice.controllerapi.VersionCreationResponse; @@ -9,16 +11,22 @@ import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.integration.utils.VeniceServerWrapper; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.KafkaValueSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.HashMap; @@ -27,8 +35,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -50,8 +58,11 @@ public class TestServerKMERegistrationFromMessageHeader { private VeniceKafkaSerializer keySerializer; private VeniceServerWrapper server; private String clusterName; + private KafkaValueSerializer valueSerializer; + private int pushVersion; + private HelixReadWriteSchemaRepository 
repo; - @BeforeClass + @BeforeMethod(alwaysRun = true) public void setUp() { VeniceClusterCreateOptions options = new VeniceClusterCreateOptions.Builder().numberOfControllers(numOfController) .numberOfServers(0) @@ -66,51 +77,31 @@ public void setUp() { Properties serverProperties = new Properties(); Properties serverFeatureProperties = new Properties(); + + // Enable KME registration from message header. serverProperties.put(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, true); + + // Create client config for consumer to enable schema readers. + ClientConfig clientConfig = new ClientConfig().setVeniceURL(cluster.getZk().getAddress()) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setSslFactory(SslUtils.getVeniceLocalSslFactory()); + serverFeatureProperties.put(CLIENT_CONFIG_FOR_CONSUMER, clientConfig); + server = cluster.addVeniceServer(serverFeatureProperties, serverProperties); } - @AfterClass(alwaysRun = true) + @AfterMethod(alwaysRun = true) public void cleanUp() { + repo.clear(); + valueSerializer.close(); + veniceWriter.close(); Utils.closeQuietlyWithErrorLogged(cluster); } @Test(timeOut = TEST_TIMEOUT) public void testServerKMERegistrationFromMessageHeader() { storeName = Utils.getUniqueString("venice-store"); - cluster.getNewStore(storeName, KEY_SCHEMA, VALUE_SCHEMA); - VersionCreationResponse creationResponse = cluster.getNewVersion(storeName, false); - - storeVersion = creationResponse.getKafkaTopic(); - int pushVersion = Version.parseVersionFromKafkaTopicName(storeVersion); - PubSubProducerAdapterFactory pubSubProducerAdapterFactory = - cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA); - - veniceWriter = - IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) - .createVeniceWriter( - new 
VeniceWriterOptions.Builder(storeVersion).setKeyPayloadSerializer(keySerializer).build()); - - VeniceControllerWrapper leaderController = cluster.getLeaderVeniceController(); - KafkaValueSerializer valueSerializer = - server.getVeniceServer().getKafkaStoreIngestionService().getKafkaValueSerializer(); - - HelixReadWriteSchemaRepositoryAdapter adapter = - (HelixReadWriteSchemaRepositoryAdapter) (leaderController.getVeniceHelixAdmin() - .getHelixVeniceClusterResources(clusterName) - .getSchemaRepository()); - HelixReadWriteSchemaRepository repo = - (HelixReadWriteSchemaRepository) adapter.getReadWriteRegularStoreSchemaRepository(); - - // Wait until the latest schema appears in child colo's schema repository (ZK). - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> { - Assert.assertEquals( - repo.getValueSchema( - AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), - AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()).getId(), - AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()); - }); + commonSetup(); // Remove the latest schema from child controller's local value serializer and remove it from child colo's schema // repository (ZK). @@ -145,4 +136,106 @@ public void testServerKMERegistrationFromMessageHeader() { return currentVersion == pushVersion; }); } + + /** + * Integration test that validates the VTP header optimization for heartbeat messages. + * + * This test demonstrates that Venice can safely exclude VTP headers from heartbeat messages + * while maintaining full schema evolution capabilities through its system-level infrastructure. + * + * Test Scenario: + * 1. Remove all local KME schemas to simulate a server without cached schema knowledge + * 2. Send multiple heartbeats with considerSkipVtpHeaderForHeartbeat=true (excludes VTP headers) + * 3. 
Complete a full push cycle to verify schema evolution works without heartbeat VTP headers + * + * Key Validation Points: + * - Heartbeat messages exclude VTP headers when optimization is enabled + * - Schema evolution infrastructure (KME registration + schema reader) handles compatibility + * - Push completion succeeds despite heartbeats lacking schema information + * - System-level schema management is sufficient for operational needs + * + * This proves that frequent heartbeat messages can be optimized for performance without + * compromising Venice's robust schema evolution capabilities. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testServerDoNotIncludeKMESchemaInHeartbeatMessage() { + storeName = Utils.getUniqueString("venice-store-do-not-include-kme-schema"); + commonSetup(); + + // Step 1: Remove all local KME schemas to simulate a server without cached schema knowledge + // This creates the scenario where schema evolution infrastructure must handle compatibility + valueSerializer.removeAllSchemas(); + LOGGER.info("Server local schemas are removed - simulating schema evolution scenario"); + + // Step 2: Send multiple heartbeat messages with VTP header optimization enabled + // The last parameter (considerSkipVtpHeaderForHeartbeat=true) tells VeniceWriter to exclude VTP headers + // because we can rely on system-level schema evolution infrastructure + PubSubTopicRepository topicRepository = new PubSubTopicRepository(); + PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topicRepository.getTopic(storeVersion), 0); + + for (int i = 0; i < 10; i++) { + veniceWriter.sendHeartbeat( + topicPartition, + null, // No callback needed for this test + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + false, // Don't add leader complete state + LeaderCompleteState.LEADER_NOT_COMPLETED, + System.currentTimeMillis(), + true); // considerSkipVtpHeaderForHeartbeat=true -> excludes VTP headers + + LOGGER.info( + "Sent optimized heartbeat message #{} (no VTP headers) to topic
partition: {}", + i, + topicPartition.getPubSubTopic().getName() + "-" + topicPartition.getPartitionNumber()); + } + + // Step 3: Execute a complete push cycle to validate that schema evolution works + // Even though heartbeats excluded VTP headers, the system should handle schema compatibility + // through KME schema registration and RouterBackedSchemaReader + veniceWriter.broadcastStartOfPush(false, false, CompressionStrategy.NO_OP, new HashMap<>()); + veniceWriter.broadcastEndOfPush(new HashMap<>()); + + // Step 4: Verify successful push completion despite optimized heartbeats + // This proves that excluding VTP headers from heartbeats doesn't break schema evolution + String controllerUrl = cluster.getAllControllersURLs(); + TestUtils.waitForNonDeterministicCompletion(30, TimeUnit.SECONDS, () -> { + int currentVersion = + ControllerClient.getStore(controllerUrl, cluster.getClusterName(), storeName).getStore().getCurrentVersion(); + return currentVersion == pushVersion; + }); + } + + private void commonSetup() { + cluster.getNewStore(storeName, KEY_SCHEMA, VALUE_SCHEMA); + VersionCreationResponse creationResponse = cluster.getNewVersion(storeName, false); + + storeVersion = creationResponse.getKafkaTopic(); + pushVersion = Version.parseVersionFromKafkaTopicName(storeVersion); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); + keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA); + + veniceWriter = + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) + .createVeniceWriter( + new VeniceWriterOptions.Builder(storeVersion).setKeyPayloadSerializer(keySerializer).build()); + + VeniceControllerWrapper leaderController = cluster.getLeaderVeniceController(); + valueSerializer = server.getVeniceServer().getKafkaStoreIngestionService().getKafkaValueSerializer(); + + HelixReadWriteSchemaRepositoryAdapter 
adapter = + (HelixReadWriteSchemaRepositoryAdapter) (leaderController.getVeniceHelixAdmin() + .getHelixVeniceClusterResources(clusterName) + .getSchemaRepository()); + repo = (HelixReadWriteSchemaRepository) adapter.getReadWriteRegularStoreSchemaRepository(); + + // Wait until the latest schema appears in child colo's schema repository (ZK). + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> { + Assert.assertEquals( + repo.getValueSchema( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()).getId(), + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()); + }); + } }