Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -675,6 +679,12 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean inactiveTopicPartitionCheckerEnabled;
private final int inactiveTopicPartitionCheckerInternalInSeconds;
private final int inactiveTopicPartitionCheckerThresholdInSeconds;

private final boolean lagBasedReplicaAutoResubscribeEnabled;
private final int lagBasedReplicaAutoResubscribeIntervalInSeconds;
private final int lagBasedReplicaAutoResubscribeThresholdInSeconds;
private final int lagBasedReplicaAutoResubscribeMaxReplicaCount;

private final int serverIngestionInfoLogLineLimit;

private final boolean parallelResourceShutdownEnabled;
Expand Down Expand Up @@ -1149,6 +1159,14 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_INTERNAL_IN_SECONDS, 100);
this.inactiveTopicPartitionCheckerThresholdInSeconds =
serverProperties.getInt(SERVER_INACTIVE_TOPIC_PARTITION_CHECKER_THRESHOLD_IN_SECONDS, 5);
this.lagBasedReplicaAutoResubscribeEnabled =
serverProperties.getBoolean(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED, false);
this.lagBasedReplicaAutoResubscribeIntervalInSeconds =
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS, 300);
this.lagBasedReplicaAutoResubscribeThresholdInSeconds =
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS, 600);
this.lagBasedReplicaAutoResubscribeMaxReplicaCount =
serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT, 3);
this.useMetricsBasedPositionInLagComputation =
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
Expand Down Expand Up @@ -2075,6 +2093,22 @@ public boolean isInactiveTopicPartitionCheckerEnabled() {
return inactiveTopicPartitionCheckerEnabled;
}

/**
 * @return whether lag-based replica auto-resubscribe is turned on for this server; read from
 *         {@code SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_ENABLED}, defaulting to {@code false}.
 */
public boolean isLagBasedReplicaAutoResubscribeEnabled() {
  return this.lagBasedReplicaAutoResubscribeEnabled;
}

/**
 * @return the minimum number of seconds that must elapse between two resubscribes of the same partition;
 *         read from {@code SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_INTERVAL_IN_SECONDS}, defaulting to 300.
 */
public int getLagBasedReplicaAutoResubscribeIntervalInSeconds() {
  return this.lagBasedReplicaAutoResubscribeIntervalInSeconds;
}

/**
 * @return the heartbeat lag, in seconds, above which a replica becomes a candidate for auto-resubscribe;
 *         read from {@code SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS}, defaulting to 600.
 */
public int getLagBasedReplicaAutoResubscribeThresholdInSeconds() {
  return this.lagBasedReplicaAutoResubscribeThresholdInSeconds;
}

/**
 * @return the upper bound on how many replicas may be resubscribed in a single processing pass;
 *         read from {@code SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT}, defaulting to 3.
 */
public int getLagBasedReplicaAutoResubscribeMaxReplicaCount() {
  return this.lagBasedReplicaAutoResubscribeMaxReplicaCount;
}

/**
 * @return the value configured through {@code SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION},
 *         which defaults to {@code false}.
 */
public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
  return useMetricsBasedPositionInLagComputation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1533,4 +1533,15 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
}
}

/**
 * Queues a resubscribe request for one replica on the ingestion task that owns the given store version.
 * When no ingestion task exists for that version topic, the request is dropped with a warning.
 *
 * @param storeName name of the store the replica belongs to
 * @param version store version number
 * @param partition partition number of the replica to resubscribe
 */
