Skip to content

Commit cb0c58d

Browse files
waldgange678098
andauthored
Refactor[mqbstat]: add PartitionStats view (#1007)
Signed-off-by: Anton Pryakhin <[email protected]> Co-authored-by: Evgenii Malygin <[email protected]>
1 parent ea30c26 commit cb0c58d

File tree

8 files changed

+359
-363
lines changed

8 files changed

+359
-363
lines changed

src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -895,19 +895,21 @@ struct TestHelper {
895895
bmqtst::TestHelperUtil::allocator());
896896
threadPool.start();
897897

898-
mqbs::FileStore fs(dsCfg,
899-
1,
900-
d_cluster_mp->dispatcher(),
901-
&d_cluster_mp->netCluster(),
902-
&d_cluster_mp->_clusterData()->stats(),
903-
&d_cluster_mp->_clusterData()->blobSpPool(),
904-
&d_cluster_mp->_clusterData()->stateSpPool(),
905-
&threadPool,
906-
d_cluster_mp->isCSLModeEnabled(),
907-
d_cluster_mp->isFSMWorkflow(),
908-
d_cluster_mp->doesFSMwriteQLIST(),
909-
1, // replicationFactor
910-
bmqtst::TestHelperUtil::allocator());
898+
mqbs::FileStore fs(
899+
dsCfg,
900+
1,
901+
d_cluster_mp->dispatcher(),
902+
&d_cluster_mp->netCluster(),
903+
d_cluster_mp->_clusterData()->stats().getPartitionStats(
904+
k_PARTITION_ID),
905+
&d_cluster_mp->_clusterData()->blobSpPool(),
906+
&d_cluster_mp->_clusterData()->stateSpPool(),
907+
&threadPool,
908+
d_cluster_mp->isCSLModeEnabled(),
909+
d_cluster_mp->isFSMWorkflow(),
910+
d_cluster_mp->doesFSMwriteQLIST(),
911+
1, // replicationFactor
912+
bmqtst::TestHelperUtil::allocator());
911913

912914
dynamic_cast<mqbnet::MockCluster&>(d_cluster_mp->netCluster())
913915
._setDisableBroadcast(true);

src/groups/mqb/mqbc/mqbc_storageutil.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,20 +1242,20 @@ int StorageUtil::assignPartitionDispatcherThreads(
12421242
bsl::string("Partition") + bsl::to_string(i));
12431243

12441244
bsl::shared_ptr<mqbs::FileStore> fsSp(
1245-
new (*fileStoreAllocator)
1246-
mqbs::FileStore(dsCfg,
1247-
processorId,
1248-
dispatcher,
1249-
clusterData->membership().netCluster(),
1250-
&clusterData->stats(),
1251-
blobSpPool,
1252-
&clusterData->stateSpPool(),
1253-
threadPool,
1254-
cluster.isCSLModeEnabled(),
1255-
cluster.isFSMWorkflow(),
1256-
cluster.doesFSMwriteQLIST(),
1257-
replicationFactor,
1258-
fileStoreAllocator),
1245+
new (*fileStoreAllocator) mqbs::FileStore(
1246+
dsCfg,
1247+
processorId,
1248+
dispatcher,
1249+
clusterData->membership().netCluster(),
1250+
clusterData->stats().getPartitionStats(dsCfg.partitionId()),
1251+
blobSpPool,
1252+
&clusterData->stateSpPool(),
1253+
threadPool,
1254+
cluster.isCSLModeEnabled(),
1255+
cluster.isFSMWorkflow(),
1256+
cluster.doesFSMwriteQLIST(),
1257+
replicationFactor,
1258+
fileStoreAllocator),
12591259
fileStoreAllocator);
12601260

12611261
(*fileStores)[i] = fsSp;

src/groups/mqb/mqbs/mqbs_filestore.cpp

Lines changed: 32 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2989,10 +2989,7 @@ int FileStore::rollover(bsls::Types::Uint64 timestamp)
29892989
statRecorder.print(BALL_LOG_OUTPUT_STREAM, "ROLLOVER COMPLETE");
29902990
}
29912991

2992-
d_clusterStats_p->onPartitionEvent(
2993-
mqbstat::ClusterStats::PartitionEventType::e_PARTITION_ROLLOVER,
2994-
d_config.partitionId(),
2995-
statRecorder.totalElapsed());
2992+
d_partitionStats_sp->setRoloverTime(statRecorder.totalElapsed());
29962993

