Skip to content

Adding proper OpenTelemetry integration via. registration helpers and better context propagation #1528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OTel", "projects\Test\OTel\OTel.csproj", "{33E86EAF-C269-4336-8E5C-71418AE360A2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -90,6 +94,14 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -104,6 +116,7 @@ Global
{F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{33E86EAF-C269-4336-8E5C-71418AE360A2} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# RabbitMQ .NET Client - OAuth2
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyTitle>RabbitMQ OpenTelemetry Integration Package for .NET</AssemblyTitle>
<Authors>VMware</Authors>
<Company>VMware, Inc. or its affiliates.</Company>
<Copyright>Copyright © 2007-2023 VMware, Inc. or its affiliates.</Copyright>
<Description>The RabbitMQ OAuth2 Client Library for .NET enables OAuth2 token refresh for RabbitMQ.Client</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageIcon>icon.png</PackageIcon>
<PackageLicenseExpression>Apache-2.0 OR MPL-2.0</PackageLicenseExpression>
<PackageProjectUrl>https://www.rabbitmq.com/dotnet.html</PackageProjectUrl>
<PackageTags>rabbitmq, amqp, oauth2</PackageTags>
<Product>RabbitMQ</Product>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<RepositoryUrl>https://github.com/rabbitmq/rabbitmq-dotnet-client.git</RepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<MinVerTagPrefix>otel-</MinVerTagPrefix>
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>../../packages</PackageOutputPath>
<PackageReadmeFile>README.md</PackageReadmeFile>
<LangVersion>7.3</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
<Deterministic>true</Deterministic>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
</ItemGroup>

<ItemGroup>
<None Remove="icon.png" />
<Content Include="icon.png" PackagePath="" />
<None Include="README.md" Pack="true" PackagePath="/" />
<InternalsVisibleTo Include="Unit" />
<InternalsVisibleTo Include="Benchmarks" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="all" />
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace RabbitMQ.Client.OpenTelemetry
{
public class RabbitMQOpenTelemetryConfiguration
{
public bool PropagateBaggage { get; set; } = true;
public bool UseRoutingKeyAsOperationName { get; set; } = true;
public bool IncludePublishers { get; set; } = true;
public bool IncludeSubscribers { get; set; } = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;
using RabbitMQ.Client.OpenTelemetry;

namespace OpenTelemetry.Trace
{
public static class OpenTelemetryExtensions
{
internal static TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static TracerProviderBuilder AddRabbitMQ(this TracerProviderBuilder builder,
RabbitMQOpenTelemetryConfiguration configuration)
{
if (configuration.PropagateBaggage)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to allow user to provide a preconfigured propagator instance?
E.g. users could want to use B3 propagation or XRay AWS propagator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason bu my lack of knowledge :). Are you then referring to these as alternatives to the tracestate propagator? Are there examples for me to look at?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean you're forcing a specific propagator - a CompositeTextMapPropagator or TraceContextPropagator which is not necessary.

I believe you should just use the DefaultTextMapPropagator.
Users can configure it however they want to, check out https://github.com/open-telemetry/opentelemetry-dotnet/blob/9b246750ab76e94986fc433d5c7af046a137e477/src/OpenTelemetry.Extensions.Propagators/README.md?plain=1#L29

If I understand the intent correctly you want to users to be able to disable baggage propagation - any specific reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean you're forcing a specific propagator - a CompositeTextMapPropagator or TraceContextPropagator which is not necessary.

I believe you should just use the DefaultTextMapPropagator.

Users can configure it however they want to, check out https://github.com/open-telemetry/opentelemetry-dotnet/blob/9b246750ab76e94986fc433d5c7af046a137e477/src/OpenTelemetry.Extensions.Propagators/README.md?plain=1#L29

Ahh gotcha! Saw that the DefaultTextMapPropagator was a Noop so I thought this was the way. I'll change it :)

If I understand the intent correctly you want to users to be able to disable baggage propagation - any specific reason?

Nope. I'll remove it. Does it make sense to then have a composite propagator with the default textmap propagator and the baggage propagator or do users normally configure the baggage propagator themselves?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The otel sdk should set the default one to the composite.

Not completely sure what sets OpenTelemetry.Context.Propagation.Propagators.DefaultTextMapPropagator, but I see it's set after OTel SDK is configured:)

{
s_propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
new TraceContextPropagator(), new BaggagePropagator()
});
}
else
{
s_propagator = new TraceContextPropagator();
}

RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you feel it's something we should document better /provide more clear guidance on in the messaging semconv?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good idea if people are running into high-cardinality issues with their messaging span names.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Operation the right name here? Since this is focused on OpenTelemetry, I'm wondering if just "SpanName" is a better wording.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTel config: UseRoutingKeyInSpanName
RabbitMQActivitySource property: UseRoutingKeyInActivityName
Make more sense?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good idea if people are running into high-cardinality issues with their messaging span names.

the span name cardinality requirements are soft (undefined) - open-telemetry/opentelemetry-specification#3534. Things like HTTP route we put there are like middlish-cardinality and I think the same is true about queue names/routing key.

so if cardinality is the only problem, I'd not make span name configurable (worst case users can rename spans with a processor). So again, no strong opinions

RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
Comment on lines +16 to +17
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this part? Can it be set by default in main package?

I would expect that this method will only call builder.AddSource("RabbitMQ.Client.*") as a convenient way to register it in TracerProviderBuilder. Other parts should be done in the instrumentation library.

Alternative options is to have here the second method:

public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder, Action<RabbitMQSpecificOptions>? configure);

where you can configure other methods externally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenTelemetryContextInjector is OTel specific for Baggage handling, that's pretty much the only reason for the package, because we don't want to pull OpenTelemetry.API dependencies into the main library.


if (configuration.IncludeSubscribers)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have any strong opinion here, but want to challenge the need for this flag (and IncludePublishers)
if users want more control, they can always call AddSource themselves.

I guess my broader question is when would users want to enable rabbitmq via an instrumentation library if all they really need to do is call
builder.AddSource("RabbitMQ.Client.*")

Copy link
Contributor Author

@stebet stebet Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have any strong opinion here, but want to challenge the need for this flag (and IncludePublishers) if users want more control, they can always call AddSource themselves.

I guess my broader question is when would users want to enable rabbitmq via an instrumentation library if all they really need to do is call builder.AddSource("RabbitMQ.Client.*")

The main reason for the library was to simplify the OTel configuration and setup for users (with the extension method) and to work around the current issue that since OTel and Activity Baggage propagation is different we do not have to make the OpenTelemetry.Api package a dependency on the RabbitMQ library itself.

Also it takes care of configuring the custom propagators to use the OTel propagators instead of the default Activity propagators.

I also just changed the defaults in the latest commits to have both publishers and subscriber event sources enabled, so users can just call .AddRabbitMQInstrumentation() and get both publishers, subscribers and OTel propagation without the need configure anything else by themselves.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm honestly not sure whether the separation of allowing people to turn on/off subscriber or publisher. Is that actually a usecase?

I can understand adding it here if that's a usecase seeing as this is for simplification. My question is, should you allow people to just do one or the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm honestly not sure whether the separation of allowing people to turn on/off subscriber or publisher. Is that actually a usecase?

I can understand adding it here if that's a usecase seeing as this is for simplification. My question is, should you allow people to just do one or the other?

No idea, but I think it was you and @lmolkova that suggested I separated the ActivitySources in the earlier OTel pr: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261/files#r1392469382

I have no preference either way really

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having different names is more future-proof, but also it costs nothing since you can enable sources with wildcard.

I didn't try to push you to remove the instrumentation library, was just trying to understand what'd be the better case for it.

The Activity.Baggage problem is an interesting one. @martinjt do you know if there are any plans to make baggages work better together? I can create issues/ping people/etc - it'd be great if we could solve it for everyone without instrumentations trying to patch it in each case.

Copy link
Contributor Author

@stebet stebet Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't try to push you to remove the instrumentation library, was just trying to understand what'd be the better case for it.

No worries. I didn't take it as such :)

The Activity.Baggage problem is an interesting one. @martinjt do you know if there are any plans to make baggages work better together? I can create issues/ping people/etc - it'd be great if we could solve it for everyone without instrumentations trying to patch it in each case.

Here's an old issue that relates to it (CC @cijothomas): open-telemetry/opentelemetry-dotnet#1842

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Activity baggage vs OTel Baggage is still an unresolved problem! It'll eventually be fixed, though we don't have specifics or timeline. (its not likely coming in this year, there is nothing tracking it in .NET 9 release timeframe.)

{
builder.AddSource(RabbitMQActivitySource.SubscriberSourceName);
}

if (configuration.IncludePublishers)
{
builder.AddSource(RabbitMQActivitySource.PublisherSourceName);
}

return builder;
}

private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
{
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = s_propagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
Baggage.Current = parentContext.Baggage;
return parentContext.ActivityContext;
}

private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object> carrier, string key)
{
try
{
if (carrier.TryGetValue(key, out object value))
{
byte[] bytes = value as byte[];
return new[] { Encoding.UTF8.GetString(bytes) };
}
}
catch (Exception)
{
//this.logger.LogError(ex, "Failed to extract trace context.");
}

return Enumerable.Empty<string>();
}

