Skip to content
Open
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
82d289b
Update lsn quorum on quorum update
bbpetukhov Oct 14, 2025
229f3b6
Add new switch - restart cluster as fsm with single node up and quorum=1
bbpetukhov Oct 14, 2025
57dba69
Implement single source of truth for quorum
bbpetukhov Oct 22, 2025
b8905f8
Fix py format
bbpetukhov Oct 22, 2025
21923c2
Respect default quorum value
bbpetukhov Oct 22, 2025
4e5574f
Implement ClusterQuorumManager - thread safe storage for cluster shar…
bbpetukhov Oct 24, 2025
290e97b
Add 4th switch to test changing cluster state to fsm with 1 node with…
bbpetukhov Oct 24, 2025
54b456b
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Oct 24, 2025
24b8c49
clang-format
bbpetukhov Oct 24, 2025
28ecbc3
Fix python linter
bbpetukhov Oct 24, 2025
1dea4fe
Update src/groups/mqb/mqbcfg/mqbcfg_clusterquorummanager.cpp
bbpetukhov Nov 3, 2025
c21a7d2
Update src/groups/mqb/mqbcfg/mqbcfg_clusterquorummanager.h
bbpetukhov Nov 3, 2025
3fe01cb
Fix mqbcfg.mem
bbpetukhov Nov 3, 2025
11fb10d
Update src/groups/mqb/mqbcfg/mqbcfg_clusterquorummanager.h
bbpetukhov Nov 3, 2025
7f5e3c9
Update src/groups/mqb/mqbc/mqbc_storagemanager.cpp
bbpetukhov Nov 3, 2025
5066ebf
Update src/groups/mqb/mqbc/mqbc_storagemanager.h
bbpetukhov Nov 3, 2025
df83cc9
Update src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
bbpetukhov Nov 3, 2025
43fb9a2
Update src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
bbpetukhov Nov 3, 2025
2880025
Update src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp
bbpetukhov Nov 3, 2025
6ec2c5b
Synchronize values of quorum used in expression and logged
bbpetukhov Nov 3, 2025
dcfa810
Refactor ClusterStateManager
bbpetukhov Nov 3, 2025
1bcb775
Remove redundant d_clusterConfig from IncoreClusterStateLedger
bbpetukhov Nov 3, 2025
693ec72
mqbcfg.xsd: Elector quorum from int -> unsigned int
kaikulimu Oct 25, 2025
130d57c
Use unsigned int for quorum
bbpetukhov Nov 3, 2025
210d7bc
Update documentation on elector
bbpetukhov Nov 3, 2025
fdeb9c1
Removed redundant consistencyLevel
bbpetukhov Nov 3, 2025
17768b6
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Nov 3, 2025
ad59112
clang-format
bbpetukhov Nov 3, 2025
8c4ab0d
Improved types, fix implicit conversion warnings
bbpetukhov Nov 4, 2025
fa5f622
Allow high quorum values
bbpetukhov Nov 4, 2025
83bca9c
Update src/groups/mqb/mqbcfg/mqbcfg_clusterquorummanager.cpp
bbpetukhov Nov 5, 2025
cef74e7
Update src/groups/mqb/mqbcfg/package/mqbcfg.mem
bbpetukhov Nov 5, 2025
69802cd
Add github action which checks ordering in .mem files
bbpetukhov Nov 5, 2025
97ee3fc
Fix alphabetical lines ordering in all .mem files
bbpetukhov Nov 5, 2025
dac5f12
Update src/integration-tests/test_restart_between_modes.py
bbpetukhov Nov 5, 2025
24a3f38
Remove redundant helper
bbpetukhov Nov 5, 2025
d5f34c2
try enable cluster fixture for all new ITs
bbpetukhov Nov 7, 2025
dfa6e47
Extend IT scenarios to include rollback to previous mode
bbpetukhov Nov 7, 2025
cc5e262
Enable switch_cluster_mode for test_restart_between_Legacy_and_FSM
bbpetukhov Nov 7, 2025
7f0216a
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Nov 7, 2025
8afc20d
Update IT - check only alive nodes in case some nodes are switched of…
bbpetukhov Nov 10, 2025
04bfe31
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Nov 10, 2025
488bd6d
Refactor ITs. Set replication factor along with quorum to 1 for singl…
bbpetukhov Nov 13, 2025
66c48fd
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Nov 13, 2025
2606eac
Fix comments in ITs
bbpetukhov Nov 13, 2025
1171669
Enable all testing switches in ITs
bbpetukhov Nov 14, 2025
391f450
Update docs
bbpetukhov Nov 14, 2025
54cf79e
Remove redundant defatul_consumer
bbpetukhov Nov 14, 2025
de285fb
Update add/remove appids test
bbpetukhov Nov 14, 2025
9daaa78
Extend purge queue/appId test
bbpetukhov Nov 14, 2025
05d6ab4
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
bbpetukhov Nov 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/check-mem-files.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Verifies that every package/group membership (.mem) file under src/ is kept
# in sorted order, and fails the pull request check when one is not.
name: Check alphabetical order in .mem files

