Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
Expand All @@ -35,6 +36,7 @@
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
Expand Down Expand Up @@ -81,6 +83,8 @@ public void process(ClusterEvent event) throws Exception {
Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
Map<String, InstanceConfig> instanceConfigMap = dataProvider.getInstanceConfigMap();
Map<String, Long> errorPartitionCounts = Maps.newHashMap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: change variable name to instanceErrorPartitionCounts


for (Map.Entry<String, InstanceConfig> e : instanceConfigMap.entrySet()) {
String instanceName = e.getKey();
InstanceConfig config = e.getValue();
Expand All @@ -89,6 +93,10 @@ public void process(ClusterEvent event) throws Exception {
liveInstanceSet.add(instanceName);
instanceMessageMap.put(instanceName,
Sets.newHashSet(dataProvider.getMessages(instanceName).values()));

// Count ERROR partitions for this live instance
long errorCount = countErrorPartitions(dataProvider, instanceName);
errorPartitionCounts.put(instanceName, errorCount);
}
if (!config.getInstanceEnabled()) {
disabledInstanceSet.add(instanceName);
Expand All @@ -104,7 +112,7 @@ public void process(ClusterEvent event) throws Exception {
clusterStatusMonitor
.setClusterInstanceStatus(liveInstanceSet, instanceSet, disabledInstanceSet,
disabledPartitions, oldDisabledPartitions, tags, instanceMessageMap,
instanceConfigMap);
instanceConfigMap, errorPartitionCounts);
LogUtil.logDebug(logger, _eventId, "Complete cluster status monitors update.");
}
return null;
Expand All @@ -122,4 +130,45 @@ public Object call() {
});
}
}

/**
* Count the number of partitions in ERROR state for a given instance
* @param dataProvider the data provider containing current state information
* @param instanceName the name of the instance to check
* @return the count of partitions in ERROR state
*/
private long countErrorPartitions(BaseControllerDataProvider dataProvider, String instanceName) {
long errorCount = 0L;

try {
Map<String, LiveInstance> liveInstances = dataProvider.getLiveInstances();
LiveInstance liveInstance = liveInstances.get(instanceName);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the nested format to un nested format. For if liveInstance is not null, return errorCount.

if (liveInstance != null) {
String sessionId = liveInstance.getEphemeralOwner();
Map<String, CurrentState> currentStateMap =
dataProvider.getCurrentState(instanceName, sessionId, false);

if (currentStateMap != null) {
for (CurrentState currentState : currentStateMap.values()) {
if (currentState != null) {
Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
if (partitionStateMap != null) {
for (String state : partitionStateMap.values()) {
if (HelixDefinedState.ERROR.name().equalsIgnoreCase(state)) {
errorCount++;
}
}
}
}
}
}
}
} catch (Exception e) {
LogUtil.logWarn(logger, _eventId,
"Failed to count error partitions for instance: " + instanceName, e);
}

return errorCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,13 @@ private void unregister(ObjectName name) {
* @param tags a map of instance name to the set of tags on it
* @param instanceMessageMap a map of pending messages from each live instance
* @param instanceConfigMap a map of instance name to InstanceConfig (for operation tracking)
* @param errorPartitionCounts a map of instance name to the count of partitions in ERROR state
*/
public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet,
Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>> disabledPartitions,
Map<String, List<String>> oldDisabledPartitions, Map<String, Set<String>> tags,
Map<String, Set<Message>> instanceMessageMap, Map<String, InstanceConfig> instanceConfigMap) {
Map<String, Set<Message>> instanceMessageMap, Map<String, InstanceConfig> instanceConfigMap,
Map<String, Long> errorPartitionCounts) {
synchronized (_instanceMonitorMap) {
// Unregister beans for instances that are no longer configured
Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
Expand All @@ -288,9 +290,11 @@ public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> in
try {
ObjectName objectName = getObjectName(getInstanceBeanName(instanceName));
InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName, objectName);
long errorPartitionCount = errorPartitionCounts != null && errorPartitionCounts.containsKey(instanceName)
? errorPartitionCounts.get(instanceName) : 0L;
bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
!disabledInstanceSet.contains(instanceName));
!disabledInstanceSet.contains(instanceName), errorPartitionCount);
monitorsToRegister.add(bean);
} catch (MalformedObjectNameException ex) {
LOG.error("Failed to create instance monitor for instance: {}.", instanceName);
Expand Down Expand Up @@ -322,9 +326,11 @@ public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> in
// Update the bean
InstanceMonitor bean = _instanceMonitorMap.get(instanceName);
String oldSensorName = bean.getSensorName();
long errorPartitionCount = errorPartitionCounts != null && errorPartitionCounts.containsKey(instanceName)
? errorPartitionCounts.get(instanceName) : 0L;
bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
!disabledInstanceSet.contains(instanceName));
!disabledInstanceSet.contains(instanceName), errorPartitionCount);

