Skip to content

Commit 09ab18a

Browse files
committed
add more config
1 parent 1a1e507 commit 09ab18a

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
134134
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED;
135135
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS;
136+
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT;
136137
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
137138
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
138139
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
@@ -679,6 +680,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
679680
private final boolean lagBasedReplicaAutoResubscribeEnabled;
680681
private final int lagBasedReplicaAutoResubscribeIntervalInSeconds;
681682
private final int lagBasedReplicaAutoResubscribeThresholdInSeconds;
683+
private final int lagBasedReplicaAutoResubscribeMaxReplicaCount;
682684

683685
private final int serverIngestionInfoLogLineLimit;
684686

@@ -1159,7 +1161,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
11591161
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS, 300);
11601162
this.lagBasedReplicaAutoResubscribeThresholdInSeconds =
11611163
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS, 600);
1162-
1164+
this.lagBasedReplicaAutoResubscribeMaxReplicaCount =
1165+
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT, 3);
11631166
this.useMetricsBasedPositionInLagComputation =
11641167
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
11651168
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
@@ -2092,6 +2095,10 @@ public int getLagBasedReplicaAutoResubscribeThresholdInSeconds() {
20922095
return lagBasedReplicaAutoResubscribeThresholdInSeconds;
20932096
}
20942097

2098+
public int getLagBasedReplicaAutoResubscribeMaxReplicaCount() {
2099+
return lagBasedReplicaAutoResubscribeMaxReplicaCount;
2100+
}
2101+
20952102
public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
20962103
return this.useMetricsBasedPositionInLagComputation;
20972104
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5220,13 +5220,14 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch(
52205220
void maybeProcessResubscribeRequest() {
52215221
int count = 0;
52225222
Integer partition;
5223-
while (count < 3 && getResubscribeRequestQueue().peek() != null) {
5223+
while (count < getServerConfig().getLagBasedReplicaAutoResubscribeMaxReplicaCount()
5224+
&& getResubscribeRequestQueue().peek() != null) {
52245225
partition = getResubscribeRequestQueue().poll();
52255226
if (partition == null) {
52265227
break;
52275228
}
52285229
long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L);
5229-
int allowedResubscribeIntervalInSeconds = serverConfig.getLagBasedReplicaAutoResubscribeIntervalInSeconds();
5230+
int allowedResubscribeIntervalInSeconds = getServerConfig().getLagBasedReplicaAutoResubscribeIntervalInSeconds();
52305231
if (System.currentTimeMillis() - previousResubscribeTime < SECONDS
52315232
.toMillis(allowedResubscribeIntervalInSeconds)) {
52325233
LOGGER.info(

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2745,6 +2745,8 @@ private ConfigKeys() {
27452745
"server.lag.based.replica.auto.resubscribe.threshold.in.seconds";
27462746
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS =
27472747
"server.lag.based.replica.auto.resubscribe.interval.in.seconds";
2748+
public static final String SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT =
2749+
"server.lag.based.replica.auto.resubscribe.max.replica.count";
27482750

27492751
/**
27502752
* Whether to enable producer throughput optimization for realtime workload or not.

0 commit comments

Comments
 (0)