on:
  pull_request:
    types:
      - "opened"
      - "reopened"
      - "synchronize"
      - "labeled"
      - "unlabeled"

jobs:
  check-sorting:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Check alphabetical order in .mem files
        # The script relies on bash-only features (process substitution,
        # 'read -d'), so request bash explicitly; a '#!' line inside 'run:'
        # is only a comment and does not select the interpreter.
        shell: bash
        run: |
          set -euo pipefail

          # Pin the collation order so the result does not depend on the
          # locale of the runner (or of the developer running the suggested
          # fix command locally).
          export LC_ALL=C

          echo "Checking all .mem files in src/ for sorted order..."

          # Track whether we find any unsorted files
          unsorted=0

          # Walk all .mem files recursively under src/.  Paths are passed
          # NUL-delimited so that names containing whitespace are handled,
          # and the loop reads from a process substitution (not a pipe) so
          # that 'unsorted' is updated in the current shell.
          while IFS= read -r -d '' f; do
            # Compare file with its sorted version
            if ! diff -q <(sort "$f") "$f" > /dev/null; then
              echo "❌ File not sorted alphabetically: $f"
              echo "   To fix, run: LC_ALL=C sort -o \"$f\" \"$f\""
              unsorted=1
            fi
          done < <(find src -type f -name "*.mem" -print0 | sort -z)

          if [ "$unsorted" -eq 1 ]; then
            echo
            echo "Some .mem files are not sorted alphabetically."
            exit 1
          fi

          echo "✅ All .mem files are sorted."
2 changes: 1 addition & 1 deletion src/applications/bmqtool/package/bmqtool.mem
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ m_bmqtool_application
m_bmqtool_filelogger
m_bmqtool_inpututil
m_bmqtool_interactive
m_bmqtool_storageinspector
m_bmqtool_messages
m_bmqtool_parameters
m_bmqtool_poster
m_bmqtool_statutil
m_bmqtool_storageinspector
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/package/bmqp.mem
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ bmqp_ctrlmsg_messages
bmqp_event
bmqp_eventutil
bmqp_heartbeatmonitor
bmqp_messageproperties
bmqp_messageguidgenerator
bmqp_messageproperties
bmqp_optionsview
bmqp_optionutil
bmqp_protocol
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/group/bmq.mem
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# [OFFLINE ONLY]

# [OFFLINE ONLY]
bmqa
bmqc
bmqeval
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbauthn/package/mqbauthn.mem
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

mqbauthn_anonauthenticator
mqbauthn_authenticationcontroller
mqbauthn_basicauthenticator
mqbauthn_pluginlibrary
mqbauthn_authenticationcontroller

14 changes: 7 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,11 @@ void ClusterOrchestrator::onNodeUnavailable(mqbnet::ClusterNode* node)

