Skip to content

Commit b02291e

Browse files
authored
[Message Prioritization] - Add currentReplicaNumber metadata for message prioritization by client (#3043)
Add currentReplicaNumber metadata for message prioritization by client
1 parent 3fd21d1 commit b02291e

File tree

5 files changed

+826
-42
lines changed

5 files changed

+826
-42
lines changed

helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Set;
2727
import java.util.concurrent.Callable;
2828
import java.util.concurrent.ExecutorService;
29+
import java.util.stream.Collectors;
2930

3031
import org.apache.helix.HelixDataAccessor;
3132
import org.apache.helix.HelixDefinedState;
@@ -57,17 +58,18 @@
5758
* Compares the currentState, pendingState with IdealState and generates messages
5859
*/
5960
public class MessageGenerationPhase extends AbstractBaseStage {
60-
private final static String NO_DESIRED_STATE = "NoDesiredState";
61+
private static final String NO_DESIRED_STATE = "NoDesiredState";
6162

6263
// If we see there is any invalid pending message left on a host, e.g. a message that
6364
// tells participant to change from SLAVE to MASTER, and the participant is already
6465
// at MASTER state, we wait for timeout and if the message is still not cleaned up by
6566
// participant, controller will cleanup them proactively to unblock further state
6667
// transition
67-
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
68+
public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
6869
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
69-
private final static String PENDING_MESSAGE = "pending message";
70-
private final static String STALE_MESSAGE = "stale message";
70+
private static final String PENDING_MESSAGE = "pending message";
71+
private static final String STALE_MESSAGE = "stale message";
72+
private static final String OFFLINE = "OFFLINE";
7173

7274
private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
7375

@@ -163,6 +165,18 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
163165
// desired-state->list of generated-messages
164166
Map<String, List<Message>> messageMap = new HashMap<>();
165167

168+
/*
169+
* Calculate the current active replica count based on state model type.
170+
* This represents the number of replicas currently serving traffic for this partition
171+
* Active replicas include: top states, secondary top states(excluding OFFLINE) and ERROR
172+
* states.
173+
* Active replicas exclude: OFFLINE and DROPPED states.
174+
* All qualifying state transitions for this partition will receive this same value,
175+
* allowing clients to understand the current availability level and prioritize accordingly.
176+
*/
177+
int currentActiveReplicaCount =
178+
calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);
179+
166180
for (String instanceName : instanceStateMap.keySet()) {
167181

168182
Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
@@ -250,17 +264,39 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
250264
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
251265
stateModelDef, cancellationMessage, isCancellationEnabled);
252266
} else {
267+
// Set currentActiveReplicaNumber to provide metadata for potential message
268+
// prioritization by participant.
269+
// Assign the current active replica count to all qualifying upward transitions for this
270+
// partition.
271+
// This ensures consistent prioritization metadata across concurrent state transitions.
272+
// -1 indicates no prioritization metadata, e.g. downward ST messages get -1.
273+
int currentActiveReplicaNumber = -1;
274+
275+
/*
276+
* Assign currentActiveReplicaNumber for qualifying upward state transitions.
277+
* Criteria for assignment:
278+
* - Must be an upward state transition according to state model
279+
* - Target state must be considered active (according to state model type)
280+
*/
281+
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
282+
&& isStateActive(nextState, stateModelDef)) {
283+
284+
// All qualifying transitions for this partition get the same
285+
// currentActiveReplicaNumber
286+
currentActiveReplicaNumber = currentActiveReplicaCount;
287+
}
288+
253289
// Create new state transition message
254-
message = MessageUtil
255-
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
256-
resource, partition.getPartitionName(), instanceName, currentState, nextState,
257-
sessionIdMap.get(instanceName), stateModelDef.getId());
290+
message = MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
291+
manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
292+
currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
293+
currentActiveReplicaNumber);
258294

259295
if (logger.isDebugEnabled()) {
260296
LogUtil.logDebug(logger, _eventId, String.format(
261-
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
297+
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
262298
resource.getResourceName(), partition.getPartitionName(), instanceName,
263-
currentState, nextState));
299+
currentState, nextState, currentActiveReplicaNumber));
264300
}
265301
}
266302
}
@@ -290,7 +326,66 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
290326
} // end of for-each-partition
291327
}
292328

