Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion src/groups/bmq/bmqa/bmqa_messageeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <bmqscm_version.h>
// BMQ
#include <bmqa_messageproperties.h>
#include <bmqa_queueid.h>
#include <bmqimp_event.h>
#include <bmqimp_queue.h>
Expand All @@ -27,6 +28,7 @@
#include <bmqt_queueflags.h>

// BDE
#include <ball_log.h>
#include <bsl_memory.h>
#include <bslma_managedptr.h>
#include <bslmf_assert.h>
Expand All @@ -36,6 +38,9 @@
namespace BloombergLP {
namespace bmqa {

// CLASS-SCOPE CATEGORY
BALL_LOG_SET_CLASS_CATEGORY("BMQA.MESSAGEEVENTBUILDER");

namespace {
// Compile time sanity checks
BSLMF_ASSERT(sizeof(Message) == sizeof(MessageImpl));
Expand Down Expand Up @@ -144,6 +149,14 @@ MessageEventBuilder::packMessage(const bmqa::QueueId& queueId)
d_impl.d_guidGenerator_sp->generateGUID(&guid);
builder->setMessageGUID(guid);

// If distributed tracing is enabled, create a span and inject trace
// span into message properties
bsl::shared_ptr<bmqp::MessageProperties> propsWithDT;
bsl::shared_ptr<bmqpi::DTSpan> childSpan;
if (d_impl.d_dtTracer_sp && d_impl.d_dtContext_sp) {
copyPropertiesAndInjectDT(&propsWithDT, &childSpan, builder, queueId);
}

if (queueSpRef->isOldStyle()) {
// Temporary; shall remove after 2nd roll out of "new style" brokers.
rc = builder->packMessageInOldStyle(queueSpRef->id());
Expand All @@ -167,7 +180,10 @@ MessageEventBuilder::packMessage(const bmqa::QueueId& queueId)
}

// Add message related info into the event on success.
msgImplRef.d_event_p->addMessageInfo(queueSpRef, guid, corrId);
msgImplRef.d_event_p->addMessageInfo(queueSpRef,
guid,
corrId,
childSpan);
}

return rc;
Expand Down Expand Up @@ -260,5 +276,63 @@ int MessageEventBuilder::messageEventSize() const
return eventSpRef->putEventBuilder()->eventSize();
}

void MessageEventBuilder::copyPropertiesAndInjectDT(
bsl::shared_ptr<bmqp::MessageProperties>* properties,
bsl::shared_ptr<bmqpi::DTSpan>* span,
bmqp::PutEventBuilder* builder,
const bmqa::QueueId& queueId) const
{
// Get message properties
const bmqp::MessageProperties* props = builder->messageProperties();
bsl::shared_ptr<bmqp::MessageProperties> propsCopy =
bsl::make_shared<bmqp::MessageProperties>(
props ? *props : bmqp::MessageProperties());

// Create a child span of the current span for the PUT operation
bmqpi::DTSpan::Baggage baggage;
bsl::stringstream ss;
bmqt::QueueFlagsUtil::prettyPrint(ss, queueId.flags());
baggage.put("bmq.queue.flags", ss.str());
baggage.put("bmq.queue.uri", queueId.uri().asString());
bsl::shared_ptr<bmqpi::DTSpan> childSpan =
d_impl.d_dtTracer_sp->createChildSpan(d_impl.d_dtContext_sp->span(),
"bmq.message.put",
baggage);
if (!childSpan) {
return;
}

// Serialize the span to inject into message properties
bsl::vector<unsigned char> serializedSpan;
int rc = d_impl.d_dtTracer_sp->serializeSpan(&serializedSpan, childSpan);
if (rc != 0) {
BALL_LOG_WARN
<< "Failed to serialize span for trace span injection, rc: " << rc;
return; // RETURN
}

// Inject the serialized trace span as a reserved binary property
bsl::vector<char> traceSpanData(serializedSpan.begin(),
serializedSpan.end());
rc = propsCopy->setPropertyAsBinary(
bmqp::MessageProperties::k_TRACE_PROPERTY_NAME,
traceSpanData);
if (rc != 0) {
BALL_LOG_WARN
<< "Failed to inject trace span into message properties, rc: "
<< rc;
return; // RETURN
}

// Set the modified properties back to the builder
builder->setMessageProperties(propsCopy.get());

// Return the created span and the properties with injected trace span for
// later use. Span will be stored in correlationId. Properties need to
// be kept alive until it's compressed in PutEventBuilder::packMessage()
*span = childSpan;
*properties = propsCopy;
}

} // close package namespace
} // close enterprise namespace
29 changes: 28 additions & 1 deletion src/groups/bmq/bmqa/bmqa_messageeventbuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@

