Skip to content
Open

WIP #1008

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqa/bmqa_messageproperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class MessageProperties {
/// Constant representing the maximum size of a
/// `bmqp::MessageProperties` object, so that the below AlignedBuffer
/// is big enough.
static const int k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES = 184;
static const int k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES = 192;

// PRIVATE TYPES
typedef bsls::AlignedBuffer<k_MAX_SIZEOF_BMQP_MESSAGEPROPERTIES>
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@

if (mps.streamIn(appData, input.isExtended()) == 0) {
// Learn new schema.
schema = mps.makeSchema(d_allocator_p);
schema = mps.getSchema(d_allocator_p);
schema_p = &schema;
}
}
Expand Down Expand Up @@ -426,7 +426,7 @@
}

// Update message count
*messageCount += subQueueInfos.size();

Check warning on line 429 in src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘size_t’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]

Check warning on line 429 in src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] abbb8401c4937ed5f91e5c6f1672f5d282ecf95d bmqbrkr bmqtool bmqstoragetool all.it

conversion from ‘size_t’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]

Check warning on line 429 in src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] abbb8401c4937ed5f91e5c6f1672f5d282ecf95d all.t

conversion from ‘size_t’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]
}
else {
eventInfos->back().d_ids.push_back(bmqp::EventUtilQueueInfo(
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_eventutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ Flattener::packMesage(const Protocol::SubQueueInfosArray& subQInfo)

if (mps.streamIn(d_appData, input.isExtended()) == 0) {
// Learn new schema.
schema = mps.makeSchema(d_allocator_p);
schema = mps.getSchema(d_allocator_p);
schema_p = &schema;
}
}
Expand Down
48 changes: 29 additions & 19 deletions src/groups/bmq/bmqp/bmqp_messageproperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,14 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
BSLS_ASSERT_SAFE(p.d_offset);

bmqu::BlobPosition position;
int rc = bmqu::BlobUtil::findOffsetSafe(&position,
d_blob.object(),
p.d_offset);
int rc = bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, p.d_offset);
BSLS_ASSERT_SAFE(rc == 0);

switch (p.d_type) {
case bmqt::PropertyType::e_BOOL: {
char value;
rc = bmqu::BlobUtil::readNBytes(&value,
d_blob.object(),
*d_blob_p,
position,
sizeof(value));

Expand All @@ -261,7 +259,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_CHAR: {
char value;
rc = bmqu::BlobUtil::readNBytes(&value,
d_blob.object(),
*d_blob_p,
position,
sizeof(value));

Expand All @@ -271,7 +269,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_SHORT: {
bdlb::BigEndianInt16 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -281,7 +279,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_INT32: {
bdlb::BigEndianInt32 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -291,7 +289,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_INT64: {
bdlb::BigEndianInt64 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -302,7 +300,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_STRING: {
bsl::string value(p.d_length, ' ');
rc = bmqu::BlobUtil::readNBytes(&value[0],
d_blob.object(),
*d_blob_p,
position,
p.d_length);

Expand All @@ -312,7 +310,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_BINARY: {
bsl::vector<char> value(p.d_length);
rc = bmqu::BlobUtil::readNBytes(&value[0],
d_blob.object(),
*d_blob_p,
position,
p.d_length);

Expand All @@ -336,6 +334,7 @@ MessageProperties::MessageProperties(bslma::Allocator* basicAllocator)
, d_totalSize(0)
, d_originalSize(0)
, d_blob()
, d_blob_p(d_blob.address())
, d_isBlobConstructed(false)
, d_isDirty(true) // by default, this should be true
, d_mphSize(0)
Expand All @@ -344,6 +343,7 @@ MessageProperties::MessageProperties(bslma::Allocator* basicAllocator)
, d_dataOffset(0)
, d_schema()
, d_originalNumProps(0)
, d_doDeepCopy(true)
{
}

Expand All @@ -354,6 +354,7 @@ MessageProperties::MessageProperties(const MessageProperties& other,
, d_totalSize(other.d_totalSize)
, d_originalSize(other.d_originalSize)
, d_blob()
, d_blob_p(d_blob.address())
, d_isBlobConstructed(false)
, d_isDirty(other.d_isDirty)
, d_mphSize(other.d_mphSize)
Expand All @@ -362,6 +363,7 @@ MessageProperties::MessageProperties(const MessageProperties& other,
, d_dataOffset(other.d_dataOffset)
, d_schema(other.d_schema)
, d_originalNumProps(other.d_originalNumProps)
, d_doDeepCopy(other.d_doDeepCopy)
{
if (other.d_isBlobConstructed) {
new (d_blob.buffer())
Expand Down Expand Up @@ -394,6 +396,7 @@ MessageProperties& MessageProperties::operator=(const MessageProperties& rhs)

if (rhs.d_isBlobConstructed) {
new (d_blob.buffer()) bdlbb::Blob(rhs.d_blob.object(), d_allocator_p);
d_blob_p = d_blob.address();
d_isBlobConstructed = true;
}

Expand Down Expand Up @@ -547,9 +550,15 @@ int MessageProperties::streamInHeader(const bdlbb::Blob& blob)
return rc_INCORRECT_LENGTH; // RETURN
}

new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
d_isBlobConstructed = true;
if (d_doDeepCopy) {
new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
d_blob_p = d_blob.address();
d_isBlobConstructed = true;
}
else {
d_blob_p = &blob;
}
d_originalSize = d_totalSize;
d_originalNumProps = d_numProps;

Expand All @@ -567,17 +576,16 @@ int MessageProperties::streamInPropertyHeader(Property* property,
BSLS_ASSERT_SAFE(property);
BSLS_ASSERT_SAFE(totalLength);
BSLS_ASSERT_SAFE(d_dataOffset && start);
BSLS_ASSERT_SAFE(d_isBlobConstructed);

bmqu::BlobPosition position;

if (bmqu::BlobUtil::findOffsetSafe(&position, d_blob.object(), start)) {
if (bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, start)) {
// Failed to advance blob to next 'MessagePropertyHeader' location.
return rc_NO_MSG_PROPERTY_HEADER; // RETURN
}

bmqu::BlobObjectProxy<MessagePropertyHeader> mpHeader(
&d_blob.object(),
d_blob_p,
position,
d_mphSize,
true, // read flag
Expand Down Expand Up @@ -693,14 +701,14 @@ int MessageProperties::streamInPropertyHeader(Property* property,
name->assign(nameLen, ' ');
bmqu::BlobPosition namePosition;
int rc = bmqu::BlobUtil::findOffsetSafe(&namePosition,
d_blob.object(),
*d_blob_p,
offset);
if (rc) {
return rc_MISSING_PROPERTY_AREA; // RETURN
}

rc = bmqu::BlobUtil::readNBytes(name->begin(),
d_blob.object(),
*d_blob_p,
namePosition,
nameLen);
if (rc) {
Expand Down Expand Up @@ -885,6 +893,8 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
}

new (d_blob.buffer()) bdlbb::Blob(bufferFactory, d_allocator_p);

d_blob_p = d_blob.address();
d_isBlobConstructed = true;

if (0 == numProperties()) {
Expand Down Expand Up @@ -1006,7 +1016,7 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
msgPropsHdr.reset();
d_isDirty = false;

return d_blob.object();
return *d_blob_p;
}

bsl::ostream& MessageProperties::print(bsl::ostream& stream,
Expand Down
19 changes: 17 additions & 2 deletions src/groups/bmq/bmqp/bmqp_messageproperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class MessageProperties {
// use when reading incrementally.

mutable BlobObjectBuffer d_blob; // Wire representation.
mutable const bdlbb::Blob* d_blob_p; // Wire representation.

mutable bool d_isBlobConstructed;
// Flag indicating if an instance of
Expand Down Expand Up @@ -244,6 +245,8 @@ class MessageProperties {
// Incremental reading needs it to
// recognize last property.

bool d_doDeepCopy;

private:
// PRIVATE CLASS METHODS

Expand Down Expand Up @@ -409,6 +412,8 @@ class MessageProperties {
const MessagePropertiesInfo& info,
const SchemaPtr& schema);

void setDeepCopy(bool value);

/// Parse and load all previously unparsed properties headers using the
/// specified `isNewStyleProperties` as an indicator of encoding style.
/// (Properties headers are not parsed in the presence of schema unless
Expand Down Expand Up @@ -471,7 +476,12 @@ class MessageProperties {
// accessing the returned reference after this object changes its
// state.

SchemaPtr makeSchema(bslma::Allocator* allocator);
SchemaPtr getSchema(bslma::Allocator* allocator);

/// Look up an ordinal for the specified `name`, load it into the specified
/// `index`, and return `true` if this object has a valid schema. Return
/// `false` otherwise.
bool loadIndex(int* index, const bsl::string& name) const;

/// Return a blob having the BlazingMQ wire protocol representation of
/// this instance. The specified `info` controls MessagePropertyHeader
Expand Down Expand Up @@ -875,10 +885,15 @@ MessageProperties::setPropertyAsBinary(const bsl::string& name,
return setProperty(name, value);
}

inline void MessageProperties::setDeepCopy(bool value)
{
d_doDeepCopy = value;
}

// ACCESSORS

inline MessageProperties::SchemaPtr
MessageProperties::makeSchema(bslma::Allocator* allocator)
MessageProperties::getSchema(bslma::Allocator* allocator)
{
if (!d_schema) {
d_schema.load(new (*allocator)
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_schemalearner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ int SchemaLearner::read(Context& context,
if (rc == 0) {
if (!schemaHandle->d_schema_sp) {
// Learn new schema.
schemaHandle->d_schema_sp = mps->makeSchema(d_allocator_p);
schemaHandle->d_schema_sp = mps->getSchema(d_allocator_p);
}
}
else {
Expand Down
14 changes: 7 additions & 7 deletions src/groups/bmq/bmqp/bmqp_schemalearner.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ static void test2_readingTest()
theLearner.multiplex(clientSession, input),
blob));

bmqp::MessageProperties::SchemaPtr schema1 = out.makeSchema(
bmqp::MessageProperties::SchemaPtr schema1 = out.getSchema(
bmqtst::TestHelperUtil::allocator());

BMQTST_ASSERT_EQ(
Expand All @@ -123,7 +123,7 @@ static void test2_readingTest()
blob));

BMQTST_ASSERT_EQ(schema1,
out.makeSchema(bmqtst::TestHelperUtil::allocator()));
out.getSchema(bmqtst::TestHelperUtil::allocator()));
// subsequent call returns the same Schema

int start = bsl::rand() % num;
Expand All @@ -146,7 +146,7 @@ static void test2_readingTest()
blob));

bmqp::MessageProperties::SchemaPtr schema2;
schema2 = out.makeSchema(bmqtst::TestHelperUtil::allocator());
schema2 = out.getSchema(bmqtst::TestHelperUtil::allocator());
BMQTST_ASSERT_NE(schema1, schema2);
// ...unless the input is recycled

Expand Down Expand Up @@ -196,17 +196,17 @@ static void test3_observingTest()
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out1, input, blob));
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));

bmqp::MessageProperties::SchemaPtr schema1 = out1.makeSchema(
bmqp::MessageProperties::SchemaPtr schema1 = out1.getSchema(
bmqtst::TestHelperUtil::allocator());

BMQTST_ASSERT_EQ(schema1,
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
// subsequent call returns the same Schema

BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));

BMQTST_ASSERT_EQ(schema1,
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
// subsequent call returns the same Schema

bmqp::MessagePropertiesInfo recycledInput(true, 1, true);
Expand All @@ -216,7 +216,7 @@ static void test3_observingTest()
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));

BMQTST_ASSERT_NE(schema1,
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
// ...unless the input is recycled
}

Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ void LocalQueue::loadInternals(mqbcmd::LocalQueue* out) const
BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread(
d_state_p->queue()));

d_queueEngine_mp->loadInternals(&out->queueEngine());
d_queueEngine_mp->loadInternals(&out->queueEngine(),
bsl::numeric_limits<unsigned int>::max());
}

mqbi::Domain* LocalQueue::domain() const
Expand Down
18 changes: 14 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
const bsl::string& appId,
const mqbu::StorageKey& appKey,
bslma::Allocator* allocator)
: d_routing_sp(new(*allocator) Routers::AppContext(queueContext, allocator),
allocator)
: d_routing_sp(
new(*allocator)
Routers::AppContext(queueContext, upstreamSubQueueId, allocator),
allocator)
, d_redeliveryList(allocator)
, d_putAsideList(allocator)
, d_priorityCount(0)
Expand Down Expand Up @@ -1374,6 +1376,14 @@ Routers::Result QueueEngineUtil_AppState::selectConsumer(
<< currentMessage->guid();
}
}
else if (result == Routers::e_SUCCESS &&
d_routing_sp->d_queue.d_preader->numRuns() % 10000 == 0) {
BALL_LOG_INFO << "[THROTTLED] Queue '" << d_queue_p->description()
<< "', appId = '" << appId() << "' cache hits "
<< d_routing_sp->root().hits() << " iterations "
<< d_routing_sp->root().iterations() << " runs "
<< d_routing_sp->d_queue.d_preader->numRuns();
}

return result;
}
Expand All @@ -1397,9 +1407,9 @@ int QueueEngineUtil_AppState::setSubscription(
return 0;
}

bool QueueEngineUtil_AppState::evaluateAppSubcription()
bool QueueEngineUtil_AppState::evaluateAppSubcription(unsigned int run)
{
return d_appSubscription.evaluate();
return d_appSubscription.evaluate(run);
}

void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey,
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ struct QueueEngineUtil_AppState {
int setSubscription(const mqbconfm::Expression& value);

/// Evaluate the application subscription
bool evaluateAppSubcription();
bool evaluateAppSubcription(unsigned int run);

/// Change the state to authorized, thus enabling delivery
void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal);
Expand Down
Loading
Loading