Skip to content

Commit 8a8e362

Browse files
committed
Added integration test
1 parent dc01155 commit 8a8e362

File tree

2 files changed

+320
-5
lines changed

2 files changed

+320
-5
lines changed

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,10 @@ public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebala
8181
Long totalCapacity = clusterModel.getContext().getClusterCapacityMap().get(capacityKey);
8282
Long remainingCapacity = clusterRemainingCap.getValue();
8383
Long totalUsage = totalCapacity - remainingCapacity;
84-
LOG.error("Insufficient {} capacity in cluster. Total capacity: {}, Total usage required: {}, Deficit: {}",
85-
capacityKey, totalCapacity, totalUsage, Math.abs(remainingCapacity));
8684
throw new HelixRebalanceException(String
87-
.format("The cluster does not have enough %s capacity for all partitions. Total capacity: %d, Required: %d, Deficit: %d",
88-
capacityKey, totalCapacity, totalUsage, Math.abs(remainingCapacity)),
89-
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
85+
.format("The cluster '%s' does not have enough %s capacity for all partitions. Total capacity: %d, Required: %d, Deficit: %d",
86+
clusterModel.getContext().getClusterName(), capacityKey, totalCapacity, totalUsage, Math.abs(remainingCapacity)),
87+
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
9088
}
9189
// estimate remain capacity after assignment + %1 of current cluster capacity before assignment
9290
positiveEstimateClusterRemainCap.put(capacityKey,
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package org.apache.helix.integration.rebalancer.WagedRebalancer;
2+
3+
import java.util.ArrayList;
4+
import java.util.Date;
5+
import java.util.HashSet;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Set;
9+
10+
import com.google.common.collect.ImmutableMap;
11+
import org.apache.helix.HelixDataAccessor;
12+
import org.apache.helix.TestHelper;
13+
import org.apache.helix.common.ZkTestBase;
14+
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
15+
import org.apache.helix.integration.manager.ClusterControllerManager;
16+
import org.apache.helix.integration.manager.MockParticipantManager;
17+
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
18+
import org.apache.helix.model.BuiltInStateModelDefinitions;
19+
import org.apache.helix.model.ClusterConfig;
20+
import org.apache.helix.model.ExternalView;
21+
import org.apache.helix.model.IdealState;
22+
import org.apache.helix.model.InstanceConfig;
23+
import org.apache.helix.model.ResourceConfig;
24+
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
import org.testng.Assert;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.Test;
31+
32+
/**
33+
* Integration test for integer overflow fix in WAGED rebalancer capacity calculations.
34+
*
35+
* This test verifies that the WAGED rebalancer correctly handles capacity values that
36+
* would cause integer overflow when aggregated across many instances.
37+
*
38+
* Background:
39+
* - Prior to the fix, capacity was calculated using Integer (32-bit signed)
40+
* - Integer.MAX_VALUE = 2,147,483,647
41+
* - When total cluster capacity exceeded this, it wrapped to negative values
42+
* - This caused "insufficient capacity" errors even in nearly empty clusters
43+
*
44+
* Test Scenario:
45+
* - Creates 10 instances with 250,000,000 DISK capacity each
46+
* - Total DISK: 10 × 250,000,000 = 2,500,000,000 > Integer.MAX_VALUE
47+
* - With the Long-based fix, this should work correctly
48+
* - Without the fix, capacity would wrap to negative and cause failures
49+
*
50+
* @see https://github.com/apache/helix/issues/3084
51+
*/
52+
public class TestWagedCapacityIntegerOverflow extends ZkTestBase {
53+
private static final Logger LOG = LoggerFactory.getLogger(TestWagedCapacityIntegerOverflow.class);
54+
55+
// Test configuration
56+
protected final int NUM_NODE = 10;
57+
protected static final int START_PORT = 14000;
58+
protected static final int NUM_RESOURCES = 5;
59+
protected static final int PARTITIONS_PER_RESOURCE = 10;
60+
protected static final int REPLICAS = 3;
61+
62+
// Capacity values that will cause overflow when summed
63+
// 10 instances × 250,000,000 = 2,500,000,000 > Integer.MAX_VALUE (2,147,483,647)
64+
private static final int DISK_CAPACITY = 250_000_000;
65+
private static final int CU_CAPACITY = 13_000;
66+
private static final int PARTCOUNT_CAPACITY = 800;
67+
68+
// Partition weights (small values)
69+
private static final int DISK_WEIGHT = 1000;
70+
private static final int CU_WEIGHT = 10;
71+
private static final int PARTCOUNT_WEIGHT = 1;
72+
73+
protected final String CLASS_NAME = getShortClassName();
74+
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
75+
protected ClusterControllerManager _controller;
76+
protected AssignmentMetadataStore _assignmentMetadataStore;
77+
protected StrictMatchExternalViewVerifier _clusterVerifier;
78+
79+
List<MockParticipantManager> _participants = new ArrayList<>();
80+
List<String> _nodes = new ArrayList<>();
81+
private Set<String> _allDBs = new HashSet<>();
82+
83+
@BeforeClass
84+
public void beforeClass() throws Exception {
85+
LOG.info("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
86+
87+
// Create cluster
88+
_gSetupTool.addCluster(CLUSTER_NAME, true);
89+
90+
// Configure cluster for WAGED rebalancer with high capacity values
91+
HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
92+
ClusterConfig clusterConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
93+
94+
// Set instance capacity keys
95+
List<String> capacityKeys = new ArrayList<>();
96+
capacityKeys.add("CU");
97+
capacityKeys.add("DISK");
98+
capacityKeys.add("PARTCOUNT");
99+
clusterConfig.setInstanceCapacityKeys(capacityKeys);
100+
101+
// Set default instance capacity with values that will overflow when summed
102+
Map<String, Integer> defaultInstanceCapacity = ImmutableMap.of(
103+
"CU", CU_CAPACITY,
104+
"DISK", DISK_CAPACITY, // This is the critical value for overflow test
105+
"PARTCOUNT", PARTCOUNT_CAPACITY
106+
);
107+
clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacity);
108+
109+
// Set default partition weights (small values)
110+
Map<String, Integer> defaultPartitionWeight = ImmutableMap.of(
111+
"CU", CU_WEIGHT,
112+
"DISK", DISK_WEIGHT,
113+
"PARTCOUNT", PARTCOUNT_WEIGHT
114+
);
115+
clusterConfig.setDefaultPartitionWeightMap(defaultPartitionWeight);
116+
117+
dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
118+
119+
// Log the configuration for verification
120+
long totalDiskCapacity = (long) NUM_NODE * DISK_CAPACITY;
121+
LOG.info("Cluster configuration:");
122+
LOG.info(" Instances: {}", NUM_NODE);
123+
LOG.info(" DISK capacity per instance: {}", DISK_CAPACITY);
124+
LOG.info(" Total DISK capacity: {}", totalDiskCapacity);
125+
LOG.info(" Integer.MAX_VALUE: {}", Integer.MAX_VALUE);
126+
LOG.info(" Overflow test: {} > {} = {}",
127+
totalDiskCapacity, Integer.MAX_VALUE, totalDiskCapacity > Integer.MAX_VALUE);
128+
129+
// Verify overflow condition
130+
Assert.assertTrue(totalDiskCapacity > Integer.MAX_VALUE,
131+
"Total capacity must exceed Integer.MAX_VALUE to test overflow fix");
132+
133+
// Add instances
134+
for (int i = 0; i < NUM_NODE; i++) {
135+
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
136+
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
137+
_nodes.add(storageNodeName);
138+
139+
// Optionally set per-instance capacity (to test both default and per-instance configs)
140+
if (i % 2 == 0) {
141+
InstanceConfig instanceConfig = dataAccessor.getProperty(
142+
dataAccessor.keyBuilder().instanceConfig(storageNodeName));
143+
Map<String, Integer> instanceCapacity = ImmutableMap.of(
144+
"CU", CU_CAPACITY,
145+
"DISK", DISK_CAPACITY + (i * 1000), // Slightly vary capacity
146+
"PARTCOUNT", PARTCOUNT_CAPACITY
147+
);
148+
instanceConfig.setInstanceCapacityMap(instanceCapacity);
149+
dataAccessor.setProperty(dataAccessor.keyBuilder().instanceConfig(storageNodeName),
150+
instanceConfig);
151+
}
152+
}
153+
154+
// Start participants
155+
for (String node : _nodes) {
156+
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
157+
participant.syncStart();
158+
_participants.add(participant);
159+
}
160+
161+
// Start controller
162+
String controllerName = CONTROLLER_PREFIX + "_0";
163+
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
164+
_controller.syncStart();
165+
166+
// Enable persist best possible assignment
167+
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
168+
169+
// Initialize cluster verifier
170+
_clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
171+
.setZkClient(_gZkClient)
172+
.setDeactivatedNodeAwareness(true)
173+
.setResources(_allDBs)
174+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
175+
.build();
176+
}
177+
178+
@AfterClass
179+
public void afterClass() throws Exception {
180+
// Stop participants
181+
for (MockParticipantManager participant : _participants) {
182+
if (participant != null && participant.isConnected()) {
183+
participant.syncStop();
184+
}
185+
}
186+
_participants.clear();
187+
188+
// Stop controller
189+
if (_controller != null && _controller.isConnected()) {
190+
_controller.syncStop();
191+
}
192+
193+
// Stop verifier
194+
if (_clusterVerifier != null) {
195+
_clusterVerifier.close();
196+
}
197+
198+
// Delete cluster
199+
deleteCluster(CLUSTER_NAME);
200+
201+
LOG.info("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
202+
}
203+
204+
/**
205+
* Test that verifies rebalancing works correctly when total capacity exceeds Integer.MAX_VALUE.
206+
*
207+
* This test would FAIL with the old Integer-based capacity calculation due to overflow,
208+
* but should SUCCEED with the Long-based fix.
209+
*/
210+
@Test
211+
public void testRebalanceWithCapacityOverflow() throws Exception {
212+
LOG.info("Starting testRebalanceWithCapacityOverflow");
213+
214+
// Create resources with WAGED rebalancer
215+
for (int i = 0; i < NUM_RESOURCES; i++) {
216+
String dbName = "TestDB_" + i;
217+
createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
218+
BuiltInStateModelDefinitions.MasterSlave.name(),
219+
PARTITIONS_PER_RESOURCE, REPLICAS, REPLICAS);
220+
_allDBs.add(dbName);
221+
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, REPLICAS);
222+
223+
// Set partition capacity for the resource
224+
HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
225+
ResourceConfig resourceConfig = new ResourceConfig(dbName);
226+
Map<String, Integer> partitionCapacity = ImmutableMap.of(
227+
"CU", CU_WEIGHT,
228+
"DISK", DISK_WEIGHT,
229+
"PARTCOUNT", PARTCOUNT_WEIGHT
230+
);
231+
232+
try {
233+
Map<String, Map<String, Integer>> partitionCapacityMap = new java.util.HashMap<>();
234+
partitionCapacityMap.put(ResourceConfig.DEFAULT_PARTITION_KEY, partitionCapacity);
235+
resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
236+
dataAccessor.setProperty(dataAccessor.keyBuilder().resourceConfig(dbName), resourceConfig);
237+
} catch (Exception e) {
238+
LOG.warn("Failed to set partition capacity for {}: {}", dbName, e.getMessage());
239+
}
240+
}
241+
242+
// Update verifier with all resources
243+
_clusterVerifier.close();
244+
_clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
245+
.setZkClient(_gZkClient)
246+
.setDeactivatedNodeAwareness(true)
247+
.setResources(_allDBs)
248+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
249+
.build();
250+
251+
// Verify cluster reaches stable state
252+
// This is the key assertion - rebalancing should succeed despite capacity > Integer.MAX_VALUE
253+
Assert.assertTrue(_clusterVerifier.verifyByPolling(),
254+
"Cluster should reach stable state with capacity exceeding Integer.MAX_VALUE");
255+
256+
// Verify all resources have correct external view
257+
HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
258+
for (String dbName : _allDBs) {
259+
IdealState idealState = dataAccessor.getProperty(
260+
dataAccessor.keyBuilder().idealStates(dbName));
261+
ExternalView externalView = dataAccessor.getProperty(
262+
dataAccessor.keyBuilder().externalView(dbName));
263+
264+
Assert.assertNotNull(idealState, "IdealState should exist for " + dbName);
265+
Assert.assertNotNull(externalView, "ExternalView should exist for " + dbName);
266+
Assert.assertEquals(externalView.getPartitionSet().size(), PARTITIONS_PER_RESOURCE,
267+
"All partitions should be assigned for " + dbName);
268+
269+
// Verify each partition has correct number of replicas
270+
for (String partition : externalView.getPartitionSet()) {
271+
Map<String, String> stateMap = externalView.getStateMap(partition);
272+
Assert.assertNotNull(stateMap, "State map should exist for partition " + partition);
273+
Assert.assertEquals(stateMap.size(), REPLICAS,
274+
"Partition " + partition + " should have " + REPLICAS + " replicas");
275+
276+
// Verify there's exactly one MASTER
277+
long masterCount = stateMap.values().stream()
278+
.filter(state -> "MASTER".equals(state))
279+
.count();
280+
Assert.assertEquals(masterCount, 1,
281+
"Partition " + partition + " should have exactly one MASTER");
282+
}
283+
}
284+
285+
LOG.info("Successfully verified cluster with total DISK capacity {} > Integer.MAX_VALUE ({})",
286+
(long) NUM_NODE * DISK_CAPACITY, Integer.MAX_VALUE);
287+
}
288+
289+
/**
290+
* Test adding a new resource after initial rebalancing still works correctly.
291+
* Verifies that capacity overflow fix works for incremental changes.
292+
*/
293+
@Test
294+
public void testAddResourceWithOverflowCapacity() throws Exception {
295+
LOG.info("Starting testAddResourceWithOverflowCapacity");
296+
297+
String newDbName = "TestDB_New";
298+
createResourceWithWagedRebalance(CLUSTER_NAME, newDbName,
299+
BuiltInStateModelDefinitions.MasterSlave.name(),
300+
PARTITIONS_PER_RESOURCE, REPLICAS, REPLICAS);
301+
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, newDbName, REPLICAS);
302+
_allDBs.add(newDbName);
303+
304+
// Verify cluster still reaches stable state
305+
HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
306+
Assert.assertTrue(TestHelper.verify(() -> {
307+
ExternalView externalView = dataAccessor.getProperty(
308+
dataAccessor.keyBuilder().externalView(newDbName));
309+
return externalView != null &&
310+
externalView.getPartitionSet().size() == PARTITIONS_PER_RESOURCE;
311+
}, TestHelper.WAIT_DURATION),
312+
"New resource should be assigned correctly despite capacity overflows from integer limit");
313+
314+
LOG.info("Successfully added new resource");
315+
}
316+
}
317+

0 commit comments

Comments
 (0)