Skip to content

Commit d3a410b

Browse files
committed
add unit test
1 parent 09ab18a commit d3a410b

File tree

4 files changed

+113
-8
lines changed

4 files changed

+113
-8
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5246,12 +5246,13 @@ && getResubscribeRequestQueue().peek() != null) {
52465246
continue;
52475247
}
52485248
try {
5249+
LOGGER.info("Resubscribing: {}", pcs.getReplicaId());
52495250
getPartitionToPreviousResubscribeTimeMap().put(partition, System.currentTimeMillis());
5251+
count++;
52505252
resubscribe(pcs);
52515253
} catch (Exception e) {
52525254
LOGGER.warn("Caught exception when resubscribing for replica: {}", pcs.getReplicaId());
52535255
}
5254-
count++;
52555256
}
52565257
}
52575258

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -617,14 +617,14 @@ protected void record() {
617617
.recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe)));
618618
}
619619

620-
protected void checkAndMaybeLogHeartbeatDelayMap(
620+
void checkAndMaybeLogHeartbeatDelayMap(
621621
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps) {
622-
if (kafkaStoreIngestionService == null) {
622+
if (getKafkaStoreIngestionService() == null) {
623623
// Service not initialized yet, skip logging
624624
return;
625625
}
626626
long currentTimestamp = System.currentTimeMillis();
627-
boolean isLeader = heartbeatTimestamps == leaderHeartbeatTimeStamps;
627+
boolean isLeader = heartbeatTimestamps == getLeaderHeartbeatTimeStamps();
628628
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
629629
.entrySet()) {
630630
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
@@ -644,14 +644,18 @@ protected void checkAndMaybeLogHeartbeatDelayMap(
644644
lag,
645645
heartbeatTs,
646646
currentTimestamp);
647-
kafkaStoreIngestionService.attemptToPrintIngestionInfoFor(
647+
getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor(
648648
storeName.getKey(),
649649
version.getKey(),
650650
partition.getKey(),
651651
region.getKey());
652-
if (serverConfig.isLagBasedReplicaAutoResubscribeEnabled()
653-
&& serverConfig.getLagBasedReplicaAutoResubscribeThresholdInSeconds() < lag) {
654-
kafkaStoreIngestionService
652+
/**
653+
* Here we don't consider whether it is current version or not, as it will need extra logic to extract.
654+
* We will delegate to KafkaConsumerService to determine whether it takes this request as it has information.
655+
*/
656+
if (getServerConfig().isLagBasedReplicaAutoResubscribeEnabled() && TimeUnit.SECONDS
657+
.toMillis(getServerConfig().getLagBasedReplicaAutoResubscribeThresholdInSeconds()) < lag) {
658+
getKafkaStoreIngestionService()
655659
.maybeAddResubscribeRequest(storeName.getKey(), version.getKey(), partition.getKey());
656660
}
657661
}
@@ -894,4 +898,12 @@ String getLocalRegionName() {
894898
public void setKafkaStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) {
895899
this.kafkaStoreIngestionService = kafkaStoreIngestionService;
896900
}
901+
902+
KafkaStoreIngestionService getKafkaStoreIngestionService() {
903+
return kafkaStoreIngestionService;
904+
}
905+
906+
VeniceServerConfig getServerConfig() {
907+
return serverConfig;
908+
}
897909
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.linkedin.davinci.kafka.consumer;
2+
3+
import static org.mockito.ArgumentMatchers.eq;
4+
import static org.mockito.Mockito.doCallRealMethod;
5+
import static org.mockito.Mockito.doReturn;
6+
import static org.mockito.Mockito.mock;
7+
import static org.mockito.Mockito.never;
8+
import static org.mockito.Mockito.times;
9+
import static org.mockito.Mockito.verify;
10+
11+
import com.linkedin.davinci.config.VeniceServerConfig;
12+
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
13+
import java.util.Map;
14+
import java.util.concurrent.PriorityBlockingQueue;
15+
import java.util.concurrent.TimeUnit;
16+
import org.testng.annotations.Test;
17+
18+
19+
public class AutoResubscribeTest {
20+
@Test
21+
public void testHandleAutoResubscribe() throws InterruptedException {
22+
StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class);
23+
doCallRealMethod().when(storeIngestionTask).maybeProcessResubscribeRequest();
24+
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
25+
doReturn(serverConfig).when(storeIngestionTask).getServerConfig();
26+
PriorityBlockingQueue<Integer> resubscribeQueue = new PriorityBlockingQueue<>();
27+
doReturn(resubscribeQueue).when(storeIngestionTask).getResubscribeRequestQueue();
28+
Map<Integer, Long> resubscribeRequestTimestamp = new VeniceConcurrentHashMap<>();
29+
doReturn(resubscribeRequestTimestamp).when(storeIngestionTask).getPartitionToPreviousResubscribeTimeMap();
30+
Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap = new VeniceConcurrentHashMap<>();
31+
doReturn(partitionConsumptionStateMap).when(storeIngestionTask).getPartitionConsumptionStateMap();
32+
doReturn(300).when(serverConfig).getLagBasedReplicaAutoResubscribeIntervalInSeconds();
33+
34+
doReturn(2).when(serverConfig).getLagBasedReplicaAutoResubscribeMaxReplicaCount();
35+
36+
PartitionConsumptionState pcs1 = mock(PartitionConsumptionState.class);
37+
PartitionConsumptionState pcs3 = mock(PartitionConsumptionState.class);
38+
PartitionConsumptionState pcs4 = mock(PartitionConsumptionState.class);
39+
partitionConsumptionStateMap.put(1, pcs1);
40+
partitionConsumptionStateMap.put(3, pcs3);
41+
partitionConsumptionStateMap.put(4, pcs4);
42+
resubscribeRequestTimestamp.put(0, System.currentTimeMillis());
43+
resubscribeRequestTimestamp.put(1, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
44+
resubscribeRequestTimestamp.put(2, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
45+
resubscribeRequestTimestamp.put(4, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(500));
46+
resubscribeQueue.put(0);
47+
resubscribeQueue.put(1);
48+
resubscribeQueue.put(2);
49+
resubscribeQueue.put(3);
50+
resubscribeQueue.put(4);
51+
52+
storeIngestionTask.maybeProcessResubscribeRequest();
53+
verify(storeIngestionTask, times(1)).resubscribe(eq(pcs1));
54+
verify(storeIngestionTask, times(1)).resubscribe(eq(pcs3));
55+
verify(storeIngestionTask, never()).resubscribe(eq(pcs4));
56+
57+
}
58+
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.Mockito.when;
2323

2424
import com.linkedin.davinci.config.VeniceServerConfig;
25+
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
2526
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
2627
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
2728
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
@@ -42,6 +43,7 @@
4243
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
4344
import io.tehuti.metrics.MetricsRepository;
4445
import java.time.Duration;
46+
import java.util.HashMap;
4547
import java.util.HashSet;
4648
import java.util.Map;
4749
import java.util.Set;
@@ -783,4 +785,36 @@ public void testLargestHeartbeatLag() {
783785
Assert.assertEquals(aggregatedHeartbeatLagEntry.getCurrentVersionHeartbeatLag(), 8000L);
784786
Assert.assertEquals(aggregatedHeartbeatLagEntry.getNonCurrentVersionHeartbeatLag(), 9900L);
785787
}
788+
789+
@Test
790+
public void testTriggerAutoResubscribe() {
791+
String store = "foo";
792+
int version = 100;
793+
int partition = 123;
794+
String region = "dc1";
795+
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps = new HashMap<>();
796+
HeartbeatTimeStampEntry entry =
797+
new HeartbeatTimeStampEntry(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(15), true, true);
798+
heartbeatTimestamps.put(store, new HashMap<>());
799+
heartbeatTimestamps.get(store).put(version, new HashMap<>());
800+
heartbeatTimestamps.get(store).get(version).put(partition, new HashMap<>());
801+
heartbeatTimestamps.get(store).get(version).get(partition).put(region, entry);
802+
803+
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
804+
KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);
805+
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
806+
doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig();
807+
doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService();
808+
doCallRealMethod().when(heartbeatMonitoringService).checkAndMaybeLogHeartbeatDelayMap(anyMap());
809+
810+
// Config not enabled, nothing happen
811+
heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps);
812+
verify(kafkaStoreIngestionService, never()).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition));
813+
814+
// Config enabled, trigger resubscribe.
815+
doReturn(true).when(serverConfig).isLagBasedReplicaAutoResubscribeEnabled();
816+
doReturn(600).when(serverConfig).getLagBasedReplicaAutoResubscribeThresholdInSeconds();
817+
heartbeatMonitoringService.checkAndMaybeLogHeartbeatDelayMap(heartbeatTimestamps);
818+
verify(kafkaStoreIngestionService, times(1)).maybeAddResubscribeRequest(eq(store), eq(version), eq(partition));
819+
}
786820
}

0 commit comments

Comments
 (0)