Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public enum ResourceConfigProperty {
EXTERNAL_VIEW_DISABLED,
DELAY_REBALANCE_ENABLED,
PARTITION_CAPACITY_MAP,
RELAXED_DISABLED_PARTITION_CONSTRAINT // Resource-level override for relaxed disabled partition constraint
RELAXED_DISABLED_PARTITION_CONSTRAINT, // Resource-level override for relaxed disabled partition constraint
ACTIVE_STATES_FOR_MIN_ACTIVE_REPLICA_CHECK // List of states to be considered as "active" for min active replica check
}

public enum ResourceConfigConstants {
Expand Down Expand Up @@ -279,6 +280,33 @@ public int getMinActiveReplica() {
return _record.getIntField(ResourceConfigProperty.MIN_ACTIVE_REPLICAS.name(), -1);
}

/**
* Get the list of states that should be considered as "active" for min active replica check.
* If not configured, the default behavior applies (all states except DROPPED, ERROR, and initial state).
*
* @return List of state names to be considered active, or null if not configured
*/
public List<String> getActiveStatesForMinActiveReplicaCheck() {
return _record.getListField(ResourceConfigProperty.ACTIVE_STATES_FOR_MIN_ACTIVE_REPLICA_CHECK.name());
}

/**
* Set the list of states that should be considered as "active" for min active replica check.
* When configured, only replicas in these states will count toward the min active replica constraint.
*
* Example: For a state model with states [LEADER, STANDBY, BOOTSTRAP, OFFLINE],
* setting this to ["LEADER", "STANDBY"] will only count replicas in LEADER or STANDBY states.
*
* @param activeStates List of state names to be considered active, or null to use default behavior
*/
public void setActiveStatesForMinActiveReplicaCheck(List<String> activeStates) {
if (activeStates == null) {
_record.getListFields().remove(ResourceConfigProperty.ACTIVE_STATES_FOR_MIN_ACTIVE_REPLICA_CHECK.name());
} else {
_record.setListField(ResourceConfigProperty.ACTIVE_STATES_FOR_MIN_ACTIVE_REPLICA_CHECK.name(), activeStates);
}
}

public int getMaxPartitionsPerInstance() {
return _record.getIntField(ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.toString(),
Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,11 @@ public static MinActiveReplicaCheckResult siblingNodesActiveReplicaCheckWithDeta
String stateModeDef = externalView.getStateModelDefRef();
StateModelDefinition stateModelDefinition =
dataAccessor.getProperty(propertyKeyBuilder.stateModelDef(stateModeDef));
Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES);
if (stateModelDefinition != null) {
unhealthyStates.add(stateModelDefinition.getInitialState());
}

// Determine which states should be considered "unhealthy" (not active)
Set<String> unhealthyStates = getUnhealthyStates(dataAccessor, propertyKeyBuilder,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch statemodel defintion inside this method only.

resourceName, stateModelDefinition);

for (String partition : externalView.getPartitionSet()) {
Map<String, String> stateByInstanceMap = externalView.getStateMap(partition);
// found the resource hosted on the instance
Expand Down Expand Up @@ -498,4 +499,49 @@ public static MinActiveReplicaCheckResult siblingNodesActiveReplicaCheckWithDeta

return MinActiveReplicaCheckResult.passed();
}

/**
* Get the set of states that should be considered "unhealthy" (not active) for min active replica checks.
*
* The logic is as follows:
* 1. If the resource has ACTIVE_STATES_FOR_MIN_ACTIVE_REPLICA_CHECK configured in ResourceConfig,
* use that list and consider all other states as unhealthy.
* 2. Otherwise, use the default behavior: DROPPED, ERROR, and the initial state are unhealthy.
*
* @param dataAccessor The data accessor
* @param propertyKeyBuilder The property key builder
* @param resourceName The resource name
* @param stateModelDefinition The state model definition
* @return Set of state names considered unhealthy
*/
private static Set<String> getUnhealthyStates(HelixDataAccessor dataAccessor,
PropertyKey.Builder propertyKeyBuilder, String resourceName,
StateModelDefinition stateModelDefinition) {

// Try to get resource config to check for custom active states configuration
ResourceConfig resourceConfig = dataAccessor.getProperty(propertyKeyBuilder.resourceConfig(resourceName));

if (resourceConfig != null) {
List<String> customActiveStates = resourceConfig.getActiveStatesForMinActiveReplicaCheck();
if (customActiveStates != null && !customActiveStates.isEmpty()) {
// Custom active states are configured - all other states are unhealthy
Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES);
if (stateModelDefinition != null && stateModelDefinition.getStatesPriorityList() != null) {
unhealthyStates.addAll(stateModelDefinition.getStatesPriorityList());
}
// Remove the custom active states - what remains are unhealthy
unhealthyStates.removeAll(customActiveStates);
_logger.debug("Resource {} using custom active states for min active replica check: {}. Unhealthy states: {}",
resourceName, customActiveStates, unhealthyStates);
return unhealthyStates;
}
}

// Default behavior: DROPPED, ERROR, and initial state are unhealthy
Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES);
if (stateModelDefinition != null) {
unhealthyStates.add(stateModelDefinition.getInitialState());
}
return unhealthyStates;
}
}
Loading
Loading