Skip to content

Commit 19e4a3d

Browse files
committed
Inline Put, Push, Ack, Confirm
Signed-off-by: dorjesinpo <[email protected]>
1 parent 546fd67 commit 19e4a3d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1874
-1653
lines changed

src/groups/mqb/mqba/mqba_clientsession.cpp

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -345,16 +345,16 @@ struct BuildAckOverflowFunctor {
345345
// -------------------------
346346

347347
ClientSessionState::ClientSessionState(
348-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
349-
BlobSpPool* blobSpPool,
350-
bdlbb::BlobBufferFactory* bufferFactory,
351-
bmqp::EncodingType::Enum encodingType,
352-
bslma::Allocator* allocator)
348+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
349+
BlobSpPool* blobSpPool,
350+
bdlbb::BlobBufferFactory* bufferFactory,
351+
bmqp::EncodingType::Enum encodingType,
352+
bslma::Allocator* allocator)
353353
: d_allocator_p(allocator)
354354
, d_channelBufferQueue(allocator)
355355
, d_unackedMessageInfos(d_allocator_p)
356356
, d_dispatcherClientData()
357-
, d_statContext_mp(clientStatContext)
357+
, d_statContext_sp(clientStatContext)
358358
, d_bufferFactory_p(bufferFactory)
359359
, d_blobSpPool_p(blobSpPool)
360360
, d_schemaEventBuilder(blobSpPool, encodingType, allocator)
@@ -662,7 +662,7 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
662662
queueStats = invalidQueueStats();
663663
}
664664
else {
665-
queueStats = subQueueCiter->value().d_stats.get();
665+
queueStats = subQueueCiter->value().d_stats_sp.get();
666666
}
667667
}
668668

@@ -1906,23 +1906,10 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
19061906
return false; // RETURN
19071907
}
19081908

1909-
StreamsMap::iterator subQueueIt =
1910-
queueIt->second.d_subQueueInfosMap.findBySubIdSafe(queueId.subId());
1911-
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
1912-
subQueueIt == queueIt->second.d_subQueueInfosMap.end())) {
1913-
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
1914-
1915-
if (eventType == bmqp::EventType::e_CONFIRM) {
1916-
// Update invalid queue stats
1917-
invalidQueueStats()->onEvent(
1918-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1919-
1);
1920-
}
1921-
1922-
*errorStream << "for an unknown subQueueId";
1923-
1924-
return false; // RETURN
1925-
}
1909+
// Do not lookup 'queueId.subId()'.
1910+
// 'QueueHandle::confirmMessageDispatched' does the check.
1911+
// Note, that it does not update stats (on "bmq://invalid/queue").
1912+
// It does log warnings.
19261913

19271914
*queueHandle = queueIt->second.d_handle_p;
19281915
BSLS_ASSERT_SAFE(queueHandle);
@@ -1968,13 +1955,6 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
19681955
return false; // RETURN
19691956
}
19701957

1971-
if (eventType == bmqp::EventType::e_CONFIRM) {
1972-
// Update stats for the queue (or subStream of the queue)
1973-
subQueueIt->value().d_stats->onEvent(
1974-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1975-
1);
1976-
}
1977-
19781958
return true;
19791959
}
19801960

