Skip to content

Commit c0f8b56

Browse files
authored
[server] Support parallel shutdown workflow for Venice Server (#2247)
Speed up SIT shutdown by performing parallel unsubscribe and sync offset. Today, SIT (StoreIngestionTask) unsubscribes and syncs the offset for each partition sequentially. For hosts with a large number of partitions, this process is slow and may take more than one minute. The SIT executor's graceful shutdown timeout is one minute; after that, it force-closes and raises an interrupt, which cancels the graceful shutdown tasks for all remaining partitions. After some offline discussion, this PR improves the behavior by implementing parallel shutdown for all partitions, similar to what Da Vinci does today. This is still best effort, and we keep the timeout in place since we don't want slowness to block the entire server shutdown process. If certain steps are slow, we should continue investigating them. Introduce a new config "server.parallel.resource.shutdown.enabled" to control this behavior in the Server (offset checkpointing during shutdown remains gated by the separate "server.ingestion.checkpoint.during.graceful.shutdown.enabled" config). By default, the new config is false (same behavior as now). It will be rolled out gradually everywhere, as there should not be any side effects.
1 parent 3f8d61e commit c0f8b56

File tree

5 files changed

+119
-41
lines changed

5 files changed

+119
-41
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_NO_READ_THRESHOLD_SECONDS;
159159
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_SERVICE_SCHEDULE_INTERNAL_SECONDS;
160160
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_BATCH_GET_CHUNK_SIZE;
161+
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED;
161162
import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS;
162163
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
163164
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;
@@ -667,6 +668,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
667668
private final int inactiveTopicPartitionCheckerThresholdInSeconds;
668669
private final int serverIngestionInfoLogLineLimit;
669670

671+
private final boolean parallelResourceShutdownEnabled;
672+
670673
public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
671674
this(serverProperties, Collections.emptyMap());
672675
}
@@ -1133,6 +1136,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
11331136
this.useMetricsBasedPositionInLagComputation =
11341137
serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
11351138
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
1139+
this.parallelResourceShutdownEnabled =
1140+
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
11361141
}
11371142

