Skip to content

Commit 642919d

Browse files
committed
[server] Add a heartbeat lag based replica auto resubscribe feature
1 parent c0f8b56 commit 642919d

File tree

5 files changed

+100
-2
lines changed

5 files changed

+100
-2
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@
129129
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY;
130130
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
131131
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
132+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED;
133+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS;
134+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
132135
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
133136
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
134137
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
@@ -666,6 +669,11 @@ public class VeniceServerConfig extends VeniceClusterConfig {
666669
private final boolean inactiveTopicPartitionCheckerEnabled;
667670
private final int inactiveTopicPartitionCheckerInternalInSeconds;
668671
private final int inactiveTopicPartitionCheckerThresholdInSeconds;
672+
673+
private final boolean lagBasedReplicaAutoResubscribeEnabled;
674+
private final int lagBasedReplicaAutoResubscribeIntervalInSeconds;
675+
private final int lagBasedReplicaAutoResubscribeThresholdInSeconds;
676+
669677
private final int serverIngestionInfoLogLineLimit;
670678

671679
private final boolean parallelResourceShutdownEnabled;
@@ -1133,6 +1141,13 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
11331141
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_INTERNAL_IN_SECONDS, 100);
11341142
this.inactiveTopicPartitionCheckerThresholdInSeconds =
11351143
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS, 5);
1144+
this.lagBasedReplicaAutoResubscribeEnabled =
1145+
serverProperties.getBoolean(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED, false);
1146+
this.lagBasedReplicaAutoResubscribeIntervalInSeconds =
1147+
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS, 300);
1148+
this.lagBasedReplicaAutoResubscribeThresholdInSeconds =
1149+
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS, 600);
1150+
11361151
this.useMetricsBasedPositionInLagComputation =
11371152
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
11381153
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
@@ -2041,6 +2056,18 @@ public boolean isInactiveTopicPartitionCheckerEnabled() {
20412056
return inactiveTopicPartitionCheckerEnabled;
20422057
}
20432058

2059+
public boolean isLagBasedReplicaAutoResubscribeEnabled() {
2060+
return lagBasedReplicaAutoResubscribeEnabled;
2061+
}
2062+
2063+
public int getLagBasedReplicaAutoResubscribeIntervalInSeconds() {
2064+
return lagBasedReplicaAutoResubscribeIntervalInSeconds;
2065+
}
2066+
2067+
public int getLagBasedReplicaAutoResubscribeThresholdInSeconds() {
2068+
return lagBasedReplicaAutoResubscribeThresholdInSeconds;
2069+
}
2070+
20442071
public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
20452072
return this.useMetricsBasedPositionInLagComputation;
20462073
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,4 +1491,15 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
14911491
}
14921492
}
14931493

1494+
public void maybeAddResubscribeRequest(String storeName, int version, int partition) {
1495+
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
1496+
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
1497+
if (storeIngestionTask == null) {
1498+
LOGGER.warn("StoreIngestionTask is not available for version topic: {}", versionTopic);
1499+
return;
1500+
}
1501+
storeIngestionTask.getResubscribeRequestQueue().add(partition);
1502+
LOGGER.info("Added replica: {} to pending resubscribe queue.", Utils.getReplicaId(versionTopic, partition));
1503+
}
1504+
14941505
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
245245
protected final TopicManagerRepository topicManagerRepository;
246246
/** Per-partition consumption state map */
247247
protected final ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap;
248+
private final ConcurrentMap<Integer, Long> partitionToPreviousResubscribeTimeMap = new VeniceConcurrentHashMap<>();
249+
private final PriorityBlockingQueue<Integer> resubscribeRequestQueue = new PriorityBlockingQueue<>();
248250
private final AtomicInteger activeReplicaCount = new AtomicInteger(0);
249251
protected final AbstractStoreBufferService storeBufferService;
250252

