Skip to content

Commit 0ac9890

Browse files
authored
feat: add support for KafkaDependentProducer instrumentation (#34)
1 parent e8fa90c commit 0ac9890

File tree

4 files changed

+195
-1
lines changed

4 files changed

+195
-1
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace Confluent.Kafka.Extensions.Diagnostics;
2+
3+
/// <summary>
4+
/// Extension methods for <see cref="DependentProducerBuilder{TKey,TValue}" />.
5+
/// </summary>
6+
public static class DependentProducerBuilderExtensions
7+
{
8+
/// <summary>
9+
/// Builds a new instrumented instance of producer.
10+
/// </summary>
11+
public static IProducer<TKey, TValue> BuildWithInstrumentation<TKey, TValue>(
12+
this DependentProducerBuilder<TKey, TValue> producerBuilder
13+
)
14+
{
15+
if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder));
16+
17+
return new InstrumentedProducer<TKey, TValue>(producerBuilder.Build());
18+
}
19+
}

tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public sealed class KafkaDiagnosticsTests : IAssemblyFixture<EnvironmentFixture>
1010
{
1111
private readonly EnvironmentFixture _environmentFixture;
1212
private readonly IProducer<string, string> _producer;
13+
private readonly IProducer<string, string> _dependentProducer;
1314
private readonly IConsumer<string, string> _consumer;
1415

1516
private readonly Func<MatchOptions, MatchOptions> _matchOptions;
@@ -20,15 +21,22 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)
2021

2122
var kafkaBootstrapServers = _environmentFixture.KafkaBootstrapServers;
2223
var kafkaClientConfig = new ClientConfig { BootstrapServers = kafkaBootstrapServers };
24+
2325
_producer = new ProducerBuilder<string, string>(new ProducerConfig(kafkaClientConfig))
2426
.SetKeySerializer(Serializers.Utf8)
2527
.SetValueSerializer(Serializers.Utf8)
2628
.BuildWithInstrumentation();
2729

30+
_dependentProducer = new DependentProducerBuilder<string, string>(_producer.Handle)
31+
.SetKeySerializer(Serializers.Utf8)
32+
.SetValueSerializer(Serializers.Utf8)
33+
.BuildWithInstrumentation();
34+
2835
_consumer = new ConsumerBuilder<string, string>(
2936
new ConsumerConfig(new ClientConfig { BootstrapServers = kafkaBootstrapServers })
3037
{
31-
GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
38+
GroupId = "group",
39+
AutoOffsetReset = AutoOffsetReset.Earliest
3240
})
3341
.SetKeyDeserializer(Deserializers.Utf8)
3442
.SetValueDeserializer(Deserializers.Utf8)
@@ -38,6 +46,55 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)
3846
.ExcludeField("RootId").ExcludeField("TagObjects");
3947
}
4048

