Skip to content

Commit 0f3f999

Browse files
committed
WIP
Signed-off-by: dorjesinpo <[email protected]>
1 parent 3ec7eb4 commit 0f3f999

21 files changed

+722
-196
lines changed

src/groups/bmq/bmqa/bmqa_messageproperties.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class MessageProperties {
101101
/// Constant representing the maximum size of a
102102
/// `bmqp::MessageProperties` object, so that the below AlignedBuffer
103103
/// is big enough.
104-
static const int k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES = 184;
104+
static const int k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES = 192;
105105

106106
// PRIVATE TYPES
107107
typedef bsls::AlignedBuffer<k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES>

src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
385385

386386
if (mps.streamIn(appData, input.isExtended()) == 0) {
387387
// Learn new schema.
388-
schema = mps.makeSchema(d_allocator_p);
388+
schema = mps.getSchema(d_allocator_p);
389389
schema_p = &schema;
390390
}
391391
}

src/groups/bmq/bmqp/bmqp_eventutil.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ Flattener::packMesage(const Protocol::SubQueueInfosArray& subQInfo)
393393

394394
if (mps.streamIn(d_appData, input.isExtended()) == 0) {
395395
// Learn new schema.
396-
schema = mps.makeSchema(d_allocator_p);
396+
schema = mps.getSchema(d_allocator_p);
397397
schema_p = &schema;
398398
}
399399
}

src/groups/bmq/bmqp/bmqp_messageproperties.cpp

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,14 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
242242
BSLS_ASSERT_SAFE(p.d_offset);
243243

244244
bmqu::BlobPosition position;
245-
int rc = bmqu::BlobUtil::findOffsetSafe(&position,
246-
d_blob.object(),
247-
p.d_offset);
245+
int rc = bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, p.d_offset);
248246
BSLS_ASSERT_SAFE(rc == 0);
249247