@@ -1696,6 +1698,7 @@ public void run() {
16961698
Store store = storeRepository.getStoreOrThrow(storeName);
16971699
if (!skipAfterBatchPushUnsubEnabled) {
16981700
refreshIngestionContextIfChanged(store);
1701+
maybeProcessResubscribeRequest();
16991702
processConsumerActions(store);
17001703
checkLongRunningTaskState();
17011704
checkIngestionProgress(store);
@@ -5104,7 +5107,52 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch(
51045107
}
51055108
}
51065109

5110+
void maybeProcessResubscribeRequest() {
5111+
int count = 0;
5112+
Integer partition;
5113+
while (count < 3 && getResubscribeRequestQueue().peek() != null) {
5114+
partition = getResubscribeRequestQueue().poll();
5115+
if (partition == null) {
5116+
break;
5117+
}
5118+
long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L);
5119+
int allowedResubscribeIntervalInSeconds = serverConfig.getLagBasedReplicaAutoResubscribeIntervalInSeconds();
5120+
if (System.currentTimeMillis() - previousResubscribeTime < SECONDS
5121+
.toMillis(allowedResubscribeIntervalInSeconds)) {
5122+
LOGGER.info(
5123+
"Skip resubscribe request for partition: {} of SIT: {} as it has been resubscribed recently at: {}",
5124+
partition,
5125+
getVersionTopic(),
5126+
previousResubscribeTime);
5127+
continue;
5128+
}
5129+
PartitionConsumptionState pcs = getPartitionConsumptionStateMap().get(partition);
5130+
if (pcs == null) {
5131+
LOGGER.warn(
5132+
"Partition: {} does not exist in pcs map for SIT of: {}, will not resubscribe.",
5133+
partition,
5134+
getVersionTopic());
5135+
continue;
5136+
}
5137+
try {
5138+
getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis());
5139+
resubscribe(pcs);
5140+
} catch (Exception e) {
5141+
LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId());
5142+
}
5143+
count++;
5144+
}
5145+
}
5146+
51075147
AbstractStoreBufferService getStoreBufferService() {
51085148
return storeBufferService;
51095149
}
5150+
5151+
public ConcurrentMap<Integer, Long> getPartitionToPreviousResubscribeTimeMap() {
5152+
return partitionToPreviousResubscribeTimeMap;
5153+
}
5154+
5155+
public PriorityBlockingQueue<Integer> getResubscribeRequestQueue() {
5156+
return resubscribeRequestQueue;
5157+
}
51105158
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,12 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
5757
public static final long DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(10);
5858
public static final long INVALID_MESSAGE_TIMESTAMP = -1;
5959
public static final long INVALID_HEARTBEAT_LAG = Long.MAX_VALUE;
60-
6160
private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class);
6261
private static final int DEFAULT_LAG_MONITOR_CLEANUP_CYCLE = 5;
6362
private final ReadOnlyStoreRepository metadataRepository;
6463
private final Thread reportingThread;
6564
private final Thread lagCleanupAndLoggingThread;
66-
65+
private final VeniceServerConfig serverConfig;
6766
private final Set<String> regionNames;
6867
private final String localRegionName;
6968

@@ -107,6 +106,7 @@ public HeartbeatMonitoringService(
107106
this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats;
108107
this.customizedViewRepositoryFuture = customizedViewRepositoryFuture;
109108
this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort());
109+
this.serverConfig = serverConfig;
110110
}
111111

112112
private synchronized void initializeEntry(
@@ -649,6 +649,11 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
649649
version.getKey(),
650650
partition.getKey(),
651651
region.getKey());
652+
if (serverConfig.isLagBasedReplicaAutoResubscribeEnabled()
653+
&& serverConfig.getLagBasedReplicaAutoResubscribeThresholdInSeconds() < lag) {
654+
kafkaStoreIngestionService
655+
.maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey());
656+
}
652657
}
653658
}
654659
}

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2698,6 +2698,13 @@ private ConfigKeys() {
26982698
public static final String SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS =
26992699
"server.inactive.topic.partition.checker.threshold.in.seconds";
27002700

2701+
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED =
2702+
"server.lag.based.replica.auto.resubscribe.enabled";
2703+
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS =
2704+
"server.lag.based.replica.auto.resubscribe.threshold.in.seconds";
2705+
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS =
2706+
"server.lag.based.replica.auto.resubscribe.interval.in.seconds";
2707+
27012708
/**
27022709
* Whether to enable producer throughput optimization for realtime workload or not.
27032710
* Two strategies:

0 commit comments

Comments
 (0)