// Update instance operation duration metrics
if (instanceConfigMap != null && instanceConfigMap.containsKey(instanceName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum InstanceMonitorMetric {
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge"),
ERROR_PARTITIONS_GAUGE("ErrorPartitions"),
INSTANCE_OPERATION_DURATION_ENABLE_GAUGE("InstanceOperationDuration_ENABLE"),
INSTANCE_OPERATION_DURATION_DISABLE_GAUGE("InstanceOperationDuration_DISABLE"),
INSTANCE_OPERATION_DURATION_EVACUATE_GAUGE("InstanceOperationDuration_EVACUATE"),
Expand Down Expand Up @@ -89,6 +90,7 @@ public String metricName() {
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
private SimpleDynamicMetric<Long> _pastDueMessageGauge;
private SimpleDynamicMetric<Long> _errorPartitionsGauge;
private SimpleDynamicMetric<Long> _partitionCountGauge;
private SimpleDynamicMetric<Long> _topStatePartitionCountGauge;

Expand Down Expand Up @@ -148,6 +150,8 @@ private void createMetrics() {
_pastDueMessageGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.PASTDUE_MESSAGE_GAUGE.metricName(),
0L);
_errorPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.ERROR_PARTITIONS_GAUGE.metricName(),
_partitionCountGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.PARTITION_COUNT_GAUGE.metricName(),
0L);
Expand Down Expand Up @@ -178,6 +182,7 @@ private void createMetrics() {
_maxCapacityUsageGauge,
_messageQueueSizeGauge,
_pastDueMessageGauge,
_errorPartitionsGauge,
_partitionCountGauge,
_topStatePartitionCountGauge,
_instanceOperationDurationEnableGauge,
Expand Down Expand Up @@ -219,6 +224,7 @@ protected long getDisabledPartitions() {

protected long getPastDueMessageGauge() { return _pastDueMessageGauge.getValue(); }

protected long getErrorPartitions() { return _errorPartitionsGauge.getValue(); }
protected long getPartitionCount() { return _partitionCountGauge.getValue(); }

protected long getTopStatePartitionCount() { return _topStatePartitionCountGauge.getValue(); }
Expand Down Expand Up @@ -281,10 +287,11 @@ private String serializedTags() {
* @param disabledPartitions current disabled partitions
* @param isLive true if running, false otherwise
* @param isEnabled true if enabled, false if disabled
* @param errorPartitionCount number of partitions in ERROR state on this instance
*/
public synchronized void updateInstance(Set<String> tags,
Map<String, List<String>> disabledPartitions, List<String> oldDisabledPartitions,
boolean isLive, boolean isEnabled) {
boolean isLive, boolean isEnabled, long errorPartitionCount) {
if (tags == null || tags.isEmpty()) {
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
} else {
Expand Down Expand Up @@ -313,6 +320,7 @@ public synchronized void updateInstance(Set<String> tags,
_enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
_disabledPartitionsGauge.updateValue(numDisabledPartitions);
_allPartitionsDisabledGauge.updateValue(allPartitionsDisabled ? 1L : 0L);
_errorPartitionsGauge.updateValue(errorPartitionCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void testMessageMetrics() throws Exception {

monitor.setClusterInstanceStatus(liveInstanceSet, liveInstanceSet, Collections.emptySet(),
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), instanceMessageMap,
Collections.emptyMap());
Collections.emptyMap(), Collections.emptyMap());

Assert.assertEquals(monitor.getInstanceMessageQueueBacklog(), 25 * n);
Assert.assertEquals(monitor.getTotalPastDueMessageGauge(), 15 * n);
Expand Down Expand Up @@ -446,7 +446,8 @@ public void testUpdateInstanceCapacityStatus()
// Call setClusterInstanceStatus to register instance monitors.
monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());

// Update instance capacity status.
for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
Expand Down Expand Up @@ -694,7 +695,8 @@ public void testClusterLevelInstanceOperationCounts() throws Exception {
Collections.emptyMap(), // oldDisabledPartitions
Collections.emptyMap(), // tags
Collections.emptyMap(), // instanceMessageMap
instanceConfigMap // instanceConfigMap
instanceConfigMap, // instanceConfigMap
Collections.emptyMap() // errorPartitionCounts
);

// Verify cluster-level counts
Expand Down Expand Up @@ -750,7 +752,8 @@ public void testClusterLevelInstanceOperationCounts() throws Exception {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
instanceConfigMap
instanceConfigMap,
Collections.emptyMap()
);

// Verify updated counts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testInstanceMonitor()
// Update metrics.
monitor.updateMaxCapacityUsage(0.5d);
monitor.increaseMessageCount(10L);
monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true);
monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true, 0L);
monitor.updateMessageQueueSize(100L);
monitor.updatePastDueMessageGauge(50L);

Expand All @@ -77,6 +77,7 @@ public void testInstanceMonitor()
Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.5d);
Assert.assertEquals(monitor.getMessageQueueSizeGauge(), 100L);
Assert.assertEquals(monitor.getPastDueMessageGauge(), 50L);
Assert.assertEquals(monitor.getErrorPartitions(), 0L);

monitor.unregister();
}
Expand Down Expand Up @@ -321,6 +322,39 @@ public void testInstanceOperationDurationWithInstanceConfigAPI()
}

@Test
public void testErrorPartitionsGauge() throws JMException {
String testCluster = "testCluster";
String testInstance = "testInstance";
String testDomain = "testDomain:key=value";
Set<String> tags = ImmutableSet.of("test");

InstanceMonitor monitor =
new InstanceMonitor(testCluster, testInstance, new ObjectName(testDomain));

// Verify initial state - no error partitions
Assert.assertEquals(monitor.getErrorPartitions(), 0L);

// Simulate instance with 3 error partitions
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), true, true, 3L);
Assert.assertEquals(monitor.getErrorPartitions(), 3L);

// Update with more error partitions
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), true, true, 5L);
Assert.assertEquals(monitor.getErrorPartitions(), 5L);