49+
[Fact]
50+
public async Task DependentProduceAsync()
51+
{
52+
// Arrange
53+
var snapshotName = Snapshot.FullName();
54+
using var listener = CreateActivityListener(activity =>
55+
{
56+
// Assert
57+
activity.Should().MatchSnapshot(snapshotName, _matchOptions);
58+
});
59+
ActivitySource.AddActivityListener(listener);
60+
61+
// Act
62+
await _dependentProducer.ProduceAsync("dependent_produce_async_topic",
63+
new Message<string, string> { Key = "test", Value = "Hello World!" });
64+
}
65+
66+
[Fact]
67+
public async Task DependentProduce()
68+
{
69+
// Arrange
70+
Activity? reportedActivity = null;
71+
using var listener = CreateActivityListener(activity =>
72+
{
73+
reportedActivity = activity;
74+
});
75+
ActivitySource.AddActivityListener(listener);
76+
77+
var delivered = false;
78+
79+
// Act
80+
_dependentProducer.Produce("dependent_produce_topic",
81+
new Message<string, string> { Key = "test", Value = "Hello World!" }, report =>
82+
{
83+
delivered = true;
84+
});
85+
86+
int leftAttempts = 10;
87+
do
88+
{
89+
await Task.Delay(TimeSpan.FromMilliseconds(500));
90+
} while (!delivered && leftAttempts-- > 0);
91+
92+
delivered.Should().BeTrue();
93+
reportedActivity.Should().NotBeNull();
94+
reportedActivity.Should().MatchSnapshot(_matchOptions);
95+
}
96+
97+
4198
[Fact]
4299
public async Task ProduceAsync()
43100
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{
2+
"Status": "Ok",
3+
"StatusDescription": null,
4+
"HasRemoteParent": false,
5+
"Kind": "Producer",
6+
"OperationName": "produce_topic publish",
7+
"DisplayName": "produce_topic publish",
8+
"Source": {
9+
"Name": "Confluent.Kafka.Extensions.Diagnostics",
10+
"Version": ""
11+
},
12+
"Parent": null,
13+
"ParentId": null,
14+
"Tags": [
15+
{
16+
"Key": "messaging.system",
17+
"Value": "kafka"
18+
},
19+
{
20+
"Key": "messaging.operation",
21+
"Value": "publish"
22+
},
23+
{
24+
"Key": "messaging.destination.kind",
25+
"Value": "topic"
26+
},
27+
{
28+
"Key": "messaging.destination.name",
29+
"Value": "produce_topic"
30+
},
31+
{
32+
"Key": "messaging.kafka.destination.partition",
33+
"Value": "0"
34+
},
35+
{
36+
"Key": "messaging.kafka.message.offset",
37+
"Value": "0"
38+
}
39+
],
40+
"Events": [],
41+
"Links": [],
42+
"Baggage": [],
43+
"Context": {
44+
"TraceId": {},
45+
"SpanId": {},
46+
"TraceFlags": "None",
47+
"TraceState": null,
48+
"IsRemote": false
49+
},
50+
"TraceStateString": null,
51+
"SpanId": {},
52+
"TraceId": {},
53+
"Recorded": false,
54+
"IsAllDataRequested": false,
55+
"ActivityTraceFlags": "None",
56+
"ParentSpanId": {},
57+
"IsStopped": true,
58+
"IdFormat": "W3C"
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{
2+
"Status": "Ok",
3+
"StatusDescription": null,
4+
"HasRemoteParent": false,
5+
"Kind": "Producer",
6+
"OperationName": "produce_async_topic publish",
7+
"DisplayName": "produce_async_topic publish",
8+
"Source": {
9+
"Name": "Confluent.Kafka.Extensions.Diagnostics",
10+
"Version": ""
11+
},
12+
"Parent": null,
13+
"ParentId": null,
14+
"Tags": [
15+
{
16+
"Key": "messaging.system",
17+
"Value": "kafka"
18+
},
19+
{
20+
"Key": "messaging.operation",
21+
"Value": "publish"
22+
},
23+
{
24+
"Key": "messaging.destination.kind",
25+
"Value": "topic"
26+
},
27+
{
28+
"Key": "messaging.destination.name",
29+
"Value": "produce_async_topic"
30+
},
31+
{
32+
"Key": "messaging.kafka.destination.partition",
33+
"Value": "0"
34+
},
35+
{
36+
"Key": "messaging.kafka.message.offset",
37+
"Value": "1"
38+
}
39+
],
40+
"Events": [],
41+
"Links": [],
42+
"Baggage": [],
43+
"Context": {
44+
"TraceId": {},
45+
"SpanId": {},
46+
"TraceFlags": "None",
47+
"TraceState": null,
48+
"IsRemote": false
49+
},
50+
"TraceStateString": null,
51+
"SpanId": {},
52+
"TraceId": {},
53+
"Recorded": false,
54+
"IsAllDataRequested": false,
55+
"ActivityTraceFlags": "None",
56+
"ParentSpanId": {},
57+
"IsStopped": true,
58+
"IdFormat": "W3C"
59+
}

0 commit comments

Comments
 (0)