Commits (changes shown from 29 of 31 commits)
c37a275  Revert "upgrade xstream to 1.4.20 to pick up fixes for 2 CVEs (#2763)" (zpinto, Mar 14, 2024)
b362310  Merge pull request #1 from linkedin/zpinto/revert_xstream_bump (zpinto, Mar 14, 2024)
cc13eab  Merge pull request #2 from linkedin/master (zpinto, Apr 30, 2024)
3c94e69  [Linkedin/Helix] -- Provide JDK 1.8 (backward) compatibility for meta… (himanshukandwal, May 9, 2024)
15c4121  Merge pull request #4 from linkedin/hkandwal/enable-meta-client-jdk8-… (himanshukandwal, May 9, 2024)
72d305b  [Linkedin/Helix] -- Provide JDK 1.8 (backward) compatibility for heli… (himanshukandwal, May 9, 2024)
4c52f19  Merge pull request #5 from linkedin/hkandwal/enable-zk-cli-jdk8-compat (himanshukandwal, May 9, 2024)
46c4b37  Merge pull request #6 from linkedin/master (zpinto, Jun 12, 2024)
c71e8a0  Merge pull request #7 from linkedin/master (zpinto, Jun 13, 2024)
5edb9ee  Merge pull request #8 from apache/master (himanshukandwal, Jul 24, 2024)
6f39863  Merge branch 'apache:master' into release (abhilash1in, Jul 29, 2024)
2e2d4fe  Merge branch 'apache:master' into release (abhilash1in, Jul 29, 2024)
ca3adbc  Merge branch 'apache:master' into release (abhilash1in, Aug 6, 2024)
3bd9750  Merge pull request #12 from linkedin/1.4.3-dev-202412052251 (zpinto, Dec 10, 2024)
1535a1a  Merge branch 'release' into gspencer/release-20250221 (GrantPSpencer, Feb 21, 2025)
1344bbf  pin xstreams in helix-rest/pom.xml to 1.4.19 as 1.4.21 is banned at LI (GrantPSpencer, Feb 21, 2025)
eb54f63  Release for helix 1.4.3-dev-202502211050 (GrantPSpencer, Feb 21, 2025)
5164c87  Merge pull request #26 from linkedin/xmarkgaox/release-20250507 (MarkGaox, May 7, 2025)
98b332a  Created the Sensor for DistClusterController failure (vivek8420, Jul 21, 2025)
aad3152  wip (vivek8420, Jul 22, 2025)
0c74eb3  Moved the DistCluster state model sensor to clusterstatusmonitor (vivek8420, Jul 23, 2025)
62ff616  Merge branch 'release' of github.com-personal:linkedin/helix into sen… (vivek8420, Jul 23, 2025)
08f60f6  removed the unused changes (vivek8420, Jul 23, 2025)
49c2749  removed the unused changes (vivek8420, Jul 23, 2025)
4bb0cdd  Added the test for new sensor (vivek8420, Jul 23, 2025)
c8ba3ba  added the unit tests (vivek8420, Jul 23, 2025)
4fbe103  removed the .vscode files changes (vivek8420, Jul 23, 2025)
9394247  Addressed the comments (vivek8420, Jul 23, 2025)
1da4f4e  Addressed the comments (vivek8420, Jul 23, 2025)
a2a08c7  Addressed the comments (vivek8420, Jul 23, 2025)
755623e  Addressed the comments (vivek8420, Jul 23, 2025)
ClusterStatusMonitor.java
@@ -87,6 +87,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
private AtomicLong _continuousResourceRebalanceFailureCount = new AtomicLong(0L);
private AtomicLong _continuousTaskRebalanceFailureCount = new AtomicLong(0L);
private AtomicLong _leaderFailureCount = new AtomicLong(0L);
private AtomicLong _resetLeaderFailureCount = new AtomicLong(0L);

private final ConcurrentHashMap<String, ResourceMonitor> _resourceMonitorMap =
new ConcurrentHashMap<>();
@@ -660,6 +662,8 @@ public void reset() {
_rebalanceFailureCount.set(0L);
_continuousResourceRebalanceFailureCount.set(0L);
_continuousTaskRebalanceFailureCount.set(0L);
_leaderFailureCount.set(0L);
_resetLeaderFailureCount.set(0L);
} catch (Exception e) {
LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
}
@@ -1137,4 +1141,28 @@ public long getNumOfResourcesRebalanceThrottledGauge() {
}
return total;
}

@Override
public long getLeaderFailureCounter() {
return _leaderFailureCount.get();
}

@Override
public long getResetLeaderFailureCounter() {
return _resetLeaderFailureCount.get();
}

/**
* Report a leadership failure for the distributed controller
*/
public void reportLeaderFailure() {
_leaderFailureCount.incrementAndGet();
}

/**
* Report that the controller is still the leader during reset
*/
public void reportResetLeaderFailure() {
_resetLeaderFailureCount.incrementAndGet();
}
}
ClusterStatusMonitorMBean.java
@@ -147,4 +147,14 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
* state partition is larger than configured threshold (default is 1).
*/
long getNumOfResourcesRebalanceThrottledGauge();

/**
* @return The number of leadership failures for distributed controllers
*/
long getLeaderFailureCounter();

/**
* @return The number of times the controller was still the leader during reset
*/
long getResetLeaderFailureCounter();
}
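
For reference, here is a minimal sketch of reading the two new counters over JMX. The attribute names follow from the MBean getters above; the "ClusterStatus:cluster=<name>" ObjectName layout and the "MyCluster" cluster name are assumptions for illustration (the tests below resolve the real name via monitor.getObjectName(monitor.clusterBeanName())).

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LeaderFailureMetricReader {
  public static void main(String[] args) throws Exception {
    // Platform MBean server of the controller JVM; a remote JMX connector
    // would be queried the same way once attached.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    // Assumed registration pattern; getAttribute throws
    // InstanceNotFoundException if no monitor is active for this cluster.
    ObjectName name = new ObjectName("ClusterStatus:cluster=MyCluster");

    long leaderFailures = (Long) server.getAttribute(name, "LeaderFailureCounter");
    long resetLeaderFailures = (Long) server.getAttribute(name, "ResetLeaderFailureCounter");
    System.out.printf("leaderFailures=%d resetLeaderFailures=%d%n",
        leaderFailures, resetLeaderFailures);
  }
}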
DistClusterControllerStateModel.java
@@ -29,6 +29,7 @@
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,10 @@
@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel {
private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class);

protected volatile Optional<HelixManager> _controllerOpt = Optional.empty();
private final Set<Pipeline.Type> _enabledPipelineTypes;
private ClusterStatusMonitor _clusterStatusMonitor;

// dedicated lock object to avoid cross-instance contention from Optional.empty() singleton
private final Object _controllerLock = new Object();
@@ -73,7 +76,15 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte
newController.connect();
newController.startTimerTasks();
_controllerOpt = Optional.of(newController);
logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
if (!newController.isLeader()) {
logger.warn("Controller Leader session is not the same as the current session for {}. "
+ "This should not happen. Controller: {}", clusterName ,controllerName);

// Publish metrics through ClusterStatusMonitor when not a leader
getClusterStatusMonitor(clusterName).reportLeaderFailure();
} else {
logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
}
} else {
logger.error("controller already exists:" + _controllerOpt.get().getInstanceName() + " for "
+ clusterName);
@@ -90,6 +101,7 @@ public void onBecomeStandbyFromLeader(Message message, NotificationContext conte

if (_controllerOpt.isPresent()) {
reset();
// TODO: log this message only if reset is successful
logStateTransition("LEADER", "STANDBY", clusterName, controllerName);
} else {
logger.error("No controller exists for " + clusterName);
@@ -104,6 +116,7 @@ public void onBecomeOfflineFromStandby(Message message, NotificationContext cont
@Override
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
reset();
// TODO: log this message only if reset is successful
logStateTransition("OFFLINE", "DROPPED", message == null ? "" : message.getPartitionName(),
message == null ? "" : message.getTgtName());
}
@@ -117,11 +130,33 @@ public String getStateModeInstanceDescription(String partitionName, String insta
public void reset() {
synchronized (_controllerLock) {
if (_controllerOpt.isPresent()) {
String clusterName = _controllerOpt.get().getClusterName();
logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for "
+ _controllerOpt.get().getClusterName());
_controllerOpt.get().disconnect();
if (_controllerOpt.get().isLeader()) {
logger.warn("Controller is still leader after disconnecting: {} for {}",
_controllerOpt.get().getInstanceName(), _controllerOpt.get().getClusterName());

// Publish metrics when controller is still leader during reset
getClusterStatusMonitor(clusterName).reportResetLeaderFailure();
}
_controllerOpt = Optional.empty();
}

// Clean up cluster status monitor
if (_clusterStatusMonitor != null) {
_clusterStatusMonitor.reset();
_clusterStatusMonitor = null;
}
}
}

private ClusterStatusMonitor getClusterStatusMonitor(String clusterName) {
if (_clusterStatusMonitor == null) {
_clusterStatusMonitor = new ClusterStatusMonitor(clusterName);
_clusterStatusMonitor.active();
}
return _clusterStatusMonitor;
}
}
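
The state model lazily constructs its ClusterStatusMonitor the first time a failure is reported, and reset() tears it down again so the MBean does not outlive the LEADER session. A condensed sketch of that lifecycle, using only the monitor calls that appear in this diff (the wrapper class and the "MyCluster" cluster name are placeholders):

import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;

public class MonitorLifecycleSketch {
  public static void main(String[] args) {
    ClusterStatusMonitor monitor = new ClusterStatusMonitor("MyCluster");
    monitor.active();                   // register the ClusterStatus MBean
    monitor.reportLeaderFailure();      // LeaderFailureCounter -> 1
    monitor.reportResetLeaderFailure(); // ResetLeaderFailureCounter -> 1
    monitor.reset();                    // unregister the MBean and zero both counters
  }
}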
TestClusterStatusMonitor.java
@@ -610,4 +610,75 @@ private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map<String, Doub
}
}
}

@Test
public void testLeaderFailureMetrics() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;

ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
monitor.active();
ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());

Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));

// Initial state - leadership failure counter should be 0
Object initialCount = _server.getAttribute(clusterMonitorObjName, "LeaderFailureCounter");
Assert.assertTrue(initialCount instanceof Long);
Assert.assertEquals(initialCount, 0L);

// Report leadership failure multiple times
monitor.reportLeaderFailure();
monitor.reportLeaderFailure();
monitor.reportLeaderFailure();

// Verify counter increased
Object updatedCount = _server.getAttribute(clusterMonitorObjName, "LeaderFailureCounter");
Assert.assertTrue(updatedCount instanceof Long);
Assert.assertEquals(updatedCount, 3L);

// Verify getter method returns same value
Assert.assertEquals(monitor.getLeaderFailureCounter(), 3L);

// Clean up
monitor.reset();
Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
"Failed to unregister ClusterStatusMonitor.");
}

@Test
public void testResetLeaderFailureMetrics() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;

ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
monitor.active();
ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());

Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));

// Initial state - still leader during reset counter should be 0
Object initialCount = _server.getAttribute(clusterMonitorObjName, "ResetLeaderFailureCounter");
Assert.assertTrue(initialCount instanceof Long);
Assert.assertEquals((Long) initialCount, Long.valueOf(0));

// Report still leader during reset multiple times
monitor.reportResetLeaderFailure();
monitor.reportResetLeaderFailure();

// Verify counter increased
Object updatedCount = _server.getAttribute(clusterMonitorObjName, "ResetLeaderFailureCounter");
Assert.assertTrue(updatedCount instanceof Long);
Assert.assertEquals((Long) updatedCount, Long.valueOf(2));

// Verify getter method returns same value
Assert.assertEquals(monitor.getResetLeaderFailureCounter(), 2L);

// Clean up
monitor.reset();
Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
"Failed to unregister ClusterStatusMonitor.");
}
}
TestDistClusterControllerStateModel.java
@@ -21,6 +21,7 @@

import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.model.Message;
@@ -265,4 +266,36 @@ public void testExplicitLockIndependence() throws Exception {
Assert.assertTrue(instance1Interrupted.get(),
"Instance1 should have been interrupted while holding its lock");
}

@Test
public void testOnBecomeLeaderFromStandby_whenMultipleInstancesTrigger() throws Exception {
// First controller becomes leader
Message message = new Message(MessageType.STATE_TRANSITION, "0");
message.setPartitionName(clusterName);
message.setTgtName("controller_0");
stateModel.onBecomeLeaderFromStandby(message, new NotificationContext(null));

// Second controller attempts to become leader
message = new Message(MessageType.STATE_TRANSITION, "1");
message.setPartitionName(clusterName);
message.setTgtName("controller_1");
DistClusterControllerStateModel stateModel2 = new DistClusterControllerStateModel(ZK_ADDR);
stateModel2.onBecomeLeaderFromStandby(message, new NotificationContext(null));

// Verify leadership states
// controller_0 is the leader of clusterName.
Assert.assertTrue(stateModel._controllerOpt.get().isLeader());
// controller_1 was not able to become leader because controller_0 was already the leader.
Assert.assertFalse(stateModel2._controllerOpt.get().isLeader());

// Verify that leadership failure metric was reported by accessing the internal monitor
// Use reflection to get the internal ClusterStatusMonitor from stateModel2
Field monitorField = DistClusterControllerStateModel.class.getDeclaredField("_clusterStatusMonitor");
monitorField.setAccessible(true);
ClusterStatusMonitor internalMonitor = (ClusterStatusMonitor) monitorField.get(stateModel2);

// The monitor should have been created and the metric should be incremented
Assert.assertEquals(internalMonitor.getLeaderFailureCounter(), 1,
"Leadership failure metric should be incremented when second controller fails to become leader");
}
}