Skip to content

Commit 1a1e507

Browse files
committed
[server] Add a heartbeat lag based replica auto resubscribe feature
1 parent f2b82f6 commit 1a1e507

File tree

5 files changed

+100
-2
lines changed

5 files changed

+100
-2
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@
131131
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY;
132132
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
133133
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
134+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED;
135+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS;
136+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
134137
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
135138
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
136139
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
@@ -672,6 +675,11 @@ public class VeniceServerConfig extends VeniceClusterConfig {
672675
private final boolean inactiveTopicPartitionCheckerEnabled;
673676
private final int inactiveTopicPartitionCheckerInternalInSeconds;
674677
private final int inactiveTopicPartitionCheckerThresholdInSeconds;
678+
679+
private final boolean lagBasedReplicaAutoResubscribeEnabled;
680+
private final int lagBasedReplicaAutoResubscribeIntervalInSeconds;
681+
private final int lagBasedReplicaAutoResubscribeThresholdInSeconds;
682+
675683
private final int serverIngestionInfoLogLineLimit;
676684

677685
private final boolean parallelResourceShutdownEnabled;
@@ -1145,6 +1153,13 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
11451153
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_INTERNAL_IN_SECONDS, 100);
11461154
this.inactiveTopicPartitionCheckerThresholdInSeconds =
11471155
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS, 5);
1156+
this.lagBasedReplicaAutoResubscribeEnabled =
1157+
serverProperties.getBoolean(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED, false);
1158+
this.lagBasedReplicaAutoResubscribeIntervalInSeconds =
1159+
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS, 300);
1160+
this.lagBasedReplicaAutoResubscribeThresholdInSeconds =
1161+
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS, 600);
1162+
11481163
this.useMetricsBasedPositionInLagComputation =
11491164
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
11501165
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
@@ -2065,6 +2080,18 @@ public boolean isInactiveTopicPartitionCheckerEnabled() {
20652080
return inactiveTopicPartitionCheckerEnabled;
20662081
}
20672082

2083+
public boolean isLagBasedReplicaAutoResubscribeEnabled() {
2084+
return lagBasedReplicaAutoResubscribeEnabled;
2085+
}
2086+
2087+
public int getLagBasedReplicaAutoResubscribeIntervalInSeconds() {
2088+
return lagBasedReplicaAutoResubscribeIntervalInSeconds;
2089+
}
2090+
2091+
public int getLagBasedReplicaAutoResubscribeThresholdInSeconds() {
2092+
return lagBasedReplicaAutoResubscribeThresholdInSeconds;
2093+
}
2094+
20682095
public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
20692096
return this.useMetricsBasedPositionInLagComputation;
20702097
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,4 +1518,15 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
15181518
}
15191519
}
15201520

1521+
public void maybeAddResubscribeRequest(String storeName, int version, int partition) {
1522+
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
1523+
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
1524+
if (storeIngestionTask == null) {
1525+
LOGGER.warn("StoreIngestionTask is not available for version topic: {}", versionTopic);
1526+
return;
1527+
}
1528+
storeIngestionTask.getResubscribeRequestQueue().add(partition);
1529+
LOGGER.info("Added replica: {} to pending resubscribe queue.", Utils.getReplicaId(versionTopic, partition));
1530+
}
1531+
15211532
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
249249
protected final TopicManagerRepository topicManagerRepository;
250250
/** Per-partition consumption state map */
251251
protected final ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap;
252+
private final ConcurrentMap<Integer, Long> partitionToPreviousResubscribeTimeMap = new VeniceConcurrentHashMap<>();
253+
private final PriorityBlockingQueue<Integer> resubscribeRequestQueue = new PriorityBlockingQueue<>();
252254
private final AtomicInteger activeReplicaCount = new AtomicInteger(0);
253255
protected final AbstractStoreBufferService storeBufferService;
254256

