@@ -168,8 +168,49 @@ void countUnconfirmed(bsls::Types::Int64* result, mqbi::Queue* queue)
168168 bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID);
169169}
170170
171+ template <typename T>
172+ struct ConditionalAdvance {
173+ bool d_doAdvance;
174+
175+ ConditionalAdvance ()
176+ : d_doAdvance(true )
177+ {
178+ // NOTHING
179+ }
180+
181+ void advance (T& x)
182+ {
183+ if (d_doAdvance) {
184+ ++x;
185+ }
186+ else {
187+ d_doAdvance = true ;
188+ }
189+ }
190+ void release () { d_doAdvance = false ; }
191+ };
192+
171193} // close unnamed namespace
172194
195+ // ---------------------------------------
196+ // struct ClusterQueueHelper::QueueContext
197+ // ---------------------------------------
198+
199+ void ClusterQueueHelper::QueueContext::respond (
200+ const bmqp_ctrlmsg::Status& status) const
201+ {
202+ for (bsl::vector<OpenQueueContextSp>::const_iterator
203+ cIt = d_liveQInfo.d_pending .begin (),
204+ cLast = d_liveQInfo.d_pending .end ();
205+ cIt != cLast;
206+ ++cIt) {
207+ (*cIt)->d_callback (status,
208+ 0 ,
209+ bmqp_ctrlmsg::OpenQueueResponse (),
210+ mqbi::Cluster::OpenQueueConfirmationCookie ());
211+ }
212+ }
213+
173214// -------------------------------------------
174215// struct ClusterQueueHelper::OpenQueueContext
175216// -------------------------------------------
@@ -401,15 +442,42 @@ void ClusterQueueHelper::afterPartitionPrimaryAssignment(
401442 }
402443}
403444
404- void ClusterQueueHelper::assignQueue (const QueueContextSp& queueContext)
445+ bool ClusterQueueHelper::assignQueueIfNeeded (
446+ const QueueContextSp& queueContext_sp)
447+ {
448+ // executed by the cluster *DISPATCHER* thread
449+
450+ // PRECONDITIONS
451+ BSLS_ASSERT_SAFE (
452+ d_cluster_p->dispatcher ()->inDispatcherThread (d_cluster_p));
453+
454+ BSLS_ASSERT_SAFE (queueContext_sp);
455+
456+ if (isQueueAssigned (*queueContext_sp)) {
457+ // Already assigned, nothing to do
458+ return true ; // RETURN
459+ }
460+
461+ // Queue is not assigned to a partition; get it assigned. If
462+ // self is leader, it will assign it locally, if not it will
463+ // send a request to the leader, etc.
464+
465+ return assignQueue (queueContext_sp);
466+ }
467+
468+ bool ClusterQueueHelper::assignQueue (const QueueContextSp& queueContext)
405469{
406470 // executed by the cluster *DISPATCHER* thread
407471
408472 // PRECONDITIONS
409473 BSLS_ASSERT_SAFE (
410474 d_cluster_p->dispatcher ()->inDispatcherThread (d_cluster_p));
475+
476+ BSLS_ASSERT_SAFE (queueContext);
411477 BSLS_ASSERT_SAFE (!isQueueAssigned (*queueContext));
412478
479+ bool result = true ;
480+
413481 if (d_cluster_p->isRemote ()) {
414482 // Assigning a queue in a remote, is simply giving it a new queueId.
415483 queueContext->d_liveQInfo .d_id = getNextQueueId ();
@@ -419,11 +487,12 @@ void ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext)
419487 if (d_clusterData_p->electorInfo ().isSelfLeader ()) {
420488 bmqp_ctrlmsg::Status status (d_allocator_p);
421489
422- if (!d_clusterStateManager_p->assignQueue (queueContext->uri (),
423- &status)) {
424- processRejectedQueueAssignment (queueContext.get (), status);
490+ result = d_clusterStateManager_p->assignQueue (queueContext->uri (),
491+ &status);
492+
493+ if (result == false ) {
494+ queueContext->respond (status);
425495 }
426- // else, all other failure are transient. will retry.
427496 }
428497 else {
429498 requestQueueAssignment (queueContext->uri ());
@@ -439,6 +508,8 @@ void ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext)
439508 << queueContext->uri ()
440509 << " ' (waiting for an ACTIVE leader)." ;
441510 }
511+
512+ return result;
442513}
443514
444515void ClusterQueueHelper::requestQueueAssignment (const bmqt::Uri& uri)
@@ -618,9 +689,8 @@ void ClusterQueueHelper::onQueueAssignmentResponse(
618689
619690 QueueContextMapIter qit = d_queues.find (uri);
620691 if (qit != d_queues.end ()) {
621- const QueueContext* rejected = qit->second .get ();
622-
623- processRejectedQueueAssignment (rejected, status);
692+ qit->second ->respond (status);
693+ d_queues.erase (qit);
624694 }
625695 }
626696 }
@@ -3520,29 +3590,6 @@ void ClusterQueueHelper::restoreState(int partitionId)
35203590 }
35213591}
35223592
3523- void ClusterQueueHelper::processRejectedQueueAssignment (
3524- const QueueContext* rejected,
3525- const bmqp_ctrlmsg::Status& status)
3526- {
3527- // executed by the cluster *DISPATCHER* thread
3528-
3529- // PRECONDITIONS
3530- BSLS_ASSERT_SAFE (
3531- d_cluster_p->dispatcher ()->inDispatcherThread (d_cluster_p));
3532-
3533- for (bsl::vector<OpenQueueContextSp>::const_iterator
3534- cIt = rejected->d_liveQInfo .d_pending .begin (),
3535- cLast = rejected->d_liveQInfo .d_pending .end ();
3536- cIt != cLast;
3537- ++cIt) {
3538- (*cIt)->d_callback (status,
3539- 0 ,
3540- bmqp_ctrlmsg::OpenQueueResponse (),
3541- mqbi::Cluster::OpenQueueConfirmationCookie ());
3542- }
3543- d_queues.erase (rejected->uri ());
3544- }
3545-
35463593void ClusterQueueHelper::restoreStateRemote ()
35473594{
35483595 // executed by the cluster *DISPATCHER* thread
@@ -3564,9 +3611,11 @@ void ClusterQueueHelper::restoreStateRemote()
35643611
35653612 // Attempt to re-issue open-queue requests for all applicable queues.
35663613
3614+ ConditionalAdvance<QueueContextMapConstIter> conditional;
3615+
35673616 for (QueueContextMapConstIter cit = d_queues.cbegin ();
35683617 cit != d_queues.cend ();
3569- ++ cit) {
3618+ conditional. advance ( cit) ) {
35703619 const QueueContextSp& queueContext = cit->second ;
35713620 QueueLiveState& liveQInfo = queueContext->d_liveQInfo ;
35723621
@@ -3581,10 +3630,13 @@ void ClusterQueueHelper::restoreStateRemote()
35813630 continue ; // CONTINUE
35823631 }
35833632
3584- if (!isQueueAssigned (*queueContext. get () )) {
3633+ if (!isQueueAssigned (*queueContext)) {
35853634 // Queue is not assigned to a partition; get it assigned.
35863635
3587- assignQueue (queueContext);
3636+ if (!assignQueue (queueContext)) {
3637+ conditional.release ();
3638+ cit = d_queues.erase (cit);
3639+ }
35883640
35893641 continue ; // CONTINUE
35903642 }
@@ -3715,10 +3767,10 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
37153767 d_clusterState_p->iterateDoubleAssignments (partitionId,
37163768 doubleAssignmentVisitor);
37173769 }
3718-
3770+ ConditionalAdvance<QueueContextMapConstIter> conditional;
37193771 for (QueueContextMapConstIter cit = d_queues.cbegin ();
37203772 cit != d_queues.cend ();
3721- ++ cit) {
3773+ conditional. advance ( cit) ) {
37223774 const QueueContextSp& queueContext = cit->second ;
37233775 QueueLiveState& liveQInfo = queueContext->d_liveQInfo ;
37243776 if (allPartitions) {
@@ -3739,12 +3791,13 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
37393791 continue ; // CONTINUE
37403792 }
37413793
3742- if (!isQueueAssigned (*queueContext.get ())) {
3743- // Queue is not assigned to a partition; get it assigned. If
3744- // self is leader, it will assign it locally, if not it will
3745- // send a request to the leader, etc.
3794+ if (!isQueueAssigned (*queueContext)) {
3795+ // Queue is not assigned to a partition; get it assigned.
37463796
3747- assignQueue (queueContext);
3797+ if (!assignQueue (queueContext)) {
3798+ conditional.release ();
3799+ cit = d_queues.erase (cit);
3800+ }
37483801
37493802 continue ; // CONTINUE
37503803 }
@@ -4738,30 +4791,35 @@ void ClusterQueueHelper::openQueue(
47384791
47394792 // 'OpenQueueContext::~OpenQueueContext' decrements 'd_inFlight' counter.
47404793
4794+ bool isAssigned = false ;
4795+
47414796 // Check if we are already aware of the queue.
47424797 if (queueContextIt != d_queues.end ()) {
47434798 // Already aware of the queue; but the queue may not yet have been
47444799 // assigned.
4745- context->setQueueContext (queueContextIt->second .get ());
4800+ QueueContext& queueContext = *queueContextIt->second ;
4801+
4802+ context->setQueueContext (&queueContext);
47464803
47474804 // In case queue was marked for expiration, explicitly unmark it. Note
47484805 // that self may be a replica or a passive primary, but it's ok to
47494806 // simply unmark the queue. Also note that this is the necessary and
47504807 // sufficient place to unmark a queue, as 'openQueue' is the entry
47514808 // point.
4752- queueContextIt-> second -> d_liveQInfo .d_queueExpirationTimestampMs = 0 ;
4809+ queueContext. d_liveQInfo .d_queueExpirationTimestampMs = 0 ;
47534810
4754- if (isQueuePrimaryAvailable (*(context-> queueContext ()) )) {
4811+ if (isQueuePrimaryAvailable (queueContext)) {
47554812 // Queue is already assigned and the primary is AVAILABLE, all
47564813 // good; move on to next step, i.e., processing the open request.
47574814 processOpenQueueRequest (context);
4815+ isAssigned = true ;
47584816 }
47594817 else {
47604818 // The queue is already known but either not assigned, or its
47614819 // primary is not yet available. In both scenarios, we append that
47624820 // context to the pending list that will be picked up and resumed
47634821 // once the next event (primary available, queue assigned) happens.
4764- QueueContext& queueContext = *(queueContextIt-> second . get ());
4822+
47654823 queueContext.d_liveQInfo .d_pending .push_back (context);
47664824
47674825 BALL_LOGTHROTTLE_INFO_BLOCK (k_MAX_INSTANT_MESSAGES,
@@ -4811,14 +4869,13 @@ void ClusterQueueHelper::openQueue(
48114869 // advisory are received). So to be safe, we explicitly attempt to
48124870 // assign the queue, which is a no-op in case there is no leader.
48134871
4814- if (!isQueueAssigned (*(queueContextIt->second ))) {
4815- // In CSL, unassignment is async.
4816- // Since QueueUnassignmentAdvisory can contain multiple queues,
4817- // canceling pending Advisory is not an option.
4818- // Instead, initiate new QueueAssignemntAdvisory which must
4819- // take effect after old QueueUnassignemntAdvisory.
4820- assignQueue (queueContextIt->second );
4821- }
4872+ isAssigned = isQueueAssigned (queueContext);
4873+
4874+ // In CSL, unassignment is async.
4875+ // Since QueueUnassignmentAdvisory can contain multiple queues,
4876+ // canceling pending Advisory is not an option.
4877+ // Instead, initiate new QueueAssignemntAdvisory which must
4878+ // take effect after old QueueUnassignemntAdvisory.
48224879 }
48234880 }
48244881 else {
@@ -4827,14 +4884,20 @@ void ClusterQueueHelper::openQueue(
48274884 QueueContextSp queueContext;
48284885 queueContext.createInplace (d_allocator_p, uriKey, d_allocator_p);
48294886
4830- d_queues[uriKey] = queueContext;
48314887 context->setQueueContext (queueContext.get ());
48324888
48334889 // Register the context to the pending list.
48344890 queueContext->d_liveQInfo .d_pending .push_back (context);
48354891
4892+ // Need to insert before calling 'assignQueue'
4893+ queueContextIt = d_queues.emplace (uriKey, queueContext).first ;
4894+ }
4895+
4896+ if (!isAssigned) {
48364897 // Initiate the assignment.
4837- assignQueue (queueContext);
4898+ if (!assignQueue (queueContextIt->second )) {
4899+ d_queues.erase (queueContextIt);
4900+ }
48384901 }
48394902}
48404903
0 commit comments