Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 96 additions & 37 deletions src/groups/bmq/bmqp/bmqp_messageproperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ class PropertyValueStreamOutVisitor {
sizeof(nboValue));
}

void operator()(const bsl::string& value)
void operator()(const bsl::string_view& value)
{
bdlbb::BlobUtil::append(d_blob_p,
value.c_str(),
value.data(),
static_cast<int>(value.length()));
}

Expand Down Expand Up @@ -234,6 +234,24 @@ MessageProperties::getPropertyValue(const Property& property) const
return property.d_value;
}

const MessageProperties::PropertyVariant&
MessageProperties::getPropertyValueAsString(const Property& property) const
{
if (property.d_value.isUnset()) {
BSLA_MAYBE_UNUSED bool result = streamInPropertyValue(property);
BSLS_ASSERT_SAFE(result);
// We assert 'true' result because the length and offset have already
// been checked.
}
PropertyVariant& v = property.d_value;

if (v.is<bsl::string_view>()) {
v = bsl::string(v.the<bsl::string_view>(), d_allocator_p);
}

return v;
}

bool MessageProperties::streamInPropertyValue(const Property& p) const
{
// PRECONDITIONS
Expand All @@ -242,16 +260,14 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
BSLS_ASSERT_SAFE(p.d_offset);

bmqu::BlobPosition position;
int rc = bmqu::BlobUtil::findOffsetSafe(&position,
d_blob.object(),
p.d_offset);
int rc = bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, p.d_offset);
BSLS_ASSERT_SAFE(rc == 0);

