Skip to content

Commit a05786e

Browse files
author
Minh Nguyen
committed
Add feature flag
1 parent c1ccac3 commit a05786e

File tree

6 files changed

+46
-12
lines changed

6 files changed

+46
-12
lines changed

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,13 @@ private ConfigKeys() {
530530
public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED =
531531
"child.controller.admin.topic.consumption.enabled";
532532

533+
/**
534+
* Whether to enable admin operation system store or not.
535+
* If yes, controller will register admin operation system store and process admin operations in rollback cases.
536+
*/
537+
public static final String CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED =
538+
"controller.admin.operation.system.store.enabled";
539+
533540
/**
534541
* This config defines the source region of aggregate hybrid store real-time data when native replication is enabled
535542
*/

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionWithProtocolRollbackIntegrationTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.linkedin.venice.controller.kafka.consumer;
22

3-
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS;
4-
import static com.linkedin.venice.ConfigKeys.ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE;
3+
import static com.linkedin.venice.ConfigKeys.*;
54

65
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
76
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
@@ -83,6 +82,10 @@ public void setUp() {
8382
Properties parentControllerProps = new Properties();
8483
parentControllerProps.put(ADMIN_CONSUMPTION_MAX_WORKER_THREAD_POOL_SIZE, adminConsumptionMaxWorkerPoolSize);
8584
parentControllerProps.put(ADMIN_CONSUMPTION_CYCLE_TIMEOUT_MS, 3000);
85+
parentControllerProps.put(CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, true);
86+
87+
Properties childControllerProps = new Properties();
88+
childControllerProps.put(CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, true);
8689
venice = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
8790
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1)
8891
.numberOfClusters(1)
@@ -92,6 +95,7 @@ public void setUp() {
9295
.numberOfRouters(1)
9396
.replicationFactor(1)
9497
.parentControllerProperties(parentControllerProps)
98+
.childControllerProperties(childControllerProps)
9599
.serverProperties(serverProperties)
96100
.build());
97101

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,11 @@ public class VeniceControllerClusterConfig {
659659
private final boolean useMultiRegionRealTimeTopicSwitcher;
660660
private final Set<String> activeActiveRealTimeSourceFabrics;
661661

662+
/**
663+
* Admin operation system store
664+
*/
665+
private final boolean isAdminOperationSystemStoreEnabled;
666+
662667
public VeniceControllerClusterConfig(VeniceProperties props) {
663668
this.props = props;
664669
this.clusterName = props.getString(CLUSTER_NAME);
@@ -1241,6 +1246,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
12411246
props.getBoolean(CONTROLLER_BACKUP_VERSION_REPLICA_REDUCTION_ENABLED, false);
12421247
this.useMultiRegionRealTimeTopicSwitcher =
12431248
props.getBoolean(ConfigKeys.CONTROLLER_USE_MULTI_REGION_REAL_TIME_TOPIC_SWITCHER_ENABLED, false);
1249+
this.isAdminOperationSystemStoreEnabled =
1250+
props.getBoolean(ConfigKeys.CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED, false);
12441251

12451252
this.logClusterConfig();
12461253
}
@@ -2185,6 +2192,10 @@ public boolean isRealTimeTopicVersioningEnabled() {
21852192
return isRealTimeTopicVersioningEnabled;
21862193
}
21872194

2195+
public boolean isAdminOperationSystemStoreEnabled() {
2196+
return isAdminOperationSystemStoreEnabled;
2197+
}
2198+
21882199
/**
21892200
* A function that would put a k/v pair into a map with some processing works.
21902201
*/

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ public boolean isZkSharedMetaSystemSchemaStoreAutoCreationEnabled() {
275275
return getCommonConfig().isZkSharedMetaSystemSchemaStoreAutoCreationEnabled();
276276
}
277277

278+
public boolean isAdminOperationSystemStoreEnabled() {
279+
return getCommonConfig().isAdminOperationSystemStoreEnabled();
280+
}
281+
278282
public boolean isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled() {
279283
return getCommonConfig().isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled();
280284
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -697,15 +697,17 @@ public VeniceHelixAdmin(
697697
AvroProtocolDefinition.SERVER_STORE_PROPERTIES_PAYLOAD,
698698
multiClusterConfigs,
699699
this));
700-
initRoutines.add(
701-
new SystemSchemaInitializationRoutine(
702-
AvroProtocolDefinition.ADMIN_OPERATION,
703-
multiClusterConfigs,
704-
this,
705-
Optional.empty(),
706-
Optional.empty(),
707-
false,
708-
AdminOperation.class));
700+
if (multiClusterConfigs.isAdminOperationSystemStoreEnabled()) {
701+
initRoutines.add(
702+
new SystemSchemaInitializationRoutine(
703+
AvroProtocolDefinition.ADMIN_OPERATION,
704+
multiClusterConfigs,
705+
this,
706+
Optional.empty(),
707+
Optional.empty(),
708+
false,
709+
AdminOperation.class));
710+
}
709711

710712
if (multiClusterConfigs.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled()) {
711713
// Add routine to create zk shared metadata system store
@@ -1116,6 +1118,10 @@ protected HelixAdmin getHelixAdmin() {
11161118
return this.admin;
11171119
}
11181120

1121+
public boolean isAdminOperationSystemStoreEnabled() {
1122+
return multiClusterConfigs.isAdminOperationSystemStoreEnabled();
1123+
}
1124+
11191125
/**
11201126
* Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.
11211127
* @param clusterName Venice cluster where the store locates.

services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,9 @@ private Pair<Long, AdminOperation> extractExecutionIdAndAndAdminOperation(Defaul
902902
if (MessageType.PUT == messageType) {
903903
Put put = (Put) kafkaValue.payloadUnion;
904904
try {
905-
deserializer.fetchAndStoreSchemaIfAbsent(admin, put.schemaId);
905+
if (admin.isAdminOperationSystemStoreEnabled()) {
906+
deserializer.fetchAndStoreSchemaIfAbsent(admin, put.schemaId);
907+
}
906908
adminOperation = deserializer.deserialize(put.putValue, put.schemaId);
907909
executionIdFromPayload = adminOperation.executionId;
908910
} catch (Exception e) {

0 commit comments

Comments
 (0)