Skip to content

Commit

Permalink
Filter out consumer/producer specific config for admin client (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-quix authored Apr 26, 2024
1 parent 9d4b44c commit 5f7d6bd
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
6 changes: 3 additions & 3 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import fileinput
from typing import List

version = "0.7.1.0"
informal_version = "0.7.1.0"
nuget_version = "0.7.1.0"
version = "0.7.2.0"
informal_version = "0.7.2.0-dev1"
nuget_version = "0.7.2.0-dev1"


def updatecsproj(projfilepath):
Expand Down
14 changes: 13 additions & 1 deletion src/QuixStreams.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
{
// Log nothing
}
adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build();
adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build();
}
var metadata = adminClient.GetMetadata(partition.Topic, TimeSpan.FromSeconds(5));
if (metadata == null)
Expand Down Expand Up @@ -342,6 +342,18 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
this.logger.LogTrace("[{0}] Open finished", this.configId);
}
}

private AdminClientBuilder GetAdminClientBuilder(ConsumerConfig config)
{
var filteredConfig = config.Where(prop =>
!prop.Key.StartsWith("dotnet.producer.") &&
!prop.Key.StartsWith("dotnet.consumer."))
.ToDictionary(k => k.Key, v => v.Value);

var adminConfig = new ConsumerConfig(filteredConfig);

return new AdminClientBuilder(adminConfig);
}

private void AutomaticOffsetsCommittedHandler(IConsumer<byte[], byte[]> consumer, CommittedOffsets offsets)
{
Expand Down
16 changes: 14 additions & 2 deletions src/QuixStreams.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
// Log nothing
}

using var adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build();
using var adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build();

var metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(10));
if (metadata == null)
Expand All @@ -182,6 +182,18 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
}
}

private AdminClientBuilder GetAdminClientBuilder(ProducerConfig config)
{
var filteredConfig = config.Where(prop =>
!prop.Key.StartsWith("dotnet.producer.") &&
!prop.Key.StartsWith("dotnet.consumer."))
.ToDictionary(k => k.Key, v => v.Value);

var adminConfig = new ProducerConfig(filteredConfig);

return new AdminClientBuilder(adminConfig);
}

private async Task UpdateMaxMessageSize(TimeSpan maxWait)
{
var max = DateTime.UtcNow.Add(maxWait);
Expand All @@ -192,7 +204,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
// Log nothing
}

using (var adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build())
using (var adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build())
{
var maxRequestTimeTopic = max - DateTime.UtcNow;
var topicConfig = await adminClient.DescribeConfigsAsync(new ConfigResource[]
Expand Down

0 comments on commit 5f7d6bd

Please sign in to comment.