@@ -1750,6 +1752,7 @@ public void run() {
17501752
Store store = storeRepository.getStoreOrThrow(storeName);
17511753
if (!skipAfterBatchPushUnsubEnabled) {
17521754
refreshIngestionContextIfChanged(store);
1755+
maybeProcessResubscribeRequest();
17531756
processConsumerActions(store);
17541757
checkLongRunningTaskState();
17551758
checkIngestionProgress(store);
@@ -5214,6 +5217,43 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch(
52145217
}
52155218
}
52165219

5220+
void maybeProcessResubscribeRequest() {
5221+
int count = 0;
5222+
Integer partition;
5223+
while (count < 3 && getResubscribeRequestQueue().peek() != null) {
5224+
partition = getResubscribeRequestQueue().poll();
5225+
if (partition == null) {
5226+
break;
5227+
}
5228+
long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L);
5229+
int allowedResubscribeIntervalInSeconds = serverConfig.getLagBasedReplicaAutoResubscribeIntervalInSeconds();
5230+
if (System.currentTimeMillis() - previousResubscribeTime < SECONDS
5231+
.toMillis(allowedResubscribeIntervalInSeconds)) {
5232+
LOGGER.info(
5233+
"Skip resubscribe request for partition: {} of SIT: {} as it has been resubscribed recently at: {}",
5234+
partition,
5235+
getVersionTopic(),
5236+
previousResubscribeTime);
5237+
continue;
5238+
}
5239+
PartitionConsumptionState pcs = getPartitionConsumptionStateMap().get(partition);
5240+
if (pcs == null) {
5241+
LOGGER.warn(
5242+
"Partition: {} does not exist in pcs map for SIT of: {}, will not resubscribe.",
5243+
partition,
5244+
getVersionTopic());
5245+
continue;
5246+
}
5247+
try {
5248+
getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis());
5249+
resubscribe(pcs);
5250+
} catch (Exception e) {
5251+
LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId());
5252+
}
5253+
count++;
5254+
}
5255+
}
5256+
52175257
AbstractStoreBufferService getStoreBufferService() {
52185258
return storeBufferService;
52195259
}
@@ -5225,4 +5265,12 @@ long getBootstrapTimeoutInMs() {
52255265
ReadOnlyStoreRepository getStoreRepository() {
52265266
return storeRepository;
52275267
}
5268+
5269+
public ConcurrentMap<Integer, Long> getPartitionToPreviousResubscribeTimeMap() {
5270+
return partitionToPreviousResubscribeTimeMap;
5271+
}
5272+
5273+
public PriorityBlockingQueue<Integer> getResubscribeRequestQueue() {
5274+
return resubscribeRequestQueue;
5275+
}
52285276
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,12 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
5757
public static final long DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(10);
5858
public static final long INVALID_MESSAGE_TIMESTAMP = -1;
5959
public static final long INVALID_HEARTBEAT_LAG = Long.MAX_VALUE;
60-
6160
private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class);
6261
private static final int DEFAULT_LAG_MONITOR_CLEANUP_CYCLE = 5;
6362
private final ReadOnlyStoreRepository metadataRepository;
6463
private final Thread reportingThread;
6564
private final Thread lagCleanupAndLoggingThread;
66-
65+
private final VeniceServerConfig serverConfig;
6766
private final Set<String> regionNames;
6867
private final String localRegionName;
6968

@@ -107,6 +106,7 @@ public HeartbeatMonitoringService(
107106
this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats;
108107
this.customizedViewRepositoryFuture = customizedViewRepositoryFuture;
109108
this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort());
109+
this.serverConfig = serverConfig;
110110
}
111111

112112
private synchronized void initializeEntry(
@@ -649,6 +649,11 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
649649
version.getKey(),
650650
partition.getKey(),
651651
region.getKey());
652+
if (serverConfig.isLagBasedReplicaAutoResubscribeEnabled()
653+
&& serverConfig.getLagBasedReplicaAutoResubscribeThresholdInSeconds() < lag) {
654+
kafkaStoreIngestionService
655+
.maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey());
656+
}
652657
}
653658
}
654659
}

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2739,6 +2739,13 @@ private ConfigKeys() {
27392739
public static final String SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS =
27402740
"server.inactive.topic.partition.checker.threshold.in.seconds";
27412741

2742+
/**
 * Whether the server asks lagging replicas to resubscribe automatically based on heartbeat lag.
 * Read as a boolean by the server config; defaults to false.
 */
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED =
2743+
"server.lag.based.replica.auto.resubscribe.enabled";
2744+
/**
 * Heartbeat lag threshold above which a resubscribe request is raised for a replica.
 * Read as an int by the server config; defaults to 600.
 * NOTE(review): the key is named in seconds; confirm the lag value it is compared against uses the same unit.
 */
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS =
2745+
"server.lag.based.replica.auto.resubscribe.threshold.in.seconds";
2746+
/**
 * Minimum time, in seconds, between two resubscribes of the same partition; requests arriving sooner
 * are skipped by the ingestion task. Read as an int by the server config; defaults to 300.
 */
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS =
2747+
"server.lag.based.replica.auto.resubscribe.interval.in.seconds";
2748+
27422749
/**
27432750
* Whether to enable producer throughput optimization for realtime workload or not.
27442751
* Two strategies:

0 commit comments

Comments
 (0)