Skip to content

Commit 3ec7eb4

Browse files
authored
Feat[STATS]: Partition replication time (#909)
Signed-off-by: Anton Pryakhin <[email protected]>
1 parent 581abb8 commit 3ec7eb4

File tree

14 files changed

+113
-87
lines changed

14 files changed

+113
-87
lines changed

src/groups/mqb/mqbblp/mqbblp_localqueue.cpp

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -484,24 +484,16 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
484484

485485
// Send acknowledgement if post failed or if ack was requested (both could
486486
// be true as well).
487-
if (res != mqbi::StorageResult::e_SUCCESS || haveReceipt) {
488-
// Calculate time delta between PUT and ACK
489-
const bsls::Types::Int64 timeDelta =
490-
bmqsys::Time::highResolutionTimer() - timePoint;
491-
d_state_p->stats()
492-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
493-
timeDelta);
494-
if (res != mqbi::StorageResult::e_SUCCESS || doAck) {
495-
bmqp::AckMessage ackMessage;
496-
ackMessage
497-
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
498-
mqbi::StorageResult::toAckResult(res)))
499-
.setMessageGUID(putHeader.messageGUID());
500-
// CorrelationId & QueueId are left unset as those fields will
501-
// be filled downstream.
502-
503-
source->onAckMessage(ackMessage);
504-
}
487+
if (res != mqbi::StorageResult::e_SUCCESS || (haveReceipt && doAck)) {
488+
bmqp::AckMessage ackMessage;
489+
ackMessage
490+
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
491+
mqbi::StorageResult::toAckResult(res)))
492+
.setMessageGUID(putHeader.messageGUID());
493+
// CorrelationId & QueueId are left unset as those fields will
494+
// be filled downstream.
495+
496+
source->onAckMessage(ackMessage);
505497
}
506498

507499
if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(res ==
@@ -547,17 +539,9 @@ void LocalQueue::onPushMessage(
547539
"onPushMessage should not be called on LocalQueue");
548540
}
549541

550-
void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
551-
mqbi::QueueHandle* qH,
552-
const bsls::Types::Int64& arrivalTimepoint)
542+
void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
543+
mqbi::QueueHandle* qH)
553544
{
554-
// Calculate time delta between PUT and ACK
555-
const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() -
556-
arrivalTimepoint;
557-
558-
d_state_p->stats()
559-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(timeDelta);
560-
561545
if (d_state_p->handleCatalog().hasHandle(qH)) {
562546
// Send acknowledgement
563547
bmqp::AckMessage ackMessage;

src/groups/mqb/mqbblp/mqbblp_localqueue.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,10 @@ class LocalQueue BSLS_CPP11_FINAL {
180180

181181
/// Invoked by the Data Store when it receives quorum Receipts for the
182182
/// specified `msgGUID`. Send ACK to the specified `qH` if it is
183-
/// present in the queue handle catalog. Update ACK time stats using
184-
/// the specified `arrivalTimepoint`.
183+
/// present in the queue handle catalog.
185184
///
186185
/// THREAD: This method is called from the Storage dispatcher thread.
187-
void onReceipt(const bmqt::MessageGUID& msgGUID,
188-
mqbi::QueueHandle* qH,
189-
const bsls::Types::Int64& arrivalTimepoint);
186+
void onReceipt(const bmqt::MessageGUID& msgGUID, mqbi::QueueHandle* qH);
190187

191188
/// Invoked by the Data Store when it removes (times out waiting for
192189
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbblp/mqbblp_queue.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,13 +593,12 @@ void Queue::onOpenUpstream(bsls::Types::Uint64 genCount,
593593
}
594594
}
595595

596-
void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
597-
mqbi::QueueHandle* queueHandle,
598-
const bsls::Types::Int64& arrivalTimepoint)
596+
void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
597+
mqbi::QueueHandle* queueHandle)
599598
{
600599
BSLS_ASSERT_SAFE(d_localQueue_mp);
601600

602-
d_localQueue_mp->onReceipt(msgGUID, queueHandle, arrivalTimepoint);
601+
d_localQueue_mp->onReceipt(msgGUID, queueHandle);
603602
}
604603

