Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2408,7 +2408,10 @@ Cluster::Cluster(const bslstl::StringRef& name,
statContexts.find("clusters")->second,
statContexts,
allocator)
, d_state(this, clusterConfig.partitionConfig().numPartitions(), allocator)
, d_state(this,
clusterConfig.partitionConfig().numPartitions(),
false, // isTemporary
allocator)
, d_storageManager_mp()
, d_clusterOrchestrator(d_clusterData.clusterConfig(),
this,
Expand Down Expand Up @@ -3359,6 +3362,8 @@ void Cluster::onClusterLeader(mqbnet::ClusterNode* node,
<< "Encountered leader-primary divergence: this node is "
"still the primary but the leadership has gone to "
<< (node ? node->hostName() : "UNDEFINED");
d_clusterData.membership().setSelfNodeStatus(
bmqp_ctrlmsg::NodeStatus::E_STOPPING);
mqbu::ExitUtil::shutdown(
mqbu::ExitCode::e_UNSUPPORTED_SCENARIO);
}
Expand Down
89 changes: 53 additions & 36 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(

BALL_LOG_INFO << d_clusterData_p->identity().description()
<< " Partition [" << partitionId
<< "]: primary node (self) successfully synced the "
<< "]: primary node (self) successfully synced the"
<< " partition. Current leaseId: " << pinfo.primaryLeaseId();

// Update primary status via cluster state manager which will also notify
Expand Down Expand Up @@ -510,11 +510,19 @@ void ClusterOrchestrator::onNodeUnavailable(mqbnet::ClusterNode* node)
d_clusterData_p->electorInfo().electorState() ||
mqbc::ElectorInfoLeaderStatus::e_PASSIVE ==
d_clusterData_p->electorInfo().leaderStatus() ||
bmqp_ctrlmsg::NodeStatus::E_AVAILABLE !=
d_clusterData_p->membership().selfNodeStatus()) {
(!d_clusterConfig.clusterAttributes().isFSMWorkflow() &&
bmqp_ctrlmsg::NodeStatus::E_AVAILABLE !=
d_clusterData_p->membership().selfNodeStatus())) {
// Nothing to do if self is not active leader, or if self is active
// leader but is stopping.

// In FSM mode, it is incorrect to check whether self is AVAILABLE,
// because self could be healing while another node becomes
// unavailable, and in that case self should still be able to reassign
// primaries. In legacy mode, self sets status to AVAILABLE
// nonchalantly, and it works out by preventing us from returning
// early.

return; // RETURN
}

Expand Down Expand Up @@ -1073,22 +1081,23 @@ void ClusterOrchestrator::processNodeStatusAdvisory(
d_clusterData_p->electorInfo().electorState() &&
mqbc::ElectorInfoLeaderStatus::e_ACTIVE ==
d_clusterData_p->electorInfo().leaderStatus()) {
// Self is ACTIVE leader. Although self has sent leader advisory
// to 'source' when 'source' came up (see
// 'processNodeStateChange'), it sends the advisory again, in case
// 'source' ignored the previous advisory due to not recognizing
// self as leader. Although self does send a leader heartbeat to
// 'source', but due to race related to thread scheduling, 'source'
// may see that heartbeat after the first advisory.

bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo> partitions;
mqbc::ClusterUtil::loadPartitionsInfo(&partitions,
*clusterState());
d_stateManager_mp->sendClusterState(
true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
source,
partitions);
// In FSM mode, the Cluster FSM will take care of keeping the
// follower up-to-date.
if (!d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
// Self is ACTIVE leader. Although self has sent leader
// advisory to 'source' when 'source' came up (see
// 'processNodeStateChange'), it sends the advisory again, in
// case 'source' ignored the previous advisory due to not
// recognizing self as leader. Although self does send a
// leader heartbeat to 'source', but due to race related to
// thread scheduling, 'source' may see that heartbeat after the
// first advisory.

d_stateManager_mp->sendClusterState(
true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
source);
}
}
else if (d_clusterConfig.clusterAttributes().isFSMWorkflow() &&
source->nodeId() ==
Expand Down Expand Up @@ -1189,19 +1198,19 @@ void ClusterOrchestrator::processNodeStateChangeEvent(
}

if (d_clusterData_p->electorInfo().isSelfActiveLeader()) {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": leader (self) is ACTIVE; will send leader"
<< " advisory to new node: "
<< node->nodeDescription();

bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo> partitions;
mqbc::ClusterUtil::loadPartitionsInfo(&partitions,
*clusterState());
d_stateManager_mp->sendClusterState(
true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
node,
partitions);
// In FSM mode, the Cluster FSM will take care of keeping the
// follower up-to-date.
if (!d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
BALL_LOG_INFO
<< d_clusterData_p->identity().description()
<< ": leader (self) is ACTIVE; will send leader"
<< " advisory to new node: " << node->nodeDescription();

d_stateManager_mp->sendClusterState(
true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
node);
}
}

// For each partition for which self is primary, notify the storageMgr
Expand Down Expand Up @@ -1762,6 +1771,17 @@ void ClusterOrchestrator::processLeaderPassiveNotification(
BSLS_ASSERT_SAFE(notification.choice().isLeaderPassiveValue());
BSLS_ASSERT_SAFE(notifier);

if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
BALL_LOG_INFO
<< d_clusterData_p->identity().description()
<< ": In FSM mode, *always ignoring* leader passive notification: "
<< notification << ", from peer node '"
<< notifier->nodeDescription()
<< ". This is because when a node transitions to healed "
<< "follower, it must already set leader status as ACTIVE.";
return; // RETURN
}

if (d_clusterData_p->electorInfo().electorState() !=
mqbnet::ElectorState::e_LEADER) {
// This node may have gone from leader to non-leader.
Expand Down Expand Up @@ -1809,12 +1829,9 @@ void ClusterOrchestrator::processLeaderPassiveNotification(
}

// Self is an ACTIVE leader - broadcast 'LeaaderAdvisory'
bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo> partitions;
mqbc::ClusterUtil::loadPartitionsInfo(&partitions, *clusterState());
d_stateManager_mp->sendClusterState(true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
0,
partitions);
0);
}

void ClusterOrchestrator::onRecoverySuccess()
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,8 @@ ClusterProxy::ClusterProxy(
statContexts,
allocator)
, d_state(this,
0, // Partition count. Proxy has no notion of partition.
0, // Partition count. Proxy has no notion of partition.
false, // isTemporary
allocator)
, d_activeNodeManager(d_clusterData.membership().netCluster()->nodes(),
description(),
Expand Down
23 changes: 13 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3742,8 +3742,8 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)

// If a specific partitionId is specified, check if partition is assigned
// to a primary node, and if that primary is ACTIVE.
bool isSelfPrimary = false;
const ClusterStatePartitionInfo* pinfo = 0;
bool isSelfPrimaryAndLeader = false;
const ClusterStatePartitionInfo* pinfo = 0;

if (!allPartitions) {
pinfo = &(d_clusterState_p->partition(partitionId));
Expand All @@ -3762,14 +3762,17 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
}

// Primary for this partitionId is ACTIVE. Check if self is the
// primary.
// primary and leader. If self is primary but not leader, this is
// primary-leader divergence and we should not proceed with state
// restore.

isSelfPrimary = pinfo->primaryNode() ==
d_clusterData_p->membership().selfNode();
isSelfPrimaryAndLeader =
pinfo->primaryNode() == d_clusterData_p->membership().selfNode() &&
d_clusterData_p->electorInfo().isSelfLeader();
}

