Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,13 @@ public void handleStoreDeleted(Store store) {
? new OptimizedKafkaValueSerializer(newSchemaEncountered)
: new OptimizedKafkaValueSerializer();

kafkaMessageEnvelopeSchemaReader.ifPresent(kafkaValueSerializer::setSchemaReader);
kafkaMessageEnvelopeSchemaReader.ifPresent(reader -> {
LOGGER.info(
"Initialized KME schema reader. Type: {}, Latest value schema ID: {}",
reader.getClass().getSimpleName(),
reader.getLatestValueSchemaId());
kafkaValueSerializer.setSchemaReader(reader);
Comment on lines +424 to +429
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The log message at INFO level is logged every time the service initializes. Consider whether this should be rate-limited or moved to DEBUG level to avoid log pollution in environments with frequent restarts, or ensure it's only logged once during service lifetime.

Copilot uses AI. Check for mistakes.
});
PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer(
kafkaValueSerializer,
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
Expand Down Expand Up @@ -1518,4 +1524,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
}
}

public boolean isKMESchemeReaderPresent() {
return kafkaMessageEnvelopeSchemaReader.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3943,7 +3943,7 @@ private void sendIngestionHeartbeatToVT(
originTimeStampMs);
}

Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Changing the visibility of 'sendIngestionHeartbeat' from private to package-private for testing purposes is acceptable, but consider adding a comment explaining that this is intentionally package-private for test access to make the design decision explicit.

Suggested change
/**
* Package-private for test access. This method's visibility is intentionally not private
* to allow unit tests in the same package to invoke it.
*/