293-
private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
329+
/**
330+
* Calculate the current active replica count based on state model type.
331+
* The count includes replicas in top states, secondary top states (excluding OFFLINE),
332+
* and ERROR states, since Helix considers them active. The count excludes OFFLINE and DROPPED states.
333+
* @param currentStateMap
334+
* @param stateModelDef
335+
* @return The number of replicas currently in active states, used to determine the
336+
* currentActiveReplicaNumber for the partition.
337+
*/
338+
private int calculateCurrentActiveReplicaCount(Map<String, String> currentStateMap,
339+
StateModelDefinition stateModelDef) {
340+
return (int) currentStateMap.values().stream()
341+
.filter(state -> stateModelDef.getTopState().contains(state) // Top states (MASTER, ONLINE,
342+
// LEADER)
343+
|| getActiveSecondaryTopStates(stateModelDef).contains(state) // Active secondary states
344+
// (SLAVE, STANDBY,
345+
// BOOTSTRAP)
346+
|| HelixDefinedState.ERROR.name().equals(state) // ERROR states (still considered
347+
// active)
348+
// DROPPED and OFFLINE are automatically excluded by getActiveSecondaryTopStates()
349+
).count();
350+
}
351+
352+
/**
353+
* Get active secondary top states: second top states minus non-serving states such as OFFLINE and
354+
* DROPPED.
355+
* Reasons for elimination:
356+
* - getSecondTopStates() can include OFFLINE as a secondary top state in some state models.
357+
* Example - OnlineOffline:
358+
* getSecondTopStates() = ["OFFLINE"] as it transitions to ONLINE.
359+
* After filtering: activeSecondaryTopStates=[] (removes "OFFLINE" as it's not a serving state).
360+
* @param stateModelDef
361+
*/
362+
private List<String> getActiveSecondaryTopStates(StateModelDefinition stateModelDef) {
363+
return stateModelDef.getSecondTopStates().stream()
364+
// Remove non-serving states
365+
.filter(state -> !OFFLINE.equals(state) && !HelixDefinedState.DROPPED.name().equals(state))
366+
.collect(Collectors.toList());
367+
}
368+
369+
/**
370+
* Determines if the given state is considered active based on the state model type.
371+
* Active states include: top states, active secondary top states (excluding OFFLINE),
372+
* and ERROR states. Active states exclude OFFLINE and DROPPED states.
373+
* ERROR state replicas are always considered active in HELIX as they do not
374+
* affect availability.
375+
* @param state
376+
* @param stateModelDef
377+
* @return true if the state is considered active, false otherwise
378+
*/
379+
private boolean isStateActive(String state, StateModelDefinition stateModelDef) {
380+
// ERROR state is always considered active regardless of state model type
381+
if (HelixDefinedState.ERROR.name().equals(state)) {
382+
return true;
383+
}
384+
return stateModelDef.getTopState().contains(state)
385+
|| getActiveSecondaryTopStates(stateModelDef).contains(state);
386+
}
387+
388+
private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
294389
String initialState) {
295390
if (pendingMessage == null) {
296391
return false;

helix-core/src/main/java/org/apache/helix/model/Message.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public enum Attributes {
104104
RELAY_FROM,
105105
EXPIRY_PERIOD,
106106
SRC_CLUSTER,
107-
ST_REBALANCE_TYPE
107+
ST_REBALANCE_TYPE,
108+
CURRENT_ACTIVE_REPLICA_NUMBER
108109
}
109110

110111
/**
@@ -137,12 +138,8 @@ public enum STRebalanceType {
137138
/**
138139
* Compares the creation time of two Messages
139140
*/
140-
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
141-
@Override
142-
public int compare(Message m1, Message m2) {
143-
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
144-
}
145-
};
141+
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
142+
// Ascending by creation timestamp (earliest message first), same order as the anonymous class this lambda replaces.
(m1, m2) -> Long.compare(m1.getCreateTimeStamp(), m2.getCreateTimeStamp());
146143

147144
/**
148145
* Instantiate a message
@@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
935932
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
936933
}
937934

935+
/**
936+
* Set current active replica count for participant-side message prioritization.
937+
* This field indicates the number of replicas currently in active states (including ERROR states)
938+
* for this partition at the time the state transition message is generated.
939+
* Active states include top states, secondary top states (for single-top state models),
940+
* and ERROR states.
941+
* This metadata enables participants to prioritize recovery scenarios (low active counts)
942+
* over load balancing scenarios (high active counts) in custom thread pools or message handlers.
943+
* For example, 2→3 transitions get higher priority than 3→4 transitions.
944+
* Default value is -1 for transitions that don't require prioritization metadata (e.g.,
945+
* downward transitions).
946+
* @param currentActiveReplicaNumber the number of currently active replicas (-1 when there is no
947+
* prioritization metadata,
948+
* >=0 for transitions containing current availability level)
949+
*/
950+
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
951+
_record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(),
952+
currentActiveReplicaNumber);
953+
}
954+
955+
/**
956+
* Get the current active replica count for this partition at message generation time.
957+
* This value represents the number of replicas in active states (including ERROR states) before
958+
* any state transitions occur, enabling participant-side message prioritization based on
959+
* current availability levels.
960+
* @return current active replica count, or -1 for cases where we don't provide metadata for
961+
* prioritization like downward state transitions.
962+
*/
963+
964+
public int getCurrentActiveReplicaNumber() {
965+
return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1);
966+
}
967+
938968
/**
939969
* Check if this message is targeted for a controller
940970
* @return true if this is a controller message, false otherwise

helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,23 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
505505
}
506506
return stateCounts;
507507
}
508+
509+
/**
510+
* Check if a state transition is upward
511+
* @param fromState source state
512+
* @param toState destination state
513+
* @return True if it's an upward state transition, false otherwise
514+
*/
515+
public boolean isUpwardStateTransition(String fromState, String toState) {
516+
Map<String, Integer> statePriorityMap = getStatePriorityMap();
517+
518+
Integer fromStateWeight = statePriorityMap.get(fromState);
519+
Integer toStateWeight = statePriorityMap.get(toState);
520+
521+
if (fromStateWeight == null || toStateWeight == null) {
522+
return false;
523+
}
524+
525+
return toStateWeight < fromStateWeight;
526+
}
508527
}