public void maybeAddResubscribeRequest(String storeName, int version, int partition) {
  String versionTopicName = Version.composeKafkaTopic(storeName, version);
  PubSubTopic versionTopic = pubSubTopicRepository.getTopic(versionTopicName);
  StoreIngestionTask ingestionTask = getStoreIngestionTask(versionTopic.getName());
  if (ingestionTask != null) {
    // Hand the partition over to the ingestion task; it decides later whether to act on the request.
    ingestionTask.getResubscribeRequestQueue().add(partition);
    LOGGER.info("Added replica: {} to pending resubscribe queue.", Utils.getReplicaId(versionTopic, partition));
    return;
  }
  LOGGER.warn("StoreIngestionTask is not available for version topic: {}", versionTopic);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final TopicManagerRepository topicManagerRepository;
/** Per-partition consumption state map */
protected final ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap;
/**
 * Partition number -> wall-clock time (epoch millis) of the most recent resubscribe attempt for that partition.
 * Consulted by {@code maybeProcessResubscribeRequest()} to rate-limit resubscribes per partition.
 */
private final ConcurrentMap<Integer, Long> partitionToPreviousResubscribeTimeMap = new VeniceConcurrentHashMap<>();
/**
 * Partition numbers with a pending resubscribe request. Entries are polled by
 * {@code maybeProcessResubscribeRequest()}; being a priority queue of {@link Integer}, the smallest
 * partition number is polled first.
 */
private final PriorityBlockingQueue<Integer> resubscribeRequestQueue = new PriorityBlockingQueue<>();
private final AtomicInteger activeReplicaCount = new AtomicInteger(0);
protected final AbstractStoreBufferService storeBufferService;

Expand Down Expand Up @@ -1749,6 +1751,7 @@ public void run() {
Store store = storeRepository.getStoreOrThrow(storeName);
if (!skipAfterBatchPushUnsubEnabled) {
refreshIngestionContextIfChanged(store);
maybeProcessResubscribeRequest();
processConsumerActions(store);
checkLongRunningTaskState();
checkIngestionProgress(store);
Expand Down Expand Up @@ -5217,6 +5220,49 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch(
}
}

/**
 * Drains pending resubscribe requests from the resubscribe queue and resubscribes the corresponding replicas.
 *
 * <p>At most {@code getLagBasedReplicaAutoResubscribeMaxReplicaCount()} replicas are resubscribed per call.
 * A polled request is dropped (and does not count towards that limit) when:
 * <ul>
 *   <li>the partition was resubscribed less than {@code getLagBasedReplicaAutoResubscribeIntervalInSeconds()} ago;</li>
 *   <li>the partition has no {@link PartitionConsumptionState} in this task;</li>
 *   <li>the replica has already reported an error.</li>
 * </ul>
 * Exceptions thrown by {@code resubscribe} are logged and do not abort the processing of remaining requests.
 */
void maybeProcessResubscribeRequest() {
  // The server config is loop-invariant, so read it once up front.
  final int maxReplicaCount = getServerConfig().getLagBasedReplicaAutoResubscribeMaxReplicaCount();
  final long allowedResubscribeIntervalInMs =
      SECONDS.toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeIntervalInSeconds());
  int count = 0;
  while (count < maxReplicaCount) {
    // A single poll() replaces the peek()-then-poll() pair: it returns null once the queue is empty.
    Integer partition = getResubscribeRequestQueue().poll();
    if (partition == null) {
      break;
    }
    long previousResubscribeTime = getPartitionToPreviousResubscribeTimeMap().getOrDefault(partition, 0L);
    if (System.currentTimeMillis() - previousResubscribeTime < allowedResubscribeIntervalInMs) {
      LOGGER.info(
          "Skip resubscribe request for partition: {} of SIT: {} as it has been resubscribed recently at: {}",
          partition,
          getVersionTopic(),
          previousResubscribeTime);
      continue;
    }
    PartitionConsumptionState pcs = getPartitionConsumptionStateMap().get(partition);
    if (pcs == null) {
      LOGGER.warn(
          "Replica: {} does not exist in pcs map for SIT of: {}, will not resubscribe.",
          Utils.getReplicaId(versionTopic, partition),
          getVersionTopic());
      continue;
    }
    /**
     * As of now, this feature intends to resolve ingestion performance issue introduced by consumer. We will rely on
     * the error reset feature to handle the error replica properly.
     */
    if (pcs.isErrorReported()) {
      LOGGER.warn("Replica: {} is already errored, will not resubscribe to repair.", pcs.getReplicaId());
      // Actually skip the errored replica, as the log message states.
      continue;
    }
    try {
      LOGGER.info("Resubscribing: {}", pcs.getReplicaId());
      // Record the attempt and count it before calling resubscribe, so a failing replica is still rate-limited.
      getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis());
      count++;
      resubscribe(pcs);
    } catch (Exception e) {
      LOGGER.error("Caught exception when resubscribing for replica: {}", pcs.getReplicaId(), e);
    }
  }
}