#include <bmqa_message.h>
#include <bmqa_messageevent.h>
#include <bmqt_resultcode.h>
#include <bmqp_messageproperties.h>
#include <bmqpi_dtcontext.h>
#include <bmqpi_dttracer.h>

// BDE
#include <bsl_memory.h>
Expand All @@ -269,6 +271,8 @@ class QueueId;
}
namespace bmqp {
class MessageGUIDGenerator;
class PutEventBuilder;
class MessageProperties;
}

namespace bmqa {
Expand Down Expand Up @@ -309,6 +313,12 @@ struct MessageEventBuilderImpl {
/// CONTRACT: the stored value is correct every moment when in READ mode,
/// and the value is not guaranteed to be correct when in WRITE mode.
int d_messageEventSizeFinal;

/// Distributed tracing tracer object.
bsl::shared_ptr<bmqpi::DTTracer> d_dtTracer_sp;

/// Distributed tracing context object.
bsl::shared_ptr<bmqpi::DTContext> d_dtContext_sp;
};

// =========================
Expand Down Expand Up @@ -378,6 +388,23 @@ class MessageEventBuilder {
/// value represents the length of entire message event, *including*
/// BlazingMQ wire protocol overhead.
int messageEventSize() const;

private:
// ACCESSORS

/// Create a child distributed tracing span and inject it into the message
/// properties. Load into the specified `properties` a copy of the
/// message properties from the specified `builder` with the trace
/// context injected, and load into the specified `span` the created
/// span. Use the specified `queueId` for queue metadata. If span
/// creation or serialization fails, `properties` and `span` are left
/// unchanged. Note that the returned `properties` must be kept alive
/// until message serialization completes.
void copyPropertiesAndInjectDT(
bsl::shared_ptr<bmqp::MessageProperties>* properties,
bsl::shared_ptr<bmqpi::DTSpan>* span,
bmqp::PutEventBuilder* builder,
const bmqa::QueueId& queueId) const;
};

} // close package namespace
Expand Down
4 changes: 4 additions & 0 deletions src/groups/bmq/bmqa/bmqa_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,10 @@ void Session::loadMessageEventBuilder(MessageEventBuilder* builder)

builderImplRef.d_guidGenerator_sp = d_impl.d_guidGenerator_sp;

// Set distributed tracing tracer and context
builderImplRef.d_dtTracer_sp = d_impl.d_sessionOptions.tracer();
builderImplRef.d_dtContext_sp = d_impl.d_sessionOptions.traceContext();

// Get bmqimp::Event sharedptr from MessageEventBuilderImpl
bsl::shared_ptr<bmqimp::Event>& eventImplSpRef =
reinterpret_cast<bsl::shared_ptr<bmqimp::Event>&>(
Expand Down
72 changes: 72 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1307,8 +1307,8 @@
}

bmqt::QueueOptions options(d_session.d_allocator_p);
options.setMaxUnconfirmedMessages(ci.maxUnconfirmedMessages())

Check warning on line 1310 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 1310 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 bmqbrkr bmqtool bmqstoragetool all.it

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 1310 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
.setMaxUnconfirmedBytes(ci.maxUnconfirmedBytes())

Check warning on line 1311 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 1311 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 bmqbrkr bmqtool bmqstoragetool all.it

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 1311 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
.setConsumerPriority(ci.consumerPriority());

if (queue->options().hasSuspendsOnBadHostHealth()) {
Expand Down Expand Up @@ -3461,6 +3461,10 @@
int msgCount = 0;
while ((rc = confirmIter.next()) == 1) {
++msgCount;

// Remove and finish the span for the message
const bmqt::MessageGUID& guid = confirmIter.message().messageGUID();
d_consumerSpans.erase(guid);
}

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc < 0)) {
Expand Down Expand Up @@ -3594,6 +3598,24 @@
}
}

