Skip to content

Commit 71507d0

Browse files
authored
[controller][config][adminTool] Implement automated store-migration tasks and add pause/resume migration APIs to StoreMigrationManage (#1927)
* Implementation of store migration tasks, make the delay (in seconds) configurable in migration scheduler to ensure the integration test completes on time, add an isMigrating check to avoid scheduling duplicate migrations, and introduce a pauseAfterStep argument.
1 parent f91eeee commit 71507d0

File tree

26 files changed

+1746
-148
lines changed

26 files changed

+1746
-148
lines changed

clients/venice-admin-tool/README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ java -jar venice-admin-tool-all.jar --list-storage-nodes --url <url> --cluster <
559559
java -jar venice-admin-tool-all.jar --list-store-push-info --url <url> --store <store> [--cluster <cluster>] [--partition-detail-enabled <partition-detail-enabled>]
560560
java -jar venice-admin-tool-all.jar --list-stores --url <url> --cluster <cluster> [--include-system-stores <include-system-stores>]
561561
java -jar venice-admin-tool-all.jar --migrate-store --url <url> --store <store> --cluster-src <cluster-src> --cluster-dest <cluster-dest>
562-
java -jar venice-admin-tool-all.jar --auto-migrate-store --url <url> --store <store> --cluster-src <cluster-src> --cluster-dest <cluster-dest> [--initial-step <initial-step>] [--abort-on-failure <abort-on-failure>]
562+
java -jar venice-admin-tool-all.jar --auto-migrate-store --url <url> --store <store> --cluster-src <cluster-src> --cluster-dest <cluster-dest> [--initial-step <initial-step>] [--paused-after-step <paused-after-step>] [--abort-on-failure <abort-on-failure>]
563563
java -jar venice-admin-tool-all.jar --migration-status --url <url> --store <store> --cluster-src <cluster-src> --cluster-dest <cluster-dest>
564564
java -jar venice-admin-tool-all.jar --monitor-data-recovery --url <url> --dest-fabric <dest-fabric> --datetime <datetime> [--stores <stores>] [--cluster <cluster>] [--interval <interval>]
565565
java -jar venice-admin-tool-all.jar --new-storage-persona --url <url> --cluster <cluster> --storage-persona <storage-persona> --storage-quota <storage-quota> --store <store> --owner <owner>

clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,8 @@ public static void autoMigrateStore(CommandLine cmd) {
24952495
Optional<Boolean> abortOnFailure =
24962496
Optional.ofNullable(getOptionalArgument(cmd, Arg.ABORT_ON_FAILURE)).map(Boolean::parseBoolean);
24972497
Optional<Integer> currStep = Optional.ofNullable(getOptionalArgument(cmd, Arg.INITIAL_STEP)).map(Integer::parseInt);
2498+
Optional<Integer> pauseAfterStep =
2499+
Optional.ofNullable(getOptionalArgument(cmd, Arg.PAUSE_AFTER_STEP)).map(Integer::parseInt);
24982500

24992501
if (srcClusterName.equals(destClusterName)) {
25002502
throw new VeniceException("Source and destination cluster cannot be the same!");
@@ -2508,7 +2510,7 @@ public static void autoMigrateStore(CommandLine cmd) {
25082510
assertStoreNotMigrating(srcControllerClient, storeName);
25092511

25102512
StoreMigrationResponse storeMigrationResponse =
2511-
srcControllerClient.autoMigrateStore(storeName, destClusterName, currStep, abortOnFailure);
2513+
srcControllerClient.autoMigrateStore(storeName, destClusterName, currStep, pauseAfterStep, abortOnFailure);
25122514
printObject(storeMigrationResponse);
25132515

25142516
if (storeMigrationResponse.isError()) {

clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ public enum Arg {
331331
"enum-schema-evolution-allowed", "esea", true, "Allow enum schema evolution for a store"
332332
), INITIAL_STEP("initial-step", "is", true, "Initial step of the auto store migration"),
333333
ABORT_ON_FAILURE("abort-on-failure", "aof", true, "Abort the auto store migration if any step fails"),
334+
PAUSE_AFTER_STEP("pause-after-step", "pas", true, "Pause the auto store migration after this step"),
334335
STORE_LIFECYCLE_HOOKS_LIST("store-lifecycle-hooks-list", "slhl", true, "List of store lifecycle hooks"),
335336
KEY_URN_COMPRESSION_EANBLED(
336337
"key-urn-compression-enabled", "kuce", true, "Enable/Disable key urn compression for a store."

clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public void testAutoMigrateStore() throws ParseException, IOException {
228228
storeAutoMigrationResponse.setSrcClusterName(srcCluster);
229229
storeAutoMigrationResponse.setCluster(dstCluster);
230230
storeAutoMigrationResponse.setName(storeName);
231-
when(srcControllerClient.autoMigrateStore(eq(storeName), eq(dstCluster), any(), any()))
231+
when(srcControllerClient.autoMigrateStore(eq(storeName), eq(dstCluster), any(), any(), any()))
232232
.thenReturn(storeAutoMigrationResponse);
233233

234234
StoreResponse storeResponse = new StoreResponse();
@@ -246,9 +246,11 @@ public void testAutoMigrateStore() throws ParseException, IOException {
246246
.thenReturn(destControllerClient);
247247

248248
AdminTool.autoMigrateStore(fullCmd);
249-
Mockito.verify(srcControllerClient).autoMigrateStore(storeName, dstCluster, Optional.of(0), Optional.of(true));
249+
Mockito.verify(srcControllerClient)
250+
.autoMigrateStore(storeName, dstCluster, Optional.of(0), Optional.empty(), Optional.of(true));
250251
AdminTool.autoMigrateStore(BasicCmd);
251-
Mockito.verify(srcControllerClient).autoMigrateStore(storeName, dstCluster, Optional.empty(), Optional.empty());
252+
Mockito.verify(srcControllerClient)
253+
.autoMigrateStore(storeName, dstCluster, Optional.empty(), Optional.empty(), Optional.empty());
252254

253255
srcStoreInfo.setMigrating(true);
254256
storeResponse.setStore(srcStoreInfo);

internal/venice-client-common/src/main/java/com/linkedin/venice/utils/RedundantExceptionFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ public class RedundantExceptionFilter {
1414
private static RedundantExceptionFilter singleton;
1515

1616
private final int bitSetSize;
17-
private final ScheduledExecutorService cleanerExecutor = Executors.newScheduledThreadPool(1);
17+
private final ScheduledExecutorService cleanerExecutor = Executors.newSingleThreadScheduledExecutor(
18+
new DaemonThreadFactory(this.getClass().getName() + "-RedundantExceptionFilterExecutor"));
1819

1920
private BitSet activeBitset;
2021
private BitSet oldBitSet;

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2939,6 +2939,17 @@ private ConfigKeys() {
29392939
*/
29402940
public static final String STORE_MIGRATION_MAX_RETRY_ATTEMPTS = "store.migration.max.retry.attempts";
29412941

2942+
/**
2943+
* (Only matters if MULTITASK_SCHEDULER_SERVICE_ENABLED true). Class name of {@link com.linkedin.venice.controller.multitaskscheduler.MultiTaskSchedulerService} implementation
2944+
*/
2945+
public static final String STORE_MIGRATION_FABRIC_LIST = "store.migration.fabric.list";
2946+
2947+
/**
2948+
* (Only matters if MULTITASK_SCHEDULER_SERVICE_ENABLED true). Class name of {@link com.linkedin.venice.controller.multitaskscheduler.MultiTaskSchedulerService} implementation
2949+
*/
2950+
public static final String STORE_MIGRATION_TASK_SCHEDULING_INTERVAL_SECONDS =
2951+
"store.migration.task.scheduling.interval.seconds";
2952+
29422953
/**
29432954
* The strategy for how to share memory-heavy objects used in the ingestion hot path.
29442955
*/

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public class ControllerApiConstants {
1212

1313
public static final String AUTO_STORE_MIGRATION_CURRENT_STEP = "auto_store_migration_current_step";
1414
public static final String AUTO_STORE_MIGRATION_ABORT_ON_FAILURE = "auto_store_migration_abort_on_failure";
15+
public static final String AUTO_STORE_MIGRATION_PAUSE_AFTER_STEP = "auto_store_migration_pause_after_step";
1516
/**
1617
* @deprecated Use {@link #STORE_NAME} instead.
1718
*/

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static com.linkedin.venice.controllerapi.ControllerApiConstants.AMPLIFICATION_FACTOR;
66
import static com.linkedin.venice.controllerapi.ControllerApiConstants.AUTO_STORE_MIGRATION_ABORT_ON_FAILURE;
77
import static com.linkedin.venice.controllerapi.ControllerApiConstants.AUTO_STORE_MIGRATION_CURRENT_STEP;
8+
import static com.linkedin.venice.controllerapi.ControllerApiConstants.AUTO_STORE_MIGRATION_PAUSE_AFTER_STEP;
89
import static com.linkedin.venice.controllerapi.ControllerApiConstants.BATCH_JOB_HEARTBEAT_ENABLED;
910
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER;
1011
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER_DEST;
@@ -627,9 +628,11 @@ public StoreMigrationResponse autoMigrateStore(
627628
String storeName,
628629
String destClusterName,
629630
Optional<Integer> currStep,
631+
Optional<Integer> pauseAfterStep,
630632
Optional<Boolean> abortOnFailure) {
631633
QueryParams params = newParams().add(NAME, storeName).add(CLUSTER_DEST, destClusterName);
632634
currStep.ifPresent(cs -> params.add(AUTO_STORE_MIGRATION_CURRENT_STEP, cs));
635+
pauseAfterStep.ifPresent(pas -> params.add(AUTO_STORE_MIGRATION_PAUSE_AFTER_STEP, pas));
633636
abortOnFailure.ifPresent(aof -> params.add(AUTO_STORE_MIGRATION_ABORT_ON_FAILURE, aof));
634637
params.add(AUTO_STORE_MIGRATION_ABORT_ON_FAILURE, abortOnFailure);
635638
return request(ControllerRoute.AUTO_MIGRATE_STORE, params, StoreMigrationResponse.class);

0 commit comments

Comments
 (0)