Skip to content

Commit 64541d3

Browse files
authored
Add test for topology migration by resource group (apache#2933)
Add test to prove and preserve the behavior of allowing a topology migration which can isolate shuffling to a single resource group.
1 parent 9b339b5 commit 64541d3

File tree

1 file changed

+333
-0
lines changed

1 file changed

+333
-0
lines changed
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
package org.apache.helix.integration.controller;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.Date;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.UUID;
31+
import java.util.stream.Collectors;
32+
33+
import org.apache.helix.ConfigAccessor;
34+
import org.apache.helix.PropertyKey;
35+
import org.apache.helix.PropertyPathBuilder;
36+
import org.apache.helix.TestHelper;
37+
import org.apache.helix.common.ZkTestBase;
38+
import org.apache.helix.examples.LeaderStandbyStateModelFactory;
39+
import org.apache.helix.integration.manager.ClusterControllerManager;
40+
import org.apache.helix.integration.manager.MockParticipantManager;
41+
import org.apache.helix.model.BuiltInStateModelDefinitions;
42+
import org.apache.helix.model.ClusterConfig;
43+
import org.apache.helix.model.ControllerHistory;
44+
import org.apache.helix.model.ExternalView;
45+
import org.apache.helix.model.IdealState;
46+
import org.apache.helix.model.InstanceConfig;
47+
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
48+
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
49+
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
50+
import org.testng.Assert;
51+
import org.testng.annotations.AfterClass;
52+
import org.testng.annotations.BeforeClass;
53+
import org.testng.annotations.Test;
54+
55+
public class TestTopologyMigration extends ZkTestBase {
56+
private static final int START_PORT = 12918; // Starting port for mock participants
57+
private static int _nextStartPort = START_PORT; // Incremental port for participants
58+
private static final String TEST_CAPACITY_KEY = "TestCapacityKey";
59+
private static final int TEST_CAPACITY_VALUE = 100; // Default instance capacity for testing
60+
private static final String RACK = "rack"; // Rack identifier in topology
61+
private static final String HOST = "host"; // Host identifier in topology
62+
private static final String APPLICATION_INSTANCE_ID = "applicationInstanceId";
63+
private static final String MZ = "mz"; // Migrated zone identifier
64+
private static final String INIT_TOPOLOGY = String.format("/%s/%s", RACK, HOST);
65+
// Initial topology format
66+
private static final String MIGRATED_TOPOLOGY =
67+
String.format("/%s/%s/%s", MZ, HOST, APPLICATION_INSTANCE_ID); // New topology format
68+
private static final int INIT_ZONE_COUNT = 12; // Initial zone count
69+
private static final int MIGRATE_ZONE_COUNT = 6; // Zone count post-migration
70+
private static final int RESOURCE_COUNT = 2; // Number of resources in the cluster
71+
private static final int INSTANCES_PER_RESOURCE = 12; // Number of instances per resource
72+
private static final int PARTITIONS = 3; // Number of partitions
73+
private static final int REPLICA = 6; // Number of replicas
74+
private static final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L;
75+
// Delay time for resource rebalance
76+
77+
private final String CLASS_NAME = getShortClassName(); // Test class name
78+
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; // Cluster name for testing
79+
80+
protected ClusterControllerManager _controller; // Cluster controller instance
81+
private final List<MockParticipantManager> _participants = new ArrayList<>();
82+
// List of participant managers
83+
private final Set<String> _allDBs = new HashSet<>(); // Set of all databases
84+
private ZkHelixClusterVerifier _clusterVerifier; // Cluster verifier
85+
private ConfigAccessor _configAccessor; // Config accessor
86+
87+
/**
88+
* Sets up the test cluster and initializes participants before running tests.
89+
*/
90+
@BeforeClass
91+
public void beforeClass() throws Exception {
92+
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
93+
94+
_gSetupTool.addCluster(CLUSTER_NAME, true);
95+
96+
// Start cluster controller
97+
String controllerName = CONTROLLER_PREFIX + "_0";
98+
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
99+
_controller.syncStart();
100+
_configAccessor = new ConfigAccessor(_gZkClient);
101+
102+
// Set up cluster configuration and participants
103+
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
104+
setupClusterConfig(INIT_TOPOLOGY, RACK);
105+
setupInitResourcesAndParticipants();
106+
107+
// Initialize cluster verifier for validating state
108+
_clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
109+
.setResources(_allDBs).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
110+
.build();
111+
}
112+
113+
/**
114+
* Cleans up after the test by stopping participants and dropping resources.
115+
*/
116+
@AfterClass
117+
public void afterClass() {
118+
// Drop all databases from the cluster
119+
for (String db : _allDBs) {
120+
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
121+
}
122+
123+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
124+
125+
// Stop all participants and controller
126+
for (MockParticipantManager participant : _participants) {
127+
participant.syncStop();
128+
}
129+
_controller.syncStop();
130+
}
131+
132+
/**
133+
* Sets up the cluster configuration with the given topology and fault zone type.
134+
*/
135+
private void setupClusterConfig(String topology, String faultZoneType) {
136+
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
137+
clusterConfig.stateTransitionCancelEnabled(true);
138+
clusterConfig.setDelayRebalaceEnabled(true);
139+
clusterConfig.setRebalanceDelayTime(DEFAULT_RESOURCE_DELAY_TIME);
140+
clusterConfig.setTopology(topology);
141+
clusterConfig.setFaultZoneType(faultZoneType);
142+
clusterConfig.setTopologyAwareEnabled(true);
143+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY));
144+
clusterConfig.setDefaultInstanceCapacityMap(
145+
Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE));
146+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1));
147+
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
148+
}
149+
150+
/**
151+
* Sets up initial resources and mock participants for the cluster.
152+
*/
153+
private void setupInitResourcesAndParticipants() throws Exception {
154+
for (int i = 0; i < RESOURCE_COUNT; i++) {
155+
String dbName = "TestDB_" + i;
156+
157+
// Create and start participants for the resource
158+
for (int j = 0; j < INSTANCES_PER_RESOURCE; j++) {
159+
String participantName = "localhost_" + _nextStartPort;
160+
161+
InstanceConfig instanceConfig = new InstanceConfig.Builder().setDomain(
162+
String.format("%s=%s, %s=%s", RACK, j % INIT_ZONE_COUNT, HOST, participantName))
163+
.addTag(dbName).build(participantName);
164+
165+
_gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig);
166+
167+
MockParticipantManager participant = createParticipant(participantName);
168+
participant.syncStart();
169+
_nextStartPort++;
170+
_participants.add(participant);
171+
}
172+
173+
// Set up IdealState for the resource
174+
IdealState is = createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
175+
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1);
176+
is.setResourceGroupName(dbName);
177+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is);
178+
179+
_allDBs.add(dbName);
180+
}
181+
}
182+
183+
/**
184+
* Creates and starts a mock participant with a registered state model factory.
185+
*/
186+
private MockParticipantManager createParticipant(String participantName) throws Exception {
187+
MockParticipantManager participant =
188+
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, null);
189+
participant.getStateMachineEngine()
190+
.registerStateModelFactory("LeaderStandby", new LeaderStandbyStateModelFactory());
191+
return participant;
192+
}
193+
194+
/**
195+
* Tests topology migration with and without domain updates, ensuring no shuffling occurs.
196+
*/
197+
@Test
198+
public void testTopologyMigrationByResourceGroup() throws Exception {
199+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
200+
201+
// Step 1: Migrate to new topology in maintenance mode
202+
Map<String, ExternalView> originalEVs = getEVs();
203+
List<InstanceConfig> instanceConfigs =
204+
_gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME).stream().map(
205+
instanceName -> _gSetupTool.getClusterManagementTool()
206+
.getInstanceConfig(CLUSTER_NAME, instanceName)).collect(Collectors.toList());
207+
208+
setAndVerifyMaintenanceMode(true);
209+
setupClusterConfig(MIGRATED_TOPOLOGY, MZ);
210+
migrateInstanceConfigTopology(instanceConfigs);
211+
setAndVerifyMaintenanceMode(false);
212+
213+
// Verify cluster did not have shuffling anywhere after
214+
// the migration to the new topology
215+
validateNoShufflingOccurred(originalEVs, null);
216+
217+
// Step 2: Update domain values for one resource group at a time
218+
for (String updatingDb : _allDBs) {
219+
Map<String, ExternalView> preMigrationEVs = getEVs();
220+
setAndVerifyMaintenanceMode(true);
221+
migrateDomainForResourceGroup(updatingDb);
222+
setAndVerifyMaintenanceMode(false);
223+
224+
// Verify cluster only had shuffling in the resource group that was updated
225+
validateNoShufflingOccurred(preMigrationEVs, updatingDb);
226+
}
227+
}
228+
229+
/**
230+
* Set MaintenanceMode and verify that controller has processed it.
231+
*/
232+
private void setAndVerifyMaintenanceMode(boolean enable) throws Exception {
233+
if (enable) {
234+
// Check that the cluster converged to the best possible state that should be calculated
235+
// by the controller before we change the maintenance mode.
236+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
237+
}
238+
239+
_gSetupTool.getClusterManagementTool()
240+
.manuallyEnableMaintenanceMode(CLUSTER_NAME, enable, "", Collections.emptyMap());
241+
242+
if (!enable) {
243+
// Check that the cluster converged to the best possible state that should be calculated
244+
// by the controller after we have changed the maintenance mode.
245+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
246+
}
247+
}
248+
249+
/**
250+
* Retrieves the ExternalViews for all databases in the cluster.
251+
*/
252+
private Map<String, ExternalView> getEVs() {
253+
Map<String, ExternalView> externalViews = new HashMap<>();
254+
for (String db : _allDBs) {
255+
externalViews.put(db,
256+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db));
257+
}
258+
return externalViews;
259+
}
260+
261+
/**
262+
* Compares the contents of two ExternalViews to determine if they are equal.
263+
*/
264+
private boolean compareExternalViews(ExternalView oldEV, ExternalView newEV) {
265+
if (oldEV == null || newEV == null) {
266+
return false;
267+
}
268+
269+
Map<String, Map<String, String>> oldEVMap = oldEV.getRecord().getMapFields();
270+
Map<String, Map<String, String>> newEVMap = newEV.getRecord().getMapFields();
271+
272+
if (oldEVMap.size() != newEVMap.size()) {
273+
return false;
274+
}
275+
276+
for (String partition : oldEVMap.keySet()) {
277+
if (!oldEVMap.get(partition).equals(newEVMap.get(partition))) {
278+
return false;
279+
}
280+
}
281+
return true;
282+
}
283+
284+
private void validateNoShufflingOccurred(Map<String, ExternalView> originalEVs,
285+
String shouldShuffleDB) {
286+
Map<String, ExternalView> updatedEVs = getEVs();
287+
for (String db : _allDBs) {
288+
if (db.equals(shouldShuffleDB)) {
289+
Assert.assertFalse(compareExternalViews(originalEVs.get(db), updatedEVs.get(db)),
290+
String.format("Expected shuffling didn't occur for database %s", db));
291+
} else {
292+
Assert.assertTrue(compareExternalViews(originalEVs.get(db), updatedEVs.get(db)),
293+
String.format("Unexpected shuffling occurred for database %s", db));
294+
}
295+
}
296+
}
297+
298+
private void migrateInstanceConfigTopology(List<InstanceConfig> instanceConfigs)
299+
throws Exception {
300+
301+
for (InstanceConfig instanceConfig : instanceConfigs) {
302+
String rackId = instanceConfig.getDomainAsMap().get(RACK);
303+
String hostId = instanceConfig.getDomainAsMap().get(HOST);
304+
305+
// Set new domain based on the new topology format
306+
String newDomain =
307+
String.format("%s=%s, %s=%s, %s=%s", MZ, rackId, HOST, hostId, APPLICATION_INSTANCE_ID,
308+
hostId);
309+
instanceConfig.setDomain(newDomain);
310+
311+
// Update the instance configuration in the cluster
312+
_gSetupTool.getClusterManagementTool()
313+
.setInstanceConfig(CLUSTER_NAME, instanceConfig.getInstanceName(), instanceConfig);
314+
}
315+
}
316+
317+
private void migrateDomainForResourceGroup(String resourceGroup) throws Exception {
318+
int instanceIndex = 0;
319+
for (MockParticipantManager participant : _participants) {
320+
InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool()
321+
.getInstanceConfig(CLUSTER_NAME, participant.getInstanceName());
322+
if (instanceConfig.containsTag(resourceGroup)) {
323+
Map<String, String> newDomain = instanceConfig.getDomainAsMap();
324+
newDomain.put(MZ, String.valueOf(instanceIndex % MIGRATE_ZONE_COUNT));
325+
newDomain.put(APPLICATION_INSTANCE_ID, UUID.randomUUID().toString());
326+
instanceConfig.setDomain(newDomain);
327+
_gSetupTool.getClusterManagementTool()
328+
.setInstanceConfig(CLUSTER_NAME, participant.getInstanceName(), instanceConfig);
329+
instanceIndex++;
330+
}
331+
}
332+
}
333+
}

0 commit comments

Comments
 (0)