// Add Distributed Trace spans for the messages in the event
while (msgIterator.next()) {
const bmqt::MessageGUID& guid = msgIterator.header().messageGUID();

// Create ONE span per message and store it by GUID
bmqpi::DTSpan::Baggage baggage;
bsl::stringstream ss;
bmqt::QueueFlagsUtil::prettyPrint(ss, msgIterator.header().flags());
baggage.put("bmq.queue.flags", ss.str());
bslma::ManagedPtr<void> span = restoreDTPropertyAndActivateChildSpan(
msgIterator,
"bmq.message.push",
baggage);
if (span) {
d_consumerSpans[guid] = span;
}
}

// Update event stats
d_eventsStats.onEvent(EventsStatsEventType::e_PUSH,
eventByteCount,
Expand Down Expand Up @@ -5603,6 +5625,7 @@
, d_usingSessionEventHandler(eventHandlerCb) // UnspecifiedBool operator ...
, d_messageCorrelationIdContainer(
d_allocators.get("messageCorrelationIdContainer"))
, d_consumerSpans(allocator)
, d_fsmThread(bslmt::ThreadUtil::invalidHandle())
, d_fsmThreadChecker()
, d_fsmEventQueue(k_FSMQUEUE_INITIAL_CAPACITY,
Expand Down Expand Up @@ -6411,7 +6434,7 @@
bsl::vector<bsl::shared_ptr<Queue> > pendingQueues(&localAllocator);
d_queueManager.lookupQueuesByState(&pendingQueues, QueueState::e_PENDING);

d_numPendingReopenQueues = pendingQueues.size();

Check warning on line 6437 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘bsl::vectorBase<bsl::shared_ptr<BloombergLP::bmqimp::Queue> >::size_type’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]

Check warning on line 6437 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 bmqbrkr bmqtool bmqstoragetool all.it

conversion from ‘bsl::vectorBase<bsl::shared_ptr<BloombergLP::bmqimp::Queue> >::size_type’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]

Check warning on line 6437 in src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] ac5a1a0855a709d2e4225ee44a94a30b63e746d4 all.t

conversion from ‘bsl::vectorBase<bsl::shared_ptr<BloombergLP::bmqimp::Queue> >::size_type’ {aka ‘long unsigned int’} to ‘int’ may change value [-Wconversion]
if (d_numPendingReopenQueues == 0) {
// Fast path
BALL_LOG_INFO << id() << "No queues need to be reopened.";
Expand Down Expand Up @@ -7069,6 +7092,55 @@
return result;
}

bslma::ManagedPtr<void> BrokerSession::restoreDTPropertyAndActivateChildSpan(
const bmqp::PushMessageIterator& iterator,
const bsl::string_view& operation,
const bmqpi::DTSpan::Baggage& baggage) const
{
// If we pass all the checks and create a GUTS child span, return a
// managed pointer that points to a span scope which will be released upon
// the message confirm, otherwise return an empty managed pointer which
// just to satisfy the type and doesn't do anything.

const bsl::shared_ptr<bmqpi::DTTracer>& tracer = d_sessionOptions.tracer();
const bsl::shared_ptr<bmqpi::DTContext>& context =
d_sessionOptions.traceContext();

if (!tracer || !context) {
return bslma::ManagedPtr<void>(); // RETURN
}

// If we fail at any step, create a span without parent
bsl::shared_ptr<bmqpi::DTSpan> childSpan;
bsl::vector<unsigned char> vecUnsignedChar;

bmqp::MessageProperties properties;
const int rc = iterator.loadMessageProperties(&properties);
if (rc != 0 || !properties.hasProperty(
bmqp::MessageProperties::k_TRACE_PROPERTY_NAME)) {
// If we fail to load message properties or the property
// is not present, create a span without parent
childSpan = tracer->createChildSpan(bsl::shared_ptr<bmqpi::DTSpan>(),
operation,
baggage);
}
else {
const bsl::vector<char>& vecChar = properties.getPropertyAsBinary(
bmqp::MessageProperties::k_TRACE_PROPERTY_NAME);

vecUnsignedChar.resize(vecChar.size());
bsl::copy(vecChar.begin(), vecChar.end(), vecUnsignedChar.begin());

tracer->deserializeAndCreateChildSpan(&childSpan,
vecUnsignedChar,
operation,
baggage);
}

// scope on the childSpan if we can successfully create one
return context->scope(childSpan);
}