@@ -2116,7 +2096,7 @@ void ClientSession::onPushEvent(const mqbi::DispatcherPushEvent& event)
21162096
blob->length());
21172097
}
21182098
else {
2119-
subQueueCiter->value().d_stats->onEvent(
2099+
subQueueCiter->value().onEvent(
21202100
mqbstat::QueueStatsClient::EventType::e_PUSH,
21212101
blob->length());
21222102
}
@@ -2267,9 +2247,8 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
22672247
BSLS_ASSERT_SAFE(queueStatePtr && subQueueInfoPtr);
22682248
BSLS_ASSERT_SAFE(queueStatePtr->d_handle_p);
22692249

2270-
subQueueInfoPtr->d_stats->onEvent(
2271-
mqbstat::QueueStatsClient::EventType::e_PUT,
2272-
appDataSp->length());
2250+
subQueueInfoPtr->onEvent(mqbstat::QueueStatsClient::EventType::e_PUT,
2251+
appDataSp->length());
22732252

22742253
const bool isAtMostOnce =
22752254
queueStatePtr->d_handle_p->queue()->isAtMostOnce();
@@ -2436,7 +2415,7 @@ mqbstat::QueueStatsClient* ClientSession::invalidQueueStats()
24362415
d_state.d_invalidQueueStats.makeValue();
24372416
d_state.d_invalidQueueStats.value().initialize(
24382417
"bmq://invalid/queue",
2439-
d_state.d_statContext_mp.get(),
2418+
d_state.d_statContext_sp.get(),
24402419
d_state.d_allocator_p);
24412420
// TBD: The queue uri should be '** INVALID QUEUE **', but that can
24422421
// only be done once the stats UI panel has been updated to
@@ -2639,17 +2618,17 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
26392618

26402619
// CREATORS
26412620
ClientSession::ClientSession(
2642-
const bsl::shared_ptr<bmqio::Channel>& channel,
2643-
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2644-
const bsl::string& sessionDescription,
2645-
mqbi::Dispatcher* dispatcher,
2646-
mqbblp::ClusterCatalog* clusterCatalog,
2647-
mqbi::DomainFactory* domainFactory,
2648-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
2649-
ClientSessionState::BlobSpPool* blobSpPool,
2650-
bdlbb::BlobBufferFactory* bufferFactory,
2651-
bdlmt::EventScheduler* scheduler,
2652-
bslma::Allocator* allocator)
2621+
const bsl::shared_ptr<bmqio::Channel>& channel,
2622+
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2623+
const bsl::string& sessionDescription,
2624+
mqbi::Dispatcher* dispatcher,
2625+
mqbblp::ClusterCatalog* clusterCatalog,
2626+
mqbi::DomainFactory* domainFactory,
2627+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
2628+
ClientSessionState::BlobSpPool* blobSpPool,
2629+
bdlbb::BlobBufferFactory* bufferFactory,
2630+
bdlmt::EventScheduler* scheduler,
2631+
bslma::Allocator* allocator)
26532632
: d_self(this) // use default allocator
26542633
, d_operationState(e_RUNNING)
26552634
, d_isDisconnecting(false)
@@ -2670,7 +2649,7 @@ ClientSession::ClientSession(
26702649
allocator)
26712650
, d_queueSessionManager(this,
26722651
*d_clientIdentity_p,
2673-
d_state.d_statContext_mp.get(),
2652+
d_state.d_statContext_sp,
26742653
domainFactory,
26752654
allocator)
26762655
, d_clusterCatalog_p(clusterCatalog)

src/groups/mqb/mqba/mqba_clientsession.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ struct ClientSessionState {
149149
typedef bsl::pair<UnackedMessageInfoMap::iterator, bool>
150150
UnackedMessageInfoMapInsertRc;
151151

152-
typedef bslma::ManagedPtr<bmqst::StatContext> StatContextMp;
153-
154152
public:
155153
// PUBLIC DATA
156154

@@ -173,7 +171,7 @@ struct ClientSessionState {
173171

174172
/// Stat context dedicated to this domain, to use as the parent stat
175173
/// context for any queue in this domain.
176-
StatContextMp d_statContext_mp;
174+
const bsl::shared_ptr<bmqst::StatContext> d_statContext_sp;
177175

178176
/// Blob buffer factory to use.
179177
///
@@ -225,11 +223,11 @@ struct ClientSessionState {
225223
/// builder will use. Memory allocations are performed using the
226224
/// specified `allocator`.
227225
ClientSessionState(
228-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
229-
BlobSpPool* blobSpPool,
230-
bdlbb::BlobBufferFactory* bufferFactory,
231-
bmqp::EncodingType::Enum encodingType,
232-
bslma::Allocator* allocator);
226+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
227+
BlobSpPool* blobSpPool,
228+
bdlbb::BlobBufferFactory* bufferFactory,
229+
bmqp::EncodingType::Enum encodingType,
230+
bslma::Allocator* allocator);
233231
};
234232

235233
// ===================
@@ -632,11 +630,11 @@ class ClientSession : public mqbnet::Session,
632630
mqbi::Dispatcher* dispatcher,
633631
mqbblp::ClusterCatalog* clusterCatalog,
634632
mqbi::DomainFactory* domainFactory,
635-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
636-
ClientSessionState::BlobSpPool* blobSpPool,
637-
bdlbb::BlobBufferFactory* bufferFactory,
638-
bdlmt::EventScheduler* scheduler,
639-
bslma::Allocator* allocator);
633+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
634+
ClientSessionState::BlobSpPool* blobSpPool,
635+
bdlbb::BlobBufferFactory* bufferFactory,
636+
bdlmt::EventScheduler* scheduler,
637+
bslma::Allocator* allocator);
640638