/** @return the {@link AbstractStoreBufferService} instance held by this ingestion task. */
AbstractStoreBufferService getStoreBufferService() {
  return this.storeBufferService;
}
Expand All @@ -5232,4 +5278,12 @@ ReadOnlyStoreRepository getStoreRepository() {
/** @return the value of the {@code localKafkaServer} field of this ingestion task. */
String getLocalKafkaServer() {
  return this.localKafkaServer;
}

/**
 * @return the live map from partition number to the time (epoch millis) of that partition's most recent
 *         resubscribe attempt. Package-private so tests can stub it.
 */
ConcurrentMap<Integer, Long> getPartitionToPreviousResubscribeTimeMap() {
  return this.partitionToPreviousResubscribeTimeMap;
}

/**
 * @return the live queue of partition numbers waiting to be resubscribed. Callers add partitions to it to
 *         request a resubscribe.
 */
PriorityBlockingQueue<Integer> getResubscribeRequestQueue() {
  return this.resubscribeRequestQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,11 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
public static final long INVALID_MESSAGE_TIMESTAMP = -1;
public static final long INVALID_HEARTBEAT_LAG = Long.MAX_VALUE;
public static final int DEFAULT_LAG_MONITOR_CLEANUP_CYCLE = 5;

private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class);
private final ReadOnlyStoreRepository metadataRepository;
private final Thread reportingThread;
private final Thread lagCleanupAndLoggingThread;

private final VeniceServerConfig serverConfig;
private final Set<String> regionNames;
private final String localRegionName;

Expand Down Expand Up @@ -109,6 +108,7 @@ public HeartbeatMonitoringService(
this.customizedViewRepositoryFuture = customizedViewRepositoryFuture;
this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort());
this.lagMonitorCleanupCycle = serverConfig.getLagMonitorCleanupCycle();
this.serverConfig = serverConfig;
}

private synchronized void initializeEntry(
Expand Down Expand Up @@ -619,14 +619,14 @@ protected void record() {
.recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe)));
}

protected void checkAndMaybeLogHeartbeatDelayMap(
void checkAndMaybeLogHeartbeatDelayMap(
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps) {
if (kafkaStoreIngestionService == null) {
if (getKafkaStoreIngestionService() == null) {
// Service not initialized yet, skip logging
return;
}
long currentTimestamp = System.currentTimeMillis();
boolean isLeader = heartbeatTimestamps == leaderHeartbeatTimeStamps;
boolean isLeader = heartbeatTimestamps == getLeaderHeartbeatTimeStamps();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
Expand All @@ -646,11 +646,24 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
lag,
heartbeatTs,
currentTimestamp);
kafkaStoreIngestionService.attemptToPrintIngestionInfoFor(
getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor(
storeName.getKey(),
version.getKey(),
partition.getKey(),
region.getKey());
/**
* Here we don't consider whether it is current version or not, as it will need extra logic to extract.
* In this layer, we will filter out untracked region (separated realtime region).
* We will delegate to KafkaStoreIngestionService to determine whether it takes this request as it has
* information about SIT current version.
*/
if (getServerConfig().isLagBasedReplicaAutoResubscribeEnabled()
&& TimeUnit.SECONDS
.toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeThresholdInSeconds()) < lag
&& !Utils.isSeparateTopicRegion(region.getKey())) {
getKafkaStoreIngestionService()
.maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey());
}
}
}
}
Expand Down Expand Up @@ -891,4 +904,12 @@ String getLocalRegionName() {
/**
 * Injects the ingestion service after construction. Until this is called,
 * {@code getKafkaStoreIngestionService()} returns whatever the field was initialized with.
 *
 * @param kafkaStoreIngestionService the ingestion service this monitor should delegate to
 */
public void setKafkaStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) {
this.kafkaStoreIngestionService = kafkaStoreIngestionService;
}