int BrokerSession::post(const bdlbb::Blob& eventBlob,
const bsls::TimeInterval& timeout)
{
Expand Down
19 changes: 19 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ class BrokerSession BSLS_CPP11_FINAL {

typedef bsl::unordered_map<int, int> QueueRetransmissionTimeoutMap;

typedef bsl::unordered_map<bmqt::MessageGUID, bslma::ManagedPtr<void> >
MessageGUIDToSpanMap;

class SessionFsm {
private:
BrokerSession& d_session;
Expand Down Expand Up @@ -788,6 +791,13 @@ class BrokerSession BSLS_CPP11_FINAL {
MessageCorrelationIdContainer d_messageCorrelationIdContainer;
// Message correlationId container

MessageGUIDToSpanMap d_consumerSpans;
// Map from MessageGUID to DTSpan for
// consumer-side distributed tracing.
// Stores spans created when PUSH
// messages are received, removed when
// CONFIRM messages are sent.

bslmt::ThreadUtil::Handle d_fsmThread;
// FSM thread handle

Expand Down Expand Up @@ -1484,6 +1494,15 @@ class BrokerSession BSLS_CPP11_FINAL {
const bmqpi::DTSpan::Baggage& baggage =
bmqpi::DTSpan::Baggage()) const;

/// Restore the DTContext from the properties of the given `iterator` and
/// create a child span based on the current span. Return a managed
/// pointer to the current span which might be NULL.
bslma::ManagedPtr<void> restoreDTPropertyAndActivateChildSpan(
const bmqp::PushMessageIterator& iterator,
const bsl::string_view& operation,
const bmqpi::DTSpan::Baggage& baggage =
bmqpi::DTSpan::Baggage()) const;

/// True if the session is started.
bool isStarted() const;

Expand Down
26 changes: 26 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2594,6 +2594,12 @@ class DTTestSpan : public bmqpi::DTSpan {
{
return d_operation;
}

int finish() const BSLS_KEYWORD_OVERRIDE
{
// NOT IMPLEMENTED
return 0;
}
};

class DTTestTracer : public bmqpi::DTTracer {
Expand Down Expand Up @@ -2630,6 +2636,26 @@ class DTTestTracer : public bmqpi::DTTracer {
d_allocator_p);
return result;
}

int
serializeSpan(BSLS_ANNOTATION_UNUSED bsl::vector<unsigned char>* buffer,
BSLS_ANNOTATION_UNUSED const bsl::shared_ptr<bmqpi::DTSpan>&
dtSpan) const BSLS_KEYWORD_OVERRIDE
{
// NOT IMPLEMENTED
return 0;
}

int deserializeAndCreateChildSpan(
BSLS_ANNOTATION_UNUSED bsl::shared_ptr<bmqpi::DTSpan>* child,
BSLS_ANNOTATION_UNUSED const bsl::vector<unsigned char>& buffer,
BSLS_ANNOTATION_UNUSED const bsl::string_view& operation,
BSLS_ANNOTATION_UNUSED const bmqpi::DTSpan::Baggage& baggage) const
BSLS_KEYWORD_OVERRIDE
{
// NOT IMPLEMENTED
return 0;
}
};

} // close unnamed namespace
Expand Down
9 changes: 5 additions & 4 deletions src/groups/bmq/bmqimp/bmqimp_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,10 @@ Event& Event::insertQueue(unsigned int subscriptionId,
return *this;
}

void Event::addMessageInfo(const bsl::shared_ptr<Queue>& queue,
const bmqt::MessageGUID& guid,
const bmqt::CorrelationId& corrId)
void Event::addMessageInfo(const bsl::shared_ptr<Queue>& queue,
const bmqt::MessageGUID& guid,
const bmqt::CorrelationId& corrId,
const bsl::shared_ptr<bmqpi::DTSpan>& dtSpan)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(queue);
Expand All @@ -406,7 +407,7 @@ void Event::addMessageInfo(const bsl::shared_ptr<Queue>& queue,
// correlationId container.
if (!corrId.isUnset()) {
bmqp::QueueId qId(queue->id(), queue->subQueueId());
d_messageCorrelationIdContainer_p->add(guid, corrId, qId);
d_messageCorrelationIdContainer_p->add(guid, corrId, qId, dtSpan);
}
}

Expand Down
Loading
Loading