Skip to content

Commit 09f2a05

Browse files
authored
do not throw exception for existing cap-msg-group message header. overwrite it instead. tolerate kafka message duplicate headers (#1623)
1 parent 40a56d2 commit 09f2a05

File tree

6 files changed

+8
-7
lines changed

6 files changed

+8
-7
lines changed

src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ Task Consume()
111111

112112
var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);
113113

114-
message.Headers.Add(Headers.Group, _groupId);
114+
message.Headers[Headers.Group] = _groupId;
115115

116116
return OnMessageCallback!(message, response.Messages[0].ReceiptHandle);
117117
}

src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private TransportMessage ConvertMessage(ServiceBusReceivedMessage message)
278278
var headers = message.ApplicationProperties
279279
.ToDictionary(x => x.Key, y => y.Value?.ToString());
280280

281-
headers.Add(Headers.Group, _subscriptionName);
281+
headers[Headers.Group] = _subscriptionName;
282282

283283
if (_asbOptions.CustomHeadersBuilder != null)
284284
{

src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ private async Task Consume(ConsumeResult<string, byte[]> consumerResult)
171171
foreach (var header in consumerResult.Message.Headers)
172172
{
173173
var val = header.GetValueBytes();
174-
headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null);
174+
headers[header.Key] = val != null ? Encoding.UTF8.GetString(val) : null;
175175
}
176176

177-
headers.Add(Headers.Group, _groupId);
177+
headers[Headers.Group] = _groupId;
178178

179179
if (_kafkaOptions.CustomHeadersBuilder != null)
180180
{

src/DotNetCore.CAP.NATS/NATSConsumerClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ Task Consume()
166166
headers.Add(h, e.Message.Header[h]);
167167
}
168168

169-
headers.Add(Headers.Group, _groupName);
169+
headers[Headers.Group] = _groupName;
170170

171171
if (_natsOptions.CustomHeadersBuilder != null)
172172
{

src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Task Consume()
7878
headers.Add(header.Key, header.Value);
7979
}
8080

81-
headers.Add(Headers.Group, _groupId);
81+
headers[Headers.Group] = _groupId;
8282

8383
var message = new TransportMessage(headers, consumerResult.Data);
8484

src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Text;
7+
using System.Text.RegularExpressions;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using DotNetCore.CAP.Messages;
@@ -67,7 +68,7 @@ Task Consume()
6768
headers.Add(header.Key, header.Value?.ToString());
6869
}
6970

70-
headers.Add(Messages.Headers.Group, _groupName);
71+
headers[Messages.Headers.Group] = _groupName;
7172

7273
if (_customHeadersBuilder != null)
7374
{

0 commit comments

Comments
 (0)