Skip to content

Commit 4d31e07

Browse files
authored
Feat[STATS]: Report csl replication time (#901)
Signed-off-by: Anton Pryakhin <[email protected]>
1 parent f12e8c9 commit 4d31e07

File tree

5 files changed

+57
-4
lines changed

5 files changed

+57
-4
lines changed

src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,10 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
490490
ClusterMessageInfo info;
491491
info.d_clusterMessage = clusterMessage;
492492
info.d_ackCount = 0;
493-
d_uncommittedAdvisories.insert(bsl::make_pair(sequenceNumber, info));
493+
const AdvisoriesMap::iterator advIt =
494+
d_uncommittedAdvisories
495+
.insert(bsl::make_pair(sequenceNumber, info))
496+
.first;
494497

495498
if (isSelfLeader()) {
496499
// TBD: How/if to handle timeout for this message?
@@ -508,6 +511,10 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
508511
// record offset should be 0.
509512
BSLS_ASSERT_SAFE(recordOffset == 0);
510513

514+
// Save the replication start time
515+
advIt->second.d_timestampNs =
516+
bdlt::CurrentTime::now().totalNanoseconds();
517+
511518
bsl::shared_ptr<bdlbb::Blob> advisoryEvent =
512519
d_blobSpPool_p->getObject();
513520

@@ -622,6 +629,11 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
622629
}
623630

624631
if (isSelfLeader()) {
632+
bsls::Types::Int64 replicationTimeNs =
633+
bdlt::CurrentTime::now().totalNanoseconds() -
634+
iter->second.d_timestampNs;
635+
d_clusterData_p->stats().setCslReplicationTime(replicationTimeNs);
636+
625637
bsl::shared_ptr<bdlbb::Blob> commitEvent =
626638
d_blobSpPool_p->getObject();
627639

@@ -644,12 +656,11 @@ int IncoreClusterStateLedger::applyRecordInternalImpl(
644656
// the commit callback should be enqueued to be invoked from the
645657
// cluster dispatcher thread.
646658
// TODO: In phase 2 of IncoreCSL, this can return to enqueueing on
647-
// the
648-
// cluster dispatcher thread
659+
// the cluster dispatcher thread.
649660
d_commitCb(committedControlMessage,
650661
ClusterStateLedgerCommitStatus::e_SUCCESS);
651662

652-
d_uncommittedAdvisories.erase(commit.sequenceNumberCommitted());
663+
d_uncommittedAdvisories.erase(iter);
653664
} break; // BREAK
654665
case (ClusterStateRecordType::e_ACK): {
655666
// PRECONDITIONS

src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ struct IncoreClusterStateLedger_ClusterMessageInfo {
9090
/// - `QueueUnAssignmentAdvisory`.
9191
bmqp_ctrlmsg::ClusterMessage d_clusterMessage;
9292

93+
/// Timestamp in nanoseconds at the moment when the replication of this
94+
/// `ClusterMessage` started.
95+
bsls::Types::Uint64 d_timestampNs;
96+
9397
/// Number of ACKs received for this `ClusterMessage`.
9498
int d_ackCount;
9599

@@ -418,6 +422,7 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger {
418422
inline IncoreClusterStateLedger_ClusterMessageInfo ::
419423
IncoreClusterStateLedger_ClusterMessageInfo(bslma::Allocator* allocator)
420424
: d_clusterMessage(bslma::Default::allocator(allocator))
425+
, d_timestampNs(0)
421426
, d_ackCount(0)
422427
{
423428
// NOTHING
@@ -428,6 +433,7 @@ inline IncoreClusterStateLedger_ClusterMessageInfo ::
428433
const IncoreClusterStateLedger_ClusterMessageInfo& other,
429434
bslma::Allocator* allocator)
430435
: d_clusterMessage(other.d_clusterMessage, allocator)
436+
, d_timestampNs(other.d_timestampNs)
431437
, d_ackCount(other.d_ackCount)
432438
{
433439
// NOTHING

src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ struct ClusterStatsIndex {
6868
// Value: Leader status of cluster, non-zero (1) implies leader
6969
// and 0 implies follower
7070
,
71+
e_CSL_REPLICATION_TIME_NS
72+
// Value: time in nanoseconds it took for replication of a new entry
73+
// in CSL file.
74+
,
7175
e_PARTITION_CFG_DATA_BYTES
7276
// Value: Configured size of partitions' data file
7377
,
@@ -150,6 +154,18 @@ bsls::Types::Int64 ClusterStats::getValue(const bmqst::StatContext& context,
150154

151155
return STAT_RANGE(rangeMax, e_LEADER_STATUS);
152156
}
157+
case Stat::e_CSL_REPLICATION_TIME_NS_AVG: {
158+
const bsls::Types::Int64 value = STAT_RANGE(averagePerEvent,
159+
e_CSL_REPLICATION_TIME_NS);
160+
return value == bsl::numeric_limits<bsls::Types::Int64>::max() ? 0
161+
: value;
162+
}
163+
case Stat::e_CSL_REPLICATION_TIME_NS_MAX: {
164+
const bsls::Types::Int64 value = STAT_RANGE(rangeMax,
165+
e_CSL_REPLICATION_TIME_NS);
166+
return value == bsl::numeric_limits<bsls::Types::Int64>::min() ? 0
167+
: value;
168+
}
153169
case Stat::e_PARTITION_CFG_JOURNAL_BYTES: {
154170
return STAT_SINGLE(value, e_PARTITION_CFG_JOURNAL_BYTES);
155171
}
@@ -335,6 +351,13 @@ ClusterStats& ClusterStats::setIsLeader(LeaderStatus::Enum value)
335351
return *this;
336352
}
337353

354+
ClusterStats& ClusterStats::setCslReplicationTime(bsls::Types::Int64 value)
355+
{
356+
d_statContext_mp->reportValue(ClusterStatsIndex::e_CSL_REPLICATION_TIME_NS,
357+
value);
358+
return *this;
359+
}
360+
338361
ClusterStats&
339362
ClusterStats::setPartitionCfgBytes(bsls::Types::Int64 dataBytes,
340363
bsls::Types::Int64 journalBytes)
@@ -542,6 +565,7 @@ ClusterStatsUtil::initializeStatContextCluster(int historySize,
542565
.storeExpiredSubcontextValues(true)
543566
.value("cluster_status")
544567
.value("cluster_leader")
568+
.value("cluster_csl_replication_time_ns", bmqst::StatValue::e_DISCRETE)
545569
.value("cluster.partition.cfg_journal_bytes")
546570
.value("cluster.partition.cfg_data_bytes")
547571
.value("partition_status")

src/groups/mqb/mqbstat/mqbstat_clusterstats.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class ClusterStats {
8181
e_CLUSTER_STATUS,
8282
e_ROLE,
8383
e_LEADER_STATUS,
84+
e_CSL_REPLICATION_TIME_NS_AVG,
85+
e_CSL_REPLICATION_TIME_NS_MAX,
8486
e_PARTITION_CFG_DATA_BYTES,
8587
e_PARTITION_CFG_JOURNAL_BYTES,
8688

@@ -250,6 +252,10 @@ class ClusterStats {
250252
ClusterStats& setNodeRoleForPartition(int partitionId,
251253
PrimaryStatus::Enum value);
252254

255+
/// Set the csl replication time of the StatContext being referred
256+
/// to by this object to be the specified `value`.
257+
ClusterStats& setCslReplicationTime(bsls::Types::Int64 value);
258+
253259
/// Set the partition outstanding bytes of the specified data and
254260
/// journal files for the specified `partitionId` to the corresponding
255261
/// specified `dataBytes` and `journalBytes` values.

src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,12 @@ void PrometheusStatConsumer::captureClusterStats(const LeaderSet& leaders)
680680
{"cluster_partition_cfg_data_bytes",
681681
Stat::e_PARTITION_CFG_DATA_BYTES,
682682
false},
683+
{"cluster_csl_replication_time_ns_avg",
684+
Stat::e_CSL_REPLICATION_TIME_NS_AVG,
685+
false},
686+
{"cluster_csl_replication_time_ns_max",
687+
Stat::e_CSL_REPLICATION_TIME_NS_MAX,
688+
false},
683689
};
684690

685691
Tagger tagger;

0 commit comments

Comments
 (0)