250248
switch (p.d_type) {
251249
case bmqt::PropertyType::e_BOOL: {
252250
char value;
253251
rc = bmqu::BlobUtil::readNBytes(&value,
254-
d_blob.object(),
252+
*d_blob_p,
255253
position,
256254
sizeof(value));
257255

@@ -261,7 +259,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
261259
case bmqt::PropertyType::e_CHAR: {
262260
char value;
263261
rc = bmqu::BlobUtil::readNBytes(&value,
264-
d_blob.object(),
262+
*d_blob_p,
265263
position,
266264
sizeof(value));
267265

@@ -271,7 +269,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
271269
case bmqt::PropertyType::e_SHORT: {
272270
bdlb::BigEndianInt16 nboValue;
273271
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
274-
d_blob.object(),
272+
*d_blob_p,
275273
position,
276274
sizeof(nboValue));
277275

@@ -281,7 +279,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
281279
case bmqt::PropertyType::e_INT32: {
282280
bdlb::BigEndianInt32 nboValue;
283281
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
284-
d_blob.object(),
282+
*d_blob_p,
285283
position,
286284
sizeof(nboValue));
287285

@@ -291,7 +289,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
291289
case bmqt::PropertyType::e_INT64: {
292290
bdlb::BigEndianInt64 nboValue;
293291
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
294-
d_blob.object(),
292+
*d_blob_p,
295293
position,
296294
sizeof(nboValue));
297295

@@ -302,7 +300,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
302300
case bmqt::PropertyType::e_STRING: {
303301
bsl::string value(p.d_length, ' ');
304302
rc = bmqu::BlobUtil::readNBytes(&value[0],
305-
d_blob.object(),
303+
*d_blob_p,
306304
position,
307305
p.d_length);
308306

@@ -312,7 +310,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
312310
case bmqt::PropertyType::e_BINARY: {
313311
bsl::vector<char> value(p.d_length);
314312
rc = bmqu::BlobUtil::readNBytes(&value[0],
315-
d_blob.object(),
313+
*d_blob_p,
316314
position,
317315
p.d_length);
318316

@@ -336,6 +334,7 @@ MessageProperties::MessageProperties(bslma::Allocator* basicAllocator)
336334
, d_totalSize(0)
337335
, d_originalSize(0)
338336
, d_blob()
337+
, d_blob_p(d_blob.address())
339338
, d_isBlobConstructed(false)
340339
, d_isDirty(true) // by default, this should be true
341340
, d_mphSize(0)
@@ -344,6 +343,7 @@ MessageProperties::MessageProperties(bslma::Allocator* basicAllocator)
344343
, d_dataOffset(0)
345344
, d_schema()
346345
, d_originalNumProps(0)
346+
, d_doDeepCopy(true)
347347
{
348348
}
349349

@@ -354,6 +354,7 @@ MessageProperties::MessageProperties(const MessageProperties& other,
354354
, d_totalSize(other.d_totalSize)
355355
, d_originalSize(other.d_originalSize)
356356
, d_blob()
357+
, d_blob_p(d_blob.address())
357358
, d_isBlobConstructed(false)
358359
, d_isDirty(other.d_isDirty)
359360
, d_mphSize(other.d_mphSize)
@@ -362,6 +363,7 @@ MessageProperties::MessageProperties(const MessageProperties& other,
362363
, d_dataOffset(other.d_dataOffset)
363364
, d_schema(other.d_schema)
364365
, d_originalNumProps(other.d_originalNumProps)
366+
, d_doDeepCopy(other.d_doDeepCopy)
365367
{
366368
if (other.d_isBlobConstructed) {
367369
new (d_blob.buffer())
@@ -394,6 +396,7 @@ MessageProperties& MessageProperties::operator=(const MessageProperties& rhs)
394396

395397
if (rhs.d_isBlobConstructed) {
396398
new (d_blob.buffer()) bdlbb::Blob(rhs.d_blob.object(), d_allocator_p);
399+
d_blob_p = d_blob.address();
397400
d_isBlobConstructed = true;
398401
}
399402

@@ -547,9 +550,15 @@ int MessageProperties::streamInHeader(const bdlbb::Blob& blob)
547550
return rc_INCORRECT_LENGTH; // RETURN
548551
}
549552

550-
new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
551-
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
552-
d_isBlobConstructed = true;
553+
if (d_doDeepCopy) {
554+
new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
555+
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
556+
d_blob_p = d_blob.address();
557+
d_isBlobConstructed = true;
558+
}
559+
else {
560+
d_blob_p = &blob;
561+
}
553562
d_originalSize = d_totalSize;
554563
d_originalNumProps = d_numProps;
555564

@@ -567,17 +576,16 @@ int MessageProperties::streamInPropertyHeader(Property* property,
567576
BSLS_ASSERT_SAFE(property);
568577
BSLS_ASSERT_SAFE(totalLength);
569578
BSLS_ASSERT_SAFE(d_dataOffset && start);
570-
BSLS_ASSERT_SAFE(d_isBlobConstructed);
571579

572580
bmqu::BlobPosition position;
573581

574-
if (bmqu::BlobUtil::findOffsetSafe(&position, d_blob.object(), start)) {
582+
if (bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, start)) {
575583
// Failed to advance blob to next 'MessagePropertyHeader' location.
576584
return rc_NO_MSG_PROPERTY_HEADER; // RETURN
577585
}
578586

579587
bmqu::BlobObjectProxy<MessagePropertyHeader> mpHeader(
580-
&d_blob.object(),
588+
d_blob_p,
581589
position,
582590
d_mphSize,
583591
true, // read flag
@@ -693,14 +701,14 @@ int MessageProperties::streamInPropertyHeader(Property* property,
693701
name->assign(nameLen, ' ');
694702
bmqu::BlobPosition namePosition;
695703
int rc = bmqu::BlobUtil::findOffsetSafe(&namePosition,
696-
d_blob.object(),
704+
*d_blob_p,
697705
offset);
698706
if (rc) {
699707
return rc_MISSING_PROPERTY_AREA; // RETURN
700708
}
701709

702710
rc = bmqu::BlobUtil::readNBytes(name->begin(),
703-
d_blob.object(),
711+
*d_blob_p,
704712
namePosition,
705713
nameLen);
706714
if (rc) {
@@ -885,6 +893,8 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
885893
}
886894

887895
new (d_blob.buffer()) bdlbb::Blob(bufferFactory, d_allocator_p);
896+
897+
d_blob_p = d_blob.address();
888898
d_isBlobConstructed = true;
889899

890900
if (0 == numProperties()) {
@@ -1006,7 +1016,7 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
10061016
msgPropsHdr.reset();
10071017
d_isDirty = false;
10081018

1009-
return d_blob.object();
1019+
return *d_blob_p;
10101020
}
10111021

10121022
bsl::ostream& MessageProperties::print(bsl::ostream& stream,

src/groups/bmq/bmqp/bmqp_messageproperties.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ class MessageProperties {
216216
// use when reading incrementally.
217217

218218
mutable BlobObjectBuffer d_blob; // Wire representation.
219+
mutable const bdlbb::Blob* d_blob_p; // Wire representation.
219220

220221
mutable bool d_isBlobConstructed;
221222
// Flag indicating if an instance of
@@ -244,6 +245,8 @@ class MessageProperties {
244245
// Incremental reading needs it to
245246
// recognize last property.
246247

248+
bool d_doDeepCopy;
249+
247250
private:
248251
// PRIVATE CLASS METHODS
249252

@@ -409,6 +412,8 @@ class MessageProperties {
409412
const MessagePropertiesInfo& info,
410413
const SchemaPtr& schema);
411414

415+
void setDeepCopy(bool value);
416+
412417
/// Parse and load all previously unparsed properties headers using the
413418
/// specified `isNewStyleProperties` as an indicator of encoding style.
414419
/// (Properties headers are not parsed in the presence of schema unless
@@ -471,7 +476,12 @@ class MessageProperties {
471476
// accessing the returned reference after this object changes its
472477
// state.
473478

474-
SchemaPtr makeSchema(bslma::Allocator* allocator);
479+
SchemaPtr getSchema(bslma::Allocator* allocator);
480+
481+
/// Look up an ordinal for the specified `name`, load it into the specified
482+
/// `index`, and return `true` if this object has a valid schema. Return
483+
/// `false` otherwise.
484+
bool loadIndex(int* index, const bsl::string& name) const;
475485

476486
/// Return a blob having the BlazingMQ wire protocol representation of
477487
/// this instance. The specified `info` controls MessagePropertyHeader
@@ -875,10 +885,15 @@ MessageProperties::setPropertyAsBinary(const bsl::string& name,
875885
return setProperty(name, value);
876886
}
877887

888+
inline void MessageProperties::setDeepCopy(bool value)
889+
{
890+
d_doDeepCopy = value;
891+
}
892+
878893
// ACCESSORS
879894

880895
inline MessageProperties::SchemaPtr
881-
MessageProperties::makeSchema(bslma::Allocator* allocator)
896+
MessageProperties::getSchema(bslma::Allocator* allocator)
882897
{
883898
if (!d_schema) {
884899
d_schema.load(new (*allocator)

src/groups/bmq/bmqp/bmqp_schemalearner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ int SchemaLearner::read(Context& context,
396396
if (rc == 0) {
397397
if (!schemaHandle->d_schema_sp) {
398398
// Learn new schema.
399-
schemaHandle->d_schema_sp = mps->makeSchema(d_allocator_p);
399+
schemaHandle->d_schema_sp = mps->getSchema(d_allocator_p);
400400
}
401401
}
402402
else {

src/groups/bmq/bmqp/bmqp_schemalearner.t.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ static void test2_readingTest()
112112
theLearner.multiplex(clientSession, input),
113113
blob));
114114

115-
bmqp::MessageProperties::SchemaPtr schema1 = out.makeSchema(
115+
bmqp::MessageProperties::SchemaPtr schema1 = out.getSchema(
116116
bmqtst::TestHelperUtil::allocator());
117117

118118
BMQTST_ASSERT_EQ(
@@ -123,7 +123,7 @@ static void test2_readingTest()
123123
blob));
124124

125125
BMQTST_ASSERT_EQ(schema1,
126-
out.makeSchema(bmqtst::TestHelperUtil::allocator()));
126+
out.getSchema(bmqtst::TestHelperUtil::allocator()));
127127
// subsequent call returns the same Schema
128128

129129
int start = bsl::rand() % num;
@@ -146,7 +146,7 @@ static void test2_readingTest()
146146
blob));
147147

148148
bmqp::MessageProperties::SchemaPtr schema2;
149-
schema2 = out.makeSchema(bmqtst::TestHelperUtil::allocator());
149+
schema2 = out.getSchema(bmqtst::TestHelperUtil::allocator());
150150
BMQTST_ASSERT_NE(schema1, schema2);
151151
// ...unless the input is recycled
152152

@@ -196,17 +196,17 @@ static void test3_observingTest()
196196
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out1, input, blob));
197197
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
198198

199-
bmqp::MessageProperties::SchemaPtr schema1 = out1.makeSchema(
199+
bmqp::MessageProperties::SchemaPtr schema1 = out1.getSchema(
200200
bmqtst::TestHelperUtil::allocator());
201201

202202
BMQTST_ASSERT_EQ(schema1,
203-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
203+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
204204
// subsequent call returns the same Schema
205205

206206
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
207207

208208
BMQTST_ASSERT_EQ(schema1,
209-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
209+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
210210
// subsequent call returns the same Schema
211211

212212
bmqp::MessagePropertiesInfo recycledInput(true, 1, true);
@@ -216,7 +216,7 @@ static void test3_observingTest()
216216
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
217217

218218
BMQTST_ASSERT_NE(schema1,
219-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
219+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
220220
// ...unless the input is recycled
221221
}
222222

src/groups/mqb/mqbblp/mqbblp_localqueue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,8 @@ void LocalQueue::loadInternals(mqbcmd::LocalQueue* out) const
657657
BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread(
658658
d_state_p->queue()));
659659

660-
d_queueEngine_mp->loadInternals(&out->queueEngine());
660+
d_queueEngine_mp->loadInternals(&out->queueEngine(),
661+
bsl::numeric_limits<unsigned int>::max());
661662
}
662663

663664
mqbi::Domain* LocalQueue::domain() const

src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
860860
const bsl::string& appId,
861861
const mqbu::StorageKey& appKey,
862862
bslma::Allocator* allocator)
863-
: d_routing_sp(new(*allocator) Routers::AppContext(queueContext, allocator),
864-
allocator)
863+
: d_routing_sp(
864+
new(*allocator)
865+
Routers::AppContext(queueContext, upstreamSubQueueId, allocator),
866+
allocator)
865867
, d_redeliveryList(allocator)
866868
, d_putAsideList(allocator)
867869
, d_priorityCount(0)
@@ -1374,6 +1376,14 @@ Routers::Result QueueEngineUtil_AppState::selectConsumer(
13741376
<< currentMessage->guid();
13751377
}
13761378
}
1379+
else if (result == Routers::e_SUCCESS &&
1380+
d_routing_sp->d_queue.d_preader->numRuns() % 10000 == 0) {
1381+
BALL_LOG_INFO << "[THROTTLED] Queue '" << d_queue_p->description()
1382+
<< "', appId = '" << appId() << "' cache hits "
1383+
<< d_routing_sp->root().hits() << " iterations "
1384+
<< d_routing_sp->root().iterations() << " runs "
1385+
<< d_routing_sp->d_queue.d_preader->numRuns();
1386+
}
13771387

13781388
return result;
13791389
}
@@ -1397,9 +1407,9 @@ int QueueEngineUtil_AppState::setSubscription(
13971407
return 0;
13981408
}
13991409

1400-
bool QueueEngineUtil_AppState::evaluateAppSubcription()
1410+
bool QueueEngineUtil_AppState::evaluateAppSubcription(unsigned int run)
14011411
{
1402-
return d_appSubscription.evaluate();
1412+
return d_appSubscription.evaluate(run);
14031413
}
14041414

14051415
void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey,

src/groups/mqb/mqbblp/mqbblp_queueengineutil.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ struct QueueEngineUtil_AppState {
499499
int setSubscription(const mqbconfm::Expression& value);
500500

501501
/// Evaluate the application subscription
502-
bool evaluateAppSubcription();
502+
bool evaluateAppSubcription(unsigned int run);
503503

504504
/// Change the state to authorized, thus enabling delivery
505505
void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal);

0 commit comments

Comments
 (0)