/// TODO (FSM); remove after switching to FSM
if (!d_cluster_p->isFSMWorkflow() && isSelfPrimary) {
if (!d_cluster_p->isFSMWorkflow() && isSelfPrimaryAndLeader) {
// Note that this fails if there are data
mqbc::ClusterState::AssignmentVisitor doubleAssignmentVisitor =
bdlf::BindUtil::bindS(d_allocator_p,
Expand Down Expand Up @@ -3831,7 +3834,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)

// Verify the CSL if needed by comparing it with the Domain config
if (liveQInfo.d_queue_sp) {
if (isSelfPrimary) {
if (isSelfPrimaryAndLeader) {
// We are assuming that it is not possible for a node to be
// primary, lose primary-ship and regain primary-ship;
// unless eventually the node went down in which case it
Expand Down Expand Up @@ -4472,9 +4475,9 @@ void ClusterQueueHelper::onQueueUpdated(
QueueContextMapIter qiter = d_queues.find(uri);
BSLS_ASSERT_SAFE(qiter != d_queues.end());

QueueContext& queueContext = *qiter->second;
mqbi::Queue* queue = queueContext.d_liveQInfo.d_queue_sp.get();
const int partitionId = queueContext.partitionId();
QueueContext& queueContext = *qiter->second;
mqbi::Queue* queue = queueContext.d_liveQInfo.d_queue_sp.get();
const int partitionId = queueContext.partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
Expand Down
38 changes: 26 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,25 @@ void ClusterStateManager::onCommit(
<< ": Committed advisory: " << advisory << ", with status '"
<< status << "'";

// NOTE: We *must* apply the advisory to in-memory cluster state before
// setting leader status as ACTIVE. Otherwise, the state-restore event
// emitted by setting as ACTIVE, might assign a queue with a different
// queue key than the one specified in the advisory.
mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p);

// Leader status is set to ACTIVE during FSM, not CSL. Since we are still
// in the phase of enabling CSL, we need to explicitly set it here.
if (clusterMessage.choice().isLeaderAdvisoryValue()) {
d_clusterData_p->electorInfo().setLeaderStatus(
mqbc::ElectorInfoLeaderStatus::e_ACTIVE);
if (d_clusterData_p->electorInfo().isSelfLeader()) {
d_clusterData_p->electorInfo().onSelfActiveLeader();
}
else {
d_clusterData_p->electorInfo().setLeaderStatus(
mqbc::ElectorInfoLeaderStatus::e_ACTIVE);
}
}

mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p);
d_trustCSL = true;
}

void ClusterStateManager::onSelfActiveLeader()
Expand All @@ -114,15 +125,17 @@ void ClusterStateManager::onSelfActiveLeader()
<< ": setting self as active leader, will assign partitions "
<< " and broadcast cluster state to peers.";

d_clusterData_p->electorInfo().onSelfActiveLeader();

bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo> partitions;
assignPartitions(&partitions);

sendClusterState(true, // sendPartitionPrimaryInfo
true, // sendQueuesInfo
0,
partitions);

// NOTE: Do not notify elector that we are active leader yet. Wait until
// the commit of the first CSL advisory, in case we need to construct a CSL
// "merged state" to ensure correctness.
}

void ClusterStateManager::leaderSyncCb()
Expand Down Expand Up @@ -688,6 +701,7 @@ ClusterStateManager::ClusterStateManager(
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_trustCSL(false)
, d_isStarted(false)
, d_clusterConfig(clusterConfig)
, d_cluster_p(cluster)
Expand Down Expand Up @@ -827,16 +841,14 @@ void ClusterStateManager::assignPartitions(

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(d_clusterData_p->electorInfo().isSelfActiveLeader());
BSLS_ASSERT_SAFE(d_clusterData_p->electorInfo().isSelfLeader());
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());
BSLS_ASSERT_SAFE(partitions && partitions->empty());

mqbc::ClusterUtil::assignPartitions(
partitions,
d_state_p,
d_clusterConfig.masterAssignment(),
*d_clusterData_p,
d_clusterConfig.clusterAttributes().isCSLModeEnabled());
mqbc::ClusterUtil::assignPartitions(partitions,
d_state_p,
d_clusterConfig.masterAssignment(),
*d_clusterData_p);
}

bool ClusterStateManager::assignQueue(const bmqt::Uri& uri,
Expand Down Expand Up @@ -911,6 +923,8 @@ void ClusterStateManager::sendClusterState(
*d_state_p,
sendPartitionPrimaryInfo,
sendQueuesInfo,
d_trustCSL,
d_allocator_p,
node,
partitions);
}
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
/// Allocator store to spawn new allocators for sub-components.
bmqma::CountingAllocatorStore d_allocators;

/// Whether to trust CSL as the source of truth. At startup, do *not*
/// trust CSL. Only trust CSL after the first CSL advisory commit.
bool d_trustCSL;

bool d_isStarted;

/// Cluster configuration to use.
Expand Down
Loading