605604
void Queue::onRemoval(const bmqt::MessageGUID& msgGUID,

src/groups/mqb/mqbblp/mqbblp_queue.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -328,14 +328,11 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
328328

329329
/// Invoked by the Data Store when it receives quorum Receipts for the
330330
/// specified `msgGUID`. Send ACK to the specified `queueHandle` if it
331-
/// is present in the queue handle catalog. Update AVK time stats using
332-
/// the specified `arrivalTimepoint`.
331+
/// is present in the queue handle catalog.
333332
///
334333
/// THREAD: This method is called from the Storage dispatcher thread.
335-
void onReceipt(const bmqt::MessageGUID& msgGUID,
336-
mqbi::QueueHandle* queueHandle,
337-
const bsls::Types::Int64& arrivalTimepoint)
338-
BSLS_KEYWORD_OVERRIDE;
334+
void onReceipt(const bmqt::MessageGUID& msgGUID,
335+
mqbi::QueueHandle* queueHandle) BSLS_KEYWORD_OVERRIDE;
339336

340337
/// Invoked by the Data Store when it removes (times out waiting for
341338
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbi/mqbi_cluster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,8 @@ class Cluster : public DispatcherClient {
288288
/// used by this cluster.
289289
virtual RequestManagerType& requestManager() = 0;
290290

291-
// Return a reference offering a modifiable access to the multi request
292-
// manager used by this cluster.
291+
/// Return a reference offering a modifiable access to the multi request
292+
/// manager used by this cluster.
293293
virtual MultiRequestManagerType& multiRequestManager() = 0;
294294

295295
/// Send the specified `request` with the specified `timeout` to the

src/groups/mqb/mqbi/mqbi_queue.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -864,13 +864,11 @@ class Queue : public DispatcherClient {
864864

865865
/// Invoked by the Data Store when it receives quorum Receipts for the
866866
/// specified `msgGUID`. Send ACK to the specified `queueHandle` if it
867-
/// is present in the queue handle catalog. Update AVK time stats using
868-
/// the specified `arrivalTimepoint`.
867+
/// is present in the queue handle catalog.
869868
///
870869
/// THREAD: This method is called from the Queue's dispatcher thread.
871-
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
872-
mqbi::QueueHandle* queueHandle,
873-
const bsls::Types::Int64& arrivalTimepoint) = 0;
870+
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
871+
mqbi::QueueHandle* queueHandle) = 0;
874872

875873
/// Invoked by the Data Store when it removes (times out waiting for
876874
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbmock/mqbmock_queue.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,7 @@ void Queue::onOpenFailure(BSLA_UNUSED unsigned int subQueueId)
363363
}
364364

365365
void Queue::onReceipt(BSLA_UNUSED const bmqt::MessageGUID& msgGUID,
366-
BSLA_UNUSED mqbi::QueueHandle* qH,
367-
BSLA_UNUSED const bsls::Types::Int64& arrivalTimepoint)
366+
BSLA_UNUSED mqbi::QueueHandle* qH)
368367
{
369368
// NOTHING
370369
}

src/groups/mqb/mqbmock/mqbmock_queue.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,8 @@ class Queue : public mqbi::Queue {
319319
/// Invoked by the Data Store when it receives quorum Receipts.
320320
///
321321
/// THREAD: This method is called from the Queue's dispatcher thread.
322-
void onReceipt(const bmqt::MessageGUID& msgGUID,
323-
mqbi::QueueHandle* qH,
324-
const bsls::Types::Int64& arrivalTimepoint)
325-
BSLS_KEYWORD_OVERRIDE;
322+
void onReceipt(const bmqt::MessageGUID& msgGUID,
323+
mqbi::QueueHandle* qH) BSLS_KEYWORD_OVERRIDE;
326324

327325
/// Invoked by the Data Store when it removes (times out waiting for
328326
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
374374
queue()
375375
->stats()
376376
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
377-
378377
msgSize);
379378