helix-core/src/main/java/org/apache/helix/util/MessageUtil.java

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
4848
toState);
4949

5050
Message message =
51-
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
51+
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
5252
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
5353
nextState, sessionId, stateModelDefName);
5454

@@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
6060
return null;
6161
}
6262

63-
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
64-
Resource resource, String partitionName, String instanceName, String currentState,
65-
String nextState, String tgtSessionId, String stateModelDefName) {
66-
Message message =
67-
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
68-
srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
69-
stateModelDefName);
70-
71-
// Set the retry count for state transition messages.
72-
// TODO: make the retry count configurable in ClusterConfig or IdealState
73-
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
74-
75-
if (resource.getResourceGroupName() != null) {
76-
message.setResourceGroupName(resource.getResourceGroupName());
77-
}
78-
if (resource.getResourceTag() != null) {
79-
message.setResourceTag(resource.getResourceTag());
80-
}
81-
82-
return message;
83-
}
84-
8563
/**
8664
* Creates a message to change participant status
8765
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
@@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin
12199
}
122100

123101
/* Creates state transition or state transition cancellation message */
124-
private static Message createStateTransitionMessage(Message.MessageType messageType,
102+
private static Message createBasicStateTransitionMessage(Message.MessageType messageType,
125103
String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
126104
String instanceName, String currentState, String nextState, String tgtSessionId,
127105
String stateModelDefName) {
@@ -136,4 +114,72 @@ private static Message createStateTransitionMessage(Message.MessageType messageT
136114

137115
return message;
138116
}
117+
118+
/**
119+
* Create a state transition message with replica prioritization metadata
120+
* @param srcInstanceName source instance name
121+
* @param srcSessionId source session id
122+
* @param resource resource
123+
* @param partitionName partition name
124+
* @param instanceName target instance name
125+
* @param currentState current state
126+
* @param nextState next state
127+
* @param tgtSessionId target session id
128+
* @param stateModelDefName state model definition name
129+
* @param currentActiveReplicaNumber The number of replicas currently in active states
130+
* for this partition before any state transitions occur. This metadata
131+
* enables participant-side message prioritization by indicating the
132+
* current availability level (e.g., 0→1 recovery scenarios get higher
133+
* priority than 2→3 load balancing scenarios). Set to -1 for transitions
134+
* that should not be prioritized (downward transitions).
135+
* Active states include top states, secondary top states (for single-top
136+
* state models), and ERROR states since they are still considered active by Helix.
137+
* @return state transition message
138+
*/
139+
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
140+
Resource resource, String partitionName, String instanceName, String currentState,
141+
String nextState, String tgtSessionId, String stateModelDefName,
142+
int currentActiveReplicaNumber) {
143+
Message message = createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
144+
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
145+
nextState, tgtSessionId, stateModelDefName);
146+
147+
// Set the retry count for state transition messages.
148+
// TODO: make the retry count configurable in ClusterConfig or IdealState
149+
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
150+
151+
if (resource.getResourceGroupName() != null) {
152+
message.setResourceGroupName(resource.getResourceGroupName());
153+
}
154+
if (resource.getResourceTag() != null) {
155+
message.setResourceTag(resource.getResourceTag());
156+
}
157+
158+
// Set current active replica number for participant-side prioritization
159+
message.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);
160+
161+
return message;
162+
}
163+
164+
/**
165+
* Create a state transition message (backward compatibility)
166+
* @param srcInstanceName source instance name
167+
* @param srcSessionId source session id
168+
* @param resource resource
169+
* @param partitionName partition name
170+
* @param instanceName target instance name
171+
* @param currentState current state
172+
* @param nextState next state
173+
* @param tgtSessionId target session id
174+
* @param stateModelDefName state model definition name
175+
* @return state transition message
176+
*/
177+
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
178+
Resource resource, String partitionName, String instanceName, String currentState,
179+
String nextState, String tgtSessionId, String stateModelDefName) {
180+
// currentActiveReplicaNumber is set to -1 for ST messages needing no prioritization metadata
181+
// (backward compatibility)
182+
return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
183+
instanceName, currentState, nextState, tgtSessionId, stateModelDefName, -1);
184+
}
139185
}

0 commit comments

Comments
 (0)