Skip to content

Commit 8f789be

Browse files
authored
Fix[MQB]: always attempt to process pending open queue requests (#932)
Signed-off-by: dorjesinpo <[email protected]>
1 parent 89a12ae commit 8f789be

File tree

1 file changed

+41
-48
lines changed

1 file changed

+41
-48
lines changed

src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp

Lines changed: 41 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,8 @@ void ClusterQueueHelper::onQueueContextAssigned(
845845
}
846846
}
847847

848+
// REVISIT: 'processOpenQueueRequest' seems to do similar (possibly
849+
// redundant) check for 'hasActiveAvailablePrimary',
848850
if (havePending && haveActivePrimary && isAvailable) {
849851
processPendingContexts(queueContext);
850852
}
@@ -3812,8 +3814,8 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
38123814
continue; // CONTINUE;
38133815
}
38143816

3815-
BSLS_ASSERT_SAFE(isQueueAssigned(*queueContext.get()));
3816-
BSLS_ASSERT_SAFE(isQueuePrimaryAvailable(*queueContext.get()));
3817+
BSLS_ASSERT_SAFE(isQueueAssigned(*queueContext));
3818+
BSLS_ASSERT_SAFE(isQueuePrimaryAvailable(*queueContext));
38173819

38183820
// Verify the CSL if needed by comparing it with the Domain config
38193821
if (liveQInfo.d_queue_sp) {
@@ -3860,22 +3862,22 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
38603862
if (mqbi::ClusterErrorCode::e_OK == result) {
38613863
// Cannot continue until 'onQueueUpdated'
38623864
// Send QueueUpdateAdvisory and _wait_ for commit
3865+
3866+
continue; // CONTINUE
38633867
}
3864-
else {
3865-
// An update error is CSL error (in
3866-
// 'ClusterStateLedger::apply'). This queue cannot
3867-
// convertToLocal
3868-
// ('RootQueueEngine::initializeAppId' would assert
3869-
// if there is no storage for some app).
3870-
3871-
BSLS_ASSERT_SAFE(false && "Failure to update Apps "
3872-
"before convertToLocal");
3873-
}
3868+
3869+
// An update error is CSL error (in
3870+
// 'ClusterStateLedger::apply'). This queue cannot
3871+
// convertToLocal
3872+
// ('RootQueueEngine::initializeAppId' would assert
3873+
// if there is no storage for some app).
3874+
3875+
BSLS_ASSERT_SAFE(
3876+
false &&
3877+
"Failure to update Apps before convertToLocal");
38743878
}
38753879
else {
38763880
convertToLocal(queueContext, domain);
3877-
3878-
processPendingContexts(queueContext);
38793881
}
38803882
}
38813883
else {
@@ -3903,6 +3905,9 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
39033905
// In case of other type of failure, just continue
39043906
// processing other queues instead of stopping the
39053907
// 'state restore' sequence.
3908+
3909+
// REVISIT: this code sends pending Open Queue requests
3910+
// without waiting for the ReOpen Queue Response.
39063911
}
39073912
else {
39083913
BMQ_LOGTHROTTLE_INFO
@@ -3920,25 +3925,14 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
39203925
// requests were not processed, but appended to the pending
39213926
// context list, so once we have an active primary, we
39223927
// should process them.
3923-
if (!queueContext->d_liveQInfo.d_pending.empty()) {
3924-
// Proceed with pending contexts, if any.
3925-
BMQ_LOGTHROTTLE_INFO
3926-
<< d_cluster_p->description()
3927-
<< ": Proceeding with "
3928-
<< queueContext->d_liveQInfo.d_pending.size()
3929-
<< " associated pending contexts for '"
3930-
<< queueContext->uri() << "'";
3931-
processPendingContexts(queueContext);
3932-
}
39333928
}
39343929
}
3935-
else {
3936-
// Queue instance is not created, but the queue is assigned.
3937-
// Proceed ahead.
3938-
3939-
onQueueContextAssigned(queueContext);
3940-
}
3930+
// else, Queue instance is not created, but the queue is assigned.
3931+
// Proceed ahead.
39413932
}
3933+
3934+
// In all cases, _attempt_ to process pending open queue requests
3935+
onQueueContextAssigned(queueContext);
39423936
}
39433937
}
39443938

@@ -4562,27 +4556,26 @@ void ClusterQueueHelper::onQueueUpdated(
45624556
<< ", addedAppIds: " << printer1
45634557
<< ", removedAppIds: " << printer2;
45644558

4565-
// Resume open queue request(s) waiting for new App(s).
4566-
processPendingContexts(qiter->second);
4559+
if (!queueContext.d_liveQInfo.d_pendingUpdates.empty()) {
4560+
// Swap the contexts to process them one by one and also clear the
4561+
// pendingContexts of the queue info: they will be enqueued back, if
4562+
// needed.
4563+
bsl::vector<VoidFunctor> pending(d_allocator_p);
4564+
pending.swap(queueContext.d_liveQInfo.d_pendingUpdates);
45674565

4568-
if (queueContext.d_liveQInfo.d_pendingUpdates.empty()) {
4569-
return; // RETURN
4570-
}
4566+
BMQ_LOGTHROTTLE_INFO << d_cluster_p->description() << ": processing "
4567+
<< pending.size() << " pending Apps Updates.";
45714568

4572-
// Swap the contexts to process them one by one and also clear the
4573-
// pendingContexts of the queue info: they will be enqueued back, if
4574-
// needed.
4575-
bsl::vector<VoidFunctor> pending(d_allocator_p);
4576-
pending.swap(queueContext.d_liveQInfo.d_pendingUpdates);
4577-
4578-
BMQ_LOGTHROTTLE_INFO << d_cluster_p->description() << ": processing "
4579-
<< pending.size() << " pending Apps Updates.";
4580-
4581-
for (bsl::vector<VoidFunctor>::iterator it = pending.begin();
4582-
it != pending.end();
4583-
++it) {
4584-
(*it)();
4569+
for (bsl::vector<VoidFunctor>::iterator it = pending.begin();
4570+
it != pending.end();
4571+
++it) {
4572+
(*it)();
4573+
}
45854574
}
4575+
4576+
// Resume open queue request(s) waiting for new App(s) _after_
4577+
// 'convertToLocal'
4578+
processPendingContexts(qiter->second);
45864579
}
45874580

45884581
void ClusterQueueHelper::onUpstreamNodeChange(mqbnet::ClusterNode* node,

0 commit comments

Comments
 (0)