Skip to content

Commit f4d6355

Browse files
authored
set additional tags aligned with OTEL semantics conventions (#12)
* change activity names * set additional tags aligned with OTEL semantics conventions * fix example documentation
1 parent 1cc3f7a commit f4d6355

13 files changed

+504
-28
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ try
6363
consumer.ConsumeWithInstrumentation((result) =>
6464
{
6565
Console.WriteLine(result.Message.Value);
66-
});
66+
}, 2000);
6767
}
6868
catch (ConsumeException e)
6969
{

confluent-kafka-extensions-diagnostics.sln

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
Microsoft Visual Studio Solution File, Format Version 12.00
33
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.Extensions.Diagnostics", "src\Confluent.Kafka.Extensions.Diagnostics\Confluent.Kafka.Extensions.Diagnostics.csproj", "{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}"
44
EndProject
5+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.Extensions.Diagnostics.Tests", "tests\Confluent.Kafka.Extensions.Diagnostics.Tests\Confluent.Kafka.Extensions.Diagnostics.Tests.csproj", "{D51686E0-22EE-41FC-981D-A818D76E8B02}"
6+
EndProject
57
Global
68
GlobalSection(SolutionConfigurationPlatforms) = preSolution
79
Debug|Any CPU = Debug|Any CPU
@@ -12,5 +14,9 @@ Global
1214
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Debug|Any CPU.Build.0 = Debug|Any CPU
1315
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Release|Any CPU.ActiveCfg = Release|Any CPU
1416
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Release|Any CPU.Build.0 = Release|Any CPU
17+
{D51686E0-22EE-41FC-981D-A818D76E8B02}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
18+
{D51686E0-22EE-41FC-981D-A818D76E8B02}.Debug|Any CPU.Build.0 = Debug|Any CPU
19+
{D51686E0-22EE-41FC-981D-A818D76E8B02}.Release|Any CPU.ActiveCfg = Release|Any CPU
20+
{D51686E0-22EE-41FC-981D-A818D76E8B02}.Release|Any CPU.Build.0 = Release|Any CPU
1521
EndGlobalSection
1622
EndGlobal

src/Confluent.Kafka.Extensions.Diagnostics/ActivityDiagnosticsHelper.cs

+66-14
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ internal static class ActivityDiagnosticsHelper
1616
{
1717
try
1818
{
19-
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Producer,
20-
default(ActivityContext), ActivityTags(partition)!);
19+
Activity? activity = ActivitySource.StartActivity(
20+
$"{partition.Topic} publish", ActivityKind.Producer,
21+
default(ActivityContext), ProducerActivityTags(partition));
2122

2223
if (activity == null)
2324
return null;
@@ -50,13 +51,40 @@ internal static class ActivityDiagnosticsHelper
5051
}
5152
}
5253

