Skip to content

Commit 6f33a89

Browse files
committed
WIP
Signed-off-by: dorjesinpo <[email protected]>
1 parent 3ec7eb4 commit 6f33a89

13 files changed

+643
-156
lines changed

src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
385385

386386
if (mps.streamIn(appData, input.isExtended()) == 0) {
387387
// Learn new schema.
388-
schema = mps.makeSchema(d_allocator_p);
388+
schema = mps.getSchema(d_allocator_p);
389389
schema_p = &schema;
390390
}
391391
}

src/groups/bmq/bmqp/bmqp_eventutil.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ Flattener::packMesage(const Protocol::SubQueueInfosArray& subQInfo)
393393

394394
if (mps.streamIn(d_appData, input.isExtended()) == 0) {
395395
// Learn new schema.
396-
schema = mps.makeSchema(d_allocator_p);
396+
schema = mps.getSchema(d_allocator_p);
397397
schema_p = &schema;
398398
}
399399
}

src/groups/bmq/bmqp/bmqp_messageproperties.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,12 @@ class MessageProperties {
471471
// accessing the returned reference after this object changes its
472472
// state.
473473

474-
SchemaPtr makeSchema(bslma::Allocator* allocator);
474+
SchemaPtr getSchema(bslma::Allocator* allocator);
475+
476+
/// Look up an ordinal for the specified `name`, load it into the specified
477+
/// `index`, and return `true` if this object has a valid schema. Return
478+
/// `false` otherwise.
479+
bool loadIndex(int* index, const bsl::string& name) const;
475480

476481
/// Return a blob having the BlazingMQ wire protocol representation of
477482
/// this instance. The specified `info` controls MessagePropertyHeader
@@ -878,7 +883,7 @@ MessageProperties::setPropertyAsBinary(const bsl::string& name,
878883
// ACCESSORS
879884

880885
inline MessageProperties::SchemaPtr
881-
MessageProperties::makeSchema(bslma::Allocator* allocator)
886+
MessageProperties::getSchema(bslma::Allocator* allocator)
882887
{
883888
if (!d_schema) {
884889
d_schema.load(new (*allocator)

src/groups/bmq/bmqp/bmqp_schemalearner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ int SchemaLearner::read(Context& context,
396396
if (rc == 0) {
397397
if (!schemaHandle->d_schema_sp) {
398398
// Learn new schema.
399-
schemaHandle->d_schema_sp = mps->makeSchema(d_allocator_p);
399+
schemaHandle->d_schema_sp = mps->getSchema(d_allocator_p);
400400
}
401401
}
402402
else {

src/groups/bmq/bmqp/bmqp_schemalearner.t.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ static void test2_readingTest()
112112
theLearner.multiplex(clientSession, input),
113113
blob));
114114

115-
bmqp::MessageProperties::SchemaPtr schema1 = out.makeSchema(
115+
bmqp::MessageProperties::SchemaPtr schema1 = out.getSchema(
116116
bmqtst::TestHelperUtil::allocator());
117117

118118
BMQTST_ASSERT_EQ(
@@ -123,7 +123,7 @@ static void test2_readingTest()
123123
blob));
124124

125125
BMQTST_ASSERT_EQ(schema1,
126-
out.makeSchema(bmqtst::TestHelperUtil::allocator()));
126+
out.getSchema(bmqtst::TestHelperUtil::allocator()));
127127
// subsequent call returns the same Schema
128128

129129
int start = bsl::rand() % num;
@@ -146,7 +146,7 @@ static void test2_readingTest()
146146
blob));
147147

148148
bmqp::MessageProperties::SchemaPtr schema2;
149-
schema2 = out.makeSchema(bmqtst::TestHelperUtil::allocator());
149+
schema2 = out.getSchema(bmqtst::TestHelperUtil::allocator());
150150
BMQTST_ASSERT_NE(schema1, schema2);
151151
// ...unless the input is recycled
152152

@@ -196,17 +196,17 @@ static void test3_observingTest()
196196
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out1, input, blob));
197197
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
198198

199-
bmqp::MessageProperties::SchemaPtr schema1 = out1.makeSchema(
199+
bmqp::MessageProperties::SchemaPtr schema1 = out1.getSchema(
200200
bmqtst::TestHelperUtil::allocator());
201201

202202
BMQTST_ASSERT_EQ(schema1,
203-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
203+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
204204
// subsequent call returns the same Schema
205205

206206
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
207207

208208
BMQTST_ASSERT_EQ(schema1,
209-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
209+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
210210
// subsequent call returns the same Schema
211211

212212
bmqp::MessagePropertiesInfo recycledInput(true, 1, true);
@@ -216,7 +216,7 @@ static void test3_observingTest()
216216
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
217217

218218
BMQTST_ASSERT_NE(schema1,
219-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
219+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
220220
// ...unless the input is recycled
221221
}
222222

src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
860860
const bsl::string& appId,
861861
const mqbu::StorageKey& appKey,
862862
bslma::Allocator* allocator)
863-
: d_routing_sp(new(*allocator) Routers::AppContext(queueContext, allocator),
864-
allocator)
863+
: d_routing_sp(
864+
new(*allocator)
865+
Routers::AppContext(queueContext, upstreamSubQueueId, allocator),
866+
allocator)
865867
, d_redeliveryList(allocator)
866868
, d_putAsideList(allocator)
867869
, d_priorityCount(0)
@@ -1374,6 +1376,14 @@ Routers::Result QueueEngineUtil_AppState::selectConsumer(
13741376
<< currentMessage->guid();
13751377
}
13761378
}
1379+
else if (result == Routers::e_SUCCESS &&
1380+
d_routing_sp->d_queue.d_preader->numRuns() % 10000 == 0) {
1381+
BALL_LOG_INFO << "[THROTTLED] Queue '" << d_queue_p->description()
1382+
<< "', appId = '" << appId() << "' cache hits "
1383+
<< d_routing_sp->root().hits() << " iterations "
1384+
<< d_routing_sp->root().iterations() << " runs "
1385+
<< d_routing_sp->d_queue.d_preader->numRuns();
1386+
}
13771387

13781388
return result;
13791389
}
@@ -1397,9 +1407,9 @@ int QueueEngineUtil_AppState::setSubscription(
13971407
return 0;
13981408
}
13991409

