1414// limitations under the License.
1515
1616// mqbc_clusterutil.cpp -*-C++-*-
17+ #include " mqbc_clusterstateledger.h"
1718#include < mqbc_clusterutil.h>
1819
1920#include < mqbscm_version.h>
@@ -1394,8 +1395,9 @@ void ClusterUtil::sendClusterState(
13941395 const ClusterState& clusterState,
13951396 bool sendPartitionPrimaryInfo,
13961397 bool sendQueuesInfo,
1398+ bslma::Allocator* allocator,
13971399 mqbnet::ClusterNode* node,
1398- const bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo>& partitions )
1400+ const bsl::vector<bmqp_ctrlmsg::PartitionPrimaryInfo>& newlyAssigned )
13991401{
14001402 // executed by the cluster *DISPATCHER* thread
14011403
@@ -1406,6 +1408,13 @@ void ClusterUtil::sendClusterState(
14061408 BSLS_ASSERT_SAFE (mqbnet::ElectorState::e_LEADER ==
14071409 clusterData->electorInfo ().electorState ());
14081410 BSLS_ASSERT_SAFE (ledger && ledger->isOpen ());
1411+ if (!sendPartitionPrimaryInfo) {
1412+ BSLS_ASSERT_SAFE (newlyAssigned.empty ());
1413+ }
1414+ else if (!newlyAssigned.empty ()) {
1415+ BSLS_ASSERT_SAFE (newlyAssigned.size () ==
1416+ clusterState.partitions ().size ());
1417+ }
14091418
14101419 if (clusterData->clusterConfig ().clusterAttributes ().isFSMWorkflow ()) {
14111420 // In FSM mode, the *only* possible caller of this method is
@@ -1414,14 +1423,6 @@ void ClusterUtil::sendClusterState(
14141423 BSLS_ASSERT_SAFE (sendPartitionPrimaryInfo && !sendQueuesInfo);
14151424 }
14161425
1417- if (sendPartitionPrimaryInfo) {
1418- BSLS_ASSERT_SAFE (partitions.size () ==
1419- clusterState.partitions ().size ());
1420- }
1421- else {
1422- BSLS_ASSERT_SAFE (partitions.empty ());
1423- };
1424-
14251426 if (bmqp_ctrlmsg::NodeStatus::E_STOPPING ==
14261427 clusterData->membership ().selfNodeStatus ()) {
14271428 // No need to send cluster state since self is stopping. After self
@@ -1445,8 +1446,47 @@ void ClusterUtil::sendClusterState(
14451446 clusterData->electorInfo ().nextLeaderMessageSequence (
14461447 &advisory.sequenceNumber ());
14471448
1448- advisory.partitions () = partitions;
1449- loadQueuesInfo (&advisory.queues (), clusterState);
1449+ if (!newlyAssigned.empty ()) {
1450+ // If we are sending a cluster state snapshot and there are new
1451+ // assignments, it implies that this will be the first leader
1452+ // advisory and thus there won't be any uncommitted CSL advisories.
1453+ advisory.partitions () = newlyAssigned;
1454+ loadQueuesInfo (&advisory.queues (), clusterState);
1455+ }
1456+ else {
1457+ // We construct a "merged state" of current cluster state and
1458+ // information from uncommitted CSL advisories, before sending over
1459+ // to follower(s).
1460+ //
1461+ // NOTE: This code path is *only* reachable in non-FSM mode.
1462+ ClusterState tempState (
1463+ &clusterData->cluster (),
1464+ clusterData->clusterConfig ().partitionConfig ().numPartitions (),
1465+ allocator);
1466+ apply (&tempState, clusterMessage, *clusterData);
1467+ const int rc = load (&tempState,
1468+ ledger->getIterator ().get (),
1469+ *clusterData,
1470+ allocator);
1471+ if (rc != 0 ) {
1472+ BALL_LOG_ERROR
1473+ << clusterData->identity ().description ()
1474+ << " : Failed to load CSL content to temp cluster "
1475+ << " state as part of 'sendClusterState', rc: " << rc;
1476+ }
1477+
1478+ ClusterStateLedger::ClusterMessageCRefList uncommittedAdvisories;
1479+ ledger->uncommittedAdvisories (&uncommittedAdvisories);
1480+ for (ClusterStateLedger::ClusterMessageCRefList::const_iterator
1481+ cit = uncommittedAdvisories.begin ();
1482+ cit != uncommittedAdvisories.end ();
1483+ ++cit) {
1484+ apply (&tempState, *cit, *clusterData);
1485+ }
1486+
1487+ loadPartitionsInfo (&advisory.partitions (), tempState);
1488+ loadQueuesInfo (&advisory.queues (), tempState);
1489+ }
14501490 }
14511491 else if (sendPartitionPrimaryInfo) {
14521492 bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory =
@@ -1455,7 +1495,7 @@ void ClusterUtil::sendClusterState(
14551495 clusterData->electorInfo ().nextLeaderMessageSequence (
14561496 &advisory.sequenceNumber ());
14571497
1458- advisory.partitions () = partitions ;
1498+ advisory.partitions () = newlyAssigned ;
14591499 }
14601500 else {
14611501 BSLS_ASSERT_SAFE (sendQueuesInfo);
@@ -1960,7 +2000,7 @@ void ClusterUtil::loadPartitionsInfo(
19602000 const ClusterState& state)
19612001{
19622002 // PRECONDITIONS
1963- BSLS_ASSERT_SAFE (out);
2003+ BSLS_ASSERT_SAFE (out && out-> empty () );
19642004
19652005 for (int pid = 0 ; pid < static_cast <int >(state.partitions ().size ());
19662006 ++pid) {
0 commit comments