Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

// mqbblp_storagemanager.cpp -*-C++-*-
#include <ball_log.h>
#include <mqbblp_storagemanager.h>

#include <mqbscm_version.h>
Expand Down Expand Up @@ -604,12 +605,22 @@ void StorageManager::clearPrimaryForPartitionDispatched(
mqbs::FileStore* fs = d_fileStores[partitionId].get();
PartitionInfo& pinfo = d_partitionInfoVec[partitionId];

if (primary != pinfo.primary()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " Partition [" << partitionId
<< "]: Failed to clear primary as specified primary: "
<< (primary ? primary->nodeDescription() : "**null**")
<< " is different from current perceived primary: "
<< (pinfo.primary() ? pinfo.primary()->nodeDescription()
: "** null **");
return; // RETURN
}

mqbc::StorageUtil::clearPrimaryForPartition(
fs,
&pinfo,
d_clusterData_p->identity().description(),
partitionId,
primary);
partitionId);
}

void StorageManager::processStorageEventDispatched(
Expand Down
91 changes: 30 additions & 61 deletions src/groups/mqb/mqbc/mqbc_partitionstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,8 @@ class PartitionStateTableActions {
void do_cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent(
const ARGS& args);

void do_cleanupMetadata_clearPartitionInfo(const ARGS& args);

void
do_cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog(
const ARGS& args);
do_cleanupMetadata_closeRecoveryFileSet_stopWatchDog(const ARGS& args);

void do_cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary(
const ARGS& args);
Expand Down Expand Up @@ -475,16 +472,14 @@ class PartitionStateTable
REPLICA_HIGHEST_SEQ,
setExpectedDataChunkRange_replicaDataRequestPull,
PRIMARY_HEALING_STG2);
PST_CFG(
PRIMARY_HEALING_STG1,
RST_UNKNOWN,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(
PRIMARY_HEALING_STG1,
STOP_NODE,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(PRIMARY_HEALING_STG1,
RST_UNKNOWN,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(PRIMARY_HEALING_STG1,
STOP_NODE,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(PRIMARY_HEALING_STG1,
WATCH_DOG,
cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary,
Expand Down Expand Up @@ -533,20 +528,18 @@ class PartitionStateTable
QUORUM_REPLICA_DATA_RSPN,
stopWatchDog_transitionToActivePrimary,
PRIMARY_HEALED);
PST_CFG(
PRIMARY_HEALING_STG2,
RST_UNKNOWN,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(PRIMARY_HEALING_STG2,
RST_UNKNOWN,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(PRIMARY_HEALING_STG2,
WATCH_DOG,
cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary,
UNKNOWN);
PST_CFG(
PRIMARY_HEALING_STG2,
STOP_NODE,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(PRIMARY_HEALING_STG2,
STOP_NODE,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(REPLICA_HEALING,
DETECT_SELF_PRIMARY,
cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent,
Expand Down Expand Up @@ -608,20 +601,18 @@ class PartitionStateTable
failureReplicaDataResponsePush_cleanupMetadata_closeRecoveryFileSet_stopWatchDog_reapplyDetectSelfReplica,
UNKNOWN);
PST_CFG(REPLICA_HEALING, LIVE_DATA, bufferLiveData, REPLICA_HEALING);
PST_CFG(
REPLICA_HEALING,
RST_UNKNOWN,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(REPLICA_HEALING,
RST_UNKNOWN,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
UNKNOWN);
PST_CFG(REPLICA_HEALING,
WATCH_DOG,
cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfReplica,
UNKNOWN);
PST_CFG(
REPLICA_HEALING,
STOP_NODE,
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(REPLICA_HEALING,
STOP_NODE,
cleanupMetadata_closeRecoveryFileSet_stopWatchDog,
STOPPED);
PST_CFG(REPLICA_HEALED,
DETECT_SELF_PRIMARY,
cleanupMetadata_clearPartitionInfo_reapplyEvent,
Expand All @@ -643,14 +634,8 @@ class PartitionStateTable
ISSUE_LIVESTREAM,
cleanupMetadata_reapplyDetectSelfReplica,
UNKNOWN);
PST_CFG(REPLICA_HEALED,
RST_UNKNOWN,
cleanupMetadata_clearPartitionInfo,
UNKNOWN);
PST_CFG(REPLICA_HEALED,
STOP_NODE,
cleanupMetadata_clearPartitionInfo,
STOPPED);
PST_CFG(REPLICA_HEALED, RST_UNKNOWN, cleanupMetadata, UNKNOWN);
PST_CFG(REPLICA_HEALED, STOP_NODE, cleanupMetadata, STOPPED);
PST_CFG(PRIMARY_HEALED,
DETECT_SELF_REPLICA,
unsupportedPrimaryDowngrade,
Expand All @@ -665,14 +650,8 @@ class PartitionStateTable
PRIMARY_STATE_RQST,
storeSelfSeq_storeReplicaSeq_primaryStateResponse_replicaDataRequestPush_replicaDataRequestDrop_startSendDataChunks,
PRIMARY_HEALED);
PST_CFG(PRIMARY_HEALED,
RST_UNKNOWN,
cleanupMetadata_clearPartitionInfo,
UNKNOWN);
PST_CFG(PRIMARY_HEALED,
STOP_NODE,
cleanupMetadata_clearPartitionInfo,
STOPPED);
PST_CFG(PRIMARY_HEALED, RST_UNKNOWN, cleanupMetadata, UNKNOWN);
PST_CFG(PRIMARY_HEALED, STOP_NODE, cleanupMetadata, STOPPED);

#undef PST_CFG
}
Expand Down Expand Up @@ -786,21 +765,11 @@ void PartitionStateTableActions<ARGS>::
do_reapplyEvent(args);
}

template <typename ARGS>
void PartitionStateTableActions<ARGS>::do_cleanupMetadata_clearPartitionInfo(
const ARGS& args)
{
do_cleanupMetadata(args);
do_clearPartitionInfo(args);
}

template <typename ARGS>
void PartitionStateTableActions<ARGS>::
do_cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog(
const ARGS& args)
do_cleanupMetadata_closeRecoveryFileSet_stopWatchDog(const ARGS& args)
{
do_cleanupMetadata(args);
do_clearPartitionInfo(args);
do_closeRecoveryFileSet(args);
do_stopWatchDog(args);
}
Expand Down
61 changes: 48 additions & 13 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ void StorageManager::processShutdownEventDispatched(int partitionId)
mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs);

StorageUtil::processShutdownEventDispatched(
d_clusterData_p,
&d_partitionInfoVec[partitionId],
fs,
partitionId);

StorageUtil::clearPrimaryForPartition(
fs,
&d_partitionInfoVec[partitionId],
d_clusterData_p->identity().description(),
partitionId);

EventData eventDataVec;
eventDataVec.emplace_back(d_clusterData_p->membership().selfNode(),
-1, // placeholder requestId
Expand All @@ -1157,12 +1169,6 @@ void StorageManager::processShutdownEventDispatched(int partitionId)
queueSp->emplace(PartitionFSM::Event::e_STOP_NODE, eventDataVec);

d_partitionFSMVec[partitionId]->popEventAndProcess(queueSp);

StorageUtil::processShutdownEventDispatched(
d_clusterData_p,
&d_partitionInfoVec[partitionId],
fs,
partitionId);
}

void StorageManager::forceFlushFileStores()
Expand Down Expand Up @@ -1494,6 +1500,7 @@ void StorageManager::do_storeReplicaSeq(const PartitionFSMArgsSp& args)
}
}

// TODO: Remove this method.
void StorageManager::do_storePartitionInfo(const PartitionFSMArgsSp& args)
{
// executed by the *QUEUE DISPATCHER* thread associated with the partitionId
Expand Down Expand Up @@ -1532,6 +1539,7 @@ void StorageManager::do_storePartitionInfo(const PartitionFSMArgsSp& args)
}
}

// TODO: Remove this method.
void StorageManager::do_clearPartitionInfo(const PartitionFSMArgsSp& args)
{
// executed by the *QUEUE DISPATCHER* thread associated with the partitionId
Expand All @@ -1547,7 +1555,10 @@ void StorageManager::do_clearPartitionInfo(const PartitionFSMArgsSp& args)

const PartitionFSMEventData& eventData = eventDataVec[0];
const int partitionId = eventData.partitionId();
mqbnet::ClusterNode* primaryNode = eventData.primary();
mqbnet::ClusterNode* primaryNode =
eventData.primary(); // TODO Verify correctness, but no need since
// this method will be deleted anyways
(void)primaryNode;
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));

Expand All @@ -1559,8 +1570,7 @@ void StorageManager::do_clearPartitionInfo(const PartitionFSMArgsSp& args)
fs,
&pinfo,
d_clusterData_p->identity().description(),
partitionId,
primaryNode);
partitionId);
}

void StorageManager::do_replicaStateRequest(const PartitionFSMArgsSp& args)
Expand Down Expand Up @@ -3911,13 +3921,23 @@ void StorageManager::stop()
d_isStarted = false;

for (size_t pid = 0; pid < d_fileStores.size(); ++pid) {
mqbs::FileStore* fs = d_fileStores[pid].get();
BSLS_ASSERT_SAFE(fs);

fs->execute(
bdlf::BindUtil::bind(&StorageUtil::clearPrimaryForPartition,
fs,
&d_partitionInfoVec[pid],
d_clusterData_p->identity().description(),
pid));

EventData eventDataVec;
eventDataVec.emplace_back(d_clusterData_p->membership().selfNode(),
-1, // placeholder requestId
pid,
1);

dispatchEventToPartition(d_fileStores[pid].get(),
dispatchEventToPartition(fs,
PartitionFSM::Event::e_STOP_NODE,
eventDataVec);
}
Expand Down Expand Up @@ -4202,6 +4222,13 @@ void StorageManager::setPrimaryForPartition(int partitionId,
partitionId < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(primaryNode);

/** TODO Call:
PartitionInfo& pinfo = d_partitionInfoVec[partitionId];
pinfo.setPrimary(primaryNode);
pinfo.setPrimaryLeaseId(primaryLeaseId);
pinfo.setPrimaryStatus(bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE);
*/

if (primaryNode->nodeId() ==
d_clusterData_p->membership().selfNode()->nodeId()) {
processPrimaryDetect(partitionId, primaryNode, primaryLeaseId);
Expand Down Expand Up @@ -4229,6 +4256,17 @@ void StorageManager::clearPrimaryForPartition(int partitionId,
<< " Partition [" << partitionId << "]: "
<< "Self Transition back to Unknown in the Partition FSM.";

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs);

fs->execute(bdlf::BindUtil::bind(&StorageUtil::clearPrimaryForPartition,
fs,
&d_partitionInfoVec[partitionId],
d_clusterData_p->identity().description(),
partitionId));

// TODO Remove `clearPartitionInfo` from FSM action

EventData eventDataVec;
eventDataVec.emplace_back(
d_clusterData_p->membership().selfNode(),
Expand All @@ -4238,9 +4276,6 @@ void StorageManager::clearPrimaryForPartition(int partitionId,
primary,
d_clusterState.partitionsInfo().at(partitionId).primaryLeaseId());

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs);

dispatchEventToPartition(fs,
PartitionFSM::Event::e_RST_UNKNOWN,
eventDataVec);
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ class StorageManager BSLS_KEYWORD_FINAL
/// for the i-th partitionId.
bsl::vector<unsigned int> d_numReplicaDataResponsesReceivedVec;

/// Whether `d_queueKeyInfoMapVec` has been initialized.
/// Whether `d_queueKeyInfoMapVec` has been initialized. This data
/// structure only needs to be initialized once at startup, and no more.
bsls::AtomicBool d_isQueueKeyInfoMapVecInitialized;

/// Mapping from queue key to queue info indexed by partitionId, populated
Expand Down
13 changes: 4 additions & 9 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1265,11 +1265,10 @@ int StorageUtil::assignPartitionDispatcherThreads(
}

void StorageUtil::clearPrimaryForPartition(
mqbs::FileStore* fs,
PartitionInfo* partitionInfo,
const bsl::string& clusterDescription,
int partitionId,
mqbnet::ClusterNode* primary)
mqbs::FileStore* fs,
PartitionInfo* partitionInfo,
const bsl::string& clusterDescription,
int partitionId)
{
// executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'

Expand All @@ -1284,10 +1283,6 @@ void StorageUtil::clearPrimaryForPartition(
return; // RETURN
}

if (primary != partitionInfo->primary()) {
return; // RETURN
}

BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId
<< "]: processing 'clear-primary' event. Current primary: "
<< partitionInfo->primary()->nodeDescription()
Expand Down
10 changes: 4 additions & 6 deletions src/groups/mqb/mqbc/mqbc_storageutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,16 @@ struct StorageUtil {
const bdlb::NullableValue<QueueCreationCb>& queueCreationCb =
bdlb::NullableValue<QueueCreationCb>());

/// Clear the specified `primary` of the specified `partitionId` from
/// the specified `fs` and `partitionInfo`, using the specified
/// `clusterDescription`. Behavior is undefined unless the specified
/// `partitionId` is in range and the specified `primary` is not null.
/// Clear the primary of the specified `partitionId` from the specified
/// `fs` and `partitionInfo`, using the specified `clusterDescription`.
/// Behavior is undefined unless the specified `partitionId` is in range.
///
/// THREAD: Executed by the queue dispatcher thread associated with
/// 'partitionId'.
static void clearPrimaryForPartition(mqbs::FileStore* fs,
PartitionInfo* partitionInfo,
const bsl::string& clusterDescription,
int partitionId,
mqbnet::ClusterNode* primary);
int partitionId);

/// Find the minimum required disk space using the specified `config`.
static bsls::Types::Uint64
Expand Down
3 changes: 3 additions & 0 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5581,6 +5581,9 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle,
BSLS_ASSERT_SAFE(!queueKey.isNull());
BSLS_ASSERT_SAFE(d_fileSets.size() > 0);

BALL_LOG_ERROR << "xxm0 " << queueUri << " " << queueKey << " "
<< appIdKeyPairs.size() << " " << isNewQueue;

enum {
rc_SUCCESS = 0,
rc_STOPPING = -1,
Expand Down
Loading