641639
/// Destructor
642640
~ClientSession() BSLS_KEYWORD_OVERRIDE;

src/groups/mqb/mqba/mqba_clientsession.t.cpp

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ class MyMockDomain : public mqbmock::Domain {
585585
/// calls the specified `callback` with a new queue handle created
586586
/// using the specified `handleParameters`. The specified `uri` and
587587
/// `clientContext` are ignored.
588-
void openQueue(BSLA_UNUSED const bmqt::Uri& uri,
588+
void openQueue(const bmqt::Uri& uri,
589589
const bsl::shared_ptr<mqbi::QueueHandleRequesterContext>&
590590
clientContext,
591591
const bmqp_ctrlmsg::QueueHandleParameters& handleParameters,
@@ -605,8 +605,15 @@ class MyMockDomain : public mqbmock::Domain {
605605
handleParameters,
606606
d_allocator_p);
607607

608-
OpenQueueConfirmationCookie confirmationCookie;
609-
confirmationCookie.createInplace(d_allocator_p, d_queueHandle.get());
608+
mqbi::OpenQueueConfirmationCookieSp confirmationCookie;
609+
confirmationCookie.createInplace(d_allocator_p);
610+
confirmationCookie->d_handle = d_queueHandle.get();
611+
612+
confirmationCookie->d_stats_sp.createInplace(d_allocator_p);
613+
confirmationCookie->d_stats_sp->initialize(
614+
uri,
615+
clientContext->statContext().get(),
616+
d_allocator_p);
610617

611618
bmqp_ctrlmsg::Status status(d_allocator_p);
612619
status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
@@ -648,18 +655,18 @@ class TestBench {
648655

649656
public:
650657
// DATA
651-
bdlbb::PooledBlobBufferFactory d_bufferFactory;
652-
BlobSpPool d_blobSpPool;
658+
bdlbb::PooledBlobBufferFactory d_bufferFactory;
659+
BlobSpPool d_blobSpPool;
653660
bsl::shared_ptr<bmqio::TestChannel> d_channel;
654-
mqbmock::Cluster d_cluster;
655-
mqbmock::Dispatcher d_mockDispatcher;
656-
MyMockDomain d_domain;
657-
mqbmock::DomainFactory d_mockDomainFactory;
658-
bslma::ManagedPtr<bmqst::StatContext> d_clientStatContext_mp;
659-
bdlmt::EventScheduler d_scheduler;
660-
TestClock d_testClock;
661-
mqba::ClientSession d_cs;
662-
bslma::Allocator* d_allocator_p;
661+
mqbmock::Cluster d_cluster;
662+
mqbmock::Dispatcher d_mockDispatcher;
663+
MyMockDomain d_domain;
664+
mqbmock::DomainFactory d_mockDomainFactory;
665+
const bsl::shared_ptr<bmqst::StatContext> d_clientStatContext_sp;
666+
bdlmt::EventScheduler d_scheduler;
667+
TestClock d_testClock;
668+
mqba::ClientSession d_cs;
669+
bslma::Allocator* d_allocator_p;
663670

664671
static const int k_PAYLOAD_LENGTH = 36;
665672

@@ -682,9 +689,8 @@ class TestBench {
682689
, d_mockDispatcher(allocator)
683690
, d_domain(&d_mockDispatcher, &d_cluster, atMostOnce, allocator)
684691
, d_mockDomainFactory(d_domain, allocator)
685-
, d_clientStatContext_mp(
686-
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator)
687-
.managedPtr())
692+
, d_clientStatContext_sp(
693+
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator))
688694
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, allocator)
689695
, d_testClock(d_scheduler)
690696
, d_cs(d_channel,
@@ -693,7 +699,7 @@ class TestBench {
693699
setInDispatcherThread(&d_mockDispatcher),
694700
0, // ClusterCatalog
695701
&d_mockDomainFactory,
696-
d_clientStatContext_mp,
702+
d_clientStatContext_sp,
697703
&d_blobSpPool,
698704
&d_bufferFactory,
699705
&d_scheduler,

0 commit comments

Comments
 (0)