380379
d_isEmpty.storeRelaxed(0);

src/groups/mqb/mqbs/mqbs_filestore.cpp

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4078,8 +4078,19 @@ void FileStore::processReceiptEvent(unsigned int primaryLeaseId,
40784078
}
40794079
if (++(from->second.d_count) >= d_replicationFactor) {
40804080
from->second.d_handle->second.d_hasReceipt = true;
4081-
// notify the queue
40824081

4082+
// Calculate time it took for the message to be stored and
4083+
// replicated.
4084+
const bsls::Types::Int64 timeDelta =
4085+
bmqsys::Time::highResolutionTimer() -
4086+
from->second.d_handle->second.d_arrivalTimepoint;
4087+
d_clusterStats_p->onPartitionEvent(
4088+
mqbstat::ClusterStats::PartitionEventType::
4089+
e_PARTITION_REPLICATION,
4090+
d_config.partitionId(),
4091+
timeDelta);
4092+
4093+
// notify the queue
40834094
const mqbu::StorageKey& queueKey = from->second.d_queueKey;
40844095
bool haveQueue = (queueKey == lastKey);
40854096
if (!haveQueue) {
@@ -4089,16 +4100,12 @@ void FileStore::processReceiptEvent(unsigned int primaryLeaseId,
40894100
lastKey = queueKey;
40904101
lastQueue = sit->second->queue();
40914102
BSLS_ASSERT_SAFE(lastQueue);
4092-
40934103
affectedQueues.insert(lastQueue);
40944104
}
40954105
// else the queue and its storage are gone; ignore the receipt
40964106
}
40974107
if (haveQueue) {
4098-
lastQueue->onReceipt(
4099-
from->second.d_guid,
4100-
from->second.d_qH,
4101-
from->second.d_handle->second.d_arrivalTimepoint);
4108+
lastQueue->onReceipt(from->second.d_guid, from->second.d_qH);
41024109
} // else the queue is gone
41034110
from = d_unreceipted.erase(from);
41044111
}
@@ -7276,8 +7283,19 @@ void FileStore::setReplicationFactor(int value)
72767283
while (it != d_unreceipted.end()) {
72777284
if (it->second.d_count >= d_replicationFactor) {
72787285
it->second.d_handle->second.d_hasReceipt = true;
7279-
// notify the queue.
72807286

7287+
// Calculate time it took for the message to be stored and
7288+
// replicated.
7289+
const bsls::Types::Int64 timeDelta =
7290+
bmqsys::Time::highResolutionTimer() -
7291+
it->second.d_handle->second.d_arrivalTimepoint;
7292+
d_clusterStats_p->onPartitionEvent(
7293+
mqbstat::ClusterStats::PartitionEventType::
7294+
e_PARTITION_REPLICATION,
7295+
d_config.partitionId(),
7296+
timeDelta);
7297+
7298+
// notify the queue.
72817299
const mqbu::StorageKey& queueKey = it->second.d_queueKey;
72827300
bool haveQueue = (queueKey == lastKey);
72837301
if (!haveQueue) {
@@ -7287,16 +7305,12 @@ void FileStore::setReplicationFactor(int value)
72877305
lastKey = queueKey;
72887306
lastQueue = sit->second->queue();
72897307
BSLS_ASSERT_SAFE(lastQueue);
7290-
72917308
affectedQueues.insert(lastQueue);
72927309
}
72937310
// else the queue and its storage are gone; ignore the receipt
72947311
}
72957312
if (haveQueue) {
7296-
lastQueue->onReceipt(
7297-
it->second.d_guid,
7298-
it->second.d_qH,
7299-
it->second.d_handle->second.d_arrivalTimepoint);
7313+
lastQueue->onReceipt(it->second.d_guid, it->second.d_qH);
73007314
} // else the queue is gone
73017315
it = d_unreceipted.erase(it);
73027316
}

0 commit comments

Comments
 (0)