Skip to content

Commit 9a1206e

Browse files
authored
Fix mqbc::StorageMgr: Buffer artificial primary status advisory if not healed (#971)
Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
1 parent 169c39d commit 9a1206e

File tree

2 files changed: 23 additions and 1 deletion.

src/groups/mqb/mqbc/mqbc_partitionfsm.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,8 @@ PartitionFSM& PartitionFSM::unregisterObserver(PartitionFSMObserver* observer)
5757
void PartitionFSM::popEventAndProcess(
5858
const bsl::shared_ptr<bsl::queue<EventWithData> >& eventsQueue)
5959
{
60+
// executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'
61+
6062
// PRECONDITIONS
6163
BSLS_ASSERT_SAFE(!eventsQueue->empty());
6264

src/groups/mqb/mqbc/mqbc_storagemanager.cpp

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -358,6 +358,24 @@ void StorageManager::setPrimaryStatusForPartitionDispatched(
358358

359359
pinfo.setPrimaryStatus(value);
360360
if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == value) {
361+
if (!d_partitionFSMVec[partitionId]->isSelfHealed()) {
362+
BALL_LOG_WARN << d_clusterData_p->identity().description()
363+
<< " Partition [" << partitionId << "]: "
364+
<< "Partition FSM is not yet healed; deferring "
365+
<< "setting active primary in FileStore until "
366+
<< "healing is complete. Will buffer an artificial "
367+
<< "PrimaryStatusAdvisory for Partition FSM to "
368+
<< "process when it becomes healed";
369+
370+
bmqp_ctrlmsg::PrimaryStatusAdvisory primaryAdv;
371+
primaryAdv.partitionId() = partitionId;
372+
primaryAdv.primaryLeaseId() = pinfo.primaryLeaseId();
373+
primaryAdv.status() = value;
374+
375+
bufferPrimaryStatusAdvisoryDispatched(primaryAdv, pinfo.primary());
376+
return; // RETURN
377+
}
378+
361379
d_fileStores[partitionId]->setActivePrimary(pinfo.primary(),
362380
pinfo.primaryLeaseId());
363381

@@ -1085,10 +1103,12 @@ void StorageManager::bufferPrimaryStatusAdvisoryDispatched(
10851103
const int pid = advisory.partitionId();
10861104
BSLS_ASSERT_SAFE(0 <= pid && pid < static_cast<int>(d_fileStores.size()));
10871105
BSLS_ASSERT_SAFE(d_fileStores[pid]->inDispatcherThread());
1106+
BSLS_ASSERT_SAFE(source);
10881107

10891108
BALL_LOG_INFO << d_clusterData_p->identity().description()
10901109
<< " Partition [" << pid
1091-
<< "]: Buffering primary status advisory: " << advisory;
1110+
<< "]: Buffering primary status advisory: " << advisory
1111+
<< " from " << source->nodeDescription();
10921112

10931113
d_bufferedPrimaryStatusAdvisoryInfosVec.at(pid).push_back(
10941114
bsl::make_pair(advisory, source));

0 commit comments

Comments
 (0)