switch (p.d_type) {
case bmqt::PropertyType::e_BOOL: {
char value;
rc = bmqu::BlobUtil::readNBytes(&value,
d_blob.object(),
*d_blob_p,
position,
sizeof(value));

Expand All @@ -261,7 +277,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_CHAR: {
char value;
rc = bmqu::BlobUtil::readNBytes(&value,
d_blob.object(),
*d_blob_p,
position,
sizeof(value));

Expand All @@ -271,7 +287,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_SHORT: {
bdlb::BigEndianInt16 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -281,7 +297,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_INT32: {
bdlb::BigEndianInt32 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -291,7 +307,7 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
case bmqt::PropertyType::e_INT64: {
bdlb::BigEndianInt64 nboValue;
rc = bmqu::BlobUtil::readNBytes(reinterpret_cast<char*>(&nboValue),
d_blob.object(),
*d_blob_p,
position,
sizeof(nboValue));

Expand All @@ -300,19 +316,38 @@ bool MessageProperties::streamInPropertyValue(const Property& p) const
}

case bmqt::PropertyType::e_STRING: {
bsl::string value(p.d_length, ' ');
rc = bmqu::BlobUtil::readNBytes(&value[0],
d_blob.object(),
position,
p.d_length);
// Try to avoid copying the string. 'd_blop' already keeps a copy.
bmqu::BlobPosition end;
const int ret =
bmqu::BlobUtil::findOffset(&end, *d_blob_p, position, p.d_length);
bool doCopy = true;
if (ret == 0) {
// Do not align
if (position.buffer() == end.buffer() ||
(position.buffer() + 1 == end.buffer() && end.byte() == 0)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth extracting this to bmqu::BlobUtil::isDataContinuous(start, end)

// Section is good
char* start = d_blob_p->buffer(position.buffer()).data() +
position.byte();

p.d_value = bsl::string_view(start, p.d_length);
doCopy = false;
}
}
if (doCopy) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still get rid of doCopy if we just return when we read a property

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but is it that much better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the current one

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without micro-benchmarks we can only speculate if it is better or not

bsl::string value(p.d_length, ' ', d_allocator_p);
rc = bmqu::BlobUtil::readNBytes(&value[0],
*d_blob_p,
position,
p.d_length);
p.d_value = value;
}

p.d_value = value;
break;
}
case bmqt::PropertyType::e_BINARY: {
bsl::vector<char> value(p.d_length);
bsl::vector<char> value(p.d_length, d_allocator_p);
rc = bmqu::BlobUtil::readNBytes(&value[0],
d_blob.object(),
*d_blob_p,
position,
p.d_length);

Expand All @@ -336,14 +371,16 @@ MessageProperties::MessageProperties(bslma::Allocator* basicAllocator)
, d_totalSize(0)
, d_originalSize(0)
, d_blob()
, d_isBlobConstructed(false)
, d_isDirty(true) // by default, this should be true
, d_blob_p(d_blob.address())
, d_mphSize(0)
, d_mphOffset(0)
, d_numProps(0)
, d_dataOffset(0)
, d_schema()
, d_originalNumProps(0)
, d_isBlobConstructed(false)
, d_isDirty(true) // by default, this should be true
, d_doDeepCopy(true)
{
}

Expand All @@ -354,14 +391,16 @@ MessageProperties::MessageProperties(const MessageProperties& other,
, d_totalSize(other.d_totalSize)
, d_originalSize(other.d_originalSize)
, d_blob()
, d_isBlobConstructed(false)
, d_isDirty(other.d_isDirty)
, d_blob_p(d_blob.address())
, d_mphSize(other.d_mphSize)
, d_mphOffset(other.d_mphOffset)
, d_numProps(other.d_numProps)
, d_dataOffset(other.d_dataOffset)
, d_schema(other.d_schema)
, d_originalNumProps(other.d_originalNumProps)
, d_isBlobConstructed(false)
, d_isDirty(other.d_isDirty)
, d_doDeepCopy(other.d_doDeepCopy)
{
if (other.d_isBlobConstructed) {
new (d_blob.buffer())
Expand Down Expand Up @@ -394,6 +433,7 @@ MessageProperties& MessageProperties::operator=(const MessageProperties& rhs)

if (rhs.d_isBlobConstructed) {
new (d_blob.buffer()) bdlbb::Blob(rhs.d_blob.object(), d_allocator_p);
d_blob_p = d_blob.address();
d_isBlobConstructed = true;
}

Expand Down Expand Up @@ -547,9 +587,15 @@ int MessageProperties::streamInHeader(const bdlbb::Blob& blob)
return rc_INCORRECT_LENGTH; // RETURN
}

new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
d_isBlobConstructed = true;
if (d_doDeepCopy) {
new (d_blob.buffer()) bdlbb::Blob(d_allocator_p);
bdlbb::BlobUtil::append(d_blob.address(), blob, 0, d_totalSize);
d_blob_p = d_blob.address();
d_isBlobConstructed = true;
}
else {
d_blob_p = &blob;
}
d_originalSize = d_totalSize;
d_originalNumProps = d_numProps;

Expand All @@ -567,17 +613,18 @@ int MessageProperties::streamInPropertyHeader(Property* property,
BSLS_ASSERT_SAFE(property);
BSLS_ASSERT_SAFE(totalLength);
BSLS_ASSERT_SAFE(d_dataOffset && start);
BSLS_ASSERT_SAFE(d_isBlobConstructed);
BSLS_ASSERT_SAFE(d_blob_p);
BSLS_ASSERT_SAFE(d_isBlobConstructed || d_blob_p != d_blob.address());

bmqu::BlobPosition position;

if (bmqu::BlobUtil::findOffsetSafe(&position, d_blob.object(), start)) {
if (bmqu::BlobUtil::findOffsetSafe(&position, *d_blob_p, start)) {
// Failed to advance blob to next 'MessagePropertyHeader' location.
return rc_NO_MSG_PROPERTY_HEADER; // RETURN
}

bmqu::BlobObjectProxy<MessagePropertyHeader> mpHeader(
&d_blob.object(),
d_blob_p,
position,
d_mphSize,
true, // read flag
Expand Down Expand Up @@ -693,14 +740,14 @@ int MessageProperties::streamInPropertyHeader(Property* property,
name->assign(nameLen, ' ');
bmqu::BlobPosition namePosition;
int rc = bmqu::BlobUtil::findOffsetSafe(&namePosition,
d_blob.object(),
*d_blob_p,
offset);
if (rc) {
return rc_MISSING_PROPERTY_AREA; // RETURN
}

rc = bmqu::BlobUtil::readNBytes(name->begin(),
d_blob.object(),
*d_blob_p,
namePosition,
nameLen);
if (rc) {
Expand Down Expand Up @@ -768,6 +815,9 @@ int MessageProperties::streamIn(const bdlbb::Blob& blob,
return rc_MISSING_MSG_PROPERTY_HEADERS; // RETURN
}

// TODO: This could always build schema on the fly if it is missing to
// avoid extra iteration over properties in subsequent 'getSchema'.

rc = loadProperties(true, isNewStyleProperties);
if (rc != rc_SUCCESS) {
return rc; // RETURN
Expand Down Expand Up @@ -801,13 +851,14 @@ bdld::Datum
MessageProperties::getPropertyRef(const bsl::string& name,
bslma::Allocator* basicAllocator) const
{
PropertyMapConstIter cit = findProperty(name);
if (cit == d_properties.end()) {
PropertyMapIter it = findProperty(name);
if (it == d_properties.end()) {
return bdld::Datum::createError(-1); // RETURN
}
const Property& property = it->second;

const PropertyVariant& v = getPropertyValue(cit->second);
switch (cit->second.d_type) {
const PropertyVariant& v = getPropertyValue(property);
switch (property.d_type) {
case bmqt::PropertyType::e_BOOL:
return bdld::Datum::createBoolean(v.the<bool>());
case bmqt::PropertyType::e_CHAR:
Expand All @@ -820,8 +871,14 @@ MessageProperties::getPropertyRef(const bsl::string& name,
return bdld::Datum::createInteger64(v.the<bsls::Types::Int64>(),
basicAllocator);
case bmqt::PropertyType::e_STRING:
return bdld::Datum::createStringRef(v.the<bsl::string>(),
basicAllocator);
if (v.is<bsl::string>()) {
return bdld::Datum::createStringRef(v.the<bsl::string>(),
basicAllocator);
}
else {
return bdld::Datum::createStringRef(v.the<bsl::string_view>(),
basicAllocator);
}
case bmqt::PropertyType::e_BINARY:
// do not want to use binary
return bdld::Datum::createError(-2);
Expand Down Expand Up @@ -885,6 +942,8 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
}

new (d_blob.buffer()) bdlbb::Blob(bufferFactory, d_allocator_p);

d_blob_p = d_blob.address();
d_isBlobConstructed = true;

if (0 == numProperties()) {
Expand Down Expand Up @@ -1006,7 +1065,7 @@ MessageProperties::streamOut(bdlbb::BlobBufferFactory* bufferFactory,
msgPropsHdr.reset();
d_isDirty = false;

return d_blob.object();
return *d_blob_p;
}

bsl::ostream& MessageProperties::print(bsl::ostream& stream,
Expand Down
Loading
Loading