private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object> props)
{
// Inject the current Activity's context into the message headers.
s_propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
}

private static void OpenTelemetryContextSetter(IDictionary<string, object> carrier, string key, string value)
{
carrier[key] = Encoding.UTF8.GetBytes(value);
}
}
}
Binary file added projects/RabbitMQ.Client.OpenTelemetry/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<InternalsVisibleTo Include="Integration" />
<InternalsVisibleTo Include="SequentialIntegration" />
<InternalsVisibleTo Include="Benchmarks" />
<InternalsVisibleTo Include="OTel" />
</ItemGroup>

<ItemGroup>
Expand Down
14 changes: 9 additions & 5 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;

Expand Down Expand Up @@ -86,17 +87,20 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties,
ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
36 changes: 5 additions & 31 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,20 +1036,17 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;

if (sendActivity != null)
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
}
else
{
// TODO cancellation token
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
}
}
Expand All @@ -1068,20 +1065,6 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
}
}

private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
{
if (!(propsObj is Dictionary<string, object> headers))
{
return;
}

// Only propagate headers if they haven't already been set
if (!headers.ContainsKey(key))
{
headers[key] = value;
}
}

public async void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey,
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
Expand All @@ -1097,21 +1080,17 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;

if (sendActivity != null)
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
}
else
{
// TODO cancellation token
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
}
}
Expand Down Expand Up @@ -1145,21 +1124,17 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;

if (sendActivity != null)
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
}
else
{
// TODO cancellation token
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
}
}
Expand Down Expand Up @@ -1868,9 +1843,8 @@ private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(
}

var headers = props.Headers ?? new Dictionary<string, object>();

// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties);
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
props.Headers = headers;
return props;
}
Expand Down
Loading