From 5f7d6bd897f9c01d0489ad5770c697386b5aeb83 Mon Sep 17 00:00:00 2001 From: PeterN <79838809+peter-quix@users.noreply.github.com> Date: Fri, 26 Apr 2024 12:49:05 +0100 Subject: [PATCH] Filter out consumer/producer specific config for admin client (#51) --- builds/csharp/nuget/build_nugets.py | 6 +++--- src/QuixStreams.Kafka/KafkaConsumer.cs | 14 +++++++++++++- src/QuixStreams.Kafka/KafkaProducer.cs | 16 ++++++++++++++-- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/builds/csharp/nuget/build_nugets.py b/builds/csharp/nuget/build_nugets.py index 76c0a0c..065e0ec 100644 --- a/builds/csharp/nuget/build_nugets.py +++ b/builds/csharp/nuget/build_nugets.py @@ -6,9 +6,9 @@ import fileinput from typing import List -version = "0.7.1.0" -informal_version = "0.7.1.0" -nuget_version = "0.7.1.0" +version = "0.7.2.0" +informal_version = "0.7.2.0-dev1" +nuget_version = "0.7.2.0-dev1" def updatecsproj(projfilepath): diff --git a/src/QuixStreams.Kafka/KafkaConsumer.cs b/src/QuixStreams.Kafka/KafkaConsumer.cs index bf4375b..abeea5d 100644 --- a/src/QuixStreams.Kafka/KafkaConsumer.cs +++ b/src/QuixStreams.Kafka/KafkaConsumer.cs @@ -253,7 +253,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) { // Log nothing } - adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build(); + adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build(); } var metadata = adminClient.GetMetadata(partition.Topic, TimeSpan.FromSeconds(5)); if (metadata == null) @@ -342,6 +342,18 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) this.logger.LogTrace("[{0}] Open finished", this.configId); } } + + private AdminClientBuilder GetAdminClientBuilder(ConsumerConfig config) + { + var filteredConfig = config.Where(prop => + !prop.Key.StartsWith("dotnet.producer.") && + !prop.Key.StartsWith("dotnet.consumer.")) + .ToDictionary(k => k.Key, v => v.Value); + + var adminConfig = new ConsumerConfig(filteredConfig); + + return new AdminClientBuilder(adminConfig); + } private void AutomaticOffsetsCommittedHandler(IConsumer consumer, CommittedOffsets offsets) { diff --git a/src/QuixStreams.Kafka/KafkaProducer.cs b/src/QuixStreams.Kafka/KafkaProducer.cs index fa8190f..7ea2ae0 100644 --- a/src/QuixStreams.Kafka/KafkaProducer.cs +++ b/src/QuixStreams.Kafka/KafkaProducer.cs @@ -157,7 +157,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) // Log nothing } - using var adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build(); + using var adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build(); var metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(10)); if (metadata == null) @@ -182,6 +182,18 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) } } + private AdminClientBuilder GetAdminClientBuilder(ProducerConfig config) + { + var filteredConfig = config.Where(prop => + !prop.Key.StartsWith("dotnet.producer.") && + !prop.Key.StartsWith("dotnet.consumer.")) + .ToDictionary(k => k.Key, v => v.Value); + + var adminConfig = new ProducerConfig(filteredConfig); + + return new AdminClientBuilder(adminConfig); + } + private async Task UpdateMaxMessageSize(TimeSpan maxWait) { var max = DateTime.UtcNow.Add(maxWait); @@ -192,7 +204,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) // Log nothing } - using (var adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build()) + using (var adminClient = GetAdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build()) { var maxRequestTimeTopic = max - DateTime.UtcNow; var topicConfig = await adminClient.DescribeConfigsAsync(new ConfigResource[]