11381143
List<Double> extractThrottleLimitFactorsFor(VeniceProperties serverProperties, String configKey) {
@@ -2043,4 +2048,8 @@ public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
20432048
public int getServerIngestionInfoLogLineLimit() {
20442049
return this.serverIngestionInfoLogLineLimit;
20452050
}
2051+
2052+
public boolean isParallelResourceShutdownEnabled() {
2053+
return parallelResourceShutdownEnabled;
2054+
}
20462055
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1386,7 +1386,7 @@ public ByteBuffer getPartitionOffsetRecords(String topicName, int partition) {
13861386
*/
13871387
public void syncTopicPartitionOffset(String topicName, int partition) {
13881388
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(topicName);
1389-
storeIngestionTask.updateOffsetMetadataAndSync(topicName, partition);
1389+
storeIngestionTask.updateOffsetMetadataAndSync(partition);
13901390
}
13911391

13921392
public final ReadOnlyStoreRepository getMetadataRepo() {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,12 +1713,13 @@ public void run() {
17131713
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>(partitionConsumptionStateMap.size());
17141714

17151715
/**
1716-
* Speed up DaVinci shutdown by closing partitions concurrently.
1716+
* Speed up shutdown by closing partitions concurrently. For Server it is controlled by server config; for DaVinci
1717+
* client it is always enabled.
17171718
*/
1718-
ExecutorService shutdownExecutorForDvc =
1719-
isDaVinciClient ? Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2) : null;
1719+
boolean enableParallelShutdown = serverConfig.isParallelResourceShutdownEnabled() || isDaVinciClient;
1720+
ExecutorService shutdownExecutor =
1721+
enableParallelShutdown ? Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2) : null;
17201722

1721-
// If the ingestion task is stopped gracefully (server stops), persist processed offset to disk
17221723
for (Map.Entry<Integer, PartitionConsumptionState> entry: partitionConsumptionStateMap.entrySet()) {
17231724
/**
17241725
* Now, there are two threads, which could potentially trigger {@link #syncOffset(String, PartitionConsumptionState)}:
@@ -1732,36 +1733,11 @@ public void run() {
17321733
* offset and checksum, since the checksum could change in another thread, but the corresponding offset change
17331734
* hasn't been applied yet, when checkpointing happens in current thread.
17341735
*/
1735-
1736-
Runnable shutdownRunnable = () -> {
1737-
int partition = entry.getKey();
1738-
PartitionConsumptionState partitionConsumptionState = entry.getValue();
1739-
consumerUnSubscribeAllTopics(partitionConsumptionState);
1740-
1741-
if (ingestionCheckpointDuringGracefulShutdownEnabled) {
1742-
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
1743-
try {
1744-
CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
1745-
waitForSyncOffsetCmd(cmdFuture, topicPartition);
1746-
waitForAllMessageToBeProcessedFromTopicPartition(topicPartition, partitionConsumptionState);
1747-
} catch (InterruptedException e) {
1748-
throw new VeniceException(e);
1749-
}
1750-
}
1751-
};
1752-
1753-
if (shutdownExecutorForDvc != null) {
1754-
shutdownFutures.add(CompletableFuture.runAsync(shutdownRunnable, shutdownExecutorForDvc));
1755-
} else {
1756-
/**
1757-
* TODO: evaluate whether we need to apply concurrent shutdown in Venice Server or not.
1758-
*/
1759-
shutdownRunnable.run();
1760-
}
1736+
executeShutdownRunnable(entry.getValue(), shutdownFutures, shutdownExecutor);
17611737
}
1762-
if (isDaVinciClient) {
1738+
if (enableParallelShutdown) {
17631739
/**
1764-
* DaVinci shutdown shouldn't take that long because of high concurrency, and it is fine to specify a high timeout here
1740+
* Shutdown shouldn't take that long because of high concurrency, and it is fine to specify a high timeout here
17651741
* to avoid infinite wait in case there is some regression.
17661742
*/
17671743
CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS);
@@ -1817,6 +1793,31 @@ public void run() {
18171793
}
18181794
}
18191795

1796+
void executeShutdownRunnable(
1797+
PartitionConsumptionState partitionConsumptionState,
1798+
List<CompletableFuture<Void>> shutdownFutures,
1799+
ExecutorService shutdownExecutor) {
1800+
Runnable shutdownRunnable = () -> {
1801+
consumerUnSubscribeAllTopics(partitionConsumptionState);
1802+
// If the ingestion task is stopped gracefully (server stops), persist processed offset to disk.
1803+
if (getServerConfig().isServerIngestionCheckpointDuringGracefulShutdownEnabled()) {
1804+
try {
1805+
PubSubTopicPartition topicPartition = partitionConsumptionState.getReplicaTopicPartition();
1806+
CompletableFuture<Void> cmdFuture = getStoreBufferService().execSyncOffsetCommandAsync(topicPartition, this);
1807+
waitForSyncOffsetCmd(cmdFuture, topicPartition);
1808+
waitForAllMessageToBeProcessedFromTopicPartition(topicPartition, partitionConsumptionState);
1809+
} catch (InterruptedException e) {
1810+
throw new VeniceException(e);
1811+
}
1812+
}
1813+
};
1814+
if (shutdownExecutor != null) {
1815+
shutdownFutures.add(CompletableFuture.runAsync(shutdownRunnable, shutdownExecutor));
1816+
} else {
1817+
shutdownRunnable.run();
1818+
}
1819+
}
1820+
18201821
private void waitForSyncOffsetCmd(CompletableFuture<Void> cmdFuture, PubSubTopicPartition topicPartition)
18211822
throws InterruptedException {
18221823
try {
@@ -1858,7 +1859,7 @@ protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, @No
18581859
div.updateOffsetRecordForPartition(PartitionTracker.VERSION_TOPIC, pcs.getPartition(), pcs.getOffsetRecord());
18591860
// update the offset metadata in the OffsetRecord.
18601861
updateOffsetMetadataInOffsetRecord(pcs);
1861-
syncOffset(kafkaVersionTopic, pcs);
1862+
syncOffset(pcs);
18621863
}
18631864

18641865
/**
@@ -1868,7 +1869,7 @@ protected void updateAndSyncOffsetFromSnapshot(PartitionTracker vtDivSnapshot, P
18681869
PartitionConsumptionState pcs = getPartitionConsumptionState(topicPartition.getPartitionNumber());
18691870
vtDivSnapshot.updateOffsetRecord(PartitionTracker.VERSION_TOPIC, pcs.getOffsetRecord());
18701871
updateOffsetMetadataInOffsetRecord(pcs);
1871-
syncOffset(kafkaVersionTopic, pcs);
1872+
syncOffset(pcs);
18721873
}
18731874

18741875
private void handleIngestionException(Exception e) {
@@ -2760,10 +2761,9 @@ boolean shouldSyncOffset(
27602761
/**
27612762
* This method flushes data partition on disk and syncs the underlying database with {@link OffsetRecord}.
27622763
* Note that the updates to {@link OffsetRecord} happen in {@link #updateOffsetMetadataInOffsetRecord}
2763-
* @param topic, the given version topic(VT) for the store.
27642764
* @param pcs, the corresponding {@link PartitionConsumptionState} to sync with.
27652765
*/
2766-
private void syncOffset(String topic, PartitionConsumptionState pcs) {
2766+
private void syncOffset(PartitionConsumptionState pcs) {
27672767
int partition = pcs.getPartition();
27682768
if (this.storageEngine.isClosed()) {
27692769
LOGGER.warn("Storage engine has been closed. Could not execute sync offset for replica: {}", pcs.getReplicaId());
@@ -3723,8 +3723,13 @@ public boolean consumerHasAnySubscription() {
37233723

37243724
public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) {
37253725
int partitionId = partitionConsumptionState.getPartition();
3726-
return aggKafkaConsumerService
3727-
.hasConsumerAssignedFor(versionTopic, new PubSubTopicPartitionImpl(topic, partitionId));
3726+
PubSubTopicPartition pubSubTopicPartition;
3727+
if (topic.isVersionTopic()) {
3728+
pubSubTopicPartition = partitionConsumptionState.getReplicaTopicPartition();
3729+
} else {
3730+
pubSubTopicPartition = new PubSubTopicPartitionImpl(topic, partitionId);
3731+
}
3732+
return aggKafkaConsumerService.hasConsumerAssignedFor(versionTopic, pubSubTopicPartition);
37283733
}
37293734

37303735
/**
@@ -4555,10 +4560,10 @@ public VeniceServerConfig getServerConfig() {
45554560
return serverConfig;
45564561
}
45574562

4558-
public void updateOffsetMetadataAndSync(String topic, int partitionId) {
4563+
public void updateOffsetMetadataAndSync(int partitionId) {
45594564
PartitionConsumptionState pcs = getPartitionConsumptionState(partitionId);
45604565
updateOffsetMetadataInOffsetRecord(pcs);
4561-
syncOffset(topic, pcs);
4566+
syncOffset(pcs);
45624567
}
45634568

45644569
/**
@@ -5098,4 +5103,8 @@ protected static void validateEndOfPushReceivedBeforeTopicSwitch(
50985103
throw new VeniceMessageException(errorMessage);
50995104
}
51005105
}
5106+
5107+
AbstractStoreBufferService getStoreBufferService() {
5108+
return storeBufferService;
5109+
}
51015110
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import com.linkedin.venice.pubsub.PubSubContext;
159159
import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
160160
import com.linkedin.venice.pubsub.PubSubPositionTypeRegistry;
161+
import com.linkedin.venice.pubsub.PubSubTopicImpl;
161162
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
162163
import com.linkedin.venice.pubsub.PubSubTopicRepository;
163164
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
@@ -6267,6 +6268,60 @@ public void testResubscribeAsLeaderFromVersionTopic(boolean aaEnabled) throws In
62676268

62686269
}
62696270

6271+
@Test
6272+
public void testParallelShutdown() throws InterruptedException {
6273+
// Setup test data
6274+
StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class);
6275+
PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
6276+
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>();
6277+
ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor();
6278+
6279+
// Mock server config to enable checkpointing during shutdown
6280+
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
6281+
when(serverConfig.isServerIngestionCheckpointDuringGracefulShutdownEnabled()).thenReturn(true);
6282+
when(storeIngestionTask.getServerConfig()).thenReturn(serverConfig);
6283+
6284+
// Mock store buffer service
6285+
StoreBufferService storeBufferService = mock(StoreBufferService.class);
6286+
when(storeIngestionTask.getStoreBufferService()).thenReturn(storeBufferService);
6287+
when(storeBufferService.execSyncOffsetCommandAsync(any(), any()))
6288+
.thenReturn(CompletableFuture.completedFuture(null));
6289+
6290+
doCallRealMethod().when(storeIngestionTask).executeShutdownRunnable(any(), anyList(), any());
6291+
6292+
// Set up test data
6293+
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(new PubSubTopicImpl("test_topic_v1"), 0);
6294+
when(pcs.getReplicaTopicPartition()).thenReturn(topicPartition);
6295+
6296+
// Call the method under test
6297+
storeIngestionTask.executeShutdownRunnable(pcs, shutdownFutures, shutdownExecutor);
6298+
6299+
// Wait for async operation to complete
6300+
Assert.assertEquals(shutdownFutures.size(), 1);
6301+
shutdownFutures.forEach(CompletableFuture::join);
6302+
6303+
// Verify behavior
6304+
verify(storeIngestionTask).consumerUnSubscribeAllTopics(pcs);
6305+
verify(storeBufferService).execSyncOffsetCommandAsync(topicPartition, storeIngestionTask);
6306+
verify(storeIngestionTask).waitForAllMessageToBeProcessedFromTopicPartition(topicPartition, pcs);
6307+
6308+
// Test with null executor (synchronous execution)
6309+
shutdownFutures.clear();
6310+
storeIngestionTask.executeShutdownRunnable(pcs, shutdownFutures, null);
6311+
assertTrue(shutdownFutures.isEmpty(), "No futures should be added when executor is null");
6312+
verify(storeIngestionTask, times(2)).consumerUnSubscribeAllTopics(pcs);
6313+
6314+
// Test when checkpointing is disabled
6315+
when(serverConfig.isServerIngestionCheckpointDuringGracefulShutdownEnabled()).thenReturn(false);
6316+
storeIngestionTask.executeShutdownRunnable(pcs, shutdownFutures, shutdownExecutor);
6317+
Assert.assertEquals(shutdownFutures.size(), 1);
6318+
shutdownFutures.forEach(CompletableFuture::join);
6319+
verify(storeIngestionTask, times(3)).consumerUnSubscribeAllTopics(pcs);
6320+
6321+
// Clean up
6322+
shutdownExecutor.shutdown();
6323+
}
6324+
62706325
private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
62716326
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
62726327
// mock the store config

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,11 @@ private ConfigKeys() {
10481048
public static final String SERVER_ADAPTIVE_THROTTLER_READ_COMPUTE_GET_LATENCY_THRESHOLD =
10491049
"server.adaptive.throttler.read.compute.latency.threshold";
10501050

1051+
/**
1052+
* Config to enable parallel resource shutdown operation to speed up overall ingestion task shutdown.
1053+
*/
1054+
public static final String SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED = "server.parallel.resource.shutdown.enabled";
1055+
10511056
/**
10521057
* A list of fully-qualified class names of all stats classes that needs to be initialized in isolated ingestion process,
10531058
* separated by comma. This config will help isolated ingestion process to register extra stats needed for monitoring,

0 commit comments

Comments
 (0)