// Update with zero error partitions (partitions recovered)
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), true, true, 0L);
Assert.assertEquals(monitor.getErrorPartitions(), 0L);

// Test with instance offline - error partition count should still be tracked
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), false, true, 2L);
Assert.assertEquals(monitor.getErrorPartitions(), 2L);
Assert.assertEquals(monitor.getOnline(), 0L);

// Test with instance disabled - error partition count should still be tracked
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), true, false, 4L);
Assert.assertEquals(monitor.getErrorPartitions(), 4L);
Assert.assertEquals(monitor.getEnabled(), 0L);
public void testPartitionCountMetrics() throws JMException {
String testCluster = "testCluster";
String testInstance = "testInstance";
Expand Down Expand Up @@ -359,6 +393,52 @@ public void testPartitionCountMetrics() throws JMException {
}

@Test
public void testErrorPartitionsWithDisabledPartitions() throws JMException {
String testCluster = "testCluster";
String testInstance = "testInstance";
String testDomain = "testDomain:key=value";
Set<String> tags = ImmutableSet.of("test");
Map<String, List<String>> disabledPartitions = ImmutableMap.of(
"resource1", ImmutableList.of("partition1", "partition2"),
"resource2", ImmutableList.of("partition3")
);

InstanceMonitor monitor =
new InstanceMonitor(testCluster, testInstance, new ObjectName(testDomain));

// Instance has both disabled partitions and error partitions
monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true, 2L);

// Verify both metrics are tracked independently
Assert.assertEquals(monitor.getDisabledPartitions(), 3L, "Should have 3 disabled partitions");
Assert.assertEquals(monitor.getErrorPartitions(), 2L, "Should have 2 error partitions");

// Update error partition count while keeping disabled partitions the same
monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true, 5L);
Assert.assertEquals(monitor.getDisabledPartitions(), 3L, "Disabled partitions should remain 3");
Assert.assertEquals(monitor.getErrorPartitions(), 5L, "Error partitions should now be 5");

monitor.unregister();
}

@Test
public void testErrorPartitionsMultipleUpdates() throws JMException {
String testCluster = "testCluster";
String testInstance = "testInstance";
String testDomain = "testDomain:key=value";
Set<String> tags = ImmutableSet.of("test");

InstanceMonitor monitor =
new InstanceMonitor(testCluster, testInstance, new ObjectName(testDomain));

// Simulate multiple updates with varying error partition counts
long[] errorCounts = {0L, 1L, 3L, 2L, 5L, 0L, 1L};

for (long errorCount : errorCounts) {
monitor.updateInstance(tags, ImmutableMap.of(), Collections.emptyList(), true, true, errorCount);
Assert.assertEquals(monitor.getErrorPartitions(), errorCount,
"Error partition count should be " + errorCount);
}
public void testPartitionCountEdgeCases() throws JMException {
String testCluster = "testCluster";
String testInstance = "testInstance";
Expand Down
Loading