Copilot uses AI. Check for mistakes.
private CompletableFuture<PubSubProduceResult> sendIngestionHeartbeat(
CompletableFuture<PubSubProduceResult> sendIngestionHeartbeat(
PartitionConsumptionState partitionConsumptionState,
PubSubTopicPartition topicPartition,
PubSubProducerCallback callback,
Expand All @@ -3953,6 +3953,8 @@ private CompletableFuture<PubSubProduceResult> sendIngestionHeartbeat(
LeaderCompleteState leaderCompleteState,
long originTimeStampMs) {
CompletableFuture<PubSubProduceResult> heartBeatFuture;
boolean dependentFeatureEnabled = isSystemSchemaInitializationAtStartTimeEnabled()
&& getHeartbeatMonitoringService().getKafkaStoreIngestionService().isKMESchemeReaderPresent();
try {
heartBeatFuture = partitionConsumptionState.getVeniceWriterLazyRef()
.get()
Expand All @@ -3962,7 +3964,8 @@ private CompletableFuture<PubSubProduceResult> sendIngestionHeartbeat(
leaderMetadataWrapper,
addLeaderCompleteState,
leaderCompleteState,
originTimeStampMs);
originTimeStampMs,
dependentFeatureEnabled);
if (shouldLog) {
heartBeatFuture
.whenComplete((ignore, throwable) -> logIngestionHeartbeat(topicPartition, (Exception) throwable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
private final Version version;
private boolean skipValidationForSeekableClientEnabled = false;
private long lastResubscriptionCheckTimestamp = System.currentTimeMillis();
private final boolean isSystemSchemaInitializationAtStartTimeEnabled;

public StoreIngestionTask(
StorageService storageService,
Expand Down Expand Up @@ -682,6 +683,7 @@ public StoreIngestionTask(
this.parallelProcessingThreadPool = builder.getAAWCWorkLoadProcessingThreadPool();
this.hostName = Utils.getHostName() + "_" + storeVersionConfig.getListenerPort();
this.zkHelixAdmin = zkHelixAdmin;
this.isSystemSchemaInitializationAtStartTimeEnabled = serverConfig.isSystemSchemaInitializationAtStartTimeEnabled();
}

/** Package-private on purpose, only intended for tests. Do not use for production use cases. */
Expand Down Expand Up @@ -5003,6 +5005,10 @@ boolean isGlobalRtDivEnabled() {
return isGlobalRtDivEnabled;
}

boolean isSystemSchemaInitializationAtStartTimeEnabled() {
return isSystemSchemaInitializationAtStartTimeEnabled;
}

/** When Global RT DIV is enabled the ConsumptionTask's DIV is exclusively used to validate data integrity. */
@VisibleForTesting
public DataIntegrityValidator getDataIntegrityValidator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,4 +889,8 @@ String getLocalRegionName() {
public void setKafkaStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) {
this.kafkaStoreIngestionService = kafkaStoreIngestionService;
}

public KafkaStoreIngestionService getKafkaStoreIngestionService() {
return kafkaStoreIngestionService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
Expand All @@ -87,6 +88,8 @@
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.writer.LeaderCompleteState;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.VeniceWriter;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.io.IOException;
Expand Down Expand Up @@ -523,6 +526,111 @@ private PubSubMessageProcessedResultWrapper getMockMessage(int seqNumber) {
return pubSubMessageProcessedResultWrapper;
}

@Test
public void testSendIngestionHeartbeatWithDependentFeatures() throws InterruptedException {
setUp();

// Mock the HeartbeatMonitoringService and KafkaStoreIngestionService
HeartbeatMonitoringService mockHeartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
KafkaStoreIngestionService mockKafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);

// Mock the getHeartbeatMonitoringService method to return our mock
doReturn(mockHeartbeatMonitoringService).when(leaderFollowerStoreIngestionTask).getHeartbeatMonitoringService();
when(mockHeartbeatMonitoringService.getKafkaStoreIngestionService()).thenReturn(mockKafkaStoreIngestionService);

// Mock VeniceWriter and its sendHeartbeat method
VeniceWriter mockVeniceWriter = mock(VeniceWriter.class);
Lazy<VeniceWriter<byte[], byte[], byte[]>> lazyMockWriter = Lazy.of(() -> mockVeniceWriter);
when(mockPartitionConsumptionState.getVeniceWriterLazyRef()).thenReturn(lazyMockWriter);

CompletableFuture<PubSubProduceResult> mockFuture = mock(CompletableFuture.class);
when(mockVeniceWriter.sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()))
.thenReturn(mockFuture);

// Mock other required objects
PubSubTopicPartition mockTopicPartition = mock(PubSubTopicPartition.class);
PubSubProducerCallback mockCallback = mock(PubSubProducerCallback.class);
LeaderMetadataWrapper mockLeaderMetadataWrapper = mock(LeaderMetadataWrapper.class);

// Test case 1: Both dependent features enabled - should pass true to sendHeartbeat
doReturn(true).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled();
when(mockKafkaStoreIngestionService.isKMESchemeReaderPresent()).thenReturn(true);

leaderFollowerStoreIngestionTask.sendIngestionHeartbeat(
mockPartitionConsumptionState,
mockTopicPartition,
mockCallback,
mockLeaderMetadataWrapper,
false, // shouldLog
false, // addLeaderCompleteState
LeaderCompleteState.LEADER_NOT_COMPLETED,
System.currentTimeMillis());

// Verify that sendHeartbeat was called with dependentFeatureEnabled = true
verify(mockVeniceWriter).sendHeartbeat(
eq(mockTopicPartition),
eq(mockCallback),
eq(mockLeaderMetadataWrapper),
eq(false), // addLeaderCompleteState
eq(LeaderCompleteState.LEADER_NOT_COMPLETED),
anyLong(), // originTimeStampMs
eq(true)); // dependentFeatureEnabled should be true

// Reset the mock for next test
clearInvocations(mockVeniceWriter);

// Test case 2: System schema initialization disabled - should pass false to sendHeartbeat
doReturn(false).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled();
when(mockKafkaStoreIngestionService.isKMESchemeReaderPresent()).thenReturn(true);

leaderFollowerStoreIngestionTask.sendIngestionHeartbeat(
mockPartitionConsumptionState,
mockTopicPartition,
mockCallback,
mockLeaderMetadataWrapper,
false, // shouldLog
false, // addLeaderCompleteState
LeaderCompleteState.LEADER_NOT_COMPLETED,
System.currentTimeMillis());

// Verify that sendHeartbeat was called with dependentFeatureEnabled = false
verify(mockVeniceWriter).sendHeartbeat(
eq(mockTopicPartition),
eq(mockCallback),
eq(mockLeaderMetadataWrapper),
eq(false), // addLeaderCompleteState
eq(LeaderCompleteState.LEADER_NOT_COMPLETED),
anyLong(), // originTimeStampMs
eq(false)); // dependentFeatureEnabled should be false

// Reset the mock for next test
clearInvocations(mockVeniceWriter);

// Test case 3: KME reader disabled - should pass false to sendHeartbeat
doReturn(true).when(leaderFollowerStoreIngestionTask).isSystemSchemaInitializationAtStartTimeEnabled();
when(mockKafkaStoreIngestionService.isKMESchemeReaderPresent()).thenReturn(false);

leaderFollowerStoreIngestionTask.sendIngestionHeartbeat(
mockPartitionConsumptionState,
mockTopicPartition,
mockCallback,
mockLeaderMetadataWrapper,
false, // shouldLog
false, // addLeaderCompleteState
LeaderCompleteState.LEADER_NOT_COMPLETED,
System.currentTimeMillis());

// Verify that sendHeartbeat was called with dependentFeatureEnabled = false
verify(mockVeniceWriter).sendHeartbeat(
eq(mockTopicPartition),
eq(mockCallback),
eq(mockLeaderMetadataWrapper),
eq(false), // addLeaderCompleteState
eq(LeaderCompleteState.LEADER_NOT_COMPLETED),
anyLong(), // originTimeStampMs
eq(false)); // dependentFeatureEnabled should be false
}

@Test(timeOut = 60_000)
public void testIsRecordSelfProduced() throws InterruptedException {
setUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2994,7 +2994,8 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws
DEFAULT_LEADER_METADATA_WRAPPER,
true,
LeaderCompleteState.getLeaderCompleteState(true),
System.currentTimeMillis());
System.currentTimeMillis(),
true);
messageCountPerPartition[partition]++;
}

Expand Down Expand Up @@ -5134,7 +5135,8 @@ public void testMaybeSendIngestionHeartbeat(
VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class);
CompletableFuture heartBeatFuture = new CompletableFuture();
heartBeatFuture.complete(null);
doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
doReturn(heartBeatFuture).when(veniceWriter)
.sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());

doReturn(Lazy.of(() -> veniceWriter)).when(pcs).getVeniceWriterLazyRef();
Expand Down Expand Up @@ -5168,9 +5170,9 @@ public void testMaybeSendIngestionHeartbeat(
// Second invocation should be skipped since it shouldn't be time for another heartbeat yet.
ingestionTask.maybeSendIngestionHeartbeat();
if (hybridConfig == HYBRID && isRealTimeTopic && nodeType == NodeType.LEADER) {
verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
} else {
verify(veniceWriter, never()).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
verify(veniceWriter, never()).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
}

/**
Expand Down Expand Up @@ -5272,17 +5274,19 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() {
PubSubTopicPartition pubSubTopicPartition1sep = new PubSubTopicPartitionImpl(sepRTtopic, 1);

// all succeeded
doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
doReturn(heartBeatFuture).when(veniceWriter)
.sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
AtomicReference<Set<String>> failedPartitions = new AtomicReference<>(null);
failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
assertEquals(failedPartitions.get().size(), 0);

// 1 partition throws exception
doReturn(heartBeatFuture).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong());
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
doAnswer(invocation -> {
throw new Exception("mock exception");
}).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1), any(), any(), anyBoolean(), any(), anyLong());
}).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition1), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
// wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
Expand All @@ -5297,10 +5301,11 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() {

// 1 partition throws exception
doReturn(heartBeatFuture).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong());
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
doAnswer(invocation -> {
throw new Exception("mock exception");
}).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong());
}).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
// wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
Expand All @@ -5316,7 +5321,8 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() {
// both partition throws exception
doAnswer(invocation -> {
throw new Exception("mock exception");
}).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong());
}).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
// wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,17 @@ public void testProcessControlMessage() {
KafkaKey kafkaKey = mock(KafkaKey.class);
doReturn(KafkaKey.HEART_BEAT.getKey()).when(kafkaKey).getKey();
ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class);
when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()))
when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter)
.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
materializedViewWriter.setVeniceWriter(veniceWriter);
KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class);
PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class);
materializedViewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
verify(veniceWriter, never())
.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong(), anyBoolean());
}

@Test
Expand Down
Loading
Loading