29972994
return 0;
29982995
}
@@ -4003,12 +4000,11 @@ int FileStore::issueSyncPointInternal(SyncPointType::Enum type,
40034000
immediateFlush);
40044001

40054002
// Report cluster's partition stats
4006-
d_clusterStats_p->setPartitionBytes(d_config.partitionId(),
4007-
fs->d_outstandingBytesData,
4008-
fs->d_outstandingBytesJournal,
4009-
fs->d_dataFilePosition,
4010-
fs->d_journalFilePosition,
4011-
d_sequenceNum);
4003+
d_partitionStats_sp->setPartitionBytes(fs->d_outstandingBytesData,
4004+
fs->d_outstandingBytesJournal,
4005+
fs->d_dataFilePosition,
4006+
fs->d_journalFilePosition,
4007+
d_sequenceNum);
40124008

40134009
return rc_SUCCESS;
40144010
}
@@ -4084,11 +4080,7 @@ void FileStore::processReceiptEvent(unsigned int primaryLeaseId,
40844080
const bsls::Types::Int64 timeDelta =
40854081
bmqsys::Time::highResolutionTimer() -
40864082
from->second.d_handle->second.d_arrivalTimepoint;
4087-
d_clusterStats_p->onPartitionEvent(
4088-
mqbstat::ClusterStats::PartitionEventType::
4089-
e_PARTITION_REPLICATION,
4090-
d_config.partitionId(),
4091-
timeDelta);
4083+
d_partitionStats_sp->setReplicationTime(timeDelta);
40924084

40934085
// notify the queue
40944086
const mqbu::StorageKey& queueKey = from->second.d_queueKey;
@@ -5106,26 +5098,27 @@ void FileStore::flushIfNeeded(bool immediateFlush)
51065098
}
51075099

51085100
// CREATORS
5109-
FileStore::FileStore(const DataStoreConfig& config,
5110-
int processorId,
5111-
mqbi::Dispatcher* dispatcher,
5112-
mqbnet::Cluster* cluster,
5113-
mqbstat::ClusterStats* clusterStats,
5114-
BlobSpPool* blobSpPool,
5115-
StateSpPool* statePool,
5116-
bdlmt::FixedThreadPool* miscWorkThreadPool,
5117-
bool isCSLModeEnabled,
5118-
bool isFSMWorkflow,
5119-
bool doesFSMwriteQLIST,
5120-
int replicationFactor,
5121-
bslma::Allocator* allocator)
5101+
FileStore::FileStore(
5102+
const DataStoreConfig& config,
5103+
int processorId,
5104+
mqbi::Dispatcher* dispatcher,
5105+
mqbnet::Cluster* cluster,
5106+
const bsl::shared_ptr<mqbstat::PartitionStats>& partitionStats,
5107+
BlobSpPool* blobSpPool,
5108+
StateSpPool* statePool,
5109+
bdlmt::FixedThreadPool* miscWorkThreadPool,
5110+
bool isCSLModeEnabled,
5111+
bool isFSMWorkflow,
5112+
bool doesFSMwriteQLIST,
5113+
int replicationFactor,
5114+
bslma::Allocator* allocator)
51225115
: d_allocator_p(allocator)
51235116
, d_allocators(allocator)
51245117
, d_storageAllocatorStore(d_allocators.get("QueueStorage"))
51255118
, d_config(config)
51265119
, d_partitionDescription(allocator)
51275120
, d_dispatcherClientData()
5128-
, d_clusterStats_p(clusterStats)
5121+
, d_partitionStats_sp(partitionStats)
51295122
, d_blobSpPool_p(blobSpPool)
51305123
, d_statePool_p(statePool)
51315124
, d_aliasedBufferDeleterSpPool(1024, d_allocators.get("AliasedBufferDeleters"))
@@ -5249,12 +5242,11 @@ int FileStore::open(const QueueKeyInfoMap& queueKeyInfoMap)
52495242

52505243
// Report cluster's partition stats
52515244
const FileSet* fs = d_fileSets[0].get();
5252-
d_clusterStats_p->setPartitionBytes(d_config.partitionId(),
5253-
fs->d_outstandingBytesData,
5254-
fs->d_outstandingBytesJournal,
5255-
fs->d_dataFilePosition,
5256-
fs->d_journalFilePosition,
5257-
d_sequenceNum);
5245+
d_partitionStats_sp->setPartitionBytes(fs->d_outstandingBytesData,
5246+
fs->d_outstandingBytesJournal,
5247+
fs->d_dataFilePosition,
5248+
fs->d_journalFilePosition,
5249+
d_sequenceNum);
52585250