53-
internal static Activity? StartConsumeActivity<TKey, TValue>(TopicPartition partition,
54-
Message<TKey, TValue> message)
54+
internal static void UpdateActivityTags<TKey, TValue>(DeliveryResult<TKey, TValue> deliveryResult,
55+
Activity activity)
56+
{
57+
try
58+
{
59+
var activityStatus = deliveryResult.Status switch
60+
{
61+
PersistenceStatus.Persisted => ActivityStatusCode.Ok,
62+
PersistenceStatus.NotPersisted => ActivityStatusCode.Error,
63+
_ => ActivityStatusCode.Unset
64+
};
65+
66+
activity.SetStatus(activityStatus);
67+
if (activityStatus == ActivityStatusCode.Ok)
68+
{
69+
activity.SetTag("messaging.kafka.destination.partition", deliveryResult.Partition.Value.ToString());
70+
activity.SetTag("messaging.kafka.message.offset", deliveryResult.Offset.Value.ToString());
71+
}
72+
}
73+
catch
74+
{
75+
// ignore
76+
}
77+
}
78+
79+
internal static Activity? StartConsumeActivity<TKey, TValue>(ConsumeResult<TKey, TValue> consumerResult,
80+
string memberId)
5581
{
5682
try
5783
{
58-
var activity = ActivitySource.CreateActivity("Confluent.Kafka.Consume", ActivityKind.Consumer,
59-
default(ActivityContext), ActivityTags(partition)!);
84+
var message = consumerResult.Message;
85+
var activity = ActivitySource.CreateActivity(
86+
$"{consumerResult.Topic} process", ActivityKind.Consumer,
87+
default(ActivityContext), ConsumerActivityTags(consumerResult, memberId));
6088

6189
if (activity != null)
6290
{
@@ -98,19 +126,43 @@ private static void SetActivityTags<TKey, TValue>(Activity activity, Message<TKe
98126
{
99127
if (message.Key != null)
100128
{
101-
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
129+
activity.SetTag("messaging.kafka.message.key", message.Key.ToString());
102130
}
103131
}
104132

105-
private static IEnumerable<KeyValuePair<string, object>> ActivityTags(TopicPartition partition)
133+
private static IEnumerable<KeyValuePair<string, object?>> ProducerActivityTags(TopicPartition partition)
134+
{
135+
var list = ActivityTags(partition, "publish");
136+
137+
list.Add(new KeyValuePair<string, object?>("messaging.destination.kind", "topic"));
138+
list.Add(new KeyValuePair<string, object?>("messaging.destination.name", partition.Topic));
139+
140+
return list;
141+
}
142+
143+
private static IEnumerable<KeyValuePair<string, object?>> ConsumerActivityTags<TKey, TValue>(
144+
ConsumeResult<TKey, TValue> consumerResult, string memberId)
145+
{
146+
IList<KeyValuePair<string, object?>> list = ActivityTags(consumerResult.TopicPartition, "process");
147+
148+
// messaging.consumer.id - For Kafka, set it to {messaging.kafka.consumer.group} - {messaging.kafka.client_id},
149+
// if both are present, or only messaging.kafka.consumer.group
150+
list.Add(new("messaging.source.kind", "topic"));
151+
list.Add(new("messaging.source.name", consumerResult.Topic));
152+
list.Add(new("messaging.kafka.source.partition", consumerResult.Partition.Value.ToString()));
153+
list.Add(new("messaging.kafka.message.offset", consumerResult.Offset.Value.ToString()));
154+
list.Add(new("messaging.kafka.client_id", memberId));
155+
156+
// messaging.kafka.consumer.group - there is no way to access this information from the consumer
157+
158+
return list;
159+
}
160+
161+
private static IList<KeyValuePair<string, object?>> ActivityTags(TopicPartition partition, string operation)
106162
{
107-
return new[]
163+
return new List<KeyValuePair<string, object?>>()
108164
{
109-
new KeyValuePair<string, object>("messaging.system", "kafka"),
110-
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
111-
new KeyValuePair<string, object>("messaging.destination_kind", "topic"), new KeyValuePair<string, object>(
112-
"messaging.kafka.partition",
113-
partition.Partition.ToString())
165+
new("messaging.system", "kafka"), new("messaging.operation", operation)
114166
};
115167
}
116168
}

src/Confluent.Kafka.Extensions.Diagnostics/ConsumerExtensions.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static async Task ConsumeWithInstrumentation<TKey, TValue>(this IConsumer
1616

1717
var result = consumer.Consume(cancellationToken);
1818

19-
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);
19+
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result, consumer.MemberId);
2020

2121
try
2222
{
@@ -41,7 +41,7 @@ public static async Task<TResult> ConsumeWithInstrumentation<TKey, TValue, TResu
4141

4242
var result = consumer.Consume(cancellationToken);
4343

44-
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);
44+
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result, consumer.MemberId);
4545

4646
try
4747
{
@@ -64,7 +64,7 @@ public static void ConsumeWithInstrumentation<TKey, TValue>(this IConsumer<TKey,
6464

6565
var result = consumer.Consume(millisecondsTimeout);
6666

67-
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);
67+
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result, consumer.MemberId);
6868

6969
try
7070
{

src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs

+18-10
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,15 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
2929

3030
try
3131
{
32-
// todo: get delivery result and put it into the activity
33-
return await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken)
32+
var result = await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken)
3433
.ConfigureAwait(false);
34+
35+
if (activity != null)
36+
{
37+
ActivityDiagnosticsHelper.UpdateActivityTags(result, activity);
38+
}
39+
40+
return result;
3541
}
3642
finally
3743
{
@@ -49,14 +55,16 @@ public void Produce(
4955
{
5056
var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message);
5157

52-
try
53-
{
54-
_producerImplementation.Produce(topicPartition, message, deliveryHandler);
55-
}
56-
finally
57-
{
58-
activity?.Stop();
59-
}
58+
Action<DeliveryReport<TKey, TValue>>? handler = activity != null
59+
? report =>
60+
{
61+
ActivityDiagnosticsHelper.UpdateActivityTags(report, activity);
62+
activity.Stop();
63+
deliveryHandler?.Invoke(report);
64+
}
65+
: deliveryHandler;
66+
67+
_producerImplementation.Produce(topicPartition, message, handler);
6068
}
6169

6270
public int Poll(TimeSpan timeout) => _producerImplementation.Poll(timeout);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using Xunit.Extensions.AssemblyFixture;
3+
4+
[assembly: ExcludeFromCodeCoverage]
5+
[assembly: TestFramework(AssemblyFixtureFramework.TypeName, AssemblyFixtureFramework.AssemblyName)]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net6.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
8+
<IsPackable>false</IsPackable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="Confluent.Kafka" Version="2.0.2"/>
13+
<PackageReference Include="FluentAssertions" Version="6.10.0"/>
14+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0"/>
15+
<PackageReference Include="Snapshooter.Xunit" Version="0.13.0"/>
16+
<PackageReference Include="TestEnvironment.Docker.Containers.Kafka" Version="2.1.4"/>
17+
<PackageReference Include="xunit" Version="2.4.2"/>
18+
<PackageReference Include="Xunit.Extensions.AssemblyFixture" Version="2.4.1"/>
19+
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
20+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
21+
<PrivateAssets>all</PrivateAssets>
22+
</PackageReference>
23+
<PackageReference Include="coverlet.collector" Version="3.2.0">
24+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
25+
<PrivateAssets>all</PrivateAssets>
26+
</PackageReference>
27+
</ItemGroup>
28+
29+
<ItemGroup>
30+
<ProjectReference Include="..\..\src\Confluent.Kafka.Extensions.Diagnostics\Confluent.Kafka.Extensions.Diagnostics.csproj"/>
31+
</ItemGroup>
32+
33+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using TestEnvironment.Docker;
2+
using TestEnvironment.Docker.Containers.Kafka;
3+
4+
namespace Confluent.Kafka.Extensions.Diagnostics.Tests;
5+
6+
public class EnvironmentFixture : IAsyncLifetime, IAsyncDisposable
7+
{
8+
private const string KafkaContainerName = "kafka-diagnostics";
9+
10+
private readonly IDockerEnvironment _dockerEnvironment;
11+
12+
13+
public EnvironmentFixture()
14+
{
15+
_dockerEnvironment = new DockerEnvironmentBuilder()
16+
.AddKafkaContainer(p => p with
17+
{
18+
Name = KafkaContainerName, ImageName = "dougdonohoe/kafka-zookeeper", Tag = "2.6.0"
19+
})
20+
.Build();
21+
}
22+
23+
public string KafkaBootstrapServers
24+
{
25+
get
26+
{
27+
var kafkaContainer = _dockerEnvironment.GetContainer<KafkaContainer>(KafkaContainerName);
28+
29+
if (kafkaContainer == null)
30+
throw new InvalidOperationException("Kafka container not found");
31+
32+
return kafkaContainer.GetUrl();
33+
}
34+
}
35+
36+
public async Task InitializeAsync()
37+
{
38+
await _dockerEnvironment.UpAsync();
39+
}
40+
41+
async Task IAsyncLifetime.DisposeAsync()
42+
{
43+
await DisposeAsync();
44+
}
45+
46+
public async ValueTask DisposeAsync()
47+
{
48+
await _dockerEnvironment.DownAsync();
49+
}
50+
}

0 commit comments

Comments
 (0)