Skip to content

Commit

Permalink
[Java] Keep publishing position updates from canvass and into nominat…
Browse files Browse the repository at this point in the history
…e state so other cluster node members can take action in extended election timeout durations.

(cherry picked from commit 3c434a6)
  • Loading branch information
mjpt777 authored and vyazelenko committed Oct 14, 2024
1 parent c92e58e commit d0849b5
Showing 1 changed file with 33 additions and 21 deletions.
54 changes: 33 additions & 21 deletions aeron-cluster/src/main/java/io/aeron/cluster/Election.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,27 +648,14 @@ private int init(final long nowNs)
private int canvass(final long nowNs)
{
int workCount = 0;
final long deadlineNs = isExtendedCanvass ?
timeOfLastStateChangeNs + ctx.startupCanvassTimeoutNs() :
consensusModuleAgent.timeOfLastLeaderUpdateNs() + ctx.leaderHeartbeatTimeoutNs();

if (hasUpdateIntervalExpired(nowNs, ctx.electionStatusIntervalNs()))
{
timeOfLastUpdateNs = nowNs;
for (final ClusterMember member : clusterMembers)
{
if (member.id() != thisMember.id())
{
if (null == member.publication())
{
ClusterMember.tryAddPublication(
member,
ctx.consensusStreamId(),
ctx.aeron(),
ctx.countedErrorHandler());
}

consensusPublisher.canvassPosition(
member.publication(), logLeadershipTermId, appendPosition, leadershipTermId, thisMember.id());
}
}
publishCanvassPosition();

workCount++;
}
Expand All @@ -678,10 +665,6 @@ private int canvass(final long nowNs)
return workCount;
}

final long deadlineNs = isExtendedCanvass ?
timeOfLastStateChangeNs + ctx.startupCanvassTimeoutNs() :
consensusModuleAgent.timeOfLastLeaderUpdateNs() + ctx.leaderHeartbeatTimeoutNs();

if (ClusterMember.isUnanimousCandidate(clusterMembers, thisMember, gracefulClosedLeaderId) ||
(nowNs >= deadlineNs && ClusterMember.isQuorumCandidate(clusterMembers, thisMember)))
{
Expand All @@ -702,6 +685,14 @@ private int nominate(final long nowNs)
candidateTermId + 1, logPosition, ctx.epochClock().time());
ClusterMember.becomeCandidate(clusterMembers, candidateTermId, thisMember.id());
state(CANDIDATE_BALLOT, nowNs, "");

return 1;
}
else if (hasUpdateIntervalExpired(nowNs, ctx.electionStatusIntervalNs()))
{
timeOfLastUpdateNs = nowNs;
publishCanvassPosition();

return 1;
}

Expand Down Expand Up @@ -1152,6 +1143,27 @@ private int publishCommitPositionOnInterval(final long quorumPosition, final lon
return workCount;
}

private void publishCanvassPosition()
{
for (final ClusterMember member : clusterMembers)
{
if (member.id() != thisMember.id())
{
if (null == member.publication())
{
ClusterMember.tryAddPublication(
member,
ctx.consensusStreamId(),
ctx.aeron(),
ctx.countedErrorHandler());
}

consensusPublisher.canvassPosition(
member.publication(), logLeadershipTermId, appendPosition, leadershipTermId, thisMember.id());
}
}
}

private void publishNewLeadershipTerm(final long timestamp)
{
for (final ClusterMember member : clusterMembers)
Expand Down

0 comments on commit d0849b5

Please sign in to comment.