Skip to content

Commit 935ef42

Browse files
authored
add instrumentation for consumer (#1)
1 parent 7a0dd8d commit 935ef42

6 files changed

+190
-31
lines changed

README.md

+62-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
![GitHub Actions Badge](https://github.com/vhatsura/confluent-kafka-extensions-diagnostics/actions/workflows/continuous.integration.yml/badge.svg)
44
[![NuGet Badge](https://buildstats.info/nuget/Confluent.Kafka.Extensions.Diagnostics)](https://www.nuget.org/packages/Confluent.Kafka.Extensions.Diagnostics/)
55

6+
The `Confluent.Kafka.Extensions.Diagnostics` package enables instrumentation of the `Confluent.Kafka` library
7+
via [Activity API](https://docs.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs).
8+
69
## Installation
710

811
```powershell
@@ -11,7 +14,65 @@ Install-Package Confluent.Kafka.Extensions.Diagnostics
1114

1215
## Usage
1316

17+
### Producer
18+
19+
Producer instrumentation is done via a wrapper class and, for this reason, the producer usage does not need to be rewritten. However,
20+
to enable producer instrumentation, `BuildWithInstrumentation` method should be called on the producer builder instead of `Build`.
21+
After that, all produce calls (sync and async) will be instrumented.
22+
1423
```csharp
24+
using Confluent.Kafka;
25+
using Confluent.Kafka.Extensions.Diagnostics;
26+
27+
28+
using var producer =
29+
new ProducerBuilder<Null, string>(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }))
30+
.SetKeySerializer(Serializers.Null)
31+
.SetValueSerializer(Serializers.Utf8)
32+
.BuildWithInstrumentation();
33+
34+
await producer.ProduceAsync("topic", new Message<Null, string> { Value = "Hello World!" });
35+
1536
```
1637

17-
## Roadmap
38+
### Consumer
39+
40+
Unfortunately, the consumer interface of the `Confluent.Kafka` library is not very flexible. Therefore, the instrumentation is implemented
41+
via an extension method on the consumer itself. For this reason, the consumer usage should be rewritten as follows:
42+
43+
```csharp
44+
using Confluent.Kafka;
45+
using Confluent.Kafka.Extensions.Diagnostics;
46+
47+
using var consumer = new ConsumerBuilder<Ignore, string>(
48+
new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })
49+
{
50+
GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
51+
})
52+
.SetValueDeserializer(Deserializers.Utf8)
53+
.Build();
54+
55+
consumer.Subscribe("topic");
56+
57+
try
58+
{
59+
while (true)
60+
{
61+
try
62+
{
63+
consumer.ConsumeWithInstrumentation((result) =>
64+
{
65+
Console.WriteLine(result.Message.Value);
66+
});
67+
}
68+
catch (ConsumeException e)
69+
{
70+
Console.WriteLine($"Error occurred: {e.Error.Reason}");
71+
}
72+
}
73+
}
74+
catch (OperationCanceledException)
75+
{
76+
consumer.Close();
77+
}
78+
```
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,30 @@
1-
using System.Diagnostics;
1+
using System.Diagnostics;
22
using System.Text;
33

44
namespace Confluent.Kafka.Extensions.Diagnostics;
55

66
internal static class ActivityDiagnosticsHelper
77
{
88
private const string ActivitySourceName = "Confluent.Kafka.Extensions.Diagnostics";
9+
private const string TraceParentHeaderName = "traceparent";
10+
private const string TraceStateHeaderName = "tracestate";
911

1012
private static ActivitySource ActivitySource { get; } = new(ActivitySourceName);
1113

12-
internal static Activity? Start<TKey, TValue>(TopicPartition partition, Message<TKey, TValue> message)
14+
internal static Activity? StartProduceActivity<TKey, TValue>(TopicPartition partition,
15+
Message<TKey, TValue> message)
1316
{
1417
try
1518
{
16-
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Client,
17-
default(ActivityContext),
18-
new[]
19-
{
20-
new KeyValuePair<string, object>("messaging.system", "kafka"),
21-
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
22-
new KeyValuePair<string, object>("messaging.destination_kind", "topic"),
23-
new KeyValuePair<string, object>("messaging.kafka.partition", partition.Partition.ToString())
24-
}!);
25-
26-
if (activity == null) return null;
19+
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Producer,
20+
default(ActivityContext), ActivityTags(partition)!);
21+
22+
if (activity == null)
23+
return null;
2724

2825
if (activity.IsAllDataRequested)
2926
{
30-
if (message.Key != null)
31-
{
32-
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
33-
}
34-
35-
if (message.Value != null)
36-
{
37-
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()!);
38-
activity.AddTag("messaging.message_payload_size_bytes", messagePayloadBytes.ToString());
39-
}
27+
SetActivityTags(activity, message);
4028
}
4129

4230
if (message.Headers == null)
@@ -45,12 +33,12 @@ internal static class ActivityDiagnosticsHelper
4533
}
4634

4735
if (activity.Id != null)
48-
message.Headers.Add("traceparent", Encoding.UTF8.GetBytes(activity.Id));
36+
message.Headers.Add(TraceParentHeaderName, Encoding.UTF8.GetBytes(activity.Id));
4937

5038
var tracestateStr = activity.Context.TraceState;
5139
if (tracestateStr?.Length > 0)
5240
{
53-
message.Headers.Add("tracestate", Encoding.UTF8.GetBytes(tracestateStr));
41+
message.Headers.Add(TraceStateHeaderName, Encoding.UTF8.GetBytes(tracestateStr));
5442
}
5543

5644
return activity;
@@ -61,4 +49,60 @@ internal static class ActivityDiagnosticsHelper
6149
return null;
6250
}
6351
}
52+
53+
internal static Activity? StartConsumeActivity<TKey, TValue>(TopicPartition partition,
54+
Message<TKey, TValue> message)
55+
{
56+
var activity = ActivitySource.CreateActivity("Confluent.Kafka.Consume", ActivityKind.Consumer,
57+
default(ActivityContext), ActivityTags(partition)!);
58+
59+
if (activity != null)
60+
{
61+
var traceParentHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceParentHeaderName);
62+
var traceStateHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceStateHeaderName);
63+
64+
var traceParent = traceParentHeader != null
65+
? Encoding.UTF8.GetString(traceParentHeader.GetValueBytes())
66+
: null;
67+
var traceState = traceStateHeader != null
68+
? Encoding.UTF8.GetString(traceStateHeader.GetValueBytes())
69+
: null;
70+
71+
if (ActivityContext.TryParse(traceParent, traceState, out var activityContext))
72+
{
73+
activity.SetParentId(activityContext.TraceId, activityContext.SpanId, activityContext.TraceFlags);
74+
activity.TraceStateString = activityContext.TraceState;
75+
}
76+
77+
if (activity.IsAllDataRequested)
78+
{
79+
SetActivityTags(activity, message);
80+
}
81+
82+
activity.Start();
83+
}
84+
85+
86+
return activity;
87+
}
88+
89+
private static void SetActivityTags<TKey, TValue>(Activity activity, Message<TKey, TValue> message)
90+
{
91+
if (message.Key != null)
92+
{
93+
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
94+
}
95+
}
96+
97+
private static IEnumerable<KeyValuePair<string, object>> ActivityTags(TopicPartition partition)
98+
{
99+
return new[]
100+
{
101+
new KeyValuePair<string, object>("messaging.system", "kafka"),
102+
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
103+
new KeyValuePair<string, object>("messaging.destination_kind", "topic"), new KeyValuePair<string, object>(
104+
"messaging.kafka.partition",
105+
partition.Partition.ToString())
106+
};
107+
}
64108
}

src/Confluent.Kafka.Extensions.Diagnostics/Confluent.Kafka.Extensions.Diagnostics.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
</PropertyGroup>
2626

2727
<ItemGroup>
28-
<PackageReference Include="Confluent.Kafka" Version="[1.6.2, 2.0.0)"/>
28+
<PackageReference Include="Confluent.Kafka" Version="[1.6.2, 2.0.0)" />
2929
<PackageReference Include="GitVersion.MsBuild" Version="5.10.3">
3030
<PrivateAssets>all</PrivateAssets>
3131
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
namespace Confluent.Kafka.Extensions.Diagnostics;
2+
3+
/// <summary>
4+
/// Extension methods for <see cref="IConsumer{TKey,TValue}" />.
5+
/// </summary>
6+
public static class ConsumerExtensions
7+
{
8+
/// <summary>
9+
/// Consumes a message from the topic with instrumentation.
10+
/// </summary>
11+
public static async Task ConsumeWithInstrumentation<TKey, TValue>(this IConsumer<TKey, TValue> consumer,
12+
Func<ConsumeResult<TKey, TValue>, CancellationToken, Task> action, CancellationToken cancellationToken)
13+
{
14+
var result = consumer.Consume(cancellationToken);
15+
16+
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);
17+
18+
try
19+
{
20+
await action(result, cancellationToken);
21+
}
22+
finally
23+
{
24+
activity?.Stop();
25+
}
26+
}
27+
28+
/// <summary>
29+
/// Consumes a message from the topic with instrumentation.
30+
/// </summary>
31+
public static void ConsumeWithInstrumentation<TKey, TValue>(this IConsumer<TKey, TValue> consumer,
32+
Action<ConsumeResult<TKey, TValue>> action, int millisecondsTimeout)
33+
{
34+
var result = consumer.Consume(millisecondsTimeout);
35+
36+
var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);
37+
38+
try
39+
{
40+
action(result);
41+
}
42+
finally
43+
{
44+
activity?.Stop();
45+
}
46+
}
47+
}

src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs

+5-4
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
2525
TopicPartition topicPartition, Message<TKey, TValue> message,
2626
CancellationToken cancellationToken = new CancellationToken())
2727
{
28-
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);
28+
var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message);
2929

3030
try
3131
{
32+
// todo: get delivery result and put it into the activity
3233
return await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken);
3334
}
3435
finally
@@ -38,14 +39,14 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
3839
}
3940

4041
public void Produce(
41-
string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null) =>
42+
string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) =>
4243
Produce(new TopicPartition(topic, Partition.Any), message, deliveryHandler);
4344

4445
public void Produce(
4546
TopicPartition topicPartition, Message<TKey, TValue> message,
46-
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
47+
Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
4748
{
48-
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);
49+
var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message);
4950

5051
try
5152
{

src/Confluent.Kafka.Extensions.Diagnostics/ProducerBuilderExtensions.cs

+6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
namespace Confluent.Kafka.Extensions.Diagnostics;
22

3+
/// <summary>
4+
/// Extension methods for <see cref="ProducerBuilder{TKey,TValue}" />.
5+
/// </summary>
36
public static class ProducerBuilderExtensions
47
{
8+
/// <summary>
9+
/// Builds a new instrumented instance of producer.
10+
/// </summary>
511
public static IProducer<TKey, TValue> BuildWithInstrumentation<TKey, TValue>(
612
this ProducerBuilder<TKey, TValue> producerBuilder)
713
{

0 commit comments

Comments
 (0)