BALL_LOG_INFO_BLOCK
{
BALL_LOG_OUTPUT_STREAM
<< d_clusterData_p->identity().description() << ": "
<< node->nodeDescription() << " has gone down. "
<< "Node was primary for " << ns->primaryPartitions().size()
<< " partition(s): [";
BALL_LOG_OUTPUT_STREAM << d_clusterData_p->identity().description()
<< ": " << node->nodeDescription()
<< " has gone down. " << "Node was primary for "
<< ns->primaryPartitions().size()
<< " partition(s): [";
for (unsigned int i = 0; i < ns->primaryPartitions().size(); ++i) {
BALL_LOG_OUTPUT_STREAM << ns->primaryPartitions()[i];
if (i + 1 < ns->primaryPartitions().size()) {
Expand Down Expand Up @@ -582,7 +582,6 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand All @@ -599,7 +598,6 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand Down Expand Up @@ -685,6 +683,7 @@ int ClusterOrchestrator::start(bsl::ostream& errorDescription)
new (*d_allocator_p) mqbnet::Elector(
d_clusterConfig.elector(),
&d_clusterData_p->cluster(),
&d_clusterData_p->quorumManager(),
bdlf::BindUtil::bind(&ClusterOrchestrator::onElectorStateChange,
this,
_1, // ElectorState
Expand Down Expand Up @@ -1897,6 +1896,7 @@ int ClusterOrchestrator::processCommand(
mqbcmd::ElectorResult electorResult;
int rc = d_elector_mp->processCommand(&electorResult,
command.elector());

if (electorResult.isErrorValue()) {
result->makeError(electorResult.error());
}
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ ClusterData::ClusterData(
, d_resources(resources)
, d_dispatcherClientData()
, d_clusterConfig(clusterConfig)
, d_quorumManager(clusterConfig.elector().quorum(),
static_cast<unsigned int>(clusterConfig.nodes().size()))
, d_clusterProxyConfig(clusterProxyConfig)
, d_electorInfo(cluster)
, d_membership(netCluster, allocator)
Expand Down
12 changes: 12 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <mqbc_clustermembership.h>
#include <mqbc_controlmessagetransmitter.h>
#include <mqbc_electorinfo.h>
#include <mqbcfg_clusterquorummanager.h>
#include <mqbcfg_messages.h>
#include <mqbi_cluster.h>
#include <mqbi_dispatcher.h>
Expand Down Expand Up @@ -171,6 +172,9 @@ class ClusterData {
/// Elector information.
ElectorInfo d_electorInfo;

/// Quorum manager.
mqbcfg::ClusterQuorumManager d_quorumManager;

/// The membership information of the cluster.
ClusterMembership d_membership;

Expand Down Expand Up @@ -262,6 +266,9 @@ class ClusterData {
/// Get a modifiable reference to this object's cluster.
mqbi::Cluster& cluster();

/// Get a modifiable reference to this object's quorum manager.
mqbcfg::ClusterQuorumManager& quorumManager();

/// Get a modifiable reference to this object's messageTransmitter.
ControlMessageTransmitter& messageTransmitter();

Expand Down Expand Up @@ -391,6 +398,11 @@ inline mqbi::Cluster& ClusterData::cluster()
return *d_cluster_p;
}

// Return a modifiable reference to the quorum manager held by this object
// (the 'd_quorumManager' member).
inline mqbcfg::ClusterQuorumManager& ClusterData::quorumManager()
{
return d_quorumManager;
}

inline ControlMessageTransmitter& ClusterData::messageTransmitter()
{
return d_messageTransmitter;
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ ClusterStateManager::ClusterStateManager(
, d_state_p(clusterState)
, d_clusterFSM(*this)
, d_nodeToLedgerLSNMap(allocator)
, d_lsnQuorum((clusterConfig.nodes().size() / 2) + 1)
// TODO Add cluster config to determine Eventual vs Strong
, d_clusterStateLedger_mp(clusterStateLedger)
, d_storageManager_p(0)
Expand Down Expand Up @@ -596,7 +595,7 @@ void ClusterStateManager::do_checkLSNQuorum(const ClusterFSMArgsSp& args)
BSLS_ASSERT_SAFE(d_clusterData_p->electorInfo().isSelfLeader() &&
d_clusterFSM.isSelfLeader());

if (d_nodeToLedgerLSNMap.size() >= d_lsnQuorum) {
if (d_nodeToLedgerLSNMap.size() >= getLsnQuorum()) {
// If we have a quorum of LSNs (including self LSN)

BALL_LOG_INFO << d_clusterData_p->identity().description()
Expand Down
12 changes: 8 additions & 4 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
/// ledger leader sequence number.
NodeToLSNMap d_nodeToLedgerLSNMap;

/// Number of LSN views gathered from cluster nodes (including self)
/// required to achieve a quorum.
unsigned int d_lsnQuorum;

/// Underlying cluster state ledger.
ClusterStateLedgerMp d_clusterStateLedger_mp;

Expand Down Expand Up @@ -629,6 +625,9 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
///
/// @note Used for testing purposes only.
const NodeToLSNMap& nodeToLSNMap() const;

/// Get the quorum value.
unsigned int getLsnQuorum() const;
};

// ============================================================================
Expand Down Expand Up @@ -676,6 +675,11 @@ ClusterStateManager::nodeToLSNMap() const
return d_nodeToLedgerLSNMap;
}

// Return the quorum value currently reported by the quorum manager of the
// associated cluster data.  The value is read on every call rather than
// cached in this object.
inline unsigned int ClusterStateManager::getLsnQuorum() const
{
return d_clusterData_p->quorumManager().quorum();
}

} // close package namespace
} // close enterprise namespace

Expand Down
28 changes: 10 additions & 18 deletions src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,6 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
<< "' to all cluster nodes";
}

// A follower does not reply Ack under eventual consistency
if (!isSelfLeader() &&
d_consistencyLevel == ClusterStateLedgerConsistency::e_EVENTUAL) {
return rc_SUCCESS; // RETURN
}

bmqp_ctrlmsg::ClusterMessage ackMessage;
ackMessage.choice().makeLeaderAdvisoryAck().sequenceNumberAcked() =
sequenceNumber;
Expand Down Expand Up @@ -688,7 +682,10 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
}

iter->second.d_ackCount += 1;
if (iter->second.d_ackCount == d_ackQuorum) {

const unsigned int ackQuorum = getAckQuorum();

if (iter->second.d_ackCount == ackQuorum) {
// Consistency level reached. Apply a commit message for the
// advisory, broadcast it, and invoke the 'CommitCb'.
bmqp_ctrlmsg::ClusterMessage commitMessage;
Expand All @@ -701,7 +698,7 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
BSLS_ASSERT_SAFE(commitAdvisory.sequenceNumber() >
commitAdvisory.sequenceNumberCommitted());

BALL_LOG_INFO << description() << "Quorum of " << d_ackQuorum
BALL_LOG_INFO << description() << "Quorum of " << ackQuorum
<< " acks is achieved for advisory of seqNum "
<< ack.sequenceNumberAcked()
<< ", creating and applying commit advisory: "
Expand Down Expand Up @@ -1157,23 +1154,18 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event,

// CREATORS
IncoreClusterStateLedger::IncoreClusterStateLedger(
const mqbcfg::ClusterDefinition& clusterDefinition,
ClusterStateLedgerConsistency::Enum consistencyLevel,
ClusterData* clusterData,
ClusterState* clusterState,
BlobSpPool* blobSpPool_p,
bslma::Allocator* allocator)
const mqbcfg::ClusterDefinition& clusterDefinition,
ClusterData* clusterData,
ClusterState* clusterState,
BlobSpPool* blobSpPool_p,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isOpen(false)
, d_blobSpPool_p(blobSpPool_p)
, d_description(allocator)
, d_commitCb()
, d_clusterData_p(clusterData)
, d_clusterState_p(clusterState)
, d_consistencyLevel(consistencyLevel)
, d_ackQuorum(consistencyLevel == ClusterStateLedgerConsistency::e_STRONG
? (clusterDefinition.nodes().size() / 2) + 1
: 1)
, d_ledgerConfig(allocator)
, d_ledger_mp(0)
, d_uncommittedAdvisories(allocator)
Expand Down
30 changes: 15 additions & 15 deletions src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ struct IncoreClusterStateLedger_ClusterMessageInfo {
bsls::Types::Uint64 d_timestampNs;

/// Number of ACKs received for this `ClusterMessage`.
int d_ackCount;
unsigned int d_ackCount;

// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(IncoreClusterStateLedger_ClusterMessageInfo,
Expand Down Expand Up @@ -184,13 +184,6 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger {
/// Cluster's state.
const ClusterState* d_clusterState_p;

/// Desired consistency level (eventual vs. strong), configured by the
/// user.
ClusterStateLedgerConsistency::Enum d_consistencyLevel;

/// Number of nodes required to achieve consistency level.
int d_ackQuorum;

/// Ledger configuration.
mqbsi::LedgerConfig d_ledgerConfig;

Expand Down Expand Up @@ -282,6 +275,9 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger {
/// dispatcher thread.
bool isSelfLeader() const;

/// Return the acknowledgment quorum required for this ledger.
unsigned int getAckQuorum() const;

public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(IncoreClusterStateLedger,
Expand All @@ -290,16 +286,15 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger {
// CREATORS

/// Create a new @bbref{mqbc::IncoreClusterStateLedger} with the specified
/// `clusterDefinition`, `consistencyLevel`, `clusterData` and
/// `clusterDefinition`, `clusterData` and
/// `clusterState`, and using the specified `blobSpPool_p` and `allocator`
/// to supply memory.
IncoreClusterStateLedger(
const mqbcfg::ClusterDefinition& clusterDefinition,
ClusterStateLedgerConsistency::Enum consistencyLevel,
ClusterData* clusterData,
ClusterState* clusterState,
BlobSpPool* blobSpPool_p,
bslma::Allocator* allocator);
const mqbcfg::ClusterDefinition& clusterDefinition,
ClusterData* clusterData,
ClusterState* clusterState,
BlobSpPool* blobSpPool_p,
bslma::Allocator* allocator);

/// Destructor.
~IncoreClusterStateLedger() BSLS_KEYWORD_OVERRIDE;
Expand Down Expand Up @@ -456,6 +451,11 @@ inline bool IncoreClusterStateLedger::isSelfLeader() const
return d_clusterData_p->electorInfo().isSelfLeader();
}

// Return the acknowledgment quorum for this ledger, which is the quorum
// value currently reported by the quorum manager of the associated cluster
// data.  The value is read on every call rather than cached in this object.
inline unsigned int IncoreClusterStateLedger::getAckQuorum() const
{
return d_clusterData_p->quorumManager().quorum();
}

// MANIPULATORS
// (virtual mqbc::ClusterStateLedger)
inline void IncoreClusterStateLedger::setCommitCb(const CommitCb& value)
Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ struct Tester {
new (*bmqtst::TestHelperUtil::allocator())
mqbc::IncoreClusterStateLedger(
d_cluster_mp->_clusterDefinition(),
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_cluster_mp->_clusterData(),
d_cluster_mp->_state(),
d_cluster_mp->_blobSpPool(),
Expand Down Expand Up @@ -1752,8 +1751,8 @@ static void test11_persistanceAcrossRolloverLeader()
qadvisory.queues().push_back(qinfoI);
}

size_t i = 0;
bool hasUncommittedBeforeRollover =
int i = 0;
bool hasUncommittedBeforeRollover =
true; // Either the qadvisory or its commit can trigger rollover. If
// the commit triggers rollover, that means we have an
// uncomitted advisory before rollover.
Expand Down
Loading
Loading