/**
 * @return the ingestion service set through {@code setKafkaStoreIngestionService}. Package-private so
 *         tests can stub it.
 */
KafkaStoreIngestionService getKafkaStoreIngestionService() {
  return this.kafkaStoreIngestionService;
}

/** @return the server config this service was constructed with. Package-private so tests can stub it. */
VeniceServerConfig getServerConfig() {
  return this.serverConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.linkedin.davinci.kafka.consumer;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;


/**
 * Unit test for {@link StoreIngestionTask#maybeProcessResubscribeRequest()}, run against a mocked
 * {@link StoreIngestionTask} whose queue, timestamp map and PCS map are replaced with real collections.
 */
public class AutoResubscribeTest {
  @Test
  public void testHandleAutoResubscribe() throws InterruptedException {
    StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class);
    doCallRealMethod().when(storeIngestionTask).maybeProcessResubscribeRequest();
    VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
    doReturn(serverConfig).when(storeIngestionTask).getServerConfig();
    PriorityBlockingQueue<Integer> resubscribeQueue = new PriorityBlockingQueue<>();
    doReturn(resubscribeQueue).when(storeIngestionTask).getResubscribeRequestQueue();
    Map<Integer, Long> resubscribeRequestTimestamp = new VeniceConcurrentHashMap<>();
    doReturn(resubscribeRequestTimestamp).when(storeIngestionTask).getPartitionToPreviousResubscribeTimeMap();
    Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap = new VeniceConcurrentHashMap<>();
    doReturn(partitionConsumptionStateMap).when(storeIngestionTask).getPartitionConsumptionStateMap();
    // A partition is eligible again only 300 seconds after its previous resubscribe.
    doReturn(300).when(serverConfig).getLagBasedReplicaAutoResubscribeIntervalInSeconds();

    // At most two replicas may be resubscribed per invocation.
    doReturn(2).when(serverConfig).getLagBasedReplicaAutoResubscribeMaxReplicaCount();

    /**
     * The test setup goes as follows ("stale" means the previous resubscribe is older than the allowed interval):
     * P0: Not stale (resubscribed just now)
     * P1: Stale
     * P2: Stale, but does not have PCS
     * P3: Stale (never resubscribed before)
     * P4: Stale but ERROR
     * P5: Stale
     *
     * It is expected to resubscribe P1/P3 only: P0 and P2 are skipped, and the max replica count of 2 is
     * reached after P3, so P4 and P5 remain in the queue.
     */

    PartitionConsumptionState pcs1 = mock(PartitionConsumptionState.class);
    PartitionConsumptionState pcs3 = mock(PartitionConsumptionState.class);
    PartitionConsumptionState pcs4 = mock(PartitionConsumptionState.class);
    PartitionConsumptionState pcs5 = mock(PartitionConsumptionState.class);
    partitionConsumptionStateMap.put(1, pcs1);
    partitionConsumptionStateMap.put(3, pcs3);
    partitionConsumptionStateMap.put(4, pcs4);
    // Partition 5 must map to its own PCS; otherwise the never() verification on pcs5 below is vacuous.
    partitionConsumptionStateMap.put(5, pcs5);
    doReturn(true).when(pcs4).isErrorReported();
    resubscribeRequestTimestamp.put(0, System.currentTimeMillis());
    resubscribeRequestTimestamp.put(1, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
    resubscribeRequestTimestamp.put(2, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
    resubscribeRequestTimestamp.put(4, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
    resubscribeRequestTimestamp.put(5, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
    resubscribeQueue.put(0);
    resubscribeQueue.put(1);
    resubscribeQueue.put(2);
    resubscribeQueue.put(3);
    resubscribeQueue.put(4);
    resubscribeQueue.put(5);

    storeIngestionTask.maybeProcessResubscribeRequest();
    verify(storeIngestionTask, times(1)).resubscribe(eq(pcs1));
    verify(storeIngestionTask, times(1)).resubscribe(eq(pcs3));
    verify(storeIngestionTask, never()).resubscribe(eq(pcs4));
    verify(storeIngestionTask, never()).resubscribe(eq(pcs5));

  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
Expand All @@ -42,6 +43,7 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -812,4 +814,43 @@ public void testLargestHeartbeatLag() {
Assert.assertEquals(aggregatedHeartbeatLagEntry.getCurrentVersionHeartbeatLag(), 8000L);
Assert.assertEquals(aggregatedHeartbeatLagEntry.getNonCurrentVersionHeartbeatLag(), 9900L);
}

/**
 * Checks when {@code checkAndMaybeLogHeartbeatDelayMap} forwards a resubscribe request to the ingestion
 * service: never while the feature is disabled, once when it is enabled with a 600-second threshold and
 * the heartbeat is 15 minutes old, and not again when the lagging region is {@code "dc1_sep"}.
 */
@Test
public void testTriggerAutoResubscribe() {
  final String storeName = "foo";
  final int versionNumber = 100;
  final int partitionId = 123;
  final String regularRegion = "dc1";
  final String separateRegion = "dc1_sep";

  // A heartbeat stamped 15 minutes in the past.
  long staleHeartbeatTs = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(15);
  HeartbeatTimeStampEntry staleEntry = new HeartbeatTimeStampEntry(staleHeartbeatTs, true, true);

  // Assemble store -> version -> partition -> region -> entry from the innermost level outwards.
  Map<String, HeartbeatTimeStampEntry> regionToEntry = new HashMap<>();
  regionToEntry.put(regularRegion, staleEntry);
  Map<Integer, Map<String, HeartbeatTimeStampEntry>> partitionToRegions = new HashMap<>();
  partitionToRegions.put(partitionId, regionToEntry);
  Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> versionToPartitions = new HashMap<>();
  versionToPartitions.put(versionNumber, partitionToRegions);
  Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps = new HashMap<>();
  heartbeatTimestamps.put(storeName, versionToPartitions);

  HeartbeatMonitoringService monitoringService = mock(HeartbeatMonitoringService.class);
  KafkaStoreIngestionService ingestionService = mock(KafkaStoreIngestionService.class);
  VeniceServerConfig config = mock(VeniceServerConfig.class);
  doReturn(config).when(monitoringService).getServerConfig();
  doReturn(ingestionService).when(monitoringService).getKafkaStoreIngestionService();
  doCallRealMethod().when(monitoringService).checkAndMaybeLogHeartbeatDelayMap(anyMap());

  // Config not enabled, nothing happen
  monitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps);
  verify(ingestionService, never()).maybeAddResubscribeRequest(eq(storeName), eq(versionNumber), eq(partitionId));

  // Config enabled, trigger resubscribe.
  doReturn(true).when(config).isLagBasedReplicaAutoResubscribeEnabled();
  doReturn(600).when(config).getLagBasedReplicaAutoResubscribeThresholdInSeconds();
  monitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps);
  verify(ingestionService, times(1)).maybeAddResubscribeRequest(eq(storeName), eq(versionNumber), eq(partitionId));

  // Config enabled, does not trigger resubscribe for sep region: the invocation count must stay at one.
  regionToEntry.remove(regularRegion);
  regionToEntry.put(separateRegion, staleEntry);
  monitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps);
  verify(ingestionService, times(1)).maybeAddResubscribeRequest(eq(storeName), eq(versionNumber), eq(partitionId));
}
}
Loading
Loading