From 7bc8284e54300ea8dc19c73dc6ea60d4790d4987 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Thu, 6 Nov 2025 00:34:46 -0800 Subject: [PATCH 1/8] [server] Add a heartbeat lag based replica auto resubscribe feature --- .../davinci/config/VeniceServerConfig.java | 27 +++++++++++ .../consumer/KafkaStoreIngestionService.java | 11 +++++ .../kafka/consumer/StoreIngestionTask.java | 48 +++++++++++++++++++ .../heartbeat/HeartbeatMonitoringService.java | 9 +++- .../java/com/linkedin/venice/ConfigKeys.java | 7 +++ 5 files changed, 100 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 29aee433ee7..983cb4433d1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -134,6 +134,9 @@ import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE; +import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS; +import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES; @@ -675,6 +678,11 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean inactiveTopicPartitionCheckerEnabled; private final int inactiveTopicPartitionCheckerInternalInSeconds; private final int inactiveTopicPartitionCheckerThresholdInSeconds; + + private final boolean lagBasedReplicaAutoResubscribeEnabled; + private final int lagBasedReplicaAutoResubscribeIntervalInSeconds; + private final int lagBasedReplicaAutoResubscribeThresholdInSeconds; + private final int serverIngestionInfoLogLineLimit; private final boolean parallelResourceShutdownEnabled; @@ -1149,6 +1157,13 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map partitionConsumptionStateMap; + private final ConcurrentMap partitionToPreviousResubscribeTimeMap = new VeniceConcurrentHashMap<>(); + private final PriorityBlockingQueue resubscribeRequestQueue = new PriorityBlockingQueue<>(); private final AtomicInteger activeReplicaCount = new AtomicInteger(0); protected final AbstractStoreBufferService storeBufferService; @@ -1749,6 +1751,7 @@ public void run() { Store store = storeRepository.getStoreOrThrow(storeName); if (!skipAfterBatchPushUnsubEnabled) { refreshIngestionContextIfChanged(store); + maybeProcessResubscribeRequest(); processConsumerActions(store); checkLongRunningTaskState(); checkIngestionProgress(store); @@ -5217,6 +5220,43 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch( } } + void maybeProcessResubscribeRequest() { + int count = 0; + Integer partition; + while (count < 3 && getResubscribeRequestQueue().peek() != null) { + partition = getResubscribeRequestQueue().poll(); + if (partition == null) { + break; + } + long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L); + int allowedResubscribeIntervalInSeconds = serverConfig.getLagBasedReplicaAutoResubscribeIntervalInSeconds(); + if (System.currentTimeMillis() - previousResubscribeTime < SECONDS + .toMillis(allowedResubscribeIntervalInSeconds)) { + LOGGER.info( + "Skip resubscribe request for partition: {} of SIT: {} as it has been resubscribed recently at: {}", + partition, + getVersionTopic(), + previousResubscribeTime); + continue; + } + PartitionConsumptionState pcs = getPartitionConsumptionStateMap().get(partition); + if (pcs == null) { + LOGGER.warn( + "Partition: {} does not exist in pcs map for SIT of: {}, will not resubscribe.", + partition, + getVersionTopic()); + continue; + } + try { + getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis()); + resubscribe(pcs); + } catch (Exception e) { + LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId()); + } + count++; + } + } + AbstractStoreBufferService getStoreBufferService() { return storeBufferService; } @@ -5232,4 +5272,12 @@ ReadOnlyStoreRepository getStoreRepository() { String getLocalKafkaServer() { return localKafkaServer; } + + ConcurrentMap getPartitionToPreviousResubscribeTimeMap() { + return partitionToPreviousResubscribeTimeMap; + } + + PriorityBlockingQueue getResubscribeRequestQueue() { + return resubscribeRequestQueue; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index c0f0a4eb33e..d115af012c4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -58,12 +58,11 @@ public class HeartbeatMonitoringService extends AbstractVeniceService { public static final long INVALID_MESSAGE_TIMESTAMP = -1; public static final long INVALID_HEARTBEAT_LAG = Long.MAX_VALUE; public static final int DEFAULT_LAG_MONITOR_CLEANUP_CYCLE = 5; - private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class); private final ReadOnlyStoreRepository metadataRepository; private final Thread reportingThread; private final Thread lagCleanupAndLoggingThread; - + private final VeniceServerConfig serverConfig; private final Set regionNames; private final String localRegionName; @@ -109,6 +108,7 @@ public HeartbeatMonitoringService( this.customizedViewRepositoryFuture = customizedViewRepositoryFuture; this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort()); this.lagMonitorCleanupCycle = serverConfig.getLagMonitorCleanupCycle(); + this.serverConfig = serverConfig; } private synchronized void initializeEntry( @@ -651,6 +651,11 @@ protected void checkAndMaybeLogHeartbeatDelayMap( version.getKey(), partition.getKey(), region.getKey()); + if (serverConfig.isLagBasedReplicaAutoResubscribeEnabled() + && serverConfig.getLagBasedReplicaAutoResubscribeThresholdInSeconds() < lag) { + kafkaStoreIngestionService + .maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey()); + } } } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index e85db052c27..48a29662f07 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2746,6 +2746,13 @@ private ConfigKeys() { public static final String SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS = "server.inactive.topic.partition.checker.threshold.in.seconds"; + public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED = + "server.lag.based.replica.auto.resubscribe.enabled"; + public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS = + "server.lag.based.replica.auto.resubscribe.threshold.in.seconds"; + public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS = + "server.lag.based.replica.auto.resubscribe.interval.in.seconds"; + /** * Whether to enable producer throughput optimization for realtime workload or not. * Two strategies: From 97b09925af3592dbf8daac1d7d68768d276af5b9 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 19 Nov 2025 20:27:44 -0800 Subject: [PATCH 2/8] add more config --- .../com/linkedin/davinci/config/VeniceServerConfig.java | 9 ++++++++- .../davinci/kafka/consumer/StoreIngestionTask.java | 5 +++-- .../src/main/java/com/linkedin/venice/ConfigKeys.java | 2 ++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 983cb4433d1..ac004730740 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -136,6 +136,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS; +import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED; @@ -682,6 +683,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean lagBasedReplicaAutoResubscribeEnabled; private final int lagBasedReplicaAutoResubscribeIntervalInSeconds; private final int lagBasedReplicaAutoResubscribeThresholdInSeconds; + private final int lagBasedReplicaAutoResubscribeMaxReplicaCount; private final int serverIngestionInfoLogLineLimit; @@ -1163,7 +1165,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map Date: Fri, 21 Nov 2025 01:29:56 -0800 Subject: [PATCH 3/8] add unit test --- .../kafka/consumer/StoreIngestionTask.java | 3 +- .../heartbeat/HeartbeatMonitoringService.java | 26 ++++++--- .../kafka/consumer/AutoResubscribeTest.java | 58 +++++++++++++++++++ .../HeartbeatMonitoringServiceTest.java | 34 +++++++++++ 4 files changed, 113 insertions(+), 8 deletions(-) create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index fecb09c4a4a..5be9d36fb2e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -5249,12 +5249,13 @@ && getResubscribeRequestQueue().peek() != null) { continue; } try { + LOGGER.info("Resubscribing: {}", pcs.getReplicaId()); getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis()); + count++; resubscribe(pcs); } catch (Exception e) { LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId()); } - count++; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index d115af012c4..a8a9fa19f39 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -619,14 +619,14 @@ protected void record() { .recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe))); } - protected void checkAndMaybeLogHeartbeatDelayMap( + void checkAndMaybeLogHeartbeatDelayMap( Map>>> heartbeatTimestamps) { - if (kafkaStoreIngestionService == null) { + if (getKafkaStoreIngestionService() == null) { // Service not initialized yet, skip logging return; } long currentTimestamp = System.currentTimeMillis(); - boolean isLeader = heartbeatTimestamps == leaderHeartbeatTimeStamps; + boolean isLeader = heartbeatTimestamps == getLeaderHeartbeatTimeStamps(); for (Map.Entry>>> storeName: heartbeatTimestamps .entrySet()) { for (Map.Entry>> version: storeName.getValue() @@ -646,14 +646,18 @@ protected void checkAndMaybeLogHeartbeatDelayMap( lag, heartbeatTs, currentTimestamp); - kafkaStoreIngestionService.attemptToPrintIngestionInfoFor( + getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor( storeName.getKey(), version.getKey(), partition.getKey(), region.getKey()); - if (serverConfig.isLagBasedReplicaAutoResubscribeEnabled() - && serverConfig.getLagBasedReplicaAutoResubscribeThresholdInSeconds() < lag) { - kafkaStoreIngestionService + /** + * Here we don't consider whether it is current version or not, as it will need extra logic to extract. + * We will delegate to KafkaConsumerService to determine whether it takes this request as it has information. + */ + if (getServerConfig().isLagBasedReplicaAutoResubscribeEnabled() && TimeUnit.SECONDS + .toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeThresholdInSeconds()) < lag) { + getKafkaStoreIngestionService() .maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey()); } } @@ -896,4 +900,12 @@ String getLocalRegionName() { public void setKafkaStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) { this.kafkaStoreIngestionService = kafkaStoreIngestionService; } + + KafkaStoreIngestionService getKafkaStoreIngestionService() { + return kafkaStoreIngestionService; + } + + VeniceServerConfig getServerConfig() { + return serverConfig; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java new file mode 100644 index 00000000000..fdb0c174770 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java @@ -0,0 +1,58 @@ +package com.linkedin.davinci.kafka.consumer; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.Test; + + +public class AutoResubscribeTest { + @Test + public void testHandleAutoResubscribe() throws InterruptedException { + StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); + doCallRealMethod().when(storeIngestionTask).maybeProcessResubscribeRequest(); + VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); + doReturn(serverConfig).when(storeIngestionTask).getServerConfig(); + PriorityBlockingQueue resubscribeQueue = new PriorityBlockingQueue<>(); + doReturn(resubscribeQueue).when(storeIngestionTask).getResubscribeRequestQueue(); + Map resubscribeRequestTimestamp = new VeniceConcurrentHashMap<>(); + doReturn(resubscribeRequestTimestamp).when(storeIngestionTask).getPartitionToPreviousResubscribeTimeMap(); + Map partitionConsumptionStateMap = new VeniceConcurrentHashMap<>(); + doReturn(partitionConsumptionStateMap).when(storeIngestionTask).getPartitionConsumptionStateMap(); + doReturn(300).when(serverConfig).getLagBasedReplicaAutoResubscribeIntervalInSeconds(); + + doReturn(2).when(serverConfig).getLagBasedReplicaAutoResubscribeMaxReplicaCount(); + + PartitionConsumptionState pcs1 = mock(PartitionConsumptionState.class); + PartitionConsumptionState pcs3 = mock(PartitionConsumptionState.class); + PartitionConsumptionState pcs4 = mock(PartitionConsumptionState.class); + partitionConsumptionStateMap.put(1, pcs1); + partitionConsumptionStateMap.put(3, pcs3); + partitionConsumptionStateMap.put(4, pcs4); + resubscribeRequestTimestamp.put(0, System.currentTimeMillis()); + resubscribeRequestTimestamp.put(1, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); + resubscribeRequestTimestamp.put(2, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); + resubscribeRequestTimestamp.put(4, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); + resubscribeQueue.put(0); + resubscribeQueue.put(1); + resubscribeQueue.put(2); + resubscribeQueue.put(3); + resubscribeQueue.put(4); + + storeIngestionTask.maybeProcessResubscribeRequest(); + verify(storeIngestionTask, times(1)).resubscribe(eq(pcs1)); + verify(storeIngestionTask, times(1)).resubscribe(eq(pcs3)); + verify(storeIngestionTask, never()).resubscribe(eq(pcs4)); + + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index 2fa506eae67..181e36386cf 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.exceptions.VeniceNoHelixResourceException; @@ -42,6 +43,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; import java.time.Duration; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -812,4 +814,36 @@ public void testLargestHeartbeatLag() { Assert.assertEquals(aggregatedHeartbeatLagEntry.getCurrentVersionHeartbeatLag(), 8000L); Assert.assertEquals(aggregatedHeartbeatLagEntry.getNonCurrentVersionHeartbeatLag(), 9900L); } + + @Test + public void testTriggerAutoResubscribe() { + String store = "foo"; + int version = 100; + int partition = 123; + String region = "dc1"; + Map>>> heartbeatTimestamps = new HashMap<>(); + HeartbeatTimeStampEntry entry = + new HeartbeatTimeStampEntry(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(15), true, true); + heartbeatTimestamps.put(store, new HashMap<>()); + heartbeatTimestamps.get(store).put(version, new HashMap<>()); + heartbeatTimestamps.get(store).get(version).put(partition, new HashMap<>()); + heartbeatTimestamps.get(store).get(version).get(partition).put(region, entry); + + HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); + KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); + doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig(); + doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService(); + doCallRealMethod().when(heartbeatMonitoringService).checkAndMaybeLogHeartbeatDelayMap(anyMap()); + + // Config not enabled, nothing happen + heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps); + verify(kafkaStoreIngestionService, never()).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition)); + + // Config enabled, trigger resubscribe. + doReturn(true).when(serverConfig).isLagBasedReplicaAutoResubscribeEnabled(); + doReturn(600).when(serverConfig).getLagBasedReplicaAutoResubscribeThresholdInSeconds(); + heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps); + verify(kafkaStoreIngestionService, times(1)).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition)); + } } From 1a54051dc7913ead2417f71fafd118b483aa354b Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 25 Nov 2025 16:45:19 -0800 Subject: [PATCH 4/8] Address some of the comments --- .../kafka/consumer/StoreIngestionTask.java | 2 +- .../java/com/linkedin/venice/ConfigKeys.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 5be9d36fb2e..56154160690 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -5254,7 +5254,7 @@ && getResubscribeRequestQueue().peek() != null) { count++; resubscribe(pcs); } catch (Exception e) { - LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId()); + LOGGER.error("Caught exception when resubscribing for replica: {}", pcs.getReplicaId(), e); } } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index b3aa8f099fd..cf737d2f19b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2746,12 +2746,31 @@ private ConfigKeys() { public static final String SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS = "server.inactive.topic.partition.checker.threshold.in.seconds"; + /** + * Config to enable/disable lag based replica auto-resubscribe feature. + * Default is false as we will plan to roll out step-by-step. + */ public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED = "server.lag.based.replica.auto.resubscribe.enabled"; + /** + * Config to control the time lag threshold in seconds to trigger this auto-resubscribe feature. + * Default is 600s = 10 min. + */ public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS = "server.lag.based.replica.auto.resubscribe.threshold.in.seconds"; + /** + * Config to control the interval a replica is re-subscribed after previous attempt. This config intends to give replica + * sometime to auto-remediate the lag after re-subscription. + * Default is 300s = 5 min. + */ public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS = "server.lag.based.replica.auto.resubscribe.interval.in.seconds"; + /** + * Config to control the maximum number of replicas can be resubscribed in one single store ingestion task check. + * This is to make sure in case resubscribe feature does not work as expected or encounter slowness during the process, + * the SIT thread will keep functioning and serve other requests. + * Default is 3. + */ public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT = "server.lag.based.replica.auto.resubscribe.max.replica.count"; From a322c108c70a4c2fc43ddb184e859c646e75207d Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 25 Nov 2025 16:45:47 -0800 Subject: [PATCH 5/8] Address some of the comments --- .../java/com/linkedin/davinci/config/VeniceServerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index ac004730740..78f91896fa9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -133,11 +133,11 @@ import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY; import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS; -import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT; import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS; +import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES; From 76d9238e544b28f9899f2a431c22842d84579dc5 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 25 Nov 2025 16:57:08 -0800 Subject: [PATCH 6/8] exclude error replica --- .../kafka/consumer/StoreIngestionTask.java | 11 +++++++++-- .../kafka/consumer/AutoResubscribeTest.java | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 56154160690..635e8323931 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -5243,11 +5243,18 @@ && getResubscribeRequestQueue().peek() != null) { PartitionConsumptionState pcs = getPartitionConsumptionStateMap().get(partition); if (pcs == null) { LOGGER.warn( - "Partition: {} does not exist in pcs map for SIT of: {}, will not resubscribe.", - partition, + "Replica: {} does not exist in pcs map for SIT of: {}, will not resubscribe.", + Utils.getReplicaId(versionTopic, partition), getVersionTopic()); continue; } + /** + * As of now, this feature intends to resolve ingestion performance issue introduced by consumer. We will rely on + * the error reset feature to handle the error replica properly. + */ + if (pcs.isErrorReported()) { + LOGGER.warn("Replica: {} is already errored, will not resubscribe to repair.", pcs.getReplicaId()); + } try { LOGGER.info("Resubscribing: {}", pcs.getReplicaId()); getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java index fdb0c174770..1a4d42351fc 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AutoResubscribeTest.java @@ -33,26 +33,44 @@ public void testHandleAutoResubscribe() throws InterruptedException { doReturn(2).when(serverConfig).getLagBasedReplicaAutoResubscribeMaxReplicaCount(); + /** + * The test setup goes as follows: + * P0: Not stale + * P1: Stale + * P2: Does not have PCS + * P3: Stale + * P4: Stale but ERROR + * P5: Stale + * + * It is expected to resubscribe P1/P3. + */ + PartitionConsumptionState pcs1 = mock(PartitionConsumptionState.class); PartitionConsumptionState pcs3 = mock(PartitionConsumptionState.class); PartitionConsumptionState pcs4 = mock(PartitionConsumptionState.class); + PartitionConsumptionState pcs5 = mock(PartitionConsumptionState.class); partitionConsumptionStateMap.put(1, pcs1); partitionConsumptionStateMap.put(3, pcs3); partitionConsumptionStateMap.put(4, pcs4); + partitionConsumptionStateMap.put(5, pcs4); + doReturn(true).when(pcs4).isErrorReported(); resubscribeRequestTimestamp.put(0, System.currentTimeMillis()); resubscribeRequestTimestamp.put(1, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); resubscribeRequestTimestamp.put(2, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); resubscribeRequestTimestamp.put(4, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); + resubscribeRequestTimestamp.put(5, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500)); resubscribeQueue.put(0); resubscribeQueue.put(1); resubscribeQueue.put(2); resubscribeQueue.put(3); resubscribeQueue.put(4); + resubscribeQueue.put(5); storeIngestionTask.maybeProcessResubscribeRequest(); verify(storeIngestionTask, times(1)).resubscribe(eq(pcs1)); verify(storeIngestionTask, times(1)).resubscribe(eq(pcs3)); verify(storeIngestionTask, never()).resubscribe(eq(pcs4)); + verify(storeIngestionTask, never()).resubscribe(eq(pcs5)); } } From c90092e8aaf39440040aa51ecd33fa5d6591223d Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 25 Nov 2025 17:15:04 -0800 Subject: [PATCH 7/8] address comments --- .../linkedin/davinci/kafka/consumer/StoreIngestionTask.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 635e8323931..b451460e592 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -5226,9 +5226,6 @@ void maybeProcessResubscribeRequest() { while (count < getServerConfig().getLagBasedReplicaAutoResubscribeMaxReplicaCount() && getResubscribeRequestQueue().peek() != null) { partition = getResubscribeRequestQueue().poll(); - if (partition == null) { - break; - } long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L); int allowedResubscribeIntervalInSeconds = getServerConfig().getLagBasedReplicaAutoResubscribeIntervalInSeconds(); if (System.currentTimeMillis() - previousResubscribeTime < SECONDS From fb827e7dba27b5a89948b69ac528ef029aeb6da3 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 26 Nov 2025 00:58:45 -0800 Subject: [PATCH 8/8] small tweak to exclude sep RT --- .../heartbeat/HeartbeatMonitoringService.java | 10 +++++++--- .../heartbeat/HeartbeatMonitoringServiceTest.java | 7 +++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index a8a9fa19f39..a691c091291 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -653,10 +653,14 @@ void checkAndMaybeLogHeartbeatDelayMap( region.getKey()); /** * Here we don't consider whether it is current version or not, as it will need extra logic to extract. - * We will delegate to KafkaConsumerService to determine whether it takes this request as it has information. + * In this layer, we will filter out untracked region (separated realtime region). + * We will delegate to KafkaStoreIngestionService to determine whether it takes this request as it has + * information about SIT current version. */ - if (getServerConfig().isLagBasedReplicaAutoResubscribeEnabled() && TimeUnit.SECONDS - .toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeThresholdInSeconds()) < lag) { + if (getServerConfig().isLagBasedReplicaAutoResubscribeEnabled() + && TimeUnit.SECONDS + .toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeThresholdInSeconds()) < lag + && !Utils.isSeparateTopicRegion(region.getKey())) { getKafkaStoreIngestionService() .maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey()); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index 181e36386cf..99b6349e3d1 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -845,5 +845,12 @@ public void testTriggerAutoResubscribe() { doReturn(600).when(serverConfig).getLagBasedReplicaAutoResubscribeThresholdInSeconds(); heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps); verify(kafkaStoreIngestionService, times(1)).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition)); + + // Config enabled, does not trigger resubscribe for sep region. + heartbeatTimestamps.get(store).get(version).get(partition).remove(region); + region = "dc1_sep"; + heartbeatTimestamps.get(store).get(version).get(partition).put(region, entry); + heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps); + verify(kafkaStoreIngestionService, times(1)).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition)); } }