1400-
bool QueueEngineUtil_AppState::evaluateAppSubcription()
1410+
bool QueueEngineUtil_AppState::evaluateAppSubcription(unsigned int run)
14011411
{
1402-
return d_appSubscription.evaluate();
1412+
return d_appSubscription.evaluate(run);
14031413
}
14041414

14051415
void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey,

src/groups/mqb/mqbblp/mqbblp_queueengineutil.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ struct QueueEngineUtil_AppState {
499499
int setSubscription(const mqbconfm::Expression& value);
500500

501501
/// Evaluate the application subscription
502-
bool evaluateAppSubcription();
502+
bool evaluateAppSubcription(unsigned int run);
503503

504504
/// Change the state to authorized, thus enabling delivery
505505
void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal);

src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1244,7 +1244,8 @@ void RelayQueueEngine::configureHandle(
12441244
App_State* app = findApp(upstreamSubQueueId);
12451245
BSLS_ASSERT_SAFE(app);
12461246

1247-
context->initializeRouting(d_queueState_p->routingContext());
1247+
context->initializeRouting(d_queueState_p->routingContext(),
1248+
app->upstreamSubQueueId());
12481249

12491250
configureApp(*app, handle, streamParameters, context);
12501251
}

src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
203203

204204
void reset();
205205

206-
void initializeRouting(Routers::QueueRoutingContext& queueContext);
206+
void initializeRouting(Routers::QueueRoutingContext& queueContext,
207+
unsigned int upstreamSubQueueId);
207208
};
208209

209210
private:
@@ -659,10 +660,13 @@ inline void RelayQueueEngine::ConfigureContext::resetCallback()
659660
}
660661

661662
inline void RelayQueueEngine::ConfigureContext::initializeRouting(
662-
Routers::QueueRoutingContext& queueContext)
663+
Routers::QueueRoutingContext& queueContext,
664+
unsigned int upstreamSubQueueId)
663665
{
664666
d_routing_sp.reset(new (*d_allocator_p)
665-
Routers::AppContext(queueContext, d_allocator_p),
667+
Routers::AppContext(queueContext,
668+
upstreamSubQueueId,
669+
d_allocator_p),
666670
d_allocator_p);
667671
}
668672

src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,7 @@ void RootQueueEngine::configureHandle(
967967

968968
affectedApp->routing().reset(new (*d_allocator_p) Routers::AppContext(
969969
d_queueState_p->routingContext(),
970+
affectedApp->upstreamSubQueueId(),
970971
d_allocator_p),
971972
d_allocator_p);
972973

@@ -1190,6 +1191,7 @@ void RootQueueEngine::releaseHandle(
11901191
bsl::shared_ptr<Routers::AppContext> replacement(
11911192
new (*d_allocator_p) Routers::AppContext(
11921193
d_queueState_p->routingContext(),
1194+
app->upstreamSubQueueId(),
11931195
d_allocator_p),
11941196
d_allocator_p);
11951197

@@ -1738,8 +1740,8 @@ RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream,
17381740
}
17391741
else {
17401742
BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
1741-
<< rc;
1742-
stream << "Message Properties: Failed to acquire [rc: " << rc
1743+
<< ret;
1744+
stream << "Message Properties: Failed to acquire [rc: " << ret
17431745
<< "]\n";
17441746
}
17451747
}
@@ -2066,7 +2068,7 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAppSubscriptions(
20662068

20672069
// 'setPropertiesReader' is done in 'QueueRoutingContext' ctor
20682070

2069-
queue.d_preader->next(appData, mpi);
2071+
queue.d_preader->start(appData, mpi);
20702072

20712073
bdlb::ScopeExitAny guard(
20722074
bdlf::BindUtil::bind(&Routers::MessagePropertiesReader::clear,
@@ -2077,7 +2079,7 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAppSubscriptions(
20772079

20782080
for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
20792081
AppStateSp& app = it->second;
2080-
if (!app->evaluateAppSubcription()) {
2082+
if (!app->evaluateAppSubcription(queue.d_preader->numRuns())) {
20812083
result = d_queueState_p->storage()->autoConfirm(app->appKey(),
20822084
timestamp);
20832085

0 commit comments

Comments
 (0)