52595251
return rc_SUCCESS;
52605252
}
@@ -6635,9 +6627,8 @@ void FileStore::setActivePrimary(mqbnet::ClusterNode* primaryNode,
66356627

66366628
if (primaryNode->nodeId() != d_config.nodeId()) {
66376629
d_isPrimary = false;
6638-
d_clusterStats_p->setNodeRoleForPartition(
6639-
d_config.partitionId(),
6640-
mqbstat::ClusterStats::PrimaryStatus::e_REPLICA);
6630+
d_partitionStats_sp->setNodeRole(
6631+
mqbstat::PartitionStats::PrimaryStatus::e_REPLICA);
66416632
d_config.scheduler()->cancelEvent(&d_syncPointEventHandle);
66426633
d_config.scheduler()->cancelEvent(
66436634
&d_partitionHighwatermarkEventHandle);
@@ -6659,9 +6650,8 @@ void FileStore::setActivePrimary(mqbnet::ClusterNode* primaryNode,
66596650
}
66606651

66616652
d_isPrimary = true;
6662-
d_clusterStats_p->setNodeRoleForPartition(
6663-
d_config.partitionId(),
6664-
mqbstat::ClusterStats::PrimaryStatus::e_PRIMARY);
6653+
d_partitionStats_sp->setNodeRole(
6654+
mqbstat::PartitionStats::PrimaryStatus::e_PRIMARY);
66656655

66666656
for (StorageMapIter sIt = d_storages.begin(); sIt != d_storages.end();
66676657
++sIt) {
@@ -7243,11 +7233,7 @@ void FileStore::setReplicationFactor(int value)
72437233
const bsls::Types::Int64 timeDelta =
72447234
bmqsys::Time::highResolutionTimer() -
72457235
it->second.d_handle->second.d_arrivalTimepoint;
7246-
d_clusterStats_p->onPartitionEvent(
7247-
mqbstat::ClusterStats::PartitionEventType::
7248-
e_PARTITION_REPLICATION,
7249-
d_config.partitionId(),
7250-
timeDelta);
7236+
d_partitionStats_sp->setReplicationTime(timeDelta);
72517237

72527238
// notify the queue.
72537239
const mqbu::StorageKey& queueKey = it->second.d_queueKey;

src/groups/mqb/mqbs/mqbs_filestore.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ namespace mqbi {
9595
class Domain;
9696
}
9797
namespace mqbstat {
98-
class ClusterStats;
98+
class PartitionStats;
9999
}
100100

101101
namespace mqbs {
@@ -285,9 +285,9 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore {
285285
// Dispatcher client data associated
286286
// with this instance.
287287

288-
mqbstat::ClusterStats* d_clusterStats_p;
288+
bsl::shared_ptr<mqbstat::PartitionStats> d_partitionStats_sp;
289289
// Stat object associated to the
290-
// Cluster this FileStore belongs to,
290+
// Partition this FileStore belongs to,
291291
// used to report partition level
292292
// metrics.
293293

@@ -708,13 +708,13 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore {
708708
/// `dispatcher`, `cluster`, `clusterStats`, `blobSpPool`,
709709
/// `miscWorkThreadPool`, `isCSLModeEnabled`, `isFSMWorkflow` and
710710
/// `allocator`.
711-
FileStore(const DataStoreConfig& config,
712-
int processorId,
713-
mqbi::Dispatcher* dispatcher,
714-
mqbnet::Cluster* cluster,
715-
mqbstat::ClusterStats* clusterStats,
716-
BlobSpPool* blobSpPool,
717-
StateSpPool* statePool,
711+
FileStore(const DataStoreConfig& config,
712+
int processorId,
713+
mqbi::Dispatcher* dispatcher,
714+
mqbnet::Cluster* cluster,
715+
const bsl::shared_ptr<mqbstat::PartitionStats>& partitionStats,
716+
BlobSpPool* blobSpPool,
717+
StateSpPool* statePool,
718718
bdlmt::FixedThreadPool* miscWorkThreadPool,
719719
bool isCSLModeEnabled,
720720
bool isFSMWorkflow,

src/groups/mqb/mqbs/mqbs_filestore.t.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ struct Tester {
223223
0, // processorId
224224
&d_dispatcher,
225225
d_cluster_mp.get(),
226-
&d_clusterStats,
226+
d_clusterStats.getPartitionStats(
227+
d_dsCfg.partitionId()),
227228
d_blobSpPool_sp.get(),
228229
&d_statePool,
229230
&d_miscWorkThreadPool,

0 commit comments

Comments
 (0)